123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package solacereceiver
- import (
- "fmt"
- "testing"
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/require"
- "go.opentelemetry.io/collector/pdata/pcommon"
- "go.opentelemetry.io/collector/pdata/ptrace"
- "go.uber.org/zap"
- receive_v1 "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver/internal/model/receive/v1"
- )
- func TestReceiveUnmarshallerMapResourceSpan(t *testing.T) {
- var (
- routerName = "someRouterName"
- vpnName = "someVpnName"
- version = "10.0.0"
- )
- tests := []struct {
- name string
- spanData *receive_v1.SpanData
- want map[string]any
- expectedUnmarshallingErrors any
- }{
- {
- name: "Maps All Fields When Present",
- spanData: &receive_v1.SpanData{
- RouterName: routerName,
- MessageVpnName: &vpnName,
- SolosVersion: version,
- },
- want: map[string]any{
- "service.name": routerName,
- "service.instance.id": vpnName,
- "service.version": version,
- },
- },
- {
- name: "Does Not Map Fields When Not Present",
- spanData: &receive_v1.SpanData{},
- want: map[string]any{
- "service.version": "",
- "service.name": "",
- },
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- u := newTestReceiveV1Unmarshaller(t)
- actual := pcommon.NewMap()
- u.mapResourceSpanAttributes(tt.spanData, actual)
- assert.Equal(t, tt.want, actual.AsRaw())
- validateMetric(t, u.metrics.views.recoverableUnmarshallingErrors, tt.expectedUnmarshallingErrors)
- })
- }
- }
- // Tests the received span to traces mappings
- // Includes all required opentelemetry fields such as trace ID, span ID, etc.
- func TestReceiveUnmarshallerMapClientSpanData(t *testing.T) {
- someTraceState := "some trace status"
- tests := []struct {
- name string
- data *receive_v1.SpanData
- want func(ptrace.Span)
- }{
- // no trace state no status no parent span
- {
- name: "Without Optional Fields",
- data: &receive_v1.SpanData{
- TraceId: []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
- SpanId: []byte{7, 6, 5, 4, 3, 2, 1, 0},
- StartTimeUnixNano: 1234567890,
- EndTimeUnixNano: 2234567890,
- },
- want: func(span ptrace.Span) {
- span.SetTraceID([16]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15})
- span.SetSpanID([8]byte{7, 6, 5, 4, 3, 2, 1, 0})
- span.SetStartTimestamp(1234567890)
- span.SetEndTimestamp(2234567890)
- // expect some constants
- span.SetKind(5)
- span.SetName("(topic) receive")
- span.Status().SetCode(ptrace.StatusCodeUnset)
- },
- },
- // trace state status and parent span
- {
- name: "With Optional Fields",
- data: &receive_v1.SpanData{
- TraceId: []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
- SpanId: []byte{7, 6, 5, 4, 3, 2, 1, 0},
- StartTimeUnixNano: 1234567890,
- EndTimeUnixNano: 2234567890,
- ParentSpanId: []byte{15, 14, 13, 12, 11, 10, 9, 8},
- TraceState: &someTraceState,
- ErrorDescription: "some error",
- },
- want: func(span ptrace.Span) {
- span.SetTraceID([16]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15})
- span.SetSpanID([8]byte{7, 6, 5, 4, 3, 2, 1, 0})
- span.SetStartTimestamp(1234567890)
- span.SetEndTimestamp(2234567890)
- span.SetParentSpanID([8]byte{15, 14, 13, 12, 11, 10, 9, 8})
- span.TraceState().FromRaw(someTraceState)
- span.Status().SetCode(ptrace.StatusCodeError)
- span.Status().SetMessage("some error")
- // expect some constants
- span.SetKind(5)
- span.SetName("(topic) receive")
- },
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- u := newTestReceiveV1Unmarshaller(t)
- actual := ptrace.NewTraces().ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty()
- u.mapClientSpanData(tt.data, actual)
- expected := ptrace.NewTraces().ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty()
- tt.want(expected)
- assert.Equal(t, expected, actual)
- })
- }
- }
- func TestReceiveUnmarshallerMapClientSpanAttributes(t *testing.T) {
- var (
- protocolVersion = "5.0"
- applicationMessageID = "someMessageID"
- correlationID = "someConversationID"
- replyToTopic = "someReplyToTopic"
- baggageString = `someKey=someVal;someProp=someOtherThing,someOtherKey=someOtherVal;someProp=NewProp123;someOtherProp=AnotherProp192`
- invalidBaggageString = `someKey"=someVal;someProp=someOtherThing`
- priority = uint32(1)
- ttl = int64(86000)
- )
- tests := []struct {
- name string
- spanData *receive_v1.SpanData
- want map[string]any
- expectedUnmarshallingErrors any
- }{
- {
- name: "With All Valid Attributes",
- spanData: &receive_v1.SpanData{
- Protocol: "MQTT",
- ProtocolVersion: &protocolVersion,
- ApplicationMessageId: &applicationMessageID,
- CorrelationId: &correlationID,
- BinaryAttachmentSize: 1000,
- XmlAttachmentSize: 200,
- MetadataSize: 34,
- ClientUsername: "someClientUsername",
- ClientName: "someClient1234",
- ReplyToTopic: &replyToTopic,
- DeliveryMode: receive_v1.SpanData_PERSISTENT,
- Topic: "someTopic",
- ReplicationGroupMessageId: []byte{0x01, 0x00, 0x01, 0x04, 0x09, 0x10, 0x19, 0x24, 0x31, 0x40, 0x51, 0x64, 0x79, 0x90, 0xa9, 0xc4, 0xe1},
- Priority: &priority,
- Ttl: &ttl,
- DmqEligible: true,
- DroppedEnqueueEventsSuccess: 42,
- DroppedEnqueueEventsFailed: 24,
- HostIp: []byte{1, 2, 3, 4},
- HostPort: 55555,
- PeerIp: []byte{35, 69, 4, 37, 44, 161, 0, 0, 0, 0, 5, 103, 86, 115, 35, 181},
- PeerPort: 12345,
- BrokerReceiveTimeUnixNano: 1357924680,
- DroppedApplicationMessageProperties: false,
- Baggage: &baggageString,
- UserProperties: map[string]*receive_v1.SpanData_UserPropertyValue{
- "special_key": {
- Value: &receive_v1.SpanData_UserPropertyValue_BoolValue{
- BoolValue: true,
- },
- },
- },
- },
- want: map[string]any{
- "messaging.system": "SolacePubSub+",
- "messaging.operation": "receive",
- "messaging.protocol": "MQTT",
- "messaging.protocol_version": "5.0",
- "messaging.message_id": "someMessageID",
- "messaging.conversation_id": "someConversationID",
- "messaging.message_payload_size_bytes": int64(1234),
- "messaging.destination": "someTopic",
- "messaging.solace.client_username": "someClientUsername",
- "messaging.solace.client_name": "someClient1234",
- "messaging.solace.replication_group_message_id": "rmid1:00010-40910192431-40516479-90a9c4e1",
- "messaging.solace.priority": int64(1),
- "messaging.solace.ttl": int64(86000),
- "messaging.solace.dmq_eligible": true,
- "messaging.solace.dropped_enqueue_events_success": int64(42),
- "messaging.solace.dropped_enqueue_events_failed": int64(24),
- "messaging.solace.reply_to_topic": "someReplyToTopic",
- "messaging.solace.delivery_mode": "persistent",
- "net.host.ip": "1.2.3.4",
- "net.host.port": int64(55555),
- "net.peer.ip": "2345:425:2ca1::567:5673:23b5",
- "net.peer.port": int64(12345),
- "messaging.solace.user_properties.special_key": true,
- "messaging.solace.broker_receive_time_unix_nano": int64(1357924680),
- "messaging.solace.dropped_application_message_properties": false,
- "messaging.solace.message.baggage.someKey": "someVal",
- "messaging.solace.message.baggage_metadata.someKey": "someProp=someOtherThing",
- "messaging.solace.message.baggage.someOtherKey": `someOtherVal`,
- "messaging.solace.message.baggage_metadata.someOtherKey": "someProp=NewProp123;someOtherProp=AnotherProp192",
- },
- },
- {
- name: "With Only Required Fields",
- spanData: &receive_v1.SpanData{
- Protocol: "MQTT",
- BinaryAttachmentSize: 1000,
- XmlAttachmentSize: 200,
- MetadataSize: 34,
- ClientUsername: "someClientUsername",
- ClientName: "someClient1234",
- Topic: "someTopic",
- DeliveryMode: receive_v1.SpanData_NON_PERSISTENT,
- DmqEligible: true,
- DroppedEnqueueEventsSuccess: 42,
- DroppedEnqueueEventsFailed: 24,
- HostIp: []byte{1, 2, 3, 4},
- HostPort: 55555,
- PeerIp: []byte{35, 69, 4, 37, 44, 161, 0, 0, 0, 0, 5, 103, 86, 115, 35, 181},
- PeerPort: 12345,
- BrokerReceiveTimeUnixNano: 1357924680,
- DroppedApplicationMessageProperties: true,
- UserProperties: map[string]*receive_v1.SpanData_UserPropertyValue{
- "special_key": nil,
- },
- },
- want: map[string]any{
- "messaging.system": "SolacePubSub+",
- "messaging.operation": "receive",
- "messaging.protocol": "MQTT",
- "messaging.message_payload_size_bytes": int64(1234),
- "messaging.destination": "someTopic",
- "messaging.solace.client_username": "someClientUsername",
- "messaging.solace.client_name": "someClient1234",
- "messaging.solace.dmq_eligible": true,
- "messaging.solace.delivery_mode": "non_persistent",
- "messaging.solace.dropped_enqueue_events_success": int64(42),
- "messaging.solace.dropped_enqueue_events_failed": int64(24),
- "net.host.ip": "1.2.3.4",
- "net.host.port": int64(55555),
- "net.peer.ip": "2345:425:2ca1::567:5673:23b5",
- "net.peer.port": int64(12345),
- "messaging.solace.broker_receive_time_unix_nano": int64(1357924680),
- "messaging.solace.dropped_application_message_properties": true,
- },
- },
- {
- name: "With Some Invalid Fields",
- spanData: &receive_v1.SpanData{
- Protocol: "MQTT",
- BinaryAttachmentSize: 1000,
- XmlAttachmentSize: 200,
- MetadataSize: 34,
- ClientUsername: "someClientUsername",
- ClientName: "someClient1234",
- Topic: "someTopic",
- DeliveryMode: receive_v1.SpanData_DeliveryMode(1000),
- DmqEligible: true,
- DroppedEnqueueEventsSuccess: 42,
- DroppedEnqueueEventsFailed: 24,
- HostPort: 55555,
- PeerPort: 12345,
- BrokerReceiveTimeUnixNano: 1357924680,
- DroppedApplicationMessageProperties: true,
- Baggage: &invalidBaggageString,
- UserProperties: map[string]*receive_v1.SpanData_UserPropertyValue{
- "special_key": nil,
- },
- },
- // we no longer expect the port when the IP is not present
- want: map[string]any{
- "messaging.system": "SolacePubSub+",
- "messaging.operation": "receive",
- "messaging.protocol": "MQTT",
- "messaging.message_payload_size_bytes": int64(1234),
- "messaging.destination": "someTopic",
- "messaging.solace.client_username": "someClientUsername",
- "messaging.solace.client_name": "someClient1234",
- "messaging.solace.dmq_eligible": true,
- "messaging.solace.delivery_mode": "Unknown Delivery Mode (1000)",
- "messaging.solace.dropped_enqueue_events_success": int64(42),
- "messaging.solace.dropped_enqueue_events_failed": int64(24),
- "messaging.solace.broker_receive_time_unix_nano": int64(1357924680),
- "messaging.solace.dropped_application_message_properties": true,
- },
- // Invalid delivery mode, missing IPs, invalid baggage string
- expectedUnmarshallingErrors: 2,
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- u := newTestReceiveV1Unmarshaller(t)
- actual := pcommon.NewMap()
- u.mapClientSpanAttributes(tt.spanData, actual)
- assert.Equal(t, tt.want, actual.AsRaw())
- validateMetric(t, u.metrics.views.recoverableUnmarshallingErrors, tt.expectedUnmarshallingErrors)
- })
- }
- }
- // Validate that all event types are properly handled and appended into the span data
- func TestReceiveUnmarshallerEvents(t *testing.T) {
- someErrorString := "some error"
- somePartitionNumber := uint32(345)
- tests := []struct {
- name string
- spanData *receive_v1.SpanData
- populateExpectedSpan func(span ptrace.Span)
- unmarshallingErrors any
- }{
- { // don't expect any events when none are present in the span data
- name: "No Events",
- spanData: &receive_v1.SpanData{},
- populateExpectedSpan: func(span ptrace.Span) {},
- },
- { // when an enqueue event is present, expect it to be added to the span events
- name: "Enqueue Event Queue",
- spanData: &receive_v1.SpanData{
- EnqueueEvents: []*receive_v1.SpanData_EnqueueEvent{
- {
- Dest: &receive_v1.SpanData_EnqueueEvent_QueueName{QueueName: "somequeue"},
- TimeUnixNano: 123456789,
- PartitionNumber: &somePartitionNumber,
- },
- },
- },
- populateExpectedSpan: func(span ptrace.Span) {
- populateEvent(t, span, "somequeue enqueue", 123456789, map[string]any{
- "messaging.solace.destination_type": "queue",
- "messaging.solace.rejects_all_enqueues": false,
- "messaging.solace.partition_number": 345,
- })
- },
- },
- { // when a topic endpoint enqueue event is present, expect it to be added to the span events
- name: "Enqueue Event Topic Endpoint",
- spanData: &receive_v1.SpanData{
- EnqueueEvents: []*receive_v1.SpanData_EnqueueEvent{
- {
- Dest: &receive_v1.SpanData_EnqueueEvent_TopicEndpointName{TopicEndpointName: "sometopic"},
- TimeUnixNano: 123456789,
- ErrorDescription: &someErrorString,
- RejectsAllEnqueues: true,
- },
- },
- },
- populateExpectedSpan: func(span ptrace.Span) {
- populateEvent(t, span, "sometopic enqueue", 123456789, map[string]any{
- "messaging.solace.destination_type": "topic-endpoint",
- "messaging.solace.enqueue_error_message": someErrorString,
- "messaging.solace.rejects_all_enqueues": true,
- })
- },
- },
- { // when a both a queue and topic endpoint enqueue event is present, expect it to be added to the span events
- name: "Enqueue Event Queue and Topic Endpoint",
- spanData: &receive_v1.SpanData{
- EnqueueEvents: []*receive_v1.SpanData_EnqueueEvent{
- {
- Dest: &receive_v1.SpanData_EnqueueEvent_QueueName{QueueName: "somequeue"},
- TimeUnixNano: 123456789,
- },
- {
- Dest: &receive_v1.SpanData_EnqueueEvent_TopicEndpointName{TopicEndpointName: "sometopic"},
- TimeUnixNano: 2345678,
- },
- },
- },
- populateExpectedSpan: func(span ptrace.Span) {
- populateEvent(t, span, "somequeue enqueue", 123456789, map[string]any{
- "messaging.solace.destination_type": "queue",
- "messaging.solace.rejects_all_enqueues": false,
- })
- populateEvent(t, span, "sometopic enqueue", 2345678, map[string]any{
- "messaging.solace.destination_type": "topic-endpoint",
- "messaging.solace.rejects_all_enqueues": false,
- })
- },
- },
- { // when an enqueue event does not have a valid dest (ie. nil)
- name: "Enqueue Event no Dest",
- spanData: &receive_v1.SpanData{
- EnqueueEvents: []*receive_v1.SpanData_EnqueueEvent{
- {
- Dest: nil,
- TimeUnixNano: 123456789,
- },
- },
- },
- populateExpectedSpan: func(span ptrace.Span) {},
- unmarshallingErrors: 1,
- },
- { // Local Transaction
- name: "Local Transaction Event",
- spanData: &receive_v1.SpanData{
- TransactionEvent: &receive_v1.SpanData_TransactionEvent{
- TimeUnixNano: 123456789,
- Type: receive_v1.SpanData_TransactionEvent_COMMIT,
- Initiator: receive_v1.SpanData_TransactionEvent_CLIENT,
- TransactionId: &receive_v1.SpanData_TransactionEvent_LocalId{
- LocalId: &receive_v1.SpanData_TransactionEvent_LocalTransactionId{
- TransactionId: 12345,
- SessionId: 67890,
- SessionName: "my-session-name",
- },
- },
- },
- },
- populateExpectedSpan: func(span ptrace.Span) {
- populateEvent(t, span, "commit", 123456789, map[string]any{
- "messaging.solace.transaction_initiator": "client",
- "messaging.solace.transaction_id": 12345,
- "messaging.solace.transacted_session_name": "my-session-name",
- "messaging.solace.transacted_session_id": 67890,
- })
- },
- },
- { // XA transaction
- name: "XA Transaction Event",
- spanData: &receive_v1.SpanData{
- TransactionEvent: &receive_v1.SpanData_TransactionEvent{
- TimeUnixNano: 123456789,
- Type: receive_v1.SpanData_TransactionEvent_END,
- Initiator: receive_v1.SpanData_TransactionEvent_ADMIN,
- TransactionId: &receive_v1.SpanData_TransactionEvent_Xid_{
- Xid: &receive_v1.SpanData_TransactionEvent_Xid{
- FormatId: 123,
- BranchQualifier: []byte{0, 8, 20, 254},
- GlobalId: []byte{128, 64, 32, 16, 8, 4, 2, 1, 0},
- },
- },
- },
- },
- populateExpectedSpan: func(span ptrace.Span) {
- populateEvent(t, span, "end", 123456789, map[string]any{
- "messaging.solace.transaction_initiator": "administrator",
- "messaging.solace.transaction_xid": "0000007b-000814fe-804020100804020100",
- })
- },
- },
- { // XA Transaction with no branch qualifier or global ID and with an error
- name: "XA Transaction Event with nil fields and error",
- spanData: &receive_v1.SpanData{
- TransactionEvent: &receive_v1.SpanData_TransactionEvent{
- TimeUnixNano: 123456789,
- Type: receive_v1.SpanData_TransactionEvent_PREPARE,
- Initiator: receive_v1.SpanData_TransactionEvent_BROKER,
- TransactionId: &receive_v1.SpanData_TransactionEvent_Xid_{
- Xid: &receive_v1.SpanData_TransactionEvent_Xid{
- FormatId: 123,
- BranchQualifier: nil,
- GlobalId: nil,
- },
- },
- ErrorDescription: &someErrorString,
- },
- },
- populateExpectedSpan: func(span ptrace.Span) {
- populateEvent(t, span, "prepare", 123456789, map[string]any{
- "messaging.solace.transaction_initiator": "broker",
- "messaging.solace.transaction_xid": "0000007b--",
- "messaging.solace.transaction_error_message": someErrorString,
- })
- },
- },
- { // Type of transaction not handled
- name: "Unknown Transaction Type and no ID",
- spanData: &receive_v1.SpanData{
- TransactionEvent: &receive_v1.SpanData_TransactionEvent{
- TimeUnixNano: 123456789,
- Type: receive_v1.SpanData_TransactionEvent_Type(12345),
- },
- },
- populateExpectedSpan: func(span ptrace.Span) {
- populateEvent(t, span, "Unknown Transaction Event (12345)", 123456789, map[string]any{
- "messaging.solace.transaction_initiator": "client",
- })
- },
- unmarshallingErrors: 2,
- },
- { // Type of ID not handled, type of initiator not handled
- name: "Unknown Transaction Initiator and no ID",
- spanData: &receive_v1.SpanData{
- TransactionEvent: &receive_v1.SpanData_TransactionEvent{
- TimeUnixNano: 123456789,
- Type: receive_v1.SpanData_TransactionEvent_ROLLBACK,
- Initiator: receive_v1.SpanData_TransactionEvent_Initiator(12345),
- TransactionId: nil,
- },
- },
- populateExpectedSpan: func(span ptrace.Span) {
- populateEvent(t, span, "rollback", 123456789, map[string]any{
- "messaging.solace.transaction_initiator": "Unknown Transaction Initiator (12345)",
- })
- },
- unmarshallingErrors: 2,
- },
- { // when a both a queue and topic endpoint enqueue event is present, expect it to be added to the span events
- name: "Multiple Events",
- spanData: &receive_v1.SpanData{
- EnqueueEvents: []*receive_v1.SpanData_EnqueueEvent{
- {
- Dest: &receive_v1.SpanData_EnqueueEvent_QueueName{QueueName: "somequeue"},
- TimeUnixNano: 123456789,
- },
- {
- Dest: &receive_v1.SpanData_EnqueueEvent_TopicEndpointName{TopicEndpointName: "sometopic"},
- TimeUnixNano: 2345678,
- RejectsAllEnqueues: true,
- },
- },
- TransactionEvent: &receive_v1.SpanData_TransactionEvent{
- TimeUnixNano: 123456789,
- Type: receive_v1.SpanData_TransactionEvent_ROLLBACK_ONLY,
- Initiator: receive_v1.SpanData_TransactionEvent_CLIENT,
- TransactionId: &receive_v1.SpanData_TransactionEvent_LocalId{
- LocalId: &receive_v1.SpanData_TransactionEvent_LocalTransactionId{
- TransactionId: 12345,
- SessionId: 67890,
- SessionName: "my-session-name",
- },
- },
- },
- },
- populateExpectedSpan: func(span ptrace.Span) {
- populateEvent(t, span, "somequeue enqueue", 123456789, map[string]any{
- "messaging.solace.destination_type": "queue",
- "messaging.solace.rejects_all_enqueues": false,
- })
- populateEvent(t, span, "sometopic enqueue", 2345678, map[string]any{
- "messaging.solace.destination_type": "topic-endpoint",
- "messaging.solace.rejects_all_enqueues": true,
- })
- populateEvent(t, span, "rollback_only", 123456789, map[string]any{
- "messaging.solace.transaction_initiator": "client",
- "messaging.solace.transaction_id": 12345,
- "messaging.solace.transacted_session_name": "my-session-name",
- "messaging.solace.transacted_session_id": 67890,
- })
- },
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- u := newTestReceiveV1Unmarshaller(t)
- expected := ptrace.NewTraces().ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty()
- tt.populateExpectedSpan(expected)
- actual := ptrace.NewTraces().ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty()
- u.mapEvents(tt.spanData, actual)
- // order is nondeterministic for attributes, so we must sort to get a valid comparison
- compareSpans(t, expected, actual)
- validateMetric(t, u.metrics.views.recoverableUnmarshallingErrors, tt.unmarshallingErrors)
- })
- }
- }
- func TestReceiveUnmarshallerRGMID(t *testing.T) {
- tests := []struct {
- name string
- in []byte
- expected string
- numErr any
- }{
- {
- name: "Valid RGMID",
- in: []byte{0x01, 0x00, 0x01, 0x04, 0x09, 0x10, 0x19, 0x24, 0x31, 0x40, 0x51, 0x64, 0x79, 0x90, 0xa9, 0xc4, 0xe1},
- expected: "rmid1:00010-40910192431-40516479-90a9c4e1",
- },
- {
- name: "Bad RGMID Version",
- in: []byte{0x02, 0x00, 0x01, 0x04, 0x09, 0x10, 0x19, 0x24, 0x31, 0x40, 0x51, 0x64, 0x79, 0x90, 0xa9, 0xc4, 0xe1},
- expected: "0200010409101924314051647990a9c4e1", // expect default behavior of hex dump
- numErr: 1,
- },
- {
- name: "Bad RGMID length",
- in: []byte{0x00, 0x01, 0x04, 0x09, 0x10, 0x19, 0x24, 0x31, 0x40, 0x51, 0x64, 0x79, 0x90, 0xa9, 0xc4, 0xe1},
- expected: "00010409101924314051647990a9c4e1", // expect default behavior of hex dump
- numErr: 1,
- },
- {
- name: "Nil RGMID",
- in: nil,
- expected: "",
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- u := newTestReceiveV1Unmarshaller(t)
- actual := u.rgmidToString(tt.in)
- assert.Equal(t, tt.expected, actual)
- validateMetric(t, u.metrics.views.recoverableUnmarshallingErrors, tt.numErr)
- })
- }
- }
- func TestReceiveUnmarshallerReceiveBaggageString(t *testing.T) {
- testCases := []struct {
- name string
- baggage string
- expected func(pcommon.Map)
- errStr string
- }{
- {
- name: "Valid baggage",
- baggage: `someKey=someVal`,
- expected: func(m pcommon.Map) {
- assert.NoError(t, m.FromRaw(map[string]any{
- "messaging.solace.message.baggage.someKey": "someVal",
- }))
- },
- },
- {
- name: "Valid baggage with properties",
- baggage: `someKey=someVal;someProp=someOtherThing,someOtherKey=someOtherVal;someProp=NewProp123;someOtherProp=AnotherProp192`,
- expected: func(m pcommon.Map) {
- assert.NoError(t, m.FromRaw(map[string]any{
- "messaging.solace.message.baggage.someKey": "someVal",
- "messaging.solace.message.baggage_metadata.someKey": "someProp=someOtherThing",
- "messaging.solace.message.baggage.someOtherKey": `someOtherVal`,
- "messaging.solace.message.baggage_metadata.someOtherKey": "someProp=NewProp123;someOtherProp=AnotherProp192",
- }))
- },
- },
- {
- name: "Invalid baggage",
- baggage: `someKey"=someVal;someProp=someOtherThing`,
- errStr: "invalid key",
- },
- }
- for _, testCase := range testCases {
- t.Run(fmt.Sprintf("%T", testCase.name), func(t *testing.T) {
- actual := pcommon.NewMap()
- u := newTestReceiveV1Unmarshaller(t)
- err := u.unmarshalBaggage(actual, testCase.baggage)
- if testCase.errStr == "" {
- assert.Nil(t, err)
- } else {
- assert.ErrorContains(t, err, testCase.errStr)
- }
- if testCase.expected != nil {
- expected := pcommon.NewMap()
- testCase.expected(expected)
- assert.Equal(t, expected.AsRaw(), actual.AsRaw())
- } else {
- // assert we didn't add anything if we don't have a result map
- assert.Equal(t, 0, actual.Len())
- }
- })
- }
- }
- func TestReceiveUnmarshallerInsertUserProperty(t *testing.T) {
- emojiVal := 0xf09f92a9
- testCases := []struct {
- data any
- expectedType pcommon.ValueType
- validate func(val pcommon.Value)
- }{
- {
- &receive_v1.SpanData_UserPropertyValue_NullValue{},
- pcommon.ValueTypeEmpty,
- nil,
- },
- {
- &receive_v1.SpanData_UserPropertyValue_BoolValue{BoolValue: true},
- pcommon.ValueTypeBool,
- func(val pcommon.Value) {
- assert.Equal(t, true, val.Bool())
- },
- },
- {
- &receive_v1.SpanData_UserPropertyValue_DoubleValue{DoubleValue: 12.34},
- pcommon.ValueTypeDouble,
- func(val pcommon.Value) {
- assert.Equal(t, float64(12.34), val.Double())
- },
- },
- {
- &receive_v1.SpanData_UserPropertyValue_ByteArrayValue{ByteArrayValue: []byte{1, 2, 3, 4}},
- pcommon.ValueTypeBytes,
- func(val pcommon.Value) {
- assert.Equal(t, []byte{1, 2, 3, 4}, val.Bytes().AsRaw())
- },
- },
- {
- &receive_v1.SpanData_UserPropertyValue_FloatValue{FloatValue: 12.34},
- pcommon.ValueTypeDouble,
- func(val pcommon.Value) {
- assert.Equal(t, float64(float32(12.34)), val.Double())
- },
- },
- {
- &receive_v1.SpanData_UserPropertyValue_Int8Value{Int8Value: 8},
- pcommon.ValueTypeInt,
- func(val pcommon.Value) {
- assert.Equal(t, int64(8), val.Int())
- },
- },
- {
- &receive_v1.SpanData_UserPropertyValue_Int16Value{Int16Value: 16},
- pcommon.ValueTypeInt,
- func(val pcommon.Value) {
- assert.Equal(t, int64(16), val.Int())
- },
- },
- {
- &receive_v1.SpanData_UserPropertyValue_Int32Value{Int32Value: 32},
- pcommon.ValueTypeInt,
- func(val pcommon.Value) {
- assert.Equal(t, int64(32), val.Int())
- },
- },
- {
- &receive_v1.SpanData_UserPropertyValue_Int64Value{Int64Value: 64},
- pcommon.ValueTypeInt,
- func(val pcommon.Value) {
- assert.Equal(t, int64(64), val.Int())
- },
- },
- {
- &receive_v1.SpanData_UserPropertyValue_Uint8Value{Uint8Value: 8},
- pcommon.ValueTypeInt,
- func(val pcommon.Value) {
- assert.Equal(t, int64(8), val.Int())
- },
- },
- {
- &receive_v1.SpanData_UserPropertyValue_Uint16Value{Uint16Value: 16},
- pcommon.ValueTypeInt,
- func(val pcommon.Value) {
- assert.Equal(t, int64(16), val.Int())
- },
- },
- {
- &receive_v1.SpanData_UserPropertyValue_Uint32Value{Uint32Value: 32},
- pcommon.ValueTypeInt,
- func(val pcommon.Value) {
- assert.Equal(t, int64(32), val.Int())
- },
- },
- {
- &receive_v1.SpanData_UserPropertyValue_Uint64Value{Uint64Value: 64},
- pcommon.ValueTypeInt,
- func(val pcommon.Value) {
- assert.Equal(t, int64(64), val.Int())
- },
- },
- {
- &receive_v1.SpanData_UserPropertyValue_StringValue{StringValue: "hello world"},
- pcommon.ValueTypeStr,
- func(val pcommon.Value) {
- assert.Equal(t, "hello world", val.Str())
- },
- },
- {
- &receive_v1.SpanData_UserPropertyValue_DestinationValue{DestinationValue: "some_dest"},
- pcommon.ValueTypeStr,
- func(val pcommon.Value) {
- assert.Equal(t, "some_dest", val.Str())
- },
- },
- {
- &receive_v1.SpanData_UserPropertyValue_CharacterValue{CharacterValue: 0x61},
- pcommon.ValueTypeStr,
- func(val pcommon.Value) {
- assert.Equal(t, "a", val.Str())
- },
- },
- {
- &receive_v1.SpanData_UserPropertyValue_CharacterValue{CharacterValue: 0xe68080},
- pcommon.ValueTypeStr,
- func(val pcommon.Value) {
- assert.Equal(t, string(rune(0xe68080)), val.Str())
- },
- },
- {
- &receive_v1.SpanData_UserPropertyValue_CharacterValue{CharacterValue: 0xf09f92a9},
- pcommon.ValueTypeStr,
- func(val pcommon.Value) {
- assert.Equal(t, string(rune(emojiVal)), val.Str())
- },
- },
- }
- unmarshaller := &brokerTraceReceiveUnmarshallerV1{
- logger: zap.NewNop(),
- }
- for _, testCase := range testCases {
- t.Run(fmt.Sprintf("%T", testCase.data), func(t *testing.T) {
- const key = "some-property"
- attributeMap := pcommon.NewMap()
- unmarshaller.insertUserProperty(attributeMap, key, testCase.data)
- actual, ok := attributeMap.Get("messaging.solace.user_properties." + key)
- require.True(t, ok)
- assert.Equal(t, testCase.expectedType, actual.Type())
- if testCase.validate != nil {
- testCase.validate(actual)
- }
- })
- }
- }
- func TestSolaceMessageReceiveUnmarshallerV1InsertUserPropertyUnsupportedType(t *testing.T) {
- u := newTestReceiveV1Unmarshaller(t)
- const key = "some-property"
- attributeMap := pcommon.NewMap()
- u.insertUserProperty(attributeMap, key, "invalid data type")
- _, ok := attributeMap.Get("messaging.solace.user_properties." + key)
- assert.False(t, ok)
- validateMetric(t, u.metrics.views.recoverableUnmarshallingErrors, 1)
- }
- func newTestReceiveV1Unmarshaller(t *testing.T) *brokerTraceReceiveUnmarshallerV1 {
- m := newTestMetrics(t)
- return &brokerTraceReceiveUnmarshallerV1{zap.NewNop(), m}
- }
|