logs_test.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package awscloudwatchreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscloudwatchreceiver"
  4. import (
  5. "context"
  6. "fmt"
  7. "path/filepath"
  8. "testing"
  9. "time"
  10. "github.com/aws/aws-sdk-go/aws"
  11. "github.com/aws/aws-sdk-go/aws/request"
  12. "github.com/aws/aws-sdk-go/service/cloudwatchlogs"
  13. "github.com/stretchr/testify/mock"
  14. "github.com/stretchr/testify/require"
  15. "go.opentelemetry.io/collector/component/componenttest"
  16. "go.opentelemetry.io/collector/consumer/consumertest"
  17. "go.uber.org/zap"
  18. "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden"
  19. "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest"
  20. )
  21. func TestStart(t *testing.T) {
  22. cfg := createDefaultConfig().(*Config)
  23. cfg.Region = "us-west-1"
  24. cfg.Logs.Groups.AutodiscoverConfig = nil
  25. sink := &consumertest.LogsSink{}
  26. logsRcvr := newLogsReceiver(cfg, zap.NewNop(), sink)
  27. err := logsRcvr.Start(context.Background(), componenttest.NewNopHost())
  28. require.NoError(t, err)
  29. err = logsRcvr.Shutdown(context.Background())
  30. require.NoError(t, err)
  31. }
  32. func TestPrefixedConfig(t *testing.T) {
  33. cfg := createDefaultConfig().(*Config)
  34. cfg.Region = "us-west-1"
  35. cfg.Logs.PollInterval = 1 * time.Second
  36. cfg.Logs.Groups = GroupConfig{
  37. NamedConfigs: map[string]StreamConfig{
  38. testLogGroupName: {
  39. Names: []*string{&testLogStreamName},
  40. },
  41. },
  42. }
  43. sink := &consumertest.LogsSink{}
  44. alertRcvr := newLogsReceiver(cfg, zap.NewNop(), sink)
  45. alertRcvr.client = defaultMockClient()
  46. err := alertRcvr.Start(context.Background(), componenttest.NewNopHost())
  47. require.NoError(t, err)
  48. require.Eventually(t, func() bool {
  49. return sink.LogRecordCount() > 0
  50. }, 2*time.Second, 10*time.Millisecond)
  51. err = alertRcvr.Shutdown(context.Background())
  52. require.NoError(t, err)
  53. logs := sink.AllLogs()[0]
  54. expected, err := golden.ReadLogs(filepath.Join("testdata", "processed", "prefixed.yaml"))
  55. require.NoError(t, err)
  56. require.NoError(t, plogtest.CompareLogs(expected, logs, plogtest.IgnoreObservedTimestamp()))
  57. }
  58. func TestPrefixedNamedStreamsConfig(t *testing.T) {
  59. cfg := createDefaultConfig().(*Config)
  60. cfg.Region = "us-west-1"
  61. cfg.Logs.PollInterval = 1 * time.Second
  62. cfg.Logs.Groups = GroupConfig{
  63. NamedConfigs: map[string]StreamConfig{
  64. testLogGroupName: {
  65. Prefixes: []*string{&testLogStreamPrefix},
  66. },
  67. },
  68. }
  69. sink := &consumertest.LogsSink{}
  70. alertRcvr := newLogsReceiver(cfg, zap.NewNop(), sink)
  71. alertRcvr.client = defaultMockClient()
  72. err := alertRcvr.Start(context.Background(), componenttest.NewNopHost())
  73. require.NoError(t, err)
  74. require.Eventually(t, func() bool {
  75. return sink.LogRecordCount() > 0
  76. }, 2*time.Second, 10*time.Millisecond)
  77. groupRequests := alertRcvr.groupRequests
  78. require.Len(t, groupRequests, 1)
  79. require.Equal(t, groupRequests[0].groupName(), "test-log-group-name")
  80. err = alertRcvr.Shutdown(context.Background())
  81. require.NoError(t, err)
  82. logs := sink.AllLogs()[0]
  83. expected, err := golden.ReadLogs(filepath.Join("testdata", "processed", "prefixed.yaml"))
  84. require.NoError(t, err)
  85. require.NoError(t, plogtest.CompareLogs(expected, logs, plogtest.IgnoreObservedTimestamp()))
  86. }
  87. func TestDiscovery(t *testing.T) {
  88. cfg := createDefaultConfig().(*Config)
  89. cfg.Region = "us-west-1"
  90. cfg.Logs.PollInterval = 1 * time.Second
  91. cfg.Logs.Groups = GroupConfig{
  92. AutodiscoverConfig: &AutodiscoverConfig{
  93. Limit: 1,
  94. Streams: StreamConfig{
  95. Prefixes: []*string{&testLogStreamPrefix},
  96. Names: []*string{&testLogStreamMessage},
  97. },
  98. },
  99. }
  100. sink := &consumertest.LogsSink{}
  101. logsRcvr := newLogsReceiver(cfg, zap.NewNop(), sink)
  102. logsRcvr.client = defaultMockClient()
  103. require.NoError(t, logsRcvr.Start(context.Background(), componenttest.NewNopHost()))
  104. require.Eventually(t, func() bool {
  105. return sink.LogRecordCount() > 0
  106. }, 2*time.Second, 10*time.Millisecond)
  107. require.Equal(t, len(logsRcvr.groupRequests), 2)
  108. require.NoError(t, logsRcvr.Shutdown(context.Background()))
  109. }
  110. // Test to ensure that mid collection while streaming results we will
  111. // return early if Shutdown is called
  112. func TestShutdownWhileCollecting(t *testing.T) {
  113. cfg := createDefaultConfig().(*Config)
  114. cfg.Region = "us-west-1"
  115. cfg.Logs.PollInterval = 1 * time.Second
  116. cfg.Logs.Groups = GroupConfig{
  117. NamedConfigs: map[string]StreamConfig{
  118. testLogGroupName: {
  119. Names: []*string{&testLogStreamName},
  120. },
  121. },
  122. }
  123. sink := &consumertest.LogsSink{}
  124. alertRcvr := newLogsReceiver(cfg, zap.NewNop(), sink)
  125. doneChan := make(chan time.Time, 1)
  126. mc := &mockClient{}
  127. mc.On("FilterLogEventsWithContext", mock.Anything, mock.Anything, mock.Anything).Return(&cloudwatchlogs.FilterLogEventsOutput{
  128. Events: []*cloudwatchlogs.FilteredLogEvent{},
  129. NextToken: aws.String("next"),
  130. }, nil).
  131. WaitUntil(doneChan)
  132. alertRcvr.client = mc
  133. err := alertRcvr.Start(context.Background(), componenttest.NewNopHost())
  134. require.NoError(t, err)
  135. require.Never(t, func() bool {
  136. return sink.LogRecordCount() > 0
  137. }, 3*time.Second, 10*time.Millisecond)
  138. close(doneChan)
  139. require.NoError(t, alertRcvr.Shutdown(context.Background()))
  140. }
  141. func TestAutodiscoverLimit(t *testing.T) {
  142. mc := &mockClient{}
  143. logGroups := []*cloudwatchlogs.LogGroup{}
  144. for i := 0; i <= 100; i++ {
  145. logGroups = append(logGroups, &cloudwatchlogs.LogGroup{
  146. LogGroupName: aws.String(fmt.Sprintf("test log group: %d", i)),
  147. })
  148. }
  149. token := "token"
  150. mc.On("DescribeLogGroupsWithContext", mock.Anything, mock.Anything, mock.Anything).Return(
  151. &cloudwatchlogs.DescribeLogGroupsOutput{
  152. LogGroups: logGroups[:50],
  153. NextToken: &token,
  154. }, nil).Once()
  155. mc.On("DescribeLogGroupsWithContext", mock.Anything, mock.Anything, mock.Anything).Return(
  156. &cloudwatchlogs.DescribeLogGroupsOutput{
  157. LogGroups: logGroups[50:],
  158. NextToken: nil,
  159. }, nil)
  160. numGroups := 100
  161. cfg := createDefaultConfig().(*Config)
  162. cfg.Region = "us-west-1"
  163. cfg.Logs.Groups = GroupConfig{
  164. AutodiscoverConfig: &AutodiscoverConfig{
  165. Prefix: "/aws/",
  166. Limit: numGroups,
  167. },
  168. }
  169. sink := &consumertest.LogsSink{}
  170. alertRcvr := newLogsReceiver(cfg, zap.NewNop(), sink)
  171. alertRcvr.client = mc
  172. grs, err := alertRcvr.discoverGroups(context.Background(), cfg.Logs.Groups.AutodiscoverConfig)
  173. require.NoError(t, err)
  174. require.Len(t, grs, cfg.Logs.Groups.AutodiscoverConfig.Limit)
  175. }
  176. func defaultMockClient() client {
  177. mc := &mockClient{}
  178. mc.On("DescribeLogGroupsWithContext", mock.Anything, mock.Anything, mock.Anything).Return(
  179. &cloudwatchlogs.DescribeLogGroupsOutput{
  180. LogGroups: []*cloudwatchlogs.LogGroup{
  181. {
  182. LogGroupName: &testLogGroupName,
  183. },
  184. },
  185. NextToken: nil,
  186. }, nil)
  187. mc.On("FilterLogEventsWithContext", mock.Anything, mock.Anything, mock.Anything).Return(
  188. &cloudwatchlogs.FilterLogEventsOutput{
  189. Events: []*cloudwatchlogs.FilteredLogEvent{
  190. {
  191. EventId: &testEventIDs[0],
  192. IngestionTime: aws.Int64(testIngestionTime),
  193. LogStreamName: aws.String(testLogStreamName),
  194. Message: aws.String(testLogStreamMessage),
  195. Timestamp: aws.Int64(testTimeStamp),
  196. },
  197. {
  198. EventId: &testEventIDs[1],
  199. IngestionTime: aws.Int64(testIngestionTime),
  200. LogStreamName: aws.String(testLogStreamName),
  201. Message: aws.String(testLogStreamMessage),
  202. Timestamp: aws.Int64(testTimeStamp),
  203. },
  204. {
  205. EventId: &testEventIDs[2],
  206. IngestionTime: aws.Int64(testIngestionTime),
  207. LogStreamName: aws.String(testLogStreamName2),
  208. Message: aws.String(testLogStreamMessage),
  209. Timestamp: aws.Int64(testTimeStamp),
  210. },
  211. {
  212. EventId: &testEventIDs[3],
  213. IngestionTime: aws.Int64(testIngestionTime),
  214. LogStreamName: aws.String(testLogStreamName2),
  215. Message: aws.String(testLogStreamMessage),
  216. Timestamp: aws.Int64(testTimeStamp),
  217. },
  218. },
  219. NextToken: nil,
  220. }, nil)
  221. return mc
  222. }
  223. var (
  224. testLogGroupName = "test-log-group-name"
  225. testLogStreamName = "test-log-stream-name"
  226. testLogStreamName2 = "test-log-stream-name-2"
  227. testLogStreamPrefix = "test-log-stream"
  228. testEventIDs = []string{
  229. "37134448277055698880077365577645869800162629528367333379",
  230. "37134448277055698880077365577645869800162629528367333380",
  231. "37134448277055698880077365577645869800162629528367333381",
  232. "37134448277055698880077365577645869800162629528367333382",
  233. }
  234. testIngestionTime = int64(1665166252124)
  235. testTimeStamp = int64(1665166251014)
  236. testLogStreamMessage = `"time=\"2022-10-07T18:10:46Z\" level=info msg=\"access granted\" arn=\"arn:aws:iam::892146088969:role/AWSWesleyClusterManagerLambda-NodeManagerRole-16UPVDKA1KBGI\" client=\"127.0.0.1:50252\" groups=\"[]\" method=POST path=/authenticate uid=\"aws-iam-authenticator:892146088969:AROA47OAM7QE2NWPDFDCW\" username=\"eks:node-manager\""`
  237. )
  238. type mockClient struct {
  239. mock.Mock
  240. }
  241. func (mc *mockClient) DescribeLogGroupsWithContext(ctx context.Context, input *cloudwatchlogs.DescribeLogGroupsInput, opts ...request.Option) (*cloudwatchlogs.DescribeLogGroupsOutput, error) {
  242. args := mc.Called(ctx, input, opts)
  243. return args.Get(0).(*cloudwatchlogs.DescribeLogGroupsOutput), args.Error(1)
  244. }
  245. func (mc *mockClient) FilterLogEventsWithContext(ctx context.Context, input *cloudwatchlogs.FilterLogEventsInput, opts ...request.Option) (*cloudwatchlogs.FilterLogEventsOutput, error) {
  246. args := mc.Called(ctx, input, opts)
  247. return args.Get(0).(*cloudwatchlogs.FilterLogEventsOutput), args.Error(1)
  248. }