123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package kafkaexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"
- import (
- "context"
- "time"
- "github.com/IBM/sarama"
- "go.opentelemetry.io/collector/component"
- "go.opentelemetry.io/collector/consumer"
- "go.opentelemetry.io/collector/exporter"
- "go.opentelemetry.io/collector/exporter/exporterhelper"
- "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter/internal/metadata"
- )
- const (
- defaultTracesTopic = "otlp_spans"
- defaultMetricsTopic = "otlp_metrics"
- defaultLogsTopic = "otlp_logs"
- defaultEncoding = "otlp_proto"
- defaultBroker = "localhost:9092"
- // default from sarama.NewConfig()
- defaultMetadataRetryMax = 3
- // default from sarama.NewConfig()
- defaultMetadataRetryBackoff = time.Millisecond * 250
- // default from sarama.NewConfig()
- defaultMetadataFull = true
- // default max.message.bytes for the producer
- defaultProducerMaxMessageBytes = 1000000
- // default required_acks for the producer
- defaultProducerRequiredAcks = sarama.WaitForLocal
- // default from sarama.NewConfig()
- defaultCompression = "none"
- // default from sarama.NewConfig()
- defaultFluxMaxMessages = 0
- )
- // FactoryOption applies changes to kafkaExporterFactory.
- type FactoryOption func(factory *kafkaExporterFactory)
- // withTracesMarshalers adds tracesMarshalers.
- func withTracesMarshalers(tracesMarshalers ...TracesMarshaler) FactoryOption {
- return func(factory *kafkaExporterFactory) {
- for _, marshaler := range tracesMarshalers {
- factory.tracesMarshalers[marshaler.Encoding()] = marshaler
- }
- }
- }
- // withMetricsMarshalers adds additional metric marshalers to the exporter factory.
- func withMetricsMarshalers(metricMarshalers ...MetricsMarshaler) FactoryOption {
- return func(factory *kafkaExporterFactory) {
- for _, marshaler := range metricMarshalers {
- factory.metricsMarshalers[marshaler.Encoding()] = marshaler
- }
- }
- }
- // withLogsMarshalers adds additional log marshalers to the exporter factory.
- func withLogsMarshalers(logsMarshalers ...LogsMarshaler) FactoryOption {
- return func(factory *kafkaExporterFactory) {
- for _, marshaler := range logsMarshalers {
- factory.logsMarshalers[marshaler.Encoding()] = marshaler
- }
- }
- }
- // NewFactory creates Kafka exporter factory.
- func NewFactory(options ...FactoryOption) exporter.Factory {
- f := &kafkaExporterFactory{
- tracesMarshalers: tracesMarshalers(),
- metricsMarshalers: metricsMarshalers(),
- logsMarshalers: logsMarshalers(),
- }
- for _, o := range options {
- o(f)
- }
- return exporter.NewFactory(
- metadata.Type,
- createDefaultConfig,
- exporter.WithTraces(f.createTracesExporter, metadata.TracesStability),
- exporter.WithMetrics(f.createMetricsExporter, metadata.MetricsStability),
- exporter.WithLogs(f.createLogsExporter, metadata.LogsStability),
- )
- }
- func createDefaultConfig() component.Config {
- return &Config{
- TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(),
- RetrySettings: exporterhelper.NewDefaultRetrySettings(),
- QueueSettings: exporterhelper.NewDefaultQueueSettings(),
- Brokers: []string{defaultBroker},
- // using an empty topic to track when it has not been set by user, default is based on traces or metrics.
- Topic: "",
- Encoding: defaultEncoding,
- Metadata: Metadata{
- Full: defaultMetadataFull,
- Retry: MetadataRetry{
- Max: defaultMetadataRetryMax,
- Backoff: defaultMetadataRetryBackoff,
- },
- },
- Producer: Producer{
- MaxMessageBytes: defaultProducerMaxMessageBytes,
- RequiredAcks: defaultProducerRequiredAcks,
- Compression: defaultCompression,
- FlushMaxMessages: defaultFluxMaxMessages,
- },
- }
- }
- type kafkaExporterFactory struct {
- tracesMarshalers map[string]TracesMarshaler
- metricsMarshalers map[string]MetricsMarshaler
- logsMarshalers map[string]LogsMarshaler
- }
- func (f *kafkaExporterFactory) createTracesExporter(
- ctx context.Context,
- set exporter.CreateSettings,
- cfg component.Config,
- ) (exporter.Traces, error) {
- oCfg := *(cfg.(*Config)) // Clone the config
- if oCfg.Topic == "" {
- oCfg.Topic = defaultTracesTopic
- }
- if oCfg.Encoding == "otlp_json" {
- set.Logger.Info("otlp_json is considered experimental and should not be used in a production environment")
- }
- exp, err := newTracesExporter(oCfg, set, f.tracesMarshalers)
- if err != nil {
- return nil, err
- }
- return exporterhelper.NewTracesExporter(
- ctx,
- set,
- &oCfg,
- exp.tracesPusher,
- exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
- // Disable exporterhelper Timeout, because we cannot pass a Context to the Producer,
- // and will rely on the sarama Producer Timeout logic.
- exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
- exporterhelper.WithRetry(oCfg.RetrySettings),
- exporterhelper.WithQueue(oCfg.QueueSettings),
- exporterhelper.WithShutdown(exp.Close))
- }
- func (f *kafkaExporterFactory) createMetricsExporter(
- ctx context.Context,
- set exporter.CreateSettings,
- cfg component.Config,
- ) (exporter.Metrics, error) {
- oCfg := *(cfg.(*Config)) // Clone the config
- if oCfg.Topic == "" {
- oCfg.Topic = defaultMetricsTopic
- }
- if oCfg.Encoding == "otlp_json" {
- set.Logger.Info("otlp_json is considered experimental and should not be used in a production environment")
- }
- exp, err := newMetricsExporter(oCfg, set, f.metricsMarshalers)
- if err != nil {
- return nil, err
- }
- return exporterhelper.NewMetricsExporter(
- ctx,
- set,
- &oCfg,
- exp.metricsDataPusher,
- exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
- // Disable exporterhelper Timeout, because we cannot pass a Context to the Producer,
- // and will rely on the sarama Producer Timeout logic.
- exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
- exporterhelper.WithRetry(oCfg.RetrySettings),
- exporterhelper.WithQueue(oCfg.QueueSettings),
- exporterhelper.WithShutdown(exp.Close))
- }
- func (f *kafkaExporterFactory) createLogsExporter(
- ctx context.Context,
- set exporter.CreateSettings,
- cfg component.Config,
- ) (exporter.Logs, error) {
- oCfg := *(cfg.(*Config)) // Clone the config
- if oCfg.Topic == "" {
- oCfg.Topic = defaultLogsTopic
- }
- if oCfg.Encoding == "otlp_json" {
- set.Logger.Info("otlp_json is considered experimental and should not be used in a production environment")
- }
- exp, err := newLogsExporter(oCfg, set, f.logsMarshalers)
- if err != nil {
- return nil, err
- }
- return exporterhelper.NewLogsExporter(
- ctx,
- set,
- &oCfg,
- exp.logsDataPusher,
- exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
- // Disable exporterhelper Timeout, because we cannot pass a Context to the Producer,
- // and will rely on the sarama Producer Timeout logic.
- exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
- exporterhelper.WithRetry(oCfg.RetrySettings),
- exporterhelper.WithQueue(oCfg.QueueSettings),
- exporterhelper.WithShutdown(exp.Close))
- }
|