trace_receiver.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package sapmreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/sapmreceiver"
  4. import (
  5. "bytes"
  6. "compress/gzip"
  7. "context"
  8. "errors"
  9. "fmt"
  10. "io"
  11. "net/http"
  12. "sync"
  13. "github.com/gorilla/mux"
  14. splunksapm "github.com/signalfx/sapm-proto/gen"
  15. "github.com/signalfx/sapm-proto/sapmprotocol"
  16. "go.opentelemetry.io/collector/component"
  17. "go.opentelemetry.io/collector/consumer"
  18. "go.opentelemetry.io/collector/receiver"
  19. "go.opentelemetry.io/collector/receiver/receiverhelper"
  20. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk"
  21. "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
  22. )
  23. var gzipWriterPool = &sync.Pool{
  24. New: func() any {
  25. return gzip.NewWriter(io.Discard)
  26. },
  27. }
  28. // sapmReceiver receives spans in the Splunk SAPM format over HTTP
  29. type sapmReceiver struct {
  30. settings component.TelemetrySettings
  31. config *Config
  32. server *http.Server
  33. shutdownWG sync.WaitGroup
  34. nextConsumer consumer.Traces
  35. // defaultResponse is a placeholder. For now this receiver returns an empty sapm response.
  36. // This defaultResponse is an optimization so we don't have to proto.Marshal the response
  37. // for every request. At some point this may be removed when there is actual content to return.
  38. defaultResponse []byte
  39. obsrecv *receiverhelper.ObsReport
  40. }
  41. // handleRequest parses an http request containing sapm and passes the trace data to the next consumer
  42. func (sr *sapmReceiver) handleRequest(req *http.Request) error {
  43. sapm, err := sapmprotocol.ParseTraceV2Request(req)
  44. // errors processing the request should return http.StatusBadRequest
  45. if err != nil {
  46. return err
  47. }
  48. ctx := sr.obsrecv.StartTracesOp(req.Context())
  49. td, err := jaeger.ProtoToTraces(sapm.Batches)
  50. if err != nil {
  51. return err
  52. }
  53. if sr.config.AccessTokenPassthrough {
  54. if accessToken := req.Header.Get(splunk.SFxAccessTokenHeader); accessToken != "" {
  55. rSpans := td.ResourceSpans()
  56. for i := 0; i < rSpans.Len(); i++ {
  57. rSpan := rSpans.At(i)
  58. attrs := rSpan.Resource().Attributes()
  59. attrs.PutStr(splunk.SFxAccessTokenLabel, accessToken)
  60. }
  61. }
  62. }
  63. // pass the trace data to the next consumer
  64. err = sr.nextConsumer.ConsumeTraces(ctx, td)
  65. if err != nil {
  66. err = fmt.Errorf("error passing trace data to next consumer: %w", err)
  67. }
  68. sr.obsrecv.EndTracesOp(ctx, "protobuf", td.SpanCount(), err)
  69. return err
  70. }
  71. // HTTPHandlerFunc returns an http.HandlerFunc that handles SAPM requests
  72. func (sr *sapmReceiver) HTTPHandlerFunc(rw http.ResponseWriter, req *http.Request) {
  73. // handle the request payload
  74. err := sr.handleRequest(req)
  75. if err != nil {
  76. // TODO account for this error (throttled logging or metrics)
  77. rw.WriteHeader(http.StatusBadRequest)
  78. return
  79. }
  80. // respBytes are bytes to write to the http.Response
  81. // build the response message
  82. // NOTE currently the response is an empty struct. As an optimization this receiver will pass a
  83. // byte array that was generated in the receiver's constructor. If this receiver needs to return
  84. // more than an empty struct, then the sapm.PostSpansResponse{} struct will need to be marshaled
  85. // and on error a http.StatusInternalServerError should be written to the http.ResponseWriter and
  86. // this function should immediately return.
  87. var respBytes = sr.defaultResponse
  88. rw.Header().Set(sapmprotocol.ContentTypeHeaderName, sapmprotocol.ContentTypeHeaderValue)
  89. // write the response if client does not accept gzip encoding
  90. if req.Header.Get(sapmprotocol.AcceptEncodingHeaderName) != sapmprotocol.GZipEncodingHeaderValue {
  91. // write the response bytes
  92. _, err = rw.Write(respBytes)
  93. if err != nil {
  94. rw.WriteHeader(http.StatusBadRequest)
  95. }
  96. return
  97. }
  98. // gzip the response
  99. // get the gzip writer
  100. writer := gzipWriterPool.Get().(*gzip.Writer)
  101. defer gzipWriterPool.Put(writer)
  102. var gzipBuffer bytes.Buffer
  103. // reset the writer with the gzip buffer
  104. writer.Reset(&gzipBuffer)
  105. // gzip the responseBytes
  106. _, err = writer.Write(respBytes)
  107. if err != nil {
  108. rw.WriteHeader(http.StatusInternalServerError)
  109. return
  110. }
  111. // close the gzip writer and write gzip footer
  112. err = writer.Close()
  113. if err != nil {
  114. rw.WriteHeader(http.StatusInternalServerError)
  115. return
  116. }
  117. // write the successfully gzipped payload
  118. rw.Header().Set(sapmprotocol.ContentEncodingHeaderName, sapmprotocol.GZipEncodingHeaderValue)
  119. _, err = rw.Write(gzipBuffer.Bytes())
  120. if err != nil {
  121. rw.WriteHeader(http.StatusBadRequest)
  122. }
  123. }
  124. // Start starts the sapmReceiver's server.
  125. func (sr *sapmReceiver) Start(_ context.Context, host component.Host) error {
  126. // server.Handler will be nil on initial call, otherwise noop.
  127. if sr.server != nil && sr.server.Handler != nil {
  128. return nil
  129. }
  130. // set up the listener
  131. ln, err := sr.config.HTTPServerSettings.ToListener()
  132. if err != nil {
  133. return fmt.Errorf("failed to bind to address %s: %w", sr.config.Endpoint, err)
  134. }
  135. // use gorilla mux to create a router/handler
  136. nr := mux.NewRouter()
  137. nr.HandleFunc(sapmprotocol.TraceEndpointV2, sr.HTTPHandlerFunc)
  138. // create a server with the handler
  139. sr.server, err = sr.config.HTTPServerSettings.ToServer(host, sr.settings, nr)
  140. if err != nil {
  141. return err
  142. }
  143. sr.shutdownWG.Add(1)
  144. // run the server on a routine
  145. go func() {
  146. defer sr.shutdownWG.Done()
  147. if errHTTP := sr.server.Serve(ln); !errors.Is(errHTTP, http.ErrServerClosed) && errHTTP != nil {
  148. host.ReportFatalError(errHTTP)
  149. }
  150. }()
  151. return nil
  152. }
  153. // Shutdown stops the the sapmReceiver's server.
  154. func (sr *sapmReceiver) Shutdown(context.Context) error {
  155. if sr.server == nil {
  156. return nil
  157. }
  158. err := sr.server.Close()
  159. sr.shutdownWG.Wait()
  160. return err
  161. }
  162. // this validates at compile time that sapmReceiver implements the receiver.Traces interface
  163. var _ receiver.Traces = (*sapmReceiver)(nil)
  164. // newReceiver creates a sapmReceiver that receives SAPM over http
  165. func newReceiver(
  166. params receiver.CreateSettings,
  167. config *Config,
  168. nextConsumer consumer.Traces,
  169. ) (receiver.Traces, error) {
  170. // build the response message
  171. defaultResponse := &splunksapm.PostSpansResponse{}
  172. defaultResponseBytes, err := defaultResponse.Marshal()
  173. if err != nil {
  174. return nil, fmt.Errorf("failed to marshal default response body for %v receiver: %w", params.ID, err)
  175. }
  176. transport := "http"
  177. if config.TLSSetting != nil {
  178. transport = "https"
  179. }
  180. obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
  181. ReceiverID: params.ID,
  182. Transport: transport,
  183. ReceiverCreateSettings: params,
  184. })
  185. if err != nil {
  186. return nil, err
  187. }
  188. return &sapmReceiver{
  189. settings: params.TelemetrySettings,
  190. config: config,
  191. nextConsumer: nextConsumer,
  192. defaultResponse: defaultResponseBytes,
  193. obsrecv: obsrecv,
  194. }, nil
  195. }