header_extraction_test.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package kafkareceiver
  4. import (
  5. "context"
  6. "sync"
  7. "testing"
  8. "github.com/IBM/sarama"
  9. "github.com/stretchr/testify/assert"
  10. "github.com/stretchr/testify/require"
  11. "go.opentelemetry.io/collector/consumer/consumertest"
  12. "go.opentelemetry.io/collector/pdata/pcommon"
  13. "go.opentelemetry.io/collector/pdata/pmetric"
  14. "go.opentelemetry.io/collector/pdata/ptrace"
  15. "go.opentelemetry.io/collector/receiver/receiverhelper"
  16. "go.opentelemetry.io/collector/receiver/receivertest"
  17. "go.uber.org/zap/zaptest"
  18. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata"
  19. )
  20. func TestHeaderExtractionTraces(t *testing.T) {
  21. obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
  22. ReceiverCreateSettings: receivertest.NewNopCreateSettings(),
  23. })
  24. require.NoError(t, err)
  25. nextConsumer := &consumertest.TracesSink{}
  26. c := tracesConsumerGroupHandler{
  27. unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding),
  28. logger: zaptest.NewLogger(t),
  29. ready: make(chan bool),
  30. nextConsumer: nextConsumer,
  31. obsrecv: obsrecv,
  32. }
  33. headers := []string{"headerKey1", "headerKey2"}
  34. c.headerExtractor = &headerExtractor{
  35. logger: zaptest.NewLogger(t),
  36. headers: headers,
  37. }
  38. groupClaim := &testConsumerGroupClaim{
  39. messageChan: make(chan *sarama.ConsumerMessage),
  40. }
  41. ctx, cancelFunc := context.WithCancel(context.Background())
  42. defer close(groupClaim.messageChan)
  43. testSession := testConsumerGroupSession{ctx: ctx}
  44. require.NoError(t, c.Setup(testSession))
  45. wg := sync.WaitGroup{}
  46. wg.Add(1)
  47. go func() {
  48. err = c.ConsumeClaim(testSession, groupClaim)
  49. for _, trace := range nextConsumer.AllTraces() {
  50. for i := 0; i < trace.ResourceSpans().Len(); i++ {
  51. rs := trace.ResourceSpans().At(i)
  52. validateHeader(t, rs.Resource(), "kafka.header.headerKey1", "headerValue1")
  53. validateHeader(t, rs.Resource(), "kafka.header.headerKey2", "headerValue2")
  54. }
  55. }
  56. assert.NoError(t, err)
  57. wg.Done()
  58. }()
  59. td := ptrace.NewTraces()
  60. td.ResourceSpans().AppendEmpty().Resource()
  61. td.ResourceSpans().At(0).ScopeSpans().AppendEmpty().Spans().AppendEmpty()
  62. unmarshaler := &ptrace.ProtoMarshaler{}
  63. bts, err := unmarshaler.MarshalTraces(td)
  64. groupClaim.messageChan <- &sarama.ConsumerMessage{
  65. Headers: []*sarama.RecordHeader{
  66. {
  67. Key: []byte("headerKey1"),
  68. Value: []byte("headerValue1"),
  69. },
  70. {
  71. Key: []byte("headerKey2"),
  72. Value: []byte("headerValue2"),
  73. },
  74. },
  75. Value: bts,
  76. }
  77. cancelFunc()
  78. wg.Wait()
  79. }
  80. func TestHeaderExtractionLogs(t *testing.T) {
  81. obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
  82. ReceiverCreateSettings: receivertest.NewNopCreateSettings(),
  83. })
  84. require.NoError(t, err)
  85. nextConsumer := &consumertest.LogsSink{}
  86. unmarshaler := newTextLogsUnmarshaler()
  87. unmarshaler, err = unmarshaler.WithEnc("utf-8")
  88. c := logsConsumerGroupHandler{
  89. unmarshaler: unmarshaler,
  90. logger: zaptest.NewLogger(t),
  91. ready: make(chan bool),
  92. nextConsumer: nextConsumer,
  93. obsrecv: obsrecv,
  94. }
  95. headers := []string{"headerKey1", "headerKey2"}
  96. c.headerExtractor = &headerExtractor{
  97. logger: zaptest.NewLogger(t),
  98. headers: headers,
  99. }
  100. groupClaim := &testConsumerGroupClaim{
  101. messageChan: make(chan *sarama.ConsumerMessage),
  102. }
  103. ctx, cancelFunc := context.WithCancel(context.Background())
  104. defer close(groupClaim.messageChan)
  105. testSession := testConsumerGroupSession{ctx: ctx}
  106. require.NoError(t, c.Setup(testSession))
  107. wg := sync.WaitGroup{}
  108. wg.Add(1)
  109. go func() {
  110. err = c.ConsumeClaim(testSession, groupClaim)
  111. for _, logs := range nextConsumer.AllLogs() {
  112. for i := 0; i < logs.ResourceLogs().Len(); i++ {
  113. rs := logs.ResourceLogs().At(i)
  114. validateHeader(t, rs.Resource(), "kafka.header.headerKey1", "headerValueLog1")
  115. validateHeader(t, rs.Resource(), "kafka.header.headerKey2", "headerValueLog2")
  116. }
  117. }
  118. assert.NoError(t, err)
  119. wg.Done()
  120. }()
  121. groupClaim.messageChan <- &sarama.ConsumerMessage{
  122. Headers: []*sarama.RecordHeader{
  123. {
  124. Key: []byte("headerKey1"),
  125. Value: []byte("headerValueLog1"),
  126. },
  127. {
  128. Key: []byte("headerKey2"),
  129. Value: []byte("headerValueLog2"),
  130. },
  131. },
  132. Value: []byte("Message"),
  133. }
  134. cancelFunc()
  135. wg.Wait()
  136. }
  137. func TestHeaderExtractionMetrics(t *testing.T) {
  138. obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
  139. ReceiverCreateSettings: receivertest.NewNopCreateSettings(),
  140. })
  141. require.NoError(t, err)
  142. nextConsumer := &consumertest.MetricsSink{}
  143. c := metricsConsumerGroupHandler{
  144. unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding),
  145. logger: zaptest.NewLogger(t),
  146. ready: make(chan bool),
  147. nextConsumer: nextConsumer,
  148. obsrecv: obsrecv,
  149. }
  150. headers := []string{"headerKey1", "headerKey2"}
  151. c.headerExtractor = &headerExtractor{
  152. logger: zaptest.NewLogger(t),
  153. headers: headers,
  154. }
  155. groupClaim := &testConsumerGroupClaim{
  156. messageChan: make(chan *sarama.ConsumerMessage),
  157. }
  158. ctx, cancelFunc := context.WithCancel(context.Background())
  159. defer close(groupClaim.messageChan)
  160. testSession := testConsumerGroupSession{ctx: ctx}
  161. require.NoError(t, c.Setup(testSession))
  162. wg := sync.WaitGroup{}
  163. wg.Add(1)
  164. go func() {
  165. err = c.ConsumeClaim(testSession, groupClaim)
  166. for _, metric := range nextConsumer.AllMetrics() {
  167. for i := 0; i < metric.ResourceMetrics().Len(); i++ {
  168. rs := metric.ResourceMetrics().At(i)
  169. validateHeader(t, rs.Resource(), "kafka.header.headerKey1", "headerValueMetric1")
  170. validateHeader(t, rs.Resource(), "kafka.header.headerKey2", "headerValueMetric2")
  171. }
  172. }
  173. assert.NoError(t, err)
  174. wg.Done()
  175. }()
  176. ld := testdata.GenerateMetricsOneMetric()
  177. unmarshaler := &pmetric.ProtoMarshaler{}
  178. bts, err := unmarshaler.MarshalMetrics(ld)
  179. groupClaim.messageChan <- &sarama.ConsumerMessage{
  180. Headers: []*sarama.RecordHeader{
  181. {
  182. Key: []byte("headerKey1"),
  183. Value: []byte("headerValueMetric1"),
  184. },
  185. {
  186. Key: []byte("headerKey2"),
  187. Value: []byte("headerValueMetric2"),
  188. },
  189. },
  190. Value: bts,
  191. }
  192. cancelFunc()
  193. wg.Wait()
  194. }
  195. func validateHeader(t *testing.T, rs pcommon.Resource, headerKey string, headerValue string) {
  196. val, ok := rs.Attributes().Get(headerKey)
  197. assert.Equal(t, ok, true)
  198. assert.Equal(t, val.Str(), headerValue)
  199. }