// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package awscloudwatchreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscloudwatchreceiver" import ( "context" "fmt" "path/filepath" "testing" "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/service/cloudwatchlogs" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumertest" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest" ) func TestStart(t *testing.T) { cfg := createDefaultConfig().(*Config) cfg.Region = "us-west-1" cfg.Logs.Groups.AutodiscoverConfig = nil sink := &consumertest.LogsSink{} logsRcvr := newLogsReceiver(cfg, zap.NewNop(), sink) err := logsRcvr.Start(context.Background(), componenttest.NewNopHost()) require.NoError(t, err) err = logsRcvr.Shutdown(context.Background()) require.NoError(t, err) } func TestPrefixedConfig(t *testing.T) { cfg := createDefaultConfig().(*Config) cfg.Region = "us-west-1" cfg.Logs.PollInterval = 1 * time.Second cfg.Logs.Groups = GroupConfig{ NamedConfigs: map[string]StreamConfig{ testLogGroupName: { Names: []*string{&testLogStreamName}, }, }, } sink := &consumertest.LogsSink{} alertRcvr := newLogsReceiver(cfg, zap.NewNop(), sink) alertRcvr.client = defaultMockClient() err := alertRcvr.Start(context.Background(), componenttest.NewNopHost()) require.NoError(t, err) require.Eventually(t, func() bool { return sink.LogRecordCount() > 0 }, 2*time.Second, 10*time.Millisecond) err = alertRcvr.Shutdown(context.Background()) require.NoError(t, err) logs := sink.AllLogs()[0] expected, err := golden.ReadLogs(filepath.Join("testdata", "processed", "prefixed.yaml")) require.NoError(t, err) require.NoError(t, plogtest.CompareLogs(expected, logs, plogtest.IgnoreObservedTimestamp())) } func TestPrefixedNamedStreamsConfig(t *testing.T) { cfg := createDefaultConfig().(*Config) cfg.Region = "us-west-1" cfg.Logs.PollInterval = 1 * time.Second cfg.Logs.Groups = GroupConfig{ NamedConfigs: map[string]StreamConfig{ testLogGroupName: { Prefixes: []*string{&testLogStreamPrefix}, }, }, } sink := &consumertest.LogsSink{} alertRcvr := newLogsReceiver(cfg, zap.NewNop(), sink) alertRcvr.client = defaultMockClient() err := alertRcvr.Start(context.Background(), componenttest.NewNopHost()) require.NoError(t, err) require.Eventually(t, func() bool { return sink.LogRecordCount() > 0 }, 2*time.Second, 10*time.Millisecond) groupRequests := alertRcvr.groupRequests require.Len(t, groupRequests, 1) require.Equal(t, groupRequests[0].groupName(), "test-log-group-name") err = alertRcvr.Shutdown(context.Background()) require.NoError(t, err) logs := sink.AllLogs()[0] expected, err := golden.ReadLogs(filepath.Join("testdata", "processed", "prefixed.yaml")) require.NoError(t, err) require.NoError(t, plogtest.CompareLogs(expected, logs, plogtest.IgnoreObservedTimestamp())) } func TestDiscovery(t *testing.T) { cfg := createDefaultConfig().(*Config) cfg.Region = "us-west-1" cfg.Logs.PollInterval = 1 * time.Second cfg.Logs.Groups = GroupConfig{ AutodiscoverConfig: &AutodiscoverConfig{ Limit: 1, Streams: StreamConfig{ Prefixes: []*string{&testLogStreamPrefix}, Names: []*string{&testLogStreamMessage}, }, }, } sink := &consumertest.LogsSink{} logsRcvr := newLogsReceiver(cfg, zap.NewNop(), sink) logsRcvr.client = defaultMockClient() require.NoError(t, logsRcvr.Start(context.Background(), componenttest.NewNopHost())) require.Eventually(t, func() bool { return sink.LogRecordCount() > 0 }, 2*time.Second, 10*time.Millisecond) require.Equal(t, len(logsRcvr.groupRequests), 2) require.NoError(t, logsRcvr.Shutdown(context.Background())) } // Test to ensure that mid collection while streaming results we will // return early if Shutdown is called func TestShutdownWhileCollecting(t *testing.T) { cfg := createDefaultConfig().(*Config) cfg.Region = "us-west-1" cfg.Logs.PollInterval = 1 * time.Second cfg.Logs.Groups = GroupConfig{ NamedConfigs: map[string]StreamConfig{ testLogGroupName: { Names: []*string{&testLogStreamName}, }, }, } sink := &consumertest.LogsSink{} alertRcvr := newLogsReceiver(cfg, zap.NewNop(), sink) doneChan := make(chan time.Time, 1) mc := &mockClient{} mc.On("FilterLogEventsWithContext", mock.Anything, mock.Anything, mock.Anything).Return(&cloudwatchlogs.FilterLogEventsOutput{ Events: []*cloudwatchlogs.FilteredLogEvent{}, NextToken: aws.String("next"), }, nil). WaitUntil(doneChan) alertRcvr.client = mc err := alertRcvr.Start(context.Background(), componenttest.NewNopHost()) require.NoError(t, err) require.Never(t, func() bool { return sink.LogRecordCount() > 0 }, 3*time.Second, 10*time.Millisecond) close(doneChan) require.NoError(t, alertRcvr.Shutdown(context.Background())) } func TestAutodiscoverLimit(t *testing.T) { mc := &mockClient{} logGroups := []*cloudwatchlogs.LogGroup{} for i := 0; i <= 100; i++ { logGroups = append(logGroups, &cloudwatchlogs.LogGroup{ LogGroupName: aws.String(fmt.Sprintf("test log group: %d", i)), }) } token := "token" mc.On("DescribeLogGroupsWithContext", mock.Anything, mock.Anything, mock.Anything).Return( &cloudwatchlogs.DescribeLogGroupsOutput{ LogGroups: logGroups[:50], NextToken: &token, }, nil).Once() mc.On("DescribeLogGroupsWithContext", mock.Anything, mock.Anything, mock.Anything).Return( &cloudwatchlogs.DescribeLogGroupsOutput{ LogGroups: logGroups[50:], NextToken: nil, }, nil) numGroups := 100 cfg := createDefaultConfig().(*Config) cfg.Region = "us-west-1" cfg.Logs.Groups = GroupConfig{ AutodiscoverConfig: &AutodiscoverConfig{ Prefix: "/aws/", Limit: numGroups, }, } sink := &consumertest.LogsSink{} alertRcvr := newLogsReceiver(cfg, zap.NewNop(), sink) alertRcvr.client = mc grs, err := alertRcvr.discoverGroups(context.Background(), cfg.Logs.Groups.AutodiscoverConfig) require.NoError(t, err) require.Len(t, grs, cfg.Logs.Groups.AutodiscoverConfig.Limit) } func defaultMockClient() client { mc := &mockClient{} mc.On("DescribeLogGroupsWithContext", mock.Anything, mock.Anything, mock.Anything).Return( &cloudwatchlogs.DescribeLogGroupsOutput{ LogGroups: []*cloudwatchlogs.LogGroup{ { LogGroupName: &testLogGroupName, }, }, NextToken: nil, }, nil) mc.On("FilterLogEventsWithContext", mock.Anything, mock.Anything, mock.Anything).Return( &cloudwatchlogs.FilterLogEventsOutput{ Events: []*cloudwatchlogs.FilteredLogEvent{ { EventId: &testEventIDs[0], IngestionTime: aws.Int64(testIngestionTime), LogStreamName: aws.String(testLogStreamName), Message: aws.String(testLogStreamMessage), Timestamp: aws.Int64(testTimeStamp), }, { EventId: &testEventIDs[1], IngestionTime: aws.Int64(testIngestionTime), LogStreamName: aws.String(testLogStreamName), Message: aws.String(testLogStreamMessage), Timestamp: aws.Int64(testTimeStamp), }, { EventId: &testEventIDs[2], IngestionTime: aws.Int64(testIngestionTime), LogStreamName: aws.String(testLogStreamName2), Message: aws.String(testLogStreamMessage), Timestamp: aws.Int64(testTimeStamp), }, { EventId: &testEventIDs[3], IngestionTime: aws.Int64(testIngestionTime), LogStreamName: aws.String(testLogStreamName2), Message: aws.String(testLogStreamMessage), Timestamp: aws.Int64(testTimeStamp), }, }, NextToken: nil, }, nil) return mc } var ( testLogGroupName = "test-log-group-name" testLogStreamName = "test-log-stream-name" testLogStreamName2 = "test-log-stream-name-2" testLogStreamPrefix = "test-log-stream" testEventIDs = []string{ "37134448277055698880077365577645869800162629528367333379", "37134448277055698880077365577645869800162629528367333380", "37134448277055698880077365577645869800162629528367333381", "37134448277055698880077365577645869800162629528367333382", } testIngestionTime = int64(1665166252124) testTimeStamp = int64(1665166251014) testLogStreamMessage = `"time=\"2022-10-07T18:10:46Z\" level=info msg=\"access granted\" arn=\"arn:aws:iam::892146088969:role/AWSWesleyClusterManagerLambda-NodeManagerRole-16UPVDKA1KBGI\" client=\"127.0.0.1:50252\" groups=\"[]\" method=POST path=/authenticate uid=\"aws-iam-authenticator:892146088969:AROA47OAM7QE2NWPDFDCW\" username=\"eks:node-manager\""` ) type mockClient struct { mock.Mock } func (mc *mockClient) DescribeLogGroupsWithContext(ctx context.Context, input *cloudwatchlogs.DescribeLogGroupsInput, opts ...request.Option) (*cloudwatchlogs.DescribeLogGroupsOutput, error) { args := mc.Called(ctx, input, opts) return args.Get(0).(*cloudwatchlogs.DescribeLogGroupsOutput), args.Error(1) } func (mc *mockClient) FilterLogEventsWithContext(ctx context.Context, input *cloudwatchlogs.FilterLogEventsInput, opts ...request.Option) (*cloudwatchlogs.FilterLogEventsOutput, error) { args := mc.Called(ctx, input, opts) return args.Get(0).(*cloudwatchlogs.FilterLogEventsOutput), args.Error(1) }