ceph_base_file_test.go

/*
Copyright 2016 The Rook Authors. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

	http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package integration

import (
	"context"
	"fmt"
	"strings"
	"testing"
	"time"

	cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
	cephclient "github.com/rook/rook/pkg/daemon/ceph/client"
	"github.com/rook/rook/pkg/operator/k8sutil"
	"github.com/rook/rook/tests/framework/clients"
	"github.com/rook/rook/tests/framework/installer"
	"github.com/rook/rook/tests/framework/utils"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"github.com/stretchr/testify/suite"
	v1 "k8s.io/api/core/v1"
	kerrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
)

const (
	filePodName = "file-test"
)
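// fileSystemCSICloneTest exercises the CSI clone path for CephFS: it creates a
// parent PVC, mounts it in a pod, writes a file and records its md5 checksum,
// clones the PVC, and verifies the clone's copy of the file has the same
// checksum before tearing everything down.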
func fileSystemCSICloneTest(helper *clients.TestClient, k8sh *utils.K8sHelper, s *suite.Suite, storageClassName, systemNamespace string) {
	// create pvc and app
	pvcSize := "1Gi"
	pvcName := "parent-pvc"
	podName := "demo-pod"
	readOnly := false
	mountPoint := "/var/lib/test"
	logger.Infof("create a PVC")
	err := helper.FSClient.CreatePVC(defaultNamespace, pvcName, storageClassName, "ReadWriteOnce", pvcSize)
	require.NoError(s.T(), err)
	require.True(s.T(), k8sh.WaitUntilPVCIsBound(defaultNamespace, pvcName), "Make sure PVC is Bound")

	logger.Infof("bind PVC to application")
	err = helper.FSClient.CreatePod(podName, pvcName, defaultNamespace, mountPoint, readOnly)
	assert.NoError(s.T(), err)

	logger.Infof("check pod is in running state")
	require.True(s.T(), k8sh.IsPodRunning(podName, defaultNamespace), "make sure pod is in running state")
	logger.Infof("Storage Mounted successfully")

	// write data to the pvc and get the checksum value
	logger.Infof("write data to pvc")
	cmd := fmt.Sprintf("dd if=/dev/zero of=%s/file.out bs=1MB count=10 status=none conv=fsync && md5sum %s/file.out", mountPoint, mountPoint)
	resp, err := k8sh.RunCommandInPod(defaultNamespace, podName, cmd)
	require.NoError(s.T(), err)
	pvcChecksum := strings.Fields(resp)
	require.Equal(s.T(), len(pvcChecksum), 2)

	clonePVCName := "clone-pvc"
	logger.Infof("create a new pvc from pvc")
	err = helper.FSClient.CreatePVCClone(defaultNamespace, clonePVCName, pvcName, storageClassName, "ReadWriteOnce", pvcSize)
	require.NoError(s.T(), err)
	require.True(s.T(), k8sh.WaitUntilPVCIsBound(defaultNamespace, clonePVCName), "Make sure PVC is Bound")

	clonePodName := "clone-pod"
	logger.Infof("bind PVC clone to application")
	err = helper.FSClient.CreatePod(clonePodName, clonePVCName, defaultNamespace, mountPoint, readOnly)
	assert.NoError(s.T(), err)

	logger.Infof("check pod is in running state")
	require.True(s.T(), k8sh.IsPodRunning(clonePodName, defaultNamespace), "make sure pod is in running state")
	logger.Infof("Storage Mounted successfully")

	// get the checksum of the data and validate it
	logger.Infof("check md5sum of both pvc and clone data is same")
	cmd = fmt.Sprintf("md5sum %s/file.out", mountPoint)
	resp, err = k8sh.RunCommandInPod(defaultNamespace, clonePodName, cmd)
	require.NoError(s.T(), err)
	clonePVCChecksum := strings.Fields(resp)
	require.Equal(s.T(), len(clonePVCChecksum), 2)

	// compare the checksum values and verify they are equal
	assert.Equal(s.T(), clonePVCChecksum[0], pvcChecksum[0])

	// delete the clone PVC and app
	logger.Infof("delete clone pod")
	err = k8sh.DeletePod(k8sutil.DefaultNamespace, clonePodName)
	require.NoError(s.T(), err)
	logger.Infof("delete clone pvc")
	err = helper.FSClient.DeletePVC(defaultNamespace, clonePVCName)
	assertNoErrorUnlessNotFound(s, err)
	assert.True(s.T(), k8sh.WaitUntilPVCIsDeleted(defaultNamespace, clonePVCName))

	// delete the parent PVC and app
	err = k8sh.DeletePod(k8sutil.DefaultNamespace, podName)
	require.NoError(s.T(), err)
	logger.Infof("delete parent pvc")
	err = helper.FSClient.DeletePVC(defaultNamespace, pvcName)
	assertNoErrorUnlessNotFound(s, err)
	assert.True(s.T(), k8sh.WaitUntilPVCIsDeleted(defaultNamespace, pvcName))
}
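// fileSystemCSISnapshotTest exercises the CSI snapshot path for CephFS (or NFS
// when testNFS is true): it installs the snapshot CRDs and controller, creates
// a snapshot class, snapshots a PVC with known data, restores the snapshot to
// a new PVC, and verifies the restored data by md5 checksum before cleanup.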
func fileSystemCSISnapshotTest(helper *clients.TestClient, k8sh *utils.K8sHelper, s *suite.Suite, storageClassName, namespace string, testNFS bool) {
	logger.Infof("install snapshot CRD")
	err := k8sh.CreateSnapshotCRD()
	require.NoError(s.T(), err)

	logger.Infof("install snapshot controller")
	err = k8sh.CreateSnapshotController()
	require.NoError(s.T(), err)

	// clean up the CRD and controller in a defer to make sure they are removed,
	// since the block test also installs the CRD and controller.
	defer func() {
		logger.Infof("delete snapshot-controller")
		err = k8sh.DeleteSnapshotController()
		require.NoError(s.T(), err)
		logger.Infof("delete snapshot CRD")
		err = k8sh.DeleteSnapshotCRD()
		require.NoError(s.T(), err)
	}()

	logger.Infof("check snapshot controller is running")
	err = k8sh.WaitForSnapshotController(15)
	require.NoError(s.T(), err)

	// create snapshot class
	snapshotDeletePolicy := "Delete"
	snapshotClassName := "snapshot-testing"
	logger.Infof("create snapshotclass")
	if !testNFS {
		err = helper.FSClient.CreateSnapshotClass(snapshotClassName, snapshotDeletePolicy, namespace)
	} else {
		err = helper.NFSClient.CreateSnapshotClass(snapshotClassName, snapshotDeletePolicy)
	}
	require.NoError(s.T(), err)

	// create pvc and app
	pvcSize := "1Gi"
	pvcName := "snap-pvc"
	podName := "demo-pod"
	readOnly := false
	mountPoint := "/var/lib/test"
	logger.Infof("create a PVC")
	err = helper.FSClient.CreatePVC(defaultNamespace, pvcName, storageClassName, "ReadWriteOnce", pvcSize)
	require.NoError(s.T(), err)
	require.True(s.T(), k8sh.WaitUntilPVCIsBound(defaultNamespace, pvcName), "Make sure PVC is Bound")

	logger.Infof("bind PVC to application")
	err = helper.FSClient.CreatePod(podName, pvcName, defaultNamespace, mountPoint, readOnly)
	assert.NoError(s.T(), err)

	logger.Infof("check pod is in running state")
	require.True(s.T(), k8sh.IsPodRunning(podName, defaultNamespace), "make sure pod is in running state")
	logger.Infof("Storage Mounted successfully")

	// write data to the pvc and get the checksum value
	logger.Infof("write data to pvc")
	cmd := fmt.Sprintf("dd if=/dev/zero of=%s/file.out bs=1MB count=10 status=none conv=fsync && md5sum %s/file.out", mountPoint, mountPoint)
	resp, err := k8sh.RunCommandInPod(defaultNamespace, podName, cmd)
	require.NoError(s.T(), err)
	pvcChecksum := strings.Fields(resp)
	require.Equal(s.T(), len(pvcChecksum), 2)

	// create a snapshot
	snapshotName := "fs-pvc-snapshot"
	logger.Infof("create a snapshot from pvc")
	err = helper.FSClient.CreateSnapshot(snapshotName, pvcName, snapshotClassName, defaultNamespace)
	require.NoError(s.T(), err)

	restorePVCName := "restore-fs-pvc"
	// check snapshot is in ready state
	ready, err := k8sh.CheckSnapshotISReadyToUse(snapshotName, defaultNamespace, 15)
	require.NoError(s.T(), err)
	require.True(s.T(), ready, "make sure snapshot is in ready state")

	// create a restore from the snapshot and bind it to an app
	logger.Infof("restore the snapshot to a new pvc")
	err = helper.FSClient.CreatePVCRestore(defaultNamespace, restorePVCName, snapshotName, storageClassName, "ReadWriteOnce", pvcSize)
	require.NoError(s.T(), err)
	require.True(s.T(), k8sh.WaitUntilPVCIsBound(defaultNamespace, restorePVCName), "Make sure PVC is Bound")

	restorePodName := "restore-pod"
	logger.Infof("bind PVC restore to application")
	err = helper.FSClient.CreatePod(restorePodName, restorePVCName, defaultNamespace, mountPoint, readOnly)
	assert.NoError(s.T(), err)

	logger.Infof("check pod is in running state")
	require.True(s.T(), k8sh.IsPodRunning(restorePodName, defaultNamespace), "make sure pod is in running state")
	logger.Infof("Storage Mounted successfully")

	// get the checksum of the data and validate it
	logger.Infof("check md5sum of both pvc and restore data is same")
	cmd = fmt.Sprintf("md5sum %s/file.out", mountPoint)
	resp, err = k8sh.RunCommandInPod(defaultNamespace, restorePodName, cmd)
	require.NoError(s.T(), err)
	restorePVCChecksum := strings.Fields(resp)
	require.Equal(s.T(), len(restorePVCChecksum), 2)

	// compare the checksum values and verify they are equal
	assert.Equal(s.T(), restorePVCChecksum[0], pvcChecksum[0])

	// delete the restored PVC and app
	logger.Infof("delete restore pod")
	err = k8sh.DeletePod(k8sutil.DefaultNamespace, restorePodName)
	require.NoError(s.T(), err)
	logger.Infof("delete restore pvc")
	err = helper.FSClient.DeletePVC(defaultNamespace, restorePVCName)
	assertNoErrorUnlessNotFound(s, err)
	assert.True(s.T(), k8sh.WaitUntilPVCIsDeleted(defaultNamespace, restorePVCName))

	// delete the snapshot
	logger.Infof("delete snapshot")
	err = helper.FSClient.DeleteSnapshot(snapshotName, pvcName, snapshotClassName, defaultNamespace)
	require.NoError(s.T(), err)

	// delete the parent PVC and app
	logger.Infof("delete application pod")
	err = k8sh.DeletePod(k8sutil.DefaultNamespace, podName)
	require.NoError(s.T(), err)
	logger.Infof("delete parent pvc")
	err = helper.FSClient.DeletePVC(defaultNamespace, pvcName)
	assertNoErrorUnlessNotFound(s, err)
	assert.True(s.T(), k8sh.WaitUntilPVCIsDeleted(defaultNamespace, pvcName))

	logger.Infof("delete snapshotclass")
	if !testNFS {
		err = helper.FSClient.DeleteSnapshotClass(snapshotClassName, snapshotDeletePolicy, namespace)
	} else {
		err = helper.NFSClient.DeleteSnapshotClass(snapshotClassName, snapshotDeletePolicy)
	}
	require.NoError(s.T(), err)
}
// Smoke Test for File System Storage - the test checks the following operations on filesystem storage, in order:
// Create, Mount, Write, Read, Unmount, and Delete.
func runFileE2ETest(helper *clients.TestClient, k8sh *utils.K8sHelper, s *suite.Suite, settings *installer.TestCephSettings, filesystemName string, preserveFilesystemOnDelete bool) {
	defer fileTestDataCleanUp(helper, k8sh, s, filePodName, settings.Namespace, filesystemName)
	logger.Infof("Running on Rook Cluster %s", settings.Namespace)
	logger.Infof("File Storage End To End Integration Test - create, mount, write to, read from, and unmount")
	activeCount := 2
	createFilesystem(helper, k8sh, s, settings, filesystemName, activeCount)

	if preserveFilesystemOnDelete {
		_, err := k8sh.Kubectl("-n", settings.Namespace, "patch", "CephFilesystem", filesystemName, "--type=merge", "-p", `{"spec": {"preserveFilesystemOnDelete": true}}`)
		assert.NoError(s.T(), err)
	}

	// Create a test pod where CephFS is consumed without user creds
	storageClassName := "cephfs-storageclass"
	err := helper.FSClient.CreateStorageClass(filesystemName, settings.OperatorNamespace, settings.Namespace, storageClassName)
	assert.NoError(s.T(), err)
	createFilesystemConsumerPod(helper, k8sh, s, settings, filesystemName, storageClassName)

	// Test reading and writing to the first pod
	err = writeAndReadToFilesystem(helper, k8sh, s, settings.Namespace, filePodName, "test_file")
	assert.NoError(s.T(), err)

	t := s.T()
	ctx := context.TODO()

	// TODO: there is a regression here where MDSes don't actually scale down, and this test
	// wasn't catching it. Enabling this test causes the controller to enter into a new reconcile
	// loop and makes the next phase of the test take much longer than it should, making it flaky.
	// Rook issue https://github.com/rook/rook/issues/9857 is tracking this issue.
	// t.Run("filesystem should be able to be scaled down", func(t *testing.T) {
	// 	downscaleMetadataServers(helper, k8sh, t, settings.Namespace, filesystemName)
	// })

	subvolGroupName := "my-subvolume-group"
	t.Run("install CephFilesystemSubVolumeGroup", func(t *testing.T) {
		err = helper.FSClient.CreateSubvolumeGroup(filesystemName, subvolGroupName)
		assert.NoError(t, err)
	})

	t.Run("delete CephFilesystem should be blocked by csi volumes and CephFilesystemSubVolumeGroup", func(t *testing.T) {
		// NOTE: CephFilesystems do not set the "Deleting" phase when they are deleting, so we
		// can't rely on that here
		err := k8sh.RookClientset.CephV1().CephFilesystems(settings.Namespace).Delete(
			ctx, filesystemName, metav1.DeleteOptions{})
		assert.NoError(t, err)

		var cond *cephv1.Condition
		err = wait.PollUntilContextTimeout(context.TODO(), 2*time.Second, 45*time.Second, true, func(context context.Context) (done bool, err error) {
			logger.Infof("waiting for CephFilesystem %q in namespace %q to have condition %q",
				filesystemName, settings.Namespace, cephv1.ConditionDeletionIsBlocked)
			fs, err := k8sh.RookClientset.CephV1().CephFilesystems(settings.Namespace).Get(
				ctx, filesystemName, metav1.GetOptions{})
			if err != nil {
				return false, err
			}
			logger.Infof("conditions: %+v", fs.Status.Conditions)
			cond = cephv1.FindStatusCondition(fs.Status.Conditions, cephv1.ConditionDeletionIsBlocked)
			if cond != nil {
				logger.Infof("CephFilesystem %q in namespace %q has condition %q",
					filesystemName, settings.Namespace, cephv1.ConditionDeletionIsBlocked)
				return true, nil
			}
			return false, nil
		})
		assert.NoError(t, err)
		if cond == nil {
			return
		}
		logger.Infof("verifying CephFilesystem %q condition %q is correct: %+v",
			filesystemName, cephv1.ConditionDeletionIsBlocked, cond)
		assert.Equal(t, v1.ConditionTrue, cond.Status)
		assert.Equal(t, cephv1.ObjectHasDependentsReason, cond.Reason)
		// the CephFilesystemSubVolumeGroup and the "csi" subvolumegroup should both block deletion
		assert.Contains(t, cond.Message, "CephFilesystemSubVolumeGroups")
		assert.Contains(t, cond.Message, subvolGroupName)
		assert.Contains(t, cond.Message, "filesystem subvolume groups that contain subvolumes")
		assert.Contains(t, cond.Message, "csi")
	})

	t.Run("deleting CephFilesystemSubVolumeGroup should partially unblock CephFilesystem deletion", func(t *testing.T) {
		err = helper.FSClient.DeleteSubvolumeGroup(filesystemName, subvolGroupName)
		assert.NoError(t, err)

		var cond *cephv1.Condition
		err = wait.PollUntilContextTimeout(context.TODO(), 2*time.Second, 18*time.Second, true, func(context context.Context) (done bool, err error) {
			logger.Infof("waiting for CephFilesystem %q in namespace %q to no longer be blocked by CephFilesystemSubVolumeGroups",
				filesystemName, settings.Namespace)
			fs, err := k8sh.RookClientset.CephV1().CephFilesystems(settings.Namespace).Get(
				ctx, filesystemName, metav1.GetOptions{})
			if err != nil {
				return false, err
			}
			cond = cephv1.FindStatusCondition(fs.Status.Conditions, cephv1.ConditionDeletionIsBlocked)
			if cond == nil {
				logger.Warningf("could not find condition %q on CephFilesystem %q", cephv1.ConditionDeletionIsBlocked, filesystemName)
				return false, nil
			}
			if !strings.Contains(cond.Message, "CephFilesystemSubVolumeGroup") {
				logger.Infof("CephFilesystem %q deletion is no longer blocked by CephFilesystemSubVolumeGroups", filesystemName)
				return true, nil
			}
			return false, nil
		})
		assert.NoError(t, err)
		if cond == nil {
			return
		}
		logger.Infof("verifying CephFilesystem %q condition %q is correct: %+v",
			filesystemName, cephv1.ConditionDeletionIsBlocked, cond)
		assert.Equal(t, v1.ConditionTrue, cond.Status)
		assert.Equal(t, cephv1.ObjectHasDependentsReason, cond.Reason)
		// only the raw subvolumegroups should block deletion
		assert.Contains(t, cond.Message, "filesystem subvolume groups that contain subvolumes")
		assert.Contains(t, cond.Message, "csi")
	})

	t.Run("deleting filesystem consumer pod+pvc should fully unblock CephFilesystem deletion", func(t *testing.T) {
		// Clean up the filesystem consumer (pod and PVC)
		cleanupFilesystemConsumer(helper, k8sh, s, settings.Namespace, filePodName)
		err = wait.PollUntilContextTimeout(context.TODO(), 3*time.Second, 30*time.Second, true, func(context context.Context) (done bool, err error) {
			logger.Infof("waiting for CephFilesystem %q in namespace %q to be deleted", filesystemName, settings.Namespace)
			_, err = k8sh.RookClientset.CephV1().CephFilesystems(settings.Namespace).Get(
				ctx, filesystemName, metav1.GetOptions{})
			if err != nil && kerrors.IsNotFound(err) {
				return true, nil
			}
			return false, nil
		})
		// fail the subtest if the filesystem was not deleted within the timeout
		assert.NoError(t, err)
		logger.Infof("CephFilesystem %q in namespace %q was deleted successfully", filesystemName, settings.Namespace)
	})
	err = helper.FSClient.DeleteStorageClass(storageClassName)
	assertNoErrorUnlessNotFound(s, err)

	if preserveFilesystemOnDelete {
		fses, err := helper.FSClient.List(settings.Namespace)
		assert.NoError(s.T(), err)
		assert.Len(s.T(), fses, 1)
		assert.Equal(s.T(), fses[0].Name, filesystemName)

		err = helper.FSClient.Delete(filesystemName, settings.Namespace)
		assert.NoError(s.T(), err)
	}
}
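// createFilesystemConsumerPod creates a pod (with its backing PVC) that mounts
// the filesystem through the given storage class and waits for it to reach the
// Running state.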
func createFilesystemConsumerPod(helper *clients.TestClient, k8sh *utils.K8sHelper, s *suite.Suite, settings *installer.TestCephSettings, filesystemName, storageClassName string) {
	err := createPodWithFilesystem(k8sh, s, settings, filePodName, filesystemName, storageClassName, false)
	require.NoError(s.T(), err)
	filePodRunning := k8sh.IsPodRunning(filePodName, settings.Namespace)
	require.True(s.T(), filePodRunning, "make sure file-test pod is in running state")
	logger.Infof("File system mounted successfully")
}
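// writeAndReadToFilesystem writes a known message to a file in the pod's
// mounted filesystem and reads it back to verify the mount is usable.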
func writeAndReadToFilesystem(helper *clients.TestClient, k8sh *utils.K8sHelper, s *suite.Suite, namespace, podName, filename string) error {
	logger.Infof("Write to file system")
	message := "Test Data for file system storage"
	if err := k8sh.WriteToPod(namespace, podName, filename, message); err != nil {
		return err
	}
	return k8sh.ReadFromPod(namespace, podName, filename, message)
}
// func downscaleMetadataServers(helper *clients.TestClient, k8sh *utils.K8sHelper, t *testing.T, namespace, fsName string) {
// 	logger.Infof("downscaling file system metadata servers")
// 	err := helper.FSClient.ScaleDown(fsName, namespace)
// 	require.Nil(t, err)
// }
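// cleanupFilesystemConsumer deletes the consumer pod and its PVC, then waits
// until the pod is terminated, the PVC is gone, and no PVs remain.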
func cleanupFilesystemConsumer(helper *clients.TestClient, k8sh *utils.K8sHelper, s *suite.Suite, namespace string, podName string) {
	logger.Infof("Delete file system consumer")
	err := k8sh.DeletePod(namespace, podName)
	assert.Nil(s.T(), err)
	if !k8sh.IsPodTerminated(podName, namespace) {
		k8sh.PrintPodDescribe(namespace, podName)
		assert.Fail(s.T(), fmt.Sprintf("make sure %s pod is terminated", podName))
	}
	err = helper.FSClient.DeletePVC(namespace, podName)
	assertNoErrorUnlessNotFound(s, err)
	isDeleted := k8sh.WaitUntilPVCIsDeleted(namespace, podName)
	if !isDeleted {
		assert.Fail(s.T(), fmt.Sprintf("Failed to delete PVC %q", podName))
	}
	isPVListZero := k8sh.WaitUntilZeroPVs()
	if !isPVListZero {
		assert.Fail(s.T(), "PV list is not zero")
	}
	logger.Infof("File system consumer deleted")
}
// cleanupFilesystem cleans up the filesystem and checks if all mds pods are terminated before continuing
func cleanupFilesystem(helper *clients.TestClient, k8sh *utils.K8sHelper, s *suite.Suite, namespace string, filesystemName string) {
	logger.Infof("Deleting file system")
	err := helper.FSClient.Delete(filesystemName, namespace)
	assert.Nil(s.T(), err)
	logger.Infof("File system %s deleted", filesystemName)
}
// Test file system creation on a Rook cluster installed in a custom namespace (i.e. namespace != "rook"), then delete it again
func runFileE2ETestLite(helper *clients.TestClient, k8sh *utils.K8sHelper, s *suite.Suite, settings *installer.TestCephSettings, filesystemName string) {
	logger.Infof("File Storage End to End Integration Test - create Filesystem and make sure mds pod is running")
	logger.Infof("Running on Rook Cluster %s", settings.Namespace)
	activeCount := 1
	createFilesystem(helper, k8sh, s, settings, filesystemName, activeCount)

	// Create a test pod where CephFS is consumed without user creds
	storageClassName := "cephfs-storageclass"
	err := helper.FSClient.CreateStorageClass(filesystemName, settings.OperatorNamespace, settings.Namespace, storageClassName)
	assert.NoError(s.T(), err)

	fileSystemCSISnapshotTest(helper, k8sh, s, storageClassName, settings.Namespace, false)
	fileSystemCSICloneTest(helper, k8sh, s, storageClassName, settings.Namespace)
	cleanupFilesystem(helper, k8sh, s, settings.Namespace, filesystemName)
	err = helper.FSClient.DeleteStorageClass(storageClassName)
	assertNoErrorUnlessNotFound(s, err)
}
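// createFilesystem creates the CephFilesystem and polls until exactly one
// filesystem is listed in the namespace, retrying for up to ~50 seconds
// (10 iterations with a 5-second sleep).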
func createFilesystem(helper *clients.TestClient, k8sh *utils.K8sHelper, s *suite.Suite, settings *installer.TestCephSettings, filesystemName string, activeCount int) {
	logger.Infof("Create file system")
	fscErr := helper.FSClient.Create(filesystemName, settings.Namespace, activeCount)
	require.Nil(s.T(), fscErr)

	var err error
	var filesystemList []cephclient.CephFilesystem
	for i := 1; i <= 10; i++ {
		filesystemList, err = helper.FSClient.List(settings.Namespace)
		if err != nil {
			logger.Errorf("failed to list fs. trying again. %v", err)
			continue
		}
		logger.Debugf("filesystemList is %+v", filesystemList)
		if len(filesystemList) == 1 {
			logger.Infof("File system %s created", filesystemList[0].Name)
			break
		}
		logger.Infof("Waiting for file system %s to be created", filesystemName)
		time.Sleep(time.Second * 5)
	}
	logger.Debugf("filesystemList is %+v", filesystemList)
	require.Equal(s.T(), 1, len(filesystemList), "There should be one shared file system present")
}
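// fileTestDataCleanUp deletes the test pod and the filesystem it consumed.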
func fileTestDataCleanUp(helper *clients.TestClient, k8sh *utils.K8sHelper, s *suite.Suite, podName string, namespace string, filesystemName string) {
	logger.Infof("Cleaning up file system")
	err := k8sh.DeletePod(namespace, podName)
	assert.NoError(s.T(), err)
	err = helper.FSClient.Delete(filesystemName, namespace)
	assert.NoError(s.T(), err)
}
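// createPodWithFilesystem creates the PVC+pod manifest returned by
// getFilesystemCSITestPod. The mountUser and filesystemName parameters are
// currently accepted but not used by the generated manifest.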
func createPodWithFilesystem(k8sh *utils.K8sHelper, s *suite.Suite, settings *installer.TestCephSettings, podName, filesystemName, storageClassName string, mountUser bool) error {
	testPodManifest := getFilesystemCSITestPod(settings, podName, storageClassName)
	if err := k8sh.ResourceOperation("create", testPodManifest); err != nil {
		return fmt.Errorf("failed to create pod -- %s. %+v", testPodManifest, err)
	}
	return nil
}
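// getFilesystemCSITestPod returns a manifest containing a 1Gi PVC and a
// busybox pod that mounts it at utils.TestMountPath, touches a test file, and
// sleeps so the mount stays active for the duration of the test.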
func getFilesystemCSITestPod(settings *installer.TestCephSettings, podName, storageClassName string) string {
	claimName := podName
	return `
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: ` + claimName + `
  namespace: ` + settings.Namespace + `
spec:
  accessModes:
  - ReadWriteOnce
  resources:
    requests:
      storage: 1Gi
  storageClassName: ` + storageClassName + `
---
apiVersion: v1
kind: Pod
metadata:
  name: ` + podName + `
  namespace: ` + settings.Namespace + `
spec:
  containers:
  - name: ` + podName + `
    image: busybox
    command:
    - sh
    - "-c"
    - "touch ` + utils.TestMountPath + `/csi.test && sleep 3600"
    imagePullPolicy: IfNotPresent
    env:
    volumeMounts:
    - mountPath: ` + utils.TestMountPath + `
      name: csivol
  volumes:
  - name: csivol
    persistentVolumeClaim:
      claimName: ` + claimName + `
      readOnly: false
  restartPolicy: Never
`
}
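// waitForFilesystemActive runs "ceph fs status" in a retry loop and returns
// once the output reports at least one active MDS for the filesystem.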
func waitForFilesystemActive(k8sh *utils.K8sHelper, clusterInfo *cephclient.ClusterInfo, filesystemName string) error {
	command, args := cephclient.FinalizeCephCommandArgs("ceph", clusterInfo, []string{"fs", "status", filesystemName}, k8sh.MakeContext().ConfigDir)
	var stat string
	var err error
	logger.Infof("waiting for filesystem %q to be active", filesystemName)
	for i := 0; i < utils.RetryLoop; i++ {
		// run the ceph fs status command; assign with "=" rather than ":=" so the
		// outer stat and err are the ones reported if we give up waiting below
		stat, err = k8sh.MakeContext().Executor.ExecuteCommandWithCombinedOutput(command, args...)
		if err != nil {
			logger.Warningf("failed to get filesystem %q status. %+v", filesystemName, err)
		}
		// as long as at least one mds is active, it's okay
		if strings.Contains(stat, "active") {
			logger.Infof("done waiting for filesystem %q to be active", filesystemName)
			return nil
		}
		logger.Infof("waiting for filesystem %q to be active. status=%s", filesystemName, stat)
		time.Sleep(utils.RetryInterval * time.Second)
	}
	return fmt.Errorf("gave up waiting to get filesystem %q status [err: %+v] Status returned:\n%s", filesystemName, err, stat)
}