receiver_test.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package solacereceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver"
  4. import (
  5. "context"
  6. "errors"
  7. "fmt"
  8. "runtime"
  9. "sync"
  10. "sync/atomic"
  11. "testing"
  12. "time"
  13. "github.com/stretchr/testify/assert"
  14. "github.com/stretchr/testify/require"
  15. "go.opentelemetry.io/collector/consumer"
  16. "go.opentelemetry.io/collector/consumer/consumererror"
  17. "go.opentelemetry.io/collector/consumer/consumertest"
  18. "go.opentelemetry.io/collector/pdata/ptrace"
  19. "go.opentelemetry.io/collector/receiver/receivertest"
  20. )
  21. // connectAndReceive with connect failure
  22. // connectAndReceive with lifecycle validation
  23. // not started, connecting, connected, terminating, terminated, idle
  24. func TestReceiveMessage(t *testing.T) {
  25. someError := errors.New("some error")
  26. validateMetrics := func(receivedMsgVal, droppedMsgVal, fatalUnmarshalling, reportedSpan any) func(t *testing.T, receiver *solaceTracesReceiver) {
  27. return func(t *testing.T, receiver *solaceTracesReceiver) {
  28. validateReceiverMetrics(t, receiver, receivedMsgVal, droppedMsgVal, fatalUnmarshalling, reportedSpan)
  29. }
  30. }
  31. cases := []struct {
  32. name string
  33. nextConsumer consumertest.Consumer
  34. // errors to return from messagingService.receive, unmarshaller.unmarshal, messagingService.ack and messagingService.nack
  35. receiveMessageErr, unmarshalErr, ackErr, nackErr error
  36. // whether or not to expect a nack call instead of an ack
  37. expectNack bool
  38. // expected error from receiveMessage
  39. expectedErr error
  40. // validate constraints after the fact
  41. validation func(t *testing.T, receiver *solaceTracesReceiver)
  42. // traces provided by the trace function
  43. traces ptrace.Traces
  44. }{
  45. { // no errors, expect no error, validate metrics
  46. name: "Receive Message Success",
  47. validation: validateMetrics(1, nil, nil, 1),
  48. traces: newTestTracesWithSpans(1),
  49. },
  50. { // no errors, expect no error, validate metrics
  51. name: "Receive Message Multiple Traces Success",
  52. validation: validateMetrics(1, nil, nil, 3),
  53. traces: newTestTracesWithSpans(3),
  54. },
  55. { // fail at receiveMessage and expect the error
  56. name: "Receive Messages Error",
  57. receiveMessageErr: someError,
  58. expectedErr: someError,
  59. validation: validateMetrics(nil, nil, nil, nil),
  60. },
  61. { // unmarshal error expecting the error to be swallowed, the message to be acknowledged, stats incremented
  62. name: "Unmarshal Error",
  63. unmarshalErr: errUnknownTopic,
  64. validation: validateMetrics(1, 1, 1, nil),
  65. },
  66. { // unmarshal error with wrong version expecting error to be propagated, message to be rejected
  67. name: "Unmarshal Version Error",
  68. unmarshalErr: errUpgradeRequired,
  69. expectedErr: errUpgradeRequired,
  70. expectNack: true,
  71. validation: validateMetrics(1, nil, 1, nil),
  72. },
  73. { // expect forward to error and message to be swallowed with ack, no error returned
  74. name: "Forward Permanent Error",
  75. nextConsumer: consumertest.NewErr(consumererror.NewPermanent(errors.New("a permanent error"))),
  76. validation: validateMetrics(1, 1, nil, nil),
  77. },
  78. { // expect forward to error and message to be swallowed with ack which fails returning an error
  79. name: "Forward Permanent Error with Ack Error",
  80. nextConsumer: consumertest.NewErr(consumererror.NewPermanent(errors.New("a permanent error"))),
  81. ackErr: someError,
  82. expectedErr: someError,
  83. validation: validateMetrics(1, 1, nil, nil),
  84. },
  85. }
  86. for _, testCase := range cases {
  87. t.Run(testCase.name, func(t *testing.T) {
  88. receiver, messagingService, unmarshaller := newReceiver(t)
  89. if testCase.nextConsumer != nil {
  90. receiver.nextConsumer = testCase.nextConsumer
  91. }
  92. msg := &inboundMessage{}
  93. // populate mock messagingService and unmarshaller functions, expecting them each to be called at most once
  94. var receiveMessagesCalled, ackCalled, nackCalled, unmarshalCalled bool
  95. messagingService.receiveMessageFunc = func(ctx context.Context) (*inboundMessage, error) {
  96. assert.False(t, receiveMessagesCalled)
  97. receiveMessagesCalled = true
  98. if testCase.receiveMessageErr != nil {
  99. return nil, testCase.receiveMessageErr
  100. }
  101. return msg, nil
  102. }
  103. messagingService.ackFunc = func(ctx context.Context, msg *inboundMessage) error {
  104. assert.False(t, ackCalled)
  105. ackCalled = true
  106. if testCase.ackErr != nil {
  107. return testCase.ackErr
  108. }
  109. return nil
  110. }
  111. messagingService.nackFunc = func(ctx context.Context, msg *inboundMessage) error {
  112. assert.False(t, nackCalled)
  113. nackCalled = true
  114. if testCase.nackErr != nil {
  115. return testCase.nackErr
  116. }
  117. return nil
  118. }
  119. unmarshaller.unmarshalFunc = func(msg *inboundMessage) (ptrace.Traces, error) {
  120. assert.False(t, unmarshalCalled)
  121. unmarshalCalled = true
  122. if testCase.unmarshalErr != nil {
  123. return ptrace.Traces{}, testCase.unmarshalErr
  124. }
  125. return testCase.traces, nil
  126. }
  127. err := receiver.receiveMessage(context.Background(), messagingService)
  128. if testCase.expectedErr != nil {
  129. assert.Equal(t, testCase.expectedErr, err)
  130. } else {
  131. assert.NoError(t, err)
  132. }
  133. assert.True(t, receiveMessagesCalled)
  134. if testCase.receiveMessageErr == nil {
  135. assert.True(t, unmarshalCalled)
  136. assert.Equal(t, testCase.expectNack, nackCalled)
  137. assert.Equal(t, !testCase.expectNack, ackCalled)
  138. }
  139. if testCase.validation != nil {
  140. testCase.validation(t, receiver)
  141. }
  142. })
  143. }
  144. }
  145. // receiveMessages ctx done return
  146. func TestReceiveMessagesTerminateWithCtxDone(t *testing.T) {
  147. receiver, messagingService, unmarshaller := newReceiver(t)
  148. receiveMessagesCalled := false
  149. ctx, cancel := context.WithCancel(context.Background())
  150. msg := &inboundMessage{}
  151. trace := newTestTracesWithSpans(1)
  152. messagingService.receiveMessageFunc = func(ctx context.Context) (*inboundMessage, error) {
  153. assert.False(t, receiveMessagesCalled)
  154. receiveMessagesCalled = true
  155. return msg, nil
  156. }
  157. ackCalled := false
  158. messagingService.ackFunc = func(ctx context.Context, msg *inboundMessage) error {
  159. assert.False(t, ackCalled)
  160. ackCalled = true
  161. cancel()
  162. return nil
  163. }
  164. unmarshalCalled := false
  165. unmarshaller.unmarshalFunc = func(msg *inboundMessage) (ptrace.Traces, error) {
  166. assert.False(t, unmarshalCalled)
  167. unmarshalCalled = true
  168. return trace, nil
  169. }
  170. err := receiver.receiveMessages(ctx, messagingService)
  171. assert.NoError(t, err)
  172. assert.True(t, receiveMessagesCalled)
  173. assert.True(t, unmarshalCalled)
  174. assert.True(t, ackCalled)
  175. validateReceiverMetrics(t, receiver, 1, nil, nil, 1)
  176. }
  177. func TestReceiverLifecycle(t *testing.T) {
  178. receiver, messagingService, _ := newReceiver(t)
  179. dialCalled := make(chan struct{})
  180. messagingService.dialFunc = func(context.Context) error {
  181. validateMetric(t, receiver.metrics.views.receiverStatus, receiverStateConnecting)
  182. validateMetric(t, receiver.metrics.views.flowControlStatus, flowControlStateClear)
  183. close(dialCalled)
  184. return nil
  185. }
  186. closeCalled := make(chan struct{})
  187. messagingService.closeFunc = func(ctx context.Context) {
  188. validateMetric(t, receiver.metrics.views.receiverStatus, receiverStateTerminating)
  189. close(closeCalled)
  190. }
  191. receiveMessagesCalled := make(chan struct{})
  192. messagingService.receiveMessageFunc = func(ctx context.Context) (*inboundMessage, error) {
  193. validateMetric(t, receiver.metrics.views.receiverStatus, receiverStateConnected)
  194. close(receiveMessagesCalled)
  195. <-ctx.Done()
  196. return nil, errors.New("some error")
  197. }
  198. // start the receiver
  199. err := receiver.Start(context.Background(), nil)
  200. assert.NoError(t, err)
  201. assertChannelClosed(t, dialCalled)
  202. assertChannelClosed(t, receiveMessagesCalled)
  203. err = receiver.Shutdown(context.Background())
  204. assert.NoError(t, err)
  205. assertChannelClosed(t, closeCalled)
  206. validateMetric(t, receiver.metrics.views.receiverStatus, receiverStateTerminated)
  207. // we error on receive message, so we should not report any metrics
  208. validateReceiverMetrics(t, receiver, nil, nil, nil, nil)
  209. }
  210. func TestReceiverDialFailureContinue(t *testing.T) {
  211. receiver, msgService, _ := newReceiver(t)
  212. dialErr := errors.New("Some dial error")
  213. const expectedAttempts = 3 // the number of attempts to perform prior to resolving
  214. dialCalled := 0
  215. factoryCalled := 0
  216. closeCalled := 0
  217. dialDone := make(chan struct{})
  218. factoryDone := make(chan struct{})
  219. closeDone := make(chan struct{})
  220. receiver.factory = func() messagingService {
  221. factoryCalled++
  222. if factoryCalled == expectedAttempts {
  223. close(factoryDone)
  224. }
  225. return msgService
  226. }
  227. msgService.dialFunc = func(context.Context) error {
  228. dialCalled++
  229. if dialCalled == expectedAttempts {
  230. close(dialDone)
  231. }
  232. return dialErr
  233. }
  234. msgService.closeFunc = func(ctx context.Context) {
  235. closeCalled++
  236. // asset we never left connecting state prior to closing closeDone
  237. validateMetric(t, receiver.metrics.views.receiverStatus, receiverStateConnecting)
  238. if closeCalled == expectedAttempts {
  239. close(closeDone)
  240. <-ctx.Done() // wait for ctx.Done
  241. }
  242. }
  243. // start the receiver
  244. err := receiver.Start(context.Background(), nil)
  245. assert.NoError(t, err)
  246. // expect factory to be called twice
  247. assertChannelClosed(t, factoryDone)
  248. // expect dial to be called twice
  249. assertChannelClosed(t, dialDone)
  250. // expect close to be called twice
  251. assertChannelClosed(t, closeDone)
  252. // assert failed reconnections
  253. validateMetric(t, receiver.metrics.views.failedReconnections, expectedAttempts)
  254. err = receiver.Shutdown(context.Background())
  255. assert.NoError(t, err)
  256. validateMetric(t, receiver.metrics.views.receiverStatus, receiverStateTerminated)
  257. // we error on dial, should never get to receive messages
  258. validateReceiverMetrics(t, receiver, nil, nil, nil, nil)
  259. }
  260. func TestReceiverUnmarshalVersionFailureExpectingDisable(t *testing.T) {
  261. receiver, msgService, unmarshaller := newReceiver(t)
  262. dialDone := make(chan struct{})
  263. nackCalled := make(chan struct{})
  264. closeDone := make(chan struct{})
  265. unmarshaller.unmarshalFunc = func(msg *inboundMessage) (ptrace.Traces, error) {
  266. return ptrace.Traces{}, errUpgradeRequired
  267. }
  268. msgService.dialFunc = func(context.Context) error {
  269. // after we receive an unmarshalling version error, we should not call dial again
  270. msgService.dialFunc = func(context.Context) error {
  271. t.Error("did not expect dial to be called again")
  272. return nil
  273. }
  274. close(dialDone)
  275. return nil
  276. }
  277. msgService.receiveMessageFunc = func(ctx context.Context) (*inboundMessage, error) {
  278. // we only expect a single receiveMessage call when unmarshal returns unknown version
  279. msgService.receiveMessageFunc = func(ctx context.Context) (*inboundMessage, error) {
  280. t.Error("did not expect receiveMessage to be called again")
  281. return nil, nil
  282. }
  283. return nil, nil
  284. }
  285. msgService.nackFunc = func(ctx context.Context, msg *inboundMessage) error {
  286. close(nackCalled)
  287. return nil
  288. }
  289. msgService.closeFunc = func(ctx context.Context) {
  290. close(closeDone)
  291. }
  292. // start the receiver
  293. err := receiver.Start(context.Background(), nil)
  294. assert.NoError(t, err)
  295. // expect dial to be called twice
  296. assertChannelClosed(t, dialDone)
  297. // expect nack to be called
  298. assertChannelClosed(t, nackCalled)
  299. // expect close to be called twice
  300. assertChannelClosed(t, closeDone)
  301. // we receive 1 message, encounter a fatal unmarshalling error and we nack the message so it is not actually dropped
  302. validateReceiverMetrics(t, receiver, 1, nil, 1, nil)
  303. // assert idle state
  304. validateMetric(t, receiver.metrics.views.receiverStatus, receiverStateIdle)
  305. err = receiver.Shutdown(context.Background())
  306. assert.NoError(t, err)
  307. validateMetric(t, receiver.metrics.views.receiverStatus, receiverStateTerminated)
  308. }
  309. func TestReceiverFlowControlDelayedRetry(t *testing.T) {
  310. someError := consumererror.NewPermanent(fmt.Errorf("some error"))
  311. testCases := []struct {
  312. name string
  313. nextConsumer consumer.Traces
  314. validation func(*testing.T, *opencensusMetrics)
  315. }{
  316. {
  317. name: "Without error",
  318. nextConsumer: consumertest.NewNop(),
  319. },
  320. {
  321. name: "With error",
  322. nextConsumer: consumertest.NewErr(someError),
  323. validation: func(t *testing.T, metrics *opencensusMetrics) {
  324. validateMetric(t, metrics.views.droppedSpanMessages, 1)
  325. },
  326. },
  327. }
  328. for _, tc := range testCases {
  329. t.Run(tc.name, func(t *testing.T) {
  330. receiver, messagingService, unmarshaller := newReceiver(t)
  331. delay := 50 * time.Millisecond
  332. // Increase delay on windows due to tick granularity
  333. // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/17197
  334. if runtime.GOOS == "windows" {
  335. delay = 500 * time.Millisecond
  336. }
  337. receiver.config.Flow.DelayedRetry.Delay = delay
  338. var err error
  339. // we want to return an error at first, then set the next consumer to a noop consumer
  340. receiver.nextConsumer, err = consumer.NewTraces(func(ctx context.Context, ld ptrace.Traces) error {
  341. receiver.nextConsumer = tc.nextConsumer
  342. return fmt.Errorf("Some temporary error")
  343. })
  344. require.NoError(t, err)
  345. // populate mock messagingService and unmarshaller functions, expecting them each to be called at most once
  346. var ackCalled bool
  347. messagingService.ackFunc = func(ctx context.Context, msg *inboundMessage) error {
  348. assert.False(t, ackCalled)
  349. ackCalled = true
  350. return nil
  351. }
  352. messagingService.receiveMessageFunc = func(ctx context.Context) (*inboundMessage, error) {
  353. return &inboundMessage{}, nil
  354. }
  355. unmarshaller.unmarshalFunc = func(msg *inboundMessage) (ptrace.Traces, error) {
  356. return ptrace.NewTraces(), nil
  357. }
  358. receiveMessageComplete := make(chan error, 1)
  359. go func() {
  360. receiveMessageComplete <- receiver.receiveMessage(context.Background(), messagingService)
  361. }()
  362. select {
  363. case <-time.After(delay / 2):
  364. // success
  365. case <-receiveMessageComplete:
  366. require.Fail(t, "Did not expect receiveMessage to return before delay interval")
  367. }
  368. // Check that we are currently flow controlled
  369. validateMetric(t, receiver.metrics.views.flowControlStatus, flowControlStateControlled)
  370. // since we set the next consumer to a noop, this should succeed
  371. select {
  372. case <-time.After(delay):
  373. require.Fail(t, "receiveMessage did not return after delay interval")
  374. case err := <-receiveMessageComplete:
  375. assert.NoError(t, err)
  376. }
  377. assert.True(t, ackCalled)
  378. if tc.validation != nil {
  379. tc.validation(t, receiver.metrics)
  380. }
  381. validateMetric(t, receiver.metrics.views.flowControlRecentRetries, 1)
  382. validateMetric(t, receiver.metrics.views.flowControlStatus, flowControlStateClear)
  383. validateMetric(t, receiver.metrics.views.flowControlTotal, 1)
  384. validateMetric(t, receiver.metrics.views.flowControlSingleSuccess, 1)
  385. })
  386. }
  387. }
  388. func TestReceiverFlowControlDelayedRetryInterrupt(t *testing.T) {
  389. receiver, messagingService, unmarshaller := newReceiver(t)
  390. // we won't wait 10 seconds since we will interrupt well before
  391. receiver.config.Flow.DelayedRetry.Delay = 10 * time.Second
  392. var err error
  393. // we want to return an error at first, then set the next consumer to a noop consumer
  394. receiver.nextConsumer, err = consumer.NewTraces(func(ctx context.Context, ld ptrace.Traces) error {
  395. // if we are called again, fatal
  396. receiver.nextConsumer, err = consumer.NewTraces(func(ctx context.Context, ld ptrace.Traces) error {
  397. require.Fail(t, "Did not expect next consumer to be called again")
  398. return nil
  399. })
  400. require.NoError(t, err)
  401. return fmt.Errorf("Some temporary error")
  402. })
  403. require.NoError(t, err)
  404. // populate mock messagingService and unmarshaller functions, expecting them each to be called at most once
  405. messagingService.receiveMessageFunc = func(ctx context.Context) (*inboundMessage, error) {
  406. return &inboundMessage{}, nil
  407. }
  408. unmarshaller.unmarshalFunc = func(msg *inboundMessage) (ptrace.Traces, error) {
  409. return ptrace.NewTraces(), nil
  410. }
  411. ctx, cancel := context.WithCancel(context.Background())
  412. receiveMessageComplete := make(chan error, 1)
  413. go func() {
  414. receiveMessageComplete <- receiver.receiveMessage(ctx, messagingService)
  415. }()
  416. select {
  417. case <-time.After(2 * time.Millisecond):
  418. // success
  419. case <-receiveMessageComplete:
  420. require.Fail(t, "Did not expect receiveMessage to return before delay interval")
  421. }
  422. cancel()
  423. // since we set the next consumer to a noop, this should succeed
  424. select {
  425. case <-time.After(2 * time.Millisecond):
  426. require.Fail(t, "receiveMessage did not return after some time")
  427. case err := <-receiveMessageComplete:
  428. assert.ErrorContains(t, err, "delayed retry interrupted by shutdown request")
  429. }
  430. }
  431. func TestReceiverFlowControlDelayedRetryMultipleRetries(t *testing.T) {
  432. receiver, messagingService, unmarshaller := newReceiver(t)
  433. // we won't wait 10 seconds since we will interrupt well before
  434. retryInterval := 50 * time.Millisecond
  435. // Increase delay on windows due to tick granularity
  436. // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/19409
  437. if runtime.GOOS == "windows" {
  438. retryInterval = 500 * time.Millisecond
  439. }
  440. var retryCount int64 = 5
  441. receiver.config.Flow.DelayedRetry.Delay = retryInterval
  442. var err error
  443. var currentRetries int64
  444. // we want to return an error at first, then set the next consumer to a noop consumer
  445. receiver.nextConsumer, err = consumer.NewTraces(func(ctx context.Context, ld ptrace.Traces) error {
  446. if currentRetries > 0 {
  447. validateMetric(t, receiver.metrics.views.flowControlRecentRetries, currentRetries)
  448. }
  449. currentRetries++
  450. if currentRetries == retryCount {
  451. receiver.nextConsumer, err = consumer.NewTraces(func(ctx context.Context, ld ptrace.Traces) error {
  452. return nil
  453. })
  454. }
  455. require.NoError(t, err)
  456. return fmt.Errorf("Some temporary error")
  457. })
  458. require.NoError(t, err)
  459. // populate mock messagingService and unmarshaller functions, expecting them each to be called at most once
  460. var ackCalled bool
  461. messagingService.ackFunc = func(ctx context.Context, msg *inboundMessage) error {
  462. assert.False(t, ackCalled)
  463. ackCalled = true
  464. return nil
  465. }
  466. messagingService.receiveMessageFunc = func(ctx context.Context) (*inboundMessage, error) {
  467. return &inboundMessage{}, nil
  468. }
  469. unmarshaller.unmarshalFunc = func(msg *inboundMessage) (ptrace.Traces, error) {
  470. return ptrace.NewTraces(), nil
  471. }
  472. receiveMessageComplete := make(chan error, 1)
  473. go func() {
  474. receiveMessageComplete <- receiver.receiveMessage(context.Background(), messagingService)
  475. }()
  476. select {
  477. case <-time.After(retryInterval * time.Duration(retryCount) / 2):
  478. // success
  479. case <-receiveMessageComplete:
  480. require.Fail(t, "Did not expect receiveMessage to return before delay interval")
  481. }
  482. validateMetric(t, receiver.metrics.views.flowControlStatus, flowControlStateControlled)
  483. // since we set the next consumer to a noop, this should succeed
  484. select {
  485. case <-time.After(2 * retryInterval * time.Duration(retryCount)):
  486. require.Fail(t, "receiveMessage did not return after some time")
  487. case err := <-receiveMessageComplete:
  488. assert.NoError(t, err)
  489. }
  490. assert.True(t, ackCalled)
  491. validateMetric(t, receiver.metrics.views.flowControlRecentRetries, retryCount)
  492. validateMetric(t, receiver.metrics.views.flowControlStatus, flowControlStateClear)
  493. validateMetric(t, receiver.metrics.views.flowControlTotal, 1)
  494. validateMetric(t, receiver.metrics.views.flowControlSingleSuccess, nil)
  495. }
  496. func newReceiver(t *testing.T) (*solaceTracesReceiver, *mockMessagingService, *mockUnmarshaller) {
  497. unmarshaller := &mockUnmarshaller{}
  498. service := &mockMessagingService{}
  499. messagingServiceFactory := func() messagingService {
  500. return service
  501. }
  502. metrics := newTestMetrics(t)
  503. receiver := &solaceTracesReceiver{
  504. settings: receivertest.NewNopCreateSettings(),
  505. config: &Config{
  506. Flow: FlowControl{
  507. DelayedRetry: &FlowControlDelayedRetry{
  508. Delay: 10 * time.Millisecond,
  509. },
  510. },
  511. },
  512. nextConsumer: consumertest.NewNop(),
  513. metrics: metrics,
  514. unmarshaller: unmarshaller,
  515. factory: messagingServiceFactory,
  516. shutdownWaitGroup: &sync.WaitGroup{},
  517. retryTimeout: 1 * time.Millisecond,
  518. terminating: &atomic.Bool{},
  519. }
  520. return receiver, service, unmarshaller
  521. }
  522. func validateReceiverMetrics(t *testing.T, receiver *solaceTracesReceiver, receivedMsgVal, droppedMsgVal, fatalUnmarshalling, reportedSpan any) {
  523. validateMetric(t, receiver.metrics.views.receivedSpanMessages, receivedMsgVal)
  524. validateMetric(t, receiver.metrics.views.droppedSpanMessages, droppedMsgVal)
  525. validateMetric(t, receiver.metrics.views.fatalUnmarshallingErrors, fatalUnmarshalling)
  526. validateMetric(t, receiver.metrics.views.reportedSpans, reportedSpan)
  527. }
  528. type mockMessagingService struct {
  529. dialFunc func(ctx context.Context) error
  530. closeFunc func(ctx context.Context)
  531. receiveMessageFunc func(ctx context.Context) (*inboundMessage, error)
  532. ackFunc func(ctx context.Context, msg *inboundMessage) error
  533. nackFunc func(ctx context.Context, msg *inboundMessage) error
  534. }
  535. func (m *mockMessagingService) dial(ctx context.Context) error {
  536. if m.dialFunc != nil {
  537. return m.dialFunc(ctx)
  538. }
  539. panic("did not expect dial to be called")
  540. }
  541. func (m *mockMessagingService) close(ctx context.Context) {
  542. if m.closeFunc != nil {
  543. m.closeFunc(ctx)
  544. return
  545. }
  546. panic("did not expect close to be called")
  547. }
  548. func (m *mockMessagingService) receiveMessage(ctx context.Context) (*inboundMessage, error) {
  549. if m.receiveMessageFunc != nil {
  550. return m.receiveMessageFunc(ctx)
  551. }
  552. panic("did not expect receiveMessage to be called")
  553. }
  554. func (m *mockMessagingService) accept(ctx context.Context, msg *inboundMessage) error {
  555. if m.ackFunc != nil {
  556. return m.ackFunc(ctx, msg)
  557. }
  558. panic("did not expect ack to be called")
  559. }
  560. func (m *mockMessagingService) failed(ctx context.Context, msg *inboundMessage) error {
  561. if m.nackFunc != nil {
  562. return m.nackFunc(ctx, msg)
  563. }
  564. panic("did not expect nack to be called")
  565. }
  566. type mockUnmarshaller struct {
  567. unmarshalFunc func(msg *inboundMessage) (ptrace.Traces, error)
  568. }
  569. func (m *mockUnmarshaller) unmarshal(message *inboundMessage) (ptrace.Traces, error) {
  570. if m.unmarshalFunc != nil {
  571. return m.unmarshalFunc(message)
  572. }
  573. panic("did not expect unmarshal to be called")
  574. }
  575. func newTestTracesWithSpans(spanCount int) ptrace.Traces {
  576. traces := ptrace.NewTraces()
  577. spans := traces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty()
  578. for i := 0; i < spanCount; i++ {
  579. spans.Spans().AppendEmpty()
  580. }
  581. return traces
  582. }