12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package kafkareceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver"
- import (
- "fmt"
- "github.com/IBM/sarama"
- "go.opentelemetry.io/collector/pdata/plog"
- "go.opentelemetry.io/collector/pdata/pmetric"
- "go.opentelemetry.io/collector/pdata/ptrace"
- "go.uber.org/zap"
- )
- func getAttribute(key string) string {
- return fmt.Sprintf("kafka.header.%s", key)
- }
- type HeaderExtractor interface {
- extractHeadersTraces(ptrace.Traces, *sarama.ConsumerMessage)
- extractHeadersMetrics(pmetric.Metrics, *sarama.ConsumerMessage)
- extractHeadersLogs(plog.Logs, *sarama.ConsumerMessage)
- }
- type headerExtractor struct {
- logger *zap.Logger
- headers []string
- }
- func (he *headerExtractor) extractHeadersTraces(traces ptrace.Traces, message *sarama.ConsumerMessage) {
- for _, header := range he.headers {
- value, ok := getHeaderValue(message.Headers, header)
- if !ok {
- he.logger.Debug("Header key not found in the trace: ", zap.String("key", header))
- continue
- }
- for i := 0; i < traces.ResourceSpans().Len(); i++ {
- rs := traces.ResourceSpans().At(i)
- rs.Resource().Attributes().PutStr(getAttribute(header), value)
- }
- }
- }
- func (he *headerExtractor) extractHeadersLogs(logs plog.Logs, message *sarama.ConsumerMessage) {
- for _, header := range he.headers {
- value, ok := getHeaderValue(message.Headers, header)
- if !ok {
- he.logger.Debug("Header key not found in the log: ", zap.String("key", header))
- continue
- }
- for i := 0; i < logs.ResourceLogs().Len(); i++ {
- rl := logs.ResourceLogs().At(i)
- rl.Resource().Attributes().PutStr(getAttribute(header), value)
- }
- }
- }
- func (he *headerExtractor) extractHeadersMetrics(metrics pmetric.Metrics, message *sarama.ConsumerMessage) {
- for _, header := range he.headers {
- value, ok := getHeaderValue(message.Headers, header)
- if !ok {
- he.logger.Debug("Header key not found in the metric: ", zap.String("key", header))
- continue
- }
- for i := 0; i < metrics.ResourceMetrics().Len(); i++ {
- rm := metrics.ResourceMetrics().At(i)
- rm.Resource().Attributes().PutStr(getAttribute(header), value)
- }
- }
- }
- func getHeaderValue(headers []*sarama.RecordHeader, header string) (string, bool) {
- for _, kafkaHeader := range headers {
- headerKey := string(kafkaHeader.Key)
- if headerKey == header {
- // matching header found
- return string(kafkaHeader.Value), true
- }
- }
- // no header found matching the key, report to the user
- return "", false
- }
- type nopHeaderExtractor struct{}
- func (he *nopHeaderExtractor) extractHeadersTraces(_ ptrace.Traces, _ *sarama.ConsumerMessage) {
- }
- func (he *nopHeaderExtractor) extractHeadersLogs(_ plog.Logs, _ *sarama.ConsumerMessage) {
- }
- func (he *nopHeaderExtractor) extractHeadersMetrics(_ pmetric.Metrics, _ *sarama.ConsumerMessage) {
- }
|