ceph_object_test.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370
  1. /*
  2. Copyright 2021 The Rook Authors. All rights reserved.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package integration
  14. import (
  15. "context"
  16. "encoding/json"
  17. "testing"
  18. "time"
  19. cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
  20. "github.com/rook/rook/pkg/daemon/ceph/client"
  21. rgw "github.com/rook/rook/pkg/operator/ceph/object"
  22. "github.com/rook/rook/tests/framework/clients"
  23. "github.com/rook/rook/tests/framework/installer"
  24. "github.com/rook/rook/tests/framework/utils"
  25. "github.com/stretchr/testify/assert"
  26. "github.com/stretchr/testify/suite"
  27. v1 "k8s.io/api/core/v1"
  28. "k8s.io/apimachinery/pkg/api/errors"
  29. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  30. )
  31. const (
  32. objectStoreServicePrefixUniq = "rook-ceph-rgw-"
  33. objectStoreTLSName = "tls-test-store"
  34. )
  35. var (
  36. objectStoreServicePrefix = "rook-ceph-rgw-"
  37. )
  38. func TestCephObjectSuite(t *testing.T) {
  39. s := new(ObjectSuite)
  40. defer func(s *ObjectSuite) {
  41. HandlePanics(recover(), s.TearDownSuite, s.T)
  42. }(s)
  43. suite.Run(t, s)
  44. }
  45. type ObjectSuite struct {
  46. suite.Suite
  47. helper *clients.TestClient
  48. settings *installer.TestCephSettings
  49. installer *installer.CephInstaller
  50. k8sh *utils.K8sHelper
  51. }
  52. func (s *ObjectSuite) SetupSuite() {
  53. namespace := "object-ns"
  54. s.settings = &installer.TestCephSettings{
  55. ClusterName: "object-cluster",
  56. Namespace: namespace,
  57. OperatorNamespace: installer.SystemNamespace(namespace),
  58. StorageClassName: installer.StorageClassName(),
  59. UseHelm: false,
  60. UsePVC: installer.UsePVC(),
  61. Mons: 3,
  62. SkipOSDCreation: false,
  63. UseCrashPruner: true,
  64. EnableVolumeReplication: false,
  65. RookVersion: installer.LocalBuildTag,
  66. CephVersion: installer.ReturnCephVersion(),
  67. }
  68. s.settings.ApplyEnvVars()
  69. s.installer, s.k8sh = StartTestCluster(s.T, s.settings)
  70. s.helper = clients.CreateTestClient(s.k8sh, s.installer.Manifests)
  71. }
  72. func (s *ObjectSuite) AfterTest(suiteName, testName string) {
  73. s.installer.CollectOperatorLog(suiteName, testName)
  74. }
  75. func (s *ObjectSuite) TearDownSuite() {
  76. s.installer.UninstallRook()
  77. }
  78. func (s *ObjectSuite) TestWithTLS() {
  79. if utils.IsPlatformOpenShift() {
  80. s.T().Skip("object store tests skipped on openshift")
  81. }
  82. tls := true
  83. objectStoreServicePrefix = objectStoreServicePrefixUniq
  84. runObjectE2ETest(s.helper, s.k8sh, s.installer, &s.Suite, s.settings.Namespace, tls)
  85. err := s.k8sh.Clientset.CoreV1().Secrets(s.settings.Namespace).Delete(context.TODO(), objectTLSSecretName, metav1.DeleteOptions{})
  86. if err != nil {
  87. if !errors.IsNotFound(err) {
  88. logger.Fatal("failed to deleted store TLS secret")
  89. }
  90. }
  91. logger.Info("successfully deleted store TLS secret")
  92. }
  93. func (s *ObjectSuite) TestWithoutTLS() {
  94. if utils.IsPlatformOpenShift() {
  95. s.T().Skip("object store tests skipped on openshift")
  96. }
  97. tls := false
  98. objectStoreServicePrefix = objectStoreServicePrefixUniq
  99. runObjectE2ETest(s.helper, s.k8sh, s.installer, &s.Suite, s.settings.Namespace, tls)
  100. }
  101. // Smoke Test for ObjectStore - Test check the following operations on ObjectStore in order
  102. // Create object store, Create User, Connect to Object Store, Create Bucket, Read/Write/Delete to bucket,
  103. // Check issues in MGRs, Delete Bucket and Delete user
  104. // Test for ObjectStore with and without TLS enabled
  105. func runObjectE2ETest(helper *clients.TestClient, k8sh *utils.K8sHelper, installer *installer.CephInstaller, s *suite.Suite, namespace string, tlsEnable bool) {
  106. storeName := "test-store"
  107. if tlsEnable {
  108. storeName = objectStoreTLSName
  109. }
  110. logger.Infof("Running on Rook Cluster %s", namespace)
  111. createCephObjectStore(s.T(), helper, k8sh, installer, namespace, storeName, 3, tlsEnable)
  112. // test that a second object store can be created (and deleted) while the first exists
  113. s.T().Run("run a second object store", func(t *testing.T) {
  114. otherStoreName := "other-" + storeName
  115. // The lite e2e test is perfect, as it only creates a cluster, checks that it is healthy,
  116. // and then deletes it.
  117. deleteStore := true
  118. runObjectE2ETestLite(t, helper, k8sh, installer, namespace, otherStoreName, 1, deleteStore, tlsEnable)
  119. })
  120. // now test operation of the first object store
  121. testObjectStoreOperations(s, helper, k8sh, namespace, storeName)
  122. bucketNotificationTestStoreName := "bucket-notification-" + storeName
  123. createCephObjectStore(s.T(), helper, k8sh, installer, namespace, bucketNotificationTestStoreName, 1, tlsEnable)
  124. testBucketNotifications(s, helper, k8sh, namespace, bucketNotificationTestStoreName)
  125. if !tlsEnable {
  126. // TODO : need to fix COSI driver to support TLS
  127. logger.Info("Testing COSI driver")
  128. testCOSIDriver(s, helper, k8sh, installer, namespace)
  129. } else {
  130. logger.Info("Skipping COSI driver test as TLS is enabled")
  131. }
  132. }
  133. func testObjectStoreOperations(s *suite.Suite, helper *clients.TestClient, k8sh *utils.K8sHelper, namespace, storeName string) {
  134. ctx := context.TODO()
  135. clusterInfo := client.AdminTestClusterInfo(namespace)
  136. t := s.T()
  137. logger.Infof("Testing Object Operations on %s", storeName)
  138. t.Run("create CephObjectStoreUser", func(t *testing.T) {
  139. createCephObjectUser(s, helper, k8sh, namespace, storeName, userid, true, true)
  140. i := 0
  141. for i = 0; i < 4; i++ {
  142. if helper.ObjectUserClient.UserSecretExists(namespace, storeName, userid) {
  143. break
  144. }
  145. logger.Info("waiting 5 more seconds for user secret to exist")
  146. time.Sleep(5 * time.Second)
  147. }
  148. assert.NotEqual(t, 4, i)
  149. })
  150. context := k8sh.MakeContext()
  151. objectStore, err := k8sh.RookClientset.CephV1().CephObjectStores(namespace).Get(ctx, storeName, metav1.GetOptions{})
  152. assert.Nil(t, err)
  153. rgwcontext, err := rgw.NewMultisiteContext(context, clusterInfo, objectStore)
  154. assert.Nil(t, err)
  155. t.Run("create ObjectBucketClaim", func(t *testing.T) {
  156. logger.Infof("create OBC %q with storageclass %q - using reclaim policy 'delete' so buckets don't block deletion", obcName, bucketStorageClassName)
  157. cobErr := helper.BucketClient.CreateBucketStorageClass(namespace, storeName, bucketStorageClassName, "Delete")
  158. assert.Nil(t, cobErr)
  159. cobcErr := helper.BucketClient.CreateObc(obcName, bucketStorageClassName, bucketname, maxObject, true)
  160. assert.Nil(t, cobcErr)
  161. created := utils.Retry(20, 2*time.Second, "OBC is created", func() bool {
  162. return helper.BucketClient.CheckOBC(obcName, "bound")
  163. })
  164. assert.True(t, created)
  165. logger.Info("OBC created successfully")
  166. var bkt rgw.ObjectBucket
  167. i := 0
  168. for i = 0; i < 4; i++ {
  169. b, code, err := rgw.GetBucket(rgwcontext, bucketname)
  170. if b != nil && err == nil {
  171. bkt = *b
  172. break
  173. }
  174. logger.Warningf("cannot get bucket %q, retrying... bucket: %v. code: %d, err: %v", bucketname, b, code, err)
  175. logger.Infof("(%d) check bucket exists, sleeping for 5 seconds ...", i)
  176. time.Sleep(5 * time.Second)
  177. }
  178. assert.NotEqual(t, 4, i)
  179. assert.Equal(t, bucketname, bkt.Name)
  180. logger.Info("OBC, Secret and ConfigMap created")
  181. })
  182. t.Run("S3 access to OBC bucket", func(t *testing.T) {
  183. var s3client *rgw.S3Agent
  184. s3endpoint, _ := helper.ObjectClient.GetEndPointUrl(namespace, storeName)
  185. s3AccessKey, _ := helper.BucketClient.GetAccessKey(obcName)
  186. s3SecretKey, _ := helper.BucketClient.GetSecretKey(obcName)
  187. if objectStore.Spec.IsTLSEnabled() {
  188. s3client, err = rgw.NewInsecureS3Agent(s3AccessKey, s3SecretKey, s3endpoint, true)
  189. } else {
  190. s3client, err = rgw.NewS3Agent(s3AccessKey, s3SecretKey, s3endpoint, true, nil)
  191. }
  192. assert.Nil(t, err)
  193. logger.Infof("endpoint (%s) Accesskey (%s) secret (%s)", s3endpoint, s3AccessKey, s3SecretKey)
  194. t.Run("put object", func(t *testing.T) {
  195. _, poErr := s3client.PutObjectInBucket(bucketname, ObjBody, ObjectKey1, contentType)
  196. assert.Nil(t, poErr)
  197. })
  198. t.Run("get object", func(t *testing.T) {
  199. read, err := s3client.GetObjectInBucket(bucketname, ObjectKey1)
  200. assert.Nil(t, err)
  201. assert.Equal(t, ObjBody, read)
  202. })
  203. t.Run("quota enforcement", func(t *testing.T) {
  204. _, poErr := s3client.PutObjectInBucket(bucketname, ObjBody, ObjectKey2, contentType)
  205. assert.Nil(t, poErr)
  206. logger.Infof("Testing the max object limit")
  207. _, poErr = s3client.PutObjectInBucket(bucketname, ObjBody, ObjectKey3, contentType)
  208. assert.Error(t, poErr)
  209. })
  210. t.Run("update quota limits", func(t *testing.T) {
  211. poErr := helper.BucketClient.UpdateObc(obcName, bucketStorageClassName, bucketname, newMaxObject, true)
  212. assert.Nil(t, poErr)
  213. updated := utils.Retry(20, 2*time.Second, "OBC is updated", func() bool {
  214. return helper.BucketClient.CheckOBMaxObject(obcName, newMaxObject)
  215. })
  216. assert.True(t, updated)
  217. logger.Infof("Testing the updated object limit")
  218. _, poErr = s3client.PutObjectInBucket(bucketname, ObjBody, ObjectKey3, contentType)
  219. assert.NoError(t, poErr)
  220. _, poErr = s3client.PutObjectInBucket(bucketname, ObjBody, ObjectKey4, contentType)
  221. assert.Error(t, poErr)
  222. })
  223. t.Run("delete objects", func(t *testing.T) {
  224. _, delobjErr := s3client.DeleteObjectInBucket(bucketname, ObjectKey1)
  225. assert.Nil(t, delobjErr)
  226. _, delobjErr = s3client.DeleteObjectInBucket(bucketname, ObjectKey2)
  227. assert.Nil(t, delobjErr)
  228. _, delobjErr = s3client.DeleteObjectInBucket(bucketname, ObjectKey3)
  229. assert.Nil(t, delobjErr)
  230. logger.Info("Objects deleted on bucket successfully")
  231. })
  232. })
  233. t.Run("Regression check: OBC does not revert to Pending phase", func(t *testing.T) {
  234. // A bug exists in older versions of lib-bucket-provisioner that will revert a bucket and claim
  235. // back to "Pending" phase after being created and initially "Bound" by looping infinitely in
  236. // the bucket provision/creation loop. Verify that the OBC is "Bound" and stays that way.
  237. // The OBC reconcile loop runs again immediately b/c the OBC is modified to refer to its OB.
  238. // Wait a short amount of time before checking just to be safe.
  239. created := utils.Retry(15, 2*time.Second, "OBC is created", func() bool {
  240. return helper.BucketClient.CheckOBC(obcName, "bound")
  241. })
  242. assert.True(t, created)
  243. })
  244. t.Run("delete CephObjectStore should be blocked by OBC bucket and CephObjectStoreUser", func(t *testing.T) {
  245. deleteObjectStore(t, k8sh, namespace, storeName)
  246. store := &cephv1.CephObjectStore{}
  247. i := 0
  248. for i = 0; i < 4; i++ {
  249. storeStr, err := k8sh.GetResource("-n", namespace, "CephObjectStore", storeName, "-o", "json")
  250. assert.NoError(t, err)
  251. err = json.Unmarshal([]byte(storeStr), &store)
  252. assert.NoError(t, err)
  253. cond := cephv1.FindStatusCondition(store.Status.Conditions, cephv1.ConditionDeletionIsBlocked)
  254. if cond != nil {
  255. break
  256. }
  257. logger.Info("waiting 2 more seconds for CephObjectStore to reach Deleting state")
  258. time.Sleep(2 * time.Second)
  259. }
  260. assert.NotEqual(t, 4, i)
  261. assert.Equal(t, cephv1.ConditionDeleting, store.Status.Phase) // phase == "Deleting"
  262. // verify deletion is blocked b/c object has dependents
  263. cond := cephv1.FindStatusCondition(store.Status.Conditions, cephv1.ConditionDeletionIsBlocked)
  264. logger.Infof("condition: %+v", cond)
  265. assert.Equal(t, v1.ConditionTrue, cond.Status)
  266. assert.Equal(t, cephv1.ObjectHasDependentsReason, cond.Reason)
  267. // the CephObjectStoreUser and the bucket should both block deletion
  268. assert.Contains(t, cond.Message, "CephObjectStoreUsers")
  269. assert.Contains(t, cond.Message, userid)
  270. assert.Contains(t, cond.Message, "buckets")
  271. assert.Contains(t, cond.Message, bucketname)
  272. // The event is created by the same method that adds that condition, so we can be pretty
  273. // sure it exists here. No need to do extra work to validate the event.
  274. })
  275. t.Run("delete OBC", func(t *testing.T) {
  276. i := 0
  277. dobcErr := helper.BucketClient.DeleteObc(obcName, bucketStorageClassName, bucketname, maxObject, true)
  278. assert.Nil(t, dobcErr)
  279. logger.Info("Checking to see if the obc, secret, and cm have all been deleted")
  280. for i = 0; i < 4 && !helper.BucketClient.CheckOBC(obcName, "deleted"); i++ {
  281. logger.Infof("(%d) obc deleted check, sleeping for 5 seconds ...", i)
  282. time.Sleep(5 * time.Second)
  283. }
  284. assert.NotEqual(t, 4, i)
  285. logger.Info("ensure OBC bucket was deleted")
  286. var rgwErr int
  287. for i = 0; i < 4; i++ {
  288. _, rgwErr, _ = rgw.GetBucket(rgwcontext, bucketname)
  289. if rgwErr == rgw.RGWErrorNotFound {
  290. break
  291. }
  292. logger.Infof("(%d) check bucket deleted, sleeping for 5 seconds ...", i)
  293. time.Sleep(5 * time.Second)
  294. }
  295. assert.NotEqual(t, 4, i)
  296. assert.Equal(t, rgwErr, rgw.RGWErrorNotFound)
  297. dobErr := helper.BucketClient.DeleteBucketStorageClass(namespace, storeName, bucketStorageClassName, "Delete")
  298. assert.Nil(t, dobErr)
  299. })
  300. t.Run("delete CephObjectStoreUser", func(t *testing.T) {
  301. dosuErr := helper.ObjectUserClient.Delete(namespace, userid)
  302. assert.Nil(t, dosuErr)
  303. logger.Info("Object store user deleted successfully")
  304. logger.Info("Checking to see if the user secret has been deleted")
  305. i := 0
  306. for i = 0; i < 4 && helper.ObjectUserClient.UserSecretExists(namespace, storeName, userid) == true; i++ {
  307. logger.Infof("(%d) secret check sleeping for 5 seconds ...", i)
  308. time.Sleep(5 * time.Second)
  309. }
  310. assert.False(t, helper.ObjectUserClient.UserSecretExists(namespace, storeName, userid))
  311. })
  312. t.Run("Regression check: mgrs are not in a crashloop", func(t *testing.T) {
  313. assert.True(t, k8sh.CheckPodCountAndState("rook-ceph-mgr", namespace, 1, "Running"))
  314. })
  315. // tests are complete, now delete the objectstore
  316. s.T().Run("CephObjectStore should delete now that dependents are gone", func(t *testing.T) {
  317. // wait initially since it will almost never detect on the first try without this.
  318. time.Sleep(3 * time.Second)
  319. assertObjectStoreDeletion(t, k8sh, namespace, storeName)
  320. })
  321. // TODO : Add case for brownfield/cleanup s3 client}
  322. }