trace_receiver.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package zipkinreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zipkinreceiver"
  4. import (
  5. "compress/gzip"
  6. "compress/zlib"
  7. "context"
  8. "errors"
  9. "io"
  10. "net"
  11. "net/http"
  12. "strings"
  13. "sync"
  14. "go.opentelemetry.io/collector/component"
  15. "go.opentelemetry.io/collector/consumer"
  16. "go.opentelemetry.io/collector/consumer/consumererror"
  17. "go.opentelemetry.io/collector/pdata/ptrace"
  18. "go.opentelemetry.io/collector/receiver"
  19. "go.opentelemetry.io/collector/receiver/receiverhelper"
  20. "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin/zipkinv1"
  21. "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin/zipkinv2"
  22. )
  23. const (
  24. receiverTransportV1Thrift = "http_v1_thrift"
  25. receiverTransportV1JSON = "http_v1_json"
  26. receiverTransportV2JSON = "http_v2_json"
  27. receiverTransportV2PROTO = "http_v2_proto"
  28. )
  29. var errNextConsumerRespBody = []byte(`"Internal Server Error"`)
  30. var errBadRequestRespBody = []byte(`"Bad Request"`)
  31. // zipkinReceiver type is used to handle spans received in the Zipkin format.
  32. type zipkinReceiver struct {
  33. nextConsumer consumer.Traces
  34. shutdownWG sync.WaitGroup
  35. server *http.Server
  36. config *Config
  37. v1ThriftUnmarshaler ptrace.Unmarshaler
  38. v1JSONUnmarshaler ptrace.Unmarshaler
  39. jsonUnmarshaler ptrace.Unmarshaler
  40. protobufUnmarshaler ptrace.Unmarshaler
  41. protobufDebugUnmarshaler ptrace.Unmarshaler
  42. settings receiver.CreateSettings
  43. obsrecvrs map[string]*receiverhelper.ObsReport
  44. }
  45. var _ http.Handler = (*zipkinReceiver)(nil)
  46. // newReceiver creates a new zipkinReceiver reference.
  47. func newReceiver(config *Config, nextConsumer consumer.Traces, settings receiver.CreateSettings) (*zipkinReceiver, error) {
  48. if nextConsumer == nil {
  49. return nil, component.ErrNilNextConsumer
  50. }
  51. transports := []string{receiverTransportV1Thrift, receiverTransportV1JSON, receiverTransportV2JSON, receiverTransportV2PROTO}
  52. obsrecvrs := make(map[string]*receiverhelper.ObsReport)
  53. for _, transport := range transports {
  54. obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
  55. ReceiverID: settings.ID,
  56. Transport: transport,
  57. ReceiverCreateSettings: settings,
  58. })
  59. if err != nil {
  60. return nil, err
  61. }
  62. obsrecvrs[transport] = obsrecv
  63. }
  64. zr := &zipkinReceiver{
  65. nextConsumer: nextConsumer,
  66. config: config,
  67. v1ThriftUnmarshaler: zipkinv1.NewThriftTracesUnmarshaler(),
  68. v1JSONUnmarshaler: zipkinv1.NewJSONTracesUnmarshaler(config.ParseStringTags),
  69. jsonUnmarshaler: zipkinv2.NewJSONTracesUnmarshaler(config.ParseStringTags),
  70. protobufUnmarshaler: zipkinv2.NewProtobufTracesUnmarshaler(false, config.ParseStringTags),
  71. protobufDebugUnmarshaler: zipkinv2.NewProtobufTracesUnmarshaler(true, config.ParseStringTags),
  72. settings: settings,
  73. obsrecvrs: obsrecvrs,
  74. }
  75. return zr, nil
  76. }
  77. // Start spins up the receiver's HTTP server and makes the receiver start its processing.
  78. func (zr *zipkinReceiver) Start(_ context.Context, host component.Host) error {
  79. if host == nil {
  80. return errors.New("nil host")
  81. }
  82. var err error
  83. zr.server, err = zr.config.HTTPServerSettings.ToServer(host, zr.settings.TelemetrySettings, zr)
  84. if err != nil {
  85. return err
  86. }
  87. var listener net.Listener
  88. listener, err = zr.config.HTTPServerSettings.ToListener()
  89. if err != nil {
  90. return err
  91. }
  92. zr.shutdownWG.Add(1)
  93. go func() {
  94. defer zr.shutdownWG.Done()
  95. if errHTTP := zr.server.Serve(listener); !errors.Is(errHTTP, http.ErrServerClosed) && errHTTP != nil {
  96. host.ReportFatalError(errHTTP)
  97. }
  98. }()
  99. return nil
  100. }
  101. // v1ToTraceSpans parses Zipkin v1 JSON traces and converts them to OpenCensus Proto spans.
  102. func (zr *zipkinReceiver) v1ToTraceSpans(blob []byte, hdr http.Header) (reqs ptrace.Traces, err error) {
  103. if hdr.Get("Content-Type") == "application/x-thrift" {
  104. return zr.v1ThriftUnmarshaler.UnmarshalTraces(blob)
  105. }
  106. return zr.v1JSONUnmarshaler.UnmarshalTraces(blob)
  107. }
  108. // v2ToTraceSpans parses Zipkin v2 JSON or Protobuf traces and converts them to OpenCensus Proto spans.
  109. func (zr *zipkinReceiver) v2ToTraceSpans(blob []byte, hdr http.Header) (reqs ptrace.Traces, err error) {
  110. // This flag's reference is from:
  111. // https://github.com/openzipkin/zipkin-go/blob/3793c981d4f621c0e3eb1457acffa2c1cc591384/proto/v2/zipkin.proto#L154
  112. debugWasSet := hdr.Get("X-B3-Flags") == "1"
  113. // By default, we'll assume using JSON
  114. unmarshaler := zr.jsonUnmarshaler
  115. // Zipkin can send protobuf via http
  116. if hdr.Get("Content-Type") == "application/x-protobuf" {
  117. // TODO: (@odeke-em) record the unique types of Content-Type uploads
  118. if debugWasSet {
  119. unmarshaler = zr.protobufDebugUnmarshaler
  120. } else {
  121. unmarshaler = zr.protobufUnmarshaler
  122. }
  123. }
  124. return unmarshaler.UnmarshalTraces(blob)
  125. }
  126. // Shutdown tells the receiver that should stop reception,
  127. // giving it a chance to perform any necessary clean-up and shutting down
  128. // its HTTP server.
  129. func (zr *zipkinReceiver) Shutdown(context.Context) error {
  130. var err error
  131. if zr.server != nil {
  132. err = zr.server.Close()
  133. }
  134. zr.shutdownWG.Wait()
  135. return err
  136. }
  137. // processBodyIfNecessary checks the "Content-Encoding" HTTP header and if
  138. // a compression such as "gzip", "deflate", "zlib", is found, the body will
  139. // be uncompressed accordingly or return the body untouched if otherwise.
  140. // Clients such as Zipkin-Java do this behavior e.g.
  141. //
  142. // send "Content-Encoding":"gzip" of the JSON content.
  143. func processBodyIfNecessary(req *http.Request) io.Reader {
  144. switch req.Header.Get("Content-Encoding") {
  145. default:
  146. return req.Body
  147. case "gzip":
  148. return gunzippedBodyIfPossible(req.Body)
  149. case "deflate", "zlib":
  150. return zlibUncompressedbody(req.Body)
  151. }
  152. }
  153. func gunzippedBodyIfPossible(r io.Reader) io.Reader {
  154. gzr, err := gzip.NewReader(r)
  155. if err != nil {
  156. // Just return the old body as was
  157. return r
  158. }
  159. return gzr
  160. }
  161. func zlibUncompressedbody(r io.Reader) io.Reader {
  162. zr, err := zlib.NewReader(r)
  163. if err != nil {
  164. // Just return the old body as was
  165. return r
  166. }
  167. return zr
  168. }
  169. const (
  170. zipkinV1TagValue = "zipkinV1"
  171. zipkinV2TagValue = "zipkinV2"
  172. )
  173. // The zipkinReceiver receives spans from endpoint /api/v2 as JSON,
  174. // unmarshalls them and sends them along to the nextConsumer.
  175. func (zr *zipkinReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  176. ctx := r.Context()
  177. // Now deserialize and process the spans.
  178. asZipkinv1 := r.URL != nil && strings.Contains(r.URL.Path, "api/v1/spans")
  179. transportTag := transportType(r, asZipkinv1)
  180. obsrecv := zr.obsrecvrs[transportTag]
  181. ctx = obsrecv.StartTracesOp(ctx)
  182. pr := processBodyIfNecessary(r)
  183. slurp, _ := io.ReadAll(pr)
  184. if c, ok := pr.(io.Closer); ok {
  185. _ = c.Close()
  186. }
  187. _ = r.Body.Close()
  188. var td ptrace.Traces
  189. var err error
  190. if asZipkinv1 {
  191. td, err = zr.v1ToTraceSpans(slurp, r.Header)
  192. } else {
  193. td, err = zr.v2ToTraceSpans(slurp, r.Header)
  194. }
  195. if err != nil {
  196. http.Error(w, err.Error(), http.StatusBadRequest)
  197. return
  198. }
  199. consumerErr := zr.nextConsumer.ConsumeTraces(ctx, td)
  200. receiverTagValue := zipkinV2TagValue
  201. if asZipkinv1 {
  202. receiverTagValue = zipkinV1TagValue
  203. }
  204. obsrecv.EndTracesOp(ctx, receiverTagValue, td.SpanCount(), consumerErr)
  205. if consumerErr == nil {
  206. // Send back the response "Accepted" as
  207. // required at https://zipkin.io/zipkin-api/#/default/post_spans
  208. w.WriteHeader(http.StatusAccepted)
  209. return
  210. }
  211. if consumererror.IsPermanent(consumerErr) {
  212. w.WriteHeader(http.StatusBadRequest)
  213. _, _ = w.Write(errBadRequestRespBody)
  214. } else {
  215. // Transient error, due to some internal condition.
  216. w.WriteHeader(http.StatusInternalServerError)
  217. _, _ = w.Write(errNextConsumerRespBody)
  218. }
  219. }
  220. func transportType(r *http.Request, asZipkinv1 bool) string {
  221. if asZipkinv1 {
  222. if r.Header.Get("Content-Type") == "application/x-thrift" {
  223. return receiverTransportV1Thrift
  224. }
  225. return receiverTransportV1JSON
  226. }
  227. if r.Header.Get("Content-Type") == "application/x-protobuf" {
  228. return receiverTransportV2PROTO
  229. }
  230. return receiverTransportV2JSON
  231. }