factory.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package prometheusremotewriteexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter"
  4. import (
  5. "context"
  6. "errors"
  7. "time"
  8. "github.com/cenkalti/backoff/v4"
  9. "go.opentelemetry.io/collector/component"
  10. "go.opentelemetry.io/collector/config/confighttp"
  11. "go.opentelemetry.io/collector/config/configopaque"
  12. "go.opentelemetry.io/collector/exporter"
  13. "go.opentelemetry.io/collector/exporter/exporterhelper"
  14. "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter/internal/metadata"
  15. "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry"
  16. )
  17. // NewFactory creates a new Prometheus Remote Write exporter.
  18. func NewFactory() exporter.Factory {
  19. return exporter.NewFactory(
  20. metadata.Type,
  21. createDefaultConfig,
  22. exporter.WithMetrics(createMetricsExporter, metadata.MetricsStability))
  23. }
  24. func createMetricsExporter(ctx context.Context, set exporter.CreateSettings,
  25. cfg component.Config) (exporter.Metrics, error) {
  26. prwCfg, ok := cfg.(*Config)
  27. if !ok {
  28. return nil, errors.New("invalid configuration")
  29. }
  30. prwe, err := newPRWExporter(prwCfg, set)
  31. if err != nil {
  32. return nil, err
  33. }
  34. // Don't allow users to configure the queue.
  35. // See https://github.com/open-telemetry/opentelemetry-collector/issues/2949.
  36. // Prometheus remote write samples needs to be in chronological
  37. // order for each timeseries. If we shard the incoming metrics
  38. // without considering this limitation, we experience
  39. // "out of order samples" errors.
  40. exporter, err := exporterhelper.NewMetricsExporter(
  41. ctx,
  42. set,
  43. cfg,
  44. prwe.PushMetrics,
  45. exporterhelper.WithTimeout(prwCfg.TimeoutSettings),
  46. exporterhelper.WithQueue(exporterhelper.QueueSettings{
  47. Enabled: prwCfg.RemoteWriteQueue.Enabled,
  48. NumConsumers: 1,
  49. QueueSize: prwCfg.RemoteWriteQueue.QueueSize,
  50. }),
  51. exporterhelper.WithStart(prwe.Start),
  52. exporterhelper.WithShutdown(prwe.Shutdown),
  53. )
  54. if err != nil {
  55. return nil, err
  56. }
  57. return resourcetotelemetry.WrapMetricsExporter(prwCfg.ResourceToTelemetrySettings, exporter), nil
  58. }
  59. func createDefaultConfig() component.Config {
  60. return &Config{
  61. Namespace: "",
  62. ExternalLabels: map[string]string{},
  63. MaxBatchSizeBytes: 3000000,
  64. TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(),
  65. RetrySettings: exporterhelper.RetrySettings{
  66. Enabled: true,
  67. InitialInterval: 50 * time.Millisecond,
  68. MaxInterval: 200 * time.Millisecond,
  69. MaxElapsedTime: 1 * time.Minute,
  70. RandomizationFactor: backoff.DefaultRandomizationFactor,
  71. Multiplier: backoff.DefaultMultiplier,
  72. },
  73. AddMetricSuffixes: true,
  74. SendMetadata: false,
  75. HTTPClientSettings: confighttp.HTTPClientSettings{
  76. Endpoint: "http://some.url:9411/api/prom/push",
  77. // We almost read 0 bytes, so no need to tune ReadBufferSize.
  78. ReadBufferSize: 0,
  79. WriteBufferSize: 512 * 1024,
  80. Timeout: exporterhelper.NewDefaultTimeoutSettings().Timeout,
  81. Headers: map[string]configopaque.String{},
  82. },
  83. // TODO(jbd): Adjust the default queue size.
  84. RemoteWriteQueue: RemoteWriteQueue{
  85. Enabled: true,
  86. QueueSize: 10000,
  87. NumConsumers: 5,
  88. },
  89. TargetInfo: &TargetInfo{
  90. Enabled: true,
  91. },
  92. CreatedMetric: &CreatedMetric{
  93. Enabled: false,
  94. },
  95. }
  96. }