123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452 |
- /*
- Copyright 2021 The Rook Authors. All rights reserved.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package integration
- import (
- "context"
- "testing"
- "time"
- "github.com/rook/rook/pkg/daemon/ceph/client"
- rgw "github.com/rook/rook/pkg/operator/ceph/object"
- "github.com/rook/rook/tests/framework/clients"
- "github.com/rook/rook/tests/framework/utils"
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/suite"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- )
- const (
- httpServerName = "sample-http-server"
- httpServerNameSpace = "default"
- httpServerPort = "8080"
- httpEndpointService = "http://" + httpServerName + "." + httpServerNameSpace + ":" + httpServerPort
- putEvent = "ObjectCreated:Put"
- deleteEvent = "ObjectRemoved:Delete"
- )
- func testBucketNotifications(s *suite.Suite, helper *clients.TestClient, k8sh *utils.K8sHelper, namespace, storeName string) {
- if utils.IsPlatformOpenShift() {
- s.T().Skip("bucket notification tests skipped on openshift")
- }
- bucketNotificationLabelPrefix := "bucket-notification-"
- obcNamespace := "default"
- ctx := context.TODO()
- clusterInfo := client.AdminTestClusterInfo(namespace)
- t := s.T()
- context := k8sh.MakeContext()
- objectStore, err := k8sh.RookClientset.CephV1().CephObjectStores(namespace).Get(ctx, storeName, metav1.GetOptions{})
- assert.Nil(t, err)
- rgwcontext, err := rgw.NewMultisiteContext(context, clusterInfo, objectStore)
- assert.Nil(t, err)
- notificationName := "my-notification"
- topicName := "my-topic"
- appLabel := "app=" + httpServerName
- logger.Infof("Testing Bucket Notifications on %s", storeName)
- t.Run("create HTTP Endpoint for receiving notifications", func(t *testing.T) {
- err := helper.TopicClient.CreateHTTPServer(httpServerName, httpServerNameSpace, httpServerPort)
- assert.Nil(t, err)
- })
- t.Run("create CephBucketTopic", func(t *testing.T) {
- err := helper.TopicClient.CreateTopic(topicName, storeName, httpEndpointService)
- assert.Nil(t, err)
- created := utils.Retry(30, 5*time.Second, "topic is created", func() bool {
- return helper.TopicClient.CheckTopic(topicName)
- })
- assert.True(t, created)
- logger.Info("CephBucketTopic created successfully")
- })
- t.Run("create CephBucketNotification", func(t *testing.T) {
- err := helper.NotificationClient.CreateNotification(notificationName, topicName)
- assert.Nil(t, err)
- created := utils.Retry(12, 2*time.Second, "notification is created", func() bool {
- return helper.NotificationClient.CheckNotificationCR(notificationName)
- })
- assert.True(t, created)
- logger.Info("CephBucketNotification created successfully")
- })
- t.Run("create ObjectBucketClaim", func(t *testing.T) {
- logger.Infof("create OBC %q with storageclass %q and notification %q", obcName, bucketStorageClassName, notificationName)
- cobErr := helper.BucketClient.CreateBucketStorageClass(namespace, storeName, bucketStorageClassName, "Delete")
- assert.Nil(t, cobErr)
- cobcErr := helper.BucketClient.CreateObcNotification(obcName, bucketStorageClassName, bucketname, notificationName, true)
- assert.Nil(t, cobcErr)
- created := utils.Retry(12, 2*time.Second, "OBC is created", func() bool {
- return helper.BucketClient.CheckOBC(obcName, "bound")
- })
- assert.True(t, created)
- logger.Info("OBC created successfully")
- var bkt rgw.ObjectBucket
- i := 0
- for i = 0; i < 4; i++ {
- b, code, err := rgw.GetBucket(rgwcontext, bucketname)
- if b != nil && err == nil {
- bkt = *b
- break
- }
- logger.Warningf("cannot get bucket %q, retrying... bucket: %v. code: %d, err: %v", bucketname, b, code, err)
- logger.Infof("(%d) check bucket exists, sleeping for 5 seconds ...", i)
- time.Sleep(5 * time.Second)
- }
- assert.NotEqual(t, 4, i)
- assert.Equal(t, bucketname, bkt.Name)
- logger.Info("OBC, Secret and ConfigMap created")
- })
- t.Run("perform s3 operations and check for notifications", func(t *testing.T) {
- var s3client *rgw.S3Agent
- s3endpoint, _ := helper.ObjectClient.GetEndPointUrl(namespace, storeName)
- s3AccessKey, _ := helper.BucketClient.GetAccessKey(obcName)
- s3SecretKey, _ := helper.BucketClient.GetSecretKey(obcName)
- if objectStore.Spec.IsTLSEnabled() {
- s3client, err = rgw.NewInsecureS3Agent(s3AccessKey, s3SecretKey, s3endpoint, true)
- } else {
- s3client, err = rgw.NewS3Agent(s3AccessKey, s3SecretKey, s3endpoint, true, nil)
- }
- assert.Nil(t, err)
- logger.Infof("endpoint (%s) Accesskey (%s) secret (%s)", s3endpoint, s3AccessKey, s3SecretKey)
- t.Run("put object", func(t *testing.T) {
- _, err := s3client.PutObjectInBucket(bucketname, ObjBody, ObjectKey1, contentType)
- assert.Nil(t, err)
- })
- t.Run("check for put bucket notification", func(t *testing.T) {
- notificationReceived, err := helper.NotificationClient.CheckNotificationFromHTTPEndPoint(appLabel, putEvent, ObjectKey1)
- assert.True(t, notificationReceived)
- assert.Nil(t, err)
- // negative test case to confirm didn't receive any delete event notification
- notificationReceived, err = helper.NotificationClient.CheckNotificationFromHTTPEndPoint(appLabel, deleteEvent, ObjectKey1)
- assert.False(t, notificationReceived)
- assert.Nil(t, err)
- })
- t.Run("delete objects", func(t *testing.T) {
- _, err := s3client.DeleteObjectInBucket(bucketname, ObjectKey1)
- assert.Nil(t, err)
- })
- t.Run("check for delete bucket notification", func(t *testing.T) {
- notificationReceived, err := helper.NotificationClient.CheckNotificationFromHTTPEndPoint(appLabel, deleteEvent, ObjectKey1)
- assert.True(t, notificationReceived)
- assert.Nil(t, err)
- // negative test case to confirm didn't receive any put event notification
- notificationReceived, err = helper.NotificationClient.CheckNotificationFromHTTPEndPoint(appLabel, putEvent, ObjectKey1)
- assert.False(t, notificationReceived)
- assert.Nil(t, err)
- })
- })
- t.Run("check CephBucketNotification created for bucket", func(t *testing.T) {
- notificationPresent := utils.Retry(12, 2*time.Second, "notification is created for bucket", func() bool {
- return helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, notificationName, helper, objectStore.Spec.IsTLSEnabled())
- })
- assert.True(t, notificationPresent)
- logger.Info("CephBucketNotification created successfully on bucket")
- })
- t.Run("add non-notification label to OBC", func(t *testing.T) {
- obc, err := k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(obcNamespace).Get(ctx, obcName, metav1.GetOptions{})
- assert.Nil(t, err)
- obc.Labels["test-label"] = "test-value"
- _, err = k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(obcNamespace).Update(ctx, obc, metav1.UpdateOptions{})
- assert.Nil(t, err)
- // check whether existing bucket notification unaffected
- notificationPresent := utils.Retry(12, 2*time.Second, "notification is created for bucket", func() bool {
- // TODO : add api to fetch all the notification from backend to see if it is unaffected
- t.Skipped()
- return helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, notificationName, helper, objectStore.Spec.IsTLSEnabled())
- })
- assert.True(t, notificationPresent)
- })
- t.Run("remove non-notification label from OBC", func(t *testing.T) {
- obc, err := k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(obcNamespace).Get(ctx, obcName, metav1.GetOptions{})
- assert.Nil(t, err)
- delete(obc.Labels, "test-label")
- _, err = k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(obcNamespace).Update(ctx, obc, metav1.UpdateOptions{})
- assert.Nil(t, err)
- // check whether existing bucket notification unaffected
- notificationPresent := utils.Retry(12, 2*time.Second, "notification is created for bucket", func() bool {
- // TODO : add api to fetch all the notification from backend to see if it is unaffected
- t.Skipped()
- return helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, notificationName, helper, objectStore.Spec.IsTLSEnabled())
- })
- assert.True(t, notificationPresent)
- })
- t.Run("remove notification label from OBC", func(t *testing.T) {
- // TODO: add remove notification label support in OBC
- t.Skipped()
- obc, err := k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(obcNamespace).Get(ctx, obcName, metav1.GetOptions{})
- assert.Nil(t, err)
- delete(obc.Labels, bucketNotificationLabelPrefix+notificationName)
- _, err = k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(obcNamespace).Update(ctx, obc, metav1.UpdateOptions{})
- assert.Nil(t, err)
- // check whether existing bucket notification uneffected
- var notificationPresent bool
- for i := 0; i < 4; i++ {
- notificationPresent = helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, notificationName, helper, objectStore.Spec.IsTLSEnabled())
- if !notificationPresent {
- break
- }
- time.Sleep(5 * time.Second)
- }
- assert.False(t, notificationPresent)
- })
- t.Run("perform s3 operations and confirm notifications are no longer received", func(t *testing.T) {
- var s3client *rgw.S3Agent
- s3endpoint, _ := helper.ObjectClient.GetEndPointUrl(namespace, storeName)
- s3AccessKey, _ := helper.BucketClient.GetAccessKey(obcName)
- s3SecretKey, _ := helper.BucketClient.GetSecretKey(obcName)
- if objectStore.Spec.IsTLSEnabled() {
- s3client, err = rgw.NewInsecureS3Agent(s3AccessKey, s3SecretKey, s3endpoint, true)
- } else {
- s3client, err = rgw.NewS3Agent(s3AccessKey, s3SecretKey, s3endpoint, true, nil)
- }
- assert.Nil(t, err)
- logger.Infof("endpoint (%s) Accesskey (%s) secret (%s)", s3endpoint, s3AccessKey, s3SecretKey)
- t.Run("put object", func(t *testing.T) {
- _, err := s3client.PutObjectInBucket(bucketname, ObjBody, ObjectKey2, contentType)
- assert.Nil(t, err)
- })
- t.Run("check for put bucket notification", func(t *testing.T) {
- notificationReceived, err := helper.NotificationClient.CheckNotificationFromHTTPEndPoint(appLabel, putEvent, ObjectKey2)
- assert.False(t, notificationReceived)
- assert.Nil(t, err)
- })
- t.Run("delete objects", func(t *testing.T) {
- _, err := s3client.DeleteObjectInBucket(bucketname, ObjectKey1)
- assert.Nil(t, err)
- })
- t.Run("check for delete bucket notification", func(t *testing.T) {
- notificationReceived, err := helper.NotificationClient.CheckNotificationFromHTTPEndPoint(appLabel, deleteEvent, ObjectKey2)
- assert.False(t, notificationReceived)
- assert.Nil(t, err)
- })
- })
- t.Run("add topic, notification to existing OBC", func(t *testing.T) {
- newNotificationName := "new-notification"
- newTopicName := "new-topic"
- t.Run("create CephBucketTopic: new-topic", func(t *testing.T) {
- err := helper.TopicClient.CreateTopic(newTopicName, storeName, httpEndpointService)
- assert.Nil(t, err)
- created := utils.Retry(12, 2*time.Second, "topic is created", func() bool {
- return helper.TopicClient.CheckTopic(newTopicName)
- })
- assert.True(t, created)
- })
- t.Run("create CephBucketNotification: new-notification", func(t *testing.T) {
- err = helper.NotificationClient.CreateNotification(newNotificationName, newTopicName)
- assert.Nil(t, err)
- created := utils.Retry(12, 2*time.Second, "notification is created", func() bool {
- return helper.NotificationClient.CheckNotificationCR(newNotificationName)
- })
- assert.True(t, created)
- })
- t.Run("add notification label to OBC", func(t *testing.T) {
- obc, err := k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(obcNamespace).Get(ctx, obcName, metav1.GetOptions{})
- assert.Nil(t, err)
- obc.Labels[bucketNotificationLabelPrefix+newNotificationName] = newNotificationName
- _, err = k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(obcNamespace).Update(ctx, obc, metav1.UpdateOptions{})
- assert.Nil(t, err)
- })
- t.Run("new-notification should be configured for bucket", func(t *testing.T) {
- // check whether bucket notification added
- notificationPresent := utils.Retry(12, 2*time.Second, "notification is created for bucket", func() bool {
- return helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, newNotificationName, helper, objectStore.Spec.IsTLSEnabled())
- })
- assert.True(t, notificationPresent)
- })
- t.Run("delete CephBucketNotification: new-notification", func(t *testing.T) {
- err = helper.NotificationClient.DeleteNotification(newNotificationName, topicName)
- assert.Nil(t, err)
- t.Run("check notification removed from backend Bucket", func(t *testing.T) {
- // TODO: need to add that support
- t.Skipped()
- })
- })
- t.Run("delete CephBucketTopic: new-topic", func(t *testing.T) {
- err = helper.TopicClient.DeleteTopic(newTopicName, storeName, httpEndpointService)
- assert.Nil(t, err)
- })
- })
- t.Run("reverse order of creating notification,topic and adding it to ObjectBucketClaim", func(t *testing.T) {
- reverseNotificationName := "reverse-notification"
- reverseTopicName := "reverse-topic"
- reverseOBCName := "reverse-obc"
- reverseBucketName := "reverse-bucket"
- i := 0
- t.Run("create ObjectBucketClaim: reverse-obc", func(t *testing.T) {
- err := helper.BucketClient.CreateObcNotification(reverseOBCName, bucketStorageClassName, reverseBucketName, reverseNotificationName, true)
- assert.Nil(t, err)
- created := utils.Retry(12, 2*time.Second, "OBC is created", func() bool {
- return helper.BucketClient.CheckOBC(reverseOBCName, "bound")
- })
- assert.True(t, created)
- logger.Info("OBC created successfully")
- var bkt rgw.ObjectBucket
- for i = 0; i < 4; i++ {
- b, code, err := rgw.GetBucket(rgwcontext, reverseBucketName)
- if b != nil && err == nil {
- bkt = *b
- break
- }
- logger.Warningf("cannot get bucket %q, retrying... bucket: %v. code: %d, err: %v", reverseBucketName, b, code, err)
- logger.Infof("(%d) check bucket exists, sleeping for 5 seconds ...", i)
- time.Sleep(5 * time.Second)
- }
- assert.NotEqual(t, 4, i)
- assert.Equal(t, reverseBucketName, bkt.Name)
- })
- t.Run("create CephBucketNotification: reverse-notification", func(t *testing.T) {
- err = helper.NotificationClient.CreateNotification(reverseNotificationName, reverseTopicName)
- assert.Nil(t, err)
- created := utils.Retry(12, 2*time.Second, "notification is created", func() bool {
- return helper.NotificationClient.CheckNotificationCR(reverseNotificationName)
- })
- assert.True(t, created)
- })
- t.Run("the notification should not configured for the backend bucket until topic is created", func(t *testing.T) {
- // check whether bucket notification added, should fail since topic is not created
- // TODO: make below check valid with help of status field in NotificationCR
- t.Skipped()
- })
- t.Run("create CephBucketTopic: reverse-topic", func(t *testing.T) {
- err = helper.TopicClient.CreateTopic(reverseTopicName, storeName, httpEndpointService)
- assert.Nil(t, err)
- created := utils.Retry(12, 2*time.Second, "topic is created", func() bool {
- return helper.TopicClient.CheckTopic(reverseTopicName)
- })
- assert.True(t, created)
- })
- t.Run("notification should be configured after creating the topic", func(t *testing.T) {
- // check whether bucket notification added, should pass since topic got created
- notificationPresent := utils.Retry(12, 2*time.Second, "notification is created for bucket", func() bool {
- return helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, reverseOBCName, reverseBucketName, reverseNotificationName, helper, objectStore.Spec.IsTLSEnabled())
- })
- assert.True(t, notificationPresent)
- })
- t.Run("delete CephBucketNotification: reverse-notification", func(t *testing.T) {
- err = helper.NotificationClient.DeleteNotification(reverseNotificationName, reverseTopicName)
- assert.Nil(t, err)
- t.Run("check notification removed from backend Bucket", func(t *testing.T) {
- // TODO: need to add that support
- t.Skipped()
- })
- })
- t.Run("delete CephBucketTopic: reverse-topic", func(t *testing.T) {
- err = helper.TopicClient.DeleteTopic(reverseTopicName, storeName, httpEndpointService)
- assert.Nil(t, err)
- })
- t.Run("delete ObjectBucketClaim: reverse-obc", func(t *testing.T) {
- err = helper.BucketClient.DeleteObc(reverseOBCName, bucketStorageClassName, reverseBucketName, maxObject, true)
- assert.Nil(t, err)
- logger.Info("Checking to see if the obc, secret, and cm have all been deleted")
- for i = 0; i < 4 && !helper.BucketClient.CheckOBC(reverseOBCName, "deleted"); i++ {
- logger.Infof("(%d) obc deleted check, sleeping for 5 seconds ...", i)
- time.Sleep(5 * time.Second)
- }
- assert.NotEqual(t, 4, i)
- logger.Info("ensure OBC bucket was deleted")
- var rgwErr int
- for i = 0; i < 4; i++ {
- _, rgwErr, _ = rgw.GetBucket(rgwcontext, reverseBucketName)
- if rgwErr == rgw.RGWErrorNotFound {
- break
- }
- logger.Infof("(%d) check bucket deleted, sleeping for 5 seconds ...", i)
- time.Sleep(5 * time.Second)
- }
- assert.NotEqual(t, 4, i)
- assert.Equal(t, rgwErr, rgw.RGWErrorNotFound)
- })
- })
- t.Run("delete ObjectBucketClaim", func(t *testing.T) {
- i := 0
- dobcErr := helper.BucketClient.DeleteObc(obcName, bucketStorageClassName, bucketname, maxObject, true)
- assert.Nil(t, dobcErr)
- logger.Info("Checking to see if the obc, secret, and cm have all been deleted")
- for i = 0; i < 4 && !helper.BucketClient.CheckOBC(obcName, "deleted"); i++ {
- logger.Infof("(%d) obc deleted check, sleeping for 5 seconds ...", i)
- time.Sleep(5 * time.Second)
- }
- assert.NotEqual(t, 4, i)
- logger.Info("ensure OBC bucket was deleted")
- var rgwErr int
- for i = 0; i < 4; i++ {
- _, rgwErr, _ = rgw.GetBucket(rgwcontext, bucketname)
- if rgwErr == rgw.RGWErrorNotFound {
- break
- }
- logger.Infof("(%d) check bucket deleted, sleeping for 5 seconds ...", i)
- time.Sleep(5 * time.Second)
- }
- assert.NotEqual(t, 4, i)
- assert.Equal(t, rgwErr, rgw.RGWErrorNotFound)
- dobErr := helper.BucketClient.DeleteBucketStorageClass(namespace, storeName, bucketStorageClassName, "Delete")
- assert.Nil(t, dobErr)
- })
- t.Run("delete CephBucketNotification", func(t *testing.T) {
- err := helper.NotificationClient.DeleteNotification(notificationName, topicName)
- assert.Nil(t, err)
- t.Run("check notification removed from backend Bucket", func(t *testing.T) {
- // TODO: need to add that support
- t.Skipped()
- })
- })
- t.Run("delete CephBucketTopic", func(t *testing.T) {
- err := helper.TopicClient.DeleteTopic(topicName, storeName, httpEndpointService)
- assert.Nil(t, err)
- })
- t.Run("delete CephObjectStore", func(t *testing.T) {
- deleteObjectStore(t, k8sh, namespace, storeName)
- assertObjectStoreDeletion(t, k8sh, namespace, storeName)
- })
- }
|