header_extraction.go 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package kafkareceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver"
  4. import (
  5. "fmt"
  6. "github.com/IBM/sarama"
  7. "go.opentelemetry.io/collector/pdata/plog"
  8. "go.opentelemetry.io/collector/pdata/pmetric"
  9. "go.opentelemetry.io/collector/pdata/ptrace"
  10. "go.uber.org/zap"
  11. )
  12. func getAttribute(key string) string {
  13. return fmt.Sprintf("kafka.header.%s", key)
  14. }
  15. type HeaderExtractor interface {
  16. extractHeadersTraces(ptrace.Traces, *sarama.ConsumerMessage)
  17. extractHeadersMetrics(pmetric.Metrics, *sarama.ConsumerMessage)
  18. extractHeadersLogs(plog.Logs, *sarama.ConsumerMessage)
  19. }
  20. type headerExtractor struct {
  21. logger *zap.Logger
  22. headers []string
  23. }
  24. func (he *headerExtractor) extractHeadersTraces(traces ptrace.Traces, message *sarama.ConsumerMessage) {
  25. for _, header := range he.headers {
  26. value, ok := getHeaderValue(message.Headers, header)
  27. if !ok {
  28. he.logger.Debug("Header key not found in the trace: ", zap.String("key", header))
  29. continue
  30. }
  31. for i := 0; i < traces.ResourceSpans().Len(); i++ {
  32. rs := traces.ResourceSpans().At(i)
  33. rs.Resource().Attributes().PutStr(getAttribute(header), value)
  34. }
  35. }
  36. }
  37. func (he *headerExtractor) extractHeadersLogs(logs plog.Logs, message *sarama.ConsumerMessage) {
  38. for _, header := range he.headers {
  39. value, ok := getHeaderValue(message.Headers, header)
  40. if !ok {
  41. he.logger.Debug("Header key not found in the log: ", zap.String("key", header))
  42. continue
  43. }
  44. for i := 0; i < logs.ResourceLogs().Len(); i++ {
  45. rl := logs.ResourceLogs().At(i)
  46. rl.Resource().Attributes().PutStr(getAttribute(header), value)
  47. }
  48. }
  49. }
  50. func (he *headerExtractor) extractHeadersMetrics(metrics pmetric.Metrics, message *sarama.ConsumerMessage) {
  51. for _, header := range he.headers {
  52. value, ok := getHeaderValue(message.Headers, header)
  53. if !ok {
  54. he.logger.Debug("Header key not found in the metric: ", zap.String("key", header))
  55. continue
  56. }
  57. for i := 0; i < metrics.ResourceMetrics().Len(); i++ {
  58. rm := metrics.ResourceMetrics().At(i)
  59. rm.Resource().Attributes().PutStr(getAttribute(header), value)
  60. }
  61. }
  62. }
  63. func getHeaderValue(headers []*sarama.RecordHeader, header string) (string, bool) {
  64. for _, kafkaHeader := range headers {
  65. headerKey := string(kafkaHeader.Key)
  66. if headerKey == header {
  67. // matching header found
  68. return string(kafkaHeader.Value), true
  69. }
  70. }
  71. // no header found matching the key, report to the user
  72. return "", false
  73. }
  74. type nopHeaderExtractor struct{}
  75. func (he *nopHeaderExtractor) extractHeadersTraces(_ ptrace.Traces, _ *sarama.ConsumerMessage) {
  76. }
  77. func (he *nopHeaderExtractor) extractHeadersLogs(_ plog.Logs, _ *sarama.ConsumerMessage) {
  78. }
  79. func (he *nopHeaderExtractor) extractHeadersMetrics(_ pmetric.Metrics, _ *sarama.ConsumerMessage) {
  80. }