|
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package datadogreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver"
- import (
- "bytes"
- "encoding/binary"
- "encoding/json"
- "io"
- "mime"
- "net/http"
- "strconv"
- "strings"
- "sync"
- pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace"
- "go.opentelemetry.io/collector/pdata/pcommon"
- "go.opentelemetry.io/collector/pdata/ptrace"
- semconv "go.opentelemetry.io/collector/semconv/v1.16.0"
- "go.uber.org/multierr"
- "google.golang.org/protobuf/proto"
- )
- const (
- datadogSpanKindKey = "span.kind"
- // The datadog trace id
- //
- // Type: string
- // Requirement Level: Optional
- // Examples: '6249785623524942554'
- attributeDatadogTraceID = "datadog.trace.id"
- // The datadog span id
- //
- // Type: string
- // Requirement Level: Optional
- // Examples: '228114450199004348'
- attributeDatadogSpanID = "datadog.span.id"
- )
- func upsertHeadersAttributes(req *http.Request, attrs pcommon.Map) {
- if ddTracerVersion := req.Header.Get("Datadog-Meta-Tracer-Version"); ddTracerVersion != "" {
- attrs.PutStr(semconv.AttributeTelemetrySDKVersion, "Datadog-"+ddTracerVersion)
- }
- if ddTracerLang := req.Header.Get("Datadog-Meta-Lang"); ddTracerLang != "" {
- otelLang := ddTracerLang
- if ddTracerLang == ".NET" {
- otelLang = "dotnet"
- }
- attrs.PutStr(semconv.AttributeTelemetrySDKLanguage, otelLang)
- }
- }
- func toTraces(payload *pb.TracerPayload, req *http.Request) ptrace.Traces {
- var traces pb.Traces
- for _, p := range payload.GetChunks() {
- traces = append(traces, p.GetSpans())
- }
- sharedAttributes := pcommon.NewMap()
- for k, v := range map[string]string{
- semconv.AttributeContainerID: payload.ContainerID,
- semconv.AttributeTelemetrySDKLanguage: payload.LanguageName,
- semconv.AttributeProcessRuntimeVersion: payload.LanguageVersion,
- semconv.AttributeDeploymentEnvironment: payload.Env,
- semconv.AttributeHostName: payload.Hostname,
- semconv.AttributeServiceVersion: payload.AppVersion,
- semconv.AttributeTelemetrySDKName: "Datadog",
- semconv.AttributeTelemetrySDKVersion: payload.TracerVersion,
- } {
- if v != "" {
- sharedAttributes.PutStr(k, v)
- }
- }
- for k, v := range payload.Tags {
- if k = translateDataDogKeyToOtel(k); v != "" {
- sharedAttributes.PutStr(k, v)
- }
- }
- upsertHeadersAttributes(req, sharedAttributes)
- // Creating a map of service spans to slices
- // since the expectation is that `service.name`
- // is added as a resource attribute in most systems
- // now instead of being a span level attribute.
- groupByService := make(map[string]ptrace.SpanSlice)
- for _, trace := range traces {
- for _, span := range trace {
- slice, exist := groupByService[span.Service]
- if !exist {
- slice = ptrace.NewSpanSlice()
- groupByService[span.Service] = slice
- }
- newSpan := slice.AppendEmpty()
- newSpan.SetTraceID(uInt64ToTraceID(0, span.TraceID))
- newSpan.SetSpanID(uInt64ToSpanID(span.SpanID))
- newSpan.SetStartTimestamp(pcommon.Timestamp(span.Start))
- newSpan.SetEndTimestamp(pcommon.Timestamp(span.Start + span.Duration))
- newSpan.SetParentSpanID(uInt64ToSpanID(span.ParentID))
- newSpan.SetName(span.Name)
- newSpan.Status().SetCode(ptrace.StatusCodeOk)
- newSpan.Attributes().PutStr("dd.span.Resource", span.Resource)
- if span.Error > 0 {
- newSpan.Status().SetCode(ptrace.StatusCodeError)
- }
- newSpan.Attributes().PutStr(attributeDatadogSpanID, strconv.FormatUint(span.SpanID, 10))
- newSpan.Attributes().PutStr(attributeDatadogTraceID, strconv.FormatUint(span.TraceID, 10))
- for k, v := range span.GetMeta() {
- if k = translateDataDogKeyToOtel(k); len(k) > 0 {
- newSpan.Attributes().PutStr(k, v)
- }
- }
- switch span.Meta[datadogSpanKindKey] {
- case "server":
- newSpan.SetKind(ptrace.SpanKindServer)
- case "client":
- newSpan.SetKind(ptrace.SpanKindClient)
- case "producer":
- newSpan.SetKind(ptrace.SpanKindProducer)
- case "consumer":
- newSpan.SetKind(ptrace.SpanKindConsumer)
- case "internal":
- newSpan.SetKind(ptrace.SpanKindInternal)
- default:
- switch span.Type {
- case "web":
- newSpan.SetKind(ptrace.SpanKindServer)
- case "http":
- newSpan.SetKind(ptrace.SpanKindClient)
- default:
- newSpan.SetKind(ptrace.SpanKindUnspecified)
- }
- }
- }
- }
- results := ptrace.NewTraces()
- for service, spans := range groupByService {
- rs := results.ResourceSpans().AppendEmpty()
- rs.SetSchemaUrl(semconv.SchemaURL)
- sharedAttributes.CopyTo(rs.Resource().Attributes())
- rs.Resource().Attributes().PutStr(semconv.AttributeServiceName, service)
- in := rs.ScopeSpans().AppendEmpty()
- in.Scope().SetName("Datadog")
- in.Scope().SetVersion(payload.TracerVersion)
- spans.CopyTo(in.Spans())
- }
- return results
- }
- func translateDataDogKeyToOtel(k string) string {
- switch strings.ToLower(k) {
- case "env":
- return semconv.AttributeDeploymentEnvironment
- case "version":
- return semconv.AttributeServiceVersion
- case "container_id":
- return semconv.AttributeContainerID
- case "container_name":
- return semconv.AttributeContainerName
- case "image_name":
- return semconv.AttributeContainerImageName
- case "image_tag":
- return semconv.AttributeContainerImageTag
- case "process_id":
- return semconv.AttributeProcessPID
- case "error.stacktrace":
- return semconv.AttributeExceptionStacktrace
- case "error.msg":
- return semconv.AttributeExceptionMessage
- default:
- return k
- }
- }
- var bufferPool = sync.Pool{
- New: func() any {
- return new(bytes.Buffer)
- },
- }
- func getBuffer() *bytes.Buffer {
- buffer := bufferPool.Get().(*bytes.Buffer)
- buffer.Reset()
- return buffer
- }
- func putBuffer(buffer *bytes.Buffer) {
- bufferPool.Put(buffer)
- }
- func handlePayload(req *http.Request) (tp []*pb.TracerPayload, err error) {
- var tracerPayloads []*pb.TracerPayload
- defer func() {
- _, errs := io.Copy(io.Discard, req.Body)
- err = multierr.Combine(err, errs, req.Body.Close())
- }()
- switch {
- case strings.HasPrefix(req.URL.Path, "/v0.7"):
- buf := getBuffer()
- defer putBuffer(buf)
- if _, err = io.Copy(buf, req.Body); err != nil {
- return nil, err
- }
- var tracerPayload pb.TracerPayload
- if _, err = tracerPayload.UnmarshalMsg(buf.Bytes()); err != nil {
- return nil, err
- }
- tracerPayloads = append(tracerPayloads, &tracerPayload)
- case strings.HasPrefix(req.URL.Path, "/v0.5"):
- buf := getBuffer()
- defer putBuffer(buf)
- if _, err = io.Copy(buf, req.Body); err != nil {
- return nil, err
- }
- var traces pb.Traces
- if err = traces.UnmarshalMsgDictionary(buf.Bytes()); err != nil {
- return nil, err
- }
- tracerPayload := &pb.TracerPayload{
- LanguageName: req.Header.Get("Datadog-Meta-Lang"),
- LanguageVersion: req.Header.Get("Datadog-Meta-Lang-Version"),
- TracerVersion: req.Header.Get("Datadog-Meta-Tracer-Version"),
- Chunks: traceChunksFromTraces(traces),
- }
- tracerPayloads = append(tracerPayloads, tracerPayload)
- case strings.HasPrefix(req.URL.Path, "/v0.1"):
- var spans []pb.Span
- if err = json.NewDecoder(req.Body).Decode(&spans); err != nil {
- return nil, err
- }
- tracerPayload := &pb.TracerPayload{
- LanguageName: req.Header.Get("Datadog-Meta-Lang"),
- LanguageVersion: req.Header.Get("Datadog-Meta-Lang-Version"),
- TracerVersion: req.Header.Get("Datadog-Meta-Tracer-Version"),
- Chunks: traceChunksFromSpans(spans),
- }
- tracerPayloads = append(tracerPayloads, tracerPayload)
- case strings.HasPrefix(req.URL.Path, "/api/v0.2"):
- buf := getBuffer()
- defer putBuffer(buf)
- if _, err = io.Copy(buf, req.Body); err != nil {
- return nil, err
- }
- var agentPayload pb.AgentPayload
- if err = proto.Unmarshal(buf.Bytes(), &agentPayload); err != nil {
- return nil, err
- }
- return agentPayload.TracerPayloads, err
- default:
- var traces pb.Traces
- if err = decodeRequest(req, &traces); err != nil {
- return nil, err
- }
- tracerPayload := &pb.TracerPayload{
- LanguageName: req.Header.Get("Datadog-Meta-Lang"),
- LanguageVersion: req.Header.Get("Datadog-Meta-Lang-Version"),
- TracerVersion: req.Header.Get("Datadog-Meta-Tracer-Version"),
- Chunks: traceChunksFromTraces(traces),
- }
- tracerPayloads = append(tracerPayloads, tracerPayload)
- }
- return tracerPayloads, nil
- }
- func decodeRequest(req *http.Request, dest *pb.Traces) (err error) {
- switch mediaType := getMediaType(req); mediaType {
- case "application/msgpack":
- buf := getBuffer()
- defer putBuffer(buf)
- _, err = io.Copy(buf, req.Body)
- if err != nil {
- return err
- }
- _, err = dest.UnmarshalMsg(buf.Bytes())
- return err
- case "application/json":
- fallthrough
- case "text/json":
- fallthrough
- case "":
- err = json.NewDecoder(req.Body).Decode(&dest)
- return err
- default:
- // do our best
- if err1 := json.NewDecoder(req.Body).Decode(&dest); err1 != nil {
- buf := getBuffer()
- defer putBuffer(buf)
- _, err2 := io.Copy(buf, req.Body)
- if err2 != nil {
- return err2
- }
- _, err2 = dest.UnmarshalMsg(buf.Bytes())
- return err2
- }
- return nil
- }
- }
- func traceChunksFromSpans(spans []pb.Span) []*pb.TraceChunk {
- traceChunks := []*pb.TraceChunk{}
- byID := make(map[uint64][]*pb.Span)
- for i := range spans {
- byID[spans[i].TraceID] = append(byID[spans[i].TraceID], &spans[i])
- }
- for _, t := range byID {
- traceChunks = append(traceChunks, &pb.TraceChunk{
- Priority: int32(0),
- Spans: t,
- })
- }
- return traceChunks
- }
- func traceChunksFromTraces(traces pb.Traces) []*pb.TraceChunk {
- traceChunks := make([]*pb.TraceChunk, 0, len(traces))
- for _, trace := range traces {
- traceChunks = append(traceChunks, &pb.TraceChunk{
- Priority: int32(0),
- Spans: trace,
- })
- }
- return traceChunks
- }
- func getMediaType(req *http.Request) string {
- mt, _, err := mime.ParseMediaType(req.Header.Get("Content-Type"))
- if err != nil {
- return "application/json"
- }
- return mt
- }
- func uInt64ToTraceID(high, low uint64) pcommon.TraceID {
- traceID := [16]byte{}
- binary.BigEndian.PutUint64(traceID[:8], high)
- binary.BigEndian.PutUint64(traceID[8:], low)
- return traceID
- }
- func uInt64ToSpanID(id uint64) pcommon.SpanID {
- spanID := [8]byte{}
- binary.BigEndian.PutUint64(spanID[:], id)
- return spanID
- }
|