unmarshaller_test.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package solacereceiver
  4. import (
  5. "errors"
  6. "testing"
  7. "github.com/Azure/go-amqp"
  8. "github.com/stretchr/testify/assert"
  9. "github.com/stretchr/testify/require"
  10. "go.opentelemetry.io/collector/pdata/pcommon"
  11. "go.opentelemetry.io/collector/pdata/ptrace"
  12. "go.uber.org/zap"
  13. "google.golang.org/protobuf/proto"
  14. egress_v1 "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver/internal/model/egress/v1"
  15. receive_v1 "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver/internal/model/receive/v1"
  16. )
  17. // Validate entire unmarshal flow
  18. func TestSolaceMessageUnmarshallerUnmarshal(t *testing.T) {
  19. validReceiveTopicVersion := "_telemetry/broker/trace/receive/v1"
  20. validEgressTopicVersion := "_telemetry/broker/trace/egress/v1"
  21. invalidReceiveTopicVersion := "_telemetry/broker/trace/receive/v2"
  22. invalidTelemetryTopic := "_telemetry/broker/trace/somethingNew"
  23. invalidTopicString := "some unknown topic string that won't be valid"
  24. tests := []struct {
  25. name string
  26. message *amqp.Message
  27. want *ptrace.Traces
  28. err error
  29. }{
  30. {
  31. name: "Unknown Topic Stirng",
  32. message: &inboundMessage{
  33. Properties: &amqp.MessageProperties{
  34. To: &invalidTopicString,
  35. },
  36. },
  37. err: errUnknownTopic,
  38. },
  39. {
  40. name: "Bad Topic Version",
  41. message: &inboundMessage{
  42. Properties: &amqp.MessageProperties{
  43. To: &invalidReceiveTopicVersion,
  44. },
  45. },
  46. err: errUpgradeRequired,
  47. },
  48. {
  49. name: "Unknown Telemetry Topic",
  50. message: &inboundMessage{
  51. Properties: &amqp.MessageProperties{
  52. To: &invalidTelemetryTopic,
  53. },
  54. },
  55. err: errUpgradeRequired,
  56. },
  57. {
  58. name: "No Message Properties",
  59. message: &inboundMessage{
  60. Properties: nil,
  61. },
  62. err: errUnknownTopic,
  63. },
  64. {
  65. name: "No Topic String",
  66. message: &inboundMessage{
  67. Properties: &amqp.MessageProperties{
  68. To: nil,
  69. },
  70. },
  71. err: errUnknownTopic,
  72. },
  73. {
  74. name: "Empty Message Data with Receive topic",
  75. message: &amqp.Message{
  76. Data: [][]byte{{}},
  77. Properties: &amqp.MessageProperties{
  78. To: &validReceiveTopicVersion,
  79. },
  80. },
  81. err: errEmptyPayload,
  82. },
  83. {
  84. name: "Invalid Message Data with Receive topic",
  85. message: &amqp.Message{
  86. Data: [][]byte{{1, 2, 3, 4, 5}},
  87. Properties: &amqp.MessageProperties{
  88. To: &validReceiveTopicVersion,
  89. },
  90. },
  91. err: errors.New("cannot parse invalid wire-format data"),
  92. },
  93. {
  94. name: "Empty Message Data with Egress topic",
  95. message: &amqp.Message{
  96. Data: [][]byte{{}},
  97. Properties: &amqp.MessageProperties{
  98. To: &validEgressTopicVersion,
  99. },
  100. },
  101. err: errEmptyPayload,
  102. },
  103. {
  104. name: "Invalid Message Data with Egress topic",
  105. message: &amqp.Message{
  106. Data: [][]byte{{1, 2, 3, 4, 5}},
  107. Properties: &amqp.MessageProperties{
  108. To: &validEgressTopicVersion,
  109. },
  110. },
  111. err: errors.New("cannot parse invalid wire-format data"),
  112. },
  113. {
  114. name: "Valid Receive Message Data",
  115. message: &amqp.Message{
  116. Data: [][]byte{func() []byte {
  117. // TODO capture binary data of this directly, ie. real world data.
  118. var (
  119. protocolVersion = "5.0"
  120. applicationMessageID = "someMessageID"
  121. correlationID = "someConversationID"
  122. priority = uint32(1)
  123. ttl = int64(86000)
  124. routerName = "someRouterName"
  125. vpnName = "someVpnName"
  126. replyToTopic = "someReplyToTopic"
  127. topic = "someTopic"
  128. )
  129. validData, err := proto.Marshal(&receive_v1.SpanData{
  130. TraceId: []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
  131. SpanId: []byte{7, 6, 5, 4, 3, 2, 1, 0},
  132. StartTimeUnixNano: 1234567890,
  133. EndTimeUnixNano: 2234567890,
  134. RouterName: routerName,
  135. MessageVpnName: &vpnName,
  136. SolosVersion: "10.0.0",
  137. Protocol: "MQTT",
  138. ProtocolVersion: &protocolVersion,
  139. ApplicationMessageId: &applicationMessageID,
  140. CorrelationId: &correlationID,
  141. DeliveryMode: receive_v1.SpanData_DIRECT,
  142. BinaryAttachmentSize: 1000,
  143. XmlAttachmentSize: 200,
  144. MetadataSize: 34,
  145. ClientUsername: "someClientUsername",
  146. ClientName: "someClient1234",
  147. Topic: topic,
  148. ReplyToTopic: &replyToTopic,
  149. ReplicationGroupMessageId: []byte{0x01, 0x00, 0x01, 0x04, 0x09, 0x10, 0x19, 0x24, 0x31, 0x40, 0x51, 0x64, 0x79, 0x90, 0xa9, 0xc4, 0xe1},
  150. Priority: &priority,
  151. Ttl: &ttl,
  152. DmqEligible: true,
  153. DroppedEnqueueEventsSuccess: 42,
  154. DroppedEnqueueEventsFailed: 24,
  155. HostIp: []byte{1, 2, 3, 4},
  156. HostPort: 55555,
  157. PeerIp: []byte{35, 69, 4, 37, 44, 161, 0, 0, 0, 0, 5, 103, 86, 115, 35, 181},
  158. PeerPort: 12345,
  159. BrokerReceiveTimeUnixNano: 1357924680,
  160. DroppedApplicationMessageProperties: false,
  161. UserProperties: map[string]*receive_v1.SpanData_UserPropertyValue{
  162. "special_key": {
  163. Value: &receive_v1.SpanData_UserPropertyValue_BoolValue{
  164. BoolValue: true,
  165. },
  166. },
  167. },
  168. EnqueueEvents: []*receive_v1.SpanData_EnqueueEvent{
  169. {
  170. Dest: &receive_v1.SpanData_EnqueueEvent_QueueName{QueueName: "somequeue"},
  171. TimeUnixNano: 123456789,
  172. },
  173. {
  174. Dest: &receive_v1.SpanData_EnqueueEvent_TopicEndpointName{TopicEndpointName: "sometopic"},
  175. TimeUnixNano: 2345678,
  176. },
  177. },
  178. TransactionEvent: &receive_v1.SpanData_TransactionEvent{
  179. TimeUnixNano: 123456789,
  180. Type: receive_v1.SpanData_TransactionEvent_SESSION_TIMEOUT,
  181. Initiator: receive_v1.SpanData_TransactionEvent_CLIENT,
  182. TransactionId: &receive_v1.SpanData_TransactionEvent_LocalId{
  183. LocalId: &receive_v1.SpanData_TransactionEvent_LocalTransactionId{
  184. TransactionId: 12345,
  185. SessionId: 67890,
  186. SessionName: "my-session-name",
  187. },
  188. },
  189. },
  190. })
  191. require.NoError(t, err)
  192. return validData
  193. }()},
  194. Properties: &amqp.MessageProperties{
  195. To: &validReceiveTopicVersion,
  196. },
  197. },
  198. want: func() *ptrace.Traces {
  199. traces := ptrace.NewTraces()
  200. resource := traces.ResourceSpans().AppendEmpty()
  201. populateAttributes(t, resource.Resource().Attributes(), map[string]any{
  202. "service.name": "someRouterName",
  203. "service.instance.id": "someVpnName",
  204. "service.version": "10.0.0",
  205. })
  206. instrumentation := resource.ScopeSpans().AppendEmpty()
  207. span := instrumentation.Spans().AppendEmpty()
  208. span.SetTraceID([16]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15})
  209. span.SetSpanID([8]byte{7, 6, 5, 4, 3, 2, 1, 0})
  210. span.SetStartTimestamp(1234567890)
  211. span.SetEndTimestamp(2234567890)
  212. // expect some constants
  213. span.SetKind(5)
  214. span.SetName("(topic) receive")
  215. span.Status().SetCode(ptrace.StatusCodeUnset)
  216. spanAttrs := span.Attributes()
  217. populateAttributes(t, spanAttrs, map[string]any{
  218. "messaging.system": "SolacePubSub+",
  219. "messaging.operation": "receive",
  220. "messaging.protocol": "MQTT",
  221. "messaging.protocol_version": "5.0",
  222. "messaging.message_id": "someMessageID",
  223. "messaging.conversation_id": "someConversationID",
  224. "messaging.message_payload_size_bytes": int64(1234),
  225. "messaging.destination": "someTopic",
  226. "messaging.solace.client_username": "someClientUsername",
  227. "messaging.solace.client_name": "someClient1234",
  228. "messaging.solace.replication_group_message_id": "rmid1:00010-40910192431-40516479-90a9c4e1",
  229. "messaging.solace.priority": int64(1),
  230. "messaging.solace.ttl": int64(86000),
  231. "messaging.solace.dmq_eligible": true,
  232. "messaging.solace.dropped_enqueue_events_success": int64(42),
  233. "messaging.solace.dropped_enqueue_events_failed": int64(24),
  234. "messaging.solace.reply_to_topic": "someReplyToTopic",
  235. "messaging.solace.broker_receive_time_unix_nano": int64(1357924680),
  236. "messaging.solace.dropped_application_message_properties": false,
  237. "messaging.solace.delivery_mode": "direct",
  238. "net.host.ip": "1.2.3.4",
  239. "net.host.port": int64(55555),
  240. "net.peer.ip": "2345:425:2ca1::567:5673:23b5",
  241. "net.peer.port": int64(12345),
  242. "messaging.solace.user_properties.special_key": true,
  243. })
  244. populateEvent(t, span, "somequeue enqueue", 123456789, map[string]any{
  245. "messaging.solace.destination_type": "queue",
  246. "messaging.solace.rejects_all_enqueues": false,
  247. })
  248. populateEvent(t, span, "sometopic enqueue", 2345678, map[string]any{
  249. "messaging.solace.destination_type": "topic-endpoint",
  250. "messaging.solace.rejects_all_enqueues": false,
  251. })
  252. populateEvent(t, span, "session_timeout", 123456789, map[string]any{
  253. "messaging.solace.transaction_initiator": "client",
  254. "messaging.solace.transaction_id": 12345,
  255. "messaging.solace.transacted_session_name": "my-session-name",
  256. "messaging.solace.transacted_session_id": 67890,
  257. })
  258. return &traces
  259. }(),
  260. },
  261. {
  262. name: "Valid Egress Message Data",
  263. message: &amqp.Message{
  264. Data: [][]byte{func() []byte {
  265. // TODO capture binary data of this directly, ie. real world data.
  266. var (
  267. routerName = "someRouterName"
  268. vpnName = "someVpnName"
  269. )
  270. validData, err := proto.Marshal(&egress_v1.SpanData{
  271. RouterName: routerName,
  272. MessageVpnName: &vpnName,
  273. SolosVersion: "10.0.0",
  274. EgressSpans: func() []*egress_v1.SpanData_EgressSpan {
  275. spans := make([]*egress_v1.SpanData_EgressSpan, len(validEgressSpans))
  276. i := 0
  277. for _, spanRef := range validEgressSpans {
  278. span := spanRef.in
  279. spans[i] = span
  280. i++
  281. }
  282. return spans
  283. }(),
  284. })
  285. require.NoError(t, err)
  286. return validData
  287. }()},
  288. Properties: &amqp.MessageProperties{
  289. To: &validEgressTopicVersion,
  290. },
  291. },
  292. want: func() *ptrace.Traces {
  293. traces := ptrace.NewTraces()
  294. resource := traces.ResourceSpans().AppendEmpty()
  295. populateAttributes(t, resource.Resource().Attributes(), map[string]any{
  296. "service.name": "someRouterName",
  297. "service.instance.id": "someVpnName",
  298. "service.version": "10.0.0",
  299. })
  300. instrumentation := resource.ScopeSpans().AppendEmpty()
  301. // first send span
  302. for _, spanRef := range validEgressSpans {
  303. span := spanRef.out
  304. newSpan := instrumentation.Spans().AppendEmpty()
  305. span.CopyTo(newSpan)
  306. }
  307. return &traces
  308. }(),
  309. },
  310. }
  311. for _, tt := range tests {
  312. t.Run(tt.name, func(t *testing.T) {
  313. u := newTracesUnmarshaller(zap.NewNop(), newTestMetrics(t))
  314. traces, err := u.unmarshal(tt.message)
  315. if tt.err != nil {
  316. require.Error(t, err)
  317. assert.Contains(t, err.Error(), tt.err.Error())
  318. } else {
  319. assert.NoError(t, err)
  320. }
  321. if tt.want != nil {
  322. require.NotNil(t, traces)
  323. require.Equal(t, 1, traces.ResourceSpans().Len())
  324. expectedResource := tt.want.ResourceSpans().At(0)
  325. resource := traces.ResourceSpans().At(0)
  326. assert.Equal(t, expectedResource.Resource().Attributes().AsRaw(), resource.Resource().Attributes().AsRaw())
  327. require.Equal(t, 1, resource.ScopeSpans().Len())
  328. expectedInstrumentation := expectedResource.ScopeSpans().At(0)
  329. instrumentation := resource.ScopeSpans().At(0)
  330. assert.Equal(t, expectedInstrumentation.Scope(), instrumentation.Scope())
  331. require.Equal(t, expectedInstrumentation.Spans().Len(), instrumentation.Spans().Len())
  332. for i := 0; i < expectedInstrumentation.Spans().Len(); i++ {
  333. expectedSpan := expectedInstrumentation.Spans().At(i)
  334. span := instrumentation.Spans().At(i)
  335. compareSpans(t, expectedSpan, span)
  336. }
  337. } else {
  338. assert.Equal(t, ptrace.Traces{}, traces)
  339. }
  340. })
  341. }
  342. }
  343. // common helpers
  344. func compareSpans(t *testing.T, expected, actual ptrace.Span) {
  345. assert.Equal(t, expected.Name(), actual.Name())
  346. assert.Equal(t, expected.TraceID(), actual.TraceID())
  347. assert.Equal(t, expected.SpanID(), actual.SpanID())
  348. assert.Equal(t, expected.ParentSpanID(), actual.ParentSpanID())
  349. assert.Equal(t, expected.StartTimestamp(), actual.StartTimestamp())
  350. assert.Equal(t, expected.EndTimestamp(), actual.EndTimestamp())
  351. assert.Equal(t, expected.Attributes().AsRaw(), actual.Attributes().AsRaw())
  352. require.Equal(t, expected.Events().Len(), actual.Events().Len())
  353. for i := 0; i < expected.Events().Len(); i++ {
  354. lessFunc := func(a, b ptrace.SpanEvent) bool {
  355. return a.Name() < b.Name() // choose any comparison here
  356. }
  357. ee := expected.Events()
  358. ee.Sort(lessFunc)
  359. expectedEvent := ee.At(i)
  360. ae := actual.Events()
  361. ae.Sort(lessFunc)
  362. actualEvent := ae.At(i)
  363. assert.Equal(t, expectedEvent.Name(), actualEvent.Name())
  364. assert.Equal(t, expectedEvent.Timestamp(), actualEvent.Timestamp())
  365. assert.Equal(t, expectedEvent.Attributes().AsRaw(), actualEvent.Attributes().AsRaw())
  366. }
  367. }
  368. func populateEvent(t *testing.T, span ptrace.Span, name string, timestamp uint64, attributes map[string]any) {
  369. spanEvent := span.Events().AppendEmpty()
  370. spanEvent.SetName(name)
  371. spanEvent.SetTimestamp(pcommon.Timestamp(timestamp))
  372. populateAttributes(t, spanEvent.Attributes(), attributes)
  373. }
  374. func populateAttributes(t *testing.T, attrMap pcommon.Map, attributes map[string]any) {
  375. for key, val := range attributes {
  376. switch casted := val.(type) {
  377. case string:
  378. attrMap.PutStr(key, casted)
  379. case int64:
  380. attrMap.PutInt(key, casted)
  381. case int:
  382. attrMap.PutInt(key, int64(casted))
  383. case bool:
  384. attrMap.PutBool(key, casted)
  385. default:
  386. require.Fail(t, "Test setup issue: unknown type, could not insert data")
  387. }
  388. }
  389. }