receiver.go 10 KB


  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package signalfxreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/signalfxreceiver"
  4. import (
  5. "compress/gzip"
  6. "context"
  7. "encoding/json"
  8. "errors"
  9. "fmt"
  10. "io"
  11. "net/http"
  12. "sync"
  13. "time"
  14. "unsafe"
  15. "github.com/gorilla/mux"
  16. sfxpb "github.com/signalfx/com_signalfx_metrics_protobuf/model"
  17. "go.opencensus.io/trace"
  18. "go.opentelemetry.io/collector/component"
  19. "go.opentelemetry.io/collector/consumer"
  20. "go.opentelemetry.io/collector/pdata/plog"
  21. "go.opentelemetry.io/collector/receiver"
  22. "go.opentelemetry.io/collector/receiver/receiverhelper"
  23. conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
  24. "go.uber.org/zap"
  25. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk"
  26. "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/signalfx"
  27. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/signalfxreceiver/internal/metadata"
  28. )
  // defaultServerTimeout is the hard-coded timeout applied to the HTTP server.
  const defaultServerTimeout = 20 * time.Second

  // Human-readable messages returned to clients; each one is JSON-encoded
  // once at package initialization.
  const (
  	responseOK                      = "OK"
  	responseInvalidMethod           = `Only "POST" method is supported`
  	responseInvalidContentType      = `"Content-Type" must be "application/x-protobuf"`
  	responseInvalidEncoding         = `"Content-Encoding" must be "gzip" or empty`
  	responseErrGzipReader           = "Error on gzip body"
  	responseErrReadBody             = "Failed to read message body"
  	responseErrUnmarshalBody        = "Failed to unmarshal message body"
  	responseErrNextConsumer         = "Internal Server Error"
  	responseErrLogsNotConfigured    = "Log pipeline has not been configured to handle events"
  	responseErrMetricsNotConfigured = "Metric pipeline has not been configured to handle datapoints"
  )

  // HTTP header names and the header values accepted by the receiver, kept in
  // one place so request validation and tests agree on them.
  const (
  	httpContentTypeHeader     = "Content-Type"
  	httpContentEncodingHeader = "Content-Encoding"
  	protobufContentType       = "application/x-protobuf"
  	gzipEncoding              = "gzip"
  )
  47. var (
  48. okRespBody = initJSONResponse(responseOK)
  49. invalidMethodRespBody = initJSONResponse(responseInvalidMethod)
  50. invalidContentRespBody = initJSONResponse(responseInvalidContentType)
  51. invalidEncodingRespBody = initJSONResponse(responseInvalidEncoding)
  52. errGzipReaderRespBody = initJSONResponse(responseErrGzipReader)
  53. errReadBodyRespBody = initJSONResponse(responseErrReadBody)
  54. errUnmarshalBodyRespBody = initJSONResponse(responseErrUnmarshalBody)
  55. errNextConsumerRespBody = initJSONResponse(responseErrNextConsumer)
  56. errLogsNotConfigured = initJSONResponse(responseErrLogsNotConfigured)
  57. errMetricsNotConfigured = initJSONResponse(responseErrMetricsNotConfigured)
  58. translator = &signalfx.ToTranslator{}
  59. )
  60. // sfxReceiver implements the receiver.Metrics for SignalFx metric protocol.
  61. type sfxReceiver struct {
  62. settings receiver.CreateSettings
  63. config *Config
  64. metricsConsumer consumer.Metrics
  65. logsConsumer consumer.Logs
  66. server *http.Server
  67. shutdownWG sync.WaitGroup
  68. obsrecv *receiverhelper.ObsReport
  69. }
  70. var _ receiver.Metrics = (*sfxReceiver)(nil)
  71. // New creates the SignalFx receiver with the given configuration.
  72. func newReceiver(
  73. settings receiver.CreateSettings,
  74. config Config,
  75. ) (*sfxReceiver, error) {
  76. transport := "http"
  77. if config.TLSSetting != nil {
  78. transport = "https"
  79. }
  80. obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
  81. ReceiverID: settings.ID,
  82. Transport: transport,
  83. ReceiverCreateSettings: settings,
  84. })
  85. if err != nil {
  86. return nil, err
  87. }
  88. r := &sfxReceiver{
  89. settings: settings,
  90. config: &config,
  91. obsrecv: obsrecv,
  92. }
  93. return r, nil
  94. }
  95. func (r *sfxReceiver) RegisterMetricsConsumer(mc consumer.Metrics) {
  96. r.metricsConsumer = mc
  97. }
  98. func (r *sfxReceiver) RegisterLogsConsumer(lc consumer.Logs) {
  99. r.logsConsumer = lc
  100. }
  101. // Start tells the receiver to start its processing.
  102. // By convention the consumer of the received data is set when the receiver
  103. // instance is created.
  104. func (r *sfxReceiver) Start(_ context.Context, host component.Host) error {
  105. if r.metricsConsumer == nil && r.logsConsumer == nil {
  106. return component.ErrNilNextConsumer
  107. }
  108. if r.server != nil {
  109. return nil
  110. }
  111. // set up the listener
  112. ln, err := r.config.HTTPServerSettings.ToListener()
  113. if err != nil {
  114. return fmt.Errorf("failed to bind to address %s: %w", r.config.Endpoint, err)
  115. }
  116. mx := mux.NewRouter()
  117. mx.HandleFunc("/v2/datapoint", r.handleDatapointReq)
  118. mx.HandleFunc("/v2/event", r.handleEventReq)
  119. r.server, err = r.config.HTTPServerSettings.ToServer(host, r.settings.TelemetrySettings, mx)
  120. if err != nil {
  121. return err
  122. }
  123. // TODO: Evaluate what properties should be configurable, for now
  124. // set some hard-coded values.
  125. r.server.ReadHeaderTimeout = defaultServerTimeout
  126. r.server.WriteTimeout = defaultServerTimeout
  127. r.shutdownWG.Add(1)
  128. go func() {
  129. defer r.shutdownWG.Done()
  130. if errHTTP := r.server.Serve(ln); !errors.Is(errHTTP, http.ErrServerClosed) && errHTTP != nil {
  131. host.ReportFatalError(errHTTP)
  132. }
  133. }()
  134. return nil
  135. }
  136. // Shutdown tells the receiver that should stop reception,
  137. // giving it a chance to perform any necessary clean-up.
  138. func (r *sfxReceiver) Shutdown(context.Context) error {
  139. if r.server == nil {
  140. return nil
  141. }
  142. err := r.server.Close()
  143. r.shutdownWG.Wait()
  144. return err
  145. }
  146. func (r *sfxReceiver) readBody(ctx context.Context, resp http.ResponseWriter, req *http.Request) ([]byte, bool) {
  147. if req.Method != http.MethodPost {
  148. r.failRequest(ctx, resp, http.StatusBadRequest, invalidMethodRespBody, nil)
  149. return nil, false
  150. }
  151. if req.Header.Get(httpContentTypeHeader) != protobufContentType {
  152. r.failRequest(ctx, resp, http.StatusUnsupportedMediaType, invalidContentRespBody, nil)
  153. return nil, false
  154. }
  155. encoding := req.Header.Get(httpContentEncodingHeader)
  156. if encoding != "" && encoding != gzipEncoding {
  157. r.failRequest(ctx, resp, http.StatusUnsupportedMediaType, invalidEncodingRespBody, nil)
  158. return nil, false
  159. }
  160. bodyReader := req.Body
  161. if encoding == gzipEncoding {
  162. var err error
  163. bodyReader, err = gzip.NewReader(bodyReader)
  164. if err != nil {
  165. r.failRequest(ctx, resp, http.StatusBadRequest, errGzipReaderRespBody, err)
  166. return nil, false
  167. }
  168. }
  169. body, err := io.ReadAll(bodyReader)
  170. if err != nil {
  171. r.failRequest(ctx, resp, http.StatusBadRequest, errReadBodyRespBody, err)
  172. return nil, false
  173. }
  174. return body, true
  175. }
  176. func (r *sfxReceiver) writeResponse(ctx context.Context, resp http.ResponseWriter, err error) {
  177. if err != nil {
  178. r.failRequest(ctx, resp, http.StatusInternalServerError, errNextConsumerRespBody, err)
  179. return
  180. }
  181. resp.WriteHeader(http.StatusOK)
  182. _, err = resp.Write(okRespBody)
  183. if err != nil {
  184. r.failRequest(ctx, resp, http.StatusInternalServerError, errNextConsumerRespBody, err)
  185. }
  186. }
  187. func (r *sfxReceiver) handleDatapointReq(resp http.ResponseWriter, req *http.Request) {
  188. ctx := r.obsrecv.StartMetricsOp(req.Context())
  189. if r.metricsConsumer == nil {
  190. r.failRequest(ctx, resp, http.StatusBadRequest, errMetricsNotConfigured, nil)
  191. return
  192. }
  193. body, ok := r.readBody(ctx, resp, req)
  194. if !ok {
  195. return
  196. }
  197. msg := &sfxpb.DataPointUploadMessage{}
  198. if err := msg.Unmarshal(body); err != nil {
  199. r.failRequest(ctx, resp, http.StatusBadRequest, errUnmarshalBodyRespBody, err)
  200. return
  201. }
  202. if len(msg.Datapoints) == 0 {
  203. r.obsrecv.EndMetricsOp(ctx, metadata.Type, 0, nil)
  204. _, _ = resp.Write(okRespBody)
  205. return
  206. }
  207. md, err := translator.ToMetrics(msg.Datapoints)
  208. if err != nil {
  209. r.settings.Logger.Debug("SignalFx conversion error", zap.Error(err))
  210. }
  211. if r.config.AccessTokenPassthrough {
  212. if accessToken := req.Header.Get(splunk.SFxAccessTokenHeader); accessToken != "" {
  213. for i := 0; i < md.ResourceMetrics().Len(); i++ {
  214. rm := md.ResourceMetrics().At(i)
  215. res := rm.Resource()
  216. res.Attributes().PutStr(splunk.SFxAccessTokenLabel, accessToken)
  217. }
  218. }
  219. }
  220. err = r.metricsConsumer.ConsumeMetrics(ctx, md)
  221. r.obsrecv.EndMetricsOp(ctx, metadata.Type, len(msg.Datapoints), err)
  222. r.writeResponse(ctx, resp, err)
  223. }
  224. func (r *sfxReceiver) handleEventReq(resp http.ResponseWriter, req *http.Request) {
  225. ctx := r.obsrecv.StartMetricsOp(req.Context())
  226. if r.logsConsumer == nil {
  227. r.failRequest(ctx, resp, http.StatusBadRequest, errLogsNotConfigured, nil)
  228. return
  229. }
  230. body, ok := r.readBody(ctx, resp, req)
  231. if !ok {
  232. return
  233. }
  234. msg := &sfxpb.EventUploadMessage{}
  235. if err := msg.Unmarshal(body); err != nil {
  236. r.failRequest(ctx, resp, http.StatusBadRequest, errUnmarshalBodyRespBody, err)
  237. return
  238. }
  239. if len(msg.Events) == 0 {
  240. r.obsrecv.EndMetricsOp(ctx, metadata.Type, 0, nil)
  241. _, _ = resp.Write(okRespBody)
  242. return
  243. }
  244. ld := plog.NewLogs()
  245. rl := ld.ResourceLogs().AppendEmpty()
  246. sl := rl.ScopeLogs().AppendEmpty()
  247. signalFxV2EventsToLogRecords(msg.Events, sl.LogRecords())
  248. if r.config.AccessTokenPassthrough {
  249. if accessToken := req.Header.Get(splunk.SFxAccessTokenHeader); accessToken != "" {
  250. rl.Resource().Attributes().PutStr(splunk.SFxAccessTokenLabel, accessToken)
  251. }
  252. }
  253. err := r.logsConsumer.ConsumeLogs(ctx, ld)
  254. r.obsrecv.EndMetricsOp(
  255. ctx,
  256. metadata.Type,
  257. len(msg.Events),
  258. err)
  259. r.writeResponse(ctx, resp, err)
  260. }
  261. func (r *sfxReceiver) failRequest(
  262. ctx context.Context,
  263. resp http.ResponseWriter,
  264. httpStatusCode int,
  265. jsonResponse []byte,
  266. err error,
  267. ) {
  268. resp.WriteHeader(httpStatusCode)
  269. if len(jsonResponse) > 0 {
  270. // The response needs to be written as a JSON string.
  271. _, writeErr := resp.Write(jsonResponse)
  272. if writeErr != nil {
  273. r.settings.Logger.Warn(
  274. "Error writing HTTP response message",
  275. zap.Error(writeErr),
  276. zap.String("receiver", r.settings.ID.String()))
  277. }
  278. }
  279. // Use the same pattern as strings.Builder String().
  280. msg := *(*string)(unsafe.Pointer(&jsonResponse))
  281. reqSpan := trace.FromContext(ctx)
  282. reqSpan.AddAttributes(
  283. trace.Int64Attribute(conventions.AttributeHTTPStatusCode, int64(httpStatusCode)),
  284. trace.StringAttribute("http.status_text", msg))
  285. traceStatus := trace.Status{
  286. Code: trace.StatusCodeInvalidArgument,
  287. }
  288. if httpStatusCode == http.StatusInternalServerError {
  289. traceStatus.Code = trace.StatusCodeInternal
  290. }
  291. if err != nil {
  292. traceStatus.Message = err.Error()
  293. }
  294. reqSpan.SetStatus(traceStatus)
  295. reqSpan.End()
  296. r.settings.Logger.Debug(
  297. "SignalFx receiver request failed",
  298. zap.Int("http_status_code", httpStatusCode),
  299. zap.String("msg", msg),
  300. zap.Error(err), // It handles nil error
  301. )
  302. }
  // initJSONResponse returns s encoded as a JSON string literal. It is meant
  // for package-level initialization only, which is why a marshalling failure
  // panics instead of being returned.
  func initJSONResponse(s string) []byte {
  	encoded, marshalErr := json.Marshal(s)
  	if marshalErr == nil {
  		return encoded
  	}
  	// This is to be used in initialization so panic here is fine.
  	panic(marshalErr)
  }