// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package sapmreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/sapmreceiver" import ( "bytes" "compress/gzip" "context" "errors" "fmt" "io" "net/http" "sync" "github.com/gorilla/mux" splunksapm "github.com/signalfx/sapm-proto/gen" "github.com/signalfx/sapm-proto/sapmprotocol" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receiverhelper" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" ) var gzipWriterPool = &sync.Pool{ New: func() any { return gzip.NewWriter(io.Discard) }, } // sapmReceiver receives spans in the Splunk SAPM format over HTTP type sapmReceiver struct { settings component.TelemetrySettings config *Config server *http.Server shutdownWG sync.WaitGroup nextConsumer consumer.Traces // defaultResponse is a placeholder. For now this receiver returns an empty sapm response. // This defaultResponse is an optimization so we don't have to proto.Marshal the response // for every request. At some point this may be removed when there is actual content to return. defaultResponse []byte obsrecv *receiverhelper.ObsReport } // handleRequest parses an http request containing sapm and passes the trace data to the next consumer func (sr *sapmReceiver) handleRequest(req *http.Request) error { sapm, err := sapmprotocol.ParseTraceV2Request(req) // errors processing the request should return http.StatusBadRequest if err != nil { return err } ctx := sr.obsrecv.StartTracesOp(req.Context()) td, err := jaeger.ProtoToTraces(sapm.Batches) if err != nil { return err } if sr.config.AccessTokenPassthrough { if accessToken := req.Header.Get(splunk.SFxAccessTokenHeader); accessToken != "" { rSpans := td.ResourceSpans() for i := 0; i < rSpans.Len(); i++ { rSpan := rSpans.At(i) attrs := rSpan.Resource().Attributes() attrs.PutStr(splunk.SFxAccessTokenLabel, accessToken) } } } // pass the trace data to the next consumer err = sr.nextConsumer.ConsumeTraces(ctx, td) if err != nil { err = fmt.Errorf("error passing trace data to next consumer: %w", err) } sr.obsrecv.EndTracesOp(ctx, "protobuf", td.SpanCount(), err) return err } // HTTPHandlerFunc returns an http.HandlerFunc that handles SAPM requests func (sr *sapmReceiver) HTTPHandlerFunc(rw http.ResponseWriter, req *http.Request) { // handle the request payload err := sr.handleRequest(req) if err != nil { // TODO account for this error (throttled logging or metrics) rw.WriteHeader(http.StatusBadRequest) return } // respBytes are bytes to write to the http.Response // build the response message // NOTE currently the response is an empty struct. As an optimization this receiver will pass a // byte array that was generated in the receiver's constructor. If this receiver needs to return // more than an empty struct, then the sapm.PostSpansResponse{} struct will need to be marshaled // and on error a http.StatusInternalServerError should be written to the http.ResponseWriter and // this function should immediately return. var respBytes = sr.defaultResponse rw.Header().Set(sapmprotocol.ContentTypeHeaderName, sapmprotocol.ContentTypeHeaderValue) // write the response if client does not accept gzip encoding if req.Header.Get(sapmprotocol.AcceptEncodingHeaderName) != sapmprotocol.GZipEncodingHeaderValue { // write the response bytes _, err = rw.Write(respBytes) if err != nil { rw.WriteHeader(http.StatusBadRequest) } return } // gzip the response // get the gzip writer writer := gzipWriterPool.Get().(*gzip.Writer) defer gzipWriterPool.Put(writer) var gzipBuffer bytes.Buffer // reset the writer with the gzip buffer writer.Reset(&gzipBuffer) // gzip the responseBytes _, err = writer.Write(respBytes) if err != nil { rw.WriteHeader(http.StatusInternalServerError) return } // close the gzip writer and write gzip footer err = writer.Close() if err != nil { rw.WriteHeader(http.StatusInternalServerError) return } // write the successfully gzipped payload rw.Header().Set(sapmprotocol.ContentEncodingHeaderName, sapmprotocol.GZipEncodingHeaderValue) _, err = rw.Write(gzipBuffer.Bytes()) if err != nil { rw.WriteHeader(http.StatusBadRequest) } } // Start starts the sapmReceiver's server. func (sr *sapmReceiver) Start(_ context.Context, host component.Host) error { // server.Handler will be nil on initial call, otherwise noop. if sr.server != nil && sr.server.Handler != nil { return nil } // set up the listener ln, err := sr.config.HTTPServerSettings.ToListener() if err != nil { return fmt.Errorf("failed to bind to address %s: %w", sr.config.Endpoint, err) } // use gorilla mux to create a router/handler nr := mux.NewRouter() nr.HandleFunc(sapmprotocol.TraceEndpointV2, sr.HTTPHandlerFunc) // create a server with the handler sr.server, err = sr.config.HTTPServerSettings.ToServer(host, sr.settings, nr) if err != nil { return err } sr.shutdownWG.Add(1) // run the server on a routine go func() { defer sr.shutdownWG.Done() if errHTTP := sr.server.Serve(ln); !errors.Is(errHTTP, http.ErrServerClosed) && errHTTP != nil { host.ReportFatalError(errHTTP) } }() return nil } // Shutdown stops the the sapmReceiver's server. func (sr *sapmReceiver) Shutdown(context.Context) error { if sr.server == nil { return nil } err := sr.server.Close() sr.shutdownWG.Wait() return err } // this validates at compile time that sapmReceiver implements the receiver.Traces interface var _ receiver.Traces = (*sapmReceiver)(nil) // newReceiver creates a sapmReceiver that receives SAPM over http func newReceiver( params receiver.CreateSettings, config *Config, nextConsumer consumer.Traces, ) (receiver.Traces, error) { // build the response message defaultResponse := &splunksapm.PostSpansResponse{} defaultResponseBytes, err := defaultResponse.Marshal() if err != nil { return nil, fmt.Errorf("failed to marshal default response body for %v receiver: %w", params.ID, err) } transport := "http" if config.TLSSetting != nil { transport = "https" } obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ ReceiverID: params.ID, Transport: transport, ReceiverCreateSettings: params, }) if err != nil { return nil, err } return &sapmReceiver{ settings: params.TelemetrySettings, config: config, nextConsumer: nextConsumer, defaultResponse: defaultResponseBytes, obsrecv: obsrecv, }, nil }