discover.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507
  1. /*
  2. Copyright 2018 The Rook Authors. All rights reserved.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
// Package discover implements the rook-discover daemon, which probes a node's local devices and records them in a per-node config map.
  14. package discover
  15. import (
  16. "bufio"
  17. "context"
  18. "encoding/json"
  19. "fmt"
  20. "os"
  21. "os/exec"
  22. "os/signal"
  23. "path"
  24. "regexp"
  25. "strings"
  26. "syscall"
  27. "time"
  28. "github.com/coreos/pkg/capnslog"
  29. "github.com/rook/rook/pkg/clusterd"
  30. "github.com/rook/rook/pkg/operator/k8sutil"
  31. "github.com/rook/rook/pkg/util/sys"
  32. v1 "k8s.io/api/core/v1"
  33. kerrors "k8s.io/apimachinery/pkg/api/errors"
  34. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  35. )
  36. const (
  37. discoverDaemonUdev = "DISCOVER_DAEMON_UDEV_BLACKLIST"
  38. )
  39. var (
  40. logger = capnslog.NewPackageLogger("github.com/rook/rook", "rook-discover")
  41. // AppName is the name of the pod
  42. AppName = "rook-discover"
  43. // NodeAttr is the attribute of that node
  44. NodeAttr = "rook.io/node"
  45. // LocalDiskCMData is the data name of the config map storing devices
  46. LocalDiskCMData = "devices"
  47. // LocalDiskCMName is name of the config map storing devices
  48. LocalDiskCMName = "local-device-%s"
  49. nodeName string
  50. namespace string
  51. lastDevice string
  52. cmName string
  53. cm *v1.ConfigMap
  54. udevEventPeriod = time.Duration(5) * time.Second
  55. useCVInventory bool
  56. )
  57. // CephVolumeInventory is the Go struct representation of the json output
  58. type CephVolumeInventory struct {
  59. Path string `json:"path"`
  60. Available bool `json:"available"`
  61. RejectedReasons json.RawMessage `json:"rejected_reasons"`
  62. SysAPI json.RawMessage `json:"sys_api"`
  63. LVS json.RawMessage `json:"lvs"`
  64. }
  65. // Run is the entry point of that package execution
  66. func Run(ctx context.Context, context *clusterd.Context, probeInterval time.Duration, useCV bool) error {
  67. if context == nil {
  68. return fmt.Errorf("nil context")
  69. }
  70. logger.Debugf("device discovery interval is %q", probeInterval.String())
  71. logger.Debugf("use ceph-volume inventory is %t", useCV)
  72. nodeName = os.Getenv(k8sutil.NodeNameEnvVar)
  73. namespace = os.Getenv(k8sutil.PodNamespaceEnvVar)
  74. cmName = k8sutil.TruncateNodeName(LocalDiskCMName, nodeName)
  75. useCVInventory = useCV
  76. sigc := make(chan os.Signal, 1)
  77. signal.Notify(sigc, syscall.SIGTERM)
  78. err := updateDeviceCM(ctx, context)
  79. if err != nil {
  80. logger.Infof("failed to update device configmap: %v", err)
  81. return err
  82. }
  83. udevEvents := make(chan struct{})
  84. go udevBlockMonitor(udevEvents, udevEventPeriod)
  85. for {
  86. select {
  87. case <-sigc:
  88. logger.Infof("shutdown signal received, exiting...")
  89. return nil
  90. case <-time.After(probeInterval):
  91. if err := updateDeviceCM(ctx, context); err != nil {
  92. logger.Errorf("failed to update device configmap during probe interval. %v", err)
  93. }
  94. case _, ok := <-udevEvents:
  95. if ok {
  96. logger.Info("trigger probe from udev event")
  97. if err := updateDeviceCM(ctx, context); err != nil {
  98. logger.Errorf("failed to update device configmap triggered from udev event. %v", err)
  99. }
  100. } else {
  101. logger.Warningf("disabling udev monitoring")
  102. udevEvents = nil
  103. }
  104. }
  105. }
  106. }
  107. func matchUdevEvent(text string, matches, exclusions []string) (bool, error) {
  108. for _, match := range matches {
  109. matched, err := regexp.MatchString(match, text)
  110. if err != nil {
  111. return false, fmt.Errorf("failed to search string: %v", err)
  112. }
  113. if matched {
  114. hasExclusion := false
  115. for _, exclusion := range exclusions {
  116. matched, err = regexp.MatchString(exclusion, text)
  117. if err != nil {
  118. return false, fmt.Errorf("failed to search string: %v", err)
  119. }
  120. if matched {
  121. hasExclusion = true
  122. break
  123. }
  124. }
  125. if !hasExclusion {
  126. logger.Infof("udevadm monitor: matched event: %s", text)
  127. return true, nil
  128. }
  129. }
  130. }
  131. return false, nil
  132. }
  133. // Scans `udevadm monitor` output for block sub-system events. Each line of
  134. // output matching a set of substrings is sent to the provided channel. An event
  135. // is returned if it passes any matches tests, and passes all exclusion tests.
  136. func rawUdevBlockMonitor(c chan struct{}, matches, exclusions []string) {
  137. defer close(c)
  138. // stdbuf -oL performs line buffered output
  139. cmd := exec.Command("stdbuf", "-oL", "udevadm", "monitor", "-u", "-s", "block")
  140. stdout, err := cmd.StdoutPipe()
  141. if err != nil {
  142. logger.Warningf("Cannot open udevadm stdout: %v", err)
  143. return
  144. }
  145. defer stdout.Close()
  146. err = cmd.Start()
  147. if err != nil {
  148. logger.Warningf("Cannot start udevadm monitoring: %v", err)
  149. return
  150. }
  151. scanner := bufio.NewScanner(stdout)
  152. for scanner.Scan() {
  153. text := scanner.Text()
  154. logger.Debugf("udevadm monitor: %s", text)
  155. match, err := matchUdevEvent(text, matches, exclusions)
  156. if err != nil {
  157. logger.Warningf("udevadm filtering failed: %v", err)
  158. return
  159. }
  160. if match {
  161. c <- struct{}{}
  162. }
  163. }
  164. if err := scanner.Err(); err != nil {
  165. logger.Warningf("udevadm monitor scanner error: %v", err)
  166. }
  167. logger.Info("udevadm monitor finished")
  168. }
  169. // Monitors udev for block device changes, and collapses these events such that
  170. // only one event is emitted per period in order to deal with flapping.
  171. func udevBlockMonitor(c chan struct{}, period time.Duration) {
  172. defer close(c)
  173. var udevFilter []string
  174. // return any add or remove events, but none that match device mapper
  175. // events. string matching is case-insensitive
  176. events := make(chan struct{})
  177. // get discoverDaemonUdevBlacklist from the environment variable
  178. // if user doesn't provide any regex; generate the default regex
  179. // else use the regex provided by user
  180. discoverUdev := os.Getenv(discoverDaemonUdev)
  181. if discoverUdev == "" {
  182. discoverUdev = "(?i)dm-[0-9]+,(?i)rbd[0-9]+,(?i)nbd[0-9]+"
  183. }
  184. udevFilter = strings.Split(discoverUdev, ",")
  185. logger.Infof("using the regular expressions %q", udevFilter)
  186. go rawUdevBlockMonitor(events,
  187. []string{"(?i)add", "(?i)remove"},
  188. udevFilter)
  189. timeout := time.NewTimer(period)
  190. defer timeout.Stop()
  191. for {
  192. _, ok := <-events
  193. if !ok {
  194. return
  195. }
  196. if !timeout.Stop() {
  197. <-timeout.C
  198. }
  199. timeout.Reset(period)
  200. for {
  201. select {
  202. case <-timeout.C:
  203. case _, ok := <-events:
  204. if !ok {
  205. return
  206. }
  207. continue
  208. }
  209. break
  210. }
  211. c <- struct{}{}
  212. }
  213. }
  214. func ignoreDevice(dev sys.LocalDisk) bool {
  215. return strings.Contains(strings.ToUpper(dev.DevLinks), "USB")
  216. }
  217. func checkMatchingDevice(checkDev sys.LocalDisk, devices []sys.LocalDisk) *sys.LocalDisk {
  218. for i, dev := range devices {
  219. if ignoreDevice(dev) {
  220. continue
  221. }
  222. // check if devices should be considered the same. the uuid can be
  223. // unstable, so we also use the reported serial and device name, which
  224. // appear to be more stable.
  225. if checkDev.UUID != "" && dev.UUID != "" && checkDev.UUID == dev.UUID {
  226. return &devices[i]
  227. }
  228. // on virt-io devices in libvirt, the serial is reported as an empty
  229. // string, so also account for that.
  230. if checkDev.Serial == dev.Serial && checkDev.Serial != "" {
  231. return &devices[i]
  232. }
  233. if checkDev.Name == dev.Name {
  234. return &devices[i]
  235. }
  236. }
  237. return nil
  238. }
  239. // note that the idea of equality here may not be intuitive. equality of device
  240. // sets refers to a state in which no change has been observed between the sets
  241. // of devices that would warrant changes to their consumption by storage
  242. // daemons. for example, if a device appears to have been wiped vs a device
  243. // appears to now be in use.
  244. func checkDeviceListsEqual(oldDevs, newDevs []sys.LocalDisk) bool {
  245. for _, oldDev := range oldDevs {
  246. if ignoreDevice(oldDev) {
  247. continue
  248. }
  249. match := checkMatchingDevice(oldDev, newDevs)
  250. if match == nil {
  251. // device has been removed
  252. return false
  253. }
  254. if !oldDev.Empty && match.Empty {
  255. // device has changed from non-empty to empty
  256. return false
  257. }
  258. if oldDev.Partitions != nil && match.Partitions == nil {
  259. return false
  260. }
  261. if string(oldDev.CephVolumeData) == "" && string(match.CephVolumeData) != "" {
  262. // return ceph volume inventory data was not enabled before
  263. return false
  264. }
  265. }
  266. for _, newDev := range newDevs {
  267. if ignoreDevice(newDev) {
  268. continue
  269. }
  270. match := checkMatchingDevice(newDev, oldDevs)
  271. if match == nil {
  272. // device has been added
  273. return false
  274. }
  275. // the matching case is handled in the previous join
  276. }
  277. return true
  278. }
  279. // DeviceListsEqual checks whether 2 lists are equal or not
  280. func DeviceListsEqual(old, new string) (bool, error) {
  281. var oldDevs []sys.LocalDisk
  282. var newDevs []sys.LocalDisk
  283. err := json.Unmarshal([]byte(old), &oldDevs)
  284. if err != nil {
  285. return false, fmt.Errorf("cannot unmarshal devices: %+v", err)
  286. }
  287. err = json.Unmarshal([]byte(new), &newDevs)
  288. if err != nil {
  289. return false, fmt.Errorf("cannot unmarshal devices: %+v", err)
  290. }
  291. return checkDeviceListsEqual(oldDevs, newDevs), nil
  292. }
  293. func updateDeviceCM(ctx context.Context, clusterdContext *clusterd.Context) error {
  294. logger.Infof("updating device configmap")
  295. devices, err := probeDevices(clusterdContext)
  296. if err != nil {
  297. logger.Infof("failed to probe devices: %v", err)
  298. return err
  299. }
  300. deviceJSON, err := json.Marshal(devices)
  301. if err != nil {
  302. logger.Infof("failed to marshal: %v", err)
  303. return err
  304. }
  305. deviceStr := string(deviceJSON)
  306. if cm == nil {
  307. cm, err = clusterdContext.Clientset.CoreV1().ConfigMaps(namespace).Get(ctx, cmName, metav1.GetOptions{})
  308. }
  309. if err == nil {
  310. lastDevice = cm.Data[LocalDiskCMData]
  311. logger.Debugf("last devices %s", lastDevice)
  312. } else {
  313. if !kerrors.IsNotFound(err) {
  314. logger.Infof("failed to get configmap: %v", err)
  315. return err
  316. }
  317. data := make(map[string]string, 1)
  318. data[LocalDiskCMData] = deviceStr
  319. // the map doesn't exist yet, create it now
  320. cm = &v1.ConfigMap{
  321. ObjectMeta: metav1.ObjectMeta{
  322. Name: cmName,
  323. Namespace: namespace,
  324. Labels: map[string]string{
  325. k8sutil.AppAttr: AppName,
  326. NodeAttr: nodeName,
  327. },
  328. },
  329. Data: data,
  330. }
  331. // Get the discover daemon pod details to attach the owner reference to the config map
  332. discoverPod, err := k8sutil.GetRunningPod(ctx, clusterdContext.Clientset)
  333. if err != nil {
  334. logger.Warningf("failed to get discover pod to set ownerref. %+v", err)
  335. } else {
  336. k8sutil.SetOwnerRefsWithoutBlockOwner(&cm.ObjectMeta, discoverPod.OwnerReferences)
  337. }
  338. cm, err = clusterdContext.Clientset.CoreV1().ConfigMaps(namespace).Create(ctx, cm, metav1.CreateOptions{})
  339. if err != nil {
  340. logger.Infof("failed to create configmap: %v", err)
  341. return fmt.Errorf("failed to create local device map %s: %+v", cmName, err)
  342. }
  343. lastDevice = deviceStr
  344. }
  345. devicesEqual, err := DeviceListsEqual(lastDevice, deviceStr)
  346. if err != nil {
  347. return fmt.Errorf("failed to compare device lists: %v", err)
  348. }
  349. if !devicesEqual {
  350. data := make(map[string]string, 1)
  351. data[LocalDiskCMData] = deviceStr
  352. cm.Data = data
  353. cm, err = clusterdContext.Clientset.CoreV1().ConfigMaps(namespace).Update(ctx, cm, metav1.UpdateOptions{})
  354. if err != nil {
  355. logger.Infof("failed to update configmap %s: %v", cmName, err)
  356. return err
  357. }
  358. }
  359. return nil
  360. }
  361. func logDevices(devices []*sys.LocalDisk) {
  362. var devicesList []string
  363. for _, device := range devices {
  364. logger.Debugf("localdevice %q: %+v", device.Name, device)
  365. devicesList = append(devicesList, device.Name)
  366. }
  367. logger.Infof("localdevices: %q", strings.Join(devicesList, ", "))
  368. }
  369. func probeDevices(context *clusterd.Context) ([]sys.LocalDisk, error) {
  370. devices := make([]sys.LocalDisk, 0)
  371. localDevices, err := clusterd.DiscoverDevices(context.Executor)
  372. if err != nil {
  373. return devices, fmt.Errorf("failed initial hardware discovery. %+v", err)
  374. }
  375. logDevices(localDevices)
  376. // ceph-volume inventory command takes a little time to complete.
  377. // Get this data only if it is needed and once by function execution
  378. var cvInventory *map[string]string = nil
  379. if useCVInventory {
  380. logger.Infof("Getting ceph-volume inventory information")
  381. cvInventory, err = getCephVolumeInventory(context)
  382. if err != nil {
  383. logger.Errorf("error getting ceph-volume inventory: %v", err)
  384. }
  385. }
  386. for _, device := range localDevices {
  387. if device == nil {
  388. continue
  389. }
  390. partitions, _, err := sys.GetDevicePartitions(device.Name, context.Executor)
  391. if err != nil {
  392. logger.Infof("failed to check device partitions %s: %v", device.Name, err)
  393. continue
  394. }
  395. // check if there is a file system on the device
  396. fs, err := sys.GetDeviceFilesystems(device.Name, context.Executor)
  397. if err != nil {
  398. logger.Infof("failed to check device filesystem %s: %v", device.Name, err)
  399. continue
  400. }
  401. device.Partitions = partitions
  402. device.Filesystem = fs
  403. device.Empty = clusterd.GetDeviceEmpty(device)
  404. // Add the information provided by ceph-volume inventory
  405. if cvInventory != nil {
  406. CVData, deviceExists := (*cvInventory)[path.Join("/dev/", device.Name)]
  407. if deviceExists {
  408. device.CephVolumeData = CVData
  409. } else {
  410. logger.Errorf("ceph-volume information for device %q not found", device.Name)
  411. }
  412. } else {
  413. device.CephVolumeData = ""
  414. }
  415. devices = append(devices, *device)
  416. }
  417. logger.Infof("available devices: %+v", devices)
  418. return devices, nil
  419. }
  420. // getCephVolumeInventory: Return a map of strings indexed by device with the
  421. // information about the device returned by the command <ceph-volume inventory>
  422. func getCephVolumeInventory(context *clusterd.Context) (*map[string]string, error) {
  423. inventory, err := context.Executor.ExecuteCommandWithOutput("ceph-volume", "inventory", "--format", "json")
  424. if err != nil {
  425. return nil, fmt.Errorf("failed to execute ceph-volume inventory. %+v", err)
  426. }
  427. // Return a map with the information of each device indexed by path
  428. CVDevices := make(map[string]string)
  429. // No data retrieved from ceph-volume
  430. if inventory == "" {
  431. return &CVDevices, nil
  432. }
  433. // Get a slice to store the json data
  434. bInventory := []byte(inventory)
  435. var CVInventory []CephVolumeInventory
  436. err = json.Unmarshal(bInventory, &CVInventory)
  437. if err != nil {
  438. return &CVDevices, fmt.Errorf("error unmarshalling json data coming from ceph-volume inventory. %v", err)
  439. }
  440. for _, device := range CVInventory {
  441. jsonData, err := json.Marshal(device)
  442. if err != nil {
  443. logger.Errorf("error marshaling json data for device: %v", device.Path)
  444. } else {
  445. CVDevices[device.Path] = string(jsonData)
  446. }
  447. }
  448. return &CVDevices, nil
  449. }