unmarshaller_receive.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package solacereceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver"
  4. import (
  5. "encoding/hex"
  6. "fmt"
  7. "net"
  8. "strings"
  9. "go.opentelemetry.io/collector/pdata/pcommon"
  10. "go.opentelemetry.io/collector/pdata/ptrace"
  11. "go.opentelemetry.io/otel/baggage"
  12. "go.uber.org/zap"
  13. "google.golang.org/protobuf/proto"
  14. receive_v1 "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver/internal/model/receive/v1"
  15. )
  16. type brokerTraceReceiveUnmarshallerV1 struct {
  17. logger *zap.Logger
  18. metrics *opencensusMetrics
  19. }
  20. // unmarshal implements tracesUnmarshaller.unmarshal
  21. func (u *brokerTraceReceiveUnmarshallerV1) unmarshal(message *inboundMessage) (ptrace.Traces, error) {
  22. spanData, err := u.unmarshalToSpanData(message)
  23. if err != nil {
  24. return ptrace.Traces{}, err
  25. }
  26. traces := ptrace.NewTraces()
  27. u.populateTraces(spanData, traces)
  28. return traces, nil
  29. }
  30. // unmarshalToSpanData will consume an solaceMessage and unmarshal it into a SpanData.
  31. // Returns an error if one occurred.
  32. func (u *brokerTraceReceiveUnmarshallerV1) unmarshalToSpanData(message *inboundMessage) (*receive_v1.SpanData, error) {
  33. var data = message.GetData()
  34. if len(data) == 0 {
  35. return nil, errEmptyPayload
  36. }
  37. var spanData receive_v1.SpanData
  38. if err := proto.Unmarshal(data, &spanData); err != nil {
  39. return nil, err
  40. }
  41. return &spanData, nil
  42. }
  43. // createSpan will create a new Span from the given traces and map the given SpanData to the span.
  44. // This will set all required fields such as name version, trace and span ID, parent span ID (if applicable),
  45. // timestamps, errors and states.
  46. func (u *brokerTraceReceiveUnmarshallerV1) populateTraces(spanData *receive_v1.SpanData, traces ptrace.Traces) {
  47. // Append new resource span and map any attributes
  48. resourceSpan := traces.ResourceSpans().AppendEmpty()
  49. u.mapResourceSpanAttributes(spanData, resourceSpan.Resource().Attributes())
  50. instrLibrarySpans := resourceSpan.ScopeSpans().AppendEmpty()
  51. // Create a new span
  52. clientSpan := instrLibrarySpans.Spans().AppendEmpty()
  53. // map the basic span data
  54. u.mapClientSpanData(spanData, clientSpan)
  55. // map all span attributes
  56. u.mapClientSpanAttributes(spanData, clientSpan.Attributes())
  57. // map all events
  58. u.mapEvents(spanData, clientSpan)
  59. }
  60. func (u *brokerTraceReceiveUnmarshallerV1) mapResourceSpanAttributes(spanData *receive_v1.SpanData, attrMap pcommon.Map) {
  61. setResourceSpanAttributes(attrMap, spanData.RouterName, spanData.SolosVersion, spanData.MessageVpnName)
  62. }
  63. func (u *brokerTraceReceiveUnmarshallerV1) mapClientSpanData(spanData *receive_v1.SpanData, clientSpan ptrace.Span) {
  64. const clientSpanName = "(topic) receive"
  65. // client span constants
  66. clientSpan.SetName(clientSpanName)
  67. // SPAN_KIND_CONSUMER == 5
  68. clientSpan.SetKind(ptrace.SpanKindConsumer)
  69. // map trace ID
  70. var traceID [16]byte
  71. copy(traceID[:16], spanData.TraceId)
  72. clientSpan.SetTraceID(traceID)
  73. // map span ID
  74. var spanID [8]byte
  75. copy(spanID[:8], spanData.SpanId)
  76. clientSpan.SetSpanID(spanID)
  77. // conditional parent-span-id
  78. if len(spanData.ParentSpanId) == 8 {
  79. var parentSpanID [8]byte
  80. copy(parentSpanID[:8], spanData.ParentSpanId)
  81. clientSpan.SetParentSpanID(parentSpanID)
  82. }
  83. // timestamps
  84. clientSpan.SetStartTimestamp(pcommon.Timestamp(spanData.GetStartTimeUnixNano()))
  85. clientSpan.SetEndTimestamp(pcommon.Timestamp(spanData.GetEndTimeUnixNano()))
  86. // status
  87. if spanData.ErrorDescription != "" {
  88. clientSpan.Status().SetCode(ptrace.StatusCodeError)
  89. clientSpan.Status().SetMessage(spanData.ErrorDescription)
  90. }
  91. // trace state
  92. if spanData.TraceState != nil {
  93. clientSpan.TraceState().FromRaw(*spanData.TraceState)
  94. }
  95. }
  96. // mapAttributes takes a set of attributes from SpanData and maps them to ClientSpan.Attributes().
  97. // Will also copy any user properties stored in the SpanData with a best effort approach.
  98. func (u *brokerTraceReceiveUnmarshallerV1) mapClientSpanAttributes(spanData *receive_v1.SpanData, attrMap pcommon.Map) {
  99. // receive operation
  100. const operationAttrValue = "receive"
  101. attrMap.PutStr(systemAttrKey, systemAttrValue)
  102. attrMap.PutStr(operationAttrKey, operationAttrValue)
  103. attrMap.PutStr(protocolAttrKey, spanData.Protocol)
  104. if spanData.ProtocolVersion != nil {
  105. attrMap.PutStr(protocolVersionAttrKey, *spanData.ProtocolVersion)
  106. }
  107. if spanData.ApplicationMessageId != nil {
  108. attrMap.PutStr(messageIDAttrKey, *spanData.ApplicationMessageId)
  109. }
  110. if spanData.CorrelationId != nil {
  111. attrMap.PutStr(conversationIDAttrKey, *spanData.CorrelationId)
  112. }
  113. attrMap.PutInt(payloadSizeBytesAttrKey, int64(spanData.BinaryAttachmentSize+spanData.XmlAttachmentSize+spanData.MetadataSize))
  114. attrMap.PutStr(clientUsernameAttrKey, spanData.ClientUsername)
  115. attrMap.PutStr(clientNameAttrKey, spanData.ClientName)
  116. attrMap.PutInt(receiveTimeAttrKey, spanData.BrokerReceiveTimeUnixNano)
  117. attrMap.PutStr(destinationAttrKey, spanData.Topic)
  118. var deliveryMode string
  119. switch spanData.DeliveryMode {
  120. case receive_v1.SpanData_DIRECT:
  121. deliveryMode = "direct"
  122. case receive_v1.SpanData_NON_PERSISTENT:
  123. deliveryMode = "non_persistent"
  124. case receive_v1.SpanData_PERSISTENT:
  125. deliveryMode = "persistent"
  126. default:
  127. deliveryMode = fmt.Sprintf("Unknown Delivery Mode (%s)", spanData.DeliveryMode.String())
  128. u.logger.Warn(fmt.Sprintf("Received span with unknown delivery mode %s", spanData.DeliveryMode))
  129. u.metrics.recordRecoverableUnmarshallingError()
  130. }
  131. attrMap.PutStr(deliveryModeAttrKey, deliveryMode)
  132. rgmid := u.rgmidToString(spanData.ReplicationGroupMessageId)
  133. if len(rgmid) > 0 {
  134. attrMap.PutStr(replicationGroupMessageIDAttrKey, rgmid)
  135. }
  136. if spanData.Priority != nil {
  137. attrMap.PutInt(priorityAttrKey, int64(*spanData.Priority))
  138. }
  139. if spanData.Ttl != nil {
  140. attrMap.PutInt(ttlAttrKey, *spanData.Ttl)
  141. }
  142. if spanData.ReplyToTopic != nil {
  143. attrMap.PutStr(replyToAttrKey, *spanData.ReplyToTopic)
  144. }
  145. attrMap.PutBool(dmqEligibleAttrKey, spanData.DmqEligible)
  146. attrMap.PutInt(droppedEnqueueEventsSuccessAttrKey, int64(spanData.DroppedEnqueueEventsSuccess))
  147. attrMap.PutInt(droppedEnqueueEventsFailedAttrKey, int64(spanData.DroppedEnqueueEventsFailed))
  148. // The IPs are now optional meaning we will not incluude them if they are zero length
  149. hostIPLen := len(spanData.HostIp)
  150. if hostIPLen == 4 || hostIPLen == 16 {
  151. attrMap.PutStr(hostIPAttrKey, net.IP(spanData.HostIp).String())
  152. attrMap.PutInt(hostPortAttrKey, int64(spanData.HostPort))
  153. } else {
  154. u.logger.Debug("Host ip not included", zap.Int("length", hostIPLen))
  155. }
  156. peerIPLen := len(spanData.PeerIp)
  157. if peerIPLen == 4 || peerIPLen == 16 {
  158. attrMap.PutStr(peerIPAttrKey, net.IP(spanData.PeerIp).String())
  159. attrMap.PutInt(peerPortAttrKey, int64(spanData.PeerPort))
  160. } else {
  161. u.logger.Debug("Peer IP not included", zap.Int("length", peerIPLen))
  162. }
  163. if spanData.Baggage != nil {
  164. err := u.unmarshalBaggage(attrMap, *spanData.Baggage)
  165. if err != nil {
  166. u.logger.Warn("Received malformed baggage string in span data")
  167. u.metrics.recordRecoverableUnmarshallingError()
  168. }
  169. }
  170. attrMap.PutBool(droppedUserPropertiesAttrKey, spanData.DroppedApplicationMessageProperties)
  171. for key, value := range spanData.UserProperties {
  172. if value != nil {
  173. u.insertUserProperty(attrMap, key, value.Value)
  174. }
  175. }
  176. }
  177. // mapEvents maps all events contained in SpanData to relevant events within clientSpan.Events()
  178. func (u *brokerTraceReceiveUnmarshallerV1) mapEvents(spanData *receive_v1.SpanData, clientSpan ptrace.Span) {
  179. // handle enqueue events
  180. for _, enqueueEvent := range spanData.EnqueueEvents {
  181. u.mapEnqueueEvent(enqueueEvent, clientSpan.Events())
  182. }
  183. // handle transaction events
  184. if transactionEvent := spanData.TransactionEvent; transactionEvent != nil {
  185. u.mapTransactionEvent(transactionEvent, clientSpan.Events().AppendEmpty())
  186. }
  187. }
  188. // mapEnqueueEvent maps a SpanData_EnqueueEvent to a ClientSpan.Event
  189. func (u *brokerTraceReceiveUnmarshallerV1) mapEnqueueEvent(enqueueEvent *receive_v1.SpanData_EnqueueEvent, clientSpanEvents ptrace.SpanEventSlice) {
  190. const (
  191. enqueueEventSuffix = " enqueue" // Final should be `<dest> enqueue`
  192. messagingDestinationTypeEventKey = "messaging.solace.destination_type"
  193. statusMessageEventKey = "messaging.solace.enqueue_error_message"
  194. rejectsAllEnqueuesKey = "messaging.solace.rejects_all_enqueues"
  195. partitionNumberKey = "messaging.solace.partition_number"
  196. )
  197. var destinationName string
  198. var destinationType string
  199. switch casted := enqueueEvent.Dest.(type) {
  200. case *receive_v1.SpanData_EnqueueEvent_TopicEndpointName:
  201. destinationName = casted.TopicEndpointName
  202. destinationType = topicEndpointKind
  203. case *receive_v1.SpanData_EnqueueEvent_QueueName:
  204. destinationName = casted.QueueName
  205. destinationType = queueKind
  206. default:
  207. u.logger.Warn(fmt.Sprintf("Unknown destination type %T", casted))
  208. u.metrics.recordRecoverableUnmarshallingError()
  209. return
  210. }
  211. clientEvent := clientSpanEvents.AppendEmpty()
  212. clientEvent.SetName(destinationName + enqueueEventSuffix)
  213. clientEvent.SetTimestamp(pcommon.Timestamp(enqueueEvent.TimeUnixNano))
  214. clientEvent.Attributes().EnsureCapacity(3)
  215. clientEvent.Attributes().PutStr(messagingDestinationTypeEventKey, destinationType)
  216. clientEvent.Attributes().PutBool(rejectsAllEnqueuesKey, enqueueEvent.RejectsAllEnqueues)
  217. if enqueueEvent.ErrorDescription != nil {
  218. clientEvent.Attributes().PutStr(statusMessageEventKey, enqueueEvent.GetErrorDescription())
  219. }
  220. if enqueueEvent.PartitionNumber != nil {
  221. clientEvent.Attributes().PutInt(partitionNumberKey, int64(*enqueueEvent.PartitionNumber))
  222. }
  223. }
  224. // mapTransactionEvent maps a SpanData_TransactionEvent to a ClientSpan.Event
  225. func (u *brokerTraceReceiveUnmarshallerV1) mapTransactionEvent(transactionEvent *receive_v1.SpanData_TransactionEvent, clientEvent ptrace.SpanEvent) {
  226. // map the transaction type to a name
  227. var name string
  228. switch transactionEvent.GetType() {
  229. case receive_v1.SpanData_TransactionEvent_COMMIT:
  230. name = "commit"
  231. case receive_v1.SpanData_TransactionEvent_ROLLBACK:
  232. name = "rollback"
  233. case receive_v1.SpanData_TransactionEvent_END:
  234. name = "end"
  235. case receive_v1.SpanData_TransactionEvent_PREPARE:
  236. name = "prepare"
  237. case receive_v1.SpanData_TransactionEvent_SESSION_TIMEOUT:
  238. name = "session_timeout"
  239. case receive_v1.SpanData_TransactionEvent_ROLLBACK_ONLY:
  240. name = "rollback_only"
  241. default:
  242. // Set the name to the unknown transaction event type to ensure forward compat.
  243. name = fmt.Sprintf("Unknown Transaction Event (%s)", transactionEvent.GetType().String())
  244. u.logger.Warn(fmt.Sprintf("Received span with unknown transaction event %s", transactionEvent.GetType()))
  245. u.metrics.recordRecoverableUnmarshallingError()
  246. }
  247. clientEvent.SetName(name)
  248. clientEvent.SetTimestamp(pcommon.Timestamp(transactionEvent.TimeUnixNano))
  249. // map initiator enums to expected initiator strings
  250. var initiator string
  251. switch transactionEvent.GetInitiator() {
  252. case receive_v1.SpanData_TransactionEvent_CLIENT:
  253. initiator = "client"
  254. case receive_v1.SpanData_TransactionEvent_ADMIN:
  255. initiator = "administrator"
  256. case receive_v1.SpanData_TransactionEvent_BROKER:
  257. initiator = "broker"
  258. default:
  259. initiator = fmt.Sprintf("Unknown Transaction Initiator (%s)", transactionEvent.GetInitiator().String())
  260. u.logger.Warn(fmt.Sprintf("Received span with unknown transaction initiator %s", transactionEvent.GetInitiator()))
  261. u.metrics.recordRecoverableUnmarshallingError()
  262. }
  263. clientEvent.Attributes().PutStr(transactionInitiatorEventKey, initiator)
  264. // conditionally set the error description if one occurred, otherwise omit
  265. if transactionEvent.ErrorDescription != nil {
  266. clientEvent.Attributes().PutStr(transactionErrorMessageEventKey, transactionEvent.GetErrorDescription())
  267. }
  268. // map the transaction type/id
  269. transactionID := transactionEvent.GetTransactionId()
  270. switch casted := transactionID.(type) {
  271. case *receive_v1.SpanData_TransactionEvent_LocalId:
  272. clientEvent.Attributes().PutInt(transactionIDEventKey, int64(casted.LocalId.TransactionId))
  273. clientEvent.Attributes().PutStr(transactedSessionNameEventKey, casted.LocalId.SessionName)
  274. clientEvent.Attributes().PutInt(transactedSessionIDEventKey, int64(casted.LocalId.SessionId))
  275. case *receive_v1.SpanData_TransactionEvent_Xid_:
  276. // format xxxxxxxx-yyyyyyyy-zzzzzzzz where x is FormatID (hex rep of int32), y is BranchQualifier and z is GlobalID, hex encoded.
  277. xidString := fmt.Sprintf("%08x", casted.Xid.FormatId) + "-" +
  278. hex.EncodeToString(casted.Xid.BranchQualifier) + "-" + hex.EncodeToString(casted.Xid.GlobalId)
  279. clientEvent.Attributes().PutStr(transactionXIDEventKey, xidString)
  280. default:
  281. u.logger.Warn(fmt.Sprintf("Unknown transaction ID type %T", transactionID))
  282. u.metrics.recordRecoverableUnmarshallingError()
  283. }
  284. }
  285. func (u *brokerTraceReceiveUnmarshallerV1) rgmidToString(rgmid []byte) string {
  286. // rgmid[0] is the version of the rgmid
  287. if len(rgmid) != 17 || rgmid[0] != 1 {
  288. // may be cases where the rgmid is empty or nil, len(rgmid) will return 0 if nil
  289. if len(rgmid) > 0 {
  290. u.logger.Warn("Received invalid length or version for rgmid", zap.Int8("version", int8(rgmid[0])), zap.Int("length", len(rgmid)))
  291. u.metrics.recordRecoverableUnmarshallingError()
  292. }
  293. return hex.EncodeToString(rgmid)
  294. }
  295. rgmidEncoded := make([]byte, 32)
  296. hex.Encode(rgmidEncoded, rgmid[1:])
  297. // format: rmid1:aaaaa-bbbbbbbbbbb-cccccccc-dddddddd
  298. rgmidString := "rmid1:" + string(rgmidEncoded[0:5]) + "-" + string(rgmidEncoded[5:16]) + "-" + string(rgmidEncoded[16:24]) + "-" + string(rgmidEncoded[24:32])
  299. return rgmidString
  300. }
  301. // unmarshalBaggage will unmarshal a baggage string
  302. // See spec https://github.com/open-telemetry/opentelemetry-go/blob/v1.11.1/baggage/baggage.go
  303. func (u *brokerTraceReceiveUnmarshallerV1) unmarshalBaggage(toMap pcommon.Map, baggageString string) error {
  304. const (
  305. baggageValuePrefix = "messaging.solace.message.baggage."
  306. baggageMetadataPrefix = "messaging.solace.message.baggage_metadata."
  307. propertyDelimiter = ";"
  308. )
  309. bg, err := baggage.Parse(baggageString)
  310. if err != nil {
  311. return err
  312. }
  313. // we got a valid baggage string, assume everything else is valid
  314. for _, member := range bg.Members() {
  315. toMap.PutStr(baggageValuePrefix+member.Key(), member.Value())
  316. // member.Properties copies, we should cache
  317. properties := member.Properties()
  318. if len(properties) > 0 {
  319. // Re-encode the properties and save them as a parameter
  320. var propertyString strings.Builder
  321. propertyString.WriteString(properties[0].String())
  322. for i := 1; i < len(properties); i++ {
  323. propertyString.WriteString(propertyDelimiter + properties[i].String())
  324. }
  325. toMap.PutStr(baggageMetadataPrefix+member.Key(), propertyString.String())
  326. }
  327. }
  328. return nil
  329. }
  330. // insertUserProperty will instert a user property value with the given key to an attribute if possible.
  331. // Since AttributeMap only supports int64 integer types, uint64 data may be misrepresented.
  332. func (u *brokerTraceReceiveUnmarshallerV1) insertUserProperty(toMap pcommon.Map, key string, value any) {
  333. const (
  334. // userPropertiesPrefixAttrKey is the key used to prefix all user properties
  335. userPropertiesAttrKeyPrefix = "messaging.solace.user_properties."
  336. )
  337. k := userPropertiesAttrKeyPrefix + key
  338. switch v := value.(type) {
  339. case *receive_v1.SpanData_UserPropertyValue_NullValue:
  340. toMap.PutEmpty(k)
  341. case *receive_v1.SpanData_UserPropertyValue_BoolValue:
  342. toMap.PutBool(k, v.BoolValue)
  343. case *receive_v1.SpanData_UserPropertyValue_DoubleValue:
  344. toMap.PutDouble(k, v.DoubleValue)
  345. case *receive_v1.SpanData_UserPropertyValue_ByteArrayValue:
  346. toMap.PutEmptyBytes(k).FromRaw(v.ByteArrayValue)
  347. case *receive_v1.SpanData_UserPropertyValue_FloatValue:
  348. toMap.PutDouble(k, float64(v.FloatValue))
  349. case *receive_v1.SpanData_UserPropertyValue_Int8Value:
  350. toMap.PutInt(k, int64(v.Int8Value))
  351. case *receive_v1.SpanData_UserPropertyValue_Int16Value:
  352. toMap.PutInt(k, int64(v.Int16Value))
  353. case *receive_v1.SpanData_UserPropertyValue_Int32Value:
  354. toMap.PutInt(k, int64(v.Int32Value))
  355. case *receive_v1.SpanData_UserPropertyValue_Int64Value:
  356. toMap.PutInt(k, v.Int64Value)
  357. case *receive_v1.SpanData_UserPropertyValue_Uint8Value:
  358. toMap.PutInt(k, int64(v.Uint8Value))
  359. case *receive_v1.SpanData_UserPropertyValue_Uint16Value:
  360. toMap.PutInt(k, int64(v.Uint16Value))
  361. case *receive_v1.SpanData_UserPropertyValue_Uint32Value:
  362. toMap.PutInt(k, int64(v.Uint32Value))
  363. case *receive_v1.SpanData_UserPropertyValue_Uint64Value:
  364. toMap.PutInt(k, int64(v.Uint64Value))
  365. case *receive_v1.SpanData_UserPropertyValue_StringValue:
  366. toMap.PutStr(k, v.StringValue)
  367. case *receive_v1.SpanData_UserPropertyValue_DestinationValue:
  368. toMap.PutStr(k, v.DestinationValue)
  369. case *receive_v1.SpanData_UserPropertyValue_CharacterValue:
  370. toMap.PutStr(k, string(rune(v.CharacterValue)))
  371. default:
  372. u.logger.Warn(fmt.Sprintf("Unknown user property type: %T", v))
  373. u.metrics.recordRecoverableUnmarshallingError()
  374. }
  375. }