123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package zipkinreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zipkinreceiver"
import (
	"bufio"
	"compress/gzip"
	"compress/zlib"
	"context"
	"errors"
	"io"
	"net"
	"net/http"
	"strings"
	"sync"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/consumer/consumererror"
	"go.opentelemetry.io/collector/pdata/ptrace"
	"go.opentelemetry.io/collector/receiver"
	"go.opentelemetry.io/collector/receiver/receiverhelper"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin/zipkinv1"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin/zipkinv2"
)
// Transport names, one per combination of Zipkin API version and payload
// encoding that the receiver accepts. Each name is used both as the
// Transport of an ObsReport and as the key under which that ObsReport is
// stored on the receiver.
const (
	// receiverTransportV1Thrift is the transport name for Zipkin v1 Thrift payloads.
	receiverTransportV1Thrift = "http_v1_thrift"
	// receiverTransportV1JSON is the transport name for Zipkin v1 JSON payloads.
	receiverTransportV1JSON = "http_v1_json"
	// receiverTransportV2JSON is the transport name for Zipkin v2 JSON payloads.
	receiverTransportV2JSON = "http_v2_json"
	// receiverTransportV2PROTO is the transport name for Zipkin v2 Protobuf payloads.
	receiverTransportV2PROTO = "http_v2_proto"
)
// Canned response bodies (JSON string literals) written by ServeHTTP when the
// next consumer rejects a batch of traces.
var (
	// errNextConsumerRespBody accompanies a 500 response for non-permanent
	// consumer errors.
	errNextConsumerRespBody = []byte(`"Internal Server Error"`)
	// errBadRequestRespBody accompanies a 400 response for permanent
	// consumer errors.
	errBadRequestRespBody = []byte(`"Bad Request"`)
)
- // zipkinReceiver type is used to handle spans received in the Zipkin format.
- type zipkinReceiver struct {
- nextConsumer consumer.Traces
- shutdownWG sync.WaitGroup
- server *http.Server
- config *Config
- v1ThriftUnmarshaler ptrace.Unmarshaler
- v1JSONUnmarshaler ptrace.Unmarshaler
- jsonUnmarshaler ptrace.Unmarshaler
- protobufUnmarshaler ptrace.Unmarshaler
- protobufDebugUnmarshaler ptrace.Unmarshaler
- settings receiver.CreateSettings
- obsrecvrs map[string]*receiverhelper.ObsReport
- }
- var _ http.Handler = (*zipkinReceiver)(nil)
- // newReceiver creates a new zipkinReceiver reference.
- func newReceiver(config *Config, nextConsumer consumer.Traces, settings receiver.CreateSettings) (*zipkinReceiver, error) {
- if nextConsumer == nil {
- return nil, component.ErrNilNextConsumer
- }
- transports := []string{receiverTransportV1Thrift, receiverTransportV1JSON, receiverTransportV2JSON, receiverTransportV2PROTO}
- obsrecvrs := make(map[string]*receiverhelper.ObsReport)
- for _, transport := range transports {
- obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
- ReceiverID: settings.ID,
- Transport: transport,
- ReceiverCreateSettings: settings,
- })
- if err != nil {
- return nil, err
- }
- obsrecvrs[transport] = obsrecv
- }
- zr := &zipkinReceiver{
- nextConsumer: nextConsumer,
- config: config,
- v1ThriftUnmarshaler: zipkinv1.NewThriftTracesUnmarshaler(),
- v1JSONUnmarshaler: zipkinv1.NewJSONTracesUnmarshaler(config.ParseStringTags),
- jsonUnmarshaler: zipkinv2.NewJSONTracesUnmarshaler(config.ParseStringTags),
- protobufUnmarshaler: zipkinv2.NewProtobufTracesUnmarshaler(false, config.ParseStringTags),
- protobufDebugUnmarshaler: zipkinv2.NewProtobufTracesUnmarshaler(true, config.ParseStringTags),
- settings: settings,
- obsrecvrs: obsrecvrs,
- }
- return zr, nil
- }
- // Start spins up the receiver's HTTP server and makes the receiver start its processing.
- func (zr *zipkinReceiver) Start(_ context.Context, host component.Host) error {
- if host == nil {
- return errors.New("nil host")
- }
- var err error
- zr.server, err = zr.config.HTTPServerSettings.ToServer(host, zr.settings.TelemetrySettings, zr)
- if err != nil {
- return err
- }
- var listener net.Listener
- listener, err = zr.config.HTTPServerSettings.ToListener()
- if err != nil {
- return err
- }
- zr.shutdownWG.Add(1)
- go func() {
- defer zr.shutdownWG.Done()
- if errHTTP := zr.server.Serve(listener); !errors.Is(errHTTP, http.ErrServerClosed) && errHTTP != nil {
- host.ReportFatalError(errHTTP)
- }
- }()
- return nil
- }
- // v1ToTraceSpans parses Zipkin v1 JSON traces and converts them to OpenCensus Proto spans.
- func (zr *zipkinReceiver) v1ToTraceSpans(blob []byte, hdr http.Header) (reqs ptrace.Traces, err error) {
- if hdr.Get("Content-Type") == "application/x-thrift" {
- return zr.v1ThriftUnmarshaler.UnmarshalTraces(blob)
- }
- return zr.v1JSONUnmarshaler.UnmarshalTraces(blob)
- }
- // v2ToTraceSpans parses Zipkin v2 JSON or Protobuf traces and converts them to OpenCensus Proto spans.
- func (zr *zipkinReceiver) v2ToTraceSpans(blob []byte, hdr http.Header) (reqs ptrace.Traces, err error) {
- // This flag's reference is from:
- // https://github.com/openzipkin/zipkin-go/blob/3793c981d4f621c0e3eb1457acffa2c1cc591384/proto/v2/zipkin.proto#L154
- debugWasSet := hdr.Get("X-B3-Flags") == "1"
- // By default, we'll assume using JSON
- unmarshaler := zr.jsonUnmarshaler
- // Zipkin can send protobuf via http
- if hdr.Get("Content-Type") == "application/x-protobuf" {
- // TODO: (@odeke-em) record the unique types of Content-Type uploads
- if debugWasSet {
- unmarshaler = zr.protobufDebugUnmarshaler
- } else {
- unmarshaler = zr.protobufUnmarshaler
- }
- }
- return unmarshaler.UnmarshalTraces(blob)
- }
- // Shutdown tells the receiver that should stop reception,
- // giving it a chance to perform any necessary clean-up and shutting down
- // its HTTP server.
- func (zr *zipkinReceiver) Shutdown(context.Context) error {
- var err error
- if zr.server != nil {
- err = zr.server.Close()
- }
- zr.shutdownWG.Wait()
- return err
- }
- // processBodyIfNecessary checks the "Content-Encoding" HTTP header and if
- // a compression such as "gzip", "deflate", "zlib", is found, the body will
- // be uncompressed accordingly or return the body untouched if otherwise.
- // Clients such as Zipkin-Java do this behavior e.g.
- //
- // send "Content-Encoding":"gzip" of the JSON content.
- func processBodyIfNecessary(req *http.Request) io.Reader {
- switch req.Header.Get("Content-Encoding") {
- default:
- return req.Body
- case "gzip":
- return gunzippedBodyIfPossible(req.Body)
- case "deflate", "zlib":
- return zlibUncompressedbody(req.Body)
- }
- }
// gunzippedBodyIfPossible returns a reader that yields the gunzipped content
// of r when r starts with a gzip member header, and the complete, unmodified
// content of r otherwise.
//
// The start of the stream is inspected with Peek, which does not consume
// input. Calling gzip.NewReader directly on r would read the header bytes
// before reporting that they are invalid, so falling back to r afterwards
// would silently drop the beginning of an uncompressed payload.
//
// The result is a *gzip.Reader on success and a buffered reader over r
// otherwise; neither closes r, so the caller remains responsible for closing
// the underlying body.
func gunzippedBodyIfPossible(r io.Reader) io.Reader {
	// Fixed leading bytes of every gzip member (RFC 1952, section 2.3.1).
	const (
		gzipID1     = 0x1f
		gzipID2     = 0x8b
		gzipDeflate = 0x08 // the only compression method the format defines
	)

	br := bufio.NewReader(r)
	head, err := br.Peek(3)
	if err != nil || head[0] != gzipID1 || head[1] != gzipID2 || head[2] != gzipDeflate {
		// Too short to be gzip, or not gzip at all: pass the bytes through.
		return br
	}

	gzr, err := gzip.NewReader(br)
	if err != nil {
		// The magic bytes matched but the rest of the header is malformed.
		// The header has been consumed by now, so only the remainder of the
		// stream can be handed back.
		return br
	}
	return gzr
}
// zlibUncompressedbody returns a reader that yields the inflated content of
// r when r starts with a valid zlib stream header, and the complete,
// unmodified content of r otherwise.
//
// The two header bytes are inspected with Peek, which does not consume
// input. Calling zlib.NewReader directly on r would read those bytes before
// reporting that they are invalid, so falling back to r afterwards would
// silently drop the first two bytes of an uncompressed payload.
//
// The result is the reader produced by zlib.NewReader on success and a
// buffered reader over r otherwise; neither closes r, so the caller remains
// responsible for closing the underlying body.
func zlibUncompressedbody(r io.Reader) io.Reader {
	// Header constraints from RFC 1950, section 2.2.
	const (
		zlibDeflate   = 8 // CM: the only defined compression method
		zlibMaxWindow = 7 // CINFO: largest permitted window-size exponent
	)

	br := bufio.NewReader(r)
	head, err := br.Peek(2)
	if err != nil {
		// Fewer than two bytes available: cannot be a zlib stream.
		return br
	}

	cmf, flg := head[0], head[1]
	// CMF and FLG, read as a big-endian 16-bit value, must be a multiple of 31.
	checkOK := (uint(cmf)<<8|uint(flg))%31 == 0
	if cmf&0x0f != zlibDeflate || cmf>>4 > zlibMaxWindow || !checkOK {
		// Not zlib: pass the bytes through untouched.
		return br
	}

	inflated, err := zlib.NewReader(br)
	if err != nil {
		// The header looked valid but the reader still rejected the stream
		// (for example because it requires a preset dictionary). The header
		// has been consumed by now, so only the remainder can be handed back.
		return br
	}
	return inflated
}
// Values passed to EndTracesOp by ServeHTTP to identify which Zipkin API
// version a request used.
const (
	// zipkinV1TagValue marks requests handled as Zipkin v1.
	zipkinV1TagValue = "zipkinV1"
	// zipkinV2TagValue marks requests handled as Zipkin v2.
	zipkinV2TagValue = "zipkinV2"
)
- // The zipkinReceiver receives spans from endpoint /api/v2 as JSON,
- // unmarshalls them and sends them along to the nextConsumer.
- func (zr *zipkinReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) {
- ctx := r.Context()
- // Now deserialize and process the spans.
- asZipkinv1 := r.URL != nil && strings.Contains(r.URL.Path, "api/v1/spans")
- transportTag := transportType(r, asZipkinv1)
- obsrecv := zr.obsrecvrs[transportTag]
- ctx = obsrecv.StartTracesOp(ctx)
- pr := processBodyIfNecessary(r)
- slurp, _ := io.ReadAll(pr)
- if c, ok := pr.(io.Closer); ok {
- _ = c.Close()
- }
- _ = r.Body.Close()
- var td ptrace.Traces
- var err error
- if asZipkinv1 {
- td, err = zr.v1ToTraceSpans(slurp, r.Header)
- } else {
- td, err = zr.v2ToTraceSpans(slurp, r.Header)
- }
- if err != nil {
- http.Error(w, err.Error(), http.StatusBadRequest)
- return
- }
- consumerErr := zr.nextConsumer.ConsumeTraces(ctx, td)
- receiverTagValue := zipkinV2TagValue
- if asZipkinv1 {
- receiverTagValue = zipkinV1TagValue
- }
- obsrecv.EndTracesOp(ctx, receiverTagValue, td.SpanCount(), consumerErr)
- if consumerErr == nil {
- // Send back the response "Accepted" as
- // required at https://zipkin.io/zipkin-api/#/default/post_spans
- w.WriteHeader(http.StatusAccepted)
- return
- }
- if consumererror.IsPermanent(consumerErr) {
- w.WriteHeader(http.StatusBadRequest)
- _, _ = w.Write(errBadRequestRespBody)
- } else {
- // Transient error, due to some internal condition.
- w.WriteHeader(http.StatusInternalServerError)
- _, _ = w.Write(errNextConsumerRespBody)
- }
- }
- func transportType(r *http.Request, asZipkinv1 bool) string {
- if asZipkinv1 {
- if r.Header.Get("Content-Type") == "application/x-thrift" {
- return receiverTransportV1Thrift
- }
- return receiverTransportV1JSON
- }
- if r.Header.Get("Content-Type") == "application/x-protobuf" {
- return receiverTransportV2PROTO
- }
- return receiverTransportV2JSON
- }
|