emf_exporter_test.go 12 KB


  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package awsemfexporter
  4. import (
  5. "context"
  6. "errors"
  7. "testing"
  8. "github.com/aws/aws-sdk-go/aws/awserr"
  9. "github.com/stretchr/testify/assert"
  10. "github.com/stretchr/testify/mock"
  11. "github.com/stretchr/testify/require"
  12. "go.opentelemetry.io/collector/consumer/consumererror"
  13. "go.opentelemetry.io/collector/exporter/exportertest"
  14. "go.opentelemetry.io/collector/pdata/pmetric"
  15. "go.uber.org/zap"
  16. "go.uber.org/zap/zapcore"
  17. "go.uber.org/zap/zaptest/observer"
  18. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs"
  19. )
  20. const defaultRetryCount = 1
  21. type mockPusher struct {
  22. mock.Mock
  23. }
  24. func (p *mockPusher) AddLogEntry(_ *cwlogs.Event) error {
  25. args := p.Called(nil)
  26. errorStr := args.String(0)
  27. if errorStr != "" {
  28. return awserr.NewRequestFailure(nil, 400, "").(error)
  29. }
  30. return nil
  31. }
  32. func (p *mockPusher) ForceFlush() error {
  33. args := p.Called(nil)
  34. errorStr := args.String(0)
  35. if errorStr != "" {
  36. return awserr.NewRequestFailure(nil, 400, "").(error)
  37. }
  38. return nil
  39. }
  40. func TestConsumeMetrics(t *testing.T) {
  41. ctx, cancel := context.WithCancel(context.Background())
  42. defer cancel()
  43. factory := NewFactory()
  44. expCfg := factory.CreateDefaultConfig().(*Config)
  45. expCfg.Region = "us-west-2"
  46. expCfg.MaxRetries = 0
  47. exp, err := newEmfExporter(expCfg, exportertest.NewNopCreateSettings())
  48. assert.Nil(t, err)
  49. assert.NotNil(t, exp)
  50. md := generateTestMetrics(testMetric{
  51. metricNames: []string{"metric_1", "metric_2"},
  52. metricValues: [][]float64{{100}, {4}},
  53. })
  54. require.Error(t, exp.pushMetricsData(ctx, md))
  55. require.NoError(t, exp.shutdown(ctx))
  56. }
  57. func TestConsumeMetricsWithNaNValues(t *testing.T) {
  58. tests := []struct {
  59. testName string
  60. generateFunc func(string) pmetric.Metrics
  61. }{
  62. {
  63. "histograme-with-nan",
  64. generateTestHistogramMetricWithNaNs,
  65. }, {
  66. "gauge-with-nan",
  67. generateTestGaugeMetricNaN,
  68. }, {
  69. "summary-with-nan",
  70. generateTestSummaryMetricWithNaN,
  71. }, {
  72. "exponentialHistogram-with-nan",
  73. generateTestExponentialHistogramMetricWithNaNs,
  74. },
  75. }
  76. for _, tc := range tests {
  77. t.Run(tc.testName, func(t *testing.T) {
  78. ctx, cancel := context.WithCancel(context.Background())
  79. defer cancel()
  80. factory := NewFactory()
  81. expCfg := factory.CreateDefaultConfig().(*Config)
  82. expCfg.Region = "us-west-2"
  83. expCfg.MaxRetries = 0
  84. expCfg.OutputDestination = "stdout"
  85. exp, err := newEmfExporter(expCfg, exportertest.NewNopCreateSettings())
  86. assert.Nil(t, err)
  87. assert.NotNil(t, exp)
  88. md := tc.generateFunc(tc.testName)
  89. require.NoError(t, exp.pushMetricsData(ctx, md))
  90. require.NoError(t, exp.shutdown(ctx))
  91. })
  92. }
  93. }
  94. func TestConsumeMetricsWithOutputDestination(t *testing.T) {
  95. ctx, cancel := context.WithCancel(context.Background())
  96. defer cancel()
  97. factory := NewFactory()
  98. expCfg := factory.CreateDefaultConfig().(*Config)
  99. expCfg.Region = "us-west-2"
  100. expCfg.MaxRetries = 0
  101. expCfg.OutputDestination = "stdout"
  102. exp, err := newEmfExporter(expCfg, exportertest.NewNopCreateSettings())
  103. assert.Nil(t, err)
  104. assert.NotNil(t, exp)
  105. md := generateTestMetrics(testMetric{
  106. metricNames: []string{"metric_1", "metric_2"},
  107. metricValues: [][]float64{{100}, {4}},
  108. })
  109. require.NoError(t, exp.pushMetricsData(ctx, md))
  110. require.NoError(t, exp.shutdown(ctx))
  111. }
  112. func TestConsumeMetricsWithLogGroupStreamConfig(t *testing.T) {
  113. ctx, cancel := context.WithCancel(context.Background())
  114. defer cancel()
  115. factory := NewFactory()
  116. expCfg := factory.CreateDefaultConfig().(*Config)
  117. expCfg.Region = "us-west-2"
  118. expCfg.MaxRetries = defaultRetryCount
  119. expCfg.LogGroupName = "test-logGroupName"
  120. expCfg.LogStreamName = "test-logStreamName"
  121. exp, err := newEmfExporter(expCfg, exportertest.NewNopCreateSettings())
  122. assert.Nil(t, err)
  123. assert.NotNil(t, exp)
  124. md := generateTestMetrics(testMetric{
  125. metricNames: []string{"metric_1", "metric_2"},
  126. metricValues: [][]float64{{100}, {4}},
  127. })
  128. require.Error(t, exp.pushMetricsData(ctx, md))
  129. require.NoError(t, exp.shutdown(ctx))
  130. pusherMap, ok := exp.pusherMap[cwlogs.StreamKey{
  131. LogGroupName: expCfg.LogGroupName,
  132. LogStreamName: expCfg.LogStreamName,
  133. }]
  134. assert.True(t, ok)
  135. assert.NotNil(t, pusherMap)
  136. }
  137. func TestConsumeMetricsWithLogGroupStreamValidPlaceholder(t *testing.T) {
  138. ctx, cancel := context.WithCancel(context.Background())
  139. defer cancel()
  140. factory := NewFactory()
  141. expCfg := factory.CreateDefaultConfig().(*Config)
  142. expCfg.Region = "us-west-2"
  143. expCfg.MaxRetries = defaultRetryCount
  144. expCfg.LogGroupName = "/aws/ecs/containerinsights/{ClusterName}/performance"
  145. expCfg.LogStreamName = "{TaskId}"
  146. exp, err := newEmfExporter(expCfg, exportertest.NewNopCreateSettings())
  147. assert.Nil(t, err)
  148. assert.NotNil(t, exp)
  149. md := generateTestMetrics(testMetric{
  150. metricNames: []string{"metric_1", "metric_2"},
  151. metricValues: [][]float64{{100}, {4}},
  152. resourceAttributeMap: map[string]any{
  153. "aws.ecs.cluster.name": "test-cluster-name",
  154. "aws.ecs.task.id": "test-task-id",
  155. },
  156. })
  157. require.Error(t, exp.pushMetricsData(ctx, md))
  158. require.NoError(t, exp.shutdown(ctx))
  159. pusherMap, ok := exp.pusherMap[cwlogs.StreamKey{
  160. LogGroupName: "/aws/ecs/containerinsights/test-cluster-name/performance",
  161. LogStreamName: "test-task-id",
  162. }]
  163. assert.True(t, ok)
  164. assert.NotNil(t, pusherMap)
  165. }
  166. func TestConsumeMetricsWithOnlyLogStreamPlaceholder(t *testing.T) {
  167. ctx, cancel := context.WithCancel(context.Background())
  168. defer cancel()
  169. factory := NewFactory()
  170. expCfg := factory.CreateDefaultConfig().(*Config)
  171. expCfg.Region = "us-west-2"
  172. expCfg.MaxRetries = defaultRetryCount
  173. expCfg.LogGroupName = "test-logGroupName"
  174. expCfg.LogStreamName = "{TaskId}"
  175. exp, err := newEmfExporter(expCfg, exportertest.NewNopCreateSettings())
  176. assert.Nil(t, err)
  177. assert.NotNil(t, exp)
  178. md := generateTestMetrics(testMetric{
  179. metricNames: []string{"metric_1", "metric_2"},
  180. metricValues: [][]float64{{100}, {4}},
  181. resourceAttributeMap: map[string]any{
  182. "aws.ecs.cluster.name": "test-cluster-name",
  183. "aws.ecs.task.id": "test-task-id",
  184. },
  185. })
  186. require.Error(t, exp.pushMetricsData(ctx, md))
  187. require.NoError(t, exp.shutdown(ctx))
  188. pusherMap, ok := exp.pusherMap[cwlogs.StreamKey{
  189. LogGroupName: expCfg.LogGroupName,
  190. LogStreamName: "test-task-id",
  191. }]
  192. assert.True(t, ok)
  193. assert.NotNil(t, pusherMap)
  194. }
  195. func TestConsumeMetricsWithWrongPlaceholder(t *testing.T) {
  196. ctx, cancel := context.WithCancel(context.Background())
  197. defer cancel()
  198. factory := NewFactory()
  199. expCfg := factory.CreateDefaultConfig().(*Config)
  200. expCfg.Region = "us-west-2"
  201. expCfg.MaxRetries = defaultRetryCount
  202. expCfg.LogGroupName = "test-logGroupName"
  203. expCfg.LogStreamName = "{WrongKey}"
  204. exp, err := newEmfExporter(expCfg, exportertest.NewNopCreateSettings())
  205. assert.Nil(t, err)
  206. assert.NotNil(t, exp)
  207. md := generateTestMetrics(testMetric{
  208. metricNames: []string{"metric_1", "metric_2"},
  209. metricValues: [][]float64{{100}, {4}},
  210. resourceAttributeMap: map[string]any{
  211. "aws.ecs.cluster.name": "test-cluster-name",
  212. "aws.ecs.task.id": "test-task-id",
  213. },
  214. })
  215. require.Error(t, exp.pushMetricsData(ctx, md))
  216. require.NoError(t, exp.shutdown(ctx))
  217. pusherMap, ok := exp.pusherMap[cwlogs.StreamKey{
  218. LogGroupName: expCfg.LogGroupName,
  219. LogStreamName: expCfg.LogStreamName,
  220. }]
  221. assert.True(t, ok)
  222. assert.NotNil(t, pusherMap)
  223. }
  224. func TestPushMetricsDataWithErr(t *testing.T) {
  225. ctx, cancel := context.WithCancel(context.Background())
  226. defer cancel()
  227. factory := NewFactory()
  228. expCfg := factory.CreateDefaultConfig().(*Config)
  229. expCfg.Region = "us-west-2"
  230. expCfg.MaxRetries = 0
  231. expCfg.LogGroupName = "test-logGroupName"
  232. expCfg.LogStreamName = "test-logStreamName"
  233. exp, err := newEmfExporter(expCfg, exportertest.NewNopCreateSettings())
  234. assert.Nil(t, err)
  235. assert.NotNil(t, exp)
  236. logPusher := new(mockPusher)
  237. logPusher.On("AddLogEntry", nil).Return("some error").Once()
  238. logPusher.On("AddLogEntry", nil).Return("").Twice()
  239. logPusher.On("ForceFlush", nil).Return("some error").Once()
  240. logPusher.On("ForceFlush", nil).Return("").Once()
  241. logPusher.On("ForceFlush", nil).Return("some error").Once()
  242. exp.pusherMap = map[cwlogs.StreamKey]cwlogs.Pusher{}
  243. exp.pusherMap[cwlogs.StreamKey{
  244. LogGroupName: "test-logGroupName",
  245. LogStreamName: "test-logStreamName",
  246. }] = logPusher
  247. md := generateTestMetrics(testMetric{
  248. metricNames: []string{"metric_1", "metric_2"},
  249. metricValues: [][]float64{{100}, {4}},
  250. })
  251. assert.NotNil(t, exp.pushMetricsData(ctx, md))
  252. assert.NotNil(t, exp.pushMetricsData(ctx, md))
  253. assert.Nil(t, exp.pushMetricsData(ctx, md))
  254. assert.Nil(t, exp.shutdown(ctx))
  255. }
  256. func TestNewExporterWithoutConfig(t *testing.T) {
  257. factory := NewFactory()
  258. expCfg := factory.CreateDefaultConfig().(*Config)
  259. settings := exportertest.NewNopCreateSettings()
  260. t.Setenv("AWS_STS_REGIONAL_ENDPOINTS", "fake")
  261. exp, err := newEmfExporter(expCfg, settings)
  262. assert.NotNil(t, err)
  263. assert.Nil(t, exp)
  264. assert.Equal(t, settings.Logger, expCfg.logger)
  265. }
  266. func TestNewExporterWithMetricDeclarations(t *testing.T) {
  267. factory := NewFactory()
  268. expCfg := factory.CreateDefaultConfig().(*Config)
  269. expCfg.Region = "us-west-2"
  270. expCfg.MaxRetries = defaultRetryCount
  271. expCfg.LogGroupName = "test-logGroupName"
  272. expCfg.LogStreamName = "test-logStreamName"
  273. mds := []*MetricDeclaration{
  274. {
  275. MetricNameSelectors: []string{"a", "b"},
  276. },
  277. {
  278. MetricNameSelectors: []string{"c", "d"},
  279. },
  280. {
  281. MetricNameSelectors: nil,
  282. },
  283. {
  284. Dimensions: [][]string{
  285. {"foo"},
  286. {"a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k"},
  287. },
  288. MetricNameSelectors: []string{"a"},
  289. },
  290. }
  291. expCfg.MetricDeclarations = mds
  292. obs, logs := observer.New(zap.WarnLevel)
  293. params := exportertest.NewNopCreateSettings()
  294. params.Logger = zap.New(obs)
  295. exp, err := newEmfExporter(expCfg, params)
  296. assert.Nil(t, err)
  297. assert.NotNil(t, exp)
  298. err = expCfg.Validate()
  299. assert.Nil(t, err)
  300. // Invalid metric declaration should be filtered out
  301. assert.Equal(t, 3, len(exp.config.MetricDeclarations))
  302. // Invalid dimensions (> 10 dims) should be filtered out
  303. assert.Equal(t, 1, len(exp.config.MetricDeclarations[2].Dimensions))
  304. // Test output warning logs
  305. expectedLogs := []observer.LoggedEntry{
  306. {
  307. Entry: zapcore.Entry{Level: zap.WarnLevel, Message: "the default value for DimensionRollupOption will be changing to NoDimensionRollup" +
  308. "in a future release. See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/23997 for more" +
  309. "information"},
  310. Context: []zapcore.Field{},
  311. },
  312. {
  313. Entry: zapcore.Entry{Level: zap.WarnLevel, Message: "Dropped metric declaration."},
  314. Context: []zapcore.Field{zap.Error(errors.New("invalid metric declaration: no metric name selectors defined"))},
  315. },
  316. {
  317. Entry: zapcore.Entry{Level: zap.WarnLevel, Message: "Dropped dimension set: > 10 dimensions specified."},
  318. Context: []zapcore.Field{zap.String("dimensions", "a,b,c,d,e,f,g,h,i,j,k")},
  319. },
  320. }
  321. assert.Equal(t, len(expectedLogs), logs.Len())
  322. assert.Equal(t, expectedLogs, logs.AllUntimed())
  323. }
  324. func TestNewExporterWithoutSession(t *testing.T) {
  325. exp, err := newEmfExporter(nil, exportertest.NewNopCreateSettings())
  326. assert.NotNil(t, err)
  327. assert.Nil(t, exp)
  328. }
  329. func TestWrapErrorIfBadRequest(t *testing.T) {
  330. awsErr := awserr.NewRequestFailure(nil, 400, "").(error)
  331. err := wrapErrorIfBadRequest(awsErr)
  332. assert.True(t, consumererror.IsPermanent(err))
  333. awsErr = awserr.NewRequestFailure(nil, 500, "").(error)
  334. err = wrapErrorIfBadRequest(awsErr)
  335. assert.False(t, consumererror.IsPermanent(err))
  336. }
  337. // This test verifies that if func newEmfExporter() returns an error then newEmfExporter()
  338. // will do so.
  339. func TestNewEmfExporterWithoutConfig(t *testing.T) {
  340. factory := NewFactory()
  341. expCfg := factory.CreateDefaultConfig().(*Config)
  342. settings := exportertest.NewNopCreateSettings()
  343. t.Setenv("AWS_STS_REGIONAL_ENDPOINTS", "fake")
  344. exp, err := newEmfExporter(expCfg, settings)
  345. assert.NotNil(t, err)
  346. assert.Nil(t, exp)
  347. assert.Equal(t, settings.Logger, expCfg.logger)
  348. }