ceph_installer.go
/*
Copyright 2016 The Rook Authors. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

	http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package installer

import (
	"context"
	"flag"
	"fmt"
	"math/rand"
	"os"
	"path"
	"testing"
	"time"

	"github.com/google/uuid"
	"github.com/pkg/errors"
	cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
	"github.com/rook/rook/pkg/daemon/ceph/client"
	"github.com/rook/rook/pkg/operator/ceph/cluster"
	"github.com/rook/rook/pkg/operator/ceph/cluster/mon"
	"github.com/rook/rook/tests/framework/utils"
	"github.com/stretchr/testify/assert"
	v1 "k8s.io/api/core/v1"
	kerrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
)
const (
	// test with the latest releases
	quincyTestImage = "quay.io/ceph/ceph:v17"
	reefTestImage   = "quay.io/ceph/ceph:v18"
	// test with the current development versions
	quincyDevelTestImage = "quay.io/ceph/daemon-base:latest-quincy-devel"
	reefDevelTestImage   = "quay.io/ceph/daemon-base:latest-reef-devel"
	// test with the latest Ceph main image
	mainTestImage         = "quay.io/ceph/daemon-base:latest-main-devel"
	cephOperatorLabel     = "app=rook-ceph-operator"
	defaultClusterName    = "test-cluster"
	clusterCustomSettings = `
[global]
osd_pool_default_size = 1
bdev_flock_retry = 20
mon_warn_on_pool_no_redundancy = false
bluefs_buffered_io = false
mon_data_avail_warn = 10
[mon]
mon compact on start = true
`
	volumeReplicationVersion = "v0.5.0"
)
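
// Version specs for the test images above, plus the URLs of the volume
// replication CRDs applied when EnableVolumeReplication is set.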
var (
	QuincyVersion      = cephv1.CephVersionSpec{Image: quincyTestImage}
	QuincyDevelVersion = cephv1.CephVersionSpec{Image: quincyDevelTestImage}
	ReefVersion        = cephv1.CephVersionSpec{Image: reefTestImage}
	ReefDevelVersion   = cephv1.CephVersionSpec{Image: reefDevelTestImage}
	MainVersion        = cephv1.CephVersionSpec{Image: mainTestImage, AllowUnsupported: true}

	volumeReplicationBaseURL     = fmt.Sprintf("https://raw.githubusercontent.com/csi-addons/kubernetes-csi-addons/%s/config/crd/bases/", volumeReplicationVersion)
	volumeReplicationCRDURL      = volumeReplicationBaseURL + "replication.storage.openshift.io_volumereplications.yaml"
	volumeReplicationClassCRDURL = volumeReplicationBaseURL + "replication.storage.openshift.io_volumereplicationclasses.yaml"
)
// CephInstaller wraps installing and uninstalling rook on a platform
type CephInstaller struct {
	settings         *TestCephSettings
	Manifests        CephManifests
	k8shelper        *utils.K8sHelper
	hostPathToDelete string
	helmHelper       *utils.HelmHelper
	k8sVersion       string
	changeHostnames  bool
	T                func() *testing.T
}
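
// ReturnCephVersion picks the Ceph version spec for the suite based on the
// CEPH_SUITE_VERSION environment variable, defaulting to the reef devel image.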
func ReturnCephVersion() cephv1.CephVersionSpec {
	switch os.Getenv("CEPH_SUITE_VERSION") {
	case "main":
		return MainVersion
	case "quincy-devel":
		return QuincyDevelVersion
	case "reef-devel":
		return ReefDevelVersion
	default:
		return ReefDevelVersion
	}
}
// CreateCephOperator creates rook-operator via kubectl
func (h *CephInstaller) CreateCephOperator() (err error) {
	// creating rook resources
	logger.Info("Creating Rook CRDs")
	resources := h.Manifests.GetCRDs(h.k8shelper)
	if _, err = h.k8shelper.KubectlWithStdin(resources, createFromStdinArgs...); err != nil {
		return err
	}

	if h.changeHostnames {
		// give nodes hostnames that differ from their k8s node names to confirm that all the daemons will be initialized properly
		err = h.k8shelper.ChangeHostnames()
		assert.NoError(h.T(), err)
	}

	// The operator namespace needs to be created explicitly, while the cluster namespace is created with the common.yaml
	if err := h.k8shelper.CreateNamespace(h.settings.OperatorNamespace); err != nil {
		return err
	}

	// Create the namespace and RBAC before starting the operator
	_, err = h.k8shelper.KubectlWithStdin(h.Manifests.GetCommon(), createFromStdinArgs...)
	if err != nil {
		return errors.Errorf("failed to create rook common resources: %v", err)
	}

	if h.settings.TestNFSCSI {
		csiNFSRBAC := h.Manifests.GetCSINFSRBAC()
		if _, err = h.k8shelper.KubectlWithStdin(csiNFSRBAC, createFromStdinArgs...); err != nil {
			return err
		}
	}

	if err := h.CreateVolumeReplicationCRDs(); err != nil {
		return errors.Wrap(err, "failed to create volume replication CRDs")
	}

	_, err = h.k8shelper.KubectlWithStdin(h.Manifests.GetOperator(), createFromStdinArgs...)
	if err != nil {
		return errors.Errorf("failed to create rook-operator pod: %v", err)
	}

	logger.Infof("Rook operator started")
	return nil
}
func (h *CephInstaller) CreateVolumeReplicationCRDs() (err error) {
	if !h.Manifests.Settings().EnableVolumeReplication {
		logger.Info("volume replication CRDs skipped")
		return nil
	}

	logger.Info("Creating volume replication CRDs")
	if _, err := h.k8shelper.KubectlWithStdin(readManifestFromURL(volumeReplicationCRDURL), createFromStdinArgs...); err != nil {
		return errors.Wrap(err, "failed to create volumereplication CRD")
	}
	if _, err := h.k8shelper.KubectlWithStdin(readManifestFromURL(volumeReplicationClassCRDURL), createFromStdinArgs...); err != nil {
		return errors.Wrap(err, "failed to create volumereplicationclass CRD")
	}
	return nil
}
func (h *CephInstaller) WaitForToolbox(namespace string) error {
	if err := h.k8shelper.WaitForLabeledPodsToRun("app=rook-ceph-tools", namespace); err != nil {
		return errors.Wrap(err, "Rook Toolbox couldn't start")
	}
	logger.Infof("Rook Toolbox started")

	podNames, err := h.k8shelper.GetPodNamesForApp("rook-ceph-tools", namespace)
	assert.NoError(h.T(), err)
	for _, podName := range podNames {
		// All e2e tests should run ceph commands in the toolbox since we are not inside a container
		logger.Infof("found active toolbox pod: %q", podName)
		client.RunAllCephCommandsInToolboxPod = podName
		return nil
	}
	return errors.Errorf("could not find toolbox pod")
}
// CreateRookToolbox creates rook-ceph-tools via kubectl
func (h *CephInstaller) CreateRookToolbox(manifests CephManifests) (err error) {
	logger.Infof("Starting Rook toolbox")
	_, err = h.k8shelper.KubectlWithStdin(manifests.GetToolbox(), createFromStdinArgs...)
	if err != nil {
		return errors.Wrap(err, "failed to create rook-toolbox pod")
	}
	return nil
}
// Execute a command in the ceph toolbox
func (h *CephInstaller) Execute(command string, parameters []string, namespace string) (error, string) {
	clusterInfo := client.AdminTestClusterInfo(namespace)
	cmd, args := client.FinalizeCephCommandArgs(command, clusterInfo, parameters, h.k8shelper.MakeContext().ConfigDir)
	result, err := h.k8shelper.MakeContext().Executor.ExecuteCommandWithOutput(cmd, args...)
	if err != nil {
		logger.Warningf("Error executing command %q: <%v>", command, err)
		return err, result
	}
	return nil, result
}
// CreateCephCluster creates rook cluster via kubectl
func (h *CephInstaller) CreateCephCluster() error {
	ctx := context.TODO()
	var err error
	h.settings.DataDirHostPath, err = h.initTestDir(h.settings.Namespace)
	if err != nil {
		return errors.Errorf("failed to create test dir. %+v", err)
	}
	logger.Infof("Creating cluster with settings: %+v", h.settings)

	logger.Infof("Creating custom ceph.conf settings")
	customSettings := map[string]string{"config": clusterCustomSettings}
	customCM := &v1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "rook-config-override",
			Namespace: h.settings.Namespace,
		},
		Data: customSettings,
	}
	if _, err := h.k8shelper.Clientset.CoreV1().ConfigMaps(h.settings.Namespace).Create(ctx, customCM, metav1.CreateOptions{}); err != nil {
		return errors.Errorf("failed to create custom ceph.conf. %+v", err)
	}

	customCSISettings := map[string]string{
		"ceph.conf": "[global]\nauth_client_required = cephx",
	}
	customCSICM := &v1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "csi-ceph-conf-override",
			Namespace: h.settings.Namespace,
		},
		Data: customCSISettings,
	}
	if _, err := h.k8shelper.Clientset.CoreV1().ConfigMaps(h.settings.Namespace).Create(ctx, customCSICM, metav1.CreateOptions{}); err != nil {
		return errors.Errorf("failed to create custom csi ceph.conf. %+v", err)
	}

	logger.Info("Starting Rook Cluster")
	rookCluster := h.Manifests.GetCephCluster()
	logger.Info(rookCluster)
	maxTry := 10
	for i := 0; i < maxTry; i++ {
		_, err := h.k8shelper.KubectlWithStdin(rookCluster, createFromStdinArgs...)
		if err == nil {
			break
		}
		if i == maxTry-1 {
			return errors.Errorf("failed to create rook cluster. %v", err)
		}
		logger.Infof("failed to create rook cluster, trying again... %v", err)
		time.Sleep(5 * time.Second)
	}
	return nil
}
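
// waitForCluster blocks until the expected mon, mgr, and (unless OSD creation
// is skipped) osd pods are running, and until the crash pruner cron job exists
// when it is enabled.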
func (h *CephInstaller) waitForCluster() error {
	if err := h.k8shelper.WaitForPodCount("app=rook-ceph-mon", h.settings.Namespace, h.settings.Mons); err != nil {
		return err
	}
	if err := h.k8shelper.WaitForPodCount("app=rook-ceph-mgr", h.settings.Namespace, 1); err != nil {
		return err
	}
	if !h.settings.SkipOSDCreation {
		if err := h.k8shelper.WaitForPodCount("app=rook-ceph-osd", h.settings.Namespace, 1); err != nil {
			return err
		}
	}
	if h.settings.UseCrashPruner {
		if err := h.k8shelper.WaitForCronJob("rook-ceph-crashcollector-pruner", h.settings.Namespace); err != nil {
			return err
		}
	}
	logger.Infof("Rook Cluster started")
	if !h.settings.SkipOSDCreation {
		return h.k8shelper.WaitForLabeledPodsToRun("app=rook-ceph-osd", h.settings.Namespace)
	}
	return nil
}
// CreateRookExternalCluster creates rook external cluster via kubectl
func (h *CephInstaller) CreateRookExternalCluster(externalManifests CephManifests) error {
	var err error
	externalSettings := externalManifests.Settings()
	externalSettings.DataDirHostPath, err = h.initTestDir(externalSettings.Namespace)
	if err != nil {
		return errors.Errorf("failed to create test dir. %+v", err)
	}

	logger.Infof("Creating external cluster %q with core storage namespace %q", externalSettings.Namespace, h.settings.Namespace)

	logger.Infof("Creating external cluster roles")
	roles := externalManifests.GetCommonExternal()
	if _, err := h.k8shelper.KubectlWithStdin(roles, createFromStdinArgs...); err != nil {
		return errors.Wrap(err, "failed to create cluster roles")
	}

	// Inject connection information from the first cluster
	logger.Info("Injecting cluster connection information")
	err = h.injectRookExternalClusterInfo(externalSettings)
	if err != nil {
		return errors.Wrap(err, "failed to inject cluster information into the external cluster")
	}

	// Start the external cluster
	logger.Infof("Starting Rook External Cluster with yaml")
	rookCluster := externalManifests.GetExternalCephCluster()
	if _, err := h.k8shelper.KubectlWithStdin(rookCluster, createFromStdinArgs...); err != nil {
		return errors.Wrap(err, "failed to create rook external cluster")
	}

	logger.Infof("Running toolbox on external namespace %q", externalSettings.Namespace)
	if err := h.CreateRookToolbox(externalManifests); err != nil {
		return errors.Wrap(err, "failed to start toolbox on external cluster")
	}
	if err := h.WaitForToolbox(externalManifests.Settings().Namespace); err != nil {
		return errors.Wrap(err, "failed to wait for toolbox on external cluster")
	}

	var clusterStatus cephv1.ClusterStatus
	for i := 0; i < 16; i++ {
		ctx := context.TODO()
		clusterResource, err := h.k8shelper.RookClientset.CephV1().CephClusters(externalSettings.Namespace).Get(ctx, externalSettings.ClusterName, metav1.GetOptions{})
		if err != nil {
			logger.Warningf("failed to get external cluster CR, retrying. %v", err)
			time.Sleep(time.Second * 5)
			continue
		}
		clusterStatus = clusterResource.Status
		if string(clusterResource.Status.Phase) == "Connected" {
			logger.Info("Rook external cluster connected")
			return nil
		}
		logger.Warningf("failed to start external cluster, retrying, state: %v", clusterResource.Status)
		time.Sleep(time.Second * 5)
	}
	return errors.Errorf("failed to start external cluster, state: %v", clusterStatus)
}
// injectRookExternalClusterInfo injects connection information for an external cluster
func (h *CephInstaller) injectRookExternalClusterInfo(externalSettings *TestCephSettings) error {
	ctx := context.TODO()
	// get config map
	cm, err := h.GetRookExternalClusterMonConfigMap()
	if err != nil {
		return errors.Errorf("failed to get configmap. %v", err)
	}
	// create config map
	_, err = h.k8shelper.Clientset.CoreV1().ConfigMaps(externalSettings.Namespace).Create(ctx, cm, metav1.CreateOptions{})
	if err != nil {
		return errors.Errorf("failed to create configmap. %v", err)
	}
	// get secret
	secret, err := h.GetRookExternalClusterMonSecret()
	if err != nil {
		return errors.Errorf("failed to get secret. %v", err)
	}
	// create secret
	_, err = h.k8shelper.Clientset.CoreV1().Secrets(externalSettings.Namespace).Create(ctx, secret, metav1.CreateOptions{})
	if err != nil {
		return errors.Errorf("failed to create secret. %v", err)
	}
	return nil
}
// GetRookExternalClusterMonConfigMap gets the monitor kubernetes configmap of the external cluster
func (h *CephInstaller) GetRookExternalClusterMonConfigMap() (*v1.ConfigMap, error) {
	ctx := context.TODO()
	configMapName := "rook-ceph-mon-endpoints"
	externalCM, err := h.k8shelper.Clientset.CoreV1().ConfigMaps(h.settings.Namespace).Get(ctx, configMapName, metav1.GetOptions{})
	if err != nil {
		return nil, errors.Errorf("failed to get configmap. %v", err)
	}
	newCM := &v1.ConfigMap{}
	newCM.Name = externalCM.Name
	newCM.Data = externalCM.Data
	return newCM, nil
}
// GetRookExternalClusterMonSecret gets the monitor kubernetes secret of the external cluster
func (h *CephInstaller) GetRookExternalClusterMonSecret() (*v1.Secret, error) {
	ctx := context.TODO()
	secretName := "rook-ceph-mon" //nolint:gosec // We safely suppress gosec in tests file
	externalSecret, err := h.k8shelper.Clientset.CoreV1().Secrets(h.settings.Namespace).Get(ctx, secretName, metav1.GetOptions{})
	if err != nil {
		return nil, errors.Errorf("failed to get secret. %v", err)
	}
	newSecret := &v1.Secret{}
	newSecret.Name = externalSecret.Name
	newSecret.Data = externalSecret.Data
	return newSecret, nil
}
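
// initTestDir derives the dataDirHostPath for a test cluster. When running
// under "/data" it only composes a random directory name, since the path lives
// on a remote host; otherwise it creates a unique directory on the local host.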
func (h *CephInstaller) initTestDir(namespace string) (string, error) {
	val, err := baseTestDir()
	if err != nil {
		return "", err
	}

	h.hostPathToDelete = path.Join(val, "rook-test")
	testDir := path.Join(h.hostPathToDelete, namespace)

	// skip the test dir creation if we are not running under "/data"
	if val != "/data" {
		// Create the test dir on the local host
		if err := os.MkdirAll(testDir, 0777); err != nil {
			return "", err
		}
		var err error
		if testDir, err = os.MkdirTemp(testDir, "test-"); err != nil {
			return "", err
		}
	} else {
		// Compose a random test directory name without actually creating it, since the test is not running on the local host
		r := rand.Int() //nolint:gosec // We safely suppress gosec in tests file
		testDir = path.Join(testDir, fmt.Sprintf("test-%d", r))
	}
	return testDir, nil
}
// GetNodeHostnames returns the hostname labels of the nodes in the k8s cluster
func (h *CephInstaller) GetNodeHostnames() ([]string, error) {
	ctx := context.TODO()
	nodes, err := h.k8shelper.Clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
	if err != nil {
		return nil, errors.Errorf("failed to get k8s nodes. %+v", err)
	}
	var names []string
	for _, node := range nodes.Items {
		names = append(names, node.Labels[v1.LabelHostname])
	}
	return names, nil
}
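
// installRookOperator creates the rook operator (via helm or kubectl) and
// verifies that the operator pod reaches the Running state.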
func (h *CephInstaller) installRookOperator() (bool, error) {
	h.k8shelper.CreateAnonSystemClusterBinding()

	// Create rook operator
	logger.Infof("Starting Rook Operator")
	if h.settings.UseHelm {
		// enable the discovery daemonset with the helm chart
		if err := h.CreateRookOperatorViaHelm(); err != nil {
			return false, errors.Wrap(err, "failed to configure helm")
		}
	} else {
		if err := h.CreateCephOperator(); err != nil {
			return false, errors.Wrap(err, "failed to configure ceph operator")
		}
	}

	if !h.k8shelper.IsPodInExpectedState("rook-ceph-operator", h.settings.OperatorNamespace, "Running") {
		h.k8shelper.GetLogsFromNamespace(h.settings.OperatorNamespace, "test-setup", utils.TestEnvName())
		logger.Error("rook-ceph-operator is not Running, abort!")
		return false, errors.New("rook-ceph-operator is not running")
	}

	if h.settings.EnableDiscovery {
		// the discovery daemonset is expected to be running when discovery is enabled
		assert.True(h.T(), h.k8shelper.IsPodInExpectedState("rook-discover", h.settings.OperatorNamespace, "Running"))
	}
	return true, nil
}
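
// InstallRook installs the rook operator, a ceph cluster, and the toolbox,
// then waits for the toolbox to successfully query the ceph status.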
func (h *CephInstaller) InstallRook() (bool, error) {
	if h.settings.RookVersion != LocalBuildTag {
		// make sure we have the images from a previous release locally so the test doesn't hit a timeout
		assert.NoError(h.T(), h.k8shelper.GetDockerImage("rook/ceph:"+h.settings.RookVersion))
	}
	assert.NoError(h.T(), h.k8shelper.GetDockerImage(h.settings.CephVersion.Image))

	k8sversion := h.k8shelper.GetK8sServerVersion()
	logger.Infof("Installing rook on K8s %s", k8sversion)
	success, err := h.installRookOperator()
	if err != nil {
		return false, err
	}
	if !success {
		return false, nil
	}

	if h.settings.UseHelm {
		// Install Prometheus so we can create the prometheus rules
		args := []string{"apply", "-f", "https://raw.githubusercontent.com/coreos/prometheus-operator/v0.40.0/bundle.yaml"}
		_, err = h.k8shelper.MakeContext().Executor.ExecuteCommandWithOutput("kubectl", args...)
		if err != nil {
			return false, errors.Wrap(err, "failed to install prometheus")
		}
		// Create the cluster
		err = h.CreateRookCephClusterViaHelm()
		if err != nil {
			return false, errors.Wrap(err, "failed to install ceph cluster using Helm")
		}
	} else {
		// Create rook cluster
		err = h.CreateCephCluster()
		if err != nil {
			logger.Errorf("Cluster %q install failed. %v", h.settings.Namespace, err)
			return false, err
		}
		err = h.CreateRookToolbox(h.Manifests)
		if err != nil {
			return false, errors.Wrapf(err, "failed to install toolbox in cluster %s", h.settings.Namespace)
		}
	}

	logger.Info("Waiting for Rook Cluster")
	if err := h.waitForCluster(); err != nil {
		return false, err
	}

	err = h.WaitForToolbox(h.settings.Namespace)
	if err != nil {
		return false, err
	}

	const loopCount = 20
	for i := 0; i < loopCount; i++ {
		_, err = client.Status(h.k8shelper.MakeContext(), client.AdminTestClusterInfo(h.settings.Namespace))
		if err == nil {
			logger.Infof("toolbox ready")
			break
		}
		logger.Infof("toolbox is not ready")
		if i == loopCount-1 {
			return false, errors.Errorf("toolbox cannot connect to cluster")
		}
		time.Sleep(5 * time.Second)
	}

	if h.settings.UseHelm {
		logger.Infof("Confirming ceph cluster installed correctly")
		if err := h.ConfirmHelmClusterInstalledCorrectly(); err != nil {
			return false, errors.Wrap(err, "the ceph cluster storage CustomResources did not install correctly")
		}
		if !h.settings.RetainHelmDefaultStorageCRs {
			h.removeCephClusterHelmResources()
		}
	}

	logger.Infof("installed rook operator and cluster %s on k8s %s", h.settings.Namespace, h.k8sVersion)
	return true, nil
}
// UninstallRook uninstalls rook from k8s
func (h *CephInstaller) UninstallRook() {
	h.UninstallRookFromMultipleNS(h.Manifests)
}
// UninstallRookFromMultipleNS uninstalls rook from multiple namespaces in k8s
func (h *CephInstaller) UninstallRookFromMultipleNS(manifests ...CephManifests) {
	ctx := context.TODO()
	var clusterNamespaces []string
	for _, manifest := range manifests {
		clusterNamespaces = append(clusterNamespaces, manifest.Settings().Namespace)
		// Gather pod restart count and alert
		h.k8shelper.GetPodRestartsFromNamespace(manifest.Settings().Namespace, h.T().Name(), utils.TestEnvName())
	}

	// Gather logs after status checks
	h.GatherAllRookLogs(h.T().Name(), append([]string{h.settings.OperatorNamespace}, clusterNamespaces...)...)

	// If the test failed, do not tear down; leave the cluster as-is for investigation
	if h.T().Failed() {
		logger.Info("one of the tests failed, leaving the cluster in its bad shape for investigation")
		return
	}

	logger.Infof("Uninstalling Rook")
	var err error
	skipOperatorCleanup := false
	for _, manifest := range manifests {
		namespace := manifest.Settings().Namespace
		clusterName := manifest.Settings().ClusterName

		if manifest.Settings().SkipCleanupPolicy && manifest.Settings().SkipClusterCleanup {
			logger.Infof("SKIPPING ALL CLEANUP for namespace %q", namespace)
			skipOperatorCleanup = true
			continue
		}

		testCleanupPolicy := !h.settings.UseHelm && !manifest.Settings().IsExternal && !manifest.Settings().SkipCleanupPolicy
		if testCleanupPolicy {
			// Add cleanup policy to the core ceph cluster
			err = h.addCleanupPolicy(namespace, clusterName)
			if err != nil {
				assert.NoError(h.T(), err)
				// no need to check for cleanup policy later if it already failed
				testCleanupPolicy = false
			}

			// if the test passed, check that the ceph status is HEALTH_OK before we tear the cluster down
			if !h.T().Failed() {
				// Only check the Ceph status for the core cluster
				// The check won't work for an external cluster since the core cluster is already gone
				h.checkCephHealthStatus()
			}
		}

		// The pool CRs should already be removed by the tests that created them
		pools, err := h.k8shelper.RookClientset.CephV1().CephBlockPools(namespace).List(ctx, metav1.ListOptions{})
		assert.NoError(h.T(), err, "failed to retrieve pool CRs")
		for _, pool := range pools.Items {
			logger.Infof("found pool: %q", pool.Name)
			assert.Fail(h.T(), fmt.Sprintf("pool %q still exists", pool.Name))
			// Get the operator log
			h.GatherAllRookLogs(h.T().Name()+"poolcheck", h.settings.OperatorNamespace)
		}

		if h.settings.UseHelm {
			// helm rook-ceph-cluster cleanup
			if h.settings.RetainHelmDefaultStorageCRs {
				h.removeCephClusterHelmResources()
			}
			err = h.helmHelper.DeleteLocalRookHelmChart(namespace, CephClusterChartName)
			checkError(h.T(), err, fmt.Sprintf("cannot uninstall helm chart %s", CephClusterChartName))
		} else {
			err = h.k8shelper.DeleteResourceAndWait(false, "-n", namespace, "cephcluster", clusterName)
			checkError(h.T(), err, fmt.Sprintf("cannot remove cluster %s", namespace))
		}

		h.waitForResourceDeletion(namespace, clusterName)

		if testCleanupPolicy {
			err = h.waitForCleanupJobs(namespace)
			if err != nil {
				assert.NoError(h.T(), err)
				h.GatherAllRookLogs(h.T().Name()+"cleanup-job", append([]string{h.settings.OperatorNamespace}, clusterNamespaces...)...)
			}
		}

		// helm operator cleanup
		if h.settings.UseHelm {
			err = h.helmHelper.DeleteLocalRookHelmChart(h.settings.OperatorNamespace, OperatorChartName)
			checkError(h.T(), err, fmt.Sprintf("cannot uninstall helm chart %s", OperatorChartName))

			// delete the entire namespace (in non-helm installs it's removed with the common.yaml)
			err = h.k8shelper.DeleteResourceAndWait(false, "namespace", h.settings.OperatorNamespace)
			checkError(h.T(), err, fmt.Sprintf("cannot delete namespace %s", h.settings.OperatorNamespace))
			continue
		}

		// Skip the remainder of cluster cleanup if desired
		if manifest.Settings().SkipClusterCleanup {
			logger.Infof("SKIPPING CLUSTER CLEANUP")
			skipOperatorCleanup = true
			continue
		}

		// non-helm cleanup
		if manifest.Settings().IsExternal {
			logger.Infof("Deleting all the resources in the common external manifest")
			_, err = h.k8shelper.KubectlWithStdin(manifest.GetCommonExternal(), deleteFromStdinArgs...)
			if err != nil {
				logger.Errorf("failed to remove common external resources. %v", err)
			} else {
				logger.Infof("done deleting all the resources in the common external manifest")
			}
		} else {
			h.k8shelper.PrintResources(namespace, "cephblockpools.ceph.rook.io")
			h.k8shelper.PrintResources(namespace, "cephclients.ceph.rook.io")
			h.k8shelper.PrintResources(namespace, "cephclusters.ceph.rook.io")
			h.k8shelper.PrintResources(namespace, "cephfilesystemmirrors.ceph.rook.io")
			h.k8shelper.PrintResources(namespace, "cephfilesystems.ceph.rook.io")
			h.k8shelper.PrintResources(namespace, "cephnfses.ceph.rook.io")
			h.k8shelper.PrintResources(namespace, "cephobjectrealms.ceph.rook.io")
			h.k8shelper.PrintResources(namespace, "cephobjectstores.ceph.rook.io")
			h.k8shelper.PrintResources(namespace, "cephobjectstoreusers.ceph.rook.io")
			h.k8shelper.PrintResources(namespace, "cephobjectzonegroups.ceph.rook.io")
			h.k8shelper.PrintResources(namespace, "cephobjectzones.ceph.rook.io")
			h.k8shelper.PrintResources(namespace, "cephrbdmirrors.ceph.rook.io")
			h.k8shelper.PrintResources(namespace, "objectbucketclaims.ceph.rook.io")
			h.k8shelper.PrintResources(namespace, "objectbuckets.ceph.rook.io")
			h.k8shelper.PrintPodStatus(namespace)
			h.k8shelper.PrintPVs(true)

			logger.Infof("Deleting all the resources in the common manifest")
			_, err = h.k8shelper.KubectlWithStdin(h.Manifests.GetCommon(), deleteFromStdinArgs...)
			if err != nil {
				logger.Errorf("failed to remove common manifest. %v", err)
			} else {
				logger.Infof("done deleting all the resources in the common manifest")
			}
			if h.settings.TestNFSCSI {
				_, err = h.k8shelper.KubectlWithStdin(h.Manifests.GetCSINFSRBAC(), deleteFromStdinArgs...)
				if err != nil {
					logger.Errorf("failed to remove csi nfs rbac manifest. %v", err)
				} else {
					logger.Info("done deleting all the resources in the csi nfs rbac manifest")
				}
			}
		}
	}

	// Skip the remainder of operator cleanup if desired
	if skipOperatorCleanup {
		logger.Infof("SKIPPING OPERATOR CLEANUP")
		return
	}

	if !h.settings.UseHelm {
		logger.Infof("Deleting all the resources in the operator manifest")
		_, err = h.k8shelper.KubectlWithStdin(h.Manifests.GetOperator(), deleteFromStdinArgs...)
		if err != nil {
			logger.Errorf("failed to remove operator resources. %v", err)
		} else {
			logger.Infof("done deleting all the resources in the operator manifest")
		}
	}

	logger.Info("removing the CRDs")
	_, err = h.k8shelper.KubectlWithStdin(h.Manifests.GetCRDs(h.k8shelper), deleteFromStdinArgs...)
	if err != nil {
		logger.Errorf("failed to remove CRDs. %v", err)
	} else {
		logger.Infof("done deleting all the CRDs")
	}

	err = h.k8shelper.DeleteResourceAndWait(false, "namespace", h.settings.OperatorNamespace)
	checkError(h.T(), err, fmt.Sprintf("cannot delete operator namespace %s", h.settings.OperatorNamespace))
	logger.Infof("done removing the operator from namespace %s", h.settings.OperatorNamespace)

	logger.Infof("removing host data dir %s", h.hostPathToDelete)
	// remove the data dir if it exists
	if h.hostPathToDelete != "" {
		nodes, err := h.GetNodeHostnames()
		checkError(h.T(), err, "cannot get node names")
		for _, node := range nodes {
			err = h.verifyDirCleanup(node, h.hostPathToDelete)
			logger.Infof("verified cleanup of %s from node %s", h.hostPathToDelete, node)
			assert.NoError(h.T(), err)
		}
	}
	if h.changeHostnames {
		// revert the hostname labels for the test
		_, err = h.k8shelper.RestoreHostnames()
		assert.NoError(h.T(), err)
	}

	// wait a bit longer for the operator namespace to be cleaned up after its deletion
	for i := 0; i < 15; i++ {
		_, err := h.k8shelper.Clientset.CoreV1().Namespaces().Get(ctx, h.settings.OperatorNamespace, metav1.GetOptions{})
		if err != nil && kerrors.IsNotFound(err) {
			logger.Infof("operator namespace %q removed", h.settings.OperatorNamespace)
			break
		}
		logger.Infof("operator namespace %q still found...", h.settings.OperatorNamespace)
		time.Sleep(5 * time.Second)
	}
}
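
// waitForResourceDeletion waits for the CephCluster CR, the mon endpoints
// configmap, and the mon secret to be deleted, force-removing the cluster
// finalizers if the operator does not remove them in time.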
func (h *CephInstaller) waitForResourceDeletion(namespace, clusterName string) {
	ctx := context.TODO()
	clusterDeleteRetries := 0
	crdCheckerFunc := func() error {
		// Check for existence of the cluster CR
		_, err := h.k8shelper.RookClientset.CephV1().CephClusters(namespace).Get(ctx, clusterName, metav1.GetOptions{})
		clusterDeleteRetries++
		if err != nil {
			if !kerrors.IsNotFound(err) {
				return err
			}
		} else {
			// If the operator really isn't going to remove the finalizer, just force remove it
			if clusterDeleteRetries > 10 {
				h.removeClusterFinalizers(namespace, clusterName)
			}
		}
		// Check for existence of the mon endpoints configmap, which has a finalizer
		_, err = h.k8shelper.Clientset.CoreV1().ConfigMaps(namespace).Get(ctx, mon.EndpointConfigMapName, metav1.GetOptions{})
		if err != nil && !kerrors.IsNotFound(err) {
			return err
		}
		// Check for existence of the mon secret, which has a finalizer
		_, err = h.k8shelper.Clientset.CoreV1().Secrets(namespace).Get(ctx, mon.AppName, metav1.GetOptions{})
		return err
	}
	err := h.k8shelper.WaitForCustomResourceDeletion(namespace, clusterName, crdCheckerFunc)
	checkError(h.T(), err, fmt.Sprintf("failed to wait for cluster crd %s deletion", namespace))
}
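
// removeClusterFinalizers clears the finalizers on the CephCluster CR so the
// delete can complete even if the operator is no longer reconciling them.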
func (h *CephInstaller) removeClusterFinalizers(namespace, clusterName string) {
	ctx := context.TODO()
	// Get the latest cluster instead of using the same instance in case it has been changed
	cluster, err := h.k8shelper.RookClientset.CephV1().CephClusters(namespace).Get(ctx, clusterName, metav1.GetOptions{})
	if err != nil {
		logger.Errorf("failed to remove finalizer. failed to get cluster. %+v", err)
		return
	}
	objectMeta := &cluster.ObjectMeta
	if len(objectMeta.Finalizers) == 0 {
		logger.Infof("no finalizers to remove from cluster %s", namespace)
		return
	}
	objectMeta.Finalizers = nil
	_, err = h.k8shelper.RookClientset.CephV1().CephClusters(cluster.Namespace).Update(ctx, cluster, metav1.UpdateOptions{})
	if err != nil {
		logger.Errorf("failed to remove finalizers from cluster %s. %+v", objectMeta.Name, err)
		return
	}
	logger.Infof("removed finalizers from cluster %s", objectMeta.Name)
}
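
// checkCephHealthStatus asserts that the cluster phase is acceptable and that
// the ceph status was recently checked, logging the health details when the
// cluster is not HEALTH_OK.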
func (h *CephInstaller) checkCephHealthStatus() {
	ctx := context.TODO()
	clusterResource, err := h.k8shelper.RookClientset.CephV1().CephClusters(h.settings.Namespace).Get(ctx, h.settings.ClusterName, metav1.GetOptions{})
	assert.Nil(h.T(), err)
	clusterPhase := string(clusterResource.Status.Phase)
	if clusterPhase != "Ready" && clusterPhase != "Connected" && clusterPhase != "Progressing" {
		assert.Equal(h.T(), "Ready", string(clusterResource.Status.Phase))
	}

	// Depending on the tests, the health may be fluctuating with different components being started or stopped.
	// If needed, give it a few seconds to settle and check the status again.
	logger.Infof("checking ceph cluster health in namespace %q", h.settings.Namespace)
	if clusterResource.Status.CephStatus.Health != "HEALTH_OK" {
		time.Sleep(10 * time.Second)
		clusterResource, err = h.k8shelper.RookClientset.CephV1().CephClusters(h.settings.Namespace).Get(ctx, h.settings.ClusterName, metav1.GetOptions{})
		assert.Nil(h.T(), err)
	}

	// The health status is not stable enough for the integration tests to rely on.
	// We should enable this check if we can get the ceph status to be stable despite all the changing configurations performed by rook.
	// assert.Equal(h.T(), "HEALTH_OK", clusterResource.Status.CephStatus.Health)
	assert.NotEqual(h.T(), "", clusterResource.Status.CephStatus.LastChecked)

	// Print the details if the health is not ok
	if clusterResource.Status.CephStatus.Health != "HEALTH_OK" {
		logger.Errorf("Ceph health status: %s", clusterResource.Status.CephStatus.Health)
		for name, message := range clusterResource.Status.CephStatus.Details {
			logger.Errorf("Ceph health message: %s. %s: %s", name, message.Severity, message.Message)
		}
	}
}
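
// verifyDirCleanup schedules a verification job on the given node that fails
// if the dataDirHostPath still contains any files.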
func (h *CephInstaller) verifyDirCleanup(node, dir string) error {
	resources := h.GetCleanupVerificationPod(node, dir)
	_, err := h.k8shelper.KubectlWithStdin(resources, createFromStdinArgs...)
	return err
}
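
// CollectOperatorLog collects the operator pod logs when the test failed or
// full log collection is requested.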
func (h *CephInstaller) CollectOperatorLog(suiteName, testName string) {
	if !h.T().Failed() && TestLogCollectionLevel() != "all" {
		return
	}
	name := fmt.Sprintf("%s_%s", suiteName, testName)
	h.k8shelper.CollectPodLogsFromLabel(cephOperatorLabel, h.settings.OperatorNamespace, name, utils.TestEnvName())
}
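
// GatherAllRookLogs collects pod logs, pod descriptions, and events from the
// given namespaces when the test failed or full log collection is requested.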
func (h *CephInstaller) GatherAllRookLogs(testName string, namespaces ...string) {
	if !h.T().Failed() && TestLogCollectionLevel() != "all" {
		return
	}
	logger.Infof("gathering all logs from the test")
	for _, namespace := range namespaces {
		h.k8shelper.GetLogsFromNamespace(namespace, testName, utils.TestEnvName())
		h.k8shelper.GetPodDescribeFromNamespace(namespace, testName, utils.TestEnvName())
		h.k8shelper.GetEventsFromNamespace(namespace, testName, utils.TestEnvName())
	}
}
// NewCephInstaller creates a new instance of CephInstaller
func NewCephInstaller(t func() *testing.T, clientset *kubernetes.Clientset, settings *TestCephSettings) *CephInstaller {
	// By default, set a cluster name that is different from the namespace so we don't rely on the namespace
	// in expected places
	if settings.ClusterName == "" {
		settings.ClusterName = defaultClusterName
	}

	version, err := clientset.ServerVersion()
	if err != nil {
		logger.Infof("failed to get k8s server version. %+v", err)
	}
	k8shelp, err := utils.CreateK8sHelper(t)
	if err != nil {
		panic("failed to get kubectl client: " + err.Error())
	}
	logger.Infof("Rook Version: %s", settings.RookVersion)
	logger.Infof("Ceph Version: %s", settings.CephVersion.Image)
	h := &CephInstaller{
		settings:        settings,
		Manifests:       NewCephManifests(settings),
		k8shelper:       k8shelp,
		helmHelper:      utils.NewHelmHelper(testHelmPath()),
		k8sVersion:      version.String(),
		changeHostnames: settings.ChangeHostName,
		T:               t,
	}
	flag.Parse()
	return h
}
// GetCleanupPod returns a Job manifest that cleans up the dataDirHostPath on the given node
func (h *CephInstaller) GetCleanupPod(node, removalDir string) string {
	return `apiVersion: batch/v1
kind: Job
metadata:
  name: rook-cleanup-` + uuid.Must(uuid.NewRandom()).String() + `
spec:
  template:
    spec:
      restartPolicy: Never
      containers:
      - name: rook-cleaner
        image: rook/ceph:` + LocalBuildTag + `
        securityContext:
          privileged: true
        volumeMounts:
        - name: cleaner
          mountPath: /scrub
        command:
        - "sh"
        - "-c"
        - "rm -rf /scrub/*"
      nodeSelector:
        kubernetes.io/hostname: ` + node + `
      volumes:
      - name: cleaner
        hostPath:
          path: ` + removalDir
}
// GetCleanupVerificationPod returns a Job manifest that verifies the dataDirHostPath is empty
func (h *CephInstaller) GetCleanupVerificationPod(node, hostPathDir string) string {
	return `apiVersion: batch/v1
kind: Job
metadata:
  name: rook-verify-cleanup-` + uuid.Must(uuid.NewRandom()).String() + `
spec:
  template:
    spec:
      restartPolicy: Never
      containers:
      - name: rook-cleaner
        image: rook/ceph:` + LocalBuildTag + `
        securityContext:
          privileged: true
        volumeMounts:
        - name: cleaner
          mountPath: /scrub
        command:
        - "sh"
        - "-c"
        - |
          set -xEeuo pipefail
          # Assert dataDirHostPath is empty
          if [ "$(ls -A /scrub/)" ]; then
            exit 1
          fi
      nodeSelector:
        kubernetes.io/hostname: ` + node + `
      volumes:
      - name: cleaner
        hostPath:
          path: ` + hostPathDir
}
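
// addCleanupPolicy patches the CephCluster CR with a cleanup policy so that
// uninstall wipes the dataDirHostPath, retrying the update a few times in case
// of transient failures.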
func (h *CephInstaller) addCleanupPolicy(namespace, clusterName string) error {
	// Retry updating the CR a few times in case of random failure
	var returnErr error
	for i := 0; i < 3; i++ {
		ctx := context.TODO()
		cluster, err := h.k8shelper.RookClientset.CephV1().CephClusters(namespace).Get(ctx, clusterName, metav1.GetOptions{})
		if err != nil {
			return errors.Errorf("failed to get ceph cluster. %+v", err)
		}
		cluster.Spec.CleanupPolicy.Confirmation = cephv1.DeleteDataDirOnHostsConfirmation
		cluster.Spec.CleanupPolicy.AllowUninstallWithVolumes = true
		_, err = h.k8shelper.RookClientset.CephV1().CephClusters(namespace).Update(ctx, cluster, metav1.UpdateOptions{})
		if err != nil {
			returnErr = errors.Errorf("failed to add clean up policy to the cluster. %+v", err)
			logger.Warningf("could not add cleanup policy, trying again... %v", err)
		} else {
			logger.Info("successfully added cleanup policy to the ceph cluster and skipping checks for existing volumes")
			return nil
		}
	}
	return returnErr
}
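
// waitForCleanupJobs polls until all cleanup jobs spawned by the cleanup
// policy have succeeded, failing fast if any job reports a failure.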
func (h *CephInstaller) waitForCleanupJobs(namespace string) error {
	allRookCephCleanupJobs := func(ctx context.Context) (done bool, err error) {
		appLabelSelector := fmt.Sprintf("app=%s", cluster.CleanupAppName)
		cleanupJobs, err := h.k8shelper.Clientset.BatchV1().Jobs(namespace).List(ctx, metav1.ListOptions{LabelSelector: appLabelSelector})
		if err != nil {
			return false, errors.Errorf("failed to get cleanup jobs. %+v", err)
		}
		// Clean up jobs might take some time to start
		if len(cleanupJobs.Items) == 0 {
			logger.Infof("no jobs with label selector %q found.", appLabelSelector)
			return false, nil
		}
		for _, job := range cleanupJobs.Items {
			logger.Infof("job %q status: %+v", job.Name, job.Status)
			if job.Status.Failed > 0 {
				return false, errors.Errorf("job %s failed", job.Name)
			}
			if job.Status.Succeeded == 1 {
				l, err := h.k8shelper.Kubectl("-n", namespace, "logs", fmt.Sprintf("job.batch/%s", job.Name))
				if err != nil {
					logger.Errorf("cannot get logs for job %s. %v", job.Name, err)
				}
				logger.Infof("cleanup job %s done. logs: %s", job.Name, l)
			}
			if job.Status.Succeeded == 0 {
				return false, nil
			}
		}
		logger.Infof("cleanup job(s) completed")
		return true, nil
	}

	logger.Info("waiting for job(s) to cleanup the host...")
	err := wait.PollUntilContextTimeout(context.TODO(), 5*time.Second, 90*time.Second, true, allRookCephCleanupJobs)
	if err != nil {
		return errors.Errorf("failed to wait for clean up jobs to complete. %+v", err)
	}
	logger.Info("successfully executed all the ceph clean up jobs")
	return nil
}