123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package signalfxreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/signalfxreceiver"
- import (
- "compress/gzip"
- "context"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "net/http"
- "sync"
- "time"
- "unsafe"
- "github.com/gorilla/mux"
- sfxpb "github.com/signalfx/com_signalfx_metrics_protobuf/model"
- "go.opencensus.io/trace"
- "go.opentelemetry.io/collector/component"
- "go.opentelemetry.io/collector/consumer"
- "go.opentelemetry.io/collector/pdata/plog"
- "go.opentelemetry.io/collector/receiver"
- "go.opentelemetry.io/collector/receiver/receiverhelper"
- conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
- "go.uber.org/zap"
- "github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk"
- "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/signalfx"
- "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/signalfxreceiver/internal/metadata"
- )
const (
	// defaultServerTimeout is applied to the HTTP server's read-header and
	// write timeouts.
	defaultServerTimeout = 20 * time.Second

	// HTTP header names and the header values the receiver accepts.
	httpContentTypeHeader     = "Content-Type"
	httpContentEncodingHeader = "Content-Encoding"
	protobufContentType       = "application/x-protobuf"
	gzipEncoding              = "gzip"

	// Human-readable messages returned (JSON-encoded) in HTTP response bodies.
	responseOK                      = "OK"
	responseInvalidMethod           = "Only \"POST\" method is supported"
	responseInvalidContentType      = "\"Content-Type\" must be \"application/x-protobuf\""
	responseInvalidEncoding         = "\"Content-Encoding\" must be \"gzip\" or empty"
	responseErrGzipReader           = "Error on gzip body"
	responseErrReadBody             = "Failed to read message body"
	responseErrUnmarshalBody        = "Failed to unmarshal message body"
	responseErrNextConsumer         = "Internal Server Error"
	responseErrLogsNotConfigured    = "Log pipeline has not been configured to handle events"
	responseErrMetricsNotConfigured = "Metric pipeline has not been configured to handle datapoints"
)
- var (
- okRespBody = initJSONResponse(responseOK)
- invalidMethodRespBody = initJSONResponse(responseInvalidMethod)
- invalidContentRespBody = initJSONResponse(responseInvalidContentType)
- invalidEncodingRespBody = initJSONResponse(responseInvalidEncoding)
- errGzipReaderRespBody = initJSONResponse(responseErrGzipReader)
- errReadBodyRespBody = initJSONResponse(responseErrReadBody)
- errUnmarshalBodyRespBody = initJSONResponse(responseErrUnmarshalBody)
- errNextConsumerRespBody = initJSONResponse(responseErrNextConsumer)
- errLogsNotConfigured = initJSONResponse(responseErrLogsNotConfigured)
- errMetricsNotConfigured = initJSONResponse(responseErrMetricsNotConfigured)
- translator = &signalfx.ToTranslator{}
- )
- // sfxReceiver implements the receiver.Metrics for SignalFx metric protocol.
- type sfxReceiver struct {
- settings receiver.CreateSettings
- config *Config
- metricsConsumer consumer.Metrics
- logsConsumer consumer.Logs
- server *http.Server
- shutdownWG sync.WaitGroup
- obsrecv *receiverhelper.ObsReport
- }
- var _ receiver.Metrics = (*sfxReceiver)(nil)
- // New creates the SignalFx receiver with the given configuration.
- func newReceiver(
- settings receiver.CreateSettings,
- config Config,
- ) (*sfxReceiver, error) {
- transport := "http"
- if config.TLSSetting != nil {
- transport = "https"
- }
- obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
- ReceiverID: settings.ID,
- Transport: transport,
- ReceiverCreateSettings: settings,
- })
- if err != nil {
- return nil, err
- }
- r := &sfxReceiver{
- settings: settings,
- config: &config,
- obsrecv: obsrecv,
- }
- return r, nil
- }
- func (r *sfxReceiver) RegisterMetricsConsumer(mc consumer.Metrics) {
- r.metricsConsumer = mc
- }
- func (r *sfxReceiver) RegisterLogsConsumer(lc consumer.Logs) {
- r.logsConsumer = lc
- }
- // Start tells the receiver to start its processing.
- // By convention the consumer of the received data is set when the receiver
- // instance is created.
- func (r *sfxReceiver) Start(_ context.Context, host component.Host) error {
- if r.metricsConsumer == nil && r.logsConsumer == nil {
- return component.ErrNilNextConsumer
- }
- if r.server != nil {
- return nil
- }
- // set up the listener
- ln, err := r.config.HTTPServerSettings.ToListener()
- if err != nil {
- return fmt.Errorf("failed to bind to address %s: %w", r.config.Endpoint, err)
- }
- mx := mux.NewRouter()
- mx.HandleFunc("/v2/datapoint", r.handleDatapointReq)
- mx.HandleFunc("/v2/event", r.handleEventReq)
- r.server, err = r.config.HTTPServerSettings.ToServer(host, r.settings.TelemetrySettings, mx)
- if err != nil {
- return err
- }
- // TODO: Evaluate what properties should be configurable, for now
- // set some hard-coded values.
- r.server.ReadHeaderTimeout = defaultServerTimeout
- r.server.WriteTimeout = defaultServerTimeout
- r.shutdownWG.Add(1)
- go func() {
- defer r.shutdownWG.Done()
- if errHTTP := r.server.Serve(ln); !errors.Is(errHTTP, http.ErrServerClosed) && errHTTP != nil {
- host.ReportFatalError(errHTTP)
- }
- }()
- return nil
- }
- // Shutdown tells the receiver that should stop reception,
- // giving it a chance to perform any necessary clean-up.
- func (r *sfxReceiver) Shutdown(context.Context) error {
- if r.server == nil {
- return nil
- }
- err := r.server.Close()
- r.shutdownWG.Wait()
- return err
- }
- func (r *sfxReceiver) readBody(ctx context.Context, resp http.ResponseWriter, req *http.Request) ([]byte, bool) {
- if req.Method != http.MethodPost {
- r.failRequest(ctx, resp, http.StatusBadRequest, invalidMethodRespBody, nil)
- return nil, false
- }
- if req.Header.Get(httpContentTypeHeader) != protobufContentType {
- r.failRequest(ctx, resp, http.StatusUnsupportedMediaType, invalidContentRespBody, nil)
- return nil, false
- }
- encoding := req.Header.Get(httpContentEncodingHeader)
- if encoding != "" && encoding != gzipEncoding {
- r.failRequest(ctx, resp, http.StatusUnsupportedMediaType, invalidEncodingRespBody, nil)
- return nil, false
- }
- bodyReader := req.Body
- if encoding == gzipEncoding {
- var err error
- bodyReader, err = gzip.NewReader(bodyReader)
- if err != nil {
- r.failRequest(ctx, resp, http.StatusBadRequest, errGzipReaderRespBody, err)
- return nil, false
- }
- }
- body, err := io.ReadAll(bodyReader)
- if err != nil {
- r.failRequest(ctx, resp, http.StatusBadRequest, errReadBodyRespBody, err)
- return nil, false
- }
- return body, true
- }
- func (r *sfxReceiver) writeResponse(ctx context.Context, resp http.ResponseWriter, err error) {
- if err != nil {
- r.failRequest(ctx, resp, http.StatusInternalServerError, errNextConsumerRespBody, err)
- return
- }
- resp.WriteHeader(http.StatusOK)
- _, err = resp.Write(okRespBody)
- if err != nil {
- r.failRequest(ctx, resp, http.StatusInternalServerError, errNextConsumerRespBody, err)
- }
- }
- func (r *sfxReceiver) handleDatapointReq(resp http.ResponseWriter, req *http.Request) {
- ctx := r.obsrecv.StartMetricsOp(req.Context())
- if r.metricsConsumer == nil {
- r.failRequest(ctx, resp, http.StatusBadRequest, errMetricsNotConfigured, nil)
- return
- }
- body, ok := r.readBody(ctx, resp, req)
- if !ok {
- return
- }
- msg := &sfxpb.DataPointUploadMessage{}
- if err := msg.Unmarshal(body); err != nil {
- r.failRequest(ctx, resp, http.StatusBadRequest, errUnmarshalBodyRespBody, err)
- return
- }
- if len(msg.Datapoints) == 0 {
- r.obsrecv.EndMetricsOp(ctx, metadata.Type, 0, nil)
- _, _ = resp.Write(okRespBody)
- return
- }
- md, err := translator.ToMetrics(msg.Datapoints)
- if err != nil {
- r.settings.Logger.Debug("SignalFx conversion error", zap.Error(err))
- }
- if r.config.AccessTokenPassthrough {
- if accessToken := req.Header.Get(splunk.SFxAccessTokenHeader); accessToken != "" {
- for i := 0; i < md.ResourceMetrics().Len(); i++ {
- rm := md.ResourceMetrics().At(i)
- res := rm.Resource()
- res.Attributes().PutStr(splunk.SFxAccessTokenLabel, accessToken)
- }
- }
- }
- err = r.metricsConsumer.ConsumeMetrics(ctx, md)
- r.obsrecv.EndMetricsOp(ctx, metadata.Type, len(msg.Datapoints), err)
- r.writeResponse(ctx, resp, err)
- }
- func (r *sfxReceiver) handleEventReq(resp http.ResponseWriter, req *http.Request) {
- ctx := r.obsrecv.StartMetricsOp(req.Context())
- if r.logsConsumer == nil {
- r.failRequest(ctx, resp, http.StatusBadRequest, errLogsNotConfigured, nil)
- return
- }
- body, ok := r.readBody(ctx, resp, req)
- if !ok {
- return
- }
- msg := &sfxpb.EventUploadMessage{}
- if err := msg.Unmarshal(body); err != nil {
- r.failRequest(ctx, resp, http.StatusBadRequest, errUnmarshalBodyRespBody, err)
- return
- }
- if len(msg.Events) == 0 {
- r.obsrecv.EndMetricsOp(ctx, metadata.Type, 0, nil)
- _, _ = resp.Write(okRespBody)
- return
- }
- ld := plog.NewLogs()
- rl := ld.ResourceLogs().AppendEmpty()
- sl := rl.ScopeLogs().AppendEmpty()
- signalFxV2EventsToLogRecords(msg.Events, sl.LogRecords())
- if r.config.AccessTokenPassthrough {
- if accessToken := req.Header.Get(splunk.SFxAccessTokenHeader); accessToken != "" {
- rl.Resource().Attributes().PutStr(splunk.SFxAccessTokenLabel, accessToken)
- }
- }
- err := r.logsConsumer.ConsumeLogs(ctx, ld)
- r.obsrecv.EndMetricsOp(
- ctx,
- metadata.Type,
- len(msg.Events),
- err)
- r.writeResponse(ctx, resp, err)
- }
- func (r *sfxReceiver) failRequest(
- ctx context.Context,
- resp http.ResponseWriter,
- httpStatusCode int,
- jsonResponse []byte,
- err error,
- ) {
- resp.WriteHeader(httpStatusCode)
- if len(jsonResponse) > 0 {
- // The response needs to be written as a JSON string.
- _, writeErr := resp.Write(jsonResponse)
- if writeErr != nil {
- r.settings.Logger.Warn(
- "Error writing HTTP response message",
- zap.Error(writeErr),
- zap.String("receiver", r.settings.ID.String()))
- }
- }
- // Use the same pattern as strings.Builder String().
- msg := *(*string)(unsafe.Pointer(&jsonResponse))
- reqSpan := trace.FromContext(ctx)
- reqSpan.AddAttributes(
- trace.Int64Attribute(conventions.AttributeHTTPStatusCode, int64(httpStatusCode)),
- trace.StringAttribute("http.status_text", msg))
- traceStatus := trace.Status{
- Code: trace.StatusCodeInvalidArgument,
- }
- if httpStatusCode == http.StatusInternalServerError {
- traceStatus.Code = trace.StatusCodeInternal
- }
- if err != nil {
- traceStatus.Message = err.Error()
- }
- reqSpan.SetStatus(traceStatus)
- reqSpan.End()
- r.settings.Logger.Debug(
- "SignalFx receiver request failed",
- zap.Int("http_status_code", httpStatusCode),
- zap.String("msg", msg),
- zap.Error(err), // It handles nil error
- )
- }
// initJSONResponse returns s encoded as a JSON string literal. It is meant
// for building package-level response bodies, so a marshaling failure panics
// instead of returning an error.
func initJSONResponse(s string) []byte {
	encoded, marshalErr := json.Marshal(s)
	if marshalErr == nil {
		return encoded
	}
	// This is to be used in initialization so panic here is fine.
	panic(marshalErr)
}
|