mirror.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387
  1. /*
  2. Copyright 2020 The Rook Authors. All rights reserved.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package client
  14. import (
  15. "encoding/base64"
  16. "encoding/json"
  17. "fmt"
  18. "os"
  19. "strings"
  20. "github.com/pkg/errors"
  21. cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
  22. "github.com/rook/rook/pkg/clusterd"
  23. "k8s.io/apimachinery/pkg/util/sets"
  24. )
// PeerToken is the content of the peer token. It is serialized to JSON (and then
// base64 encoded) by CreateRBDMirrorBootstrapPeerWithoutPool.
type PeerToken struct {
	// ClusterFSID is the FSID of the cluster that issued the token.
	ClusterFSID string `json:"fsid"`
	// ClientID is the cephx client ID the peer authenticates with.
	ClientID string `json:"client_id"`
	// Key is the cephx key associated with ClientID.
	Key string `json:"key"`
	// MonHost is the comma-separated list of monitor endpoints.
	MonHost string `json:"mon_host"`
	// These fields are added by Rook and NOT part of the output of client.CreateRBDMirrorBootstrapPeer()
	Namespace string `json:"namespace"`
}
var (
	// rbdMirrorPeerCaps are the capabilities passed to AuthGetOrCreateKey when the
	// rbd-mirror peer key is created.
	rbdMirrorPeerCaps = []string{"mon", "profile rbd-mirror-peer", "osd", "profile rbd"}
	// rbdMirrorPeerKeyringID is the unqualified client ID of the rbd-mirror peer
	// user; it is also the ClientID advertised in the generated PeerToken.
	rbdMirrorPeerKeyringID = "rbd-mirror-peer"
)
  38. // ImportRBDMirrorBootstrapPeer add a mirror peer in the rbd-mirror configuration
  39. func ImportRBDMirrorBootstrapPeer(context *clusterd.Context, clusterInfo *ClusterInfo, poolName string, direction string, token []byte) error {
  40. logger.Infof("add rbd-mirror bootstrap peer token for pool %q", poolName)
  41. // Token file
  42. tokenFilePattern := fmt.Sprintf("rbd-mirror-token-%s", poolName)
  43. tokenFilePath, err := os.CreateTemp("/tmp", tokenFilePattern)
  44. if err != nil {
  45. return errors.Wrapf(err, "failed to create temporary token file for pool %q", poolName)
  46. }
  47. // Write token into a file
  48. err = os.WriteFile(tokenFilePath.Name(), token, 0400)
  49. if err != nil {
  50. return errors.Wrapf(err, "failed to write token to file %q", tokenFilePath.Name())
  51. }
  52. // Remove token once we exit, we don't need it anymore
  53. defer func() error {
  54. err := os.Remove(tokenFilePath.Name())
  55. return err
  56. }() //nolint // we don't want to return here
  57. // Build command
  58. args := []string{"mirror", "pool", "peer", "bootstrap", "import", poolName, tokenFilePath.Name()}
  59. if direction != "" {
  60. args = append(args, "--direction", direction)
  61. }
  62. cmd := NewRBDCommand(context, clusterInfo, args)
  63. // Run command
  64. output, err := cmd.Run()
  65. if err != nil {
  66. return errors.Wrapf(err, "failed to add rbd-mirror peer token for pool %q. %s", poolName, output)
  67. }
  68. logger.Infof("successfully added rbd-mirror peer token for pool %q", poolName)
  69. return nil
  70. }
  71. // CreateRBDMirrorBootstrapPeer add a mirror peer in the rbd-mirror configuration
  72. func CreateRBDMirrorBootstrapPeer(context *clusterd.Context, clusterInfo *ClusterInfo, poolName string) ([]byte, error) {
  73. logger.Infof("create rbd-mirror bootstrap peer token for pool %q", poolName)
  74. // Build command
  75. args := []string{"mirror", "pool", "peer", "bootstrap", "create", poolName}
  76. cmd := NewRBDCommand(context, clusterInfo, args)
  77. // Run command
  78. output, err := cmd.Run()
  79. if err != nil {
  80. return nil, errors.Wrapf(err, "failed to create rbd-mirror peer token for pool %q. %s", poolName, output)
  81. }
  82. logger.Infof("successfully created rbd-mirror bootstrap peer token for pool %q", poolName)
  83. return output, nil
  84. }
  85. // enablePoolMirroring turns on mirroring on that pool by specifying the mirroring type
  86. func enablePoolMirroring(context *clusterd.Context, clusterInfo *ClusterInfo, pool cephv1.NamedPoolSpec) error {
  87. logger.Infof("enabling mirroring type %q for pool %q", pool.Mirroring.Mode, pool.Name)
  88. // Build command
  89. args := []string{"mirror", "pool", "enable", pool.Name, pool.Mirroring.Mode}
  90. cmd := NewRBDCommand(context, clusterInfo, args)
  91. // Run command
  92. output, err := cmd.Run()
  93. if err != nil {
  94. return errors.Wrapf(err, "failed to enable mirroring type %q for pool %q. %s", pool.Mirroring.Mode, pool.Name, output)
  95. }
  96. return nil
  97. }
  98. // disablePoolMirroring turns off mirroring on a pool
  99. func disablePoolMirroring(context *clusterd.Context, clusterInfo *ClusterInfo, poolName string) error {
  100. logger.Infof("disabling mirroring for pool %q", poolName)
  101. // Build command
  102. args := []string{"mirror", "pool", "disable", poolName}
  103. cmd := NewRBDCommand(context, clusterInfo, args)
  104. // Run command
  105. output, err := cmd.Run()
  106. if err != nil {
  107. return errors.Wrapf(err, "failed to disable mirroring for pool %q. %s", poolName, output)
  108. }
  109. return nil
  110. }
  111. func removeClusterPeer(context *clusterd.Context, clusterInfo *ClusterInfo, poolName, peerUUID string) error {
  112. logger.Infof("removing cluster peer with UUID %q for the pool %q", peerUUID, poolName)
  113. // Build command
  114. args := []string{"mirror", "pool", "peer", "remove", poolName, peerUUID}
  115. cmd := NewRBDCommand(context, clusterInfo, args)
  116. // Run command
  117. output, err := cmd.Run()
  118. if err != nil {
  119. return errors.Wrapf(err, "failed to remove cluster peer with UUID %q for the pool %q. %s", peerUUID, poolName, output)
  120. }
  121. return nil
  122. }
  123. // GetPoolMirroringStatus prints the pool mirroring status
  124. func GetPoolMirroringStatus(context *clusterd.Context, clusterInfo *ClusterInfo, poolName string) (*cephv1.PoolMirroringStatus, error) {
  125. logger.Debugf("retrieving mirroring pool %q status", poolName)
  126. // Build command
  127. args := []string{"mirror", "pool", "status", poolName}
  128. cmd := NewRBDCommand(context, clusterInfo, args)
  129. cmd.JsonOutput = true
  130. // Run command
  131. buf, err := cmd.Run()
  132. if err != nil {
  133. return nil, errors.Wrapf(err, "failed to retrieve mirroring pool %q status", poolName)
  134. }
  135. var poolMirroringStatus cephv1.PoolMirroringStatus
  136. if err := json.Unmarshal([]byte(buf), &poolMirroringStatus); err != nil {
  137. return nil, errors.Wrap(err, "failed to unmarshal mirror pool status response")
  138. }
  139. return &poolMirroringStatus, nil
  140. }
  141. // GetPoolMirroringInfo prints the pool mirroring information
  142. func GetPoolMirroringInfo(context *clusterd.Context, clusterInfo *ClusterInfo, poolName string) (*cephv1.PoolMirroringInfo, error) {
  143. logger.Debugf("retrieving mirroring pool %q info", poolName)
  144. // Build command
  145. args := []string{"mirror", "pool", "info", poolName}
  146. cmd := NewRBDCommand(context, clusterInfo, args)
  147. cmd.JsonOutput = true
  148. // Run command
  149. buf, err := cmd.Run()
  150. if err != nil {
  151. return nil, errors.Wrapf(err, "failed to retrieve mirroring pool %q info. %s", poolName, string(buf))
  152. }
  153. // Unmarshal JSON into Go struct
  154. var poolMirroringInfo cephv1.PoolMirroringInfo
  155. if err := json.Unmarshal(buf, &poolMirroringInfo); err != nil {
  156. return nil, errors.Wrap(err, "failed to unmarshal mirror pool info response")
  157. }
  158. return &poolMirroringInfo, nil
  159. }
  160. // enableSnapshotSchedule configures the snapshots schedule on a mirrored pool
  161. func enableSnapshotSchedule(context *clusterd.Context, clusterInfo *ClusterInfo, snapSpec cephv1.SnapshotScheduleSpec, poolName string) error {
  162. logger.Infof("enabling snapshot schedule for pool %q", poolName)
  163. // Build command
  164. args := []string{"mirror", "snapshot", "schedule", "add", "--pool", poolName, snapSpec.Interval}
  165. // If a start time is defined let's add it
  166. if snapSpec.StartTime != "" {
  167. args = append(args, snapSpec.StartTime)
  168. }
  169. cmd := NewRBDCommand(context, clusterInfo, args)
  170. // Run command
  171. buf, err := cmd.Run()
  172. if err != nil {
  173. return errors.Wrapf(err, "failed to enable snapshot schedule on pool %q. %s", poolName, string(buf))
  174. }
  175. logger.Infof("successfully enabled snapshot schedule for pool %q every %q", poolName, snapSpec.Interval)
  176. return nil
  177. }
  178. // removeSnapshotSchedule removes the snapshots schedule on a mirrored pool
  179. func removeSnapshotSchedule(context *clusterd.Context, clusterInfo *ClusterInfo, snapScheduleResponse cephv1.SnapshotSchedule, poolName string) error {
  180. logger.Debugf("removing snapshot schedule for pool %q (before adding new ones)", poolName)
  181. // Build command
  182. args := []string{"mirror", "snapshot", "schedule", "remove", "--pool", poolName, snapScheduleResponse.Interval}
  183. // If a start time is defined let's add it
  184. if snapScheduleResponse.StartTime != "" {
  185. args = append(args, snapScheduleResponse.StartTime)
  186. }
  187. cmd := NewRBDCommand(context, clusterInfo, args)
  188. // Run command
  189. buf, err := cmd.Run()
  190. if err != nil {
  191. return errors.Wrapf(err, "failed to remove snapshot schedule on pool %q. %s", poolName, string(buf))
  192. }
  193. logger.Infof("successfully removed snapshot schedule %q for pool %q", poolName, snapScheduleResponse.Interval)
  194. return nil
  195. }
  196. func enableSnapshotSchedules(context *clusterd.Context, clusterInfo *ClusterInfo, pool cephv1.NamedPoolSpec) error {
  197. logger.Info("resetting current snapshot schedules")
  198. // Reset any existing schedules
  199. err := removeSnapshotSchedules(context, clusterInfo, pool)
  200. if err != nil {
  201. logger.Errorf("failed to remove snapshot schedules. %v", err)
  202. }
  203. // Enable all the snap schedules
  204. for _, snapSchedule := range pool.Mirroring.SnapshotSchedules {
  205. err := enableSnapshotSchedule(context, clusterInfo, snapSchedule, pool.Name)
  206. if err != nil {
  207. return errors.Wrap(err, "failed to enable snapshot schedule")
  208. }
  209. }
  210. return nil
  211. }
  212. // removeSnapshotSchedules removes all the existing snapshot schedules
  213. func removeSnapshotSchedules(context *clusterd.Context, clusterInfo *ClusterInfo, pool cephv1.NamedPoolSpec) error {
  214. // Get the list of existing snapshot schedule
  215. existingSnapshotSchedules, err := listSnapshotSchedules(context, clusterInfo, pool.Name)
  216. if err != nil {
  217. return errors.Wrap(err, "failed to list snapshot schedule(s)")
  218. }
  219. // Remove each schedule
  220. for _, existingSnapshotSchedule := range existingSnapshotSchedules {
  221. err := removeSnapshotSchedule(context, clusterInfo, existingSnapshotSchedule, pool.Name)
  222. if err != nil {
  223. return errors.Wrapf(err, "failed to remove snapshot schedule %v", existingSnapshotSchedule)
  224. }
  225. }
  226. return nil
  227. }
  228. // listSnapshotSchedules configures the snapshots schedule on a mirrored pool
  229. func listSnapshotSchedules(context *clusterd.Context, clusterInfo *ClusterInfo, poolName string) ([]cephv1.SnapshotSchedule, error) {
  230. // Build command
  231. args := []string{"mirror", "snapshot", "schedule", "ls", "--pool", poolName}
  232. cmd := NewRBDCommand(context, clusterInfo, args)
  233. cmd.JsonOutput = true
  234. // Run command
  235. buf, err := cmd.Run()
  236. if err != nil {
  237. return nil, errors.Wrapf(err, "failed to retrieve snapshot schedules on pool %q. %s", poolName, string(buf))
  238. }
  239. // Unmarshal JSON into Go struct
  240. var snapshotSchedules []cephv1.SnapshotSchedule
  241. if err := json.Unmarshal([]byte(buf), &snapshotSchedules); err != nil {
  242. return nil, errors.Wrap(err, "failed to unmarshal mirror snapshot schedule list response")
  243. }
  244. logger.Debugf("successfully listed snapshot schedules for pool %q", poolName)
  245. return snapshotSchedules, nil
  246. }
  247. // ListSnapshotSchedulesRecursively configures the snapshots schedule on a mirrored pool
  248. func ListSnapshotSchedulesRecursively(context *clusterd.Context, clusterInfo *ClusterInfo, poolName string) ([]cephv1.SnapshotSchedulesSpec, error) {
  249. // Build command
  250. args := []string{"mirror", "snapshot", "schedule", "ls", "--pool", poolName, "--recursive"}
  251. cmd := NewRBDCommand(context, clusterInfo, args)
  252. cmd.JsonOutput = true
  253. // Run command
  254. buf, err := cmd.Run()
  255. if err != nil {
  256. return nil, errors.Wrapf(err, "failed to retrieve snapshot schedules recursively on pool %q. %s", poolName, string(buf))
  257. }
  258. // Unmarshal JSON into Go struct
  259. var snapshotSchedulesRecursive []cephv1.SnapshotSchedulesSpec
  260. if err := json.Unmarshal([]byte(buf), &snapshotSchedulesRecursive); err != nil {
  261. return nil, errors.Wrap(err, "failed to unmarshal mirror snapshot schedule list recursive response")
  262. }
  263. logger.Debugf("successfully recursively listed snapshot schedules for pool %q", poolName)
  264. return snapshotSchedulesRecursive, nil
  265. }
  266. /*
  267. CreateRBDMirrorBootstrapPeerWithoutPool creates a bootstrap peer for the current cluster
  268. It creates the cephx user for the remote cluster to use with all the necessary details
  269. This function is handy on scenarios where no pools have been created yet but replication communication is required (connecting peers)
  270. It essentially sits above CreateRBDMirrorBootstrapPeer()
  271. and is a cluster-wide option in the scenario where all the pools will be mirrored to the same remote cluster
  272. So the scenario looks like:
  273. 1. Create the cephx ID on the source cluster
  274. 2. Enable a source pool for mirroring - at any time, we just don't know when
  275. rbd --cluster site-a mirror pool enable image-pool image
  276. 3. Copy the key details over to the other cluster (non-ceph workflow)
  277. 4. Enable destination pool for mirroring
  278. rbd --cluster site-b mirror pool enable image-pool image
  279. 5. Add the peer details to the destination pool
  280. 6. Repeat the steps flipping source and destination to enable
  281. bi-directional mirroring
  282. */
  283. func CreateRBDMirrorBootstrapPeerWithoutPool(context *clusterd.Context, clusterInfo *ClusterInfo) ([]byte, error) {
  284. fullClientName := getQualifiedUser(rbdMirrorPeerKeyringID)
  285. logger.Infof("create rbd-mirror bootstrap peer token %q", fullClientName)
  286. key, err := AuthGetOrCreateKey(context, clusterInfo, fullClientName, rbdMirrorPeerCaps)
  287. if err != nil {
  288. return nil, errors.Wrapf(err, "failed to create rbd-mirror peer key %q", fullClientName)
  289. }
  290. logger.Infof("successfully created rbd-mirror bootstrap peer token for cluster %q", clusterInfo.NamespacedName().Name)
  291. mons := sets.New[string]()
  292. for _, mon := range clusterInfo.Monitors {
  293. mons.Insert(mon.Endpoint)
  294. }
  295. peerToken := PeerToken{
  296. ClusterFSID: clusterInfo.FSID,
  297. ClientID: rbdMirrorPeerKeyringID,
  298. Key: key,
  299. MonHost: strings.Join(sets.List(mons), ","),
  300. Namespace: clusterInfo.Namespace,
  301. }
  302. // Marshal the Go type back to JSON
  303. decodedTokenBackToJSON, err := json.Marshal(peerToken)
  304. if err != nil {
  305. return nil, errors.Wrap(err, "failed to encode peer token to json")
  306. }
  307. // Return the base64 encoded token
  308. return []byte(base64.StdEncoding.EncodeToString(decodedTokenBackToJSON)), nil
  309. }