filesystem.go

/*
Copyright 2016 The Rook Authors. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

	http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package client

import (
	ctx "context"
	"encoding/json"
	"fmt"
	"strconv"
	"syscall"
	"time"

	"github.com/pkg/errors"
	"github.com/rook/rook/pkg/clusterd"
	"github.com/rook/rook/pkg/util/exec"
	"k8s.io/apimachinery/pkg/util/wait"
)
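
// MDSDump is a representation of the json structure returned by 'ceph fs dump'.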
type MDSDump struct {
	Standbys    []MDSStandBy `json:"standbys"`
	FileSystems []MDSMap     `json:"filesystems"`
}
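
// MDSStandBy is a representation of a standby MDS daemon listed in the 'ceph fs dump' output.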
type MDSStandBy struct {
	Name string `json:"name"`
	Rank int    `json:"rank"`
}

// CephFilesystem is a representation of the json structure returned by 'ceph fs ls'
type CephFilesystem struct {
	Name           string   `json:"name"`
	MetadataPool   string   `json:"metadata_pool"`
	MetadataPoolID int      `json:"metadata_pool_id"`
	DataPools      []string `json:"data_pools"`
	DataPoolIDs    []int    `json:"data_pool_ids"`
}

// CephFilesystemDetails is a representation of the main json structure returned by 'ceph fs get'
type CephFilesystemDetails struct {
	ID     int    `json:"id"`
	MDSMap MDSMap `json:"mdsmap"`
}

// MDSMap is a representation of the mds map sub-structure returned by 'ceph fs get'
type MDSMap struct {
	FilesystemName string             `json:"fs_name"`
	Enabled        bool               `json:"enabled"`
	Root           int                `json:"root"`
	TableServer    int                `json:"tableserver"`
	MaxMDS         int                `json:"max_mds"`
	In             []int              `json:"in"`
	Up             map[string]int     `json:"up"`
	MetadataPool   int                `json:"metadata_pool"`
	DataPools      []int              `json:"data_pools"`
	Failed         []int              `json:"failed"`
	Damaged        []int              `json:"damaged"`
	Stopped        []int              `json:"stopped"`
	Info           map[string]MDSInfo `json:"info"`
}

// MDSInfo is a representation of the individual mds daemon sub-sub-structure returned by 'ceph fs get'
type MDSInfo struct {
	GID     int    `json:"gid"`
	Name    string `json:"name"`
	Rank    int    `json:"rank"`
	State   string `json:"state"`
	Address string `json:"addr"`
}

// ListFilesystems lists all filesystems provided by the Ceph cluster.
func ListFilesystems(context *clusterd.Context, clusterInfo *ClusterInfo) ([]CephFilesystem, error) {
	args := []string{"fs", "ls"}
	buf, err := NewCephCommand(context, clusterInfo, args).Run()
	if err != nil {
		return nil, errors.Wrap(err, "failed to list filesystems")
	}
	var filesystems []CephFilesystem
	err = json.Unmarshal(buf, &filesystems)
	if err != nil {
		return nil, errors.Wrapf(err, "unmarshal failed raw buffer response %s", string(buf))
	}
	return filesystems, nil
}

// GetFilesystem is assigned to getFilesystem so it can be overridden, e.g. for unit testing.
var GetFilesystem = getFilesystem

// getFilesystem gets detailed status information about a Ceph filesystem.
func getFilesystem(context *clusterd.Context, clusterInfo *ClusterInfo, fsName string) (*CephFilesystemDetails, error) {
	args := []string{"fs", "get", fsName}
	buf, err := NewCephCommand(context, clusterInfo, args).Run()
	if err != nil {
		return nil, err
	}
	var fs CephFilesystemDetails
	err = json.Unmarshal(buf, &fs)
	if err != nil {
		return nil, errors.Wrapf(err, "unmarshal failed raw buffer response %s", string(buf))
	}
	return &fs, nil
}

// AllowStandbyReplay sets the allow_standby_replay flag on a Ceph filesystem.
func AllowStandbyReplay(context *clusterd.Context, clusterInfo *ClusterInfo, fsName string, allowStandbyReplay bool) error {
	logger.Infof("setting allow_standby_replay to %t for filesystem %q", allowStandbyReplay, fsName)
	args := []string{"fs", "set", fsName, "allow_standby_replay", strconv.FormatBool(allowStandbyReplay)}
	_, err := NewCephCommand(context, clusterInfo, args).Run()
	if err != nil {
		return errors.Wrapf(err, "failed to set allow_standby_replay for filesystem %s", fsName)
	}
	return nil
}

// CreateFilesystem performs software configuration steps for Ceph to provide a new filesystem.
func CreateFilesystem(context *clusterd.Context, clusterInfo *ClusterInfo, name, metadataPool string, dataPools []string) error {
	if len(dataPools) == 0 {
		return errors.New("at least one data pool is required")
	}
	logger.Infof("creating filesystem %q with metadata pool %q and data pools %v", name, metadataPool, dataPools)
	var err error

	// enable multiple file systems in case this is not the first
	args := []string{"fs", "flag", "set", "enable_multiple", "true", confirmFlag}
	_, err = NewCephCommand(context, clusterInfo, args).Run()
	if err != nil {
		return errors.Wrap(err, "failed to enable multiple file systems")
	}

	// create the filesystem
	args = []string{"fs", "new", name, metadataPool, dataPools[0]}
	_, err = NewCephCommand(context, clusterInfo, args).Run()
	if err != nil {
		return errors.Wrapf(err, "failed enabling ceph fs %q", name)
	}

	// add each additional pool
	for i := 1; i < len(dataPools); i++ {
		err = AddDataPoolToFilesystem(context, clusterInfo, name, dataPools[i])
		if err != nil {
			logger.Errorf("%v", err)
		}
	}

	return nil
}

// AddDataPoolToFilesystem associates the provided data pool with the filesystem.
func AddDataPoolToFilesystem(context *clusterd.Context, clusterInfo *ClusterInfo, name, poolName string) error {
	args := []string{"fs", "add_data_pool", name, poolName}
	_, err := NewCephCommand(context, clusterInfo, args).Run()
	if err != nil {
		// Reef disallows calling add_data_pool for a pool that has already been added,
		// so ignore the EINVAL error code. Previous releases do not return an error when
		// an existing data pool is added again.
		if clusterInfo.CephVersion.IsAtLeastReef() {
			if code, ok := exec.ExitStatus(err); ok && code == int(syscall.EINVAL) {
				return nil
			}
		}
		return errors.Wrapf(err, "failed to add pool %q to file system %q", poolName, name)
	}
	return nil
}

// SetNumMDSRanks sets the number of mds ranks (max_mds) for a Ceph filesystem.
func SetNumMDSRanks(context *clusterd.Context, clusterInfo *ClusterInfo, fsName string, activeMDSCount int32) error {
	// Always tell Ceph to set the new max_mds value
	args := []string{"fs", "set", fsName, "max_mds", strconv.Itoa(int(activeMDSCount))}
	if _, err := NewCephCommand(context, clusterInfo, args).Run(); err != nil {
		return errors.Wrapf(err, "failed to set filesystem %s num mds ranks (max_mds) to %d", fsName, activeMDSCount)
	}
	return nil
}

// FailAllStandbyReplayMDS fails all MDS daemons of the filesystem that are in the up:standby-replay state.
func FailAllStandbyReplayMDS(context *clusterd.Context, clusterInfo *ClusterInfo, fsName string) error {
	fs, err := getFilesystem(context, clusterInfo, fsName)
	if err != nil {
		return errors.Wrapf(err, "failed to fail standby-replay MDSes for fs %q", fsName)
	}
	for _, info := range fs.MDSMap.Info {
		if info.State == "up:standby-replay" {
			if err := failMDS(context, clusterInfo, info.GID); err != nil {
				return errors.Wrapf(err, "failed to fail MDS %q for filesystem %q in up:standby-replay state", info.Name, fsName)
			}
		}
	}
	return nil
}

// GetMdsIdByRank returns the name of the MDS daemon that holds the given rank.
func GetMdsIdByRank(context *clusterd.Context, clusterInfo *ClusterInfo, fsName string, rank int32) (string, error) {
	fs, err := getFilesystem(context, clusterInfo, fsName)
	if err != nil {
		return "", errors.Wrap(err, "failed to get ceph fs dump")
	}
	// the 'up' map is keyed by "mds_<rank>" and maps to the daemon's gid
	gid, ok := fs.MDSMap.Up[fmt.Sprintf("mds_%d", rank)]
	if !ok {
		return "", errors.Errorf("failed to get mds gid from rank %d", rank)
	}
	// the 'info' map is keyed by "gid_<gid>"
	info, ok := fs.MDSMap.Info[fmt.Sprintf("gid_%d", gid)]
	if !ok {
		return "", errors.Errorf("failed to get mds info for rank %d", rank)
	}
	return info.Name, nil
}

// WaitForActiveRanks waits for the filesystem's number of active ranks to equal the desired count.
// It times out with an error if the number of active ranks does not reach the desired count in time.
// If moreIsOkay is true, having more active ranks than the desired count also counts as success.
func WaitForActiveRanks(
	context *clusterd.Context,
	clusterInfo *ClusterInfo, fsName string,
	desiredActiveRanks int32, moreIsOkay bool, timeout time.Duration,
) error {
	countText := fmt.Sprintf("%d", desiredActiveRanks)
	if moreIsOkay {
		// If it's okay to have more active ranks than desired, indicate so in log messages
		countText = fmt.Sprintf("%d or more", desiredActiveRanks)
	}
	logger.Infof("waiting %.2f second(s) for number of active mds daemons for fs %s to become %s",
		float64(timeout/time.Second), fsName, countText)
	err := wait.PollUntilContextTimeout(clusterInfo.Context, 3*time.Second, timeout, true, func(ctx ctx.Context) (bool, error) {
		fs, err := getFilesystem(context, clusterInfo, fsName)
		if err != nil {
			logger.Errorf(
				"Error getting filesystem %q details while waiting for num mds ranks to become %d. %v",
				fsName, desiredActiveRanks, err)
		} else if fs.MDSMap.MaxMDS == int(desiredActiveRanks) &&
			activeRanksSuccess(len(fs.MDSMap.Up), int(desiredActiveRanks), moreIsOkay) {
			// Both max_mds and the number of up MDS daemons must equal the desired number of ranks
			// to prevent a false positive when Ceph has the correct number of mdses up but is
			// trying to change the number of mdses up to an undesired number.
			logger.Debugf("mds ranks for filesystem %q successfully became %d", fsName, desiredActiveRanks)
			return true, nil
		}
		return false, nil
	})
	if err != nil {
		return errors.Errorf("timeout waiting for the number of active mds daemons for filesystem %q to become %q",
			fsName, countText)
	}
	return nil
}
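
// activeRanksSuccess returns true when the number of up MDS daemons satisfies the desired rank
// count: exactly, or, when moreIsOkay is set, by meeting or exceeding it.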
func activeRanksSuccess(upCount, desiredRanks int, moreIsOkay bool) bool {
	if moreIsOkay {
		return upCount >= desiredRanks
	}
	return upCount == desiredRanks
}

// MarkFilesystemAsDown marks a Ceph filesystem as down.
func MarkFilesystemAsDown(context *clusterd.Context, clusterInfo *ClusterInfo, fsName string) error {
	args := []string{"fs", "set", fsName, "cluster_down", "true"}
	_, err := NewCephCommand(context, clusterInfo, args).Run()
	if err != nil {
		return errors.Wrapf(err, "failed to set file system %s to cluster_down", fsName)
	}
	return nil
}

// failMDS instructs Ceph to fail an mds daemon.
func failMDS(context *clusterd.Context, clusterInfo *ClusterInfo, gid int) error {
	args := []string{"mds", "fail", strconv.Itoa(gid)}
	_, err := NewCephCommand(context, clusterInfo, args).Run()
	if err != nil {
		return errors.Wrapf(err, "failed to fail mds %d", gid)
	}
	return nil
}

// FailFilesystem efficiently brings down the filesystem by marking the filesystem as down
// and failing the MDSes using a single Ceph command.
func FailFilesystem(context *clusterd.Context, clusterInfo *ClusterInfo, fsName string) error {
	args := []string{"fs", "fail", fsName}
	_, err := NewCephCommand(context, clusterInfo, args).Run()
	if err != nil {
		return errors.Wrapf(err, "failed to fail filesystem %s", fsName)
	}
	return nil
}

// RemoveFilesystem performs software configuration steps to remove a Ceph filesystem and its
// backing pools.
func RemoveFilesystem(context *clusterd.Context, clusterInfo *ClusterInfo, fsName string, preservePoolsOnDelete bool) error {
	fs, err := getFilesystem(context, clusterInfo, fsName)
	if err != nil {
		return errors.Wrapf(err, "filesystem %s not found", fsName)
	}
	args := []string{"fs", "rm", fsName, confirmFlag}
	_, err = NewCephCommand(context, clusterInfo, args).Run()
	if err != nil {
		return errors.Wrapf(err, "failed to delete ceph fs %s", fsName)
	}
	if !preservePoolsOnDelete {
		err = deleteFSPools(context, clusterInfo, fs)
		if err != nil {
			return errors.Wrapf(err, "failed to delete fs %s pools", fsName)
		}
	} else {
		logger.Infof("PreservePoolsOnDelete is set in filesystem %s. Pools not deleted", fsName)
	}
	return nil
}
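
// deleteFSPools deletes the metadata pool and all data pools backing the given filesystem.
// It continues past individual failures and returns the last error encountered.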
func deleteFSPools(context *clusterd.Context, clusterInfo *ClusterInfo, fs *CephFilesystemDetails) error {
	poolNames, err := GetPoolNamesByID(context, clusterInfo)
	if err != nil {
		return errors.Wrap(err, "failed to get pool names")
	}

	var lastErr error

	// delete the metadata pool
	if err := deleteFSPool(context, clusterInfo, poolNames, fs.MDSMap.MetadataPool); err != nil {
		lastErr = err
	}

	// delete the data pools
	for _, poolID := range fs.MDSMap.DataPools {
		if err := deleteFSPool(context, clusterInfo, poolNames, poolID); err != nil {
			lastErr = err
		}
	}
	return lastErr
}
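
// deleteFSPool deletes the pool with the given ID after looking up its name in the provided map.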
func deleteFSPool(context *clusterd.Context, clusterInfo *ClusterInfo, poolNames map[int]string, id int) error {
	name, ok := poolNames[id]
	if !ok {
		return errors.Errorf("pool %d not found", id)
	}
	return DeletePool(context, clusterInfo, name)
}

// WaitForNoStandbys waits until there are no standby MDS daemons or the timeout expires.
func WaitForNoStandbys(context *clusterd.Context, clusterInfo *ClusterInfo, timeout time.Duration) error {
	err := wait.PollUntilContextTimeout(clusterInfo.Context, 3*time.Second, timeout, true, func(ctx ctx.Context) (bool, error) {
		mdsDump, err := GetMDSDump(context, clusterInfo)
		if err != nil {
			logger.Errorf("failed to get fs dump. %v", err)
			return false, nil
		}
		return len(mdsDump.Standbys) == 0, nil
	})
	if err != nil {
		return errors.Wrap(err, "timeout waiting for no standbys")
	}
	return nil
}
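
// GetMDSDump returns the parsed output of 'ceph fs dump'.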
func GetMDSDump(context *clusterd.Context, clusterInfo *ClusterInfo) (*MDSDump, error) {
	args := []string{"fs", "dump"}
	cmd := NewCephCommand(context, clusterInfo, args)
	buf, err := cmd.Run()
	if err != nil {
		return nil, errors.Wrap(err, "failed to dump fs info")
	}
	var dump MDSDump
	if err := json.Unmarshal(buf, &dump); err != nil {
		return nil, errors.Wrapf(err, "failed to unmarshal fs dump. %s", buf)
	}
	return &dump, nil
}

// SubvolumeGroup is a representation of a Ceph filesystem subvolume group.
type SubvolumeGroup struct {
	Name string `json:"name"`
}

// SubvolumeGroupList is the representation Ceph returns when listing Ceph filesystem subvolume groups.
type SubvolumeGroupList []SubvolumeGroup

// ListSubvolumeGroups lists all subvolume groups in a given filesystem by name.
// Times out after exec.CephCommandsTimeout.
// The var indirection allows this to be overridden for unit testing.
var ListSubvolumeGroups = listSubvolumeGroups

func listSubvolumeGroups(context *clusterd.Context, clusterInfo *ClusterInfo, fsName string) (SubvolumeGroupList, error) {
	svgs := SubvolumeGroupList{}
	args := []string{"fs", "subvolumegroup", "ls", fsName}
	cmd := NewCephCommand(context, clusterInfo, args)
	buf, err := cmd.RunWithTimeout(exec.CephCommandsTimeout)
	if err != nil {
		return svgs, errors.Wrapf(err, "failed to list subvolumegroups in filesystem %q", fsName)
	}
	if err := json.Unmarshal(buf, &svgs); err != nil {
		return svgs, errors.Wrapf(err, "failed to unmarshal subvolumegroup list for filesystem %q", fsName)
	}
	return svgs, nil
}

// Subvolume is a representation of a Ceph filesystem subvolume.
type Subvolume struct {
	Name string `json:"name"`
}

// SubvolumeList is the representation Ceph returns when listing Ceph filesystem subvolumes.
type SubvolumeList []Subvolume

// NoSubvolumeGroup can be passed to commands that operate on filesystem subvolume groups to
// indicate the case where there is no subvolume group.
const NoSubvolumeGroup = ""

// ListSubvolumesInGroup lists all subvolumes present in the given filesystem's subvolume group by
// name. If groupName is empty, it lists subvolumes that are not in any group.
// Times out after exec.CephCommandsTimeout.
// The var indirection allows this to be overridden for unit testing.
var ListSubvolumesInGroup = listSubvolumesInGroup

func listSubvolumesInGroup(context *clusterd.Context, clusterInfo *ClusterInfo, fsName, groupName string) (SubvolumeList, error) {
	svs := SubvolumeList{}
	args := []string{"fs", "subvolume", "ls", fsName}
	if groupName != NoSubvolumeGroup {
		args = append(args, groupName)
	}
	cmd := NewCephCommand(context, clusterInfo, args)
	// if the command takes a long time to run, this is a good indication that there are *many*
	// subvolumes present
	buf, err := cmd.RunWithTimeout(exec.CephCommandsTimeout)
	if err != nil {
		return svs, errors.Wrapf(err, "failed to list subvolumes in filesystem %q subvolume group %q", fsName, groupName)
	}
	if err := json.Unmarshal(buf, &svs); err != nil {
		return svs, errors.Wrapf(err, "failed to unmarshal subvolume list for filesystem %q subvolume group %q", fsName, groupName)
	}
	return svs, nil
}