123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package main
- import (
- "context"
- "errors"
- "testing"
- "time"
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/require"
- "go.opentelemetry.io/collector/component"
- "go.opentelemetry.io/collector/consumer/consumertest"
- "go.opentelemetry.io/collector/processor"
- "go.opentelemetry.io/collector/processor/memorylimiterprocessor"
- "go.opentelemetry.io/collector/processor/processortest"
- "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/attraction"
- "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata"
- "github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor"
- "github.com/open-telemetry/opentelemetry-collector-contrib/processor/remotetapprocessor"
- "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourceprocessor"
- "github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanprocessor"
- )
- func TestDefaultProcessors(t *testing.T) {
- t.Parallel()
- allFactories, err := components()
- require.NoError(t, err)
- procFactories := allFactories.Processors
- tests := []struct {
- getConfigFn getProcessorConfigFn
- processor component.Type
- skipLifecycle bool
- }{
- {
- processor: "attributes",
- getConfigFn: func() component.Config {
- cfg := procFactories["attributes"].CreateDefaultConfig().(*attributesprocessor.Config)
- cfg.Actions = []attraction.ActionKeyValue{
- {Key: "attribute1", Action: attraction.INSERT, Value: 123},
- }
- return cfg
- },
- },
- {
- processor: "batch",
- },
- {
- processor: "datadog",
- skipLifecycle: true, // requires external exporters to be configured to route data
- },
- {
- processor: "deltatorate",
- },
- {
- processor: "filter",
- },
- {
- processor: "groupbyattrs",
- },
- {
- processor: "groupbytrace",
- },
- {
- processor: "k8sattributes",
- skipLifecycle: true, // Requires a k8s API to communicate with
- },
- {
- processor: "memory_limiter",
- getConfigFn: func() component.Config {
- cfg := procFactories["memory_limiter"].CreateDefaultConfig().(*memorylimiterprocessor.Config)
- cfg.CheckInterval = 100 * time.Millisecond
- cfg.MemoryLimitMiB = 1024 * 1024
- return cfg
- },
- },
- {
- processor: "metricstransform",
- },
- {
- processor: "experimental_metricsgeneration",
- },
- {
- processor: "probabilistic_sampler",
- },
- {
- processor: "resourcedetection",
- },
- {
- processor: "resource",
- getConfigFn: func() component.Config {
- cfg := procFactories["resource"].CreateDefaultConfig().(*resourceprocessor.Config)
- cfg.AttributesActions = []attraction.ActionKeyValue{
- {Key: "attribute1", Action: attraction.INSERT, Value: 123},
- }
- return cfg
- },
- },
- {
- processor: "routing",
- skipLifecycle: true, // Requires external exporters to be configured to route data
- },
- {
- processor: "span",
- getConfigFn: func() component.Config {
- cfg := procFactories["span"].CreateDefaultConfig().(*spanprocessor.Config)
- cfg.Rename.FromAttributes = []string{"test-key"}
- return cfg
- },
- },
- {
- processor: "servicegraph",
- skipLifecycle: true,
- },
- {
- processor: "spanmetrics",
- skipLifecycle: true, // Requires a running exporter to convert data to/from
- },
- {
- processor: "cumulativetodelta",
- },
- {
- processor: "tail_sampling",
- },
- {
- processor: "transform",
- },
- {
- processor: "redaction",
- },
- {
- processor: "remotetap",
- getConfigFn: func() component.Config {
- cfg := procFactories["remotetap"].CreateDefaultConfig().(*remotetapprocessor.Config)
- cfg.Endpoint = "localhost:0"
- return cfg
- },
- },
- {
- processor: "sumologic",
- },
- }
- assert.Equal(t, len(procFactories), len(tests), "All processors must be added to lifecycle tests")
- for _, tt := range tests {
- t.Run(string(tt.processor), func(t *testing.T) {
- factory := procFactories[tt.processor]
- assert.Equal(t, tt.processor, factory.Type())
- t.Run("shutdown", func(t *testing.T) {
- verifyProcessorShutdown(t, factory, tt.getConfigFn)
- })
- t.Run("lifecycle", func(t *testing.T) {
- if tt.skipLifecycle {
- t.SkipNow()
- }
- verifyProcessorLifecycle(t, factory, tt.getConfigFn)
- })
- })
- }
- }
- // getProcessorConfigFn is used customize the configuration passed to the verification.
- // This is used to change ports or provide values required but not provided by the
- // default configuration.
- type getProcessorConfigFn func() component.Config
- // verifyProcessorLifecycle is used to test if a processor type can handle the typical
- // lifecycle of a component. The getConfigFn parameter only need to be specified if
- // the test can't be done with the default configuration for the component.
- func verifyProcessorLifecycle(t *testing.T, factory processor.Factory, getConfigFn getProcessorConfigFn) {
- ctx := context.Background()
- host := newAssertNoErrorHost(t)
- processorCreationSet := processortest.NewNopCreateSettings()
- if getConfigFn == nil {
- getConfigFn = factory.CreateDefaultConfig
- }
- createFns := map[component.DataType]createProcessorFn{
- component.DataTypeLogs: wrapCreateLogsProc(factory),
- component.DataTypeTraces: wrapCreateTracesProc(factory),
- component.DataTypeMetrics: wrapCreateMetricsProc(factory),
- }
- for i := 0; i < 2; i++ {
- procs := make(map[component.DataType]component.Component)
- for dataType, createFn := range createFns {
- proc, err := createFn(ctx, processorCreationSet, getConfigFn())
- if errors.Is(err, component.ErrDataTypeIsNotSupported) {
- continue
- }
- require.NoError(t, err)
- procs[dataType] = proc
- require.NoError(t, proc.Start(ctx, host))
- }
- for dataType, proc := range procs {
- assert.NotPanics(t, func() {
- switch dataType {
- case component.DataTypeLogs:
- logsProc := proc.(processor.Logs)
- logs := testdata.GenerateLogsManyLogRecordsSameResource(2)
- if !logsProc.Capabilities().MutatesData {
- logs.MarkReadOnly()
- }
- assert.NoError(t, logsProc.ConsumeLogs(ctx, logs))
- case component.DataTypeMetrics:
- metricsProc := proc.(processor.Metrics)
- metrics := testdata.GenerateMetricsTwoMetrics()
- if !metricsProc.Capabilities().MutatesData {
- metrics.MarkReadOnly()
- }
- assert.NoError(t, metricsProc.ConsumeMetrics(ctx, metrics))
- case component.DataTypeTraces:
- tracesProc := proc.(processor.Traces)
- traces := testdata.GenerateTracesTwoSpansSameResource()
- if !tracesProc.Capabilities().MutatesData {
- traces.MarkReadOnly()
- }
- assert.NoError(t, tracesProc.ConsumeTraces(ctx, traces))
- }
- })
- require.NoError(t, proc.Shutdown(ctx))
- }
- }
- }
- // verifyProcessorShutdown is used to test if a processor type can be shutdown without being started first.
- // We disregard errors being returned by shutdown, we're just making sure the processors don't panic.
- func verifyProcessorShutdown(tb testing.TB, factory processor.Factory, getConfigFn getProcessorConfigFn) {
- ctx := context.Background()
- processorCreationSet := processortest.NewNopCreateSettings()
- if getConfigFn == nil {
- getConfigFn = factory.CreateDefaultConfig
- }
- createFns := []createProcessorFn{
- wrapCreateLogsProc(factory),
- wrapCreateTracesProc(factory),
- wrapCreateMetricsProc(factory),
- }
- for _, createFn := range createFns {
- p, err := createFn(ctx, processorCreationSet, getConfigFn())
- if errors.Is(err, component.ErrDataTypeIsNotSupported) {
- continue
- }
- if p == nil {
- continue
- }
- assert.NotPanics(tb, func() {
- _ = p.Shutdown(ctx)
- })
- }
- }
- type createProcessorFn func(
- ctx context.Context,
- set processor.CreateSettings,
- cfg component.Config,
- ) (component.Component, error)
- func wrapCreateLogsProc(factory processor.Factory) createProcessorFn {
- return func(ctx context.Context, set processor.CreateSettings, cfg component.Config) (component.Component, error) {
- return factory.CreateLogsProcessor(ctx, set, cfg, consumertest.NewNop())
- }
- }
- func wrapCreateMetricsProc(factory processor.Factory) createProcessorFn {
- return func(ctx context.Context, set processor.CreateSettings, cfg component.Config) (component.Component, error) {
- return factory.CreateMetricsProcessor(ctx, set, cfg, consumertest.NewNop())
- }
- }
- func wrapCreateTracesProc(factory processor.Factory) createProcessorFn {
- return func(ctx context.Context, set processor.CreateSettings, cfg component.Config) (component.Component, error) {
- return factory.CreateTracesProcessor(ctx, set, cfg, consumertest.NewNop())
- }
- }
|