ceph_bucket_notification_test.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452
  1. /*
  2. Copyright 2021 The Rook Authors. All rights reserved.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package integration
  14. import (
  15. "context"
  16. "testing"
  17. "time"
  18. "github.com/rook/rook/pkg/daemon/ceph/client"
  19. rgw "github.com/rook/rook/pkg/operator/ceph/object"
  20. "github.com/rook/rook/tests/framework/clients"
  21. "github.com/rook/rook/tests/framework/utils"
  22. "github.com/stretchr/testify/assert"
  23. "github.com/stretchr/testify/suite"
  24. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  25. )
  26. const (
  27. httpServerName = "sample-http-server"
  28. httpServerNameSpace = "default"
  29. httpServerPort = "8080"
  30. httpEndpointService = "http://" + httpServerName + "." + httpServerNameSpace + ":" + httpServerPort
  31. putEvent = "ObjectCreated:Put"
  32. deleteEvent = "ObjectRemoved:Delete"
  33. )
  34. func testBucketNotifications(s *suite.Suite, helper *clients.TestClient, k8sh *utils.K8sHelper, namespace, storeName string) {
  35. if utils.IsPlatformOpenShift() {
  36. s.T().Skip("bucket notification tests skipped on openshift")
  37. }
  38. bucketNotificationLabelPrefix := "bucket-notification-"
  39. obcNamespace := "default"
  40. ctx := context.TODO()
  41. clusterInfo := client.AdminTestClusterInfo(namespace)
  42. t := s.T()
  43. context := k8sh.MakeContext()
  44. objectStore, err := k8sh.RookClientset.CephV1().CephObjectStores(namespace).Get(ctx, storeName, metav1.GetOptions{})
  45. assert.Nil(t, err)
  46. rgwcontext, err := rgw.NewMultisiteContext(context, clusterInfo, objectStore)
  47. assert.Nil(t, err)
  48. notificationName := "my-notification"
  49. topicName := "my-topic"
  50. appLabel := "app=" + httpServerName
  51. logger.Infof("Testing Bucket Notifications on %s", storeName)
  52. t.Run("create HTTP Endpoint for receiving notifications", func(t *testing.T) {
  53. err := helper.TopicClient.CreateHTTPServer(httpServerName, httpServerNameSpace, httpServerPort)
  54. assert.Nil(t, err)
  55. })
  56. t.Run("create CephBucketTopic", func(t *testing.T) {
  57. err := helper.TopicClient.CreateTopic(topicName, storeName, httpEndpointService)
  58. assert.Nil(t, err)
  59. created := utils.Retry(30, 5*time.Second, "topic is created", func() bool {
  60. return helper.TopicClient.CheckTopic(topicName)
  61. })
  62. assert.True(t, created)
  63. logger.Info("CephBucketTopic created successfully")
  64. })
  65. t.Run("create CephBucketNotification", func(t *testing.T) {
  66. err := helper.NotificationClient.CreateNotification(notificationName, topicName)
  67. assert.Nil(t, err)
  68. created := utils.Retry(12, 2*time.Second, "notification is created", func() bool {
  69. return helper.NotificationClient.CheckNotificationCR(notificationName)
  70. })
  71. assert.True(t, created)
  72. logger.Info("CephBucketNotification created successfully")
  73. })
  74. t.Run("create ObjectBucketClaim", func(t *testing.T) {
  75. logger.Infof("create OBC %q with storageclass %q and notification %q", obcName, bucketStorageClassName, notificationName)
  76. cobErr := helper.BucketClient.CreateBucketStorageClass(namespace, storeName, bucketStorageClassName, "Delete")
  77. assert.Nil(t, cobErr)
  78. cobcErr := helper.BucketClient.CreateObcNotification(obcName, bucketStorageClassName, bucketname, notificationName, true)
  79. assert.Nil(t, cobcErr)
  80. created := utils.Retry(12, 2*time.Second, "OBC is created", func() bool {
  81. return helper.BucketClient.CheckOBC(obcName, "bound")
  82. })
  83. assert.True(t, created)
  84. logger.Info("OBC created successfully")
  85. var bkt rgw.ObjectBucket
  86. i := 0
  87. for i = 0; i < 4; i++ {
  88. b, code, err := rgw.GetBucket(rgwcontext, bucketname)
  89. if b != nil && err == nil {
  90. bkt = *b
  91. break
  92. }
  93. logger.Warningf("cannot get bucket %q, retrying... bucket: %v. code: %d, err: %v", bucketname, b, code, err)
  94. logger.Infof("(%d) check bucket exists, sleeping for 5 seconds ...", i)
  95. time.Sleep(5 * time.Second)
  96. }
  97. assert.NotEqual(t, 4, i)
  98. assert.Equal(t, bucketname, bkt.Name)
  99. logger.Info("OBC, Secret and ConfigMap created")
  100. })
  101. t.Run("perform s3 operations and check for notifications", func(t *testing.T) {
  102. var s3client *rgw.S3Agent
  103. s3endpoint, _ := helper.ObjectClient.GetEndPointUrl(namespace, storeName)
  104. s3AccessKey, _ := helper.BucketClient.GetAccessKey(obcName)
  105. s3SecretKey, _ := helper.BucketClient.GetSecretKey(obcName)
  106. if objectStore.Spec.IsTLSEnabled() {
  107. s3client, err = rgw.NewInsecureS3Agent(s3AccessKey, s3SecretKey, s3endpoint, true)
  108. } else {
  109. s3client, err = rgw.NewS3Agent(s3AccessKey, s3SecretKey, s3endpoint, true, nil)
  110. }
  111. assert.Nil(t, err)
  112. logger.Infof("endpoint (%s) Accesskey (%s) secret (%s)", s3endpoint, s3AccessKey, s3SecretKey)
  113. t.Run("put object", func(t *testing.T) {
  114. _, err := s3client.PutObjectInBucket(bucketname, ObjBody, ObjectKey1, contentType)
  115. assert.Nil(t, err)
  116. })
  117. t.Run("check for put bucket notification", func(t *testing.T) {
  118. notificationReceived, err := helper.NotificationClient.CheckNotificationFromHTTPEndPoint(appLabel, putEvent, ObjectKey1)
  119. assert.True(t, notificationReceived)
  120. assert.Nil(t, err)
  121. // negative test case to confirm didn't receive any delete event notification
  122. notificationReceived, err = helper.NotificationClient.CheckNotificationFromHTTPEndPoint(appLabel, deleteEvent, ObjectKey1)
  123. assert.False(t, notificationReceived)
  124. assert.Nil(t, err)
  125. })
  126. t.Run("delete objects", func(t *testing.T) {
  127. _, err := s3client.DeleteObjectInBucket(bucketname, ObjectKey1)
  128. assert.Nil(t, err)
  129. })
  130. t.Run("check for delete bucket notification", func(t *testing.T) {
  131. notificationReceived, err := helper.NotificationClient.CheckNotificationFromHTTPEndPoint(appLabel, deleteEvent, ObjectKey1)
  132. assert.True(t, notificationReceived)
  133. assert.Nil(t, err)
  134. // negative test case to confirm didn't receive any put event notification
  135. notificationReceived, err = helper.NotificationClient.CheckNotificationFromHTTPEndPoint(appLabel, putEvent, ObjectKey1)
  136. assert.False(t, notificationReceived)
  137. assert.Nil(t, err)
  138. })
  139. })
  140. t.Run("check CephBucketNotification created for bucket", func(t *testing.T) {
  141. notificationPresent := utils.Retry(12, 2*time.Second, "notification is created for bucket", func() bool {
  142. return helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, notificationName, helper, objectStore.Spec.IsTLSEnabled())
  143. })
  144. assert.True(t, notificationPresent)
  145. logger.Info("CephBucketNotification created successfully on bucket")
  146. })
  147. t.Run("add non-notification label to OBC", func(t *testing.T) {
  148. obc, err := k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(obcNamespace).Get(ctx, obcName, metav1.GetOptions{})
  149. assert.Nil(t, err)
  150. obc.Labels["test-label"] = "test-value"
  151. _, err = k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(obcNamespace).Update(ctx, obc, metav1.UpdateOptions{})
  152. assert.Nil(t, err)
  153. // check whether existing bucket notification unaffected
  154. notificationPresent := utils.Retry(12, 2*time.Second, "notification is created for bucket", func() bool {
  155. // TODO : add api to fetch all the notification from backend to see if it is unaffected
  156. t.Skipped()
  157. return helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, notificationName, helper, objectStore.Spec.IsTLSEnabled())
  158. })
  159. assert.True(t, notificationPresent)
  160. })
  161. t.Run("remove non-notification label from OBC", func(t *testing.T) {
  162. obc, err := k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(obcNamespace).Get(ctx, obcName, metav1.GetOptions{})
  163. assert.Nil(t, err)
  164. delete(obc.Labels, "test-label")
  165. _, err = k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(obcNamespace).Update(ctx, obc, metav1.UpdateOptions{})
  166. assert.Nil(t, err)
  167. // check whether existing bucket notification unaffected
  168. notificationPresent := utils.Retry(12, 2*time.Second, "notification is created for bucket", func() bool {
  169. // TODO : add api to fetch all the notification from backend to see if it is unaffected
  170. t.Skipped()
  171. return helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, notificationName, helper, objectStore.Spec.IsTLSEnabled())
  172. })
  173. assert.True(t, notificationPresent)
  174. })
  175. t.Run("remove notification label from OBC", func(t *testing.T) {
  176. // TODO: add remove notification label support in OBC
  177. t.Skipped()
  178. obc, err := k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(obcNamespace).Get(ctx, obcName, metav1.GetOptions{})
  179. assert.Nil(t, err)
  180. delete(obc.Labels, bucketNotificationLabelPrefix+notificationName)
  181. _, err = k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(obcNamespace).Update(ctx, obc, metav1.UpdateOptions{})
  182. assert.Nil(t, err)
  183. // check whether existing bucket notification uneffected
  184. var notificationPresent bool
  185. for i := 0; i < 4; i++ {
  186. notificationPresent = helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, notificationName, helper, objectStore.Spec.IsTLSEnabled())
  187. if !notificationPresent {
  188. break
  189. }
  190. time.Sleep(5 * time.Second)
  191. }
  192. assert.False(t, notificationPresent)
  193. })
  194. t.Run("perform s3 operations and confirm notifications are no longer received", func(t *testing.T) {
  195. var s3client *rgw.S3Agent
  196. s3endpoint, _ := helper.ObjectClient.GetEndPointUrl(namespace, storeName)
  197. s3AccessKey, _ := helper.BucketClient.GetAccessKey(obcName)
  198. s3SecretKey, _ := helper.BucketClient.GetSecretKey(obcName)
  199. if objectStore.Spec.IsTLSEnabled() {
  200. s3client, err = rgw.NewInsecureS3Agent(s3AccessKey, s3SecretKey, s3endpoint, true)
  201. } else {
  202. s3client, err = rgw.NewS3Agent(s3AccessKey, s3SecretKey, s3endpoint, true, nil)
  203. }
  204. assert.Nil(t, err)
  205. logger.Infof("endpoint (%s) Accesskey (%s) secret (%s)", s3endpoint, s3AccessKey, s3SecretKey)
  206. t.Run("put object", func(t *testing.T) {
  207. _, err := s3client.PutObjectInBucket(bucketname, ObjBody, ObjectKey2, contentType)
  208. assert.Nil(t, err)
  209. })
  210. t.Run("check for put bucket notification", func(t *testing.T) {
  211. notificationReceived, err := helper.NotificationClient.CheckNotificationFromHTTPEndPoint(appLabel, putEvent, ObjectKey2)
  212. assert.False(t, notificationReceived)
  213. assert.Nil(t, err)
  214. })
  215. t.Run("delete objects", func(t *testing.T) {
  216. _, err := s3client.DeleteObjectInBucket(bucketname, ObjectKey1)
  217. assert.Nil(t, err)
  218. })
  219. t.Run("check for delete bucket notification", func(t *testing.T) {
  220. notificationReceived, err := helper.NotificationClient.CheckNotificationFromHTTPEndPoint(appLabel, deleteEvent, ObjectKey2)
  221. assert.False(t, notificationReceived)
  222. assert.Nil(t, err)
  223. })
  224. })
  225. t.Run("add topic, notification to existing OBC", func(t *testing.T) {
  226. newNotificationName := "new-notification"
  227. newTopicName := "new-topic"
  228. t.Run("create CephBucketTopic: new-topic", func(t *testing.T) {
  229. err := helper.TopicClient.CreateTopic(newTopicName, storeName, httpEndpointService)
  230. assert.Nil(t, err)
  231. created := utils.Retry(12, 2*time.Second, "topic is created", func() bool {
  232. return helper.TopicClient.CheckTopic(newTopicName)
  233. })
  234. assert.True(t, created)
  235. })
  236. t.Run("create CephBucketNotification: new-notification", func(t *testing.T) {
  237. err = helper.NotificationClient.CreateNotification(newNotificationName, newTopicName)
  238. assert.Nil(t, err)
  239. created := utils.Retry(12, 2*time.Second, "notification is created", func() bool {
  240. return helper.NotificationClient.CheckNotificationCR(newNotificationName)
  241. })
  242. assert.True(t, created)
  243. })
  244. t.Run("add notification label to OBC", func(t *testing.T) {
  245. obc, err := k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(obcNamespace).Get(ctx, obcName, metav1.GetOptions{})
  246. assert.Nil(t, err)
  247. obc.Labels[bucketNotificationLabelPrefix+newNotificationName] = newNotificationName
  248. _, err = k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(obcNamespace).Update(ctx, obc, metav1.UpdateOptions{})
  249. assert.Nil(t, err)
  250. })
  251. t.Run("new-notification should be configured for bucket", func(t *testing.T) {
  252. // check whether bucket notification added
  253. notificationPresent := utils.Retry(12, 2*time.Second, "notification is created for bucket", func() bool {
  254. return helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, newNotificationName, helper, objectStore.Spec.IsTLSEnabled())
  255. })
  256. assert.True(t, notificationPresent)
  257. })
  258. t.Run("delete CephBucketNotification: new-notification", func(t *testing.T) {
  259. err = helper.NotificationClient.DeleteNotification(newNotificationName, topicName)
  260. assert.Nil(t, err)
  261. t.Run("check notification removed from backend Bucket", func(t *testing.T) {
  262. // TODO: need to add that support
  263. t.Skipped()
  264. })
  265. })
  266. t.Run("delete CephBucketTopic: new-topic", func(t *testing.T) {
  267. err = helper.TopicClient.DeleteTopic(newTopicName, storeName, httpEndpointService)
  268. assert.Nil(t, err)
  269. })
  270. })
  271. t.Run("reverse order of creating notification,topic and adding it to ObjectBucketClaim", func(t *testing.T) {
  272. reverseNotificationName := "reverse-notification"
  273. reverseTopicName := "reverse-topic"
  274. reverseOBCName := "reverse-obc"
  275. reverseBucketName := "reverse-bucket"
  276. i := 0
  277. t.Run("create ObjectBucketClaim: reverse-obc", func(t *testing.T) {
  278. err := helper.BucketClient.CreateObcNotification(reverseOBCName, bucketStorageClassName, reverseBucketName, reverseNotificationName, true)
  279. assert.Nil(t, err)
  280. created := utils.Retry(12, 2*time.Second, "OBC is created", func() bool {
  281. return helper.BucketClient.CheckOBC(reverseOBCName, "bound")
  282. })
  283. assert.True(t, created)
  284. logger.Info("OBC created successfully")
  285. var bkt rgw.ObjectBucket
  286. for i = 0; i < 4; i++ {
  287. b, code, err := rgw.GetBucket(rgwcontext, reverseBucketName)
  288. if b != nil && err == nil {
  289. bkt = *b
  290. break
  291. }
  292. logger.Warningf("cannot get bucket %q, retrying... bucket: %v. code: %d, err: %v", reverseBucketName, b, code, err)
  293. logger.Infof("(%d) check bucket exists, sleeping for 5 seconds ...", i)
  294. time.Sleep(5 * time.Second)
  295. }
  296. assert.NotEqual(t, 4, i)
  297. assert.Equal(t, reverseBucketName, bkt.Name)
  298. })
  299. t.Run("create CephBucketNotification: reverse-notification", func(t *testing.T) {
  300. err = helper.NotificationClient.CreateNotification(reverseNotificationName, reverseTopicName)
  301. assert.Nil(t, err)
  302. created := utils.Retry(12, 2*time.Second, "notification is created", func() bool {
  303. return helper.NotificationClient.CheckNotificationCR(reverseNotificationName)
  304. })
  305. assert.True(t, created)
  306. })
  307. t.Run("the notification should not configured for the backend bucket until topic is created", func(t *testing.T) {
  308. // check whether bucket notification added, should fail since topic is not created
  309. // TODO: make below check valid with help of status field in NotificationCR
  310. t.Skipped()
  311. })
  312. t.Run("create CephBucketTopic: reverse-topic", func(t *testing.T) {
  313. err = helper.TopicClient.CreateTopic(reverseTopicName, storeName, httpEndpointService)
  314. assert.Nil(t, err)
  315. created := utils.Retry(12, 2*time.Second, "topic is created", func() bool {
  316. return helper.TopicClient.CheckTopic(reverseTopicName)
  317. })
  318. assert.True(t, created)
  319. })
  320. t.Run("notification should be configured after creating the topic", func(t *testing.T) {
  321. // check whether bucket notification added, should pass since topic got created
  322. notificationPresent := utils.Retry(12, 2*time.Second, "notification is created for bucket", func() bool {
  323. return helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, reverseOBCName, reverseBucketName, reverseNotificationName, helper, objectStore.Spec.IsTLSEnabled())
  324. })
  325. assert.True(t, notificationPresent)
  326. })
  327. t.Run("delete CephBucketNotification: reverse-notification", func(t *testing.T) {
  328. err = helper.NotificationClient.DeleteNotification(reverseNotificationName, reverseTopicName)
  329. assert.Nil(t, err)
  330. t.Run("check notification removed from backend Bucket", func(t *testing.T) {
  331. // TODO: need to add that support
  332. t.Skipped()
  333. })
  334. })
  335. t.Run("delete CephBucketTopic: reverse-topic", func(t *testing.T) {
  336. err = helper.TopicClient.DeleteTopic(reverseTopicName, storeName, httpEndpointService)
  337. assert.Nil(t, err)
  338. })
  339. t.Run("delete ObjectBucketClaim: reverse-obc", func(t *testing.T) {
  340. err = helper.BucketClient.DeleteObc(reverseOBCName, bucketStorageClassName, reverseBucketName, maxObject, true)
  341. assert.Nil(t, err)
  342. logger.Info("Checking to see if the obc, secret, and cm have all been deleted")
  343. for i = 0; i < 4 && !helper.BucketClient.CheckOBC(reverseOBCName, "deleted"); i++ {
  344. logger.Infof("(%d) obc deleted check, sleeping for 5 seconds ...", i)
  345. time.Sleep(5 * time.Second)
  346. }
  347. assert.NotEqual(t, 4, i)
  348. logger.Info("ensure OBC bucket was deleted")
  349. var rgwErr int
  350. for i = 0; i < 4; i++ {
  351. _, rgwErr, _ = rgw.GetBucket(rgwcontext, reverseBucketName)
  352. if rgwErr == rgw.RGWErrorNotFound {
  353. break
  354. }
  355. logger.Infof("(%d) check bucket deleted, sleeping for 5 seconds ...", i)
  356. time.Sleep(5 * time.Second)
  357. }
  358. assert.NotEqual(t, 4, i)
  359. assert.Equal(t, rgwErr, rgw.RGWErrorNotFound)
  360. })
  361. })
  362. t.Run("delete ObjectBucketClaim", func(t *testing.T) {
  363. i := 0
  364. dobcErr := helper.BucketClient.DeleteObc(obcName, bucketStorageClassName, bucketname, maxObject, true)
  365. assert.Nil(t, dobcErr)
  366. logger.Info("Checking to see if the obc, secret, and cm have all been deleted")
  367. for i = 0; i < 4 && !helper.BucketClient.CheckOBC(obcName, "deleted"); i++ {
  368. logger.Infof("(%d) obc deleted check, sleeping for 5 seconds ...", i)
  369. time.Sleep(5 * time.Second)
  370. }
  371. assert.NotEqual(t, 4, i)
  372. logger.Info("ensure OBC bucket was deleted")
  373. var rgwErr int
  374. for i = 0; i < 4; i++ {
  375. _, rgwErr, _ = rgw.GetBucket(rgwcontext, bucketname)
  376. if rgwErr == rgw.RGWErrorNotFound {
  377. break
  378. }
  379. logger.Infof("(%d) check bucket deleted, sleeping for 5 seconds ...", i)
  380. time.Sleep(5 * time.Second)
  381. }
  382. assert.NotEqual(t, 4, i)
  383. assert.Equal(t, rgwErr, rgw.RGWErrorNotFound)
  384. dobErr := helper.BucketClient.DeleteBucketStorageClass(namespace, storeName, bucketStorageClassName, "Delete")
  385. assert.Nil(t, dobErr)
  386. })
  387. t.Run("delete CephBucketNotification", func(t *testing.T) {
  388. err := helper.NotificationClient.DeleteNotification(notificationName, topicName)
  389. assert.Nil(t, err)
  390. t.Run("check notification removed from backend Bucket", func(t *testing.T) {
  391. // TODO: need to add that support
  392. t.Skipped()
  393. })
  394. })
  395. t.Run("delete CephBucketTopic", func(t *testing.T) {
  396. err := helper.TopicClient.DeleteTopic(topicName, storeName, httpEndpointService)
  397. assert.Nil(t, err)
  398. })
  399. t.Run("delete CephObjectStore", func(t *testing.T) {
  400. deleteObjectStore(t, k8sh, namespace, storeName)
  401. assertObjectStoreDeletion(t, k8sh, namespace, storeName)
  402. })
  403. }