123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- //go:build !windows
- // +build !windows
- package podmanreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/podmanreceiver"
- import (
- "context"
- "encoding/json"
- "net/url"
- "sync"
- "time"
- "go.uber.org/zap"
- )
- type clientFactory func(logger *zap.Logger, cfg *Config) (PodmanClient, error)
- type PodmanClient interface {
- ping(context.Context) error
- stats(context.Context, url.Values) ([]containerStats, error)
- list(context.Context, url.Values) ([]container, error)
- events(context.Context, url.Values) (<-chan event, <-chan error)
- }
- type ContainerScraper struct {
- client PodmanClient
- containers map[string]container
- containersLock sync.Mutex
- logger *zap.Logger
- config *Config
- }
- func newContainerScraper(engineClient PodmanClient, logger *zap.Logger, config *Config) *ContainerScraper {
- return &ContainerScraper{
- client: engineClient,
- containers: make(map[string]container),
- logger: logger,
- config: config,
- }
- }
- // containers provides a slice of container to use for individual fetchContainerStats calls.
- func (pc *ContainerScraper) getContainers() []container {
- pc.containersLock.Lock()
- defer pc.containersLock.Unlock()
- containers := make([]container, 0, len(pc.containers))
- for _, container := range pc.containers {
- containers = append(containers, container)
- }
- return containers
- }
- // loadContainerList will load the initial running container maps for
- // inspection and establishing which containers warrant stat gathering calls
- // by the receiver.
- func (pc *ContainerScraper) loadContainerList(ctx context.Context) error {
- params := url.Values{}
- runningFilter := map[string][]string{
- "status": {"running"},
- }
- jsonFilter, err := json.Marshal(runningFilter)
- if err != nil {
- return nil
- }
- params.Add("filters", string(jsonFilter))
- listCtx, cancel := context.WithTimeout(ctx, pc.config.Timeout)
- defer cancel()
- containerList, err := pc.client.list(listCtx, params)
- if err != nil {
- return err
- }
- for _, c := range containerList {
- pc.persistContainer(c)
- }
- return nil
- }
- func (pc *ContainerScraper) events(ctx context.Context, options url.Values) (<-chan event, <-chan error) {
- return pc.client.events(ctx, options)
- }
- func (pc *ContainerScraper) containerEventLoop(ctx context.Context) {
- filters := url.Values{}
- cidFilter := map[string][]string{
- "status": {"died", "start"},
- "type": {"container"},
- }
- jsonFilter, err := json.Marshal(cidFilter)
- if err != nil {
- return
- }
- filters.Add("filters", string(jsonFilter))
- EVENT_LOOP:
- for {
- eventCh, errCh := pc.events(ctx, filters)
- for {
- select {
- case <-ctx.Done():
- return
- case podmanEvent := <-eventCh:
- pc.logger.Info("Event received", zap.String("status", podmanEvent.Status))
- switch podmanEvent.Status {
- case "died":
- pc.logger.Debug("Podman container died:", zap.String("id", podmanEvent.ID))
- pc.removeContainer(podmanEvent.ID)
- case "start":
- pc.logger.Debug(
- "Podman container started:",
- zap.String("id", podmanEvent.ID),
- zap.String("status", podmanEvent.Status),
- )
- pc.inspectAndPersistContainer(ctx, podmanEvent.ID)
- }
- case err := <-errCh:
- // We are only interested when the context hasn't been canceled since requests made
- // with a closed context are guaranteed to fail.
- if ctx.Err() == nil {
- pc.logger.Error("Error watching podman container events", zap.Error(err))
- // Either decoding or connection error has occurred, so we should resume the event loop after
- // waiting a moment. In cases of extended daemon unavailability this will retry until
- // collector teardown or background context is closed.
- select {
- case <-time.After(3 * time.Second):
- continue EVENT_LOOP
- case <-ctx.Done():
- return
- }
- }
- }
- }
- }
- }
- // inspectAndPersistContainer queries inspect api and returns *container and true when container should be queried for stats,
- // nil and false otherwise. Persists the container in the cache if container is
- // running and not excluded.
- func (pc *ContainerScraper) inspectAndPersistContainer(ctx context.Context, cid string) (*container, bool) {
- params := url.Values{}
- cidFilter := map[string][]string{
- "id": {cid},
- }
- jsonFilter, err := json.Marshal(cidFilter)
- if err != nil {
- return nil, false
- }
- params.Add("filters", string(jsonFilter))
- inspectCtx, cancel := context.WithTimeout(ctx, pc.config.Timeout)
- defer cancel()
- container, err := pc.client.list(inspectCtx, params)
- if len(container) == 1 && err == nil {
- pc.persistContainer(container[0])
- return &container[0], true
- }
- pc.logger.Error(
- "Could not inspect updated container",
- zap.String("id", cid),
- zap.Error(err),
- )
- return nil, false
- }
- // fetchContainerStats will query the desired container stats
- func (pc *ContainerScraper) fetchContainerStats(ctx context.Context, c container) (containerStats, error) {
- params := url.Values{}
- params.Add("stream", "false")
- params.Add("containers", c.ID)
- stats, err := pc.client.stats(ctx, params)
- if err != nil || len(stats) < 1 {
- return containerStats{}, err
- }
- return stats[0], nil
- }
- func (pc *ContainerScraper) persistContainer(c container) {
- pc.logger.Debug("Monitoring Podman container", zap.String("id", c.ID))
- pc.containersLock.Lock()
- defer pc.containersLock.Unlock()
- pc.containers[c.ID] = c
- }
- func (pc *ContainerScraper) removeContainer(cid string) {
- pc.containersLock.Lock()
- defer pc.containersLock.Unlock()
- delete(pc.containers, cid)
- pc.logger.Debug("Removed container from stores.", zap.String("id", cid))
- }
|