unmarshaller_egress_test.go 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package solacereceiver
  4. import (
  5. "fmt"
  6. "testing"
  7. "github.com/Azure/go-amqp"
  8. "github.com/stretchr/testify/assert"
  9. "go.opentelemetry.io/collector/pdata/pcommon"
  10. "go.opentelemetry.io/collector/pdata/ptrace"
  11. "go.uber.org/zap"
  12. egress_v1 "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver/internal/model/egress/v1"
  13. )
  14. // msgWithAdditionalSpan is an EgressSpanMsg with one EgressSpan of type 1 (future proofing)
  15. // Used to test that if we receive a oneof that does not support our given trace then we will be able to handle it.
  16. // We must do this with protobuf data since we must ensure that the protobuf library handles this case as expected when unmarshalling.
  17. var msgWithAdditionalSpan = []byte{10, 48, 10, 16, 1, 2, 3, 4, 5, 6, 7, 8, 9, 8, 7, 6, 5, 4, 3, 2, 18, 8, 1, 2, 3, 4, 5, 6, 7, 8, 26, 8, 1, 2, 3, 4, 5, 6, 7, 8, 74, 8, 32, 3, 10, 4, 116, 101, 115, 116, 18, 10, 118, 109, 114, 45, 49, 51, 51, 45, 53, 51, 26, 7, 100, 101, 102, 97, 117, 108, 116}
  18. func TestMsgWithUnknownOneof(t *testing.T) {
  19. unmarshallerV1 := newTestEgressV1Unmarshaller(t)
  20. spanData, err := unmarshallerV1.unmarshalToSpanData(amqp.NewMessage(msgWithAdditionalSpan))
  21. assert.NoError(t, err)
  22. // expect one egress span
  23. assert.Len(t, spanData.EgressSpans, 1)
  24. assert.Nil(t, spanData.EgressSpans[0].GetSendSpan())
  25. }
  26. func TestEgressUnmarshallerMapResourceSpan(t *testing.T) {
  27. var (
  28. routerName = "someRouterName"
  29. vpnName = "someVpnName"
  30. version = "10.0.0"
  31. )
  32. tests := []struct {
  33. name string
  34. spanData *egress_v1.SpanData
  35. want map[string]any
  36. expectedUnmarshallingErrors any
  37. }{
  38. {
  39. name: "Maps All Fields When Present",
  40. spanData: &egress_v1.SpanData{
  41. RouterName: routerName,
  42. MessageVpnName: &vpnName,
  43. SolosVersion: version,
  44. },
  45. want: map[string]any{
  46. "service.name": routerName,
  47. "service.instance.id": vpnName,
  48. "service.version": version,
  49. },
  50. },
  51. {
  52. name: "Does Not Map Fields When Not Present",
  53. spanData: &egress_v1.SpanData{},
  54. want: map[string]any{
  55. "service.version": "",
  56. "service.name": "",
  57. },
  58. },
  59. }
  60. for _, tt := range tests {
  61. t.Run(tt.name, func(t *testing.T) {
  62. u := newTestEgressV1Unmarshaller(t)
  63. actual := pcommon.NewMap()
  64. u.mapResourceSpanAttributes(tt.spanData, actual)
  65. assert.Equal(t, tt.want, actual.AsRaw())
  66. validateMetric(t, u.metrics.views.recoverableUnmarshallingErrors, tt.expectedUnmarshallingErrors)
  67. })
  68. }
  69. }
  70. var (
  71. protocolVersion = "5.0"
  72. protocolVersion2 = "1.0"
  73. protocolVersion3 = "3.0"
  74. someError = "someErrorOccurred"
  75. someOtherError = "someOtherErrorOccurred"
  76. )
  77. // validEgressSpans is valid data used as test data
  78. var validEgressSpans = []struct {
  79. in *egress_v1.SpanData_EgressSpan
  80. out ptrace.Span
  81. }{
  82. {
  83. &egress_v1.SpanData_EgressSpan{
  84. TraceId: []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
  85. SpanId: []byte{7, 6, 5, 4, 3, 2, 1, 0},
  86. StartTimeUnixNano: 234567890,
  87. EndTimeUnixNano: 234567890,
  88. TypeData: &egress_v1.SpanData_EgressSpan_SendSpan{
  89. SendSpan: &egress_v1.SpanData_SendSpan{
  90. Protocol: "SMF",
  91. ProtocolVersion: &protocolVersion3,
  92. ConsumerClientUsername: "clientUsername",
  93. ConsumerClientName: "clientName",
  94. Source: &egress_v1.SpanData_SendSpan_QueueName{
  95. QueueName: "someQueue",
  96. },
  97. Outcome: egress_v1.SpanData_SendSpan_FLOW_UNBOUND,
  98. ReplayedMsg: false,
  99. },
  100. },
  101. },
  102. func() ptrace.Span {
  103. span := ptrace.NewSpan()
  104. span.SetName("someQueue send")
  105. span.SetTraceID([16]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15})
  106. span.SetSpanID([8]byte{7, 6, 5, 4, 3, 2, 1, 0})
  107. span.SetStartTimestamp(234567890)
  108. span.SetEndTimestamp(234567890)
  109. span.SetKind(4)
  110. spanAttrs := span.Attributes()
  111. spanAttrs.PutStr("messaging.system", "SolacePubSub+")
  112. spanAttrs.PutStr("messaging.operation", "send")
  113. spanAttrs.PutStr("messaging.protocol", "SMF")
  114. spanAttrs.PutStr("messaging.protocol_version", "3.0")
  115. spanAttrs.PutStr("messaging.source.name", "someQueue")
  116. spanAttrs.PutStr("messaging.source.kind", "queue")
  117. spanAttrs.PutStr("messaging.solace.client_username", "clientUsername")
  118. spanAttrs.PutStr("messaging.solace.client_name", "clientName")
  119. spanAttrs.PutBool("messaging.solace.message_replayed", false)
  120. spanAttrs.PutStr("messaging.solace.send.outcome", "flow unbound")
  121. return span
  122. }(),
  123. },
  124. {
  125. &egress_v1.SpanData_EgressSpan{
  126. TraceId: []byte{1, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
  127. SpanId: []byte{7, 6, 5, 4, 3, 2, 1, 1},
  128. StartTimeUnixNano: 1234567890,
  129. EndTimeUnixNano: 2234567890,
  130. ErrorDescription: &someError,
  131. TypeData: &egress_v1.SpanData_EgressSpan_SendSpan{
  132. SendSpan: &egress_v1.SpanData_SendSpan{
  133. Protocol: "MQTT",
  134. ProtocolVersion: &protocolVersion,
  135. ConsumerClientUsername: "someClientUsername",
  136. ConsumerClientName: "someClient1234",
  137. Source: &egress_v1.SpanData_SendSpan_QueueName{
  138. QueueName: "queueName",
  139. },
  140. Outcome: egress_v1.SpanData_SendSpan_ACCEPTED,
  141. ReplayedMsg: false,
  142. },
  143. },
  144. TransactionEvent: &egress_v1.SpanData_TransactionEvent{
  145. TimeUnixNano: 123456789,
  146. Type: egress_v1.SpanData_TransactionEvent_SESSION_TIMEOUT,
  147. Initiator: egress_v1.SpanData_TransactionEvent_BROKER,
  148. TransactionId: &egress_v1.SpanData_TransactionEvent_LocalId{
  149. LocalId: &egress_v1.SpanData_TransactionEvent_LocalTransactionId{
  150. TransactionId: 12345,
  151. SessionId: 67890,
  152. SessionName: "my-session-name",
  153. },
  154. },
  155. ErrorDescription: &someOtherError,
  156. },
  157. }, func() ptrace.Span {
  158. span := ptrace.NewSpan()
  159. span.SetName("queueName send")
  160. span.SetTraceID([16]byte{1, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15})
  161. span.SetSpanID([8]byte{7, 6, 5, 4, 3, 2, 1, 1})
  162. span.SetStartTimestamp(1234567890)
  163. span.SetEndTimestamp(2234567890)
  164. span.SetKind(4)
  165. span.Status().SetCode(ptrace.StatusCodeError)
  166. span.Status().SetMessage("someErrorOccurred")
  167. spanAttrs := span.Attributes()
  168. spanAttrs.PutStr("messaging.system", "SolacePubSub+")
  169. spanAttrs.PutStr("messaging.operation", "send")
  170. spanAttrs.PutStr("messaging.protocol", "MQTT")
  171. spanAttrs.PutStr("messaging.protocol_version", "5.0")
  172. spanAttrs.PutStr("messaging.source.name", "queueName")
  173. spanAttrs.PutStr("messaging.source.kind", "queue")
  174. spanAttrs.PutStr("messaging.solace.client_username", "someClientUsername")
  175. spanAttrs.PutStr("messaging.solace.client_name", "someClient1234")
  176. spanAttrs.PutBool("messaging.solace.message_replayed", false)
  177. spanAttrs.PutStr("messaging.solace.send.outcome", "accepted")
  178. txnEvent := span.Events().AppendEmpty()
  179. txnEvent.SetName("session_timeout")
  180. txnEvent.SetTimestamp(123456789)
  181. txnEventAttrs := txnEvent.Attributes()
  182. txnEventAttrs.PutStr("messaging.solace.transaction_initiator", "broker")
  183. txnEventAttrs.PutInt("messaging.solace.transaction_id", 12345)
  184. txnEventAttrs.PutStr("messaging.solace.transacted_session_name", "my-session-name")
  185. txnEventAttrs.PutInt("messaging.solace.transacted_session_id", 67890)
  186. txnEventAttrs.PutStr("messaging.solace.transaction_error_message", "someOtherErrorOccurred")
  187. return span
  188. }(),
  189. },
  190. {
  191. &egress_v1.SpanData_EgressSpan{
  192. TraceId: []byte{1, 3, 5, 7, 9, 11, 13, 15, 17, 19, 21, 23, 25, 27, 29, 31},
  193. SpanId: []byte{0, 1, 2, 3, 4, 5, 6, 7},
  194. ParentSpanId: []byte{7, 6, 5, 4, 3, 2, 1, 0},
  195. StartTimeUnixNano: 4234567890,
  196. EndTimeUnixNano: 5234567890,
  197. TypeData: &egress_v1.SpanData_EgressSpan_SendSpan{
  198. SendSpan: &egress_v1.SpanData_SendSpan{
  199. Protocol: "AMQP",
  200. ProtocolVersion: &protocolVersion2,
  201. ConsumerClientUsername: "someOtherClientUsername",
  202. ConsumerClientName: "someOtherClient1234",
  203. Source: &egress_v1.SpanData_SendSpan_TopicEndpointName{
  204. TopicEndpointName: "topicEndpointName",
  205. },
  206. Outcome: egress_v1.SpanData_SendSpan_REJECTED,
  207. ReplayedMsg: true,
  208. },
  209. },
  210. TransactionEvent: &egress_v1.SpanData_TransactionEvent{
  211. TimeUnixNano: 223456789,
  212. Type: egress_v1.SpanData_TransactionEvent_END,
  213. Initiator: egress_v1.SpanData_TransactionEvent_CLIENT,
  214. TransactionId: &egress_v1.SpanData_TransactionEvent_Xid_{
  215. Xid: &egress_v1.SpanData_TransactionEvent_Xid{
  216. FormatId: 123,
  217. BranchQualifier: []byte{0, 8, 20, 254},
  218. GlobalId: []byte{128, 64, 32, 16, 8, 4, 2, 1, 0},
  219. },
  220. },
  221. },
  222. }, func() ptrace.Span {
  223. // second send span
  224. span := ptrace.NewSpan()
  225. span.SetName("topicEndpointName send")
  226. span.SetTraceID([16]byte{1, 3, 5, 7, 9, 11, 13, 15, 17, 19, 21, 23, 25, 27, 29, 31})
  227. span.SetSpanID([8]byte{0, 1, 2, 3, 4, 5, 6, 7})
  228. span.SetParentSpanID([8]byte{7, 6, 5, 4, 3, 2, 1, 0})
  229. span.SetStartTimestamp(4234567890)
  230. span.SetEndTimestamp(5234567890)
  231. span.SetKind(4)
  232. spanAttrs := span.Attributes()
  233. spanAttrs.PutStr("messaging.system", "SolacePubSub+")
  234. spanAttrs.PutStr("messaging.operation", "send")
  235. spanAttrs.PutStr("messaging.protocol", "AMQP")
  236. spanAttrs.PutStr("messaging.protocol_version", "1.0")
  237. spanAttrs.PutStr("messaging.source.name", "topicEndpointName")
  238. spanAttrs.PutStr("messaging.source.kind", "topic-endpoint")
  239. spanAttrs.PutStr("messaging.solace.client_username", "someOtherClientUsername")
  240. spanAttrs.PutStr("messaging.solace.client_name", "someOtherClient1234")
  241. spanAttrs.PutBool("messaging.solace.message_replayed", true)
  242. spanAttrs.PutStr("messaging.solace.send.outcome", "rejected")
  243. txnEvent := span.Events().AppendEmpty()
  244. txnEvent.SetName("end")
  245. txnEvent.SetTimestamp(223456789)
  246. txnAttrs := txnEvent.Attributes()
  247. txnAttrs.PutStr("messaging.solace.transaction_initiator", "client")
  248. txnAttrs.PutStr("messaging.solace.transaction_xid", "0000007b-000814fe-804020100804020100")
  249. return span
  250. }(),
  251. },
  252. }
  253. func TestEgressUnmarshallerEgressSpan(t *testing.T) {
  254. type testCase struct {
  255. name string
  256. spanData *egress_v1.SpanData_EgressSpan
  257. want *ptrace.Span
  258. expectedUnmarshallingErrors any
  259. }
  260. tests := []testCase{
  261. {
  262. name: "No typed span",
  263. spanData: &egress_v1.SpanData_EgressSpan{
  264. TraceId: []byte{1, 3, 5, 7, 9, 11, 13, 15, 17, 19, 21, 23, 25, 27, 29, 31},
  265. SpanId: []byte{0, 1, 2, 3, 4, 5, 6, 7},
  266. ParentSpanId: []byte{7, 6, 5, 4, 3, 2, 1, 0},
  267. StartTimeUnixNano: 4234567890,
  268. EndTimeUnixNano: 5234567890,
  269. // no typed span type
  270. },
  271. },
  272. }
  273. var i = 1
  274. for _, dataRef := range validEgressSpans {
  275. name := "valid span " + fmt.Sprint(i)
  276. i++
  277. want := dataRef.out
  278. spanData := dataRef.in
  279. tests = append(tests, testCase{
  280. name: name,
  281. spanData: spanData,
  282. want: &want,
  283. })
  284. }
  285. for _, tt := range tests {
  286. t.Run(tt.name, func(t *testing.T) {
  287. u := newTestEgressV1Unmarshaller(t)
  288. actual := ptrace.NewSpanSlice()
  289. u.mapEgressSpan(tt.spanData, actual)
  290. if tt.want != nil {
  291. assert.Equal(t, 1, actual.Len())
  292. compareSpans(t, *tt.want, actual.At(0))
  293. } else {
  294. assert.Equal(t, 0, actual.Len())
  295. validateMetric(t, u.metrics.views.droppedEgressSpans, 1)
  296. }
  297. validateMetric(t, u.metrics.views.recoverableUnmarshallingErrors, tt.expectedUnmarshallingErrors)
  298. })
  299. }
  300. }
  301. func TestEgressUnmarshallerSendSpanAttributes(t *testing.T) {
  302. // creates a base attribute map that additional data can be added to
  303. // does not include outcome or source. Attributes will override all fields in base
  304. getSpan := func(attributes map[string]any, name string) ptrace.Span {
  305. base := map[string]any{
  306. "messaging.system": "SolacePubSub+",
  307. "messaging.operation": "send",
  308. "messaging.protocol": "MQTT",
  309. "messaging.protocol_version": "5.0",
  310. "messaging.solace.client_username": "someUser",
  311. "messaging.solace.client_name": "someName",
  312. "messaging.solace.message_replayed": false,
  313. "messaging.solace.send.outcome": "accepted",
  314. }
  315. for key, val := range attributes {
  316. base[key] = val
  317. }
  318. span := ptrace.NewSpan()
  319. err := span.Attributes().FromRaw(base)
  320. assert.NoError(t, err)
  321. span.SetName(name)
  322. span.SetKind(ptrace.SpanKindProducer)
  323. return span
  324. }
  325. // sets the common fields from getAttributes
  326. getSendSpan := func(base *egress_v1.SpanData_SendSpan) *egress_v1.SpanData_SendSpan {
  327. protocolVersion := "5.0"
  328. base.Protocol = "MQTT"
  329. base.ProtocolVersion = &protocolVersion
  330. base.ConsumerClientUsername = "someUser"
  331. base.ConsumerClientName = "someName"
  332. return base
  333. }
  334. tests := []struct {
  335. name string
  336. spanData *egress_v1.SpanData_SendSpan
  337. want ptrace.Span
  338. expectedUnmarshallingErrors any
  339. }{
  340. {
  341. name: "With Queue source",
  342. spanData: getSendSpan(&egress_v1.SpanData_SendSpan{
  343. Source: &egress_v1.SpanData_SendSpan_QueueName{
  344. QueueName: "someQueue",
  345. },
  346. }),
  347. want: getSpan(map[string]any{
  348. "messaging.source.name": "someQueue",
  349. "messaging.source.kind": "queue",
  350. }, "someQueue send"),
  351. },
  352. {
  353. name: "With Topic Endpoint source",
  354. spanData: getSendSpan(&egress_v1.SpanData_SendSpan{
  355. Source: &egress_v1.SpanData_SendSpan_TopicEndpointName{
  356. TopicEndpointName: "0123456789abcdef0123456789abcdeg",
  357. },
  358. }),
  359. want: getSpan(map[string]any{
  360. "messaging.source.name": "0123456789abcdef0123456789abcdeg",
  361. "messaging.source.kind": "topic-endpoint",
  362. }, "0123456789abcdef0123456789abcdeg send"),
  363. },
  364. {
  365. name: "With Anonymous Queue source",
  366. spanData: getSendSpan(&egress_v1.SpanData_SendSpan{
  367. Source: &egress_v1.SpanData_SendSpan_QueueName{
  368. QueueName: "#P2P/QTMP/myQueue",
  369. },
  370. }),
  371. want: getSpan(map[string]any{
  372. "messaging.source.name": "#P2P/QTMP/myQueue",
  373. "messaging.source.kind": "queue",
  374. }, "(anonymous) send"),
  375. },
  376. {
  377. name: "With Anonymous Topic Endpoint source",
  378. spanData: getSendSpan(&egress_v1.SpanData_SendSpan{
  379. Source: &egress_v1.SpanData_SendSpan_TopicEndpointName{
  380. TopicEndpointName: "0123456789abcdef0123456789abcdef",
  381. },
  382. }),
  383. want: getSpan(map[string]any{
  384. "messaging.source.name": "0123456789abcdef0123456789abcdef",
  385. "messaging.source.kind": "topic-endpoint",
  386. }, "(anonymous) send"),
  387. },
  388. {
  389. name: "With Unknown Endpoint source",
  390. spanData: getSendSpan(&egress_v1.SpanData_SendSpan{}),
  391. want: getSpan(map[string]any{}, "(unknown) send"),
  392. expectedUnmarshallingErrors: 1,
  393. },
  394. }
  395. for _, tt := range tests {
  396. t.Run(tt.name, func(t *testing.T) {
  397. u := newTestEgressV1Unmarshaller(t)
  398. actual := ptrace.NewSpan()
  399. u.mapSendSpan(tt.spanData, actual)
  400. compareSpans(t, tt.want, actual)
  401. validateMetric(t, u.metrics.views.recoverableUnmarshallingErrors, tt.expectedUnmarshallingErrors)
  402. })
  403. }
  404. // test the various outcomes
  405. outcomes := map[egress_v1.SpanData_SendSpan_Outcome]string{
  406. egress_v1.SpanData_SendSpan_ACCEPTED: "accepted",
  407. egress_v1.SpanData_SendSpan_REJECTED: "rejected",
  408. egress_v1.SpanData_SendSpan_RELEASED: "released",
  409. egress_v1.SpanData_SendSpan_DELIVERY_FAILED: "delivery failed",
  410. egress_v1.SpanData_SendSpan_FLOW_UNBOUND: "flow unbound",
  411. egress_v1.SpanData_SendSpan_TRANSACTION_COMMIT: "transaction commit",
  412. egress_v1.SpanData_SendSpan_TRANSACTION_COMMIT_FAILED: "transaction commit failed",
  413. egress_v1.SpanData_SendSpan_TRANSACTION_ROLLBACK: "transaction rollback",
  414. }
  415. for outcomeKey, outcomeName := range outcomes {
  416. t.Run("With outcome "+outcomeName, func(t *testing.T) {
  417. u := newTestEgressV1Unmarshaller(t)
  418. expected := getSpan(map[string]any{
  419. "messaging.source.name": "someQueue",
  420. "messaging.source.kind": "queue",
  421. "messaging.solace.send.outcome": outcomeName,
  422. }, "someQueue send")
  423. spanData := getSendSpan(&egress_v1.SpanData_SendSpan{
  424. Source: &egress_v1.SpanData_SendSpan_QueueName{
  425. QueueName: "someQueue",
  426. },
  427. Outcome: outcomeKey,
  428. })
  429. actual := ptrace.NewSpan()
  430. u.mapSendSpan(spanData, actual)
  431. compareSpans(t, expected, actual)
  432. })
  433. }
  434. }
  435. func TestEgressUnmarshallerTransactionEvent(t *testing.T) {
  436. someErrorString := "some error"
  437. tests := []struct {
  438. name string
  439. spanData *egress_v1.SpanData_TransactionEvent
  440. populateExpectedSpan func(span ptrace.Span)
  441. unmarshallingErrors any
  442. }{
  443. { // Local Transaction
  444. name: "Local Transaction Event",
  445. spanData: &egress_v1.SpanData_TransactionEvent{
  446. TimeUnixNano: 123456789,
  447. Type: egress_v1.SpanData_TransactionEvent_COMMIT,
  448. Initiator: egress_v1.SpanData_TransactionEvent_CLIENT,
  449. TransactionId: &egress_v1.SpanData_TransactionEvent_LocalId{
  450. LocalId: &egress_v1.SpanData_TransactionEvent_LocalTransactionId{
  451. TransactionId: 12345,
  452. SessionId: 67890,
  453. SessionName: "my-session-name",
  454. },
  455. },
  456. },
  457. populateExpectedSpan: func(span ptrace.Span) {
  458. populateEvent(t, span, "commit", 123456789, map[string]any{
  459. "messaging.solace.transaction_initiator": "client",
  460. "messaging.solace.transaction_id": 12345,
  461. "messaging.solace.transacted_session_name": "my-session-name",
  462. "messaging.solace.transacted_session_id": 67890,
  463. })
  464. },
  465. },
  466. {
  467. name: "Local Transaction Event with Session Timeout",
  468. spanData: &egress_v1.SpanData_TransactionEvent{
  469. TimeUnixNano: 123456789,
  470. Type: egress_v1.SpanData_TransactionEvent_SESSION_TIMEOUT,
  471. Initiator: egress_v1.SpanData_TransactionEvent_CLIENT,
  472. TransactionId: &egress_v1.SpanData_TransactionEvent_LocalId{
  473. LocalId: &egress_v1.SpanData_TransactionEvent_LocalTransactionId{
  474. TransactionId: 12345,
  475. SessionId: 67890,
  476. SessionName: "my-session-name",
  477. },
  478. },
  479. },
  480. populateExpectedSpan: func(span ptrace.Span) {
  481. populateEvent(t, span, "session_timeout", 123456789, map[string]any{
  482. "messaging.solace.transaction_initiator": "client",
  483. "messaging.solace.transaction_id": 12345,
  484. "messaging.solace.transacted_session_name": "my-session-name",
  485. "messaging.solace.transacted_session_id": 67890,
  486. })
  487. },
  488. },
  489. {
  490. name: "Local Transaction Event with Rollback Only",
  491. spanData: &egress_v1.SpanData_TransactionEvent{
  492. TimeUnixNano: 123456789,
  493. Type: egress_v1.SpanData_TransactionEvent_ROLLBACK_ONLY,
  494. Initiator: egress_v1.SpanData_TransactionEvent_CLIENT,
  495. TransactionId: &egress_v1.SpanData_TransactionEvent_LocalId{
  496. LocalId: &egress_v1.SpanData_TransactionEvent_LocalTransactionId{
  497. TransactionId: 12345,
  498. SessionId: 67890,
  499. SessionName: "my-session-name",
  500. },
  501. },
  502. },
  503. populateExpectedSpan: func(span ptrace.Span) {
  504. populateEvent(t, span, "rollback_only", 123456789, map[string]any{
  505. "messaging.solace.transaction_initiator": "client",
  506. "messaging.solace.transaction_id": 12345,
  507. "messaging.solace.transacted_session_name": "my-session-name",
  508. "messaging.solace.transacted_session_id": 67890,
  509. })
  510. },
  511. },
  512. { // XA transaction
  513. name: "XA Transaction Event",
  514. spanData: &egress_v1.SpanData_TransactionEvent{
  515. TimeUnixNano: 123456789,
  516. Type: egress_v1.SpanData_TransactionEvent_END,
  517. Initiator: egress_v1.SpanData_TransactionEvent_ADMIN,
  518. TransactionId: &egress_v1.SpanData_TransactionEvent_Xid_{
  519. Xid: &egress_v1.SpanData_TransactionEvent_Xid{
  520. FormatId: 123,
  521. BranchQualifier: []byte{0, 8, 20, 254},
  522. GlobalId: []byte{128, 64, 32, 16, 8, 4, 2, 1, 0},
  523. },
  524. },
  525. },
  526. populateExpectedSpan: func(span ptrace.Span) {
  527. populateEvent(t, span, "end", 123456789, map[string]any{
  528. "messaging.solace.transaction_initiator": "administrator",
  529. "messaging.solace.transaction_xid": "0000007b-000814fe-804020100804020100",
  530. })
  531. },
  532. },
  533. { // XA Transaction with no branch qualifier or global ID and with an error
  534. name: "XA Transaction Event with nil fields and error",
  535. spanData: &egress_v1.SpanData_TransactionEvent{
  536. TimeUnixNano: 123456789,
  537. Type: egress_v1.SpanData_TransactionEvent_PREPARE,
  538. Initiator: egress_v1.SpanData_TransactionEvent_BROKER,
  539. TransactionId: &egress_v1.SpanData_TransactionEvent_Xid_{
  540. Xid: &egress_v1.SpanData_TransactionEvent_Xid{
  541. FormatId: 123,
  542. BranchQualifier: nil,
  543. GlobalId: nil,
  544. },
  545. },
  546. ErrorDescription: &someErrorString,
  547. },
  548. populateExpectedSpan: func(span ptrace.Span) {
  549. populateEvent(t, span, "prepare", 123456789, map[string]any{
  550. "messaging.solace.transaction_initiator": "broker",
  551. "messaging.solace.transaction_xid": "0000007b--",
  552. "messaging.solace.transaction_error_message": someErrorString,
  553. })
  554. },
  555. },
  556. { // Type of transaction not handled
  557. name: "Unknown Transaction Type and no ID",
  558. spanData: &egress_v1.SpanData_TransactionEvent{
  559. TimeUnixNano: 123456789,
  560. Type: egress_v1.SpanData_TransactionEvent_Type(12345),
  561. },
  562. populateExpectedSpan: func(span ptrace.Span) {
  563. populateEvent(t, span, "Unknown Transaction Event (12345)", 123456789, map[string]any{
  564. "messaging.solace.transaction_initiator": "client",
  565. })
  566. },
  567. unmarshallingErrors: 2,
  568. },
  569. { // Type of ID not handled, type of initiator not handled
  570. name: "Unknown Transaction Initiator and no ID",
  571. spanData: &egress_v1.SpanData_TransactionEvent{
  572. TimeUnixNano: 123456789,
  573. Type: egress_v1.SpanData_TransactionEvent_ROLLBACK,
  574. Initiator: egress_v1.SpanData_TransactionEvent_Initiator(12345),
  575. TransactionId: nil,
  576. },
  577. populateExpectedSpan: func(span ptrace.Span) {
  578. populateEvent(t, span, "rollback", 123456789, map[string]any{
  579. "messaging.solace.transaction_initiator": "Unknown Transaction Initiator (12345)",
  580. })
  581. },
  582. unmarshallingErrors: 2,
  583. },
  584. }
  585. for _, tt := range tests {
  586. t.Run(tt.name, func(t *testing.T) {
  587. u := newTestEgressV1Unmarshaller(t)
  588. expected := ptrace.NewTraces().ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty()
  589. tt.populateExpectedSpan(expected)
  590. actual := ptrace.NewTraces().ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty()
  591. u.mapTransactionEvent(tt.spanData, actual.Events().AppendEmpty())
  592. // order is nondeterministic for attributes, so we must sort to get a valid comparison
  593. compareSpans(t, expected, actual)
  594. validateMetric(t, u.metrics.views.recoverableUnmarshallingErrors, tt.unmarshallingErrors)
  595. })
  596. }
  597. }
  598. func newTestEgressV1Unmarshaller(t *testing.T) *brokerTraceEgressUnmarshallerV1 {
  599. m := newTestMetrics(t)
  600. return &brokerTraceEgressUnmarshallerV1{zap.NewNop(), m}
  601. }