jaegerproto_to_traces.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package jaeger // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
  4. import (
  5. "encoding/base64"
  6. "encoding/binary"
  7. "fmt"
  8. "hash/fnv"
  9. "reflect"
  10. "strconv"
  11. "strings"
  12. "github.com/jaegertracing/jaeger/model"
  13. "go.opentelemetry.io/collector/pdata/pcommon"
  14. "go.opentelemetry.io/collector/pdata/ptrace"
  15. conventions "go.opentelemetry.io/collector/semconv/v1.9.0"
  16. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/idutils"
  17. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/occonventions"
  18. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/tracetranslator"
  19. )
  20. var blankJaegerProtoSpan = new(model.Span)
  21. // ProtoToTraces converts multiple Jaeger proto batches to internal traces
  22. func ProtoToTraces(batches []*model.Batch) (ptrace.Traces, error) {
  23. traceData := ptrace.NewTraces()
  24. if len(batches) == 0 {
  25. return traceData, nil
  26. }
  27. batches = regroup(batches)
  28. rss := traceData.ResourceSpans()
  29. rss.EnsureCapacity(len(batches))
  30. for _, batch := range batches {
  31. if batch.GetProcess() == nil && len(batch.GetSpans()) == 0 {
  32. continue
  33. }
  34. protoBatchToResourceSpans(*batch, rss.AppendEmpty())
  35. }
  36. return traceData, nil
  37. }
  38. func regroup(batches []*model.Batch) []*model.Batch {
  39. // Re-group batches
  40. // This is needed as there might be a Process within Batch and Span at the same
  41. // time, with the span one taking precedence.
  42. // As we only have it at one level in OpenTelemetry, ResourceSpans, we split
  43. // each batch into potentially multiple other batches, with the sum of their
  44. // processes as the key to a map.
  45. // Step 1) iterate over the batches
  46. // Step 2) for each batch, calculate the batch's process checksum and store
  47. // it on a map, with the checksum as the key and the process as the value
  48. // Step 3) iterate the spans for a batch: if a given span has its own process,
  49. // calculate the checksum for the process and store it on the same map
  50. // Step 4) each entry on the map becomes a ResourceSpan
  51. registry := map[uint64]*model.Batch{}
  52. for _, batch := range batches {
  53. bb := batchForProcess(registry, batch.Process)
  54. for _, span := range batch.Spans {
  55. if span.Process == nil {
  56. bb.Spans = append(bb.Spans, span)
  57. } else {
  58. b := batchForProcess(registry, span.Process)
  59. b.Spans = append(b.Spans, span)
  60. }
  61. }
  62. }
  63. result := make([]*model.Batch, 0, len(registry))
  64. for _, v := range registry {
  65. result = append(result, v)
  66. }
  67. return result
  68. }
  69. func batchForProcess(registry map[uint64]*model.Batch, p *model.Process) *model.Batch {
  70. sum := checksum(p)
  71. batch := registry[sum]
  72. if batch == nil {
  73. batch = &model.Batch{
  74. Process: p,
  75. }
  76. registry[sum] = batch
  77. }
  78. return batch
  79. }
  80. func checksum(process *model.Process) uint64 {
  81. // this will get all the keys and values, plus service name, into this buffer
  82. // this is potentially dangerous, as a batch/span with a big enough processes
  83. // might cause the collector to allocate this extra big information
  84. // for this reason, we hash it as an integer and return it, instead of keeping
  85. // all the hashes for all the processes for all batches in memory
  86. fnvHash := fnv.New64a()
  87. if process != nil {
  88. // this effectively means that all spans from batches with nil processes
  89. // will be grouped together
  90. // this should only ever happen in unit tests
  91. // this implementation never returns an error according to the Hash interface
  92. _ = process.Hash(fnvHash)
  93. }
  94. out := make([]byte, 0, 16)
  95. out = fnvHash.Sum(out)
  96. return binary.BigEndian.Uint64(out)
  97. }
  98. func protoBatchToResourceSpans(batch model.Batch, dest ptrace.ResourceSpans) {
  99. jSpans := batch.GetSpans()
  100. jProcessToInternalResource(batch.GetProcess(), dest.Resource())
  101. if len(jSpans) == 0 {
  102. return
  103. }
  104. jSpansToInternal(jSpans, dest.ScopeSpans())
  105. }
  106. func jProcessToInternalResource(process *model.Process, dest pcommon.Resource) {
  107. if process == nil || process.ServiceName == tracetranslator.ResourceNoServiceName {
  108. return
  109. }
  110. serviceName := process.ServiceName
  111. tags := process.Tags
  112. if serviceName == "" && tags == nil {
  113. return
  114. }
  115. attrs := dest.Attributes()
  116. if serviceName != "" {
  117. attrs.EnsureCapacity(len(tags) + 1)
  118. attrs.PutStr(conventions.AttributeServiceName, serviceName)
  119. } else {
  120. attrs.EnsureCapacity(len(tags))
  121. }
  122. jTagsToInternalAttributes(tags, attrs)
  123. // Handle special keys translations.
  124. translateHostnameAttr(attrs)
  125. translateJaegerVersionAttr(attrs)
  126. }
  127. // translateHostnameAttr translates "hostname" atttribute
  128. func translateHostnameAttr(attrs pcommon.Map) {
  129. hostname, hostnameFound := attrs.Get("hostname")
  130. _, convHostNameFound := attrs.Get(conventions.AttributeHostName)
  131. if hostnameFound && !convHostNameFound {
  132. hostname.CopyTo(attrs.PutEmpty(conventions.AttributeHostName))
  133. attrs.Remove("hostname")
  134. }
  135. }
  136. // translateHostnameAttr translates "jaeger.version" atttribute
  137. func translateJaegerVersionAttr(attrs pcommon.Map) {
  138. jaegerVersion, jaegerVersionFound := attrs.Get("jaeger.version")
  139. _, exporterVersionFound := attrs.Get(occonventions.AttributeExporterVersion)
  140. if jaegerVersionFound && !exporterVersionFound {
  141. attrs.PutStr(occonventions.AttributeExporterVersion, "Jaeger-"+jaegerVersion.Str())
  142. attrs.Remove("jaeger.version")
  143. }
  144. }
  145. type scope struct {
  146. name, version string
  147. }
  148. func jSpansToInternal(spans []*model.Span, dest ptrace.ScopeSpansSlice) {
  149. spansByLibrary := make(map[scope]ptrace.SpanSlice)
  150. for _, span := range spans {
  151. if span == nil || reflect.DeepEqual(span, blankJaegerProtoSpan) {
  152. continue
  153. }
  154. il := getScope(span)
  155. sps, found := spansByLibrary[il]
  156. if !found {
  157. ss := dest.AppendEmpty()
  158. ss.Scope().SetName(il.name)
  159. ss.Scope().SetVersion(il.version)
  160. sps = ss.Spans()
  161. spansByLibrary[il] = sps
  162. }
  163. jSpanToInternal(span, sps.AppendEmpty())
  164. }
  165. }
  166. func jSpanToInternal(span *model.Span, dest ptrace.Span) {
  167. dest.SetTraceID(idutils.UInt64ToTraceID(span.TraceID.High, span.TraceID.Low))
  168. dest.SetSpanID(idutils.UInt64ToSpanID(uint64(span.SpanID)))
  169. dest.SetName(span.OperationName)
  170. dest.SetStartTimestamp(pcommon.NewTimestampFromTime(span.StartTime))
  171. dest.SetEndTimestamp(pcommon.NewTimestampFromTime(span.StartTime.Add(span.Duration)))
  172. parentSpanID := span.ParentSpanID()
  173. if parentSpanID != model.SpanID(0) {
  174. dest.SetParentSpanID(idutils.UInt64ToSpanID(uint64(parentSpanID)))
  175. }
  176. attrs := dest.Attributes()
  177. attrs.EnsureCapacity(len(span.Tags))
  178. jTagsToInternalAttributes(span.Tags, attrs)
  179. if spanKindAttr, ok := attrs.Get(tracetranslator.TagSpanKind); ok {
  180. dest.SetKind(jSpanKindToInternal(spanKindAttr.Str()))
  181. attrs.Remove(tracetranslator.TagSpanKind)
  182. }
  183. setInternalSpanStatus(attrs, dest)
  184. dest.TraceState().FromRaw(getTraceStateFromAttrs(attrs))
  185. // drop the attributes slice if all of them were replaced during translation
  186. if attrs.Len() == 0 {
  187. attrs.Clear()
  188. }
  189. jLogsToSpanEvents(span.Logs, dest.Events())
  190. jReferencesToSpanLinks(span.References, parentSpanID, dest.Links())
  191. }
  192. func jTagsToInternalAttributes(tags []model.KeyValue, dest pcommon.Map) {
  193. for _, tag := range tags {
  194. switch tag.GetVType() {
  195. case model.ValueType_STRING:
  196. dest.PutStr(tag.Key, tag.GetVStr())
  197. case model.ValueType_BOOL:
  198. dest.PutBool(tag.Key, tag.GetVBool())
  199. case model.ValueType_INT64:
  200. dest.PutInt(tag.Key, tag.GetVInt64())
  201. case model.ValueType_FLOAT64:
  202. dest.PutDouble(tag.Key, tag.GetVFloat64())
  203. case model.ValueType_BINARY:
  204. dest.PutStr(tag.Key, base64.StdEncoding.EncodeToString(tag.GetVBinary()))
  205. default:
  206. dest.PutStr(tag.Key, fmt.Sprintf("<Unknown Jaeger TagType %q>", tag.GetVType()))
  207. }
  208. }
  209. }
  210. func setInternalSpanStatus(attrs pcommon.Map, span ptrace.Span) {
  211. dest := span.Status()
  212. statusCode := ptrace.StatusCodeUnset
  213. statusMessage := ""
  214. statusExists := false
  215. if errorVal, ok := attrs.Get(tracetranslator.TagError); ok && errorVal.Type() == pcommon.ValueTypeBool {
  216. if errorVal.Bool() {
  217. statusCode = ptrace.StatusCodeError
  218. attrs.Remove(tracetranslator.TagError)
  219. statusExists = true
  220. if desc, ok := extractStatusDescFromAttr(attrs); ok {
  221. statusMessage = desc
  222. } else if descAttr, ok := attrs.Get(tracetranslator.TagHTTPStatusMsg); ok {
  223. statusMessage = descAttr.Str()
  224. }
  225. }
  226. }
  227. if codeAttr, ok := attrs.Get(conventions.OtelStatusCode); ok {
  228. if !statusExists {
  229. // The error tag is the ultimate truth for a Jaeger spans' error
  230. // status. Only parse the otel.status_code tag if the error tag is
  231. // not set to true.
  232. statusExists = true
  233. switch strings.ToUpper(codeAttr.Str()) {
  234. case statusOk:
  235. statusCode = ptrace.StatusCodeOk
  236. case statusError:
  237. statusCode = ptrace.StatusCodeError
  238. }
  239. if desc, ok := extractStatusDescFromAttr(attrs); ok {
  240. statusMessage = desc
  241. }
  242. }
  243. // Regardless of error tag value, remove the otel.status_code tag. The
  244. // otel.status_message tag will have already been removed if
  245. // statusExists is true.
  246. attrs.Remove(conventions.OtelStatusCode)
  247. } else if httpCodeAttr, ok := attrs.Get(conventions.AttributeHTTPStatusCode); !statusExists && ok {
  248. // Fallback to introspecting if this span represents a failed HTTP
  249. // request or response, but again, only do so if the `error` tag was
  250. // not set to true and no explicit status was sent.
  251. if code, err := getStatusCodeFromHTTPStatusAttr(httpCodeAttr, span.Kind()); err == nil {
  252. if code != ptrace.StatusCodeUnset {
  253. statusExists = true
  254. statusCode = code
  255. }
  256. if msgAttr, ok := attrs.Get(tracetranslator.TagHTTPStatusMsg); ok {
  257. statusMessage = msgAttr.Str()
  258. }
  259. }
  260. }
  261. if statusExists {
  262. dest.SetCode(statusCode)
  263. dest.SetMessage(statusMessage)
  264. }
  265. }
  266. // extractStatusDescFromAttr returns the OTel status description from attrs
  267. // along with true if it is set. Otherwise, an empty string and false are
  268. // returned. The OTel status description attribute is deleted from attrs in
  269. // the process.
  270. func extractStatusDescFromAttr(attrs pcommon.Map) (string, bool) {
  271. if msgAttr, ok := attrs.Get(conventions.OtelStatusDescription); ok {
  272. msg := msgAttr.Str()
  273. attrs.Remove(conventions.OtelStatusDescription)
  274. return msg, true
  275. }
  276. return "", false
  277. }
  278. // codeFromAttr returns the integer code value from attrVal. An error is
  279. // returned if the code is not represented by an integer or string value in
  280. // the attrVal or the value is outside the bounds of an int representation.
  281. func codeFromAttr(attrVal pcommon.Value) (int64, error) {
  282. var val int64
  283. switch attrVal.Type() {
  284. case pcommon.ValueTypeInt:
  285. val = attrVal.Int()
  286. case pcommon.ValueTypeStr:
  287. var err error
  288. val, err = strconv.ParseInt(attrVal.Str(), 10, 0)
  289. if err != nil {
  290. return 0, err
  291. }
  292. default:
  293. return 0, fmt.Errorf("%w: %s", errType, attrVal.Type().String())
  294. }
  295. return val, nil
  296. }
  297. func getStatusCodeFromHTTPStatusAttr(attrVal pcommon.Value, kind ptrace.SpanKind) (ptrace.StatusCode, error) {
  298. statusCode, err := codeFromAttr(attrVal)
  299. if err != nil {
  300. return ptrace.StatusCodeUnset, err
  301. }
  302. // For HTTP status codes in the 4xx range span status MUST be left unset
  303. // in case of SpanKind.SERVER and MUST be set to Error in case of SpanKind.CLIENT.
  304. // For HTTP status codes in the 5xx range, as well as any other code the client
  305. // failed to interpret, span status MUST be set to Error.
  306. if statusCode >= 400 && statusCode < 500 {
  307. switch kind {
  308. case ptrace.SpanKindClient:
  309. return ptrace.StatusCodeError, nil
  310. case ptrace.SpanKindServer:
  311. return ptrace.StatusCodeUnset, nil
  312. }
  313. }
  314. return tracetranslator.StatusCodeFromHTTP(statusCode), nil
  315. }
  316. func jSpanKindToInternal(spanKind string) ptrace.SpanKind {
  317. switch spanKind {
  318. case "client":
  319. return ptrace.SpanKindClient
  320. case "server":
  321. return ptrace.SpanKindServer
  322. case "producer":
  323. return ptrace.SpanKindProducer
  324. case "consumer":
  325. return ptrace.SpanKindConsumer
  326. case "internal":
  327. return ptrace.SpanKindInternal
  328. }
  329. return ptrace.SpanKindUnspecified
  330. }
  331. func jLogsToSpanEvents(logs []model.Log, dest ptrace.SpanEventSlice) {
  332. if len(logs) == 0 {
  333. return
  334. }
  335. dest.EnsureCapacity(len(logs))
  336. for i, log := range logs {
  337. var event ptrace.SpanEvent
  338. if dest.Len() > i {
  339. event = dest.At(i)
  340. } else {
  341. event = dest.AppendEmpty()
  342. }
  343. event.SetTimestamp(pcommon.NewTimestampFromTime(log.Timestamp))
  344. if len(log.Fields) == 0 {
  345. continue
  346. }
  347. attrs := event.Attributes()
  348. attrs.EnsureCapacity(len(log.Fields))
  349. jTagsToInternalAttributes(log.Fields, attrs)
  350. if name, ok := attrs.Get(eventNameAttr); ok {
  351. event.SetName(name.Str())
  352. attrs.Remove(eventNameAttr)
  353. }
  354. }
  355. }
  356. // jReferencesToSpanLinks sets internal span links based on jaeger span references skipping excludeParentID
  357. func jReferencesToSpanLinks(refs []model.SpanRef, excludeParentID model.SpanID, dest ptrace.SpanLinkSlice) {
  358. if len(refs) == 0 || len(refs) == 1 && refs[0].SpanID == excludeParentID && refs[0].RefType == model.ChildOf {
  359. return
  360. }
  361. dest.EnsureCapacity(len(refs))
  362. for _, ref := range refs {
  363. if ref.SpanID == excludeParentID && ref.RefType == model.ChildOf {
  364. continue
  365. }
  366. link := dest.AppendEmpty()
  367. link.SetTraceID(idutils.UInt64ToTraceID(ref.TraceID.High, ref.TraceID.Low))
  368. link.SetSpanID(idutils.UInt64ToSpanID(uint64(ref.SpanID)))
  369. link.Attributes().PutStr(conventions.AttributeOpentracingRefType, jRefTypeToAttribute(ref.RefType))
  370. }
  371. }
  372. func getTraceStateFromAttrs(attrs pcommon.Map) string {
  373. traceState := ""
  374. // TODO Bring this inline with solution for jaegertracing/jaeger-client-java #702 once available
  375. if attr, ok := attrs.Get(tracetranslator.TagW3CTraceState); ok {
  376. traceState = attr.Str()
  377. attrs.Remove(tracetranslator.TagW3CTraceState)
  378. }
  379. return traceState
  380. }
  381. func getScope(span *model.Span) scope {
  382. il := scope{}
  383. if libraryName, ok := getAndDeleteTag(span, conventions.OtelLibraryName); ok {
  384. il.name = libraryName
  385. if libraryVersion, ok := getAndDeleteTag(span, conventions.OtelLibraryVersion); ok {
  386. il.version = libraryVersion
  387. }
  388. }
  389. return il
  390. }
  391. func getAndDeleteTag(span *model.Span, key string) (string, bool) {
  392. for i := range span.Tags {
  393. if span.Tags[i].Key == key {
  394. value := span.Tags[i].GetVStr()
  395. span.Tags = append(span.Tags[:i], span.Tags[i+1:]...)
  396. return value, true
  397. }
  398. }
  399. return "", false
  400. }
  401. func jRefTypeToAttribute(ref model.SpanRefType) string {
  402. if ref == model.ChildOf {
  403. return conventions.AttributeOpentracingRefTypeChildOf
  404. }
  405. return conventions.AttributeOpentracingRefTypeFollowsFrom
  406. }