translator.go 9.9 KB


  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package datadogreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver"
  4. import (
  5. "bytes"
  6. "encoding/binary"
  7. "encoding/json"
  8. "io"
  9. "mime"
  10. "net/http"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace"
  15. "go.opentelemetry.io/collector/pdata/pcommon"
  16. "go.opentelemetry.io/collector/pdata/ptrace"
  17. semconv "go.opentelemetry.io/collector/semconv/v1.16.0"
  18. "go.uber.org/multierr"
  19. "google.golang.org/protobuf/proto"
  20. )
  21. const (
  22. datadogSpanKindKey = "span.kind"
  23. // The datadog trace id
  24. //
  25. // Type: string
  26. // Requirement Level: Optional
  27. // Examples: '6249785623524942554'
  28. attributeDatadogTraceID = "datadog.trace.id"
  29. // The datadog span id
  30. //
  31. // Type: string
  32. // Requirement Level: Optional
  33. // Examples: '228114450199004348'
  34. attributeDatadogSpanID = "datadog.span.id"
  35. )
  36. func upsertHeadersAttributes(req *http.Request, attrs pcommon.Map) {
  37. if ddTracerVersion := req.Header.Get("Datadog-Meta-Tracer-Version"); ddTracerVersion != "" {
  38. attrs.PutStr(semconv.AttributeTelemetrySDKVersion, "Datadog-"+ddTracerVersion)
  39. }
  40. if ddTracerLang := req.Header.Get("Datadog-Meta-Lang"); ddTracerLang != "" {
  41. otelLang := ddTracerLang
  42. if ddTracerLang == ".NET" {
  43. otelLang = "dotnet"
  44. }
  45. attrs.PutStr(semconv.AttributeTelemetrySDKLanguage, otelLang)
  46. }
  47. }
  48. func toTraces(payload *pb.TracerPayload, req *http.Request) ptrace.Traces {
  49. var traces pb.Traces
  50. for _, p := range payload.GetChunks() {
  51. traces = append(traces, p.GetSpans())
  52. }
  53. sharedAttributes := pcommon.NewMap()
  54. for k, v := range map[string]string{
  55. semconv.AttributeContainerID: payload.ContainerID,
  56. semconv.AttributeTelemetrySDKLanguage: payload.LanguageName,
  57. semconv.AttributeProcessRuntimeVersion: payload.LanguageVersion,
  58. semconv.AttributeDeploymentEnvironment: payload.Env,
  59. semconv.AttributeHostName: payload.Hostname,
  60. semconv.AttributeServiceVersion: payload.AppVersion,
  61. semconv.AttributeTelemetrySDKName: "Datadog",
  62. semconv.AttributeTelemetrySDKVersion: payload.TracerVersion,
  63. } {
  64. if v != "" {
  65. sharedAttributes.PutStr(k, v)
  66. }
  67. }
  68. for k, v := range payload.Tags {
  69. if k = translateDataDogKeyToOtel(k); v != "" {
  70. sharedAttributes.PutStr(k, v)
  71. }
  72. }
  73. upsertHeadersAttributes(req, sharedAttributes)
  74. // Creating a map of service spans to slices
  75. // since the expectation is that `service.name`
  76. // is added as a resource attribute in most systems
  77. // now instead of being a span level attribute.
  78. groupByService := make(map[string]ptrace.SpanSlice)
  79. for _, trace := range traces {
  80. for _, span := range trace {
  81. slice, exist := groupByService[span.Service]
  82. if !exist {
  83. slice = ptrace.NewSpanSlice()
  84. groupByService[span.Service] = slice
  85. }
  86. newSpan := slice.AppendEmpty()
  87. newSpan.SetTraceID(uInt64ToTraceID(0, span.TraceID))
  88. newSpan.SetSpanID(uInt64ToSpanID(span.SpanID))
  89. newSpan.SetStartTimestamp(pcommon.Timestamp(span.Start))
  90. newSpan.SetEndTimestamp(pcommon.Timestamp(span.Start + span.Duration))
  91. newSpan.SetParentSpanID(uInt64ToSpanID(span.ParentID))
  92. newSpan.SetName(span.Name)
  93. newSpan.Status().SetCode(ptrace.StatusCodeOk)
  94. newSpan.Attributes().PutStr("dd.span.Resource", span.Resource)
  95. if span.Error > 0 {
  96. newSpan.Status().SetCode(ptrace.StatusCodeError)
  97. }
  98. newSpan.Attributes().PutStr(attributeDatadogSpanID, strconv.FormatUint(span.SpanID, 10))
  99. newSpan.Attributes().PutStr(attributeDatadogTraceID, strconv.FormatUint(span.TraceID, 10))
  100. for k, v := range span.GetMeta() {
  101. if k = translateDataDogKeyToOtel(k); len(k) > 0 {
  102. newSpan.Attributes().PutStr(k, v)
  103. }
  104. }
  105. switch span.Meta[datadogSpanKindKey] {
  106. case "server":
  107. newSpan.SetKind(ptrace.SpanKindServer)
  108. case "client":
  109. newSpan.SetKind(ptrace.SpanKindClient)
  110. case "producer":
  111. newSpan.SetKind(ptrace.SpanKindProducer)
  112. case "consumer":
  113. newSpan.SetKind(ptrace.SpanKindConsumer)
  114. case "internal":
  115. newSpan.SetKind(ptrace.SpanKindInternal)
  116. default:
  117. switch span.Type {
  118. case "web":
  119. newSpan.SetKind(ptrace.SpanKindServer)
  120. case "http":
  121. newSpan.SetKind(ptrace.SpanKindClient)
  122. default:
  123. newSpan.SetKind(ptrace.SpanKindUnspecified)
  124. }
  125. }
  126. }
  127. }
  128. results := ptrace.NewTraces()
  129. for service, spans := range groupByService {
  130. rs := results.ResourceSpans().AppendEmpty()
  131. rs.SetSchemaUrl(semconv.SchemaURL)
  132. sharedAttributes.CopyTo(rs.Resource().Attributes())
  133. rs.Resource().Attributes().PutStr(semconv.AttributeServiceName, service)
  134. in := rs.ScopeSpans().AppendEmpty()
  135. in.Scope().SetName("Datadog")
  136. in.Scope().SetVersion(payload.TracerVersion)
  137. spans.CopyTo(in.Spans())
  138. }
  139. return results
  140. }
  141. func translateDataDogKeyToOtel(k string) string {
  142. switch strings.ToLower(k) {
  143. case "env":
  144. return semconv.AttributeDeploymentEnvironment
  145. case "version":
  146. return semconv.AttributeServiceVersion
  147. case "container_id":
  148. return semconv.AttributeContainerID
  149. case "container_name":
  150. return semconv.AttributeContainerName
  151. case "image_name":
  152. return semconv.AttributeContainerImageName
  153. case "image_tag":
  154. return semconv.AttributeContainerImageTag
  155. case "process_id":
  156. return semconv.AttributeProcessPID
  157. case "error.stacktrace":
  158. return semconv.AttributeExceptionStacktrace
  159. case "error.msg":
  160. return semconv.AttributeExceptionMessage
  161. default:
  162. return k
  163. }
  164. }
  165. var bufferPool = sync.Pool{
  166. New: func() any {
  167. return new(bytes.Buffer)
  168. },
  169. }
  170. func getBuffer() *bytes.Buffer {
  171. buffer := bufferPool.Get().(*bytes.Buffer)
  172. buffer.Reset()
  173. return buffer
  174. }
  175. func putBuffer(buffer *bytes.Buffer) {
  176. bufferPool.Put(buffer)
  177. }
  178. func handlePayload(req *http.Request) (tp []*pb.TracerPayload, err error) {
  179. var tracerPayloads []*pb.TracerPayload
  180. defer func() {
  181. _, errs := io.Copy(io.Discard, req.Body)
  182. err = multierr.Combine(err, errs, req.Body.Close())
  183. }()
  184. switch {
  185. case strings.HasPrefix(req.URL.Path, "/v0.7"):
  186. buf := getBuffer()
  187. defer putBuffer(buf)
  188. if _, err = io.Copy(buf, req.Body); err != nil {
  189. return nil, err
  190. }
  191. var tracerPayload pb.TracerPayload
  192. if _, err = tracerPayload.UnmarshalMsg(buf.Bytes()); err != nil {
  193. return nil, err
  194. }
  195. tracerPayloads = append(tracerPayloads, &tracerPayload)
  196. case strings.HasPrefix(req.URL.Path, "/v0.5"):
  197. buf := getBuffer()
  198. defer putBuffer(buf)
  199. if _, err = io.Copy(buf, req.Body); err != nil {
  200. return nil, err
  201. }
  202. var traces pb.Traces
  203. if err = traces.UnmarshalMsgDictionary(buf.Bytes()); err != nil {
  204. return nil, err
  205. }
  206. tracerPayload := &pb.TracerPayload{
  207. LanguageName: req.Header.Get("Datadog-Meta-Lang"),
  208. LanguageVersion: req.Header.Get("Datadog-Meta-Lang-Version"),
  209. TracerVersion: req.Header.Get("Datadog-Meta-Tracer-Version"),
  210. Chunks: traceChunksFromTraces(traces),
  211. }
  212. tracerPayloads = append(tracerPayloads, tracerPayload)
  213. case strings.HasPrefix(req.URL.Path, "/v0.1"):
  214. var spans []pb.Span
  215. if err = json.NewDecoder(req.Body).Decode(&spans); err != nil {
  216. return nil, err
  217. }
  218. tracerPayload := &pb.TracerPayload{
  219. LanguageName: req.Header.Get("Datadog-Meta-Lang"),
  220. LanguageVersion: req.Header.Get("Datadog-Meta-Lang-Version"),
  221. TracerVersion: req.Header.Get("Datadog-Meta-Tracer-Version"),
  222. Chunks: traceChunksFromSpans(spans),
  223. }
  224. tracerPayloads = append(tracerPayloads, tracerPayload)
  225. case strings.HasPrefix(req.URL.Path, "/api/v0.2"):
  226. buf := getBuffer()
  227. defer putBuffer(buf)
  228. if _, err = io.Copy(buf, req.Body); err != nil {
  229. return nil, err
  230. }
  231. var agentPayload pb.AgentPayload
  232. if err = proto.Unmarshal(buf.Bytes(), &agentPayload); err != nil {
  233. return nil, err
  234. }
  235. return agentPayload.TracerPayloads, err
  236. default:
  237. var traces pb.Traces
  238. if err = decodeRequest(req, &traces); err != nil {
  239. return nil, err
  240. }
  241. tracerPayload := &pb.TracerPayload{
  242. LanguageName: req.Header.Get("Datadog-Meta-Lang"),
  243. LanguageVersion: req.Header.Get("Datadog-Meta-Lang-Version"),
  244. TracerVersion: req.Header.Get("Datadog-Meta-Tracer-Version"),
  245. Chunks: traceChunksFromTraces(traces),
  246. }
  247. tracerPayloads = append(tracerPayloads, tracerPayload)
  248. }
  249. return tracerPayloads, nil
  250. }
  251. func decodeRequest(req *http.Request, dest *pb.Traces) (err error) {
  252. switch mediaType := getMediaType(req); mediaType {
  253. case "application/msgpack":
  254. buf := getBuffer()
  255. defer putBuffer(buf)
  256. _, err = io.Copy(buf, req.Body)
  257. if err != nil {
  258. return err
  259. }
  260. _, err = dest.UnmarshalMsg(buf.Bytes())
  261. return err
  262. case "application/json":
  263. fallthrough
  264. case "text/json":
  265. fallthrough
  266. case "":
  267. err = json.NewDecoder(req.Body).Decode(&dest)
  268. return err
  269. default:
  270. // do our best
  271. if err1 := json.NewDecoder(req.Body).Decode(&dest); err1 != nil {
  272. buf := getBuffer()
  273. defer putBuffer(buf)
  274. _, err2 := io.Copy(buf, req.Body)
  275. if err2 != nil {
  276. return err2
  277. }
  278. _, err2 = dest.UnmarshalMsg(buf.Bytes())
  279. return err2
  280. }
  281. return nil
  282. }
  283. }
  284. func traceChunksFromSpans(spans []pb.Span) []*pb.TraceChunk {
  285. traceChunks := []*pb.TraceChunk{}
  286. byID := make(map[uint64][]*pb.Span)
  287. for i := range spans {
  288. byID[spans[i].TraceID] = append(byID[spans[i].TraceID], &spans[i])
  289. }
  290. for _, t := range byID {
  291. traceChunks = append(traceChunks, &pb.TraceChunk{
  292. Priority: int32(0),
  293. Spans: t,
  294. })
  295. }
  296. return traceChunks
  297. }
  298. func traceChunksFromTraces(traces pb.Traces) []*pb.TraceChunk {
  299. traceChunks := make([]*pb.TraceChunk, 0, len(traces))
  300. for _, trace := range traces {
  301. traceChunks = append(traceChunks, &pb.TraceChunk{
  302. Priority: int32(0),
  303. Spans: trace,
  304. })
  305. }
  306. return traceChunks
  307. }
  308. func getMediaType(req *http.Request) string {
  309. mt, _, err := mime.ParseMediaType(req.Header.Get("Content-Type"))
  310. if err != nil {
  311. return "application/json"
  312. }
  313. return mt
  314. }
  315. func uInt64ToTraceID(high, low uint64) pcommon.TraceID {
  316. traceID := [16]byte{}
  317. binary.BigEndian.PutUint64(traceID[:8], high)
  318. binary.BigEndian.PutUint64(traceID[8:], low)
  319. return traceID
  320. }
  321. func uInt64ToSpanID(id uint64) pcommon.SpanID {
  322. spanID := [8]byte{}
  323. binary.BigEndian.PutUint64(spanID[:], id)
  324. return spanID
  325. }