unmarshaller.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package solacereceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver"
  4. import (
  5. "errors"
  6. "strings"
  7. "go.opentelemetry.io/collector/pdata/pcommon"
  8. "go.opentelemetry.io/collector/pdata/ptrace"
  9. "go.uber.org/zap"
  10. )
  11. // tracesUnmarshaller deserializes the message body.
  12. type tracesUnmarshaller interface {
  13. // unmarshal the amqp-message into traces.
  14. // Only valid traces are produced or error is returned
  15. unmarshal(message *inboundMessage) (ptrace.Traces, error)
  16. }
  17. // newUnmarshalleer returns a new unmarshaller ready for message unmarshalling
  18. func newTracesUnmarshaller(logger *zap.Logger, metrics *opencensusMetrics) tracesUnmarshaller {
  19. return &solaceTracesUnmarshaller{
  20. logger: logger,
  21. metrics: metrics,
  22. // v1 unmarshaller is implemented by solaceMessageUnmarshallerV1
  23. receiveUnmarshallerV1: &brokerTraceReceiveUnmarshallerV1{
  24. logger: logger,
  25. metrics: metrics,
  26. },
  27. egressUnmarshallerV1: &brokerTraceEgressUnmarshallerV1{
  28. logger: logger,
  29. metrics: metrics,
  30. },
  31. }
  32. }
  33. // solaceTracesUnmarshaller implements tracesUnmarshaller.
  34. type solaceTracesUnmarshaller struct {
  35. logger *zap.Logger
  36. metrics *opencensusMetrics
  37. receiveUnmarshallerV1 tracesUnmarshaller
  38. egressUnmarshallerV1 tracesUnmarshaller
  39. }
  40. var (
  41. errUpgradeRequired = errors.New("unsupported trace message, upgrade required")
  42. errUnknownTopic = errors.New("unknown topic")
  43. errEmptyPayload = errors.New("no binary attachment")
  44. )
  45. // unmarshal will unmarshal an *solaceMessage into ptrace.Traces.
  46. // It will make a decision based on the version of the message which unmarshalling strategy to use.
  47. // For now, only receive v1 messages are used.
  48. func (u *solaceTracesUnmarshaller) unmarshal(message *inboundMessage) (ptrace.Traces, error) {
  49. const (
  50. topicPrefix = "_telemetry/"
  51. receiveSpanPrefix = "broker/trace/receive/"
  52. egressSpanPrefix = "broker/trace/egress/"
  53. v1Suffix = "v1"
  54. )
  55. if message.Properties == nil || message.Properties.To == nil {
  56. // no topic
  57. u.logger.Error("Received message with no topic")
  58. return ptrace.Traces{}, errUnknownTopic
  59. }
  60. var topic string = *message.Properties.To
  61. // Multiplex the topic string. For now we only have a single type handled
  62. if strings.HasPrefix(topic, topicPrefix) {
  63. // we are a telemetry strng
  64. if strings.HasPrefix(topic[len(topicPrefix):], receiveSpanPrefix) {
  65. // we are handling a receive span, validate the version is v1
  66. if strings.HasSuffix(topic, v1Suffix) {
  67. return u.receiveUnmarshallerV1.unmarshal(message)
  68. }
  69. // otherwise we are an unknown version
  70. u.logger.Error("Received message with unsupported receive span version, an upgrade is required", zap.String("topic", *message.Properties.To))
  71. } else { // make lint happy, wants two boolean expressions to be written as a switch?!
  72. if strings.HasPrefix(topic[len(topicPrefix):], egressSpanPrefix) {
  73. if strings.HasSuffix(topic, v1Suffix) {
  74. return u.egressUnmarshallerV1.unmarshal(message)
  75. }
  76. } else {
  77. u.logger.Error("Received message with unsupported topic, an upgrade is required", zap.String("topic", *message.Properties.To))
  78. }
  79. }
  80. // if we don't know the type, we must upgrade
  81. return ptrace.Traces{}, errUpgradeRequired
  82. }
  83. // unknown topic, do not require an upgrade
  84. u.logger.Error("Received message with unknown topic", zap.String("topic", *message.Properties.To))
  85. return ptrace.Traces{}, errUnknownTopic
  86. }
  87. // common helper functions used by all unmarshallers
  88. // Endpoint types
  89. const (
  90. queueKind = "queue"
  91. topicEndpointKind = "topic-endpoint"
  92. )
  93. // Transaction event keys
  94. const (
  95. transactionInitiatorEventKey = "messaging.solace.transaction_initiator"
  96. transactionIDEventKey = "messaging.solace.transaction_id"
  97. transactedSessionNameEventKey = "messaging.solace.transacted_session_name"
  98. transactedSessionIDEventKey = "messaging.solace.transacted_session_id"
  99. transactionErrorMessageEventKey = "messaging.solace.transaction_error_message"
  100. transactionXIDEventKey = "messaging.solace.transaction_xid"
  101. )
  102. // span keys
  103. const (
  104. protocolAttrKey = "messaging.protocol"
  105. protocolVersionAttrKey = "messaging.protocol_version"
  106. messageIDAttrKey = "messaging.message_id"
  107. conversationIDAttrKey = "messaging.conversation_id"
  108. payloadSizeBytesAttrKey = "messaging.message_payload_size_bytes"
  109. destinationAttrKey = "messaging.destination"
  110. clientUsernameAttrKey = "messaging.solace.client_username"
  111. clientNameAttrKey = "messaging.solace.client_name"
  112. replicationGroupMessageIDAttrKey = "messaging.solace.replication_group_message_id"
  113. priorityAttrKey = "messaging.solace.priority"
  114. ttlAttrKey = "messaging.solace.ttl"
  115. dmqEligibleAttrKey = "messaging.solace.dmq_eligible"
  116. droppedEnqueueEventsSuccessAttrKey = "messaging.solace.dropped_enqueue_events_success"
  117. droppedEnqueueEventsFailedAttrKey = "messaging.solace.dropped_enqueue_events_failed"
  118. replyToAttrKey = "messaging.solace.reply_to_topic"
  119. receiveTimeAttrKey = "messaging.solace.broker_receive_time_unix_nano"
  120. droppedUserPropertiesAttrKey = "messaging.solace.dropped_application_message_properties"
  121. deliveryModeAttrKey = "messaging.solace.delivery_mode"
  122. hostIPAttrKey = "net.host.ip"
  123. hostPortAttrKey = "net.host.port"
  124. peerIPAttrKey = "net.peer.ip"
  125. peerPortAttrKey = "net.peer.port"
  126. )
  127. // constant attributes
  128. const (
  129. systemAttrKey = "messaging.system"
  130. systemAttrValue = "SolacePubSub+"
  131. operationAttrKey = "messaging.operation"
  132. )
  133. func setResourceSpanAttributes(attrMap pcommon.Map, routerName, version string, messageVpnName *string) {
  134. const (
  135. routerNameAttrKey = "service.name"
  136. messageVpnNameAttrKey = "service.instance.id"
  137. solosVersionAttrKey = "service.version"
  138. )
  139. attrMap.PutStr(routerNameAttrKey, routerName)
  140. attrMap.PutStr(solosVersionAttrKey, version)
  141. if messageVpnName != nil {
  142. attrMap.PutStr(messageVpnNameAttrKey, *messageVpnName)
  143. }
  144. }