123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package sapmreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/sapmreceiver"
- import (
- "bytes"
- "compress/gzip"
- "context"
- "errors"
- "fmt"
- "io"
- "net/http"
- "sync"
- "github.com/gorilla/mux"
- splunksapm "github.com/signalfx/sapm-proto/gen"
- "github.com/signalfx/sapm-proto/sapmprotocol"
- "go.opentelemetry.io/collector/component"
- "go.opentelemetry.io/collector/consumer"
- "go.opentelemetry.io/collector/receiver"
- "go.opentelemetry.io/collector/receiver/receiverhelper"
- "github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk"
- "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
- )
- var gzipWriterPool = &sync.Pool{
- New: func() any {
- return gzip.NewWriter(io.Discard)
- },
- }
- // sapmReceiver receives spans in the Splunk SAPM format over HTTP
- type sapmReceiver struct {
- settings component.TelemetrySettings
- config *Config
- server *http.Server
- shutdownWG sync.WaitGroup
- nextConsumer consumer.Traces
- // defaultResponse is a placeholder. For now this receiver returns an empty sapm response.
- // This defaultResponse is an optimization so we don't have to proto.Marshal the response
- // for every request. At some point this may be removed when there is actual content to return.
- defaultResponse []byte
- obsrecv *receiverhelper.ObsReport
- }
- // handleRequest parses an http request containing sapm and passes the trace data to the next consumer
- func (sr *sapmReceiver) handleRequest(req *http.Request) error {
- sapm, err := sapmprotocol.ParseTraceV2Request(req)
- // errors processing the request should return http.StatusBadRequest
- if err != nil {
- return err
- }
- ctx := sr.obsrecv.StartTracesOp(req.Context())
- td, err := jaeger.ProtoToTraces(sapm.Batches)
- if err != nil {
- return err
- }
- if sr.config.AccessTokenPassthrough {
- if accessToken := req.Header.Get(splunk.SFxAccessTokenHeader); accessToken != "" {
- rSpans := td.ResourceSpans()
- for i := 0; i < rSpans.Len(); i++ {
- rSpan := rSpans.At(i)
- attrs := rSpan.Resource().Attributes()
- attrs.PutStr(splunk.SFxAccessTokenLabel, accessToken)
- }
- }
- }
- // pass the trace data to the next consumer
- err = sr.nextConsumer.ConsumeTraces(ctx, td)
- if err != nil {
- err = fmt.Errorf("error passing trace data to next consumer: %w", err)
- }
- sr.obsrecv.EndTracesOp(ctx, "protobuf", td.SpanCount(), err)
- return err
- }
- // HTTPHandlerFunc returns an http.HandlerFunc that handles SAPM requests
- func (sr *sapmReceiver) HTTPHandlerFunc(rw http.ResponseWriter, req *http.Request) {
- // handle the request payload
- err := sr.handleRequest(req)
- if err != nil {
- // TODO account for this error (throttled logging or metrics)
- rw.WriteHeader(http.StatusBadRequest)
- return
- }
- // respBytes are bytes to write to the http.Response
- // build the response message
- // NOTE currently the response is an empty struct. As an optimization this receiver will pass a
- // byte array that was generated in the receiver's constructor. If this receiver needs to return
- // more than an empty struct, then the sapm.PostSpansResponse{} struct will need to be marshaled
- // and on error a http.StatusInternalServerError should be written to the http.ResponseWriter and
- // this function should immediately return.
- var respBytes = sr.defaultResponse
- rw.Header().Set(sapmprotocol.ContentTypeHeaderName, sapmprotocol.ContentTypeHeaderValue)
- // write the response if client does not accept gzip encoding
- if req.Header.Get(sapmprotocol.AcceptEncodingHeaderName) != sapmprotocol.GZipEncodingHeaderValue {
- // write the response bytes
- _, err = rw.Write(respBytes)
- if err != nil {
- rw.WriteHeader(http.StatusBadRequest)
- }
- return
- }
- // gzip the response
- // get the gzip writer
- writer := gzipWriterPool.Get().(*gzip.Writer)
- defer gzipWriterPool.Put(writer)
- var gzipBuffer bytes.Buffer
- // reset the writer with the gzip buffer
- writer.Reset(&gzipBuffer)
- // gzip the responseBytes
- _, err = writer.Write(respBytes)
- if err != nil {
- rw.WriteHeader(http.StatusInternalServerError)
- return
- }
- // close the gzip writer and write gzip footer
- err = writer.Close()
- if err != nil {
- rw.WriteHeader(http.StatusInternalServerError)
- return
- }
- // write the successfully gzipped payload
- rw.Header().Set(sapmprotocol.ContentEncodingHeaderName, sapmprotocol.GZipEncodingHeaderValue)
- _, err = rw.Write(gzipBuffer.Bytes())
- if err != nil {
- rw.WriteHeader(http.StatusBadRequest)
- }
- }
- // Start starts the sapmReceiver's server.
- func (sr *sapmReceiver) Start(_ context.Context, host component.Host) error {
- // server.Handler will be nil on initial call, otherwise noop.
- if sr.server != nil && sr.server.Handler != nil {
- return nil
- }
- // set up the listener
- ln, err := sr.config.HTTPServerSettings.ToListener()
- if err != nil {
- return fmt.Errorf("failed to bind to address %s: %w", sr.config.Endpoint, err)
- }
- // use gorilla mux to create a router/handler
- nr := mux.NewRouter()
- nr.HandleFunc(sapmprotocol.TraceEndpointV2, sr.HTTPHandlerFunc)
- // create a server with the handler
- sr.server, err = sr.config.HTTPServerSettings.ToServer(host, sr.settings, nr)
- if err != nil {
- return err
- }
- sr.shutdownWG.Add(1)
- // run the server on a routine
- go func() {
- defer sr.shutdownWG.Done()
- if errHTTP := sr.server.Serve(ln); !errors.Is(errHTTP, http.ErrServerClosed) && errHTTP != nil {
- host.ReportFatalError(errHTTP)
- }
- }()
- return nil
- }
- // Shutdown stops the the sapmReceiver's server.
- func (sr *sapmReceiver) Shutdown(context.Context) error {
- if sr.server == nil {
- return nil
- }
- err := sr.server.Close()
- sr.shutdownWG.Wait()
- return err
- }
- // this validates at compile time that sapmReceiver implements the receiver.Traces interface
- var _ receiver.Traces = (*sapmReceiver)(nil)
- // newReceiver creates a sapmReceiver that receives SAPM over http
- func newReceiver(
- params receiver.CreateSettings,
- config *Config,
- nextConsumer consumer.Traces,
- ) (receiver.Traces, error) {
- // build the response message
- defaultResponse := &splunksapm.PostSpansResponse{}
- defaultResponseBytes, err := defaultResponse.Marshal()
- if err != nil {
- return nil, fmt.Errorf("failed to marshal default response body for %v receiver: %w", params.ID, err)
- }
- transport := "http"
- if config.TLSSetting != nil {
- transport = "https"
- }
- obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
- ReceiverID: params.ID,
- Transport: transport,
- ReceiverCreateSettings: params,
- })
- if err != nil {
- return nil, err
- }
- return &sapmReceiver{
- settings: params.TelemetrySettings,
- config: config,
- nextConsumer: nextConsumer,
- defaultResponse: defaultResponseBytes,
- obsrecv: obsrecv,
- }, nil
- }
|