123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package awscloudwatchreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscloudwatchreceiver"
- import (
- "context"
- "fmt"
- "path/filepath"
- "testing"
- "time"
- "github.com/aws/aws-sdk-go/aws"
- "github.com/aws/aws-sdk-go/aws/request"
- "github.com/aws/aws-sdk-go/service/cloudwatchlogs"
- "github.com/stretchr/testify/mock"
- "github.com/stretchr/testify/require"
- "go.opentelemetry.io/collector/component/componenttest"
- "go.opentelemetry.io/collector/consumer/consumertest"
- "go.uber.org/zap"
- "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden"
- "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest"
- )
- func TestStart(t *testing.T) {
- cfg := createDefaultConfig().(*Config)
- cfg.Region = "us-west-1"
- cfg.Logs.Groups.AutodiscoverConfig = nil
- sink := &consumertest.LogsSink{}
- logsRcvr := newLogsReceiver(cfg, zap.NewNop(), sink)
- err := logsRcvr.Start(context.Background(), componenttest.NewNopHost())
- require.NoError(t, err)
- err = logsRcvr.Shutdown(context.Background())
- require.NoError(t, err)
- }
- func TestPrefixedConfig(t *testing.T) {
- cfg := createDefaultConfig().(*Config)
- cfg.Region = "us-west-1"
- cfg.Logs.PollInterval = 1 * time.Second
- cfg.Logs.Groups = GroupConfig{
- NamedConfigs: map[string]StreamConfig{
- testLogGroupName: {
- Names: []*string{&testLogStreamName},
- },
- },
- }
- sink := &consumertest.LogsSink{}
- alertRcvr := newLogsReceiver(cfg, zap.NewNop(), sink)
- alertRcvr.client = defaultMockClient()
- err := alertRcvr.Start(context.Background(), componenttest.NewNopHost())
- require.NoError(t, err)
- require.Eventually(t, func() bool {
- return sink.LogRecordCount() > 0
- }, 2*time.Second, 10*time.Millisecond)
- err = alertRcvr.Shutdown(context.Background())
- require.NoError(t, err)
- logs := sink.AllLogs()[0]
- expected, err := golden.ReadLogs(filepath.Join("testdata", "processed", "prefixed.yaml"))
- require.NoError(t, err)
- require.NoError(t, plogtest.CompareLogs(expected, logs, plogtest.IgnoreObservedTimestamp()))
- }
- func TestPrefixedNamedStreamsConfig(t *testing.T) {
- cfg := createDefaultConfig().(*Config)
- cfg.Region = "us-west-1"
- cfg.Logs.PollInterval = 1 * time.Second
- cfg.Logs.Groups = GroupConfig{
- NamedConfigs: map[string]StreamConfig{
- testLogGroupName: {
- Prefixes: []*string{&testLogStreamPrefix},
- },
- },
- }
- sink := &consumertest.LogsSink{}
- alertRcvr := newLogsReceiver(cfg, zap.NewNop(), sink)
- alertRcvr.client = defaultMockClient()
- err := alertRcvr.Start(context.Background(), componenttest.NewNopHost())
- require.NoError(t, err)
- require.Eventually(t, func() bool {
- return sink.LogRecordCount() > 0
- }, 2*time.Second, 10*time.Millisecond)
- groupRequests := alertRcvr.groupRequests
- require.Len(t, groupRequests, 1)
- require.Equal(t, groupRequests[0].groupName(), "test-log-group-name")
- err = alertRcvr.Shutdown(context.Background())
- require.NoError(t, err)
- logs := sink.AllLogs()[0]
- expected, err := golden.ReadLogs(filepath.Join("testdata", "processed", "prefixed.yaml"))
- require.NoError(t, err)
- require.NoError(t, plogtest.CompareLogs(expected, logs, plogtest.IgnoreObservedTimestamp()))
- }
- func TestDiscovery(t *testing.T) {
- cfg := createDefaultConfig().(*Config)
- cfg.Region = "us-west-1"
- cfg.Logs.PollInterval = 1 * time.Second
- cfg.Logs.Groups = GroupConfig{
- AutodiscoverConfig: &AutodiscoverConfig{
- Limit: 1,
- Streams: StreamConfig{
- Prefixes: []*string{&testLogStreamPrefix},
- Names: []*string{&testLogStreamMessage},
- },
- },
- }
- sink := &consumertest.LogsSink{}
- logsRcvr := newLogsReceiver(cfg, zap.NewNop(), sink)
- logsRcvr.client = defaultMockClient()
- require.NoError(t, logsRcvr.Start(context.Background(), componenttest.NewNopHost()))
- require.Eventually(t, func() bool {
- return sink.LogRecordCount() > 0
- }, 2*time.Second, 10*time.Millisecond)
- require.Equal(t, len(logsRcvr.groupRequests), 2)
- require.NoError(t, logsRcvr.Shutdown(context.Background()))
- }
- // Test to ensure that mid collection while streaming results we will
- // return early if Shutdown is called
- func TestShutdownWhileCollecting(t *testing.T) {
- cfg := createDefaultConfig().(*Config)
- cfg.Region = "us-west-1"
- cfg.Logs.PollInterval = 1 * time.Second
- cfg.Logs.Groups = GroupConfig{
- NamedConfigs: map[string]StreamConfig{
- testLogGroupName: {
- Names: []*string{&testLogStreamName},
- },
- },
- }
- sink := &consumertest.LogsSink{}
- alertRcvr := newLogsReceiver(cfg, zap.NewNop(), sink)
- doneChan := make(chan time.Time, 1)
- mc := &mockClient{}
- mc.On("FilterLogEventsWithContext", mock.Anything, mock.Anything, mock.Anything).Return(&cloudwatchlogs.FilterLogEventsOutput{
- Events: []*cloudwatchlogs.FilteredLogEvent{},
- NextToken: aws.String("next"),
- }, nil).
- WaitUntil(doneChan)
- alertRcvr.client = mc
- err := alertRcvr.Start(context.Background(), componenttest.NewNopHost())
- require.NoError(t, err)
- require.Never(t, func() bool {
- return sink.LogRecordCount() > 0
- }, 3*time.Second, 10*time.Millisecond)
- close(doneChan)
- require.NoError(t, alertRcvr.Shutdown(context.Background()))
- }
- func TestAutodiscoverLimit(t *testing.T) {
- mc := &mockClient{}
- logGroups := []*cloudwatchlogs.LogGroup{}
- for i := 0; i <= 100; i++ {
- logGroups = append(logGroups, &cloudwatchlogs.LogGroup{
- LogGroupName: aws.String(fmt.Sprintf("test log group: %d", i)),
- })
- }
- token := "token"
- mc.On("DescribeLogGroupsWithContext", mock.Anything, mock.Anything, mock.Anything).Return(
- &cloudwatchlogs.DescribeLogGroupsOutput{
- LogGroups: logGroups[:50],
- NextToken: &token,
- }, nil).Once()
- mc.On("DescribeLogGroupsWithContext", mock.Anything, mock.Anything, mock.Anything).Return(
- &cloudwatchlogs.DescribeLogGroupsOutput{
- LogGroups: logGroups[50:],
- NextToken: nil,
- }, nil)
- numGroups := 100
- cfg := createDefaultConfig().(*Config)
- cfg.Region = "us-west-1"
- cfg.Logs.Groups = GroupConfig{
- AutodiscoverConfig: &AutodiscoverConfig{
- Prefix: "/aws/",
- Limit: numGroups,
- },
- }
- sink := &consumertest.LogsSink{}
- alertRcvr := newLogsReceiver(cfg, zap.NewNop(), sink)
- alertRcvr.client = mc
- grs, err := alertRcvr.discoverGroups(context.Background(), cfg.Logs.Groups.AutodiscoverConfig)
- require.NoError(t, err)
- require.Len(t, grs, cfg.Logs.Groups.AutodiscoverConfig.Limit)
- }
- func defaultMockClient() client {
- mc := &mockClient{}
- mc.On("DescribeLogGroupsWithContext", mock.Anything, mock.Anything, mock.Anything).Return(
- &cloudwatchlogs.DescribeLogGroupsOutput{
- LogGroups: []*cloudwatchlogs.LogGroup{
- {
- LogGroupName: &testLogGroupName,
- },
- },
- NextToken: nil,
- }, nil)
- mc.On("FilterLogEventsWithContext", mock.Anything, mock.Anything, mock.Anything).Return(
- &cloudwatchlogs.FilterLogEventsOutput{
- Events: []*cloudwatchlogs.FilteredLogEvent{
- {
- EventId: &testEventIDs[0],
- IngestionTime: aws.Int64(testIngestionTime),
- LogStreamName: aws.String(testLogStreamName),
- Message: aws.String(testLogStreamMessage),
- Timestamp: aws.Int64(testTimeStamp),
- },
- {
- EventId: &testEventIDs[1],
- IngestionTime: aws.Int64(testIngestionTime),
- LogStreamName: aws.String(testLogStreamName),
- Message: aws.String(testLogStreamMessage),
- Timestamp: aws.Int64(testTimeStamp),
- },
- {
- EventId: &testEventIDs[2],
- IngestionTime: aws.Int64(testIngestionTime),
- LogStreamName: aws.String(testLogStreamName2),
- Message: aws.String(testLogStreamMessage),
- Timestamp: aws.Int64(testTimeStamp),
- },
- {
- EventId: &testEventIDs[3],
- IngestionTime: aws.Int64(testIngestionTime),
- LogStreamName: aws.String(testLogStreamName2),
- Message: aws.String(testLogStreamMessage),
- Timestamp: aws.Int64(testTimeStamp),
- },
- },
- NextToken: nil,
- }, nil)
- return mc
- }
// Shared fixtures for the receiver tests. They are variables rather than
// constants because the tests take their addresses.
var (
	// Log group and stream identifiers.
	testLogGroupName    string = "test-log-group-name"
	testLogStreamPrefix string = "test-log-stream"
	testLogStreamName   string = "test-log-stream-name"
	testLogStreamName2  string = "test-log-stream-name-2"

	// Timestamps attached to every mocked event.
	testTimeStamp     int64 = 1665166251014
	testIngestionTime int64 = 1665166252124

	// Event IDs, one per mocked event.
	testEventIDs = []string{
		"37134448277055698880077365577645869800162629528367333379",
		"37134448277055698880077365577645869800162629528367333380",
		"37134448277055698880077365577645869800162629528367333381",
		"37134448277055698880077365577645869800162629528367333382",
	}

	// Message body shared by every mocked event.
	testLogStreamMessage string = `"time=\"2022-10-07T18:10:46Z\" level=info msg=\"access granted\" arn=\"arn:aws:iam::892146088969:role/AWSWesleyClusterManagerLambda-NodeManagerRole-16UPVDKA1KBGI\" client=\"127.0.0.1:50252\" groups=\"[]\" method=POST path=/authenticate uid=\"aws-iam-authenticator:892146088969:AROA47OAM7QE2NWPDFDCW\" username=\"eks:node-manager\""`
)
- type mockClient struct {
- mock.Mock
- }
- func (mc *mockClient) DescribeLogGroupsWithContext(ctx context.Context, input *cloudwatchlogs.DescribeLogGroupsInput, opts ...request.Option) (*cloudwatchlogs.DescribeLogGroupsOutput, error) {
- args := mc.Called(ctx, input, opts)
- return args.Get(0).(*cloudwatchlogs.DescribeLogGroupsOutput), args.Error(1)
- }
- func (mc *mockClient) FilterLogEventsWithContext(ctx context.Context, input *cloudwatchlogs.FilterLogEventsInput, opts ...request.Option) (*cloudwatchlogs.FilterLogEventsOutput, error) {
- args := mc.Called(ctx, input, opts)
- return args.Get(0).(*cloudwatchlogs.FilterLogEventsOutput), args.Error(1)
- }
|