receiver.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package influxdbreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/influxdbreceiver"
  4. import (
  5. "context"
  6. "errors"
  7. "fmt"
  8. "net/http"
  9. "sync"
  10. "time"
  11. "github.com/influxdata/influxdb-observability/common"
  12. "github.com/influxdata/influxdb-observability/influx2otel"
  13. "github.com/influxdata/line-protocol/v2/lineprotocol"
  14. "go.opentelemetry.io/collector/component"
  15. "go.opentelemetry.io/collector/config/confighttp"
  16. "go.opentelemetry.io/collector/consumer"
  17. "go.opentelemetry.io/collector/consumer/consumererror"
  18. "go.opentelemetry.io/collector/receiver"
  19. "go.opentelemetry.io/collector/receiver/receiverhelper"
  20. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/sanitize"
  21. )
  22. type metricsReceiver struct {
  23. nextConsumer consumer.Metrics
  24. httpServerSettings *confighttp.HTTPServerSettings
  25. converter *influx2otel.LineProtocolToOtelMetrics
  26. server *http.Server
  27. wg sync.WaitGroup
  28. logger common.Logger
  29. obsrecv *receiverhelper.ObsReport
  30. settings component.TelemetrySettings
  31. }
  32. func newMetricsReceiver(config *Config, settings receiver.CreateSettings, nextConsumer consumer.Metrics) (*metricsReceiver, error) {
  33. influxLogger := newZapInfluxLogger(settings.TelemetrySettings.Logger)
  34. converter, err := influx2otel.NewLineProtocolToOtelMetrics(influxLogger)
  35. if err != nil {
  36. return nil, err
  37. }
  38. obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
  39. ReceiverID: settings.ID,
  40. Transport: "http",
  41. ReceiverCreateSettings: settings,
  42. })
  43. if err != nil {
  44. return nil, err
  45. }
  46. return &metricsReceiver{
  47. nextConsumer: nextConsumer,
  48. httpServerSettings: &config.HTTPServerSettings,
  49. converter: converter,
  50. logger: influxLogger,
  51. obsrecv: obsrecv,
  52. settings: settings.TelemetrySettings,
  53. }, err
  54. }
  55. func (r *metricsReceiver) Start(_ context.Context, host component.Host) error {
  56. ln, err := r.httpServerSettings.ToListener()
  57. if err != nil {
  58. return fmt.Errorf("failed to bind to address %s: %w", r.httpServerSettings.Endpoint, err)
  59. }
  60. router := http.NewServeMux()
  61. router.HandleFunc("/write", r.handleWrite) // InfluxDB 1.x
  62. router.HandleFunc("/api/v2/write", r.handleWrite) // InfluxDB 2.x
  63. r.wg.Add(1)
  64. r.server, err = r.httpServerSettings.ToServer(host, r.settings, router)
  65. if err != nil {
  66. return err
  67. }
  68. go func() {
  69. defer r.wg.Done()
  70. if errHTTP := r.server.Serve(ln); !errors.Is(errHTTP, http.ErrServerClosed) && errHTTP != nil {
  71. host.ReportFatalError(errHTTP)
  72. }
  73. }()
  74. return nil
  75. }
  76. func (r *metricsReceiver) Shutdown(_ context.Context) error {
  77. if r.server == nil {
  78. return nil
  79. }
  80. if err := r.server.Close(); err != nil {
  81. return err
  82. }
  83. r.wg.Wait()
  84. return nil
  85. }
  86. const (
  87. defaultPrecision = lineprotocol.Nanosecond
  88. dataFormat = "influxdb"
  89. )
  90. var precisions = map[string]lineprotocol.Precision{
  91. "ns": lineprotocol.Nanosecond,
  92. "n": lineprotocol.Nanosecond,
  93. "µs": lineprotocol.Microsecond,
  94. "µ": lineprotocol.Microsecond,
  95. "us": lineprotocol.Microsecond,
  96. "u": lineprotocol.Microsecond,
  97. "ms": lineprotocol.Millisecond,
  98. "s": lineprotocol.Second,
  99. }
  100. func (r *metricsReceiver) handleWrite(w http.ResponseWriter, req *http.Request) {
  101. defer func() {
  102. _ = req.Body.Close()
  103. }()
  104. precision := defaultPrecision
  105. if precisionStr := req.URL.Query().Get("precision"); precisionStr != "" {
  106. var ok bool
  107. if precision, ok = precisions[precisionStr]; !ok {
  108. w.WriteHeader(http.StatusBadRequest)
  109. _, _ = fmt.Fprintf(w, "unrecognized precision '%s'", sanitize.String(precisionStr))
  110. return
  111. }
  112. }
  113. batch := r.converter.NewBatch()
  114. lpDecoder := lineprotocol.NewDecoder(req.Body)
  115. ctx := r.obsrecv.StartMetricsOp(req.Context())
  116. var k, vTag []byte
  117. var vField lineprotocol.Value
  118. for line := 0; lpDecoder.Next(); line++ {
  119. measurement, err := lpDecoder.Measurement()
  120. if err != nil {
  121. w.WriteHeader(http.StatusBadRequest)
  122. _, _ = fmt.Fprintf(w, "failed to parse measurement on line %d", line)
  123. return
  124. }
  125. tags := make(map[string]string)
  126. for k, vTag, err = lpDecoder.NextTag(); k != nil && err == nil; k, vTag, err = lpDecoder.NextTag() {
  127. tags[string(k)] = string(vTag)
  128. }
  129. if err != nil {
  130. w.WriteHeader(http.StatusBadRequest)
  131. _, _ = fmt.Fprintf(w, "failed to parse tag on line %d", line)
  132. return
  133. }
  134. fields := make(map[string]any)
  135. for k, vField, err = lpDecoder.NextField(); k != nil && err == nil; k, vField, err = lpDecoder.NextField() {
  136. fields[string(k)] = vField.Interface()
  137. }
  138. if err != nil {
  139. w.WriteHeader(http.StatusBadRequest)
  140. _, _ = fmt.Fprintf(w, "failed to parse field on line %d", line)
  141. return
  142. }
  143. ts, err := lpDecoder.Time(precision, time.Time{})
  144. if err != nil {
  145. w.WriteHeader(http.StatusBadRequest)
  146. _, _ = fmt.Fprintf(w, "failed to parse timestamp on line %d", line)
  147. return
  148. }
  149. if err = lpDecoder.Err(); err != nil {
  150. w.WriteHeader(http.StatusBadRequest)
  151. _, _ = fmt.Fprintf(w, "failed to parse line: %s", err.Error())
  152. return
  153. }
  154. err = batch.AddPoint(string(measurement), tags, fields, ts, common.InfluxMetricValueTypeUntyped)
  155. if err != nil {
  156. w.WriteHeader(http.StatusBadRequest)
  157. _, _ = fmt.Fprintf(w, "failed to append to the batch")
  158. return
  159. }
  160. }
  161. err := r.nextConsumer.ConsumeMetrics(req.Context(), batch.GetMetrics())
  162. r.obsrecv.EndMetricsOp(ctx, dataFormat, batch.GetMetrics().DataPointCount(), err)
  163. if err != nil {
  164. if consumererror.IsPermanent(err) {
  165. w.WriteHeader(http.StatusBadRequest)
  166. } else {
  167. w.WriteHeader(http.StatusInternalServerError)
  168. }
  169. r.logger.Debug("failed to pass metrics to next consumer: %s", err)
  170. return
  171. }
  172. w.WriteHeader(http.StatusNoContent)
  173. }