podman.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. //go:build !windows
  4. // +build !windows
  5. package podmanreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/podmanreceiver"
import (
	"context"
	"encoding/json"
	"fmt"
	"net/url"
	"sync"
	"time"

	"go.uber.org/zap"
)
  14. type clientFactory func(logger *zap.Logger, cfg *Config) (PodmanClient, error)
  15. type PodmanClient interface {
  16. ping(context.Context) error
  17. stats(context.Context, url.Values) ([]containerStats, error)
  18. list(context.Context, url.Values) ([]container, error)
  19. events(context.Context, url.Values) (<-chan event, <-chan error)
  20. }
  21. type ContainerScraper struct {
  22. client PodmanClient
  23. containers map[string]container
  24. containersLock sync.Mutex
  25. logger *zap.Logger
  26. config *Config
  27. }
  28. func newContainerScraper(engineClient PodmanClient, logger *zap.Logger, config *Config) *ContainerScraper {
  29. return &ContainerScraper{
  30. client: engineClient,
  31. containers: make(map[string]container),
  32. logger: logger,
  33. config: config,
  34. }
  35. }
  36. // containers provides a slice of container to use for individual fetchContainerStats calls.
  37. func (pc *ContainerScraper) getContainers() []container {
  38. pc.containersLock.Lock()
  39. defer pc.containersLock.Unlock()
  40. containers := make([]container, 0, len(pc.containers))
  41. for _, container := range pc.containers {
  42. containers = append(containers, container)
  43. }
  44. return containers
  45. }
  46. // loadContainerList will load the initial running container maps for
  47. // inspection and establishing which containers warrant stat gathering calls
  48. // by the receiver.
  49. func (pc *ContainerScraper) loadContainerList(ctx context.Context) error {
  50. params := url.Values{}
  51. runningFilter := map[string][]string{
  52. "status": {"running"},
  53. }
  54. jsonFilter, err := json.Marshal(runningFilter)
  55. if err != nil {
  56. return nil
  57. }
  58. params.Add("filters", string(jsonFilter))
  59. listCtx, cancel := context.WithTimeout(ctx, pc.config.Timeout)
  60. defer cancel()
  61. containerList, err := pc.client.list(listCtx, params)
  62. if err != nil {
  63. return err
  64. }
  65. for _, c := range containerList {
  66. pc.persistContainer(c)
  67. }
  68. return nil
  69. }
  70. func (pc *ContainerScraper) events(ctx context.Context, options url.Values) (<-chan event, <-chan error) {
  71. return pc.client.events(ctx, options)
  72. }
  73. func (pc *ContainerScraper) containerEventLoop(ctx context.Context) {
  74. filters := url.Values{}
  75. cidFilter := map[string][]string{
  76. "status": {"died", "start"},
  77. "type": {"container"},
  78. }
  79. jsonFilter, err := json.Marshal(cidFilter)
  80. if err != nil {
  81. return
  82. }
  83. filters.Add("filters", string(jsonFilter))
  84. EVENT_LOOP:
  85. for {
  86. eventCh, errCh := pc.events(ctx, filters)
  87. for {
  88. select {
  89. case <-ctx.Done():
  90. return
  91. case podmanEvent := <-eventCh:
  92. pc.logger.Info("Event received", zap.String("status", podmanEvent.Status))
  93. switch podmanEvent.Status {
  94. case "died":
  95. pc.logger.Debug("Podman container died:", zap.String("id", podmanEvent.ID))
  96. pc.removeContainer(podmanEvent.ID)
  97. case "start":
  98. pc.logger.Debug(
  99. "Podman container started:",
  100. zap.String("id", podmanEvent.ID),
  101. zap.String("status", podmanEvent.Status),
  102. )
  103. pc.inspectAndPersistContainer(ctx, podmanEvent.ID)
  104. }
  105. case err := <-errCh:
  106. // We are only interested when the context hasn't been canceled since requests made
  107. // with a closed context are guaranteed to fail.
  108. if ctx.Err() == nil {
  109. pc.logger.Error("Error watching podman container events", zap.Error(err))
  110. // Either decoding or connection error has occurred, so we should resume the event loop after
  111. // waiting a moment. In cases of extended daemon unavailability this will retry until
  112. // collector teardown or background context is closed.
  113. select {
  114. case <-time.After(3 * time.Second):
  115. continue EVENT_LOOP
  116. case <-ctx.Done():
  117. return
  118. }
  119. }
  120. }
  121. }
  122. }
  123. }
  124. // inspectAndPersistContainer queries inspect api and returns *container and true when container should be queried for stats,
  125. // nil and false otherwise. Persists the container in the cache if container is
  126. // running and not excluded.
  127. func (pc *ContainerScraper) inspectAndPersistContainer(ctx context.Context, cid string) (*container, bool) {
  128. params := url.Values{}
  129. cidFilter := map[string][]string{
  130. "id": {cid},
  131. }
  132. jsonFilter, err := json.Marshal(cidFilter)
  133. if err != nil {
  134. return nil, false
  135. }
  136. params.Add("filters", string(jsonFilter))
  137. inspectCtx, cancel := context.WithTimeout(ctx, pc.config.Timeout)
  138. defer cancel()
  139. container, err := pc.client.list(inspectCtx, params)
  140. if len(container) == 1 && err == nil {
  141. pc.persistContainer(container[0])
  142. return &container[0], true
  143. }
  144. pc.logger.Error(
  145. "Could not inspect updated container",
  146. zap.String("id", cid),
  147. zap.Error(err),
  148. )
  149. return nil, false
  150. }
  151. // fetchContainerStats will query the desired container stats
  152. func (pc *ContainerScraper) fetchContainerStats(ctx context.Context, c container) (containerStats, error) {
  153. params := url.Values{}
  154. params.Add("stream", "false")
  155. params.Add("containers", c.ID)
  156. stats, err := pc.client.stats(ctx, params)
  157. if err != nil || len(stats) < 1 {
  158. return containerStats{}, err
  159. }
  160. return stats[0], nil
  161. }
  162. func (pc *ContainerScraper) persistContainer(c container) {
  163. pc.logger.Debug("Monitoring Podman container", zap.String("id", c.ID))
  164. pc.containersLock.Lock()
  165. defer pc.containersLock.Unlock()
  166. pc.containers[c.ID] = c
  167. }
  168. func (pc *ContainerScraper) removeContainer(cid string) {
  169. pc.containersLock.Lock()
  170. defer pc.containersLock.Unlock()
  171. delete(pc.containers, cid)
  172. pc.logger.Debug("Removed container from stores.", zap.String("id", cid))
  173. }