docker.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package docker // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/docker"
  4. import (
  5. "context"
  6. "encoding/json"
  7. "errors"
  8. "fmt"
  9. "io"
  10. "strings"
  11. "sync"
  12. "time"
  13. dtypes "github.com/docker/docker/api/types"
  14. devents "github.com/docker/docker/api/types/events"
  15. dfilters "github.com/docker/docker/api/types/filters"
  16. docker "github.com/docker/docker/client"
  17. "go.uber.org/zap"
  18. )
const (
	// minimalRequiredDockerAPIVersion is not referenced in this file.
	// NOTE(review): presumably the lowest Docker Engine API version accepted
	// in the configuration — confirm against its users elsewhere in the package.
	minimalRequiredDockerAPIVersion = 1.22
	// userAgent is the value NewDockerClient sets for the "User-Agent" HTTP
	// header on the Docker client it creates.
	userAgent = "OpenTelemetry-Collector Docker Stats Receiver/v0.0.1"
)
// Container pairs the client.ContainerInspect() response for a container
// with its environment variables translated into a string map for potential
// labels.
type Container struct {
	// ContainerJSON is the embedded inspect response, so its fields (ID,
	// Config, State, ...) are accessible directly on Container.
	*dtypes.ContainerJSON
	// EnvMap holds the container's Config.Env entries as produced by
	// ContainerEnvToMap.
	EnvMap map[string]string
}
// Client provides the core metric gathering functionality from the Docker Daemon.
// It retrieves container information in two forms to produce metric data: dtypes.ContainerJSON
// from client.ContainerInspect() for container information (id, name, hostname, labels, and env)
// and dtypes.StatsJSON from client.ContainerStats() for metric values.
type Client struct {
	// client is the Docker API client used for list, inspect, stats and
	// events requests.
	client *docker.Client
	// config supplies the endpoint, API version, excluded images and the
	// per-request Timeout read by the methods below.
	config *Config
	// containers caches the monitored containers keyed by container ID.
	containers map[string]Container
	// containersLock is held around every read and write of containers.
	containersLock sync.Mutex
	// excludedImageMatcher, when non-nil, is consulted by shouldBeExcluded
	// to skip containers by image.
	excludedImageMatcher *stringMatcher
	// logger receives the debug, warning and error messages emitted here.
	logger *zap.Logger
}
  41. func NewDockerClient(config *Config, logger *zap.Logger, opts ...docker.Opt) (*Client, error) {
  42. client, err := docker.NewClientWithOpts(
  43. append([]docker.Opt{
  44. docker.WithHost(config.Endpoint),
  45. docker.WithVersion(fmt.Sprintf("v%v", config.DockerAPIVersion)),
  46. docker.WithHTTPHeaders(map[string]string{"User-Agent": userAgent}),
  47. }, opts...)...,
  48. )
  49. if err != nil {
  50. return nil, fmt.Errorf("could not create docker client: %w", err)
  51. }
  52. excludedImageMatcher, err := newStringMatcher(config.ExcludedImages)
  53. if err != nil {
  54. return nil, fmt.Errorf("could not determine docker client excluded images: %w", err)
  55. }
  56. dc := &Client{
  57. client: client,
  58. config: config,
  59. logger: logger,
  60. containers: make(map[string]Container),
  61. containersLock: sync.Mutex{},
  62. excludedImageMatcher: excludedImageMatcher,
  63. }
  64. return dc, nil
  65. }
  66. // Containers provides a slice of Container to use for individual FetchContainerStats calls.
  67. func (dc *Client) Containers() []Container {
  68. dc.containersLock.Lock()
  69. defer dc.containersLock.Unlock()
  70. containers := make([]Container, 0, len(dc.containers))
  71. for _, container := range dc.containers {
  72. containers = append(containers, container)
  73. }
  74. return containers
  75. }
  76. // LoadContainerList will load the initial running container maps for
  77. // inspection and establishing which containers warrant stat gathering calls
  78. // by the receiver.
  79. func (dc *Client) LoadContainerList(ctx context.Context) error {
  80. // Build initial container maps before starting loop
  81. filters := dfilters.NewArgs()
  82. filters.Add("status", "running")
  83. options := dtypes.ContainerListOptions{
  84. Filters: filters,
  85. }
  86. listCtx, cancel := context.WithTimeout(ctx, dc.config.Timeout)
  87. containerList, err := dc.client.ContainerList(listCtx, options)
  88. defer cancel()
  89. if err != nil {
  90. return err
  91. }
  92. wg := sync.WaitGroup{}
  93. for _, c := range containerList {
  94. wg.Add(1)
  95. go func(container dtypes.Container) {
  96. if !dc.shouldBeExcluded(container.Image) {
  97. dc.InspectAndPersistContainer(ctx, container.ID)
  98. } else {
  99. dc.logger.Debug(
  100. "Not monitoring container per ExcludedImages",
  101. zap.String("image", container.Image),
  102. zap.String("id", container.ID),
  103. )
  104. }
  105. wg.Done()
  106. }(c)
  107. }
  108. wg.Wait()
  109. return nil
  110. }
  111. // FetchContainerStatsAsJSON will query the desired container stats
  112. // and return them as StatsJSON
  113. func (dc *Client) FetchContainerStatsAsJSON(
  114. ctx context.Context,
  115. container Container,
  116. ) (*dtypes.StatsJSON, error) {
  117. containerStats, err := dc.FetchContainerStats(ctx, container)
  118. if err != nil {
  119. return nil, err
  120. }
  121. statsJSON, err := dc.toStatsJSON(containerStats, &container)
  122. if err != nil {
  123. return nil, err
  124. }
  125. return statsJSON, nil
  126. }
  127. // FetchContainerStats will query the desired container stats
  128. // and return them as ContainerStats
  129. func (dc *Client) FetchContainerStats(
  130. ctx context.Context,
  131. container Container,
  132. ) (dtypes.ContainerStats, error) {
  133. dc.logger.Debug("Fetching container stats.", zap.String("id", container.ID))
  134. statsCtx, cancel := context.WithTimeout(ctx, dc.config.Timeout)
  135. containerStats, err := dc.client.ContainerStats(statsCtx, container.ID, false)
  136. defer cancel()
  137. if err != nil {
  138. if docker.IsErrNotFound(err) {
  139. dc.logger.Debug(
  140. "Daemon reported container doesn't exist. Will no longer monitor.",
  141. zap.String("id", container.ID),
  142. )
  143. dc.RemoveContainer(container.ID)
  144. } else {
  145. dc.logger.Warn(
  146. "Could not fetch docker containerStats for container",
  147. zap.String("id", container.ID),
  148. zap.Error(err),
  149. )
  150. }
  151. }
  152. return containerStats, err
  153. }
  154. func (dc *Client) toStatsJSON(
  155. containerStats dtypes.ContainerStats,
  156. container *Container,
  157. ) (*dtypes.StatsJSON, error) {
  158. var statsJSON dtypes.StatsJSON
  159. err := json.NewDecoder(containerStats.Body).Decode(&statsJSON)
  160. containerStats.Body.Close()
  161. if err != nil {
  162. // EOF means there aren't any containerStats, perhaps because the container has been removed.
  163. if errors.Is(err, io.EOF) {
  164. // It isn't indicative of actual error.
  165. return nil, err
  166. }
  167. dc.logger.Error(
  168. "Could not parse docker containerStats for container id",
  169. zap.String("id", container.ID),
  170. zap.Error(err),
  171. )
  172. return nil, err
  173. }
  174. return &statsJSON, nil
  175. }
  176. // Events exposes the underlying Docker clients Events channel.
  177. // Caller should close the events channel by canceling the context.
  178. // If an error occurs, processing stops and caller must reinvoke this method.
  179. func (dc *Client) Events(ctx context.Context, options dtypes.EventsOptions) (<-chan devents.Message, <-chan error) {
  180. return dc.client.Events(ctx, options)
  181. }
  182. func (dc *Client) ContainerEventLoop(ctx context.Context) {
  183. filters := dfilters.NewArgs([]dfilters.KeyValuePair{
  184. {Key: "type", Value: "container"},
  185. {Key: "event", Value: "destroy"},
  186. {Key: "event", Value: "die"},
  187. {Key: "event", Value: "pause"},
  188. {Key: "event", Value: "rename"},
  189. {Key: "event", Value: "stop"},
  190. {Key: "event", Value: "start"},
  191. {Key: "event", Value: "unpause"},
  192. {Key: "event", Value: "update"},
  193. }...)
  194. lastTime := time.Now()
  195. EVENT_LOOP:
  196. for {
  197. options := dtypes.EventsOptions{
  198. Filters: filters,
  199. Since: lastTime.Format(time.RFC3339Nano),
  200. }
  201. eventCh, errCh := dc.Events(ctx, options)
  202. for {
  203. select {
  204. case <-ctx.Done():
  205. return
  206. case event := <-eventCh:
  207. switch event.Action {
  208. case "destroy":
  209. dc.logger.Debug("Docker container was destroyed:", zap.String("id", event.ID))
  210. dc.RemoveContainer(event.ID)
  211. default:
  212. dc.logger.Debug(
  213. "Docker container update:",
  214. zap.String("id", event.ID),
  215. zap.String("action", event.Action),
  216. )
  217. dc.InspectAndPersistContainer(ctx, event.ID)
  218. }
  219. if event.TimeNano > lastTime.UnixNano() {
  220. lastTime = time.Unix(0, event.TimeNano)
  221. }
  222. case err := <-errCh:
  223. // We are only interested when the context hasn't been canceled since requests made
  224. // with a closed context are guaranteed to fail.
  225. if ctx.Err() == nil {
  226. dc.logger.Error("Error watching docker container events", zap.Error(err))
  227. // Either decoding or connection error has occurred, so we should resume the event loop after
  228. // waiting a moment. In cases of extended daemon unavailability this will retry until
  229. // collector teardown or background context is closed.
  230. select {
  231. case <-time.After(3 * time.Second):
  232. continue EVENT_LOOP
  233. case <-ctx.Done():
  234. return
  235. }
  236. }
  237. }
  238. }
  239. }
  240. }
  241. // InspectAndPersistContainer queries inspect api and returns *ContainerJSON and true when container should be queried for stats,
  242. // nil and false otherwise. Persists the container in the cache if container is
  243. // running and not excluded.
  244. func (dc *Client) InspectAndPersistContainer(ctx context.Context, cid string) (*dtypes.ContainerJSON, bool) {
  245. if container, ok := dc.inspectedContainerIsOfInterest(ctx, cid); ok {
  246. dc.persistContainer(container)
  247. return container, ok
  248. }
  249. return nil, false
  250. }
  251. // Queries inspect api and returns *ContainerJSON and true when container should be queried for stats,
  252. // nil and false otherwise.
  253. func (dc *Client) inspectedContainerIsOfInterest(ctx context.Context, cid string) (*dtypes.ContainerJSON, bool) {
  254. inspectCtx, cancel := context.WithTimeout(ctx, dc.config.Timeout)
  255. container, err := dc.client.ContainerInspect(inspectCtx, cid)
  256. defer cancel()
  257. if err != nil {
  258. dc.logger.Error(
  259. "Could not inspect updated container",
  260. zap.String("id", cid),
  261. zap.Error(err),
  262. )
  263. } else if !dc.shouldBeExcluded(container.Config.Image) {
  264. return &container, true
  265. }
  266. return nil, false
  267. }
  268. func (dc *Client) persistContainer(containerJSON *dtypes.ContainerJSON) {
  269. if containerJSON == nil {
  270. return
  271. }
  272. cid := containerJSON.ID
  273. if !containerJSON.State.Running || containerJSON.State.Paused {
  274. dc.logger.Debug("Docker container not running. Will not persist.", zap.String("id", cid))
  275. dc.RemoveContainer(cid)
  276. return
  277. }
  278. dc.logger.Debug("Monitoring Docker container", zap.String("id", cid))
  279. dc.containersLock.Lock()
  280. defer dc.containersLock.Unlock()
  281. dc.containers[cid] = Container{
  282. ContainerJSON: containerJSON,
  283. EnvMap: ContainerEnvToMap(containerJSON.Config.Env),
  284. }
  285. }
  286. func (dc *Client) RemoveContainer(cid string) {
  287. dc.containersLock.Lock()
  288. defer dc.containersLock.Unlock()
  289. delete(dc.containers, cid)
  290. dc.logger.Debug("Removed container from stores.", zap.String("id", cid))
  291. }
  292. func (dc *Client) shouldBeExcluded(image string) bool {
  293. return dc.excludedImageMatcher != nil && dc.excludedImageMatcher.matches(image)
  294. }
  295. func ContainerEnvToMap(env []string) map[string]string {
  296. out := make(map[string]string, len(env))
  297. for _, v := range env {
  298. parts := strings.Split(v, "=")
  299. if len(parts) < 2 || strings.TrimSpace(parts[1]) == "" {
  300. continue
  301. }
  302. out[parts[0]] = parts[1]
  303. }
  304. return out
  305. }