123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package awsemfexporter
- import (
- "context"
- "errors"
- "testing"
- "github.com/aws/aws-sdk-go/aws/awserr"
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/mock"
- "github.com/stretchr/testify/require"
- "go.opentelemetry.io/collector/consumer/consumererror"
- "go.opentelemetry.io/collector/exporter/exportertest"
- "go.opentelemetry.io/collector/pdata/pmetric"
- "go.uber.org/zap"
- "go.uber.org/zap/zapcore"
- "go.uber.org/zap/zaptest/observer"
- "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs"
- )
- const defaultRetryCount = 1
- type mockPusher struct {
- mock.Mock
- }
- func (p *mockPusher) AddLogEntry(_ *cwlogs.Event) error {
- args := p.Called(nil)
- errorStr := args.String(0)
- if errorStr != "" {
- return awserr.NewRequestFailure(nil, 400, "").(error)
- }
- return nil
- }
- func (p *mockPusher) ForceFlush() error {
- args := p.Called(nil)
- errorStr := args.String(0)
- if errorStr != "" {
- return awserr.NewRequestFailure(nil, 400, "").(error)
- }
- return nil
- }
- func TestConsumeMetrics(t *testing.T) {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- factory := NewFactory()
- expCfg := factory.CreateDefaultConfig().(*Config)
- expCfg.Region = "us-west-2"
- expCfg.MaxRetries = 0
- exp, err := newEmfExporter(expCfg, exportertest.NewNopCreateSettings())
- assert.Nil(t, err)
- assert.NotNil(t, exp)
- md := generateTestMetrics(testMetric{
- metricNames: []string{"metric_1", "metric_2"},
- metricValues: [][]float64{{100}, {4}},
- })
- require.Error(t, exp.pushMetricsData(ctx, md))
- require.NoError(t, exp.shutdown(ctx))
- }
- func TestConsumeMetricsWithNaNValues(t *testing.T) {
- tests := []struct {
- testName string
- generateFunc func(string) pmetric.Metrics
- }{
- {
- "histograme-with-nan",
- generateTestHistogramMetricWithNaNs,
- }, {
- "gauge-with-nan",
- generateTestGaugeMetricNaN,
- }, {
- "summary-with-nan",
- generateTestSummaryMetricWithNaN,
- }, {
- "exponentialHistogram-with-nan",
- generateTestExponentialHistogramMetricWithNaNs,
- },
- }
- for _, tc := range tests {
- t.Run(tc.testName, func(t *testing.T) {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- factory := NewFactory()
- expCfg := factory.CreateDefaultConfig().(*Config)
- expCfg.Region = "us-west-2"
- expCfg.MaxRetries = 0
- expCfg.OutputDestination = "stdout"
- exp, err := newEmfExporter(expCfg, exportertest.NewNopCreateSettings())
- assert.Nil(t, err)
- assert.NotNil(t, exp)
- md := tc.generateFunc(tc.testName)
- require.NoError(t, exp.pushMetricsData(ctx, md))
- require.NoError(t, exp.shutdown(ctx))
- })
- }
- }
- func TestConsumeMetricsWithOutputDestination(t *testing.T) {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- factory := NewFactory()
- expCfg := factory.CreateDefaultConfig().(*Config)
- expCfg.Region = "us-west-2"
- expCfg.MaxRetries = 0
- expCfg.OutputDestination = "stdout"
- exp, err := newEmfExporter(expCfg, exportertest.NewNopCreateSettings())
- assert.Nil(t, err)
- assert.NotNil(t, exp)
- md := generateTestMetrics(testMetric{
- metricNames: []string{"metric_1", "metric_2"},
- metricValues: [][]float64{{100}, {4}},
- })
- require.NoError(t, exp.pushMetricsData(ctx, md))
- require.NoError(t, exp.shutdown(ctx))
- }
- func TestConsumeMetricsWithLogGroupStreamConfig(t *testing.T) {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- factory := NewFactory()
- expCfg := factory.CreateDefaultConfig().(*Config)
- expCfg.Region = "us-west-2"
- expCfg.MaxRetries = defaultRetryCount
- expCfg.LogGroupName = "test-logGroupName"
- expCfg.LogStreamName = "test-logStreamName"
- exp, err := newEmfExporter(expCfg, exportertest.NewNopCreateSettings())
- assert.Nil(t, err)
- assert.NotNil(t, exp)
- md := generateTestMetrics(testMetric{
- metricNames: []string{"metric_1", "metric_2"},
- metricValues: [][]float64{{100}, {4}},
- })
- require.Error(t, exp.pushMetricsData(ctx, md))
- require.NoError(t, exp.shutdown(ctx))
- pusherMap, ok := exp.pusherMap[cwlogs.StreamKey{
- LogGroupName: expCfg.LogGroupName,
- LogStreamName: expCfg.LogStreamName,
- }]
- assert.True(t, ok)
- assert.NotNil(t, pusherMap)
- }
- func TestConsumeMetricsWithLogGroupStreamValidPlaceholder(t *testing.T) {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- factory := NewFactory()
- expCfg := factory.CreateDefaultConfig().(*Config)
- expCfg.Region = "us-west-2"
- expCfg.MaxRetries = defaultRetryCount
- expCfg.LogGroupName = "/aws/ecs/containerinsights/{ClusterName}/performance"
- expCfg.LogStreamName = "{TaskId}"
- exp, err := newEmfExporter(expCfg, exportertest.NewNopCreateSettings())
- assert.Nil(t, err)
- assert.NotNil(t, exp)
- md := generateTestMetrics(testMetric{
- metricNames: []string{"metric_1", "metric_2"},
- metricValues: [][]float64{{100}, {4}},
- resourceAttributeMap: map[string]any{
- "aws.ecs.cluster.name": "test-cluster-name",
- "aws.ecs.task.id": "test-task-id",
- },
- })
- require.Error(t, exp.pushMetricsData(ctx, md))
- require.NoError(t, exp.shutdown(ctx))
- pusherMap, ok := exp.pusherMap[cwlogs.StreamKey{
- LogGroupName: "/aws/ecs/containerinsights/test-cluster-name/performance",
- LogStreamName: "test-task-id",
- }]
- assert.True(t, ok)
- assert.NotNil(t, pusherMap)
- }
- func TestConsumeMetricsWithOnlyLogStreamPlaceholder(t *testing.T) {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- factory := NewFactory()
- expCfg := factory.CreateDefaultConfig().(*Config)
- expCfg.Region = "us-west-2"
- expCfg.MaxRetries = defaultRetryCount
- expCfg.LogGroupName = "test-logGroupName"
- expCfg.LogStreamName = "{TaskId}"
- exp, err := newEmfExporter(expCfg, exportertest.NewNopCreateSettings())
- assert.Nil(t, err)
- assert.NotNil(t, exp)
- md := generateTestMetrics(testMetric{
- metricNames: []string{"metric_1", "metric_2"},
- metricValues: [][]float64{{100}, {4}},
- resourceAttributeMap: map[string]any{
- "aws.ecs.cluster.name": "test-cluster-name",
- "aws.ecs.task.id": "test-task-id",
- },
- })
- require.Error(t, exp.pushMetricsData(ctx, md))
- require.NoError(t, exp.shutdown(ctx))
- pusherMap, ok := exp.pusherMap[cwlogs.StreamKey{
- LogGroupName: expCfg.LogGroupName,
- LogStreamName: "test-task-id",
- }]
- assert.True(t, ok)
- assert.NotNil(t, pusherMap)
- }
- func TestConsumeMetricsWithWrongPlaceholder(t *testing.T) {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- factory := NewFactory()
- expCfg := factory.CreateDefaultConfig().(*Config)
- expCfg.Region = "us-west-2"
- expCfg.MaxRetries = defaultRetryCount
- expCfg.LogGroupName = "test-logGroupName"
- expCfg.LogStreamName = "{WrongKey}"
- exp, err := newEmfExporter(expCfg, exportertest.NewNopCreateSettings())
- assert.Nil(t, err)
- assert.NotNil(t, exp)
- md := generateTestMetrics(testMetric{
- metricNames: []string{"metric_1", "metric_2"},
- metricValues: [][]float64{{100}, {4}},
- resourceAttributeMap: map[string]any{
- "aws.ecs.cluster.name": "test-cluster-name",
- "aws.ecs.task.id": "test-task-id",
- },
- })
- require.Error(t, exp.pushMetricsData(ctx, md))
- require.NoError(t, exp.shutdown(ctx))
- pusherMap, ok := exp.pusherMap[cwlogs.StreamKey{
- LogGroupName: expCfg.LogGroupName,
- LogStreamName: expCfg.LogStreamName,
- }]
- assert.True(t, ok)
- assert.NotNil(t, pusherMap)
- }
- func TestPushMetricsDataWithErr(t *testing.T) {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- factory := NewFactory()
- expCfg := factory.CreateDefaultConfig().(*Config)
- expCfg.Region = "us-west-2"
- expCfg.MaxRetries = 0
- expCfg.LogGroupName = "test-logGroupName"
- expCfg.LogStreamName = "test-logStreamName"
- exp, err := newEmfExporter(expCfg, exportertest.NewNopCreateSettings())
- assert.Nil(t, err)
- assert.NotNil(t, exp)
- logPusher := new(mockPusher)
- logPusher.On("AddLogEntry", nil).Return("some error").Once()
- logPusher.On("AddLogEntry", nil).Return("").Twice()
- logPusher.On("ForceFlush", nil).Return("some error").Once()
- logPusher.On("ForceFlush", nil).Return("").Once()
- logPusher.On("ForceFlush", nil).Return("some error").Once()
- exp.pusherMap = map[cwlogs.StreamKey]cwlogs.Pusher{}
- exp.pusherMap[cwlogs.StreamKey{
- LogGroupName: "test-logGroupName",
- LogStreamName: "test-logStreamName",
- }] = logPusher
- md := generateTestMetrics(testMetric{
- metricNames: []string{"metric_1", "metric_2"},
- metricValues: [][]float64{{100}, {4}},
- })
- assert.NotNil(t, exp.pushMetricsData(ctx, md))
- assert.NotNil(t, exp.pushMetricsData(ctx, md))
- assert.Nil(t, exp.pushMetricsData(ctx, md))
- assert.Nil(t, exp.shutdown(ctx))
- }
- func TestNewExporterWithoutConfig(t *testing.T) {
- factory := NewFactory()
- expCfg := factory.CreateDefaultConfig().(*Config)
- settings := exportertest.NewNopCreateSettings()
- t.Setenv("AWS_STS_REGIONAL_ENDPOINTS", "fake")
- exp, err := newEmfExporter(expCfg, settings)
- assert.NotNil(t, err)
- assert.Nil(t, exp)
- assert.Equal(t, settings.Logger, expCfg.logger)
- }
- func TestNewExporterWithMetricDeclarations(t *testing.T) {
- factory := NewFactory()
- expCfg := factory.CreateDefaultConfig().(*Config)
- expCfg.Region = "us-west-2"
- expCfg.MaxRetries = defaultRetryCount
- expCfg.LogGroupName = "test-logGroupName"
- expCfg.LogStreamName = "test-logStreamName"
- mds := []*MetricDeclaration{
- {
- MetricNameSelectors: []string{"a", "b"},
- },
- {
- MetricNameSelectors: []string{"c", "d"},
- },
- {
- MetricNameSelectors: nil,
- },
- {
- Dimensions: [][]string{
- {"foo"},
- {"a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k"},
- },
- MetricNameSelectors: []string{"a"},
- },
- }
- expCfg.MetricDeclarations = mds
- obs, logs := observer.New(zap.WarnLevel)
- params := exportertest.NewNopCreateSettings()
- params.Logger = zap.New(obs)
- exp, err := newEmfExporter(expCfg, params)
- assert.Nil(t, err)
- assert.NotNil(t, exp)
- err = expCfg.Validate()
- assert.Nil(t, err)
- // Invalid metric declaration should be filtered out
- assert.Equal(t, 3, len(exp.config.MetricDeclarations))
- // Invalid dimensions (> 10 dims) should be filtered out
- assert.Equal(t, 1, len(exp.config.MetricDeclarations[2].Dimensions))
- // Test output warning logs
- expectedLogs := []observer.LoggedEntry{
- {
- Entry: zapcore.Entry{Level: zap.WarnLevel, Message: "the default value for DimensionRollupOption will be changing to NoDimensionRollup" +
- "in a future release. See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/23997 for more" +
- "information"},
- Context: []zapcore.Field{},
- },
- {
- Entry: zapcore.Entry{Level: zap.WarnLevel, Message: "Dropped metric declaration."},
- Context: []zapcore.Field{zap.Error(errors.New("invalid metric declaration: no metric name selectors defined"))},
- },
- {
- Entry: zapcore.Entry{Level: zap.WarnLevel, Message: "Dropped dimension set: > 10 dimensions specified."},
- Context: []zapcore.Field{zap.String("dimensions", "a,b,c,d,e,f,g,h,i,j,k")},
- },
- }
- assert.Equal(t, len(expectedLogs), logs.Len())
- assert.Equal(t, expectedLogs, logs.AllUntimed())
- }
- func TestNewExporterWithoutSession(t *testing.T) {
- exp, err := newEmfExporter(nil, exportertest.NewNopCreateSettings())
- assert.NotNil(t, err)
- assert.Nil(t, exp)
- }
- func TestWrapErrorIfBadRequest(t *testing.T) {
- awsErr := awserr.NewRequestFailure(nil, 400, "").(error)
- err := wrapErrorIfBadRequest(awsErr)
- assert.True(t, consumererror.IsPermanent(err))
- awsErr = awserr.NewRequestFailure(nil, 500, "").(error)
- err = wrapErrorIfBadRequest(awsErr)
- assert.False(t, consumererror.IsPermanent(err))
- }
- // This test verifies that if func newEmfExporter() returns an error then newEmfExporter()
- // will do so.
- func TestNewEmfExporterWithoutConfig(t *testing.T) {
- factory := NewFactory()
- expCfg := factory.CreateDefaultConfig().(*Config)
- settings := exportertest.NewNopCreateSettings()
- t.Setenv("AWS_STS_REGIONAL_ENDPOINTS", "fake")
- exp, err := newEmfExporter(expCfg, settings)
- assert.NotNil(t, err)
- assert.Nil(t, exp)
- assert.Equal(t, settings.Logger, expCfg.logger)
- }
|