// ceph_mgr_test.go
  1. /*
  2. Copyright 2020 The Rook Authors. All rights reserved.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package integration
  14. import (
  15. "context"
  16. "encoding/json"
  17. "fmt"
  18. "os/exec"
  19. "sort"
  20. "testing"
  21. "time"
  22. "github.com/stretchr/testify/require"
  23. "github.com/rook/rook/pkg/operator/k8sutil"
  24. "github.com/rook/rook/tests/framework/installer"
  25. "github.com/rook/rook/tests/framework/utils"
  26. "github.com/stretchr/testify/assert"
  27. "github.com/stretchr/testify/suite"
  28. )
const (
	// defaultTries is how many attempts executeWithRetry makes for a
	// flaky "ceph orch" command before giving up.
	defaultTries = 3
)
  32. // **************************************************
  33. // *** Mgr operations covered by TestMgrSmokeSuite ***
  34. //
  35. // Ceph orchestrator device ls
  36. // Ceph orchestrator status
  37. // Ceph orchestrator host ls
  38. // Ceph orchestrator ls
  39. // **************************************************
  40. func TestCephMgrSuite(t *testing.T) {
  41. s := new(CephMgrSuite)
  42. defer func(s *CephMgrSuite) {
  43. HandlePanics(recover(), s.TearDownSuite, s.T)
  44. }(s)
  45. suite.Run(t, s)
  46. }
// CephMgrSuite drives the Ceph mgr "rook" orchestrator module smoke tests
// against a minimal test cluster (see SetupSuite for its configuration).
type CephMgrSuite struct {
	suite.Suite
	settings  *installer.TestCephSettings // cluster install settings built in SetupSuite
	k8sh      *utils.K8sHelper            // helper for raw Kubernetes operations
	installer *installer.CephInstaller    // drives Rook install/uninstall and ceph commands
	namespace string                      // namespace the test cluster runs in ("mgr-ns")
}
// host mirrors one entry of the "ceph orch host ls json" output.
// Field names match the JSON keys emitted by the orchestrator.
type host struct {
	Addr     string
	Hostname string
	Labels   []string
	Status   string
}
// serviceStatus mirrors the "status" object of one "ceph orch ls --format json"
// entry. The explicit tags match the capitalized keys the orchestrator emits.
type serviceStatus struct {
	ContainerImageName string `json:"Container_image_name"`
	LastRefresh        string `json:"Last_refresh"`
	Running            int // daemons currently running; compared against pod counts in TestServiceLs
	Size               int // presumably the expected daemon count — unused here, confirm against ceph output
}
// service mirrors one entry of the "ceph orch ls --format json" output.
type service struct {
	ServiceName string `json:"Service_name"`
	ServiceType string `json:"Service_type"`
	Status      serviceStatus
}
// SetupSuite installs a minimal Rook/Ceph cluster (1 mon, no OSDs, no helm,
// no PVCs), waits until the mgr rook orchestrator module answers commands,
// and prepares the local storage class the orchestrator needs.
func (s *CephMgrSuite) SetupSuite() {
	s.namespace = "mgr-ns"
	s.settings = &installer.TestCephSettings{
		ClusterName:       s.namespace,
		OperatorNamespace: installer.SystemNamespace(s.namespace),
		Namespace:         s.namespace,
		StorageClassName:  "",
		UseHelm:           false,
		UsePVC:            false,
		Mons:              1,
		SkipOSDCreation:   true, // OSDs are not needed for orchestrator CLI tests
		EnableDiscovery:   false,
		RookVersion:       installer.LocalBuildTag,
		CephVersion:       installer.MainVersion,
	}
	s.settings.ApplyEnvVars()
	s.installer, s.k8sh = StartTestCluster(s.T, s.settings)
	s.waitForOrchestrationModule()
	s.prepareLocalStorageClass("local-storage")
}
// AfterTest collects the operator log after each test so failures can be
// diagnosed from the CI artifacts.
func (s *CephMgrSuite) AfterTest(suiteName, testName string) {
	s.installer.CollectOperatorLog(suiteName, testName)
}
// TearDownSuite removes the storage class created in SetupSuite and then
// uninstalls Rook.
func (s *CephMgrSuite) TearDownSuite() {
	// Best-effort delete: the storage class may not exist if setup failed early.
	_ = s.k8sh.DeleteResource("sc", "local-storage")
	s.installer.UninstallRook()
}
  98. func (s *CephMgrSuite) executeWithRetry(command []string, maxRetries int) (string, error) {
  99. tries := 0
  100. orchestratorCommand := append([]string{"orch"}, command...)
  101. for {
  102. err, output := s.installer.Execute("ceph", orchestratorCommand, s.namespace)
  103. tries++
  104. if err != nil {
  105. if maxRetries == 1 {
  106. return output, err
  107. }
  108. if tries == maxRetries {
  109. return "", fmt.Errorf("max retries(%d) reached, last err: %v", tries, err)
  110. }
  111. logger.Infof("retrying command <<ceph %s>>: last error: %v", command, err)
  112. continue
  113. }
  114. return output, nil
  115. }
  116. }
  117. func (s *CephMgrSuite) execute(command []string) (string, error) {
  118. return s.executeWithRetry(command, 1)
  119. }
  120. func (s *CephMgrSuite) prepareLocalStorageClass(storageClassName string) {
  121. // Rook orchestrator use PVs based in this storage class to create OSDs
  122. // It is also needed to list "devices"
  123. localStorageClass := `
  124. kind: StorageClass
  125. apiVersion: storage.k8s.io/v1
  126. metadata:
  127. name: ` + storageClassName + `
  128. provisioner: kubernetes.io/no-provisioner
  129. volumeBindingMode: WaitForFirstConsumer
  130. `
  131. err := s.k8sh.ResourceOperation("apply", localStorageClass)
  132. if err == nil {
  133. err, _ = s.installer.Execute("ceph", []string{"config", "set", "mgr", "mgr/rook/storage_class", storageClassName}, s.namespace)
  134. if err == nil {
  135. logger.Infof("Storage class %q set in manager config", storageClassName)
  136. } else {
  137. assert.Fail(s.T(), fmt.Sprintf("Error configuring local storage class in manager config: %q", err))
  138. }
  139. } else {
  140. assert.Fail(s.T(), fmt.Sprintf("Error creating local storage class: %q ", err))
  141. }
  142. }
  143. func (s *CephMgrSuite) enableOrchestratorModule() {
  144. logger.Info("Enabling Rook orchestrator module: <ceph mgr module enable rook --force>")
  145. err, output := s.installer.Execute("ceph", []string{"mgr", "module", "enable", "rook", "--force"}, s.namespace)
  146. logger.Infof("output: %s", output)
  147. if err != nil {
  148. logger.Infof("Failed to enable rook orchestrator module: %q", err)
  149. return
  150. }
  151. logger.Info("Setting orchestrator backend to Rook .... <ceph orch set backend rook>")
  152. output, err = s.execute([]string{"set", "backend", "rook"})
  153. logger.Infof("output: %s", output)
  154. if err != nil {
  155. logger.Infof("Not possible to set rook as backend orchestrator module: %q", err)
  156. }
  157. }
  158. func (s *CephMgrSuite) waitForOrchestrationModule() {
  159. var err error
  160. // Status struct
  161. type orchStatus struct {
  162. Available bool `json:"available"`
  163. Backend string `json:"backend"`
  164. }
  165. for timeout := 0; timeout < 30; timeout++ {
  166. logger.Info("Waiting for rook orchestrator module enabled and ready ...")
  167. output, err := s.execute([]string{"status", "--format", "json"})
  168. logger.Infof("%s", output)
  169. if err == nil {
  170. logger.Info("Ceph orchestrator ready to execute commands")
  171. // Get status information
  172. bytes := []byte(output)
  173. logBytesInfo(bytes)
  174. var status orchStatus
  175. err := json.Unmarshal(bytes[:len(output)], &status)
  176. if err != nil {
  177. logger.Error("Error getting ceph orch status")
  178. continue
  179. }
  180. if status.Backend != "rook" {
  181. assert.Fail(s.T(), fmt.Sprintf("Orchestrator backend is <%q>. Setting it to <Rook>", status.Backend))
  182. s.enableOrchestratorModule()
  183. } else {
  184. logger.Info("Orchestrator backend is <Rook>")
  185. return
  186. }
  187. } else {
  188. exitError, _ := err.(*exec.ExitError)
  189. if exitError.ExitCode() == 22 { // The <ceph orch> commands are still not recognized
  190. logger.Info("Ceph manager modules still not ready ... ")
  191. } else if exitError.ExitCode() == 2 { // The rook orchestrator is not the orchestrator backend
  192. s.enableOrchestratorModule()
  193. }
  194. }
  195. time.Sleep(5 * time.Second)
  196. }
  197. if err != nil {
  198. logger.Error("Giving up waiting for manager module to be ready")
  199. }
  200. require.Nil(s.T(), err)
  201. }
  202. func (s *CephMgrSuite) TestDeviceLs() {
  203. logger.Info("Testing .... <ceph orch device ls>")
  204. deviceList, err := s.executeWithRetry([]string{"device", "ls"}, defaultTries)
  205. assert.Nil(s.T(), err)
  206. logger.Infof("output = %s", deviceList)
  207. }
  208. func (s *CephMgrSuite) TestStatus() {
  209. logger.Info("Testing .... <ceph orch status>")
  210. status, err := s.executeWithRetry([]string{"status"}, defaultTries)
  211. assert.Nil(s.T(), err)
  212. logger.Infof("output = %s", status)
  213. assert.Equal(s.T(), status, "Backend: rook\nAvailable: Yes")
  214. }
  215. func logBytesInfo(bytesSlice []byte) {
  216. logger.Infof("---- bytes slice info ---")
  217. logger.Infof("bytes: %v\n", bytesSlice)
  218. logger.Infof("length: %d\n", len(bytesSlice))
  219. logger.Infof("string: -->%s<--\n", string(bytesSlice))
  220. logger.Infof("-------------------------")
  221. }
  222. func (s *CephMgrSuite) TestHostLs() {
  223. logger.Info("Testing .... <ceph orch host ls>")
  224. // Get the orchestrator hosts
  225. output, err := s.executeWithRetry([]string{"host", "ls", "json"}, defaultTries)
  226. assert.Nil(s.T(), err)
  227. logger.Infof("output = %s", output)
  228. hosts := []byte(output)
  229. logBytesInfo(hosts)
  230. var hostsList []host
  231. err = json.Unmarshal(hosts[:len(output)], &hostsList)
  232. if err != nil {
  233. assert.Nil(s.T(), err)
  234. }
  235. var hostOutput []string
  236. for _, hostItem := range hostsList {
  237. hostOutput = append(hostOutput, hostItem.Hostname)
  238. }
  239. sort.Strings(hostOutput)
  240. // get the k8s nodes
  241. nodes, err := k8sutil.GetNodeHostNames(context.TODO(), s.k8sh.Clientset)
  242. assert.Nil(s.T(), err)
  243. k8sNodes := make([]string, 0, len(nodes))
  244. for k := range nodes {
  245. k8sNodes = append(k8sNodes, k)
  246. }
  247. sort.Strings(k8sNodes)
  248. // nodes and hosts must be the same
  249. assert.Equal(s.T(), hostOutput, k8sNodes)
  250. }
  251. func (s *CephMgrSuite) TestServiceLs() {
  252. logger.Info("Testing .... <ceph orch ls --format json>")
  253. output, err := s.executeWithRetry([]string{"ls", "--format", "json"}, defaultTries)
  254. assert.Nil(s.T(), err)
  255. logger.Infof("output = %s", output)
  256. services := []byte(output)
  257. logBytesInfo(services)
  258. var servicesList []service
  259. err = json.Unmarshal(services[:len(output)], &servicesList)
  260. assert.Nil(s.T(), err)
  261. labelFilter := ""
  262. for _, svc := range servicesList {
  263. if svc.ServiceName != "crash" {
  264. labelFilter = fmt.Sprintf("app=rook-ceph-%s", svc.ServiceName)
  265. } else {
  266. labelFilter = "app=rook-ceph-crashcollector"
  267. }
  268. k8sPods, err := k8sutil.PodsRunningWithLabel(context.TODO(), s.k8sh.Clientset, s.namespace, labelFilter)
  269. logger.Infof("Service: %+v", svc)
  270. logger.Infof("k8s pods for svc %q using label <%q>: %d", svc.ServiceName, labelFilter, k8sPods)
  271. assert.Nil(s.T(), err)
  272. assert.Equal(s.T(), svc.Status.Running, k8sPods, fmt.Sprintf("Wrong number of pods for kind of service <%s>", svc.ServiceType))
  273. }
  274. }