thrift.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package zipkinv1 // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin/zipkinv1"
  4. import (
  5. "bytes"
  6. "context"
  7. "encoding/base64"
  8. "encoding/binary"
  9. "errors"
  10. "fmt"
  11. "math"
  12. "net"
  13. jaegerzipkin "github.com/jaegertracing/jaeger/model/converter/thrift/zipkin"
  14. "github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
  15. "go.opentelemetry.io/collector/pdata/pcommon"
  16. "go.opentelemetry.io/collector/pdata/ptrace"
  17. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/idutils"
  18. )
  19. type thriftUnmarshaler struct{}
  20. // UnmarshalTraces from Thrift bytes.
  21. func (t thriftUnmarshaler) UnmarshalTraces(buf []byte) (ptrace.Traces, error) {
  22. spans, err := jaegerzipkin.DeserializeThrift(context.TODO(), buf)
  23. if err != nil {
  24. return ptrace.Traces{}, err
  25. }
  26. return thriftBatchToTraces(spans)
  27. }
  28. // NewThriftTracesUnmarshaler returns an unmarshaler for Zipkin Thrift.
  29. func NewThriftTracesUnmarshaler() ptrace.Unmarshaler {
  30. return thriftUnmarshaler{}
  31. }
  32. // thriftBatchToTraces converts Zipkin v1 spans to ptrace.Traces.
  33. func thriftBatchToTraces(zSpans []*zipkincore.Span) (ptrace.Traces, error) {
  34. spanAndEndpoints := make([]spanAndEndpoint, 0, len(zSpans))
  35. for _, zSpan := range zSpans {
  36. spanAndEndpoints = append(spanAndEndpoints, thriftToSpanAndEndpoint(zSpan))
  37. }
  38. return zipkinToTraces(spanAndEndpoints)
  39. }
  40. func thriftToSpanAndEndpoint(zSpan *zipkincore.Span) spanAndEndpoint {
  41. traceIDHigh := int64(0)
  42. if zSpan.TraceIDHigh != nil {
  43. traceIDHigh = *zSpan.TraceIDHigh
  44. }
  45. // TODO: (@pjanotti) ideally we should error here instead of generating invalid Traces
  46. // however per https://go.opentelemetry.io/collector/issues/349
  47. // failures on the receivers in general are silent at this moment, so letting them
  48. // proceed for now. We should validate the traceID, spanID and parentID are good with
  49. // OTLP requirements.
  50. traceID := idutils.UInt64ToTraceID(uint64(traceIDHigh), uint64(zSpan.TraceID))
  51. spanID := idutils.UInt64ToSpanID(uint64(zSpan.ID))
  52. var parentID pcommon.SpanID
  53. if zSpan.ParentID != nil {
  54. parentID = idutils.UInt64ToSpanID(uint64(*zSpan.ParentID))
  55. }
  56. span, edpt := thriftAnnotationsToSpanAndEndpoint(zSpan.Annotations)
  57. localComponent := thriftBinAnnotationsToSpanAttributes(span, zSpan.BinaryAnnotations)
  58. if edpt.ServiceName == unknownServiceName && localComponent != "" {
  59. edpt.ServiceName = localComponent
  60. }
  61. if zSpan.Timestamp != nil {
  62. span.SetStartTimestamp(epochMicrosecondsToTimestamp(*zSpan.Timestamp))
  63. var duration int64
  64. if zSpan.Duration != nil {
  65. duration = *zSpan.Duration
  66. }
  67. span.SetEndTimestamp(epochMicrosecondsToTimestamp(*zSpan.Timestamp + duration))
  68. }
  69. span.SetName(zSpan.Name)
  70. span.SetTraceID(traceID)
  71. span.SetSpanID(spanID)
  72. span.SetParentSpanID(parentID)
  73. return spanAndEndpoint{span: span, endpoint: edpt}
  74. }
  75. func thriftAnnotationsToSpanAndEndpoint(ztAnnotations []*zipkincore.Annotation) (ptrace.Span, *endpoint) {
  76. annotations := make([]*annotation, 0, len(ztAnnotations))
  77. for _, ztAnnot := range ztAnnotations {
  78. annot := &annotation{
  79. Timestamp: ztAnnot.Timestamp,
  80. Value: ztAnnot.Value,
  81. Endpoint: toTranslatorEndpoint(ztAnnot.Host),
  82. }
  83. annotations = append(annotations, annot)
  84. }
  85. return jsonAnnotationsToSpanAndEndpoint(annotations)
  86. }
  87. func toTranslatorEndpoint(e *zipkincore.Endpoint) *endpoint {
  88. if e == nil {
  89. return nil
  90. }
  91. var ipv4, ipv6 string
  92. if e.Ipv4 != 0 {
  93. ipv4 = net.IPv4(byte(e.Ipv4>>24), byte(e.Ipv4>>16), byte(e.Ipv4>>8), byte(e.Ipv4)).String()
  94. }
  95. if len(e.Ipv6) != 0 {
  96. ipv6 = net.IP(e.Ipv6).String()
  97. }
  98. return &endpoint{
  99. ServiceName: e.ServiceName,
  100. IPv4: ipv4,
  101. IPv6: ipv6,
  102. Port: int32(e.Port),
  103. }
  104. }
  105. var trueByteSlice = []byte{1}
  106. func thriftBinAnnotationsToSpanAttributes(span ptrace.Span, ztBinAnnotations []*zipkincore.BinaryAnnotation) string {
  107. var fallbackServiceName string
  108. if len(ztBinAnnotations) == 0 {
  109. return fallbackServiceName
  110. }
  111. sMapper := &statusMapper{}
  112. var localComponent string
  113. for _, binaryAnnotation := range ztBinAnnotations {
  114. val := pcommon.NewValueEmpty()
  115. binAnnotationType := binaryAnnotation.AnnotationType
  116. if binaryAnnotation.Host != nil {
  117. fallbackServiceName = binaryAnnotation.Host.ServiceName
  118. }
  119. switch binaryAnnotation.AnnotationType {
  120. case zipkincore.AnnotationType_BOOL:
  121. isTrue := bytes.Equal(binaryAnnotation.Value, trueByteSlice)
  122. val.SetBool(isTrue)
  123. case zipkincore.AnnotationType_BYTES:
  124. bytesStr := base64.StdEncoding.EncodeToString(binaryAnnotation.Value)
  125. val.SetStr(bytesStr)
  126. case zipkincore.AnnotationType_DOUBLE:
  127. if d, err := bytesFloat64ToFloat64(binaryAnnotation.Value); err != nil {
  128. strAttributeForError(val, err)
  129. } else {
  130. val.SetDouble(d)
  131. }
  132. case zipkincore.AnnotationType_I16:
  133. if i, err := bytesInt16ToInt64(binaryAnnotation.Value); err != nil {
  134. strAttributeForError(val, err)
  135. } else {
  136. val.SetInt(i)
  137. }
  138. case zipkincore.AnnotationType_I32:
  139. if i, err := bytesInt32ToInt64(binaryAnnotation.Value); err != nil {
  140. strAttributeForError(val, err)
  141. } else {
  142. val.SetInt(i)
  143. }
  144. case zipkincore.AnnotationType_I64:
  145. if i, err := bytesInt64ToInt64(binaryAnnotation.Value); err != nil {
  146. strAttributeForError(val, err)
  147. } else {
  148. val.SetInt(i)
  149. }
  150. case zipkincore.AnnotationType_STRING:
  151. val.SetStr(string(binaryAnnotation.Value))
  152. default:
  153. strAttributeForError(val, fmt.Errorf("unknown zipkin v1 binary annotation type (%d)", int(binAnnotationType)))
  154. }
  155. key := binaryAnnotation.Key
  156. if key == zipkincore.LOCAL_COMPONENT {
  157. // TODO: (@pjanotti) add reference to OpenTracing and change related tags to use them
  158. key = "component"
  159. localComponent = string(binaryAnnotation.Value)
  160. }
  161. if drop := sMapper.fromAttribute(key, val); drop {
  162. continue
  163. }
  164. val.CopyTo(span.Attributes().PutEmpty(key))
  165. }
  166. if fallbackServiceName == "" {
  167. fallbackServiceName = localComponent
  168. }
  169. sMapper.status(span.Status())
  170. return fallbackServiceName
  171. }
  172. var errNotEnoughBytes = errors.New("not enough bytes representing the number")
  173. func bytesInt16ToInt64(b []byte) (int64, error) {
  174. const minSliceLength = 2
  175. if len(b) < minSliceLength {
  176. return 0, errNotEnoughBytes
  177. }
  178. return int64(binary.BigEndian.Uint16(b[:minSliceLength])), nil
  179. }
  180. func bytesInt32ToInt64(b []byte) (int64, error) {
  181. const minSliceLength = 4
  182. if len(b) < minSliceLength {
  183. return 0, errNotEnoughBytes
  184. }
  185. return int64(binary.BigEndian.Uint32(b[:minSliceLength])), nil
  186. }
  187. func bytesInt64ToInt64(b []byte) (int64, error) {
  188. const minSliceLength = 8
  189. if len(b) < minSliceLength {
  190. return 0, errNotEnoughBytes
  191. }
  192. return int64(binary.BigEndian.Uint64(b[:minSliceLength])), nil
  193. }
  194. func bytesFloat64ToFloat64(b []byte) (float64, error) {
  195. const minSliceLength = 8
  196. if len(b) < minSliceLength {
  197. return 0.0, errNotEnoughBytes
  198. }
  199. bits := binary.BigEndian.Uint64(b)
  200. return math.Float64frombits(bits), nil
  201. }
  202. func strAttributeForError(dest pcommon.Value, err error) {
  203. dest.SetStr("<" + err.Error() + ">")
  204. }