factory.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package kafkaexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"
  4. import (
  5. "context"
  6. "time"
  7. "github.com/IBM/sarama"
  8. "go.opentelemetry.io/collector/component"
  9. "go.opentelemetry.io/collector/consumer"
  10. "go.opentelemetry.io/collector/exporter"
  11. "go.opentelemetry.io/collector/exporter/exporterhelper"
  12. "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter/internal/metadata"
  13. )
  14. const (
  15. defaultTracesTopic = "otlp_spans"
  16. defaultMetricsTopic = "otlp_metrics"
  17. defaultLogsTopic = "otlp_logs"
  18. defaultEncoding = "otlp_proto"
  19. defaultBroker = "localhost:9092"
  20. // default from sarama.NewConfig()
  21. defaultMetadataRetryMax = 3
  22. // default from sarama.NewConfig()
  23. defaultMetadataRetryBackoff = time.Millisecond * 250
  24. // default from sarama.NewConfig()
  25. defaultMetadataFull = true
  26. // default max.message.bytes for the producer
  27. defaultProducerMaxMessageBytes = 1000000
  28. // default required_acks for the producer
  29. defaultProducerRequiredAcks = sarama.WaitForLocal
  30. // default from sarama.NewConfig()
  31. defaultCompression = "none"
  32. // default from sarama.NewConfig()
  33. defaultFluxMaxMessages = 0
  34. )
  35. // FactoryOption applies changes to kafkaExporterFactory.
  36. type FactoryOption func(factory *kafkaExporterFactory)
  37. // withTracesMarshalers adds tracesMarshalers.
  38. func withTracesMarshalers(tracesMarshalers ...TracesMarshaler) FactoryOption {
  39. return func(factory *kafkaExporterFactory) {
  40. for _, marshaler := range tracesMarshalers {
  41. factory.tracesMarshalers[marshaler.Encoding()] = marshaler
  42. }
  43. }
  44. }
  45. // withMetricsMarshalers adds additional metric marshalers to the exporter factory.
  46. func withMetricsMarshalers(metricMarshalers ...MetricsMarshaler) FactoryOption {
  47. return func(factory *kafkaExporterFactory) {
  48. for _, marshaler := range metricMarshalers {
  49. factory.metricsMarshalers[marshaler.Encoding()] = marshaler
  50. }
  51. }
  52. }
  53. // withLogsMarshalers adds additional log marshalers to the exporter factory.
  54. func withLogsMarshalers(logsMarshalers ...LogsMarshaler) FactoryOption {
  55. return func(factory *kafkaExporterFactory) {
  56. for _, marshaler := range logsMarshalers {
  57. factory.logsMarshalers[marshaler.Encoding()] = marshaler
  58. }
  59. }
  60. }
  61. // NewFactory creates Kafka exporter factory.
  62. func NewFactory(options ...FactoryOption) exporter.Factory {
  63. f := &kafkaExporterFactory{
  64. tracesMarshalers: tracesMarshalers(),
  65. metricsMarshalers: metricsMarshalers(),
  66. logsMarshalers: logsMarshalers(),
  67. }
  68. for _, o := range options {
  69. o(f)
  70. }
  71. return exporter.NewFactory(
  72. metadata.Type,
  73. createDefaultConfig,
  74. exporter.WithTraces(f.createTracesExporter, metadata.TracesStability),
  75. exporter.WithMetrics(f.createMetricsExporter, metadata.MetricsStability),
  76. exporter.WithLogs(f.createLogsExporter, metadata.LogsStability),
  77. )
  78. }
  79. func createDefaultConfig() component.Config {
  80. return &Config{
  81. TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(),
  82. RetrySettings: exporterhelper.NewDefaultRetrySettings(),
  83. QueueSettings: exporterhelper.NewDefaultQueueSettings(),
  84. Brokers: []string{defaultBroker},
  85. // using an empty topic to track when it has not been set by user, default is based on traces or metrics.
  86. Topic: "",
  87. Encoding: defaultEncoding,
  88. Metadata: Metadata{
  89. Full: defaultMetadataFull,
  90. Retry: MetadataRetry{
  91. Max: defaultMetadataRetryMax,
  92. Backoff: defaultMetadataRetryBackoff,
  93. },
  94. },
  95. Producer: Producer{
  96. MaxMessageBytes: defaultProducerMaxMessageBytes,
  97. RequiredAcks: defaultProducerRequiredAcks,
  98. Compression: defaultCompression,
  99. FlushMaxMessages: defaultFluxMaxMessages,
  100. },
  101. }
  102. }
  103. type kafkaExporterFactory struct {
  104. tracesMarshalers map[string]TracesMarshaler
  105. metricsMarshalers map[string]MetricsMarshaler
  106. logsMarshalers map[string]LogsMarshaler
  107. }
  108. func (f *kafkaExporterFactory) createTracesExporter(
  109. ctx context.Context,
  110. set exporter.CreateSettings,
  111. cfg component.Config,
  112. ) (exporter.Traces, error) {
  113. oCfg := *(cfg.(*Config)) // Clone the config
  114. if oCfg.Topic == "" {
  115. oCfg.Topic = defaultTracesTopic
  116. }
  117. if oCfg.Encoding == "otlp_json" {
  118. set.Logger.Info("otlp_json is considered experimental and should not be used in a production environment")
  119. }
  120. exp, err := newTracesExporter(oCfg, set, f.tracesMarshalers)
  121. if err != nil {
  122. return nil, err
  123. }
  124. return exporterhelper.NewTracesExporter(
  125. ctx,
  126. set,
  127. &oCfg,
  128. exp.tracesPusher,
  129. exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
  130. // Disable exporterhelper Timeout, because we cannot pass a Context to the Producer,
  131. // and will rely on the sarama Producer Timeout logic.
  132. exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
  133. exporterhelper.WithRetry(oCfg.RetrySettings),
  134. exporterhelper.WithQueue(oCfg.QueueSettings),
  135. exporterhelper.WithShutdown(exp.Close))
  136. }
  137. func (f *kafkaExporterFactory) createMetricsExporter(
  138. ctx context.Context,
  139. set exporter.CreateSettings,
  140. cfg component.Config,
  141. ) (exporter.Metrics, error) {
  142. oCfg := *(cfg.(*Config)) // Clone the config
  143. if oCfg.Topic == "" {
  144. oCfg.Topic = defaultMetricsTopic
  145. }
  146. if oCfg.Encoding == "otlp_json" {
  147. set.Logger.Info("otlp_json is considered experimental and should not be used in a production environment")
  148. }
  149. exp, err := newMetricsExporter(oCfg, set, f.metricsMarshalers)
  150. if err != nil {
  151. return nil, err
  152. }
  153. return exporterhelper.NewMetricsExporter(
  154. ctx,
  155. set,
  156. &oCfg,
  157. exp.metricsDataPusher,
  158. exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
  159. // Disable exporterhelper Timeout, because we cannot pass a Context to the Producer,
  160. // and will rely on the sarama Producer Timeout logic.
  161. exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
  162. exporterhelper.WithRetry(oCfg.RetrySettings),
  163. exporterhelper.WithQueue(oCfg.QueueSettings),
  164. exporterhelper.WithShutdown(exp.Close))
  165. }
  166. func (f *kafkaExporterFactory) createLogsExporter(
  167. ctx context.Context,
  168. set exporter.CreateSettings,
  169. cfg component.Config,
  170. ) (exporter.Logs, error) {
  171. oCfg := *(cfg.(*Config)) // Clone the config
  172. if oCfg.Topic == "" {
  173. oCfg.Topic = defaultLogsTopic
  174. }
  175. if oCfg.Encoding == "otlp_json" {
  176. set.Logger.Info("otlp_json is considered experimental and should not be used in a production environment")
  177. }
  178. exp, err := newLogsExporter(oCfg, set, f.logsMarshalers)
  179. if err != nil {
  180. return nil, err
  181. }
  182. return exporterhelper.NewLogsExporter(
  183. ctx,
  184. set,
  185. &oCfg,
  186. exp.logsDataPusher,
  187. exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
  188. // Disable exporterhelper Timeout, because we cannot pass a Context to the Producer,
  189. // and will rely on the sarama Producer Timeout logic.
  190. exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
  191. exporterhelper.WithRetry(oCfg.RetrySettings),
  192. exporterhelper.WithQueue(oCfg.QueueSettings),
  193. exporterhelper.WithShutdown(exp.Close))
  194. }