/*
Copyright 2016 The Rook Authors. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

	http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package installer

import (
	"context"
	"flag"
	"fmt"
	"math/rand"
	"os"
	"path"
	"testing"
	"time"

	"github.com/google/uuid"
	"github.com/pkg/errors"
	cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
	"github.com/rook/rook/pkg/daemon/ceph/client"
	"github.com/rook/rook/pkg/operator/ceph/cluster"
	"github.com/rook/rook/pkg/operator/ceph/cluster/mon"
	"github.com/rook/rook/tests/framework/utils"
	"github.com/stretchr/testify/assert"
	v1 "k8s.io/api/core/v1"
	kerrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
)

const (
	// test with the latest releases
	quincyTestImage = "quay.io/ceph/ceph:v17"
	reefTestImage   = "quay.io/ceph/ceph:v18"
	// test with the current development versions
	quincyDevelTestImage = "quay.io/ceph/daemon-base:latest-quincy-devel"
	reefDevelTestImage   = "quay.io/ceph/daemon-base:latest-reef-devel"
	// test with the latest Ceph main image
	mainTestImage = "quay.io/ceph/daemon-base:latest-main-devel"

	cephOperatorLabel  = "app=rook-ceph-operator"
	defaultclusterName = "test-cluster"

	clusterCustomSettings = `
[global]
osd_pool_default_size = 1
bdev_flock_retry = 20
mon_warn_on_pool_no_redundancy = false
bluefs_buffered_io = false
mon_data_avail_warn = 10
[mon]
mon compact on start = true
`

	volumeReplicationVersion = "v0.5.0"
)

var (
	QuincyVersion      = cephv1.CephVersionSpec{Image: quincyTestImage}
	QuincyDevelVersion = cephv1.CephVersionSpec{Image: quincyDevelTestImage}
	ReefVersion        = cephv1.CephVersionSpec{Image: reefTestImage}
	ReefDevelVersion   = cephv1.CephVersionSpec{Image: reefDevelTestImage}
	MainVersion        = cephv1.CephVersionSpec{Image: mainTestImage, AllowUnsupported: true}

	volumeReplicationBaseURL     = fmt.Sprintf("https://raw.githubusercontent.com/csi-addons/kubernetes-csi-addons/%s/config/crd/bases/", volumeReplicationVersion)
	volumeReplicationCRDURL      = volumeReplicationBaseURL + "replication.storage.openshift.io_volumereplications.yaml"
	volumeReplicationClassCRDURL = volumeReplicationBaseURL + "replication.storage.openshift.io_volumereplicationclasses.yaml"
)

// CephInstaller wraps installing and uninstalling rook on a platform
type CephInstaller struct {
	settings         *TestCephSettings
	Manifests        CephManifests
	k8shelper        *utils.K8sHelper
	hostPathToDelete string
	helmHelper       *utils.HelmHelper
	k8sVersion       string
	changeHostnames  bool
	T                func() *testing.T
}

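// ReturnCephVersion returns the Ceph version spec to test with, selected by the CEPH_SUITE_VERSION environment variable.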
func ReturnCephVersion() cephv1.CephVersionSpec {
	switch os.Getenv("CEPH_SUITE_VERSION") {
	case "main":
		return MainVersion
	case "quincy-devel":
		return QuincyDevelVersion
	case "reef-devel":
		return ReefDevelVersion
	default:
		return ReefDevelVersion
	}
}

// CreateCephOperator creates rook-operator via kubectl
func (h *CephInstaller) CreateCephOperator() (err error) {
	// create the rook CRDs
	logger.Info("Creating Rook CRDs")
	resources := h.Manifests.GetCRDs(h.k8shelper)
	if _, err = h.k8shelper.KubectlWithStdin(resources, createFromStdinArgs...); err != nil {
		return err
	}

	if h.changeHostnames {
		// give the nodes a hostname that is different from their k8s node names to confirm that all the daemons will still be initialized properly
		err = h.k8shelper.ChangeHostnames()
		assert.NoError(h.T(), err)
	}

	// The operator namespace needs to be created explicitly, while the cluster namespace is created with the common.yaml
	if err := h.k8shelper.CreateNamespace(h.settings.OperatorNamespace); err != nil {
		return err
	}

	// Create the namespaces and RBAC before starting the operator
	_, err = h.k8shelper.KubectlWithStdin(h.Manifests.GetCommon(), createFromStdinArgs...)
	if err != nil {
		return errors.Errorf("failed to create the common resources: %v", err)
	}

	if h.settings.TestNFSCSI {
		csiNFSRBAC := h.Manifests.GetCSINFSRBAC()
		if _, err = h.k8shelper.KubectlWithStdin(csiNFSRBAC, createFromStdinArgs...); err != nil {
			return err
		}
	}

	if err := h.CreateVolumeReplicationCRDs(); err != nil {
		return errors.Wrap(err, "failed to create volume replication CRDs")
	}

	_, err = h.k8shelper.KubectlWithStdin(h.Manifests.GetOperator(), createFromStdinArgs...)
	if err != nil {
		return errors.Errorf("failed to create the rook-operator pod: %v", err)
	}

	logger.Infof("Rook operator started")
	return nil
}

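// CreateVolumeReplicationCRDs applies the csi-addons volume replication CRDs when the test settings enable volume replication.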
func (h *CephInstaller) CreateVolumeReplicationCRDs() (err error) {
	if !h.Manifests.Settings().EnableVolumeReplication {
		logger.Info("volume replication CRDs skipped")
		return nil
	}

	logger.Info("Creating volume replication CRDs")
	if _, err := h.k8shelper.KubectlWithStdin(readManifestFromURL(volumeReplicationCRDURL), createFromStdinArgs...); err != nil {
		return errors.Wrap(err, "failed to create volumereplication CRD")
	}

	if _, err := h.k8shelper.KubectlWithStdin(readManifestFromURL(volumeReplicationClassCRDURL), createFromStdinArgs...); err != nil {
		return errors.Wrap(err, "failed to create volumereplicationclass CRD")
	}
	return nil
}

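// WaitForToolbox waits for the rook-ceph-tools pod to run, then routes all subsequent ceph commands through that pod.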
func (h *CephInstaller) WaitForToolbox(namespace string) error {
	if err := h.k8shelper.WaitForLabeledPodsToRun("app=rook-ceph-tools", namespace); err != nil {
		return errors.Wrap(err, "Rook Toolbox couldn't start")
	}
	logger.Infof("Rook Toolbox started")

	podNames, err := h.k8shelper.GetPodNamesForApp("rook-ceph-tools", namespace)
	assert.NoError(h.T(), err)
	for _, podName := range podNames {
		// All e2e tests should run ceph commands in the toolbox since we are not inside a container
		logger.Infof("found active toolbox pod: %q", podName)
		client.RunAllCephCommandsInToolboxPod = podName
		return nil
	}

	return errors.New("could not find toolbox pod")
}

// CreateRookToolbox creates rook-ceph-tools via kubectl
func (h *CephInstaller) CreateRookToolbox(manifests CephManifests) (err error) {
	logger.Infof("Starting Rook toolbox")

	_, err = h.k8shelper.KubectlWithStdin(manifests.GetToolbox(), createFromStdinArgs...)
	if err != nil {
		return errors.Wrap(err, "failed to create rook-toolbox pod")
	}

	return nil
}

// Execute a command in the ceph toolbox
func (h *CephInstaller) Execute(command string, parameters []string, namespace string) (error, string) {
	clusterInfo := client.AdminTestClusterInfo(namespace)
	cmd, args := client.FinalizeCephCommandArgs(command, clusterInfo, parameters, h.k8shelper.MakeContext().ConfigDir)
	result, err := h.k8shelper.MakeContext().Executor.ExecuteCommandWithOutput(cmd, args...)
	if err != nil {
		logger.Warningf("Error executing command %q: <%v>", command, err)
		return err, result
	}
	return nil, result
}

// CreateCephCluster creates rook cluster via kubectl
func (h *CephInstaller) CreateCephCluster() error {
	ctx := context.TODO()
	var err error
	h.settings.DataDirHostPath, err = h.initTestDir(h.settings.Namespace)
	if err != nil {
		return errors.Errorf("failed to create test dir. %+v", err)
	}
	logger.Infof("Creating cluster with settings: %+v", h.settings)

	logger.Infof("Creating custom ceph.conf settings")
	customSettings := map[string]string{"config": clusterCustomSettings}
	customCM := &v1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "rook-config-override",
			Namespace: h.settings.Namespace,
		},
		Data: customSettings,
	}
	if _, err := h.k8shelper.Clientset.CoreV1().ConfigMaps(h.settings.Namespace).Create(ctx, customCM, metav1.CreateOptions{}); err != nil {
		return errors.Errorf("failed to create custom ceph.conf. %+v", err)
	}

	customCSISettings := map[string]string{
		"ceph.conf": "[global]\nauth_client_required = cephx",
	}
	customCSICM := &v1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "csi-ceph-conf-override",
			Namespace: h.settings.Namespace,
		},
		Data: customCSISettings,
	}
	if _, err := h.k8shelper.Clientset.CoreV1().ConfigMaps(h.settings.Namespace).Create(ctx, customCSICM, metav1.CreateOptions{}); err != nil {
		return errors.Errorf("failed to create custom csi ceph.conf. %+v", err)
	}

	logger.Info("Starting Rook Cluster")
	rookCluster := h.Manifests.GetCephCluster()
	logger.Info(rookCluster)
	maxTry := 10
	for i := 0; i < maxTry; i++ {
		_, err := h.k8shelper.KubectlWithStdin(rookCluster, createFromStdinArgs...)
		if err == nil {
			break
		}
		if i == maxTry-1 {
			return errors.Errorf("failed to create rook cluster. %v", err)
		}
		logger.Infof("failed to create rook cluster, trying again... %v", err)
		time.Sleep(5 * time.Second)
	}

	return nil
}

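// waitForCluster waits for the mon, mgr, and (unless skipped) osd pods to run, plus the crash pruner cron job when enabled.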
func (h *CephInstaller) waitForCluster() error {
	if err := h.k8shelper.WaitForPodCount("app=rook-ceph-mon", h.settings.Namespace, h.settings.Mons); err != nil {
		return err
	}

	if err := h.k8shelper.WaitForPodCount("app=rook-ceph-mgr", h.settings.Namespace, 1); err != nil {
		return err
	}

	if !h.settings.SkipOSDCreation {
		if err := h.k8shelper.WaitForPodCount("app=rook-ceph-osd", h.settings.Namespace, 1); err != nil {
			return err
		}
	}

	if h.settings.UseCrashPruner {
		if err := h.k8shelper.WaitForCronJob("rook-ceph-crashcollector-pruner", h.settings.Namespace); err != nil {
			return err
		}
	}

	logger.Infof("Rook Cluster started")
	if !h.settings.SkipOSDCreation {
		return h.k8shelper.WaitForLabeledPodsToRun("app=rook-ceph-osd", h.settings.Namespace)
	}

	return nil
}

// CreateRookExternalCluster creates rook external cluster via kubectl
func (h *CephInstaller) CreateRookExternalCluster(externalManifests CephManifests) error {
	var err error
	externalSettings := externalManifests.Settings()
	externalSettings.DataDirHostPath, err = h.initTestDir(externalSettings.Namespace)
	if err != nil {
		return errors.Errorf("failed to create test dir. %+v", err)
	}

	logger.Infof("Creating external cluster %q with core storage namespace %q", externalSettings.Namespace, h.settings.Namespace)

	logger.Infof("Creating external cluster roles")
	roles := externalManifests.GetCommonExternal()
	if _, err := h.k8shelper.KubectlWithStdin(roles, createFromStdinArgs...); err != nil {
		return errors.Wrap(err, "failed to create cluster roles")
	}

	// Inject connection information from the first cluster
	logger.Info("Injecting cluster connection information")
	err = h.injectRookExternalClusterInfo(externalSettings)
	if err != nil {
		return errors.Wrap(err, "failed to inject cluster information into the external cluster")
	}

	// Start the external cluster
	logger.Infof("Starting Rook external cluster with yaml")
	rookCluster := externalManifests.GetExternalCephCluster()
	if _, err := h.k8shelper.KubectlWithStdin(rookCluster, createFromStdinArgs...); err != nil {
		return errors.Wrap(err, "failed to create rook external cluster")
	}

	logger.Infof("Running toolbox on external namespace %q", externalSettings.Namespace)
	if err := h.CreateRookToolbox(externalManifests); err != nil {
		return errors.Wrap(err, "failed to start toolbox on external cluster")
	}
	if err := h.WaitForToolbox(externalManifests.Settings().Namespace); err != nil {
		return errors.Wrap(err, "failed to wait for toolbox on external cluster")
	}

	var clusterStatus cephv1.ClusterStatus
	for i := 0; i < 16; i++ {
		ctx := context.TODO()
		clusterResource, err := h.k8shelper.RookClientset.CephV1().CephClusters(externalSettings.Namespace).Get(ctx, externalSettings.ClusterName, metav1.GetOptions{})
		if err != nil {
			logger.Warningf("failed to get external cluster CR, retrying. %v", err)
			time.Sleep(time.Second * 5)
			continue
		}
		clusterStatus = clusterResource.Status
		if string(clusterResource.Status.Phase) == "Connected" {
			logger.Info("Rook external cluster connected")
			return nil
		}
		logger.Warningf("external cluster is not yet connected, retrying, state: %v", clusterResource.Status)
		time.Sleep(time.Second * 5)
	}

	return errors.Errorf("failed to start external cluster, state: %v", clusterStatus)
}

// injectRookExternalClusterInfo injects the connection information for an external cluster
func (h *CephInstaller) injectRookExternalClusterInfo(externalSettings *TestCephSettings) error {
	ctx := context.TODO()
	// get the mon endpoints configmap from the core cluster
	cm, err := h.GetRookExternalClusterMonConfigMap()
	if err != nil {
		return errors.Errorf("failed to get configmap. %v", err)
	}

	// create the configmap in the external cluster namespace
	_, err = h.k8shelper.Clientset.CoreV1().ConfigMaps(externalSettings.Namespace).Create(ctx, cm, metav1.CreateOptions{})
	if err != nil {
		return errors.Errorf("failed to create configmap. %v", err)
	}

	// get the mon secret from the core cluster
	secret, err := h.GetRookExternalClusterMonSecret()
	if err != nil {
		return errors.Errorf("failed to get secret. %v", err)
	}

	// create the secret in the external cluster namespace
	_, err = h.k8shelper.Clientset.CoreV1().Secrets(externalSettings.Namespace).Create(ctx, secret, metav1.CreateOptions{})
	if err != nil {
		return errors.Errorf("failed to create secret. %v", err)
	}

	return nil
}

// GetRookExternalClusterMonConfigMap gets the monitor kubernetes configmap of the external cluster
func (h *CephInstaller) GetRookExternalClusterMonConfigMap() (*v1.ConfigMap, error) {
	ctx := context.TODO()
	configMapName := "rook-ceph-mon-endpoints"
	externalCM, err := h.k8shelper.Clientset.CoreV1().ConfigMaps(h.settings.Namespace).Get(ctx, configMapName, metav1.GetOptions{})
	if err != nil {
		return nil, errors.Errorf("failed to get configmap. %v", err)
	}
	newCM := &v1.ConfigMap{}
	newCM.Name = externalCM.Name
	newCM.Data = externalCM.Data

	return newCM, nil
}

// GetRookExternalClusterMonSecret gets the monitor kubernetes secret of the external cluster
func (h *CephInstaller) GetRookExternalClusterMonSecret() (*v1.Secret, error) {
	ctx := context.TODO()
	secretName := "rook-ceph-mon" //nolint:gosec // We safely suppress gosec in tests file
	externalSecret, err := h.k8shelper.Clientset.CoreV1().Secrets(h.settings.Namespace).Get(ctx, secretName, metav1.GetOptions{})
	if err != nil {
		return nil, errors.Errorf("failed to get secret. %v", err)
	}
	newSecret := &v1.Secret{}
	newSecret.Name = externalSecret.Name
	newSecret.Data = externalSecret.Data

	return newSecret, nil
}

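// initTestDir prepares the per-namespace test directory on the host, creating it locally unless the base dir is "/data".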
func (h *CephInstaller) initTestDir(namespace string) (string, error) {
	val, err := baseTestDir()
	if err != nil {
		return "", err
	}

	h.hostPathToDelete = path.Join(val, "rook-test")
	testDir := path.Join(h.hostPathToDelete, namespace)

	// only create the test dir locally when the base dir is not "/data"
	if val != "/data" {
		// Create the test dir on the local host
		if err := os.MkdirAll(testDir, 0777); err != nil {
			return "", err
		}

		var err error
		if testDir, err = os.MkdirTemp(testDir, "test-"); err != nil {
			return "", err
		}
	} else {
		// Compose a random test directory name without actually creating it since the test is not running on the local host
		r := rand.Int() //nolint:gosec // We safely suppress gosec in tests file
		testDir = path.Join(testDir, fmt.Sprintf("test-%d", r))
	}
	return testDir, nil
}

// GetNodeHostnames returns the hostnames of the nodes in the k8s cluster
func (h *CephInstaller) GetNodeHostnames() ([]string, error) {
	ctx := context.TODO()
	nodes, err := h.k8shelper.Clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
	if err != nil {
		return nil, errors.Errorf("failed to get k8s nodes. %+v", err)
	}
	var names []string
	for _, node := range nodes.Items {
		names = append(names, node.Labels[v1.LabelHostname])
	}

	return names, nil
}

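// installRookOperator installs the operator via helm or kubectl and confirms that the operator pod is running.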
func (h *CephInstaller) installRookOperator() (bool, error) {
	h.k8shelper.CreateAnonSystemClusterBinding()

	// Create the rook operator
	logger.Infof("Starting Rook Operator")
	if h.settings.UseHelm {
		// enable the discovery daemonset with the helm chart
		if err := h.CreateRookOperatorViaHelm(); err != nil {
			return false, errors.Wrap(err, "failed to configure helm")
		}
	} else {
		if err := h.CreateCephOperator(); err != nil {
			return false, errors.Wrap(err, "failed to configure ceph operator")
		}
	}

	if !h.k8shelper.IsPodInExpectedState("rook-ceph-operator", h.settings.OperatorNamespace, "Running") {
		logger.Error("rook-ceph-operator is not running, aborting")
		h.k8shelper.GetLogsFromNamespace(h.settings.OperatorNamespace, "test-setup", utils.TestEnvName())
		return false, errors.New("rook-ceph-operator is not running")
	}

	if h.settings.EnableDiscovery {
		// the discovery daemonset must be running when discovery is enabled
		assert.True(h.T(), h.k8shelper.IsPodInExpectedState("rook-discover", h.settings.OperatorNamespace, "Running"))
	}

	return true, nil
}

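// InstallRook installs the Rook operator and a Ceph cluster, then waits for the cluster and the toolbox to be ready.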
func (h *CephInstaller) InstallRook() (bool, error) {
	if h.settings.RookVersion != LocalBuildTag {
		// make sure we have the images from a previous release locally so the test doesn't hit a timeout
		assert.NoError(h.T(), h.k8shelper.GetDockerImage("rook/ceph:"+h.settings.RookVersion))
	}

	assert.NoError(h.T(), h.k8shelper.GetDockerImage(h.settings.CephVersion.Image))

	k8sversion := h.k8shelper.GetK8sServerVersion()
	logger.Infof("Installing rook on K8s %s", k8sversion)

	success, err := h.installRookOperator()
	if err != nil {
		return false, err
	}
	if !success {
		return false, nil
	}

	if h.settings.UseHelm {
		// Install Prometheus so we can create the prometheus rules
		args := []string{"apply", "-f", "https://raw.githubusercontent.com/coreos/prometheus-operator/v0.40.0/bundle.yaml"}
		_, err = h.k8shelper.MakeContext().Executor.ExecuteCommandWithOutput("kubectl", args...)
		if err != nil {
			return false, errors.Wrap(err, "failed to install prometheus")
		}

		// Create the cluster
		err = h.CreateRookCephClusterViaHelm()
		if err != nil {
			return false, errors.Wrap(err, "failed to install ceph cluster using Helm")
		}
	} else {
		// Create the rook cluster
		err = h.CreateCephCluster()
		if err != nil {
			logger.Errorf("Cluster %q install failed. %v", h.settings.Namespace, err)
			return false, err
		}

		err = h.CreateRookToolbox(h.Manifests)
		if err != nil {
			return false, errors.Wrapf(err, "failed to install toolbox in cluster %s", h.settings.Namespace)
		}
	}

	logger.Info("Waiting for Rook Cluster")
	if err := h.waitForCluster(); err != nil {
		return false, err
	}

	err = h.WaitForToolbox(h.settings.Namespace)
	if err != nil {
		return false, err
	}

	const loopCount = 20
	for i := 0; i < loopCount; i++ {
		_, err = client.Status(h.k8shelper.MakeContext(), client.AdminTestClusterInfo(h.settings.Namespace))
		if err == nil {
			logger.Infof("toolbox ready")
			break
		}
		logger.Infof("toolbox is not ready")
		if i == loopCount-1 {
			return false, errors.Errorf("toolbox cannot connect to the cluster")
		}

		time.Sleep(5 * time.Second)
	}

	if h.settings.UseHelm {
		logger.Infof("Confirming ceph cluster installed correctly")
		if err := h.ConfirmHelmClusterInstalledCorrectly(); err != nil {
			return false, errors.Wrap(err, "the ceph cluster storage CustomResources did not install correctly")
		}
		if !h.settings.RetainHelmDefaultStorageCRs {
			h.removeCephClusterHelmResources()
		}
	}

	logger.Infof("installed rook operator and cluster %s on k8s %s", h.settings.Namespace, h.k8sVersion)
	return true, nil
}

// UninstallRook uninstalls rook from k8s
func (h *CephInstaller) UninstallRook() {
	h.UninstallRookFromMultipleNS(h.Manifests)
}

// UninstallRookFromMultipleNS uninstalls rook from multiple namespaces in k8s
func (h *CephInstaller) UninstallRookFromMultipleNS(manifests ...CephManifests) {
	ctx := context.TODO()
	var clusterNamespaces []string
	for _, manifest := range manifests {
		clusterNamespaces = append(clusterNamespaces, manifest.Settings().Namespace)
		// Gather the pod restart count and alert
		h.k8shelper.GetPodRestartsFromNamespace(manifest.Settings().Namespace, h.T().Name(), utils.TestEnvName())
	}

	// Gather logs after status checks
	h.GatherAllRookLogs(h.T().Name(), append([]string{h.settings.OperatorNamespace}, clusterNamespaces...)...)

	// If the test failed, skip the teardown and leave the cluster in its current state for investigation
	if h.T().Failed() {
		logger.Info("one of the tests failed, leaving the cluster in its failed state for investigation")
		return
	}

	logger.Infof("Uninstalling Rook")
	var err error
	skipOperatorCleanup := false
	for _, manifest := range manifests {
		namespace := manifest.Settings().Namespace
		clusterName := manifest.Settings().ClusterName

		if manifest.Settings().SkipCleanupPolicy && manifest.Settings().SkipClusterCleanup {
			logger.Infof("SKIPPING ALL CLEANUP for namespace %q", namespace)
			skipOperatorCleanup = true
			continue
		}

		testCleanupPolicy := !h.settings.UseHelm && !manifest.Settings().IsExternal && !manifest.Settings().SkipCleanupPolicy
		if testCleanupPolicy {
			// Add the cleanup policy to the core ceph cluster
			err = h.addCleanupPolicy(namespace, clusterName)
			if err != nil {
				assert.NoError(h.T(), err)
				// no need to check for the cleanup policy later if adding it already failed
				testCleanupPolicy = false
			}

			// if the test passed, check that the ceph status is HEALTH_OK before we tear the cluster down
			if !h.T().Failed() {
				// Only check the Ceph status for the core cluster
				// The check won't work for an external cluster since the core cluster is already gone
				h.checkCephHealthStatus()
			}
		}

		// The pool CRs should already have been removed by the tests that created them
		pools, err := h.k8shelper.RookClientset.CephV1().CephBlockPools(namespace).List(ctx, metav1.ListOptions{})
		assert.NoError(h.T(), err, "failed to retrieve pool CRs")
		for _, pool := range pools.Items {
			logger.Infof("found pool: %q", pool.Name)
			assert.Fail(h.T(), fmt.Sprintf("pool %q still exists", pool.Name))
			// Get the operator log
			h.GatherAllRookLogs(h.T().Name()+"poolcheck", h.settings.OperatorNamespace)
		}

		if h.settings.UseHelm {
			// helm rook-ceph-cluster cleanup
			if h.settings.RetainHelmDefaultStorageCRs {
				h.removeCephClusterHelmResources()
			}
			err = h.helmHelper.DeleteLocalRookHelmChart(namespace, CephClusterChartName)
			checkError(h.T(), err, fmt.Sprintf("cannot uninstall helm chart %s", CephClusterChartName))
		} else {
			err = h.k8shelper.DeleteResourceAndWait(false, "-n", namespace, "cephcluster", clusterName)
			checkError(h.T(), err, fmt.Sprintf("cannot remove cluster %s", namespace))
		}

		h.waitForResourceDeletion(namespace, clusterName)

		if testCleanupPolicy {
			err = h.waitForCleanupJobs(namespace)
			if err != nil {
				assert.NoError(h.T(), err)
				h.GatherAllRookLogs(h.T().Name()+"cleanup-job", append([]string{h.settings.OperatorNamespace}, clusterNamespaces...)...)
			}
		}

		// helm operator cleanup
		if h.settings.UseHelm {
			err = h.helmHelper.DeleteLocalRookHelmChart(h.settings.OperatorNamespace, OperatorChartName)
			checkError(h.T(), err, fmt.Sprintf("cannot uninstall helm chart %s", OperatorChartName))

			// delete the entire namespace (in non-helm installs it's removed with the common.yaml)
			err = h.k8shelper.DeleteResourceAndWait(false, "namespace", h.settings.OperatorNamespace)
			checkError(h.T(), err, fmt.Sprintf("cannot delete namespace %s", h.settings.OperatorNamespace))
			continue
		}

		// Skip the remainder of cluster cleanup if desired
		if manifest.Settings().SkipClusterCleanup {
			logger.Infof("SKIPPING CLUSTER CLEANUP")
			skipOperatorCleanup = true
			continue
		}

		// non-helm cleanup
		if manifest.Settings().IsExternal {
			logger.Infof("Deleting all the resources in the common external manifest")
			_, err = h.k8shelper.KubectlWithStdin(manifest.GetCommonExternal(), deleteFromStdinArgs...)
			if err != nil {
				logger.Errorf("failed to remove common external resources. %v", err)
			} else {
				logger.Infof("done deleting all the resources in the common external manifest")
			}
		} else {
			h.k8shelper.PrintResources(namespace, "cephblockpools.ceph.rook.io")
			h.k8shelper.PrintResources(namespace, "cephclients.ceph.rook.io")
			h.k8shelper.PrintResources(namespace, "cephclusters.ceph.rook.io")
			h.k8shelper.PrintResources(namespace, "cephfilesystemmirrors.ceph.rook.io")
			h.k8shelper.PrintResources(namespace, "cephfilesystems.ceph.rook.io")
			h.k8shelper.PrintResources(namespace, "cephnfses.ceph.rook.io")
			h.k8shelper.PrintResources(namespace, "cephobjectrealms.ceph.rook.io")
			h.k8shelper.PrintResources(namespace, "cephobjectstores.ceph.rook.io")
			h.k8shelper.PrintResources(namespace, "cephobjectstoreusers.ceph.rook.io")
			h.k8shelper.PrintResources(namespace, "cephobjectzonegroups.ceph.rook.io")
			h.k8shelper.PrintResources(namespace, "cephobjectzones.ceph.rook.io")
			h.k8shelper.PrintResources(namespace, "cephrbdmirrors.ceph.rook.io")
			h.k8shelper.PrintResources(namespace, "objectbucketclaims.ceph.rook.io")
			h.k8shelper.PrintResources(namespace, "objectbuckets.ceph.rook.io")
			h.k8shelper.PrintPodStatus(namespace)
			h.k8shelper.PrintPVs(true)

			logger.Infof("Deleting all the resources in the common manifest")
			_, err = h.k8shelper.KubectlWithStdin(h.Manifests.GetCommon(), deleteFromStdinArgs...)
			if err != nil {
				logger.Errorf("failed to remove common manifest. %v", err)
			} else {
				logger.Infof("done deleting all the resources in the common manifest")
			}

			if h.settings.TestNFSCSI {
				_, err = h.k8shelper.KubectlWithStdin(h.Manifests.GetCSINFSRBAC(), deleteFromStdinArgs...)
				if err != nil {
					logger.Errorf("failed to remove csi nfs rbac manifest. %v", err)
				} else {
					logger.Info("done deleting all the resources in the csi nfs rbac manifest")
				}
			}
		}
	}

	// Skip the remainder of the operator cleanup if desired
	if skipOperatorCleanup {
		logger.Infof("SKIPPING OPERATOR CLEANUP")
		return
	}

	if !h.settings.UseHelm {
		logger.Infof("Deleting all the resources in the operator manifest")
		_, err = h.k8shelper.KubectlWithStdin(h.Manifests.GetOperator(), deleteFromStdinArgs...)
		if err != nil {
			logger.Errorf("failed to remove operator resources. %v", err)
		} else {
			logger.Infof("done deleting all the resources in the operator manifest")
		}
	}

	logger.Info("removing the CRDs")
	_, err = h.k8shelper.KubectlWithStdin(h.Manifests.GetCRDs(h.k8shelper), deleteFromStdinArgs...)
	if err != nil {
		logger.Errorf("failed to remove CRDs. %v", err)
	} else {
		logger.Infof("done deleting all the CRDs")
	}

	err = h.k8shelper.DeleteResourceAndWait(false, "namespace", h.settings.OperatorNamespace)
	checkError(h.T(), err, fmt.Sprintf("cannot delete operator namespace %s", h.settings.OperatorNamespace))
	logger.Infof("done removing the operator from namespace %s", h.settings.OperatorNamespace)

	logger.Infof("removing host data dir %s", h.hostPathToDelete)
	// remove the data dir if it exists
	if h.hostPathToDelete != "" {
		nodes, err := h.GetNodeHostnames()
		checkError(h.T(), err, "cannot get node names")
		for _, node := range nodes {
			err = h.verifyDirCleanup(node, h.hostPathToDelete)
			logger.Infof("verified cleanup of %s from node %s", h.hostPathToDelete, node)
			assert.NoError(h.T(), err)
		}
	}

	if h.changeHostnames {
		// revert the hostname labels that were changed for the test
		_, err = h.k8shelper.RestoreHostnames()
		assert.NoError(h.T(), err)
	}

	// wait a bit longer for the operator namespace to be cleaned up after its deletion
	for i := 0; i < 15; i++ {
		_, err := h.k8shelper.Clientset.CoreV1().Namespaces().Get(ctx, h.settings.OperatorNamespace, metav1.GetOptions{})
		if err != nil && kerrors.IsNotFound(err) {
			logger.Infof("operator namespace %q removed", h.settings.OperatorNamespace)
			break
		}
		logger.Infof("operator namespace %q still found...", h.settings.OperatorNamespace)
		time.Sleep(5 * time.Second)
	}
}

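// waitForResourceDeletion waits until the cluster CR and the mon configmap and secret, which carry finalizers, are deleted.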
func (h *CephInstaller) waitForResourceDeletion(namespace, clusterName string) {
	ctx := context.TODO()
	clusterDeleteRetries := 0
	crdCheckerFunc := func() error {
		// Check for the existence of the cluster CR
		_, err := h.k8shelper.RookClientset.CephV1().CephClusters(namespace).Get(ctx, clusterName, metav1.GetOptions{})
		clusterDeleteRetries++
		if err != nil {
			if !kerrors.IsNotFound(err) {
				return err
			}
		} else {
			// If the operator really isn't going to remove the finalizer, just force remove it
			if clusterDeleteRetries > 10 {
				h.removeClusterFinalizers(namespace, clusterName)
			}
		}

		// Check for the existence of the mon endpoints configmap, which has a finalizer
		_, err = h.k8shelper.Clientset.CoreV1().ConfigMaps(namespace).Get(ctx, mon.EndpointConfigMapName, metav1.GetOptions{})
		if err != nil && !kerrors.IsNotFound(err) {
			return err
		}

		// Check for the existence of the mon secret, which has a finalizer
		_, err = h.k8shelper.Clientset.CoreV1().Secrets(namespace).Get(ctx, mon.AppName, metav1.GetOptions{})
		return err
	}

	err := h.k8shelper.WaitForCustomResourceDeletion(namespace, clusterName, crdCheckerFunc)
	checkError(h.T(), err, fmt.Sprintf("failed to wait for deletion of cluster crd %s", namespace))
}

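// removeClusterFinalizers force-removes the finalizers from the cluster CR so its deletion can proceed.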
func (h *CephInstaller) removeClusterFinalizers(namespace, clusterName string) {
	ctx := context.TODO()
	// Get the latest cluster instead of using the same instance in case it has been changed
	cluster, err := h.k8shelper.RookClientset.CephV1().CephClusters(namespace).Get(ctx, clusterName, metav1.GetOptions{})
	if err != nil {
		logger.Errorf("failed to remove finalizer. failed to get cluster. %+v", err)
		return
	}
	objectMeta := &cluster.ObjectMeta
	if len(objectMeta.Finalizers) == 0 {
		logger.Infof("no finalizers to remove from cluster %s", namespace)
		return
	}
	objectMeta.Finalizers = nil
	_, err = h.k8shelper.RookClientset.CephV1().CephClusters(cluster.Namespace).Update(ctx, cluster, metav1.UpdateOptions{})
	if err != nil {
		logger.Errorf("failed to remove finalizers from cluster %s. %+v", objectMeta.Name, err)
		return
	}
	logger.Infof("removed finalizers from cluster %s", objectMeta.Name)
}

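// checkCephHealthStatus asserts that the cluster CR reached an expected phase and logs the health details if ceph is not HEALTH_OK.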
func (h *CephInstaller) checkCephHealthStatus() {
	ctx := context.TODO()
	clusterResource, err := h.k8shelper.RookClientset.CephV1().CephClusters(h.settings.Namespace).Get(ctx, h.settings.ClusterName, metav1.GetOptions{})
	assert.Nil(h.T(), err)
	clusterPhase := string(clusterResource.Status.Phase)
	if clusterPhase != "Ready" && clusterPhase != "Connected" && clusterPhase != "Progressing" {
		assert.Equal(h.T(), "Ready", clusterPhase)
	}

	// Depending on the tests, the health may be fluctuating with different components being started or stopped.
	// If needed, give it a few seconds to settle and check the status again.
	logger.Infof("checking ceph cluster health in namespace %q", h.settings.Namespace)
	if clusterResource.Status.CephStatus.Health != "HEALTH_OK" {
		time.Sleep(10 * time.Second)
		clusterResource, err = h.k8shelper.RookClientset.CephV1().CephClusters(h.settings.Namespace).Get(ctx, h.settings.ClusterName, metav1.GetOptions{})
		assert.Nil(h.T(), err)
	}

	// The health status is not stable enough for the integration tests to rely on.
	// We should enable this check if we can get the ceph status to be stable despite all the changing configurations performed by rook.
	//assert.Equal(h.T(), "HEALTH_OK", clusterResource.Status.CephStatus.Health)
	assert.NotEqual(h.T(), "", clusterResource.Status.CephStatus.LastChecked)

	// Print the details if the health is not ok
	if clusterResource.Status.CephStatus.Health != "HEALTH_OK" {
		logger.Errorf("Ceph health status: %s", clusterResource.Status.CephStatus.Health)
		for name, message := range clusterResource.Status.CephStatus.Details {
			logger.Errorf("Ceph health message: %s. %s: %s", name, message.Severity, message.Message)
		}
	}
}

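// verifyDirCleanup runs a job on the given node that fails if the host path is not empty.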
func (h *CephInstaller) verifyDirCleanup(node, dir string) error {
	resources := h.GetCleanupVerificationPod(node, dir)
	_, err := h.k8shelper.KubectlWithStdin(resources, createFromStdinArgs...)
	return err
}

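// CollectOperatorLog collects the operator pod logs when the test failed or when full log collection is requested.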
func (h *CephInstaller) CollectOperatorLog(suiteName, testName string) {
	if !h.T().Failed() && TestLogCollectionLevel() != "all" {
		return
	}
	name := fmt.Sprintf("%s_%s", suiteName, testName)
	h.k8shelper.CollectPodLogsFromLabel(cephOperatorLabel, h.settings.OperatorNamespace, name, utils.TestEnvName())
}

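// GatherAllRookLogs collects logs, pod descriptions, and events from the given namespaces when the test failed or when full log collection is requested.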
func (h *CephInstaller) GatherAllRookLogs(testName string, namespaces ...string) {
	if !h.T().Failed() && TestLogCollectionLevel() != "all" {
		return
	}
	logger.Infof("gathering all logs from the test")
	for _, namespace := range namespaces {
		h.k8shelper.GetLogsFromNamespace(namespace, testName, utils.TestEnvName())
		h.k8shelper.GetPodDescribeFromNamespace(namespace, testName, utils.TestEnvName())
		h.k8shelper.GetEventsFromNamespace(namespace, testName, utils.TestEnvName())
	}
}

// NewCephInstaller creates a new instance of CephInstaller
func NewCephInstaller(t func() *testing.T, clientset *kubernetes.Clientset, settings *TestCephSettings) *CephInstaller {
	// By default, set a cluster name that is different from the namespace so we don't rely on the namespace
	// in expected places
	if settings.ClusterName == "" {
		settings.ClusterName = defaultclusterName
	}

	version, err := clientset.ServerVersion()
	if err != nil {
		logger.Infof("failed to get the k8s server version. %+v", err)
	}

	k8shelp, err := utils.CreateK8sHelper(t)
	if err != nil {
		panic("failed to get the k8s client: " + err.Error())
	}

	logger.Infof("Rook Version: %s", settings.RookVersion)
	logger.Infof("Ceph Version: %s", settings.CephVersion.Image)
	h := &CephInstaller{
		settings:        settings,
		Manifests:       NewCephManifests(settings),
		k8shelper:       k8shelp,
		helmHelper:      utils.NewHelmHelper(testHelmPath()),
		k8sVersion:      version.String(),
		changeHostnames: settings.ChangeHostName,
		T:               t,
	}
	flag.Parse()
	return h
}

// GetCleanupPod returns a manifest for a cleanup job that removes the contents of the dataDirHostPath on the given node
func (h *CephInstaller) GetCleanupPod(node, removalDir string) string {
	return `apiVersion: batch/v1
kind: Job
metadata:
  name: rook-cleanup-` + uuid.Must(uuid.NewRandom()).String() + `
spec:
  template:
    spec:
      restartPolicy: Never
      containers:
      - name: rook-cleaner
        image: rook/ceph:` + LocalBuildTag + `
        securityContext:
          privileged: true
        volumeMounts:
        - name: cleaner
          mountPath: /scrub
        command:
        - "sh"
        - "-c"
        - "rm -rf /scrub/*"
      nodeSelector:
        kubernetes.io/hostname: ` + node + `
      volumes:
      - name: cleaner
        hostPath:
          path: ` + removalDir
}

// GetCleanupVerificationPod returns a manifest for a job that verifies that the dataDirHostPath on the given node is empty
func (h *CephInstaller) GetCleanupVerificationPod(node, hostPathDir string) string {
	return `apiVersion: batch/v1
kind: Job
metadata:
  name: rook-verify-cleanup-` + uuid.Must(uuid.NewRandom()).String() + `
spec:
  template:
    spec:
      restartPolicy: Never
      containers:
      - name: rook-cleaner
        image: rook/ceph:` + LocalBuildTag + `
        securityContext:
          privileged: true
        volumeMounts:
        - name: cleaner
          mountPath: /scrub
        command:
        - "sh"
        - "-c"
        - |
          set -xEeuo pipefail
          # assert that the dataDirHostPath is empty
          if [ "$(ls -A /scrub/)" ]; then
            exit 1
          fi
      nodeSelector:
        kubernetes.io/hostname: ` + node + `
      volumes:
      - name: cleaner
        hostPath:
          path: ` + hostPathDir
}

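// addCleanupPolicy updates the cluster CR to confirm host data cleanup on uninstall, retrying the update a few times.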
func (h *CephInstaller) addCleanupPolicy(namespace, clusterName string) error {
	// Retry updating the CR a few times in case of random failure
	var returnErr error
	for i := 0; i < 3; i++ {
		ctx := context.TODO()
		cluster, err := h.k8shelper.RookClientset.CephV1().CephClusters(namespace).Get(ctx, clusterName, metav1.GetOptions{})
		if err != nil {
			return errors.Errorf("failed to get ceph cluster. %+v", err)
		}
		cluster.Spec.CleanupPolicy.Confirmation = cephv1.DeleteDataDirOnHostsConfirmation
		cluster.Spec.CleanupPolicy.AllowUninstallWithVolumes = true
		_, err = h.k8shelper.RookClientset.CephV1().CephClusters(namespace).Update(ctx, cluster, metav1.UpdateOptions{})
		if err != nil {
			returnErr = errors.Errorf("failed to add the cleanup policy to the cluster. %+v", err)
			logger.Warningf("could not add the cleanup policy, trying again... %v", err)
		} else {
			logger.Info("successfully added the cleanup policy to the ceph cluster, skipping checks for existing volumes")
			return nil
		}
	}
	return returnErr
}

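// waitForCleanupJobs polls until all host cleanup jobs succeed, returning an error if any job fails or the timeout is reached.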
func (h *CephInstaller) waitForCleanupJobs(namespace string) error {
	allRookCephCleanupJobs := func(ctx context.Context) (done bool, err error) {
		appLabelSelector := fmt.Sprintf("app=%s", cluster.CleanupAppName)
		cleanupJobs, err := h.k8shelper.Clientset.BatchV1().Jobs(namespace).List(ctx, metav1.ListOptions{LabelSelector: appLabelSelector})
		if err != nil {
			return false, errors.Errorf("failed to get cleanup jobs. %+v", err)
		}
		// Cleanup jobs might take some time to start
		if len(cleanupJobs.Items) == 0 {
			logger.Infof("no jobs with label selector %q found.", appLabelSelector)
			return false, nil
		}
		for _, job := range cleanupJobs.Items {
			logger.Infof("job %q status: %+v", job.Name, job.Status)
			if job.Status.Failed > 0 {
				return false, errors.Errorf("job %s failed", job.Name)
			}
			if job.Status.Succeeded == 1 {
				l, err := h.k8shelper.Kubectl("-n", namespace, "logs", fmt.Sprintf("job.batch/%s", job.Name))
				if err != nil {
					logger.Errorf("cannot get logs for pod %s. %v", job.Name, err)
				}
				logger.Infof("cleanup job %s done. logs: %s", job.Name, l)
			}
			if job.Status.Succeeded == 0 {
				return false, nil
			}
		}
		logger.Infof("cleanup job(s) completed")
		return true, nil
	}

	logger.Info("waiting for job(s) to clean up the host...")
	err := wait.PollUntilContextTimeout(context.TODO(), 5*time.Second, 90*time.Second, true, allRookCephCleanupJobs)
	if err != nil {
		return errors.Errorf("failed to wait for the cleanup jobs to complete. %+v", err)
	}

	logger.Info("successfully executed all the ceph cleanup jobs")
	return nil
}
|