exporter.go 9.7 KB


  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package prometheusremotewriteexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter"
  4. import (
  5. "bytes"
  6. "context"
  7. "errors"
  8. "fmt"
  9. "io"
  10. "math"
  11. "net/http"
  12. "net/url"
  13. "strings"
  14. "sync"
  15. "github.com/cenkalti/backoff/v4"
  16. "github.com/gogo/protobuf/proto"
  17. "github.com/golang/snappy"
  18. "github.com/prometheus/prometheus/prompb"
  19. "go.opentelemetry.io/collector/component"
  20. "go.opentelemetry.io/collector/config/confighttp"
  21. "go.opentelemetry.io/collector/consumer/consumererror"
  22. "go.opentelemetry.io/collector/exporter"
  23. "go.opentelemetry.io/collector/exporter/exporterhelper"
  24. "go.opentelemetry.io/collector/pdata/pmetric"
  25. "go.uber.org/multierr"
  26. prometheustranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus"
  27. "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite"
  28. )
  29. // prwExporter converts OTLP metrics to Prometheus remote write TimeSeries and sends them to a remote endpoint.
  30. type prwExporter struct {
  31. endpointURL *url.URL
  32. client *http.Client
  33. wg *sync.WaitGroup
  34. closeChan chan struct{}
  35. concurrency int
  36. userAgentHeader string
  37. maxBatchSizeBytes int
  38. clientSettings *confighttp.HTTPClientSettings
  39. settings component.TelemetrySettings
  40. retrySettings exporterhelper.RetrySettings
  41. wal *prweWAL
  42. exporterSettings prometheusremotewrite.Settings
  43. }
  44. // newPRWExporter initializes a new prwExporter instance and sets fields accordingly.
  45. func newPRWExporter(cfg *Config, set exporter.CreateSettings) (*prwExporter, error) {
  46. sanitizedLabels, err := validateAndSanitizeExternalLabels(cfg)
  47. if err != nil {
  48. return nil, err
  49. }
  50. endpointURL, err := url.ParseRequestURI(cfg.HTTPClientSettings.Endpoint)
  51. if err != nil {
  52. return nil, errors.New("invalid endpoint")
  53. }
  54. userAgentHeader := fmt.Sprintf("%s/%s", strings.ReplaceAll(strings.ToLower(set.BuildInfo.Description), " ", "-"), set.BuildInfo.Version)
  55. prwe := &prwExporter{
  56. endpointURL: endpointURL,
  57. wg: new(sync.WaitGroup),
  58. closeChan: make(chan struct{}),
  59. userAgentHeader: userAgentHeader,
  60. maxBatchSizeBytes: cfg.MaxBatchSizeBytes,
  61. concurrency: cfg.RemoteWriteQueue.NumConsumers,
  62. clientSettings: &cfg.HTTPClientSettings,
  63. settings: set.TelemetrySettings,
  64. retrySettings: cfg.RetrySettings,
  65. exporterSettings: prometheusremotewrite.Settings{
  66. Namespace: cfg.Namespace,
  67. ExternalLabels: sanitizedLabels,
  68. DisableTargetInfo: !cfg.TargetInfo.Enabled,
  69. ExportCreatedMetric: cfg.CreatedMetric.Enabled,
  70. AddMetricSuffixes: cfg.AddMetricSuffixes,
  71. SendMetadata: cfg.SendMetadata,
  72. },
  73. }
  74. if cfg.WAL == nil {
  75. return prwe, nil
  76. }
  77. prwe.wal, err = newWAL(cfg.WAL, prwe.export)
  78. if err != nil {
  79. return nil, err
  80. }
  81. return prwe, nil
  82. }
  83. // Start creates the prometheus client
  84. func (prwe *prwExporter) Start(ctx context.Context, host component.Host) (err error) {
  85. prwe.client, err = prwe.clientSettings.ToClient(host, prwe.settings)
  86. if err != nil {
  87. return err
  88. }
  89. return prwe.turnOnWALIfEnabled(contextWithLogger(ctx, prwe.settings.Logger.Named("prw.wal")))
  90. }
  91. func (prwe *prwExporter) shutdownWALIfEnabled() error {
  92. if !prwe.walEnabled() {
  93. return nil
  94. }
  95. return prwe.wal.stop()
  96. }
  97. // Shutdown stops the exporter from accepting incoming calls(and return error), and wait for current export operations
  98. // to finish before returning
  99. func (prwe *prwExporter) Shutdown(context.Context) error {
  100. select {
  101. case <-prwe.closeChan:
  102. default:
  103. close(prwe.closeChan)
  104. }
  105. err := prwe.shutdownWALIfEnabled()
  106. prwe.wg.Wait()
  107. return err
  108. }
  109. // PushMetrics converts metrics to Prometheus remote write TimeSeries and send to remote endpoint. It maintain a map of
  110. // TimeSeries, validates and handles each individual metric, adding the converted TimeSeries to the map, and finally
  111. // exports the map.
  112. func (prwe *prwExporter) PushMetrics(ctx context.Context, md pmetric.Metrics) error {
  113. prwe.wg.Add(1)
  114. defer prwe.wg.Done()
  115. select {
  116. case <-prwe.closeChan:
  117. return errors.New("shutdown has been called")
  118. default:
  119. tsMap, err := prometheusremotewrite.FromMetrics(md, prwe.exporterSettings)
  120. if err != nil {
  121. err = consumererror.NewPermanent(err)
  122. }
  123. var m []*prompb.MetricMetadata
  124. if prwe.exporterSettings.SendMetadata {
  125. m = prometheusremotewrite.OtelMetricsToMetadata(md, prwe.exporterSettings.AddMetricSuffixes)
  126. }
  127. // Call export even if a conversion error, since there may be points that were successfully converted.
  128. return multierr.Combine(err, prwe.handleExport(ctx, tsMap, m))
  129. }
  130. }
  131. func validateAndSanitizeExternalLabels(cfg *Config) (map[string]string, error) {
  132. sanitizedLabels := make(map[string]string)
  133. for key, value := range cfg.ExternalLabels {
  134. if key == "" || value == "" {
  135. return nil, fmt.Errorf("prometheus remote write: external labels configuration contains an empty key or value")
  136. }
  137. sanitizedLabels[prometheustranslator.NormalizeLabel(key)] = value
  138. }
  139. return sanitizedLabels, nil
  140. }
  141. func (prwe *prwExporter) handleExport(ctx context.Context, tsMap map[string]*prompb.TimeSeries, m []*prompb.MetricMetadata) error {
  142. // There are no metrics to export, so return.
  143. if len(tsMap) == 0 {
  144. return nil
  145. }
  146. // Calls the helper function to convert and batch the TsMap to the desired format
  147. requests, err := batchTimeSeries(tsMap, prwe.maxBatchSizeBytes, m)
  148. if err != nil {
  149. return err
  150. }
  151. if !prwe.walEnabled() {
  152. // Perform a direct export otherwise.
  153. return prwe.export(ctx, requests)
  154. }
  155. // Otherwise the WAL is enabled, and just persist the requests to the WAL
  156. // and they'll be exported in another goroutine to the RemoteWrite endpoint.
  157. if err = prwe.wal.persistToWAL(requests); err != nil {
  158. return consumererror.NewPermanent(err)
  159. }
  160. return nil
  161. }
  162. // export sends a Snappy-compressed WriteRequest containing TimeSeries to a remote write endpoint in order
  163. func (prwe *prwExporter) export(ctx context.Context, requests []*prompb.WriteRequest) error {
  164. input := make(chan *prompb.WriteRequest, len(requests))
  165. for _, request := range requests {
  166. input <- request
  167. }
  168. close(input)
  169. var wg sync.WaitGroup
  170. concurrencyLimit := int(math.Min(float64(prwe.concurrency), float64(len(requests))))
  171. wg.Add(concurrencyLimit) // used to wait for workers to be finished
  172. var mu sync.Mutex
  173. var errs error
  174. // Run concurrencyLimit of workers until there
  175. // is no more requests to execute in the input channel.
  176. for i := 0; i < concurrencyLimit; i++ {
  177. go func() {
  178. defer wg.Done()
  179. for {
  180. select {
  181. case <-ctx.Done(): // Check firstly to ensure that the context wasn't cancelled.
  182. return
  183. case request, ok := <-input:
  184. if !ok {
  185. return
  186. }
  187. if errExecute := prwe.execute(ctx, request); errExecute != nil {
  188. mu.Lock()
  189. errs = multierr.Append(errs, consumererror.NewPermanent(errExecute))
  190. mu.Unlock()
  191. }
  192. }
  193. }
  194. }()
  195. }
  196. wg.Wait()
  197. return errs
  198. }
  199. func (prwe *prwExporter) execute(ctx context.Context, writeReq *prompb.WriteRequest) error {
  200. // executeFunc can be used for backoff and non backoff scenarios.
  201. executeFunc := func() error {
  202. // Uses proto.Marshal to convert the WriteRequest into bytes array
  203. data, err := proto.Marshal(writeReq)
  204. if err != nil {
  205. return backoff.Permanent(consumererror.NewPermanent(err))
  206. }
  207. buf := make([]byte, len(data), cap(data))
  208. compressedData := snappy.Encode(buf, data)
  209. // Create the HTTP POST request to send to the endpoint
  210. req, err := http.NewRequestWithContext(ctx, "POST", prwe.endpointURL.String(), bytes.NewReader(compressedData))
  211. if err != nil {
  212. return backoff.Permanent(consumererror.NewPermanent(err))
  213. }
  214. // Add necessary headers specified by:
  215. // https://cortexmetrics.io/docs/apis/#remote-api
  216. req.Header.Add("Content-Encoding", "snappy")
  217. req.Header.Set("Content-Type", "application/x-protobuf")
  218. req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
  219. req.Header.Set("User-Agent", prwe.userAgentHeader)
  220. resp, err := prwe.client.Do(req)
  221. if err != nil {
  222. return err
  223. }
  224. defer resp.Body.Close()
  225. // 2xx status code is considered a success
  226. // 5xx errors are recoverable and the exporter should retry
  227. // Reference for different behavior according to status code:
  228. // https://github.com/prometheus/prometheus/pull/2552/files#diff-ae8db9d16d8057358e49d694522e7186
  229. if resp.StatusCode >= 200 && resp.StatusCode < 300 {
  230. return nil
  231. }
  232. body, err := io.ReadAll(io.LimitReader(resp.Body, 256))
  233. rerr := fmt.Errorf("remote write returned HTTP status %v; err = %w: %s", resp.Status, err, body)
  234. if resp.StatusCode >= 500 && resp.StatusCode < 600 {
  235. return rerr
  236. }
  237. return backoff.Permanent(consumererror.NewPermanent(rerr))
  238. }
  239. var err error
  240. if prwe.retrySettings.Enabled {
  241. // Use the BackOff instance to retry the func with exponential backoff.
  242. err = backoff.Retry(executeFunc, &backoff.ExponentialBackOff{
  243. InitialInterval: prwe.retrySettings.InitialInterval,
  244. RandomizationFactor: prwe.retrySettings.RandomizationFactor,
  245. Multiplier: prwe.retrySettings.Multiplier,
  246. MaxInterval: prwe.retrySettings.MaxInterval,
  247. MaxElapsedTime: prwe.retrySettings.MaxElapsedTime,
  248. Stop: backoff.Stop,
  249. Clock: backoff.SystemClock,
  250. })
  251. } else {
  252. err = executeFunc()
  253. }
  254. if err != nil {
  255. return consumererror.NewPermanent(err)
  256. }
  257. return err
  258. }
  259. func (prwe *prwExporter) walEnabled() bool { return prwe.wal != nil }
  260. func (prwe *prwExporter) turnOnWALIfEnabled(ctx context.Context) error {
  261. if !prwe.walEnabled() {
  262. return nil
  263. }
  264. cancelCtx, cancel := context.WithCancel(ctx)
  265. go func() {
  266. <-prwe.closeChan
  267. cancel()
  268. }()
  269. return prwe.wal.run(cancelCtx)
  270. }