processors_test.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package main
  4. import (
  5. "context"
  6. "errors"
  7. "testing"
  8. "time"
  9. "github.com/stretchr/testify/assert"
  10. "github.com/stretchr/testify/require"
  11. "go.opentelemetry.io/collector/component"
  12. "go.opentelemetry.io/collector/consumer/consumertest"
  13. "go.opentelemetry.io/collector/processor"
  14. "go.opentelemetry.io/collector/processor/memorylimiterprocessor"
  15. "go.opentelemetry.io/collector/processor/processortest"
  16. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/attraction"
  17. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata"
  18. "github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor"
  19. "github.com/open-telemetry/opentelemetry-collector-contrib/processor/remotetapprocessor"
  20. "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourceprocessor"
  21. "github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanprocessor"
  22. )
  23. func TestDefaultProcessors(t *testing.T) {
  24. t.Parallel()
  25. allFactories, err := components()
  26. require.NoError(t, err)
  27. procFactories := allFactories.Processors
  28. tests := []struct {
  29. getConfigFn getProcessorConfigFn
  30. processor component.Type
  31. skipLifecycle bool
  32. }{
  33. {
  34. processor: "attributes",
  35. getConfigFn: func() component.Config {
  36. cfg := procFactories["attributes"].CreateDefaultConfig().(*attributesprocessor.Config)
  37. cfg.Actions = []attraction.ActionKeyValue{
  38. {Key: "attribute1", Action: attraction.INSERT, Value: 123},
  39. }
  40. return cfg
  41. },
  42. },
  43. {
  44. processor: "batch",
  45. },
  46. {
  47. processor: "datadog",
  48. skipLifecycle: true, // requires external exporters to be configured to route data
  49. },
  50. {
  51. processor: "deltatorate",
  52. },
  53. {
  54. processor: "filter",
  55. },
  56. {
  57. processor: "groupbyattrs",
  58. },
  59. {
  60. processor: "groupbytrace",
  61. },
  62. {
  63. processor: "k8sattributes",
  64. skipLifecycle: true, // Requires a k8s API to communicate with
  65. },
  66. {
  67. processor: "memory_limiter",
  68. getConfigFn: func() component.Config {
  69. cfg := procFactories["memory_limiter"].CreateDefaultConfig().(*memorylimiterprocessor.Config)
  70. cfg.CheckInterval = 100 * time.Millisecond
  71. cfg.MemoryLimitMiB = 1024 * 1024
  72. return cfg
  73. },
  74. },
  75. {
  76. processor: "metricstransform",
  77. },
  78. {
  79. processor: "experimental_metricsgeneration",
  80. },
  81. {
  82. processor: "probabilistic_sampler",
  83. },
  84. {
  85. processor: "resourcedetection",
  86. },
  87. {
  88. processor: "resource",
  89. getConfigFn: func() component.Config {
  90. cfg := procFactories["resource"].CreateDefaultConfig().(*resourceprocessor.Config)
  91. cfg.AttributesActions = []attraction.ActionKeyValue{
  92. {Key: "attribute1", Action: attraction.INSERT, Value: 123},
  93. }
  94. return cfg
  95. },
  96. },
  97. {
  98. processor: "routing",
  99. skipLifecycle: true, // Requires external exporters to be configured to route data
  100. },
  101. {
  102. processor: "span",
  103. getConfigFn: func() component.Config {
  104. cfg := procFactories["span"].CreateDefaultConfig().(*spanprocessor.Config)
  105. cfg.Rename.FromAttributes = []string{"test-key"}
  106. return cfg
  107. },
  108. },
  109. {
  110. processor: "servicegraph",
  111. skipLifecycle: true,
  112. },
  113. {
  114. processor: "spanmetrics",
  115. skipLifecycle: true, // Requires a running exporter to convert data to/from
  116. },
  117. {
  118. processor: "cumulativetodelta",
  119. },
  120. {
  121. processor: "tail_sampling",
  122. },
  123. {
  124. processor: "transform",
  125. },
  126. {
  127. processor: "redaction",
  128. },
  129. {
  130. processor: "remotetap",
  131. getConfigFn: func() component.Config {
  132. cfg := procFactories["remotetap"].CreateDefaultConfig().(*remotetapprocessor.Config)
  133. cfg.Endpoint = "localhost:0"
  134. return cfg
  135. },
  136. },
  137. {
  138. processor: "sumologic",
  139. },
  140. }
  141. assert.Equal(t, len(procFactories), len(tests), "All processors must be added to lifecycle tests")
  142. for _, tt := range tests {
  143. t.Run(string(tt.processor), func(t *testing.T) {
  144. factory := procFactories[tt.processor]
  145. assert.Equal(t, tt.processor, factory.Type())
  146. t.Run("shutdown", func(t *testing.T) {
  147. verifyProcessorShutdown(t, factory, tt.getConfigFn)
  148. })
  149. t.Run("lifecycle", func(t *testing.T) {
  150. if tt.skipLifecycle {
  151. t.SkipNow()
  152. }
  153. verifyProcessorLifecycle(t, factory, tt.getConfigFn)
  154. })
  155. })
  156. }
  157. }
  158. // getProcessorConfigFn is used customize the configuration passed to the verification.
  159. // This is used to change ports or provide values required but not provided by the
  160. // default configuration.
  161. type getProcessorConfigFn func() component.Config
  162. // verifyProcessorLifecycle is used to test if a processor type can handle the typical
  163. // lifecycle of a component. The getConfigFn parameter only need to be specified if
  164. // the test can't be done with the default configuration for the component.
  165. func verifyProcessorLifecycle(t *testing.T, factory processor.Factory, getConfigFn getProcessorConfigFn) {
  166. ctx := context.Background()
  167. host := newAssertNoErrorHost(t)
  168. processorCreationSet := processortest.NewNopCreateSettings()
  169. if getConfigFn == nil {
  170. getConfigFn = factory.CreateDefaultConfig
  171. }
  172. createFns := map[component.DataType]createProcessorFn{
  173. component.DataTypeLogs: wrapCreateLogsProc(factory),
  174. component.DataTypeTraces: wrapCreateTracesProc(factory),
  175. component.DataTypeMetrics: wrapCreateMetricsProc(factory),
  176. }
  177. for i := 0; i < 2; i++ {
  178. procs := make(map[component.DataType]component.Component)
  179. for dataType, createFn := range createFns {
  180. proc, err := createFn(ctx, processorCreationSet, getConfigFn())
  181. if errors.Is(err, component.ErrDataTypeIsNotSupported) {
  182. continue
  183. }
  184. require.NoError(t, err)
  185. procs[dataType] = proc
  186. require.NoError(t, proc.Start(ctx, host))
  187. }
  188. for dataType, proc := range procs {
  189. assert.NotPanics(t, func() {
  190. switch dataType {
  191. case component.DataTypeLogs:
  192. logsProc := proc.(processor.Logs)
  193. logs := testdata.GenerateLogsManyLogRecordsSameResource(2)
  194. if !logsProc.Capabilities().MutatesData {
  195. logs.MarkReadOnly()
  196. }
  197. assert.NoError(t, logsProc.ConsumeLogs(ctx, logs))
  198. case component.DataTypeMetrics:
  199. metricsProc := proc.(processor.Metrics)
  200. metrics := testdata.GenerateMetricsTwoMetrics()
  201. if !metricsProc.Capabilities().MutatesData {
  202. metrics.MarkReadOnly()
  203. }
  204. assert.NoError(t, metricsProc.ConsumeMetrics(ctx, metrics))
  205. case component.DataTypeTraces:
  206. tracesProc := proc.(processor.Traces)
  207. traces := testdata.GenerateTracesTwoSpansSameResource()
  208. if !tracesProc.Capabilities().MutatesData {
  209. traces.MarkReadOnly()
  210. }
  211. assert.NoError(t, tracesProc.ConsumeTraces(ctx, traces))
  212. }
  213. })
  214. require.NoError(t, proc.Shutdown(ctx))
  215. }
  216. }
  217. }
  218. // verifyProcessorShutdown is used to test if a processor type can be shutdown without being started first.
  219. // We disregard errors being returned by shutdown, we're just making sure the processors don't panic.
  220. func verifyProcessorShutdown(tb testing.TB, factory processor.Factory, getConfigFn getProcessorConfigFn) {
  221. ctx := context.Background()
  222. processorCreationSet := processortest.NewNopCreateSettings()
  223. if getConfigFn == nil {
  224. getConfigFn = factory.CreateDefaultConfig
  225. }
  226. createFns := []createProcessorFn{
  227. wrapCreateLogsProc(factory),
  228. wrapCreateTracesProc(factory),
  229. wrapCreateMetricsProc(factory),
  230. }
  231. for _, createFn := range createFns {
  232. p, err := createFn(ctx, processorCreationSet, getConfigFn())
  233. if errors.Is(err, component.ErrDataTypeIsNotSupported) {
  234. continue
  235. }
  236. if p == nil {
  237. continue
  238. }
  239. assert.NotPanics(tb, func() {
  240. _ = p.Shutdown(ctx)
  241. })
  242. }
  243. }
  244. type createProcessorFn func(
  245. ctx context.Context,
  246. set processor.CreateSettings,
  247. cfg component.Config,
  248. ) (component.Component, error)
  249. func wrapCreateLogsProc(factory processor.Factory) createProcessorFn {
  250. return func(ctx context.Context, set processor.CreateSettings, cfg component.Config) (component.Component, error) {
  251. return factory.CreateLogsProcessor(ctx, set, cfg, consumertest.NewNop())
  252. }
  253. }
  254. func wrapCreateMetricsProc(factory processor.Factory) createProcessorFn {
  255. return func(ctx context.Context, set processor.CreateSettings, cfg component.Config) (component.Component, error) {
  256. return factory.CreateMetricsProcessor(ctx, set, cfg, consumertest.NewNop())
  257. }
  258. }
  259. func wrapCreateTracesProc(factory processor.Factory) createProcessorFn {
  260. return func(ctx context.Context, set processor.CreateSettings, cfg component.Config) (component.Component, error) {
  261. return factory.CreateTracesProcessor(ctx, set, cfg, consumertest.NewNop())
  262. }
  263. }