123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- //go:build !windows
- // +build !windows
- package podmanreceiver
- import (
- "context"
- "fmt"
- "net/http"
- "net/http/httptest"
- "net/url"
- "os"
- "strings"
- "sync"
- "testing"
- "time"
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/require"
- "go.opentelemetry.io/collector/receiver/scraperhelper"
- "go.uber.org/zap"
- "go.uber.org/zap/zapcore"
- "go.uber.org/zap/zaptest/observer"
- )
- type MockClient struct {
- PingF func(context.Context) error
- StatsF func(context.Context, url.Values) ([]containerStats, error)
- ListF func(context.Context, url.Values) ([]container, error)
- EventsF func(context.Context, url.Values) (<-chan event, <-chan error)
- }
- func (c *MockClient) ping(ctx context.Context) error {
- return c.PingF(ctx)
- }
- func (c *MockClient) stats(ctx context.Context, options url.Values) ([]containerStats, error) {
- return c.StatsF(ctx, options)
- }
- func (c *MockClient) list(ctx context.Context, options url.Values) ([]container, error) {
- return c.ListF(ctx, options)
- }
- func (c *MockClient) events(ctx context.Context, options url.Values) (<-chan event, <-chan error) {
- return c.EventsF(ctx, options)
- }
- var baseClient = MockClient{
- PingF: func(context.Context) error {
- return nil
- },
- StatsF: func(context.Context, url.Values) ([]containerStats, error) {
- return nil, nil
- },
- ListF: func(context.Context, url.Values) ([]container, error) {
- return nil, nil
- },
- EventsF: func(context.Context, url.Values) (<-chan event, <-chan error) {
- return nil, nil
- },
- }
- func TestWatchingTimeouts(t *testing.T) {
- listener, addr := tmpSock(t)
- defer listener.Close()
- defer os.Remove(addr)
- config := &Config{
- Endpoint: fmt.Sprintf("unix://%s", addr),
- ScraperControllerSettings: scraperhelper.ScraperControllerSettings{
- Timeout: 50 * time.Millisecond,
- },
- }
- client, err := newLibpodClient(zap.NewNop(), config)
- assert.Nil(t, err)
- cli := newContainerScraper(client, zap.NewNop(), config)
- assert.NotNil(t, cli)
- expectedError := "context deadline exceeded"
- shouldHaveTaken := time.Now().Add(100 * time.Millisecond).UnixNano()
- err = cli.loadContainerList(context.Background())
- require.Error(t, err)
- ctx, fetchCancel := context.WithTimeout(context.Background(), config.Timeout)
- defer fetchCancel()
- container, err := cli.fetchContainerStats(ctx, container{})
- require.Error(t, err)
- assert.Contains(t, err.Error(), expectedError)
- assert.Empty(t, container)
- assert.GreaterOrEqual(
- t, time.Now().UnixNano(), shouldHaveTaken,
- "Client timeouts don't appear to have been exercised.",
- )
- }
- func TestEventLoopHandlesError(t *testing.T) {
- wg := sync.WaitGroup{}
- wg.Add(2) // confirm retry occurs
- listener, addr := tmpSock(t)
- defer listener.Close()
- defer os.Remove(addr)
- srv := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- if strings.Contains(r.URL.Path, "/events") {
- wg.Done()
- }
- _, err := w.Write([]byte{})
- assert.NoError(t, err)
- }))
- srv.Listener = listener
- srv.Start()
- defer srv.Close()
- observed, logs := observer.New(zapcore.WarnLevel)
- config := &Config{
- Endpoint: fmt.Sprintf("unix://%s", addr),
- ScraperControllerSettings: scraperhelper.ScraperControllerSettings{
- Timeout: 50 * time.Millisecond,
- },
- }
- client, err := newLibpodClient(zap.NewNop(), config)
- assert.Nil(t, err)
- cli := newContainerScraper(client, zap.New(observed), config)
- assert.NotNil(t, cli)
- go cli.containerEventLoop(context.Background())
- assert.Eventually(t, func() bool {
- for _, l := range logs.All() {
- assert.Contains(t, l.Message, "Error watching podman container events")
- assert.Contains(t, l.ContextMap()["error"], "EOF")
- }
- return len(logs.All()) > 0
- }, 1*time.Second, 1*time.Millisecond, "failed to find desired error logs.")
- finished := make(chan struct{})
- go func() {
- defer close(finished)
- wg.Wait()
- }()
- select {
- case <-time.After(5 * time.Second):
- t.Fatal("failed to retry events endpoint after error")
- case <-finished:
- return
- }
- }
- func TestEventLoopHandles(t *testing.T) {
- eventChan := make(chan event)
- errChan := make(chan error)
- eventClient := baseClient
- eventClient.EventsF = func(context.Context, url.Values) (<-chan event, <-chan error) {
- return eventChan, errChan
- }
- eventClient.ListF = func(context.Context, url.Values) ([]container, error) {
- return []container{{
- ID: "c1",
- }}, nil
- }
- cli := newContainerScraper(&eventClient, zap.NewNop(), &Config{})
- assert.NotNil(t, cli)
- assert.Equal(t, 0, len(cli.containers))
- go cli.containerEventLoop(context.Background())
- eventChan <- event{ID: "c1", Status: "start"}
- assert.Eventually(t, func() bool {
- cli.containersLock.Lock()
- defer cli.containersLock.Unlock()
- return assert.Equal(t, 1, len(cli.containers))
- }, 1*time.Second, 1*time.Millisecond, "failed to update containers list.")
- eventChan <- event{ID: "c1", Status: "died"}
- assert.Eventually(t, func() bool {
- cli.containersLock.Lock()
- defer cli.containersLock.Unlock()
- return assert.Equal(t, 0, len(cli.containers))
- }, 1*time.Second, 1*time.Millisecond, "failed to update containers list.")
- }
- func TestInspectAndPersistContainer(t *testing.T) {
- inspectClient := baseClient
- inspectClient.ListF = func(context.Context, url.Values) ([]container, error) {
- return []container{{
- ID: "c1",
- }}, nil
- }
- cli := newContainerScraper(&inspectClient, zap.NewNop(), &Config{})
- assert.NotNil(t, cli)
- assert.Equal(t, 0, len(cli.containers))
- stats, ok := cli.inspectAndPersistContainer(context.Background(), "c1")
- assert.True(t, ok)
- assert.NotNil(t, stats)
- assert.Equal(t, 1, len(cli.containers))
- }
|