filesystem_mirror.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. /*
  2. Copyright 2021 The Rook Authors. All rights reserved.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package client
  14. import (
  15. "encoding/json"
  16. "fmt"
  17. "strings"
  18. "syscall"
  19. "github.com/pkg/errors"
  20. cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
  21. "github.com/rook/rook/pkg/clusterd"
  22. cephver "github.com/rook/rook/pkg/operator/ceph/version"
  23. "github.com/rook/rook/pkg/util/exec"
  24. )
  25. type BootstrapPeerToken struct {
  26. Token string `json:"token"`
  27. }
  28. // RemoveFilesystemMirrorPeer add a mirror peer in the cephfs-mirror configuration
  29. func RemoveFilesystemMirrorPeer(context *clusterd.Context, clusterInfo *ClusterInfo, peerUUID string) error {
  30. logger.Infof("removing cephfs-mirror peer %q", peerUUID)
  31. // Build command
  32. args := []string{"fs", "snapshot", "mirror", "peer_remove", peerUUID}
  33. cmd := NewCephCommand(context, clusterInfo, args)
  34. // Run command
  35. output, err := cmd.Run()
  36. if err != nil {
  37. return errors.Wrapf(err, "failed to remove cephfs-mirror peer for filesystem %q. %s", peerUUID, output)
  38. }
  39. logger.Infof("successfully removed cephfs-mirror peer %q", peerUUID)
  40. return nil
  41. }
  42. // EnableFilesystemSnapshotMirror enables filesystem snapshot mirroring
  43. func EnableFilesystemSnapshotMirror(context *clusterd.Context, clusterInfo *ClusterInfo, filesystem string) error {
  44. logger.Infof("enabling ceph filesystem snapshot mirror for filesystem %q", filesystem)
  45. // Build command
  46. args := []string{"fs", "snapshot", "mirror", "enable", filesystem}
  47. cmd := NewCephCommand(context, clusterInfo, args)
  48. // Run command
  49. output, err := cmd.Run()
  50. if err != nil {
  51. return errors.Wrapf(err, "failed to enable ceph filesystem snapshot mirror for filesystem %q. %s", filesystem, output)
  52. }
  53. logger.Infof("successfully enabled ceph filesystem snapshot mirror for filesystem %q", filesystem)
  54. return nil
  55. }
  56. // DisableFilesystemSnapshotMirror enables filesystem snapshot mirroring
  57. func DisableFilesystemSnapshotMirror(context *clusterd.Context, clusterInfo *ClusterInfo, filesystem string) error {
  58. logger.Infof("disabling ceph filesystem snapshot mirror for filesystem %q", filesystem)
  59. // Build command
  60. args := []string{"fs", "snapshot", "mirror", "disable", filesystem}
  61. cmd := NewCephCommand(context, clusterInfo, args)
  62. // Run command
  63. output, err := cmd.Run()
  64. if err != nil {
  65. if code, err := exec.ExtractExitCode(err); err == nil && code == int(syscall.ENOTSUP) {
  66. logger.Debug("filesystem mirroring is not enabled, nothing to disable")
  67. return nil
  68. }
  69. return errors.Wrapf(err, "failed to disable ceph filesystem snapshot mirror for filesystem %q. %s", filesystem, output)
  70. }
  71. logger.Infof("successfully disabled ceph filesystem snapshot mirror for filesystem %q", filesystem)
  72. return nil
  73. }
  74. func AddSnapshotSchedule(context *clusterd.Context, clusterInfo *ClusterInfo, path, interval, startTime, filesystem string) error {
  75. logger.Infof("adding snapshot schedule every %q to ceph filesystem %q on path %q", interval, filesystem, path)
  76. args := []string{"fs", "snap-schedule", "add", path, interval}
  77. if startTime != "" {
  78. args = append(args, startTime)
  79. }
  80. args = append(args, fmt.Sprintf("fs=%s", filesystem))
  81. cmd := NewCephCommand(context, clusterInfo, args)
  82. cmd.JsonOutput = false
  83. // Example command: "ceph fs snap-schedule add / 4d fs=myfs2"
  84. // CHANGE time for "2014-01-09T21:48:00" IF interval
  85. // Run command
  86. output, err := cmd.Run()
  87. if err != nil {
  88. if code, ok := exec.ExitStatus(err); ok && code != int(syscall.EEXIST) {
  89. return errors.Wrapf(err, "failed to add snapshot schedule every %q to ceph filesystem %q on path %q. %s", interval, filesystem, path, output)
  90. }
  91. }
  92. logger.Infof("successfully added snapshot schedule every %q to ceph filesystem %q on path %q", interval, filesystem, path)
  93. return nil
  94. }
  95. func AddSnapshotScheduleRetention(context *clusterd.Context, clusterInfo *ClusterInfo, path, duration, filesystem string) error {
  96. logger.Infof("adding snapshot schedule retention %s to ceph filesystem %q on path %q", duration, filesystem, path)
  97. // Example command: "ceph fs snap-schedule retention add / d 1 fs=myfs2"
  98. args := []string{"fs", "snap-schedule", "retention", "add", path, duration, fmt.Sprintf("fs=%s", filesystem)}
  99. cmd := NewCephCommand(context, clusterInfo, args)
  100. cmd.JsonOutput = false
  101. // Run command
  102. output, err := cmd.Run()
  103. if err != nil {
  104. if code, ok := exec.ExitStatus(err); ok && code == int(syscall.ENOENT) {
  105. logger.Warningf("snapshot schedule retention %s already exists for filesystem %q on path %q. %s", duration, filesystem, path, output)
  106. } else {
  107. return errors.Wrapf(err, "failed to add snapshot schedule retention %s to ceph filesystem %q on path %q. %s", duration, filesystem, path, output)
  108. }
  109. }
  110. logger.Infof("successfully added snapshot schedule retention %s to ceph filesystem %q on path %q", duration, filesystem, path)
  111. return nil
  112. }
  113. func GetSnapshotScheduleStatus(context *clusterd.Context, clusterInfo *ClusterInfo, filesystem string) ([]cephv1.FilesystemSnapshotSchedulesSpec, error) {
  114. logger.Infof("retrieving snapshot schedule status for ceph filesystem %q", filesystem)
  115. args := []string{"fs", "snap-schedule", "status", "/", "recursive=true", fmt.Sprintf("--fs=%s", filesystem)}
  116. cmd := NewCephCommand(context, clusterInfo, args)
  117. // Run command
  118. output, err := cmd.Run()
  119. if err != nil {
  120. return nil, errors.Wrapf(err, "failed to retrieve snapshot schedule status for ceph filesystem %q. %s", filesystem, output)
  121. }
  122. // Unmarshal JSON into Go struct
  123. var filesystemSnapshotSchedulesStatusSpec []cephv1.FilesystemSnapshotSchedulesSpec
  124. /* Replace new line since the command outputs a new line first and breaks the json parsing...
  125. [root@rook-ceph-operator-75c6d6bbfc-wqlnc /]# ceph --connect-timeout=15 --cluster=rook-ceph --conf=/var/lib/rook/rook-ceph/rook-ceph.config --name=client.admin --keyring=/var/lib/rook/rook-ceph/client.admin.keyring --format json fs snap-schedule status /
  126. [{"fs": "myfs", "subvol": null, "path": "/", "rel_path": "/", "schedule": "24h", "retention": {"h": 24}, "start": "2021-07-01T00:00:00", "created": "2021-07-01T12:19:12", "first": null, "last": null, "last_pruned": null, "created_count": 0, "pruned_count": 0, "active": true},{"fs": "myfs", "subvol": null, "path": "/", "rel_path": "/", "schedule": "25h", "retention": {"h": 24}, "start": "2021-07-01T00:00:00", "created": "2021-07-01T12:31:25", "first": null, "last": null, "last_pruned": null, "created_count": 0, "pruned_count": 0, "active": true}]
  127. */
  128. if err := json.Unmarshal([]byte(strings.ReplaceAll(string(output), "\n", "")), &filesystemSnapshotSchedulesStatusSpec); err != nil {
  129. return nil, errors.Wrap(err, "failed to unmarshal filesystem mirror snapshot schedule status response")
  130. }
  131. logger.Infof("successfully retrieved snapshot schedule status for ceph filesystem %q", filesystem)
  132. return filesystemSnapshotSchedulesStatusSpec, nil
  133. }
  134. // ImportFSMirrorBootstrapPeer add a mirror peer in the cephfs-mirror configuration
  135. func ImportFSMirrorBootstrapPeer(context *clusterd.Context, clusterInfo *ClusterInfo, fsName, token string) error {
  136. logger.Infof("importing cephfs bootstrap peer token for filesystem %q", fsName)
  137. // Build command
  138. args := []string{"fs", "snapshot", "mirror", "peer_bootstrap", "import", fsName, strings.TrimSpace(token)}
  139. cmd := NewCephCommand(context, clusterInfo, args)
  140. cmd.JsonOutput = false
  141. cmd.combinedOutput = true
  142. // Run command
  143. output, err := cmd.Run()
  144. if err != nil {
  145. return errors.Wrapf(err, "failed to import cephfs-mirror peer token for filesystem %q. %s", fsName, output)
  146. }
  147. logger.Infof("successfully imported cephfs-mirror peer for filesystem %q", fsName)
  148. return nil
  149. }
  150. // CreateFSMirrorBootstrapPeer add a mirror peer in the cephfs-mirror configuration
  151. func CreateFSMirrorBootstrapPeer(context *clusterd.Context, clusterInfo *ClusterInfo, fsName string) ([]byte, error) {
  152. logger.Infof("create cephfs-mirror bootstrap peer token for filesystem %q", fsName)
  153. // Build command
  154. args := []string{"fs", "snapshot", "mirror", "peer_bootstrap", "create", fsName, "client.mirror", clusterInfo.FSID}
  155. cmd := NewCephCommand(context, clusterInfo, args)
  156. // Run command
  157. output, err := cmd.Run()
  158. if err != nil {
  159. return nil, errors.Wrapf(err, "failed to create cephfs-mirror peer token for filesystem %q. %s", fsName, output)
  160. }
  161. // Unmarshal JSON into Go struct
  162. var bootstrapPeerToken BootstrapPeerToken
  163. if err := json.Unmarshal(output, &bootstrapPeerToken); err != nil {
  164. return nil, errors.Wrapf(err, "failed to unmarshal cephfs-mirror peer token create response. %s", output)
  165. }
  166. logger.Infof("successfully created cephfs-mirror bootstrap peer token for filesystem %q", fsName)
  167. return []byte(bootstrapPeerToken.Token), nil
  168. }
  169. // GetFSMirrorDaemonStatus returns the mirroring status of a given filesystem
  170. func GetFSMirrorDaemonStatus(context *clusterd.Context, clusterInfo *ClusterInfo, fsName string) ([]cephv1.FilesystemMirroringInfo, error) {
  171. // Using Debug level since this is called in a recurrent go routine
  172. logger.Debugf("retrieving filesystem mirror status for filesystem %q", fsName)
  173. args := []string{"fs", "snapshot", "mirror", "daemon", "status"} // for Ceph v16.2.7 and above
  174. if !clusterInfo.CephVersion.IsAtLeast(cephver.CephVersion{Major: 16, Minor: 2, Extra: 7}) {
  175. // fs-name needed for Ceph v16.2.6 and earlier
  176. args = append(args, fsName)
  177. }
  178. // Build command
  179. cmd := NewCephCommand(context, clusterInfo, args)
  180. // Run command
  181. output, err := cmd.Run()
  182. if err != nil {
  183. return nil, errors.Wrapf(err, "failed to retrieve filesystem mirror status for filesystem %q. %s", fsName, output)
  184. }
  185. // Unmarshal JSON into Go struct
  186. var filesystemMirroringInfo []cephv1.FilesystemMirroringInfo
  187. if err := json.Unmarshal([]byte(output), &filesystemMirroringInfo); err != nil {
  188. return nil, errors.Wrapf(err, "failed to unmarshal filesystem mirror status response. %q.", string(output))
  189. }
  190. logger.Debugf("successfully retrieved filesystem mirror status for filesystem %q", fsName)
  191. return filesystemMirroringInfo, nil
  192. }