skywalkingproto_to_traces.go 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package skywalking // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/skywalking"
  4. import (
  5. "bytes"
  6. "encoding/hex"
  7. "reflect"
  8. "strconv"
  9. "time"
  10. "unsafe"
  11. "github.com/google/uuid"
  12. "go.opentelemetry.io/collector/pdata/pcommon"
  13. "go.opentelemetry.io/collector/pdata/ptrace"
  14. conventions "go.opentelemetry.io/collector/semconv/v1.8.0"
  15. common "skywalking.apache.org/repo/goapi/collect/common/v3"
  16. agentV3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
  17. )
  18. const (
  19. AttributeRefType = "refType"
  20. AttributeParentService = "parent.service"
  21. AttributeParentInstance = "parent.service.instance"
  22. AttributeParentEndpoint = "parent.endpoint"
  23. AttributeSkywalkingSpanID = "sw8.span_id"
  24. AttributeSkywalkingTraceID = "sw8.trace_id"
  25. AttributeSkywalkingSegmentID = "sw8.segment_id"
  26. AttributeSkywalkingParentSpanID = "sw8.parent_span_id"
  27. AttributeSkywalkingParentSegmentID = "sw8.parent_segment_id"
  28. AttributeNetworkAddressUsedAtPeer = "network.AddressUsedAtPeer"
  29. )
  30. var otSpanTagsMapping = map[string]string{
  31. "url": conventions.AttributeHTTPURL,
  32. "status_code": conventions.AttributeHTTPStatusCode,
  33. "db.type": conventions.AttributeDBSystem,
  34. "db.instance": conventions.AttributeDBName,
  35. "mq.broker": conventions.AttributeNetPeerName,
  36. }
  37. // ProtoToTraces converts multiple skywalking proto batches to internal traces
  38. func ProtoToTraces(segment *agentV3.SegmentObject) ptrace.Traces {
  39. traceData := ptrace.NewTraces()
  40. swSpans := segment.Spans
  41. if swSpans == nil && len(swSpans) == 0 {
  42. return traceData
  43. }
  44. resourceSpan := traceData.ResourceSpans().AppendEmpty()
  45. rs := resourceSpan.Resource()
  46. for _, span := range swSpans {
  47. swTagsToInternalResource(span, rs)
  48. }
  49. rs.Attributes().PutStr(conventions.AttributeServiceName, segment.GetService())
  50. rs.Attributes().PutStr(conventions.AttributeServiceInstanceID, segment.GetServiceInstance())
  51. rs.Attributes().PutStr(AttributeSkywalkingTraceID, segment.GetTraceId())
  52. il := resourceSpan.ScopeSpans().AppendEmpty()
  53. swSpansToSpanSlice(segment.GetTraceId(), segment.GetTraceSegmentId(), swSpans, il.Spans())
  54. return traceData
  55. }
  56. func swTagsToInternalResource(span *agentV3.SpanObject, dest pcommon.Resource) {
  57. if span == nil {
  58. return
  59. }
  60. attrs := dest.Attributes()
  61. attrs.Clear()
  62. tags := span.Tags
  63. if tags == nil {
  64. return
  65. }
  66. for _, tag := range tags {
  67. otKey, ok := otSpanTagsMapping[tag.Key]
  68. if ok {
  69. attrs.PutStr(otKey, tag.Value)
  70. }
  71. }
  72. }
  73. func swSpansToSpanSlice(traceID string, segmentID string, spans []*agentV3.SpanObject, dest ptrace.SpanSlice) {
  74. if len(spans) == 0 {
  75. return
  76. }
  77. dest.EnsureCapacity(len(spans))
  78. for _, span := range spans {
  79. if span == nil {
  80. continue
  81. }
  82. swSpanToSpan(traceID, segmentID, span, dest.AppendEmpty())
  83. }
  84. }
  85. func swSpanToSpan(traceID string, segmentID string, span *agentV3.SpanObject, dest ptrace.Span) {
  86. dest.SetTraceID(swTraceIDToTraceID(traceID))
  87. // skywalking defines segmentId + spanId as unique identifier
  88. // so use segmentId to convert to an unique otel-span
  89. dest.SetSpanID(segmentIDToSpanID(segmentID, uint32(span.GetSpanId())))
  90. // parent spanid = -1, means(root span) no parent span in current skywalking segment, so it is necessary to search for the parent segment.
  91. if span.ParentSpanId != -1 {
  92. dest.SetParentSpanID(segmentIDToSpanID(segmentID, uint32(span.GetParentSpanId())))
  93. } else if len(span.Refs) == 1 {
  94. // TODO: SegmentReference references usually have only one element, but in batch consumer case, such as in MQ or async batch process, it could be multiple.
  95. // We only handle one element for now.
  96. dest.SetParentSpanID(segmentIDToSpanID(span.Refs[0].GetParentTraceSegmentId(), uint32(span.Refs[0].GetParentSpanId())))
  97. }
  98. dest.SetName(span.OperationName)
  99. dest.SetStartTimestamp(microsecondsToTimestamp(span.GetStartTime()))
  100. dest.SetEndTimestamp(microsecondsToTimestamp(span.GetEndTime()))
  101. attrs := dest.Attributes()
  102. attrs.EnsureCapacity(len(span.Tags))
  103. swKvPairsToInternalAttributes(span.Tags, attrs)
  104. // drop the attributes slice if all of them were replaced during translation
  105. if attrs.Len() == 0 {
  106. attrs.Clear()
  107. }
  108. attrs.PutStr(AttributeSkywalkingSegmentID, segmentID)
  109. setSwSpanIDToAttributes(span, attrs)
  110. setInternalSpanStatus(span, dest.Status())
  111. switch {
  112. case span.SpanLayer == agentV3.SpanLayer_MQ:
  113. if span.SpanType == agentV3.SpanType_Entry {
  114. dest.SetKind(ptrace.SpanKindConsumer)
  115. } else if span.SpanType == agentV3.SpanType_Exit {
  116. dest.SetKind(ptrace.SpanKindProducer)
  117. }
  118. case span.GetSpanType() == agentV3.SpanType_Exit:
  119. dest.SetKind(ptrace.SpanKindClient)
  120. case span.GetSpanType() == agentV3.SpanType_Entry:
  121. dest.SetKind(ptrace.SpanKindServer)
  122. case span.GetSpanType() == agentV3.SpanType_Local:
  123. dest.SetKind(ptrace.SpanKindInternal)
  124. default:
  125. dest.SetKind(ptrace.SpanKindUnspecified)
  126. }
  127. swLogsToSpanEvents(span.GetLogs(), dest.Events())
  128. // skywalking: In the across thread and across processes, these references target the parent segments.
  129. swReferencesToSpanLinks(span.Refs, dest.Links())
  130. }
  131. func swReferencesToSpanLinks(refs []*agentV3.SegmentReference, dest ptrace.SpanLinkSlice) {
  132. if len(refs) == 0 {
  133. return
  134. }
  135. dest.EnsureCapacity(len(refs))
  136. for _, ref := range refs {
  137. link := dest.AppendEmpty()
  138. link.SetTraceID(swTraceIDToTraceID(ref.TraceId))
  139. link.SetSpanID(segmentIDToSpanID(ref.ParentTraceSegmentId, uint32(ref.ParentSpanId)))
  140. link.TraceState().FromRaw("")
  141. kvParis := []*common.KeyStringValuePair{
  142. {
  143. Key: AttributeParentService,
  144. Value: ref.ParentService,
  145. },
  146. {
  147. Key: AttributeParentInstance,
  148. Value: ref.ParentServiceInstance,
  149. },
  150. {
  151. Key: AttributeParentEndpoint,
  152. Value: ref.ParentEndpoint,
  153. },
  154. {
  155. Key: AttributeNetworkAddressUsedAtPeer,
  156. Value: ref.NetworkAddressUsedAtPeer,
  157. },
  158. {
  159. Key: AttributeRefType,
  160. Value: ref.RefType.String(),
  161. },
  162. {
  163. Key: AttributeSkywalkingTraceID,
  164. Value: ref.TraceId,
  165. },
  166. {
  167. Key: AttributeSkywalkingParentSegmentID,
  168. Value: ref.ParentTraceSegmentId,
  169. },
  170. {
  171. Key: AttributeSkywalkingParentSpanID,
  172. Value: strconv.Itoa(int(ref.ParentSpanId)),
  173. },
  174. }
  175. swKvPairsToInternalAttributes(kvParis, link.Attributes())
  176. }
  177. }
  178. func setInternalSpanStatus(span *agentV3.SpanObject, dest ptrace.Status) {
  179. if span.GetIsError() {
  180. dest.SetCode(ptrace.StatusCodeError)
  181. dest.SetMessage("ERROR")
  182. } else {
  183. dest.SetCode(ptrace.StatusCodeOk)
  184. dest.SetMessage("SUCCESS")
  185. }
  186. }
  187. func setSwSpanIDToAttributes(span *agentV3.SpanObject, dest pcommon.Map) {
  188. dest.PutInt(AttributeSkywalkingSpanID, int64(span.GetSpanId()))
  189. if span.ParentSpanId != -1 {
  190. dest.PutInt(AttributeSkywalkingParentSpanID, int64(span.GetParentSpanId()))
  191. }
  192. }
  193. func swLogsToSpanEvents(logs []*agentV3.Log, dest ptrace.SpanEventSlice) {
  194. if len(logs) == 0 {
  195. return
  196. }
  197. dest.EnsureCapacity(len(logs))
  198. for i, log := range logs {
  199. var event ptrace.SpanEvent
  200. if dest.Len() > i {
  201. event = dest.At(i)
  202. } else {
  203. event = dest.AppendEmpty()
  204. }
  205. event.SetName("logs")
  206. event.SetTimestamp(microsecondsToTimestamp(log.GetTime()))
  207. if len(log.GetData()) == 0 {
  208. continue
  209. }
  210. attrs := event.Attributes()
  211. attrs.Clear()
  212. attrs.EnsureCapacity(len(log.GetData()))
  213. swKvPairsToInternalAttributes(log.GetData(), attrs)
  214. }
  215. }
  216. func swKvPairsToInternalAttributes(pairs []*common.KeyStringValuePair, dest pcommon.Map) {
  217. if pairs == nil {
  218. return
  219. }
  220. for _, pair := range pairs {
  221. dest.PutStr(pair.Key, pair.Value)
  222. }
  223. }
  224. // microsecondsToTimestamp converts epoch microseconds to pcommon.Timestamp
  225. func microsecondsToTimestamp(ms int64) pcommon.Timestamp {
  226. return pcommon.NewTimestampFromTime(time.UnixMilli(ms))
  227. }
  228. func swTraceIDToTraceID(traceID string) pcommon.TraceID {
  229. // skywalking traceid format:
  230. // de5980b8-fce3-4a37-aab9-b4ac3af7eedd: from browser/js-sdk/envoy/nginx-lua sdk/py-agent
  231. // 56a5e1c519ae4c76a2b8b11d92cead7f.12.16563474296430001: from java-agent
  232. if len(traceID) <= 36 { // 36: uuid length (rfc4122)
  233. uid, err := uuid.Parse(traceID)
  234. if err != nil {
  235. return pcommon.NewTraceIDEmpty()
  236. }
  237. return pcommon.TraceID(uid)
  238. }
  239. return swStringToUUID(traceID, 0)
  240. }
  241. func segmentIDToSpanID(segmentID string, spanID uint32) pcommon.SpanID {
  242. // skywalking segmentid format:
  243. // 56a5e1c519ae4c76a2b8b11d92cead7f.12.16563474296430001: from TraceSegmentId
  244. // 56a5e1c519ae4c76a2b8b11d92cead7f: from ParentTraceSegmentId
  245. if len(segmentID) < 32 {
  246. return pcommon.NewSpanIDEmpty()
  247. }
  248. return uuidTo8Bytes(swStringToUUID(segmentID, spanID))
  249. }
  250. func swStringToUUID(s string, extra uint32) (dst [16]byte) {
  251. // there are 2 possible formats for 's':
  252. // s format = 56a5e1c519ae4c76a2b8b11d92cead7f.0000000000.000000000000000000
  253. // ^ start(length=32) ^ mid(u32) ^ last(u64)
  254. // uid = UUID(start) XOR ([4]byte(extra) . [4]byte(uint32(mid)) . [8]byte(uint64(last)))
  255. // s format = 56a5e1c519ae4c76a2b8b11d92cead7f
  256. // ^ start(length=32)
  257. // uid = UUID(start) XOR [4]byte(extra)
  258. if len(s) < 32 {
  259. return
  260. }
  261. t := unsafeGetBytes(s)
  262. var uid [16]byte
  263. _, err := hex.Decode(uid[:], t[:32])
  264. if err != nil {
  265. return uid
  266. }
  267. for i := 0; i < 4; i++ {
  268. uid[i] ^= byte(extra)
  269. extra >>= 8
  270. }
  271. if len(s) == 32 {
  272. return uid
  273. }
  274. index1 := bytes.IndexByte(t, '.')
  275. index2 := bytes.LastIndexByte(t, '.')
  276. if index1 != 32 || index2 < 0 {
  277. return
  278. }
  279. mid, err := strconv.Atoi(s[index1+1 : index2])
  280. if err != nil {
  281. return
  282. }
  283. last, err := strconv.Atoi(s[index2+1:])
  284. if err != nil {
  285. return
  286. }
  287. for i := 4; i < 8; i++ {
  288. uid[i] ^= byte(mid)
  289. mid >>= 8
  290. }
  291. for i := 8; i < 16; i++ {
  292. uid[i] ^= byte(last)
  293. last >>= 8
  294. }
  295. return uid
  296. }
  297. func uuidTo8Bytes(uuid [16]byte) [8]byte {
  298. // high bit XOR low bit
  299. var dst [8]byte
  300. for i := 0; i < 8; i++ {
  301. dst[i] = uuid[i] ^ uuid[i+8]
  302. }
  303. return dst
  304. }
  305. func unsafeGetBytes(s string) []byte {
  306. return (*[0x7fff0000]byte)(unsafe.Pointer(
  307. (*reflect.StringHeader)(unsafe.Pointer(&s)).Data),
  308. )[:len(s):len(s)]
  309. }