resource_processor_test.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package tests
  4. import (
  5. "context"
  6. "path/filepath"
  7. "testing"
  8. "github.com/stretchr/testify/require"
  9. "go.opentelemetry.io/collector/pdata/pmetric"
  10. "github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed"
  11. )
  12. var (
  13. mockedConsumedResourceWithType = func() pmetric.Metrics {
  14. md := pmetric.NewMetrics()
  15. rm := md.ResourceMetrics().AppendEmpty()
  16. rm.Resource().Attributes().PutStr("opencensus.resourcetype", "host")
  17. rm.Resource().Attributes().PutStr("label-key", "label-value")
  18. m := rm.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
  19. m.SetName("metric-name")
  20. m.SetDescription("metric-description")
  21. m.SetUnit("metric-unit")
  22. m.SetEmptyGauge().DataPoints().AppendEmpty().SetIntValue(0)
  23. return md
  24. }()
  25. mockedConsumedResourceEmpty = func() pmetric.Metrics {
  26. md := pmetric.NewMetrics()
  27. rm := md.ResourceMetrics().AppendEmpty()
  28. m := rm.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
  29. m.SetName("metric-name")
  30. m.SetDescription("metric-description")
  31. m.SetUnit("metric-unit")
  32. m.SetEmptyGauge().DataPoints().AppendEmpty().SetIntValue(0)
  33. return md
  34. }()
  35. )
  36. type resourceProcessorTestCase struct {
  37. name string
  38. resourceProcessorConfig string
  39. mockedConsumedMetrics pmetric.Metrics
  40. expectedMetrics pmetric.Metrics
  41. }
  42. func getResourceProcessorTestCases() []resourceProcessorTestCase {
  43. tests := []resourceProcessorTestCase{
  44. {
  45. name: "update_and_rename_existing_attributes",
  46. resourceProcessorConfig: `
  47. resource:
  48. attributes:
  49. - key: label-key
  50. value: new-label-value
  51. action: update
  52. - key: resource-type
  53. from_attribute: opencensus.resourcetype
  54. action: upsert
  55. - key: opencensus.resourcetype
  56. action: delete
  57. `,
  58. mockedConsumedMetrics: mockedConsumedResourceWithType,
  59. expectedMetrics: func() pmetric.Metrics {
  60. md := pmetric.NewMetrics()
  61. rm := md.ResourceMetrics().AppendEmpty()
  62. rm.Resource().Attributes().PutStr("label-key", "new-label-value")
  63. rm.Resource().Attributes().PutStr("resource-type", "host")
  64. return md
  65. }(),
  66. },
  67. {
  68. name: "set_attribute_on_empty_resource",
  69. resourceProcessorConfig: `
  70. resource:
  71. attributes:
  72. - key: additional-label-key
  73. value: additional-label-value
  74. action: insert
  75. `,
  76. mockedConsumedMetrics: mockedConsumedResourceEmpty,
  77. expectedMetrics: func() pmetric.Metrics {
  78. md := pmetric.NewMetrics()
  79. rm := md.ResourceMetrics().AppendEmpty()
  80. rm.Resource().Attributes().PutStr("additional-label-key", "additional-label-value")
  81. return md
  82. }(),
  83. },
  84. }
  85. return tests
  86. }
  87. func TestMetricResourceProcessor(t *testing.T) {
  88. sender := testbed.NewOTLPMetricDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t))
  89. receiver := testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t))
  90. tests := getResourceProcessorTestCases()
  91. for _, test := range tests {
  92. t.Run(test.name, func(t *testing.T) {
  93. resultDir, err := filepath.Abs(filepath.Join("results", t.Name()))
  94. require.NoError(t, err)
  95. agentProc := testbed.NewChildProcessCollector()
  96. processors := map[string]string{
  97. "resource": test.resourceProcessorConfig,
  98. }
  99. configStr := createConfigYaml(t, sender, receiver, resultDir, processors, nil)
  100. configCleanup, err := agentProc.PrepareConfig(configStr)
  101. require.NoError(t, err)
  102. defer configCleanup()
  103. options := testbed.LoadOptions{DataItemsPerSecond: 10000, ItemsPerBatch: 10}
  104. dataProvider := testbed.NewPerfTestDataProvider(options)
  105. tc := testbed.NewTestCase(
  106. t,
  107. dataProvider,
  108. sender,
  109. receiver,
  110. agentProc,
  111. &testbed.PerfTestValidator{},
  112. performanceResultsSummary,
  113. )
  114. defer tc.Stop()
  115. tc.StartBackend()
  116. tc.StartAgent()
  117. defer tc.StopAgent()
  118. tc.EnableRecording()
  119. require.NoError(t, sender.Start())
  120. // Clear previously received metrics.
  121. tc.MockBackend.ClearReceivedItems()
  122. startCounter := tc.MockBackend.DataItemsReceived()
  123. sender, ok := tc.Sender.(testbed.MetricDataSender)
  124. require.True(t, ok, "unsupported metric sender")
  125. require.NoError(t, sender.ConsumeMetrics(context.Background(), test.mockedConsumedMetrics))
  126. // We bypass the load generator in this test, but make sure to increment the
  127. // counter since it is used in final reports.
  128. tc.LoadGenerator.IncDataItemsSent()
  129. tc.WaitFor(func() bool { return tc.MockBackend.DataItemsReceived() == startCounter+1 },
  130. "datapoints received")
  131. // Assert Resources
  132. m := tc.MockBackend.ReceivedMetrics[0]
  133. rm := m.ResourceMetrics()
  134. require.Equal(t, 1, rm.Len())
  135. expectidMD := test.expectedMetrics
  136. require.Equal(t,
  137. expectidMD.ResourceMetrics().At(0).Resource().Attributes().AsRaw(),
  138. rm.At(0).Resource().Attributes().AsRaw(),
  139. )
  140. })
  141. }
  142. }