/*
Copyright 2016 The Rook Authors. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

	http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package integration

import (
	"context"
	"fmt"
	"strings"
	"testing"
	"time"

	cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
	cephclient "github.com/rook/rook/pkg/daemon/ceph/client"
	"github.com/rook/rook/pkg/operator/k8sutil"
	"github.com/rook/rook/tests/framework/clients"
	"github.com/rook/rook/tests/framework/installer"
	"github.com/rook/rook/tests/framework/utils"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"github.com/stretchr/testify/suite"
	v1 "k8s.io/api/core/v1"
	kerrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
)

const (
	filePodName = "file-test"
)

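// fileSystemCSICloneTest creates a parent PVC, writes known data to it, clones it into a
// new PVC, and verifies via md5sum that the clone holds the same data before cleaning up
// both PVCs and their consumer pods.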
func fileSystemCSICloneTest(helper *clients.TestClient, k8sh *utils.K8sHelper, s *suite.Suite, storageClassName, systemNamespace string) {
	// create PVC and app
	pvcSize := "1Gi"
	pvcName := "parent-pvc"
	podName := "demo-pod"
	readOnly := false
	mountPoint := "/var/lib/test"
	logger.Infof("create a PVC")
	err := helper.FSClient.CreatePVC(defaultNamespace, pvcName, storageClassName, "ReadWriteOnce", pvcSize)
	require.NoError(s.T(), err)
	require.True(s.T(), k8sh.WaitUntilPVCIsBound(defaultNamespace, pvcName), "Make sure PVC is Bound")

	logger.Infof("bind PVC to application")
	err = helper.FSClient.CreatePod(podName, pvcName, defaultNamespace, mountPoint, readOnly)
	assert.NoError(s.T(), err)

	logger.Infof("check pod is in running state")
	require.True(s.T(), k8sh.IsPodRunning(podName, defaultNamespace), "make sure pod is in running state")
	logger.Infof("Storage Mounted successfully")

	// write data to the PVC and get the checksum value
	logger.Infof("write data to pvc")
	cmd := fmt.Sprintf("dd if=/dev/zero of=%s/file.out bs=1MB count=10 status=none conv=fsync && md5sum %s/file.out", mountPoint, mountPoint)
	resp, err := k8sh.RunCommandInPod(defaultNamespace, podName, cmd)
	require.NoError(s.T(), err)
	pvcChecksum := strings.Fields(resp)
	require.Equal(s.T(), 2, len(pvcChecksum))

	clonePVCName := "clone-pvc"
	logger.Infof("create a clone PVC from the parent PVC")
	err = helper.FSClient.CreatePVCClone(defaultNamespace, clonePVCName, pvcName, storageClassName, "ReadWriteOnce", pvcSize)
	require.NoError(s.T(), err)
	require.True(s.T(), k8sh.WaitUntilPVCIsBound(defaultNamespace, clonePVCName), "Make sure PVC is Bound")

	clonePodName := "clone-pod"
	logger.Infof("bind PVC clone to application")
	err = helper.FSClient.CreatePod(clonePodName, clonePVCName, defaultNamespace, mountPoint, readOnly)
	assert.NoError(s.T(), err)

	logger.Infof("check pod is in running state")
	require.True(s.T(), k8sh.IsPodRunning(clonePodName, defaultNamespace), "make sure pod is in running state")
	logger.Infof("Storage Mounted successfully")

	// get the checksum of the cloned data and validate it
	logger.Infof("check that the md5sum of the pvc data and the clone data is the same")
	cmd = fmt.Sprintf("md5sum %s/file.out", mountPoint)
	resp, err = k8sh.RunCommandInPod(defaultNamespace, clonePodName, cmd)
	require.NoError(s.T(), err)
	clonePVCChecksum := strings.Fields(resp)
	require.Equal(s.T(), 2, len(clonePVCChecksum))

	// compare the checksum values and verify that they are equal
	assert.Equal(s.T(), pvcChecksum[0], clonePVCChecksum[0])

	// delete the clone PVC and app
	logger.Infof("delete clone pod")
	err = k8sh.DeletePod(k8sutil.DefaultNamespace, clonePodName)
	require.NoError(s.T(), err)
	logger.Infof("delete clone pvc")
	err = helper.FSClient.DeletePVC(defaultNamespace, clonePVCName)
	assertNoErrorUnlessNotFound(s, err)
	assert.True(s.T(), k8sh.WaitUntilPVCIsDeleted(defaultNamespace, clonePVCName))

	// delete the parent PVC and app
	err = k8sh.DeletePod(k8sutil.DefaultNamespace, podName)
	require.NoError(s.T(), err)
	logger.Infof("delete parent pvc")
	err = helper.FSClient.DeletePVC(defaultNamespace, pvcName)
	assertNoErrorUnlessNotFound(s, err)
	assert.True(s.T(), k8sh.WaitUntilPVCIsDeleted(defaultNamespace, pvcName))
}

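// fileSystemCSISnapshotTest installs the snapshot CRDs and controller, snapshots a PVC
// that has known data written to it, restores the snapshot into a new PVC, and verifies
// via md5sum that the restored data matches the original before cleaning everything up.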
func fileSystemCSISnapshotTest(helper *clients.TestClient, k8sh *utils.K8sHelper, s *suite.Suite, storageClassName, namespace string, testNFS bool) {
	logger.Infof("install snapshot CRD")
	err := k8sh.CreateSnapshotCRD()
	require.NoError(s.T(), err)

	logger.Infof("install snapshot controller")
	err = k8sh.CreateSnapshotController()
	require.NoError(s.T(), err)

	// Clean up the CRD and controller in a defer to make sure they are removed,
	// since the block test also installs them.
	defer func() {
		logger.Infof("delete snapshot-controller")
		err = k8sh.DeleteSnapshotController()
		require.NoError(s.T(), err)

		logger.Infof("delete snapshot CRD")
		err = k8sh.DeleteSnapshotCRD()
		require.NoError(s.T(), err)
	}()

	logger.Infof("check snapshot controller is running")
	err = k8sh.WaitForSnapshotController(15)
	require.NoError(s.T(), err)

	// create snapshot class
	snapshotDeletePolicy := "Delete"
	snapshotClassName := "snapshot-testing"
	logger.Infof("create snapshotclass")
	if !testNFS {
		err = helper.FSClient.CreateSnapshotClass(snapshotClassName, snapshotDeletePolicy, namespace)
	} else {
		err = helper.NFSClient.CreateSnapshotClass(snapshotClassName, snapshotDeletePolicy)
	}
	require.NoError(s.T(), err)

	// create PVC and app
	pvcSize := "1Gi"
	pvcName := "snap-pvc"
	podName := "demo-pod"
	readOnly := false
	mountPoint := "/var/lib/test"
	logger.Infof("create a PVC")
	err = helper.FSClient.CreatePVC(defaultNamespace, pvcName, storageClassName, "ReadWriteOnce", pvcSize)
	require.NoError(s.T(), err)
	require.True(s.T(), k8sh.WaitUntilPVCIsBound(defaultNamespace, pvcName), "Make sure PVC is Bound")

	logger.Infof("bind PVC to application")
	err = helper.FSClient.CreatePod(podName, pvcName, defaultNamespace, mountPoint, readOnly)
	assert.NoError(s.T(), err)

	logger.Infof("check pod is in running state")
	require.True(s.T(), k8sh.IsPodRunning(podName, defaultNamespace), "make sure pod is in running state")
	logger.Infof("Storage Mounted successfully")

	// write data to the PVC and get the checksum value
	logger.Infof("write data to pvc")
	cmd := fmt.Sprintf("dd if=/dev/zero of=%s/file.out bs=1MB count=10 status=none conv=fsync && md5sum %s/file.out", mountPoint, mountPoint)
	resp, err := k8sh.RunCommandInPod(defaultNamespace, podName, cmd)
	require.NoError(s.T(), err)
	pvcChecksum := strings.Fields(resp)
	require.Equal(s.T(), 2, len(pvcChecksum))

	// create a snapshot
	snapshotName := "fs-pvc-snapshot"
	logger.Infof("create a snapshot from pvc")
	err = helper.FSClient.CreateSnapshot(snapshotName, pvcName, snapshotClassName, defaultNamespace)
	require.NoError(s.T(), err)

	restorePVCName := "restore-fs-pvc"
	// check that the snapshot is in the ready state
	ready, err := k8sh.CheckSnapshotISReadyToUse(snapshotName, defaultNamespace, 15)
	require.NoError(s.T(), err)
	require.True(s.T(), ready, "make sure snapshot is in ready state")

	// restore the snapshot into a new PVC and bind it to an app
	logger.Infof("restore a new pvc from the snapshot")
	err = helper.FSClient.CreatePVCRestore(defaultNamespace, restorePVCName, snapshotName, storageClassName, "ReadWriteOnce", pvcSize)
	require.NoError(s.T(), err)
	require.True(s.T(), k8sh.WaitUntilPVCIsBound(defaultNamespace, restorePVCName), "Make sure PVC is Bound")

	restorePodName := "restore-pod"
	logger.Infof("bind PVC restore to application")
	err = helper.FSClient.CreatePod(restorePodName, restorePVCName, defaultNamespace, mountPoint, readOnly)
	assert.NoError(s.T(), err)

	logger.Infof("check pod is in running state")
	require.True(s.T(), k8sh.IsPodRunning(restorePodName, defaultNamespace), "make sure pod is in running state")
	logger.Infof("Storage Mounted successfully")

	// get the checksum of the restored data and validate it
	logger.Infof("check that the md5sum of the pvc data and the restored data is the same")
	cmd = fmt.Sprintf("md5sum %s/file.out", mountPoint)
	resp, err = k8sh.RunCommandInPod(defaultNamespace, restorePodName, cmd)
	require.NoError(s.T(), err)
	restorePVCChecksum := strings.Fields(resp)
	require.Equal(s.T(), 2, len(restorePVCChecksum))

	// compare the checksum values and verify that they are equal
	assert.Equal(s.T(), pvcChecksum[0], restorePVCChecksum[0])

	// delete the restore PVC and app
	logger.Infof("delete restore pod")
	err = k8sh.DeletePod(k8sutil.DefaultNamespace, restorePodName)
	require.NoError(s.T(), err)
	logger.Infof("delete restore pvc")
	err = helper.FSClient.DeletePVC(defaultNamespace, restorePVCName)
	assertNoErrorUnlessNotFound(s, err)
	assert.True(s.T(), k8sh.WaitUntilPVCIsDeleted(defaultNamespace, restorePVCName))

	// delete the snapshot
	logger.Infof("delete snapshot")
	err = helper.FSClient.DeleteSnapshot(snapshotName, pvcName, snapshotClassName, defaultNamespace)
	require.NoError(s.T(), err)

	// delete the parent PVC and app
	logger.Infof("delete application pod")
	err = k8sh.DeletePod(k8sutil.DefaultNamespace, podName)
	require.NoError(s.T(), err)
	logger.Infof("delete parent pvc")
	err = helper.FSClient.DeletePVC(defaultNamespace, pvcName)
	assertNoErrorUnlessNotFound(s, err)
	assert.True(s.T(), k8sh.WaitUntilPVCIsDeleted(defaultNamespace, pvcName))

	logger.Infof("delete snapshotclass")
	if !testNFS {
		err = helper.FSClient.DeleteSnapshotClass(snapshotClassName, snapshotDeletePolicy, namespace)
	} else {
		err = helper.NFSClient.DeleteSnapshotClass(snapshotClassName, snapshotDeletePolicy)
	}
	require.NoError(s.T(), err)
}

// runFileE2ETest is the smoke test for file system storage. It checks the following
// operations on the filesystem, in order: create, mount, write, read, unmount, and delete.
func runFileE2ETest(helper *clients.TestClient, k8sh *utils.K8sHelper, s *suite.Suite, settings *installer.TestCephSettings, filesystemName string, preserveFilesystemOnDelete bool) {
	defer fileTestDataCleanUp(helper, k8sh, s, filePodName, settings.Namespace, filesystemName)
	logger.Infof("Running on Rook Cluster %s", settings.Namespace)
	logger.Infof("File Storage End To End Integration Test - create, mount, write to, read from, and unmount")
	activeCount := 2
	createFilesystem(helper, k8sh, s, settings, filesystemName, activeCount)

	if preserveFilesystemOnDelete {
		_, err := k8sh.Kubectl("-n", settings.Namespace, "patch", "CephFilesystem", filesystemName, "--type=merge", "-p", `{"spec": {"preserveFilesystemOnDelete": true}}`)
		assert.NoError(s.T(), err)
	}

	// Create a test pod where CephFS is consumed without user creds
	storageClassName := "cephfs-storageclass"
	err := helper.FSClient.CreateStorageClass(filesystemName, settings.OperatorNamespace, settings.Namespace, storageClassName)
	assert.NoError(s.T(), err)
	createFilesystemConsumerPod(helper, k8sh, s, settings, filesystemName, storageClassName)

	// Test reading and writing to the first pod
	err = writeAndReadToFilesystem(helper, k8sh, s, settings.Namespace, filePodName, "test_file")
	assert.NoError(s.T(), err)

	t := s.T()
	ctx := context.TODO()

	// TODO: there is a regression here where MDSes don't actually scale down, and this test
	// wasn't catching it. Enabling this test causes the controller to enter into a new reconcile
	// loop and makes the next phase of the test take much longer than it should, making it flaky.
	// Rook issue https://github.com/rook/rook/issues/9857 is tracking this issue.
	// t.Run("filesystem should be able to be scaled down", func(t *testing.T) {
	// 	downscaleMetadataServers(helper, k8sh, t, settings.Namespace, filesystemName)
	// })

	subvolGroupName := "my-subvolume-group"
	t.Run("install CephFilesystemSubVolumeGroup", func(t *testing.T) {
		err = helper.FSClient.CreateSubvolumeGroup(filesystemName, subvolGroupName)
		assert.NoError(t, err)
	})

	t.Run("delete CephFilesystem should be blocked by csi volumes and CephFilesystemSubVolumeGroup", func(t *testing.T) {
		// NOTE: CephFilesystems do not set the "Deleting" phase when they are deleting, so we
		// can't rely on that here
		err := k8sh.RookClientset.CephV1().CephFilesystems(settings.Namespace).Delete(
			ctx, filesystemName, metav1.DeleteOptions{})
		assert.NoError(t, err)

		var cond *cephv1.Condition
		err = wait.PollUntilContextTimeout(context.TODO(), 2*time.Second, 45*time.Second, true, func(context context.Context) (done bool, err error) {
			logger.Infof("waiting for CephFilesystem %q in namespace %q to have condition %q",
				filesystemName, settings.Namespace, cephv1.ConditionDeletionIsBlocked)
			fs, err := k8sh.RookClientset.CephV1().CephFilesystems(settings.Namespace).Get(
				ctx, filesystemName, metav1.GetOptions{})
			if err != nil {
				return false, err
			}
			logger.Infof("conditions: %+v", fs.Status.Conditions)
			cond = cephv1.FindStatusCondition(fs.Status.Conditions, cephv1.ConditionDeletionIsBlocked)
			if cond != nil {
				logger.Infof("CephFilesystem %q in namespace %q has condition %q",
					filesystemName, settings.Namespace, cephv1.ConditionDeletionIsBlocked)
				return true, nil
			}
			return false, nil
		})
		assert.NoError(t, err)
		if cond == nil {
			return
		}

		logger.Infof("verifying CephFilesystem %q condition %q is correct: %+v",
			filesystemName, cephv1.ConditionDeletionIsBlocked, cond)
		assert.Equal(t, v1.ConditionTrue, cond.Status)
		assert.Equal(t, cephv1.ObjectHasDependentsReason, cond.Reason)
		// the CephFilesystemSubVolumeGroup and the "csi" subvolumegroup should both block deletion
		assert.Contains(t, cond.Message, "CephFilesystemSubVolumeGroups")
		assert.Contains(t, cond.Message, subvolGroupName)
		assert.Contains(t, cond.Message, "filesystem subvolume groups that contain subvolumes")
		assert.Contains(t, cond.Message, "csi")
	})

	t.Run("deleting CephFilesystemSubVolumeGroup should partially unblock CephFilesystem deletion", func(t *testing.T) {
		err = helper.FSClient.DeleteSubvolumeGroup(filesystemName, subvolGroupName)
		assert.NoError(t, err)

		var cond *cephv1.Condition
		err = wait.PollUntilContextTimeout(context.TODO(), 2*time.Second, 18*time.Second, true, func(context context.Context) (done bool, err error) {
			logger.Infof("waiting for CephFilesystem %q in namespace %q to no longer be blocked by CephFilesystemSubVolumeGroups",
				filesystemName, settings.Namespace)
			fs, err := k8sh.RookClientset.CephV1().CephFilesystems(settings.Namespace).Get(
				ctx, filesystemName, metav1.GetOptions{})
			if err != nil {
				return false, err
			}
			cond = cephv1.FindStatusCondition(fs.Status.Conditions, cephv1.ConditionDeletionIsBlocked)
			if cond == nil {
				logger.Warningf("could not find condition %q on CephFilesystem %q", cephv1.ConditionDeletionIsBlocked, filesystemName)
				return false, nil
			}
			if !strings.Contains(cond.Message, "CephFilesystemSubVolumeGroup") {
				logger.Infof("CephFilesystem %q deletion is no longer blocked by CephFilesystemSubVolumeGroups", filesystemName)
				return true, nil
			}
			return false, nil
		})
		assert.NoError(t, err)
		if cond == nil {
			return
		}

		logger.Infof("verifying CephFilesystem %q condition %q is correct: %+v",
			filesystemName, cephv1.ConditionDeletionIsBlocked, cond)
		assert.Equal(t, v1.ConditionTrue, cond.Status)
		assert.Equal(t, cephv1.ObjectHasDependentsReason, cond.Reason)
		// only the raw subvolumegroups should block deletion
		assert.Contains(t, cond.Message, "filesystem subvolume groups that contain subvolumes")
		assert.Contains(t, cond.Message, "csi")
	})

	t.Run("deleting filesystem consumer pod+pvc should fully unblock CephFilesystem deletion", func(t *testing.T) {
		// Clean up the filesystem and its clients
		cleanupFilesystemConsumer(helper, k8sh, s, settings.Namespace, filePodName)
		err = wait.PollUntilContextTimeout(context.TODO(), 3*time.Second, 30*time.Second, true, func(context context.Context) (done bool, err error) {
			logger.Infof("waiting for CephFilesystem %q in namespace %q to be deleted", filesystemName, settings.Namespace)
			_, err = k8sh.RookClientset.CephV1().CephFilesystems(settings.Namespace).Get(
				ctx, filesystemName, metav1.GetOptions{})
			if err != nil && kerrors.IsNotFound(err) {
				return true, nil
			}
			return false, nil
		})
		// the poll only errors on timeout; the filesystem must be gone by now
		assert.NoError(t, err)
		logger.Infof("CephFilesystem %q in namespace %q was deleted successfully", filesystemName, settings.Namespace)
	})

	err = helper.FSClient.DeleteStorageClass(storageClassName)
	assertNoErrorUnlessNotFound(s, err)

	if preserveFilesystemOnDelete {
		fses, err := helper.FSClient.List(settings.Namespace)
		assert.NoError(s.T(), err)
		assert.Len(s.T(), fses, 1)
		assert.Equal(s.T(), filesystemName, fses[0].Name)

		err = helper.FSClient.Delete(filesystemName, settings.Namespace)
		assert.NoError(s.T(), err)
	}
}

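// createFilesystemConsumerPod creates a test pod that mounts the filesystem through the
// CSI driver and requires it to reach the running state.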
func createFilesystemConsumerPod(helper *clients.TestClient, k8sh *utils.K8sHelper, s *suite.Suite, settings *installer.TestCephSettings, filesystemName, storageClassName string) {
	err := createPodWithFilesystem(k8sh, s, settings, filePodName, filesystemName, storageClassName, false)
	require.NoError(s.T(), err)
	filePodRunning := k8sh.IsPodRunning(filePodName, settings.Namespace)
	require.True(s.T(), filePodRunning, "make sure file-test pod is in running state")
	logger.Infof("File system mounted successfully")
}

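// writeAndReadToFilesystem writes a test message to a file in the pod's mounted
// filesystem and reads it back to verify the mount is usable.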
func writeAndReadToFilesystem(helper *clients.TestClient, k8sh *utils.K8sHelper, s *suite.Suite, namespace, podName, filename string) error {
	logger.Infof("Write to file system")
	message := "Test Data for file system storage"
	if err := k8sh.WriteToPod(namespace, podName, filename, message); err != nil {
		return err
	}
	return k8sh.ReadFromPod(namespace, podName, filename, message)
}

// func downscaleMetadataServers(helper *clients.TestClient, k8sh *utils.K8sHelper, t *testing.T, namespace, fsName string) {
// 	logger.Infof("downscaling file system metadata servers")
// 	err := helper.FSClient.ScaleDown(fsName, namespace)
// 	require.Nil(t, err)
// }

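// cleanupFilesystemConsumer deletes the consumer pod and its PVC, and verifies that the
// pod terminates, the PVC is removed, and no PVs are left behind.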
func cleanupFilesystemConsumer(helper *clients.TestClient, k8sh *utils.K8sHelper, s *suite.Suite, namespace string, podName string) {
	logger.Infof("Delete file system consumer")
	err := k8sh.DeletePod(namespace, podName)
	assert.Nil(s.T(), err)
	if !k8sh.IsPodTerminated(podName, namespace) {
		k8sh.PrintPodDescribe(namespace, podName)
		assert.Fail(s.T(), fmt.Sprintf("make sure %s pod is terminated", podName))
	}
	err = helper.FSClient.DeletePVC(namespace, podName)
	assertNoErrorUnlessNotFound(s, err)
	isDeleted := k8sh.WaitUntilPVCIsDeleted(namespace, podName)
	if !isDeleted {
		assert.Fail(s.T(), fmt.Sprintf("failed to delete PVC %q", podName))
	}
	isPVListZero := k8sh.WaitUntilZeroPVs()
	if !isPVListZero {
		assert.Fail(s.T(), "PV list is not zero")
	}
	logger.Infof("File system consumer deleted")
}

// cleanupFilesystem deletes the test filesystem
func cleanupFilesystem(helper *clients.TestClient, k8sh *utils.K8sHelper, s *suite.Suite, namespace string, filesystemName string) {
	logger.Infof("Deleting file system")
	err := helper.FSClient.Delete(filesystemName, namespace)
	assert.Nil(s.T(), err)
	logger.Infof("File system %s deleted", filesystemName)
}

// runFileE2ETestLite tests filesystem creation on a Rook cluster installed in a custom
// namespace (i.e., namespace != "rook") and then deletes the filesystem again.
func runFileE2ETestLite(helper *clients.TestClient, k8sh *utils.K8sHelper, s *suite.Suite, settings *installer.TestCephSettings, filesystemName string) {
	logger.Infof("File Storage End to End Integration Test - create Filesystem and make sure mds pod is running")
	logger.Infof("Running on Rook Cluster %s", settings.Namespace)
	activeCount := 1
	createFilesystem(helper, k8sh, s, settings, filesystemName, activeCount)

	// Create a test pod where CephFS is consumed without user creds
	storageClassName := "cephfs-storageclass"
	err := helper.FSClient.CreateStorageClass(filesystemName, settings.OperatorNamespace, settings.Namespace, storageClassName)
	assert.NoError(s.T(), err)

	fileSystemCSISnapshotTest(helper, k8sh, s, storageClassName, settings.Namespace, false)
	fileSystemCSICloneTest(helper, k8sh, s, storageClassName, settings.Namespace)
	cleanupFilesystem(helper, k8sh, s, settings.Namespace, filesystemName)

	err = helper.FSClient.DeleteStorageClass(storageClassName)
	assertNoErrorUnlessNotFound(s, err)
}

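// createFilesystem creates a CephFilesystem with the given number of active MDS daemons
// and retries listing filesystems until exactly one is present.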
func createFilesystem(helper *clients.TestClient, k8sh *utils.K8sHelper, s *suite.Suite, settings *installer.TestCephSettings, filesystemName string, activeCount int) {
	logger.Infof("Create file system")
	fscErr := helper.FSClient.Create(filesystemName, settings.Namespace, activeCount)
	require.Nil(s.T(), fscErr)

	var err error
	var filesystemList []cephclient.CephFilesystem
	for i := 1; i <= 10; i++ {
		filesystemList, err = helper.FSClient.List(settings.Namespace)
		if err != nil {
			logger.Errorf("failed to list fs. trying again. %v", err)
			continue
		}
		logger.Debugf("filesystemList is %+v", filesystemList)
		if len(filesystemList) == 1 {
			logger.Infof("File system %s created", filesystemList[0].Name)
			break
		}
		logger.Infof("Waiting for file system %s to be created", filesystemName)
		time.Sleep(time.Second * 5)
	}
	logger.Debugf("filesystemList is %+v", filesystemList)
	require.Equal(s.T(), 1, len(filesystemList), "There should be one shared file system present")
}

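// fileTestDataCleanUp deletes the test pod and the filesystem at the end of the test.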
func fileTestDataCleanUp(helper *clients.TestClient, k8sh *utils.K8sHelper, s *suite.Suite, podName string, namespace string, filesystemName string) {
	logger.Infof("Cleaning up file system")
	err := k8sh.DeletePod(namespace, podName)
	assert.NoError(s.T(), err)
	err = helper.FSClient.Delete(filesystemName, namespace)
	assert.NoError(s.T(), err)
}

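// createPodWithFilesystem creates a PVC and a pod that consumes the filesystem through
// the given storage class.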
func createPodWithFilesystem(k8sh *utils.K8sHelper, s *suite.Suite, settings *installer.TestCephSettings, podName, filesystemName, storageClassName string, mountUser bool) error {
	testPodManifest := getFilesystemCSITestPod(settings, podName, storageClassName)
	if err := k8sh.ResourceOperation("create", testPodManifest); err != nil {
		return fmt.Errorf("failed to create pod -- %s. %+v", testPodManifest, err)
	}
	return nil
}

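// getFilesystemCSITestPod returns a manifest for a PVC and a busybox pod that mounts it,
// used to exercise the CephFS CSI driver.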
func getFilesystemCSITestPod(settings *installer.TestCephSettings, podName, storageClassName string) string {
	claimName := podName
	return `
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: ` + claimName + `
  namespace: ` + settings.Namespace + `
spec:
  accessModes:
  - ReadWriteOnce
  resources:
    requests:
      storage: 1Gi
  storageClassName: ` + storageClassName + `
---
apiVersion: v1
kind: Pod
metadata:
  name: ` + podName + `
  namespace: ` + settings.Namespace + `
spec:
  containers:
  - name: ` + podName + `
    image: busybox
    command:
    - sh
    - "-c"
    - "touch ` + utils.TestMountPath + `/csi.test && sleep 3600"
    imagePullPolicy: IfNotPresent
    env:
    volumeMounts:
    - mountPath: ` + utils.TestMountPath + `
      name: csivol
  volumes:
  - name: csivol
    persistentVolumeClaim:
      claimName: ` + claimName + `
      readOnly: false
  restartPolicy: Never
`
}

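// waitForFilesystemActive polls `ceph fs status` until at least one MDS for the
// filesystem reports active, or the retry loop is exhausted.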
func waitForFilesystemActive(k8sh *utils.K8sHelper, clusterInfo *cephclient.ClusterInfo, filesystemName string) error {
	command, args := cephclient.FinalizeCephCommandArgs("ceph", clusterInfo, []string{"fs", "status", filesystemName}, k8sh.MakeContext().ConfigDir)
	var stat string
	var err error
	logger.Infof("waiting for filesystem %q to be active", filesystemName)
	for i := 0; i < utils.RetryLoop; i++ {
		// run the ceph fs status command; assign with "=" so the outer stat and err are
		// updated rather than shadowed, keeping the final error message accurate
		stat, err = k8sh.MakeContext().Executor.ExecuteCommandWithCombinedOutput(command, args...)
		if err != nil {
			logger.Warningf("failed to get filesystem %q status. %+v", filesystemName, err)
		}
		// as long as at least one mds is active, it's okay
		if strings.Contains(stat, "active") {
			logger.Infof("done waiting for filesystem %q to be active", filesystemName)
			return nil
		}
		logger.Infof("waiting for filesystem %q to be active. status=%s", filesystemName, stat)
		time.Sleep(utils.RetryInterval * time.Second)
	}
	return fmt.Errorf("gave up waiting to get filesystem %q status [err: %+v]. status returned:\n%s", filesystemName, err, stat)
}