unmarshaller_receive_test.go 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package solacereceiver
  4. import (
  5. "fmt"
  6. "testing"
  7. "github.com/stretchr/testify/assert"
  8. "github.com/stretchr/testify/require"
  9. "go.opentelemetry.io/collector/pdata/pcommon"
  10. "go.opentelemetry.io/collector/pdata/ptrace"
  11. "go.uber.org/zap"
  12. receive_v1 "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver/internal/model/receive/v1"
  13. )
  14. func TestReceiveUnmarshallerMapResourceSpan(t *testing.T) {
  15. var (
  16. routerName = "someRouterName"
  17. vpnName = "someVpnName"
  18. version = "10.0.0"
  19. )
  20. tests := []struct {
  21. name string
  22. spanData *receive_v1.SpanData
  23. want map[string]any
  24. expectedUnmarshallingErrors any
  25. }{
  26. {
  27. name: "Maps All Fields When Present",
  28. spanData: &receive_v1.SpanData{
  29. RouterName: routerName,
  30. MessageVpnName: &vpnName,
  31. SolosVersion: version,
  32. },
  33. want: map[string]any{
  34. "service.name": routerName,
  35. "service.instance.id": vpnName,
  36. "service.version": version,
  37. },
  38. },
  39. {
  40. name: "Does Not Map Fields When Not Present",
  41. spanData: &receive_v1.SpanData{},
  42. want: map[string]any{
  43. "service.version": "",
  44. "service.name": "",
  45. },
  46. },
  47. }
  48. for _, tt := range tests {
  49. t.Run(tt.name, func(t *testing.T) {
  50. u := newTestReceiveV1Unmarshaller(t)
  51. actual := pcommon.NewMap()
  52. u.mapResourceSpanAttributes(tt.spanData, actual)
  53. assert.Equal(t, tt.want, actual.AsRaw())
  54. validateMetric(t, u.metrics.views.recoverableUnmarshallingErrors, tt.expectedUnmarshallingErrors)
  55. })
  56. }
  57. }
  58. // Tests the received span to traces mappings
  59. // Includes all required opentelemetry fields such as trace ID, span ID, etc.
  60. func TestReceiveUnmarshallerMapClientSpanData(t *testing.T) {
  61. someTraceState := "some trace status"
  62. tests := []struct {
  63. name string
  64. data *receive_v1.SpanData
  65. want func(ptrace.Span)
  66. }{
  67. // no trace state no status no parent span
  68. {
  69. name: "Without Optional Fields",
  70. data: &receive_v1.SpanData{
  71. TraceId: []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
  72. SpanId: []byte{7, 6, 5, 4, 3, 2, 1, 0},
  73. StartTimeUnixNano: 1234567890,
  74. EndTimeUnixNano: 2234567890,
  75. },
  76. want: func(span ptrace.Span) {
  77. span.SetTraceID([16]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15})
  78. span.SetSpanID([8]byte{7, 6, 5, 4, 3, 2, 1, 0})
  79. span.SetStartTimestamp(1234567890)
  80. span.SetEndTimestamp(2234567890)
  81. // expect some constants
  82. span.SetKind(5)
  83. span.SetName("(topic) receive")
  84. span.Status().SetCode(ptrace.StatusCodeUnset)
  85. },
  86. },
  87. // trace state status and parent span
  88. {
  89. name: "With Optional Fields",
  90. data: &receive_v1.SpanData{
  91. TraceId: []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
  92. SpanId: []byte{7, 6, 5, 4, 3, 2, 1, 0},
  93. StartTimeUnixNano: 1234567890,
  94. EndTimeUnixNano: 2234567890,
  95. ParentSpanId: []byte{15, 14, 13, 12, 11, 10, 9, 8},
  96. TraceState: &someTraceState,
  97. ErrorDescription: "some error",
  98. },
  99. want: func(span ptrace.Span) {
  100. span.SetTraceID([16]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15})
  101. span.SetSpanID([8]byte{7, 6, 5, 4, 3, 2, 1, 0})
  102. span.SetStartTimestamp(1234567890)
  103. span.SetEndTimestamp(2234567890)
  104. span.SetParentSpanID([8]byte{15, 14, 13, 12, 11, 10, 9, 8})
  105. span.TraceState().FromRaw(someTraceState)
  106. span.Status().SetCode(ptrace.StatusCodeError)
  107. span.Status().SetMessage("some error")
  108. // expect some constants
  109. span.SetKind(5)
  110. span.SetName("(topic) receive")
  111. },
  112. },
  113. }
  114. for _, tt := range tests {
  115. t.Run(tt.name, func(t *testing.T) {
  116. u := newTestReceiveV1Unmarshaller(t)
  117. actual := ptrace.NewTraces().ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty()
  118. u.mapClientSpanData(tt.data, actual)
  119. expected := ptrace.NewTraces().ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty()
  120. tt.want(expected)
  121. assert.Equal(t, expected, actual)
  122. })
  123. }
  124. }
  125. func TestReceiveUnmarshallerMapClientSpanAttributes(t *testing.T) {
  126. var (
  127. protocolVersion = "5.0"
  128. applicationMessageID = "someMessageID"
  129. correlationID = "someConversationID"
  130. replyToTopic = "someReplyToTopic"
  131. baggageString = `someKey=someVal;someProp=someOtherThing,someOtherKey=someOtherVal;someProp=NewProp123;someOtherProp=AnotherProp192`
  132. invalidBaggageString = `someKey"=someVal;someProp=someOtherThing`
  133. priority = uint32(1)
  134. ttl = int64(86000)
  135. )
  136. tests := []struct {
  137. name string
  138. spanData *receive_v1.SpanData
  139. want map[string]any
  140. expectedUnmarshallingErrors any
  141. }{
  142. {
  143. name: "With All Valid Attributes",
  144. spanData: &receive_v1.SpanData{
  145. Protocol: "MQTT",
  146. ProtocolVersion: &protocolVersion,
  147. ApplicationMessageId: &applicationMessageID,
  148. CorrelationId: &correlationID,
  149. BinaryAttachmentSize: 1000,
  150. XmlAttachmentSize: 200,
  151. MetadataSize: 34,
  152. ClientUsername: "someClientUsername",
  153. ClientName: "someClient1234",
  154. ReplyToTopic: &replyToTopic,
  155. DeliveryMode: receive_v1.SpanData_PERSISTENT,
  156. Topic: "someTopic",
  157. ReplicationGroupMessageId: []byte{0x01, 0x00, 0x01, 0x04, 0x09, 0x10, 0x19, 0x24, 0x31, 0x40, 0x51, 0x64, 0x79, 0x90, 0xa9, 0xc4, 0xe1},
  158. Priority: &priority,
  159. Ttl: &ttl,
  160. DmqEligible: true,
  161. DroppedEnqueueEventsSuccess: 42,
  162. DroppedEnqueueEventsFailed: 24,
  163. HostIp: []byte{1, 2, 3, 4},
  164. HostPort: 55555,
  165. PeerIp: []byte{35, 69, 4, 37, 44, 161, 0, 0, 0, 0, 5, 103, 86, 115, 35, 181},
  166. PeerPort: 12345,
  167. BrokerReceiveTimeUnixNano: 1357924680,
  168. DroppedApplicationMessageProperties: false,
  169. Baggage: &baggageString,
  170. UserProperties: map[string]*receive_v1.SpanData_UserPropertyValue{
  171. "special_key": {
  172. Value: &receive_v1.SpanData_UserPropertyValue_BoolValue{
  173. BoolValue: true,
  174. },
  175. },
  176. },
  177. },
  178. want: map[string]any{
  179. "messaging.system": "SolacePubSub+",
  180. "messaging.operation": "receive",
  181. "messaging.protocol": "MQTT",
  182. "messaging.protocol_version": "5.0",
  183. "messaging.message_id": "someMessageID",
  184. "messaging.conversation_id": "someConversationID",
  185. "messaging.message_payload_size_bytes": int64(1234),
  186. "messaging.destination": "someTopic",
  187. "messaging.solace.client_username": "someClientUsername",
  188. "messaging.solace.client_name": "someClient1234",
  189. "messaging.solace.replication_group_message_id": "rmid1:00010-40910192431-40516479-90a9c4e1",
  190. "messaging.solace.priority": int64(1),
  191. "messaging.solace.ttl": int64(86000),
  192. "messaging.solace.dmq_eligible": true,
  193. "messaging.solace.dropped_enqueue_events_success": int64(42),
  194. "messaging.solace.dropped_enqueue_events_failed": int64(24),
  195. "messaging.solace.reply_to_topic": "someReplyToTopic",
  196. "messaging.solace.delivery_mode": "persistent",
  197. "net.host.ip": "1.2.3.4",
  198. "net.host.port": int64(55555),
  199. "net.peer.ip": "2345:425:2ca1::567:5673:23b5",
  200. "net.peer.port": int64(12345),
  201. "messaging.solace.user_properties.special_key": true,
  202. "messaging.solace.broker_receive_time_unix_nano": int64(1357924680),
  203. "messaging.solace.dropped_application_message_properties": false,
  204. "messaging.solace.message.baggage.someKey": "someVal",
  205. "messaging.solace.message.baggage_metadata.someKey": "someProp=someOtherThing",
  206. "messaging.solace.message.baggage.someOtherKey": `someOtherVal`,
  207. "messaging.solace.message.baggage_metadata.someOtherKey": "someProp=NewProp123;someOtherProp=AnotherProp192",
  208. },
  209. },
  210. {
  211. name: "With Only Required Fields",
  212. spanData: &receive_v1.SpanData{
  213. Protocol: "MQTT",
  214. BinaryAttachmentSize: 1000,
  215. XmlAttachmentSize: 200,
  216. MetadataSize: 34,
  217. ClientUsername: "someClientUsername",
  218. ClientName: "someClient1234",
  219. Topic: "someTopic",
  220. DeliveryMode: receive_v1.SpanData_NON_PERSISTENT,
  221. DmqEligible: true,
  222. DroppedEnqueueEventsSuccess: 42,
  223. DroppedEnqueueEventsFailed: 24,
  224. HostIp: []byte{1, 2, 3, 4},
  225. HostPort: 55555,
  226. PeerIp: []byte{35, 69, 4, 37, 44, 161, 0, 0, 0, 0, 5, 103, 86, 115, 35, 181},
  227. PeerPort: 12345,
  228. BrokerReceiveTimeUnixNano: 1357924680,
  229. DroppedApplicationMessageProperties: true,
  230. UserProperties: map[string]*receive_v1.SpanData_UserPropertyValue{
  231. "special_key": nil,
  232. },
  233. },
  234. want: map[string]any{
  235. "messaging.system": "SolacePubSub+",
  236. "messaging.operation": "receive",
  237. "messaging.protocol": "MQTT",
  238. "messaging.message_payload_size_bytes": int64(1234),
  239. "messaging.destination": "someTopic",
  240. "messaging.solace.client_username": "someClientUsername",
  241. "messaging.solace.client_name": "someClient1234",
  242. "messaging.solace.dmq_eligible": true,
  243. "messaging.solace.delivery_mode": "non_persistent",
  244. "messaging.solace.dropped_enqueue_events_success": int64(42),
  245. "messaging.solace.dropped_enqueue_events_failed": int64(24),
  246. "net.host.ip": "1.2.3.4",
  247. "net.host.port": int64(55555),
  248. "net.peer.ip": "2345:425:2ca1::567:5673:23b5",
  249. "net.peer.port": int64(12345),
  250. "messaging.solace.broker_receive_time_unix_nano": int64(1357924680),
  251. "messaging.solace.dropped_application_message_properties": true,
  252. },
  253. },
  254. {
  255. name: "With Some Invalid Fields",
  256. spanData: &receive_v1.SpanData{
  257. Protocol: "MQTT",
  258. BinaryAttachmentSize: 1000,
  259. XmlAttachmentSize: 200,
  260. MetadataSize: 34,
  261. ClientUsername: "someClientUsername",
  262. ClientName: "someClient1234",
  263. Topic: "someTopic",
  264. DeliveryMode: receive_v1.SpanData_DeliveryMode(1000),
  265. DmqEligible: true,
  266. DroppedEnqueueEventsSuccess: 42,
  267. DroppedEnqueueEventsFailed: 24,
  268. HostPort: 55555,
  269. PeerPort: 12345,
  270. BrokerReceiveTimeUnixNano: 1357924680,
  271. DroppedApplicationMessageProperties: true,
  272. Baggage: &invalidBaggageString,
  273. UserProperties: map[string]*receive_v1.SpanData_UserPropertyValue{
  274. "special_key": nil,
  275. },
  276. },
  277. // we no longer expect the port when the IP is not present
  278. want: map[string]any{
  279. "messaging.system": "SolacePubSub+",
  280. "messaging.operation": "receive",
  281. "messaging.protocol": "MQTT",
  282. "messaging.message_payload_size_bytes": int64(1234),
  283. "messaging.destination": "someTopic",
  284. "messaging.solace.client_username": "someClientUsername",
  285. "messaging.solace.client_name": "someClient1234",
  286. "messaging.solace.dmq_eligible": true,
  287. "messaging.solace.delivery_mode": "Unknown Delivery Mode (1000)",
  288. "messaging.solace.dropped_enqueue_events_success": int64(42),
  289. "messaging.solace.dropped_enqueue_events_failed": int64(24),
  290. "messaging.solace.broker_receive_time_unix_nano": int64(1357924680),
  291. "messaging.solace.dropped_application_message_properties": true,
  292. },
  293. // Invalid delivery mode, missing IPs, invalid baggage string
  294. expectedUnmarshallingErrors: 2,
  295. },
  296. }
  297. for _, tt := range tests {
  298. t.Run(tt.name, func(t *testing.T) {
  299. u := newTestReceiveV1Unmarshaller(t)
  300. actual := pcommon.NewMap()
  301. u.mapClientSpanAttributes(tt.spanData, actual)
  302. assert.Equal(t, tt.want, actual.AsRaw())
  303. validateMetric(t, u.metrics.views.recoverableUnmarshallingErrors, tt.expectedUnmarshallingErrors)
  304. })
  305. }
  306. }
  307. // Validate that all event types are properly handled and appended into the span data
  308. func TestReceiveUnmarshallerEvents(t *testing.T) {
  309. someErrorString := "some error"
  310. somePartitionNumber := uint32(345)
  311. tests := []struct {
  312. name string
  313. spanData *receive_v1.SpanData
  314. populateExpectedSpan func(span ptrace.Span)
  315. unmarshallingErrors any
  316. }{
  317. { // don't expect any events when none are present in the span data
  318. name: "No Events",
  319. spanData: &receive_v1.SpanData{},
  320. populateExpectedSpan: func(span ptrace.Span) {},
  321. },
  322. { // when an enqueue event is present, expect it to be added to the span events
  323. name: "Enqueue Event Queue",
  324. spanData: &receive_v1.SpanData{
  325. EnqueueEvents: []*receive_v1.SpanData_EnqueueEvent{
  326. {
  327. Dest: &receive_v1.SpanData_EnqueueEvent_QueueName{QueueName: "somequeue"},
  328. TimeUnixNano: 123456789,
  329. PartitionNumber: &somePartitionNumber,
  330. },
  331. },
  332. },
  333. populateExpectedSpan: func(span ptrace.Span) {
  334. populateEvent(t, span, "somequeue enqueue", 123456789, map[string]any{
  335. "messaging.solace.destination_type": "queue",
  336. "messaging.solace.rejects_all_enqueues": false,
  337. "messaging.solace.partition_number": 345,
  338. })
  339. },
  340. },
  341. { // when a topic endpoint enqueue event is present, expect it to be added to the span events
  342. name: "Enqueue Event Topic Endpoint",
  343. spanData: &receive_v1.SpanData{
  344. EnqueueEvents: []*receive_v1.SpanData_EnqueueEvent{
  345. {
  346. Dest: &receive_v1.SpanData_EnqueueEvent_TopicEndpointName{TopicEndpointName: "sometopic"},
  347. TimeUnixNano: 123456789,
  348. ErrorDescription: &someErrorString,
  349. RejectsAllEnqueues: true,
  350. },
  351. },
  352. },
  353. populateExpectedSpan: func(span ptrace.Span) {
  354. populateEvent(t, span, "sometopic enqueue", 123456789, map[string]any{
  355. "messaging.solace.destination_type": "topic-endpoint",
  356. "messaging.solace.enqueue_error_message": someErrorString,
  357. "messaging.solace.rejects_all_enqueues": true,
  358. })
  359. },
  360. },
  361. { // when a both a queue and topic endpoint enqueue event is present, expect it to be added to the span events
  362. name: "Enqueue Event Queue and Topic Endpoint",
  363. spanData: &receive_v1.SpanData{
  364. EnqueueEvents: []*receive_v1.SpanData_EnqueueEvent{
  365. {
  366. Dest: &receive_v1.SpanData_EnqueueEvent_QueueName{QueueName: "somequeue"},
  367. TimeUnixNano: 123456789,
  368. },
  369. {
  370. Dest: &receive_v1.SpanData_EnqueueEvent_TopicEndpointName{TopicEndpointName: "sometopic"},
  371. TimeUnixNano: 2345678,
  372. },
  373. },
  374. },
  375. populateExpectedSpan: func(span ptrace.Span) {
  376. populateEvent(t, span, "somequeue enqueue", 123456789, map[string]any{
  377. "messaging.solace.destination_type": "queue",
  378. "messaging.solace.rejects_all_enqueues": false,
  379. })
  380. populateEvent(t, span, "sometopic enqueue", 2345678, map[string]any{
  381. "messaging.solace.destination_type": "topic-endpoint",
  382. "messaging.solace.rejects_all_enqueues": false,
  383. })
  384. },
  385. },
  386. { // when an enqueue event does not have a valid dest (ie. nil)
  387. name: "Enqueue Event no Dest",
  388. spanData: &receive_v1.SpanData{
  389. EnqueueEvents: []*receive_v1.SpanData_EnqueueEvent{
  390. {
  391. Dest: nil,
  392. TimeUnixNano: 123456789,
  393. },
  394. },
  395. },
  396. populateExpectedSpan: func(span ptrace.Span) {},
  397. unmarshallingErrors: 1,
  398. },
  399. { // Local Transaction
  400. name: "Local Transaction Event",
  401. spanData: &receive_v1.SpanData{
  402. TransactionEvent: &receive_v1.SpanData_TransactionEvent{
  403. TimeUnixNano: 123456789,
  404. Type: receive_v1.SpanData_TransactionEvent_COMMIT,
  405. Initiator: receive_v1.SpanData_TransactionEvent_CLIENT,
  406. TransactionId: &receive_v1.SpanData_TransactionEvent_LocalId{
  407. LocalId: &receive_v1.SpanData_TransactionEvent_LocalTransactionId{
  408. TransactionId: 12345,
  409. SessionId: 67890,
  410. SessionName: "my-session-name",
  411. },
  412. },
  413. },
  414. },
  415. populateExpectedSpan: func(span ptrace.Span) {
  416. populateEvent(t, span, "commit", 123456789, map[string]any{
  417. "messaging.solace.transaction_initiator": "client",
  418. "messaging.solace.transaction_id": 12345,
  419. "messaging.solace.transacted_session_name": "my-session-name",
  420. "messaging.solace.transacted_session_id": 67890,
  421. })
  422. },
  423. },
  424. { // XA transaction
  425. name: "XA Transaction Event",
  426. spanData: &receive_v1.SpanData{
  427. TransactionEvent: &receive_v1.SpanData_TransactionEvent{
  428. TimeUnixNano: 123456789,
  429. Type: receive_v1.SpanData_TransactionEvent_END,
  430. Initiator: receive_v1.SpanData_TransactionEvent_ADMIN,
  431. TransactionId: &receive_v1.SpanData_TransactionEvent_Xid_{
  432. Xid: &receive_v1.SpanData_TransactionEvent_Xid{
  433. FormatId: 123,
  434. BranchQualifier: []byte{0, 8, 20, 254},
  435. GlobalId: []byte{128, 64, 32, 16, 8, 4, 2, 1, 0},
  436. },
  437. },
  438. },
  439. },
  440. populateExpectedSpan: func(span ptrace.Span) {
  441. populateEvent(t, span, "end", 123456789, map[string]any{
  442. "messaging.solace.transaction_initiator": "administrator",
  443. "messaging.solace.transaction_xid": "0000007b-000814fe-804020100804020100",
  444. })
  445. },
  446. },
  447. { // XA Transaction with no branch qualifier or global ID and with an error
  448. name: "XA Transaction Event with nil fields and error",
  449. spanData: &receive_v1.SpanData{
  450. TransactionEvent: &receive_v1.SpanData_TransactionEvent{
  451. TimeUnixNano: 123456789,
  452. Type: receive_v1.SpanData_TransactionEvent_PREPARE,
  453. Initiator: receive_v1.SpanData_TransactionEvent_BROKER,
  454. TransactionId: &receive_v1.SpanData_TransactionEvent_Xid_{
  455. Xid: &receive_v1.SpanData_TransactionEvent_Xid{
  456. FormatId: 123,
  457. BranchQualifier: nil,
  458. GlobalId: nil,
  459. },
  460. },
  461. ErrorDescription: &someErrorString,
  462. },
  463. },
  464. populateExpectedSpan: func(span ptrace.Span) {
  465. populateEvent(t, span, "prepare", 123456789, map[string]any{
  466. "messaging.solace.transaction_initiator": "broker",
  467. "messaging.solace.transaction_xid": "0000007b--",
  468. "messaging.solace.transaction_error_message": someErrorString,
  469. })
  470. },
  471. },
  472. { // Type of transaction not handled
  473. name: "Unknown Transaction Type and no ID",
  474. spanData: &receive_v1.SpanData{
  475. TransactionEvent: &receive_v1.SpanData_TransactionEvent{
  476. TimeUnixNano: 123456789,
  477. Type: receive_v1.SpanData_TransactionEvent_Type(12345),
  478. },
  479. },
  480. populateExpectedSpan: func(span ptrace.Span) {
  481. populateEvent(t, span, "Unknown Transaction Event (12345)", 123456789, map[string]any{
  482. "messaging.solace.transaction_initiator": "client",
  483. })
  484. },
  485. unmarshallingErrors: 2,
  486. },
  487. { // Type of ID not handled, type of initiator not handled
  488. name: "Unknown Transaction Initiator and no ID",
  489. spanData: &receive_v1.SpanData{
  490. TransactionEvent: &receive_v1.SpanData_TransactionEvent{
  491. TimeUnixNano: 123456789,
  492. Type: receive_v1.SpanData_TransactionEvent_ROLLBACK,
  493. Initiator: receive_v1.SpanData_TransactionEvent_Initiator(12345),
  494. TransactionId: nil,
  495. },
  496. },
  497. populateExpectedSpan: func(span ptrace.Span) {
  498. populateEvent(t, span, "rollback", 123456789, map[string]any{
  499. "messaging.solace.transaction_initiator": "Unknown Transaction Initiator (12345)",
  500. })
  501. },
  502. unmarshallingErrors: 2,
  503. },
  504. { // when a both a queue and topic endpoint enqueue event is present, expect it to be added to the span events
  505. name: "Multiple Events",
  506. spanData: &receive_v1.SpanData{
  507. EnqueueEvents: []*receive_v1.SpanData_EnqueueEvent{
  508. {
  509. Dest: &receive_v1.SpanData_EnqueueEvent_QueueName{QueueName: "somequeue"},
  510. TimeUnixNano: 123456789,
  511. },
  512. {
  513. Dest: &receive_v1.SpanData_EnqueueEvent_TopicEndpointName{TopicEndpointName: "sometopic"},
  514. TimeUnixNano: 2345678,
  515. RejectsAllEnqueues: true,
  516. },
  517. },
  518. TransactionEvent: &receive_v1.SpanData_TransactionEvent{
  519. TimeUnixNano: 123456789,
  520. Type: receive_v1.SpanData_TransactionEvent_ROLLBACK_ONLY,
  521. Initiator: receive_v1.SpanData_TransactionEvent_CLIENT,
  522. TransactionId: &receive_v1.SpanData_TransactionEvent_LocalId{
  523. LocalId: &receive_v1.SpanData_TransactionEvent_LocalTransactionId{
  524. TransactionId: 12345,
  525. SessionId: 67890,
  526. SessionName: "my-session-name",
  527. },
  528. },
  529. },
  530. },
  531. populateExpectedSpan: func(span ptrace.Span) {
  532. populateEvent(t, span, "somequeue enqueue", 123456789, map[string]any{
  533. "messaging.solace.destination_type": "queue",
  534. "messaging.solace.rejects_all_enqueues": false,
  535. })
  536. populateEvent(t, span, "sometopic enqueue", 2345678, map[string]any{
  537. "messaging.solace.destination_type": "topic-endpoint",
  538. "messaging.solace.rejects_all_enqueues": true,
  539. })
  540. populateEvent(t, span, "rollback_only", 123456789, map[string]any{
  541. "messaging.solace.transaction_initiator": "client",
  542. "messaging.solace.transaction_id": 12345,
  543. "messaging.solace.transacted_session_name": "my-session-name",
  544. "messaging.solace.transacted_session_id": 67890,
  545. })
  546. },
  547. },
  548. }
  549. for _, tt := range tests {
  550. t.Run(tt.name, func(t *testing.T) {
  551. u := newTestReceiveV1Unmarshaller(t)
  552. expected := ptrace.NewTraces().ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty()
  553. tt.populateExpectedSpan(expected)
  554. actual := ptrace.NewTraces().ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty()
  555. u.mapEvents(tt.spanData, actual)
  556. // order is nondeterministic for attributes, so we must sort to get a valid comparison
  557. compareSpans(t, expected, actual)
  558. validateMetric(t, u.metrics.views.recoverableUnmarshallingErrors, tt.unmarshallingErrors)
  559. })
  560. }
  561. }
  562. func TestReceiveUnmarshallerRGMID(t *testing.T) {
  563. tests := []struct {
  564. name string
  565. in []byte
  566. expected string
  567. numErr any
  568. }{
  569. {
  570. name: "Valid RGMID",
  571. in: []byte{0x01, 0x00, 0x01, 0x04, 0x09, 0x10, 0x19, 0x24, 0x31, 0x40, 0x51, 0x64, 0x79, 0x90, 0xa9, 0xc4, 0xe1},
  572. expected: "rmid1:00010-40910192431-40516479-90a9c4e1",
  573. },
  574. {
  575. name: "Bad RGMID Version",
  576. in: []byte{0x02, 0x00, 0x01, 0x04, 0x09, 0x10, 0x19, 0x24, 0x31, 0x40, 0x51, 0x64, 0x79, 0x90, 0xa9, 0xc4, 0xe1},
  577. expected: "0200010409101924314051647990a9c4e1", // expect default behavior of hex dump
  578. numErr: 1,
  579. },
  580. {
  581. name: "Bad RGMID length",
  582. in: []byte{0x00, 0x01, 0x04, 0x09, 0x10, 0x19, 0x24, 0x31, 0x40, 0x51, 0x64, 0x79, 0x90, 0xa9, 0xc4, 0xe1},
  583. expected: "00010409101924314051647990a9c4e1", // expect default behavior of hex dump
  584. numErr: 1,
  585. },
  586. {
  587. name: "Nil RGMID",
  588. in: nil,
  589. expected: "",
  590. },
  591. }
  592. for _, tt := range tests {
  593. t.Run(tt.name, func(t *testing.T) {
  594. u := newTestReceiveV1Unmarshaller(t)
  595. actual := u.rgmidToString(tt.in)
  596. assert.Equal(t, tt.expected, actual)
  597. validateMetric(t, u.metrics.views.recoverableUnmarshallingErrors, tt.numErr)
  598. })
  599. }
  600. }
  601. func TestReceiveUnmarshallerReceiveBaggageString(t *testing.T) {
  602. testCases := []struct {
  603. name string
  604. baggage string
  605. expected func(pcommon.Map)
  606. errStr string
  607. }{
  608. {
  609. name: "Valid baggage",
  610. baggage: `someKey=someVal`,
  611. expected: func(m pcommon.Map) {
  612. assert.NoError(t, m.FromRaw(map[string]any{
  613. "messaging.solace.message.baggage.someKey": "someVal",
  614. }))
  615. },
  616. },
  617. {
  618. name: "Valid baggage with properties",
  619. baggage: `someKey=someVal;someProp=someOtherThing,someOtherKey=someOtherVal;someProp=NewProp123;someOtherProp=AnotherProp192`,
  620. expected: func(m pcommon.Map) {
  621. assert.NoError(t, m.FromRaw(map[string]any{
  622. "messaging.solace.message.baggage.someKey": "someVal",
  623. "messaging.solace.message.baggage_metadata.someKey": "someProp=someOtherThing",
  624. "messaging.solace.message.baggage.someOtherKey": `someOtherVal`,
  625. "messaging.solace.message.baggage_metadata.someOtherKey": "someProp=NewProp123;someOtherProp=AnotherProp192",
  626. }))
  627. },
  628. },
  629. {
  630. name: "Invalid baggage",
  631. baggage: `someKey"=someVal;someProp=someOtherThing`,
  632. errStr: "invalid key",
  633. },
  634. }
  635. for _, testCase := range testCases {
  636. t.Run(fmt.Sprintf("%T", testCase.name), func(t *testing.T) {
  637. actual := pcommon.NewMap()
  638. u := newTestReceiveV1Unmarshaller(t)
  639. err := u.unmarshalBaggage(actual, testCase.baggage)
  640. if testCase.errStr == "" {
  641. assert.Nil(t, err)
  642. } else {
  643. assert.ErrorContains(t, err, testCase.errStr)
  644. }
  645. if testCase.expected != nil {
  646. expected := pcommon.NewMap()
  647. testCase.expected(expected)
  648. assert.Equal(t, expected.AsRaw(), actual.AsRaw())
  649. } else {
  650. // assert we didn't add anything if we don't have a result map
  651. assert.Equal(t, 0, actual.Len())
  652. }
  653. })
  654. }
  655. }
  656. func TestReceiveUnmarshallerInsertUserProperty(t *testing.T) {
  657. emojiVal := 0xf09f92a9
  658. testCases := []struct {
  659. data any
  660. expectedType pcommon.ValueType
  661. validate func(val pcommon.Value)
  662. }{
  663. {
  664. &receive_v1.SpanData_UserPropertyValue_NullValue{},
  665. pcommon.ValueTypeEmpty,
  666. nil,
  667. },
  668. {
  669. &receive_v1.SpanData_UserPropertyValue_BoolValue{BoolValue: true},
  670. pcommon.ValueTypeBool,
  671. func(val pcommon.Value) {
  672. assert.Equal(t, true, val.Bool())
  673. },
  674. },
  675. {
  676. &receive_v1.SpanData_UserPropertyValue_DoubleValue{DoubleValue: 12.34},
  677. pcommon.ValueTypeDouble,
  678. func(val pcommon.Value) {
  679. assert.Equal(t, float64(12.34), val.Double())
  680. },
  681. },
  682. {
  683. &receive_v1.SpanData_UserPropertyValue_ByteArrayValue{ByteArrayValue: []byte{1, 2, 3, 4}},
  684. pcommon.ValueTypeBytes,
  685. func(val pcommon.Value) {
  686. assert.Equal(t, []byte{1, 2, 3, 4}, val.Bytes().AsRaw())
  687. },
  688. },
  689. {
  690. &receive_v1.SpanData_UserPropertyValue_FloatValue{FloatValue: 12.34},
  691. pcommon.ValueTypeDouble,
  692. func(val pcommon.Value) {
  693. assert.Equal(t, float64(float32(12.34)), val.Double())
  694. },
  695. },
  696. {
  697. &receive_v1.SpanData_UserPropertyValue_Int8Value{Int8Value: 8},
  698. pcommon.ValueTypeInt,
  699. func(val pcommon.Value) {
  700. assert.Equal(t, int64(8), val.Int())
  701. },
  702. },
  703. {
  704. &receive_v1.SpanData_UserPropertyValue_Int16Value{Int16Value: 16},
  705. pcommon.ValueTypeInt,
  706. func(val pcommon.Value) {
  707. assert.Equal(t, int64(16), val.Int())
  708. },
  709. },
  710. {
  711. &receive_v1.SpanData_UserPropertyValue_Int32Value{Int32Value: 32},
  712. pcommon.ValueTypeInt,
  713. func(val pcommon.Value) {
  714. assert.Equal(t, int64(32), val.Int())
  715. },
  716. },
  717. {
  718. &receive_v1.SpanData_UserPropertyValue_Int64Value{Int64Value: 64},
  719. pcommon.ValueTypeInt,
  720. func(val pcommon.Value) {
  721. assert.Equal(t, int64(64), val.Int())
  722. },
  723. },
  724. {
  725. &receive_v1.SpanData_UserPropertyValue_Uint8Value{Uint8Value: 8},
  726. pcommon.ValueTypeInt,
  727. func(val pcommon.Value) {
  728. assert.Equal(t, int64(8), val.Int())
  729. },
  730. },
  731. {
  732. &receive_v1.SpanData_UserPropertyValue_Uint16Value{Uint16Value: 16},
  733. pcommon.ValueTypeInt,
  734. func(val pcommon.Value) {
  735. assert.Equal(t, int64(16), val.Int())
  736. },
  737. },
  738. {
  739. &receive_v1.SpanData_UserPropertyValue_Uint32Value{Uint32Value: 32},
  740. pcommon.ValueTypeInt,
  741. func(val pcommon.Value) {
  742. assert.Equal(t, int64(32), val.Int())
  743. },
  744. },
  745. {
  746. &receive_v1.SpanData_UserPropertyValue_Uint64Value{Uint64Value: 64},
  747. pcommon.ValueTypeInt,
  748. func(val pcommon.Value) {
  749. assert.Equal(t, int64(64), val.Int())
  750. },
  751. },
  752. {
  753. &receive_v1.SpanData_UserPropertyValue_StringValue{StringValue: "hello world"},
  754. pcommon.ValueTypeStr,
  755. func(val pcommon.Value) {
  756. assert.Equal(t, "hello world", val.Str())
  757. },
  758. },
  759. {
  760. &receive_v1.SpanData_UserPropertyValue_DestinationValue{DestinationValue: "some_dest"},
  761. pcommon.ValueTypeStr,
  762. func(val pcommon.Value) {
  763. assert.Equal(t, "some_dest", val.Str())
  764. },
  765. },
  766. {
  767. &receive_v1.SpanData_UserPropertyValue_CharacterValue{CharacterValue: 0x61},
  768. pcommon.ValueTypeStr,
  769. func(val pcommon.Value) {
  770. assert.Equal(t, "a", val.Str())
  771. },
  772. },
  773. {
  774. &receive_v1.SpanData_UserPropertyValue_CharacterValue{CharacterValue: 0xe68080},
  775. pcommon.ValueTypeStr,
  776. func(val pcommon.Value) {
  777. assert.Equal(t, string(rune(0xe68080)), val.Str())
  778. },
  779. },
  780. {
  781. &receive_v1.SpanData_UserPropertyValue_CharacterValue{CharacterValue: 0xf09f92a9},
  782. pcommon.ValueTypeStr,
  783. func(val pcommon.Value) {
  784. assert.Equal(t, string(rune(emojiVal)), val.Str())
  785. },
  786. },
  787. }
  788. unmarshaller := &brokerTraceReceiveUnmarshallerV1{
  789. logger: zap.NewNop(),
  790. }
  791. for _, testCase := range testCases {
  792. t.Run(fmt.Sprintf("%T", testCase.data), func(t *testing.T) {
  793. const key = "some-property"
  794. attributeMap := pcommon.NewMap()
  795. unmarshaller.insertUserProperty(attributeMap, key, testCase.data)
  796. actual, ok := attributeMap.Get("messaging.solace.user_properties." + key)
  797. require.True(t, ok)
  798. assert.Equal(t, testCase.expectedType, actual.Type())
  799. if testCase.validate != nil {
  800. testCase.validate(actual)
  801. }
  802. })
  803. }
  804. }
  805. func TestSolaceMessageReceiveUnmarshallerV1InsertUserPropertyUnsupportedType(t *testing.T) {
  806. u := newTestReceiveV1Unmarshaller(t)
  807. const key = "some-property"
  808. attributeMap := pcommon.NewMap()
  809. u.insertUserProperty(attributeMap, key, "invalid data type")
  810. _, ok := attributeMap.Get("messaging.solace.user_properties." + key)
  811. assert.False(t, ok)
  812. validateMetric(t, u.metrics.views.recoverableUnmarshallingErrors, 1)
  813. }
  814. func newTestReceiveV1Unmarshaller(t *testing.T) *brokerTraceReceiveUnmarshallerV1 {
  815. m := newTestMetrics(t)
  816. return &brokerTraceReceiveUnmarshallerV1{zap.NewNop(), m}
  817. }