/*
Copyright 2018 The Rook Authors. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

	http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package discover implements device discovery on storage nodes.
package discover

import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"regexp"
	"strings"
	"time"

	"github.com/coreos/pkg/capnslog"
	cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
	"github.com/rook/rook/pkg/clusterd"
	discoverDaemon "github.com/rook/rook/pkg/daemon/discover"
	"github.com/rook/rook/pkg/operator/ceph/controller"
	k8sutil "github.com/rook/rook/pkg/operator/k8sutil"
	"github.com/rook/rook/pkg/util/sys"
	apps "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	k8serrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

const (
	discoverDaemonsetName                 = "rook-discover"
	discoverDaemonsetPriorityClassNameEnv = "DISCOVER_PRIORITY_CLASS_NAME"
	discoverDaemonsetTolerationEnv        = "DISCOVER_TOLERATION"
	discoverDaemonsetTolerationKeyEnv     = "DISCOVER_TOLERATION_KEY"
	discoverDaemonsetTolerationsEnv       = "DISCOVER_TOLERATIONS"
	discoverDaemonSetNodeAffinityEnv      = "DISCOVER_AGENT_NODE_AFFINITY"
	discoverDaemonSetPodLabelsEnv         = "DISCOVER_AGENT_POD_LABELS"
	deviceInUseCMName                     = "local-device-in-use-cluster-%s-node-%s"
	deviceInUseAppName                    = "rook-claimed-devices"
	deviceInUseClusterAttr                = "rook.io/cluster"
	discoverIntervalEnv                   = "ROOK_DISCOVER_DEVICES_INTERVAL"
	defaultDiscoverInterval               = "60m"
	discoverDaemonResourcesEnv            = "DISCOVER_DAEMON_RESOURCES"
)

var logger = capnslog.NewPackageLogger("github.com/rook/rook", "op-discover")

// Discover manages the rook-discover daemonset that discovers devices on storage nodes.
type Discover struct {
	clientset kubernetes.Interface
}

// New creates an instance of Discover
func New(clientset kubernetes.Interface) *Discover {
	return &Discover{
		clientset: clientset,
	}
}

// Start deploys the discover daemonset.
func (d *Discover) Start(ctx context.Context, namespace, discoverImage, securityAccount string, data map[string]string, useCephVolume bool) error {
	err := d.createDiscoverDaemonSet(ctx, namespace, discoverImage, securityAccount, data, useCephVolume)
	if err != nil {
		return fmt.Errorf("failed to start discover daemonset. %v", err)
	}
	return nil
}
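
// A minimal usage sketch, not part of this package: how an operator might wire
// up device discovery. The namespace, image, service account, and interval
// values below are assumptions for illustration only.
//
//	d := New(clientset)
//	opConfig := map[string]string{discoverIntervalEnv: "30m"}
//	if err := d.Start(ctx, "rook-ceph", "rook/ceph:master", "rook-ceph-system", opConfig, true); err != nil {
//		logger.Errorf("failed to start device discovery. %v", err)
//	}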

func (d *Discover) createDiscoverDaemonSet(ctx context.Context, namespace, discoverImage, securityAccount string, data map[string]string, useCephVolume bool) error {
	discoveryInterval := k8sutil.GetValue(data, discoverIntervalEnv, defaultDiscoverInterval)

	discoveryParameters := []string{"discover",
		"--discover-interval", discoveryInterval}
	if useCephVolume {
		discoveryParameters = append(discoveryParameters, "--use-ceph-volume")
	}

	discoverDaemonResourcesRaw := k8sutil.GetValue(data, discoverDaemonResourcesEnv, "")
	discoverDaemonResources, err := k8sutil.YamlToContainerResource(discoverDaemonResourcesRaw)
	if err != nil {
		logger.Warningf("failed to parse %s. %v", discoverDaemonResourcesRaw, err)
	}

	ds := &apps.DaemonSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:   discoverDaemonsetName,
			Labels: getLabels(),
		},
		Spec: apps.DaemonSetSpec{
			Selector: &metav1.LabelSelector{
				MatchLabels: map[string]string{
					"app": discoverDaemonsetName,
				},
			},
			UpdateStrategy: apps.DaemonSetUpdateStrategy{
				Type: apps.RollingUpdateDaemonSetStrategyType,
			},
			Template: v1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: getLabels(),
				},
				Spec: v1.PodSpec{
					ServiceAccountName: securityAccount,
					Containers: []v1.Container{
						{
							Name:            discoverDaemonsetName,
							Image:           discoverImage,
							Args:            discoveryParameters,
							SecurityContext: controller.PrivilegedContext(true),
							VolumeMounts: []v1.VolumeMount{
								{
									Name:      "dev",
									MountPath: "/dev",
									// the discovery pod could fail to start if /dev is mounted read-only
									ReadOnly: false,
								},
								{
									Name:      "sys",
									MountPath: "/sys",
									ReadOnly:  true,
								},
								{
									Name:      "udev",
									MountPath: "/run/udev",
									ReadOnly:  true,
								},
							},
							Resources: discoverDaemonResources,
							Env: []v1.EnvVar{
								k8sutil.NamespaceEnvVar(),
								k8sutil.NodeEnvVar(),
								k8sutil.NameEnvVar(),
							},
						},
					},
					Volumes: []v1.Volume{
						{
							Name: "dev",
							VolumeSource: v1.VolumeSource{
								HostPath: &v1.HostPathVolumeSource{
									Path: "/dev",
								},
							},
						},
						{
							Name: "sys",
							VolumeSource: v1.VolumeSource{
								HostPath: &v1.HostPathVolumeSource{
									Path: "/sys",
								},
							},
						},
						{
							Name: "udev",
							VolumeSource: v1.VolumeSource{
								HostPath: &v1.HostPathVolumeSource{
									Path: "/run/udev",
								},
							},
						},
					},
					HostNetwork:       false,
					PriorityClassName: k8sutil.GetValue(data, discoverDaemonsetPriorityClassNameEnv, ""),
				},
			},
		},
	}

	// Get the operator pod details to attach the owner reference to the discover daemonset
	operatorPod, err := k8sutil.GetRunningPod(ctx, d.clientset)
	if err != nil {
		logger.Errorf("failed to get operator pod. %+v", err)
	} else {
		k8sutil.SetOwnerRefsWithoutBlockOwner(&ds.ObjectMeta, operatorPod.OwnerReferences)
	}

	// Add toleration if any
	tolerationValue := k8sutil.GetValue(data, discoverDaemonsetTolerationEnv, "")
	if tolerationValue != "" {
		ds.Spec.Template.Spec.Tolerations = []v1.Toleration{
			{
				Effect:   v1.TaintEffect(tolerationValue),
				Operator: v1.TolerationOpExists,
				Key:      k8sutil.GetValue(data, discoverDaemonsetTolerationKeyEnv, ""),
			},
		}
	}

	tolerationsRaw := k8sutil.GetValue(data, discoverDaemonsetTolerationsEnv, "")
	tolerations, err := k8sutil.YamlToTolerations(tolerationsRaw)
	if err != nil {
		logger.Warningf("failed to parse %s. %+v", tolerationsRaw, err)
	}
	logger.Infof("tolerations: %v", tolerations)
	ds.Spec.Template.Spec.Tolerations = append(ds.Spec.Template.Spec.Tolerations, tolerations...)

	// Add NodeAffinity if any
	nodeAffinity := k8sutil.GetValue(data, discoverDaemonSetNodeAffinityEnv, "")
	if nodeAffinity != "" {
		v1NodeAffinity, err := k8sutil.GenerateNodeAffinity(nodeAffinity)
		if err != nil {
			logger.Errorf("failed to create NodeAffinity. %+v", err)
		} else {
			ds.Spec.Template.Spec.Affinity = &v1.Affinity{
				NodeAffinity: v1NodeAffinity,
			}
		}
		logger.Infof("nodeAffinity: %s", v1NodeAffinity)
	}

	podLabels := k8sutil.GetValue(data, discoverDaemonSetPodLabelsEnv, "")
	if podLabels != "" {
		podLabels := k8sutil.ParseStringToLabels(podLabels)
		// Override / Set the app label even if set by the user as
		// otherwise the DaemonSet pod selector may be broken
		podLabels["app"] = discoverDaemonsetName
		ds.Spec.Template.ObjectMeta.Labels = podLabels
	}

	if controller.LoopDevicesAllowed() {
		ds.Spec.Template.Spec.Containers[0].Env = append(ds.Spec.Template.Spec.Containers[0].Env,
			v1.EnvVar{Name: "CEPH_VOLUME_ALLOW_LOOP_DEVICES", Value: "true"})
	}

	_, err = d.clientset.AppsV1().DaemonSets(namespace).Create(ctx, ds, metav1.CreateOptions{})
	if err != nil {
		if !k8serrors.IsAlreadyExists(err) {
			return fmt.Errorf("failed to create rook-discover daemon set. %+v", err)
		}
		logger.Infof("rook-discover daemonset already exists, updating ...")
		_, err = d.clientset.AppsV1().DaemonSets(namespace).Update(ctx, ds, metav1.UpdateOptions{})
		if err != nil {
			return fmt.Errorf("failed to update rook-discover daemon set. %+v", err)
		}
	} else {
		logger.Infof("rook-discover daemonset started")
	}
	return nil
}
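
// For illustration, the operator settings parsed above might be provided
// roughly as follows. The exact accepted formats are defined by the k8sutil
// helpers (YamlToTolerations, GenerateNodeAffinity); the values here are
// assumptions, not authoritative examples.
//
//	DISCOVER_TOLERATIONS: |
//	  - key: storage-node
//	    operator: Exists
//	    effect: NoSchedule
//	DISCOVER_AGENT_NODE_AFFINITY: "role=storage-node"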

func getLabels() map[string]string {
	labels := make(map[string]string)
	k8sutil.AddRecommendedLabels(labels, "rook-discover", "rook-ceph-operator", "rook-discover", "rook-discover")
	labels["app"] = discoverDaemonsetName
	return labels
}

// ListDevices lists all devices discovered on all nodes, or on a specific node if a node name is provided.
func ListDevices(ctx context.Context, clusterdContext *clusterd.Context, namespace, nodeName string) (map[string][]sys.LocalDisk, error) {
	// convert the host name label to the k8s node name to look up the configmap with the devices
	if len(nodeName) > 0 {
		var err error
		nodeName, err = k8sutil.GetNodeNameFromHostname(ctx, clusterdContext.Clientset, nodeName)
		if err != nil {
			logger.Warningf("failed to get node name from hostname. %+v", err)
		}
	}

	var devices map[string][]sys.LocalDisk
	listOpts := metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", k8sutil.AppAttr, discoverDaemon.AppName)}

	// wait for the device discovery configmaps to appear
	retryCount := 0
	retryMax := 30
	sleepTime := 5
	for {
		retryCount++
		if retryCount > retryMax {
			return devices, fmt.Errorf("exceeded max retry count waiting for device configmap to appear")
		}
		if retryCount > 1 {
			// only sleep after the first attempt
			<-time.After(time.Duration(sleepTime) * time.Second)
		}

		cms, err := clusterdContext.Clientset.CoreV1().ConfigMaps(namespace).List(ctx, listOpts)
		if err != nil {
			logger.Warningf("failed to list device configmaps: %v", err)
			return devices, fmt.Errorf("failed to list device configmaps: %+v", err)
		}
		if len(cms.Items) == 0 {
			logger.Infof("no configmap match, retry #%d", retryCount)
			continue
		}

		devices = make(map[string][]sys.LocalDisk, len(cms.Items))
		for _, cm := range cms.Items {
			node := cm.ObjectMeta.Labels[discoverDaemon.NodeAttr]
			if len(nodeName) > 0 && node != nodeName {
				continue
			}
			deviceJson := cm.Data[discoverDaemon.LocalDiskCMData]
			logger.Debugf("node %s, device %s", node, deviceJson)
			if len(node) == 0 || len(deviceJson) == 0 {
				continue
			}
			var d []sys.LocalDisk
			err = json.Unmarshal([]byte(deviceJson), &d)
			if err != nil {
				logger.Warningf("failed to unmarshal %s", deviceJson)
				continue
			}
			devices[node] = d
		}
		break
	}
	logger.Debugf("discovery found the following devices %+v", devices)
	return devices, nil
}
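
// A usage sketch, assuming a populated clusterd.Context; the namespace is an
// example value, and an empty node name lists devices on all nodes:
//
//	devices, err := ListDevices(ctx, clusterdContext, "rook-ceph", "")
//	if err != nil {
//		return err
//	}
//	for node, disks := range devices {
//		logger.Infof("node %s has %d discovered devices", node, len(disks))
//	}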

// ListDevicesInUse lists all devices on a node that are already used by existing clusters.
func ListDevicesInUse(ctx context.Context, clusterdContext *clusterd.Context, namespace, nodeName string) ([]sys.LocalDisk, error) {
	var devices []sys.LocalDisk

	if len(nodeName) == 0 {
		return devices, fmt.Errorf("empty node name")
	}

	listOpts := metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", k8sutil.AppAttr, deviceInUseAppName)}
	cms, err := clusterdContext.Clientset.CoreV1().ConfigMaps(namespace).List(ctx, listOpts)
	if err != nil {
		return devices, fmt.Errorf("failed to list device in use configmaps: %+v", err)
	}

	for _, cm := range cms.Items {
		node := cm.ObjectMeta.Labels[discoverDaemon.NodeAttr]
		if node != nodeName {
			continue
		}
		deviceJson := cm.Data[discoverDaemon.LocalDiskCMData]
		logger.Debugf("node %s, device in use %s", node, deviceJson)
		if len(node) == 0 || len(deviceJson) == 0 {
			continue
		}
		var d []sys.LocalDisk
		err = json.Unmarshal([]byte(deviceJson), &d)
		if err != nil {
			logger.Warningf("failed to unmarshal %s", deviceJson)
			continue
		}
		devices = append(devices, d...)
	}
	logger.Debugf("devices in use %+v", devices)
	return devices, nil
}
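
// A usage sketch with example namespace and node name values:
//
//	inUse, err := ListDevicesInUse(ctx, clusterdContext, "rook-ceph", "node-1")
//	if err != nil {
//		return err
//	}
//	logger.Infof("%d devices on node-1 are already claimed", len(inUse))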

func matchDeviceFullPath(devLinks, fullpath string) bool {
	dlsArr := strings.Split(devLinks, " ")
	for i := range dlsArr {
		if dlsArr[i] == fullpath {
			return true
		}
	}
	return false
}
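
// matchDeviceFullPath expects devLinks in the space-separated form that udev's
// DEVLINKS property produces; the paths below are made-up examples:
//
//	matchDeviceFullPath("/dev/disk/by-id/wwn-0xabc /dev/disk/by-path/pci-0000:00:1f.2-ata-1", "/dev/disk/by-id/wwn-0xabc") // true
//	matchDeviceFullPath("/dev/disk/by-id/wwn-0xabc", "/dev/sda")                                                           // false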

// GetAvailableDevices finds the free devices on a node that match the requested device list, filter,
// or useAllDevices setting, and marks the matched devices as in-use.
func GetAvailableDevices(ctx context.Context, clusterdContext *clusterd.Context, nodeName, clusterName string, devices []cephv1.Device, filter string, useAllDevices bool) ([]cephv1.Device, error) {
	results := []cephv1.Device{}
	if len(devices) == 0 && len(filter) == 0 && !useAllDevices {
		return results, nil
	}
	namespace := os.Getenv(k8sutil.PodNamespaceEnvVar)

	// find all devices
	allDevices, err := ListDevices(ctx, clusterdContext, namespace, nodeName)
	if err != nil {
		return results, err
	}

	// find those on the node
	nodeAllDevices, ok := allDevices[nodeName]
	if !ok {
		return results, fmt.Errorf("node %s has no devices", nodeName)
	}

	// find those in use on the node
	devicesInUse, err := ListDevicesInUse(ctx, clusterdContext, namespace, nodeName)
	if err != nil {
		return results, err
	}

	nodeDevices := []sys.LocalDisk{}
	for _, nodeDevice := range nodeAllDevices {
		// TODO: Filter out devices that are in use by another cluster.
		// We need to retain the devices in use for this cluster so the provisioner will continue to configure the same OSDs.
		for _, device := range devicesInUse {
			if nodeDevice.Name == device.Name {
				break
			}
		}
		nodeDevices = append(nodeDevices, nodeDevice)
	}

	claimedDevices := []sys.LocalDisk{}
	// the devices that remain are free to use
	if len(devices) > 0 {
		for i := range devices {
			for j := range nodeDevices {
				if devices[i].FullPath != "" && matchDeviceFullPath(nodeDevices[j].DevLinks, devices[i].FullPath) {
					if devices[i].Name == "" {
						devices[i].Name = nodeDevices[j].Name
					}
					results = append(results, devices[i])
					claimedDevices = append(claimedDevices, nodeDevices[j])
				} else if devices[i].Name == nodeDevices[j].Name {
					results = append(results, devices[i])
					claimedDevices = append(claimedDevices, nodeDevices[j])
				}
			}
		}
	} else if len(filter) > 0 {
		for i := range nodeDevices {
			// TODO: support filters based on other keys
			matched, err := regexp.Match(filter, []byte(nodeDevices[i].Name))
			if err == nil && matched {
				d := cephv1.Device{
					Name: nodeDevices[i].Name,
				}
				claimedDevices = append(claimedDevices, nodeDevices[i])
				results = append(results, d)
			}
		}
	} else if useAllDevices {
		for i := range nodeDevices {
			d := cephv1.Device{
				Name: nodeDevices[i].Name,
			}
			results = append(results, d)
			claimedDevices = append(claimedDevices, nodeDevices[i])
		}
	}

	// mark the claimed devices as in-use
	if len(claimedDevices) > 0 {
		deviceJson, err := json.Marshal(claimedDevices)
		if err != nil {
			logger.Errorf("failed to marshal: %v", err)
			return results, err
		}

		data := make(map[string]string, 1)
		data[discoverDaemon.LocalDiskCMData] = string(deviceJson)

		cm := &v1.ConfigMap{
			ObjectMeta: metav1.ObjectMeta{
				Name:      k8sutil.TruncateNodeName(fmt.Sprintf(deviceInUseCMName, clusterName, "%s"), nodeName),
				Namespace: namespace,
				Labels: map[string]string{
					k8sutil.AppAttr:         deviceInUseAppName,
					discoverDaemon.NodeAttr: nodeName,
					deviceInUseClusterAttr:  clusterName,
				},
			},
			Data: data,
		}
		_, err = clusterdContext.Clientset.CoreV1().ConfigMaps(namespace).Create(ctx, cm, metav1.CreateOptions{})
		if err != nil {
			if !k8serrors.IsAlreadyExists(err) {
				return results, fmt.Errorf("failed to create device in use configmap for cluster %s node %s: %v", clusterName, nodeName, err)
			}
			if _, err := clusterdContext.Clientset.CoreV1().ConfigMaps(namespace).Update(ctx, cm, metav1.UpdateOptions{}); err != nil {
				return results, fmt.Errorf("failed to update devices in use. %+v", err)
			}
		}
	}
	return results, nil
}
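
// A usage sketch; the node name, cluster name, and requested devices are
// example values, and the operator namespace is assumed to be set in the
// environment as this function requires:
//
//	wanted := []cephv1.Device{{Name: "sdb"}, {FullPath: "/dev/disk/by-id/wwn-0xabc"}}
//	matched, err := GetAvailableDevices(ctx, clusterdContext, "node-1", "rook-ceph", wanted, "", false)
//	if err != nil {
//		return err
//	}
//	logger.Infof("matched %d devices on node-1", len(matched))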

// Stop deletes the discover daemonset.
func (d *Discover) Stop(ctx context.Context, namespace string) error {
	err := d.clientset.AppsV1().DaemonSets(namespace).Delete(ctx, discoverDaemonsetName, metav1.DeleteOptions{})
	if err != nil && !k8serrors.IsNotFound(err) {
		return err
	}
	return nil
}