|
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package solacereceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver"
- import (
- "context"
- "errors"
- "fmt"
- "runtime"
- "sync"
- "sync/atomic"
- "testing"
- "time"
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/require"
- "go.opentelemetry.io/collector/consumer"
- "go.opentelemetry.io/collector/consumer/consumererror"
- "go.opentelemetry.io/collector/consumer/consumertest"
- "go.opentelemetry.io/collector/pdata/ptrace"
- "go.opentelemetry.io/collector/receiver/receivertest"
- )
- // connectAndReceive with connect failure
- // connectAndReceive with lifecycle validation
- // not started, connecting, connected, terminating, terminated, idle
- func TestReceiveMessage(t *testing.T) {
- someError := errors.New("some error")
- validateMetrics := func(receivedMsgVal, droppedMsgVal, fatalUnmarshalling, reportedSpan any) func(t *testing.T, receiver *solaceTracesReceiver) {
- return func(t *testing.T, receiver *solaceTracesReceiver) {
- validateReceiverMetrics(t, receiver, receivedMsgVal, droppedMsgVal, fatalUnmarshalling, reportedSpan)
- }
- }
- cases := []struct {
- name string
- nextConsumer consumertest.Consumer
- // errors to return from messagingService.receive, unmarshaller.unmarshal, messagingService.ack and messagingService.nack
- receiveMessageErr, unmarshalErr, ackErr, nackErr error
- // whether or not to expect a nack call instead of an ack
- expectNack bool
- // expected error from receiveMessage
- expectedErr error
- // validate constraints after the fact
- validation func(t *testing.T, receiver *solaceTracesReceiver)
- // traces provided by the trace function
- traces ptrace.Traces
- }{
- { // no errors, expect no error, validate metrics
- name: "Receive Message Success",
- validation: validateMetrics(1, nil, nil, 1),
- traces: newTestTracesWithSpans(1),
- },
- { // no errors, expect no error, validate metrics
- name: "Receive Message Multiple Traces Success",
- validation: validateMetrics(1, nil, nil, 3),
- traces: newTestTracesWithSpans(3),
- },
- { // fail at receiveMessage and expect the error
- name: "Receive Messages Error",
- receiveMessageErr: someError,
- expectedErr: someError,
- validation: validateMetrics(nil, nil, nil, nil),
- },
- { // unmarshal error expecting the error to be swallowed, the message to be acknowledged, stats incremented
- name: "Unmarshal Error",
- unmarshalErr: errUnknownTopic,
- validation: validateMetrics(1, 1, 1, nil),
- },
- { // unmarshal error with wrong version expecting error to be propagated, message to be rejected
- name: "Unmarshal Version Error",
- unmarshalErr: errUpgradeRequired,
- expectedErr: errUpgradeRequired,
- expectNack: true,
- validation: validateMetrics(1, nil, 1, nil),
- },
- { // expect forward to error and message to be swallowed with ack, no error returned
- name: "Forward Permanent Error",
- nextConsumer: consumertest.NewErr(consumererror.NewPermanent(errors.New("a permanent error"))),
- validation: validateMetrics(1, 1, nil, nil),
- },
- { // expect forward to error and message to be swallowed with ack which fails returning an error
- name: "Forward Permanent Error with Ack Error",
- nextConsumer: consumertest.NewErr(consumererror.NewPermanent(errors.New("a permanent error"))),
- ackErr: someError,
- expectedErr: someError,
- validation: validateMetrics(1, 1, nil, nil),
- },
- }
- for _, testCase := range cases {
- t.Run(testCase.name, func(t *testing.T) {
- receiver, messagingService, unmarshaller := newReceiver(t)
- if testCase.nextConsumer != nil {
- receiver.nextConsumer = testCase.nextConsumer
- }
- msg := &inboundMessage{}
- // populate mock messagingService and unmarshaller functions, expecting them each to be called at most once
- var receiveMessagesCalled, ackCalled, nackCalled, unmarshalCalled bool
- messagingService.receiveMessageFunc = func(ctx context.Context) (*inboundMessage, error) {
- assert.False(t, receiveMessagesCalled)
- receiveMessagesCalled = true
- if testCase.receiveMessageErr != nil {
- return nil, testCase.receiveMessageErr
- }
- return msg, nil
- }
- messagingService.ackFunc = func(ctx context.Context, msg *inboundMessage) error {
- assert.False(t, ackCalled)
- ackCalled = true
- if testCase.ackErr != nil {
- return testCase.ackErr
- }
- return nil
- }
- messagingService.nackFunc = func(ctx context.Context, msg *inboundMessage) error {
- assert.False(t, nackCalled)
- nackCalled = true
- if testCase.nackErr != nil {
- return testCase.nackErr
- }
- return nil
- }
- unmarshaller.unmarshalFunc = func(msg *inboundMessage) (ptrace.Traces, error) {
- assert.False(t, unmarshalCalled)
- unmarshalCalled = true
- if testCase.unmarshalErr != nil {
- return ptrace.Traces{}, testCase.unmarshalErr
- }
- return testCase.traces, nil
- }
- err := receiver.receiveMessage(context.Background(), messagingService)
- if testCase.expectedErr != nil {
- assert.Equal(t, testCase.expectedErr, err)
- } else {
- assert.NoError(t, err)
- }
- assert.True(t, receiveMessagesCalled)
- if testCase.receiveMessageErr == nil {
- assert.True(t, unmarshalCalled)
- assert.Equal(t, testCase.expectNack, nackCalled)
- assert.Equal(t, !testCase.expectNack, ackCalled)
- }
- if testCase.validation != nil {
- testCase.validation(t, receiver)
- }
- })
- }
- }
- // receiveMessages ctx done return
- func TestReceiveMessagesTerminateWithCtxDone(t *testing.T) {
- receiver, messagingService, unmarshaller := newReceiver(t)
- receiveMessagesCalled := false
- ctx, cancel := context.WithCancel(context.Background())
- msg := &inboundMessage{}
- trace := newTestTracesWithSpans(1)
- messagingService.receiveMessageFunc = func(ctx context.Context) (*inboundMessage, error) {
- assert.False(t, receiveMessagesCalled)
- receiveMessagesCalled = true
- return msg, nil
- }
- ackCalled := false
- messagingService.ackFunc = func(ctx context.Context, msg *inboundMessage) error {
- assert.False(t, ackCalled)
- ackCalled = true
- cancel()
- return nil
- }
- unmarshalCalled := false
- unmarshaller.unmarshalFunc = func(msg *inboundMessage) (ptrace.Traces, error) {
- assert.False(t, unmarshalCalled)
- unmarshalCalled = true
- return trace, nil
- }
- err := receiver.receiveMessages(ctx, messagingService)
- assert.NoError(t, err)
- assert.True(t, receiveMessagesCalled)
- assert.True(t, unmarshalCalled)
- assert.True(t, ackCalled)
- validateReceiverMetrics(t, receiver, 1, nil, nil, 1)
- }
- func TestReceiverLifecycle(t *testing.T) {
- receiver, messagingService, _ := newReceiver(t)
- dialCalled := make(chan struct{})
- messagingService.dialFunc = func(context.Context) error {
- validateMetric(t, receiver.metrics.views.receiverStatus, receiverStateConnecting)
- validateMetric(t, receiver.metrics.views.flowControlStatus, flowControlStateClear)
- close(dialCalled)
- return nil
- }
- closeCalled := make(chan struct{})
- messagingService.closeFunc = func(ctx context.Context) {
- validateMetric(t, receiver.metrics.views.receiverStatus, receiverStateTerminating)
- close(closeCalled)
- }
- receiveMessagesCalled := make(chan struct{})
- messagingService.receiveMessageFunc = func(ctx context.Context) (*inboundMessage, error) {
- validateMetric(t, receiver.metrics.views.receiverStatus, receiverStateConnected)
- close(receiveMessagesCalled)
- <-ctx.Done()
- return nil, errors.New("some error")
- }
- // start the receiver
- err := receiver.Start(context.Background(), nil)
- assert.NoError(t, err)
- assertChannelClosed(t, dialCalled)
- assertChannelClosed(t, receiveMessagesCalled)
- err = receiver.Shutdown(context.Background())
- assert.NoError(t, err)
- assertChannelClosed(t, closeCalled)
- validateMetric(t, receiver.metrics.views.receiverStatus, receiverStateTerminated)
- // we error on receive message, so we should not report any metrics
- validateReceiverMetrics(t, receiver, nil, nil, nil, nil)
- }
- func TestReceiverDialFailureContinue(t *testing.T) {
- receiver, msgService, _ := newReceiver(t)
- dialErr := errors.New("Some dial error")
- const expectedAttempts = 3 // the number of attempts to perform prior to resolving
- dialCalled := 0
- factoryCalled := 0
- closeCalled := 0
- dialDone := make(chan struct{})
- factoryDone := make(chan struct{})
- closeDone := make(chan struct{})
- receiver.factory = func() messagingService {
- factoryCalled++
- if factoryCalled == expectedAttempts {
- close(factoryDone)
- }
- return msgService
- }
- msgService.dialFunc = func(context.Context) error {
- dialCalled++
- if dialCalled == expectedAttempts {
- close(dialDone)
- }
- return dialErr
- }
- msgService.closeFunc = func(ctx context.Context) {
- closeCalled++
- // asset we never left connecting state prior to closing closeDone
- validateMetric(t, receiver.metrics.views.receiverStatus, receiverStateConnecting)
- if closeCalled == expectedAttempts {
- close(closeDone)
- <-ctx.Done() // wait for ctx.Done
- }
- }
- // start the receiver
- err := receiver.Start(context.Background(), nil)
- assert.NoError(t, err)
- // expect factory to be called twice
- assertChannelClosed(t, factoryDone)
- // expect dial to be called twice
- assertChannelClosed(t, dialDone)
- // expect close to be called twice
- assertChannelClosed(t, closeDone)
- // assert failed reconnections
- validateMetric(t, receiver.metrics.views.failedReconnections, expectedAttempts)
- err = receiver.Shutdown(context.Background())
- assert.NoError(t, err)
- validateMetric(t, receiver.metrics.views.receiverStatus, receiverStateTerminated)
- // we error on dial, should never get to receive messages
- validateReceiverMetrics(t, receiver, nil, nil, nil, nil)
- }
- func TestReceiverUnmarshalVersionFailureExpectingDisable(t *testing.T) {
- receiver, msgService, unmarshaller := newReceiver(t)
- dialDone := make(chan struct{})
- nackCalled := make(chan struct{})
- closeDone := make(chan struct{})
- unmarshaller.unmarshalFunc = func(msg *inboundMessage) (ptrace.Traces, error) {
- return ptrace.Traces{}, errUpgradeRequired
- }
- msgService.dialFunc = func(context.Context) error {
- // after we receive an unmarshalling version error, we should not call dial again
- msgService.dialFunc = func(context.Context) error {
- t.Error("did not expect dial to be called again")
- return nil
- }
- close(dialDone)
- return nil
- }
- msgService.receiveMessageFunc = func(ctx context.Context) (*inboundMessage, error) {
- // we only expect a single receiveMessage call when unmarshal returns unknown version
- msgService.receiveMessageFunc = func(ctx context.Context) (*inboundMessage, error) {
- t.Error("did not expect receiveMessage to be called again")
- return nil, nil
- }
- return nil, nil
- }
- msgService.nackFunc = func(ctx context.Context, msg *inboundMessage) error {
- close(nackCalled)
- return nil
- }
- msgService.closeFunc = func(ctx context.Context) {
- close(closeDone)
- }
- // start the receiver
- err := receiver.Start(context.Background(), nil)
- assert.NoError(t, err)
- // expect dial to be called twice
- assertChannelClosed(t, dialDone)
- // expect nack to be called
- assertChannelClosed(t, nackCalled)
- // expect close to be called twice
- assertChannelClosed(t, closeDone)
- // we receive 1 message, encounter a fatal unmarshalling error and we nack the message so it is not actually dropped
- validateReceiverMetrics(t, receiver, 1, nil, 1, nil)
- // assert idle state
- validateMetric(t, receiver.metrics.views.receiverStatus, receiverStateIdle)
- err = receiver.Shutdown(context.Background())
- assert.NoError(t, err)
- validateMetric(t, receiver.metrics.views.receiverStatus, receiverStateTerminated)
- }
- func TestReceiverFlowControlDelayedRetry(t *testing.T) {
- someError := consumererror.NewPermanent(fmt.Errorf("some error"))
- testCases := []struct {
- name string
- nextConsumer consumer.Traces
- validation func(*testing.T, *opencensusMetrics)
- }{
- {
- name: "Without error",
- nextConsumer: consumertest.NewNop(),
- },
- {
- name: "With error",
- nextConsumer: consumertest.NewErr(someError),
- validation: func(t *testing.T, metrics *opencensusMetrics) {
- validateMetric(t, metrics.views.droppedSpanMessages, 1)
- },
- },
- }
- for _, tc := range testCases {
- t.Run(tc.name, func(t *testing.T) {
- receiver, messagingService, unmarshaller := newReceiver(t)
- delay := 50 * time.Millisecond
- // Increase delay on windows due to tick granularity
- // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/17197
- if runtime.GOOS == "windows" {
- delay = 500 * time.Millisecond
- }
- receiver.config.Flow.DelayedRetry.Delay = delay
- var err error
- // we want to return an error at first, then set the next consumer to a noop consumer
- receiver.nextConsumer, err = consumer.NewTraces(func(ctx context.Context, ld ptrace.Traces) error {
- receiver.nextConsumer = tc.nextConsumer
- return fmt.Errorf("Some temporary error")
- })
- require.NoError(t, err)
- // populate mock messagingService and unmarshaller functions, expecting them each to be called at most once
- var ackCalled bool
- messagingService.ackFunc = func(ctx context.Context, msg *inboundMessage) error {
- assert.False(t, ackCalled)
- ackCalled = true
- return nil
- }
- messagingService.receiveMessageFunc = func(ctx context.Context) (*inboundMessage, error) {
- return &inboundMessage{}, nil
- }
- unmarshaller.unmarshalFunc = func(msg *inboundMessage) (ptrace.Traces, error) {
- return ptrace.NewTraces(), nil
- }
- receiveMessageComplete := make(chan error, 1)
- go func() {
- receiveMessageComplete <- receiver.receiveMessage(context.Background(), messagingService)
- }()
- select {
- case <-time.After(delay / 2):
- // success
- case <-receiveMessageComplete:
- require.Fail(t, "Did not expect receiveMessage to return before delay interval")
- }
- // Check that we are currently flow controlled
- validateMetric(t, receiver.metrics.views.flowControlStatus, flowControlStateControlled)
- // since we set the next consumer to a noop, this should succeed
- select {
- case <-time.After(delay):
- require.Fail(t, "receiveMessage did not return after delay interval")
- case err := <-receiveMessageComplete:
- assert.NoError(t, err)
- }
- assert.True(t, ackCalled)
- if tc.validation != nil {
- tc.validation(t, receiver.metrics)
- }
- validateMetric(t, receiver.metrics.views.flowControlRecentRetries, 1)
- validateMetric(t, receiver.metrics.views.flowControlStatus, flowControlStateClear)
- validateMetric(t, receiver.metrics.views.flowControlTotal, 1)
- validateMetric(t, receiver.metrics.views.flowControlSingleSuccess, 1)
- })
- }
- }
- func TestReceiverFlowControlDelayedRetryInterrupt(t *testing.T) {
- receiver, messagingService, unmarshaller := newReceiver(t)
- // we won't wait 10 seconds since we will interrupt well before
- receiver.config.Flow.DelayedRetry.Delay = 10 * time.Second
- var err error
- // we want to return an error at first, then set the next consumer to a noop consumer
- receiver.nextConsumer, err = consumer.NewTraces(func(ctx context.Context, ld ptrace.Traces) error {
- // if we are called again, fatal
- receiver.nextConsumer, err = consumer.NewTraces(func(ctx context.Context, ld ptrace.Traces) error {
- require.Fail(t, "Did not expect next consumer to be called again")
- return nil
- })
- require.NoError(t, err)
- return fmt.Errorf("Some temporary error")
- })
- require.NoError(t, err)
- // populate mock messagingService and unmarshaller functions, expecting them each to be called at most once
- messagingService.receiveMessageFunc = func(ctx context.Context) (*inboundMessage, error) {
- return &inboundMessage{}, nil
- }
- unmarshaller.unmarshalFunc = func(msg *inboundMessage) (ptrace.Traces, error) {
- return ptrace.NewTraces(), nil
- }
- ctx, cancel := context.WithCancel(context.Background())
- receiveMessageComplete := make(chan error, 1)
- go func() {
- receiveMessageComplete <- receiver.receiveMessage(ctx, messagingService)
- }()
- select {
- case <-time.After(2 * time.Millisecond):
- // success
- case <-receiveMessageComplete:
- require.Fail(t, "Did not expect receiveMessage to return before delay interval")
- }
- cancel()
- // since we set the next consumer to a noop, this should succeed
- select {
- case <-time.After(2 * time.Millisecond):
- require.Fail(t, "receiveMessage did not return after some time")
- case err := <-receiveMessageComplete:
- assert.ErrorContains(t, err, "delayed retry interrupted by shutdown request")
- }
- }
- func TestReceiverFlowControlDelayedRetryMultipleRetries(t *testing.T) {
- receiver, messagingService, unmarshaller := newReceiver(t)
- // we won't wait 10 seconds since we will interrupt well before
- retryInterval := 50 * time.Millisecond
- // Increase delay on windows due to tick granularity
- // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/19409
- if runtime.GOOS == "windows" {
- retryInterval = 500 * time.Millisecond
- }
- var retryCount int64 = 5
- receiver.config.Flow.DelayedRetry.Delay = retryInterval
- var err error
- var currentRetries int64
- // we want to return an error at first, then set the next consumer to a noop consumer
- receiver.nextConsumer, err = consumer.NewTraces(func(ctx context.Context, ld ptrace.Traces) error {
- if currentRetries > 0 {
- validateMetric(t, receiver.metrics.views.flowControlRecentRetries, currentRetries)
- }
- currentRetries++
- if currentRetries == retryCount {
- receiver.nextConsumer, err = consumer.NewTraces(func(ctx context.Context, ld ptrace.Traces) error {
- return nil
- })
- }
- require.NoError(t, err)
- return fmt.Errorf("Some temporary error")
- })
- require.NoError(t, err)
- // populate mock messagingService and unmarshaller functions, expecting them each to be called at most once
- var ackCalled bool
- messagingService.ackFunc = func(ctx context.Context, msg *inboundMessage) error {
- assert.False(t, ackCalled)
- ackCalled = true
- return nil
- }
- messagingService.receiveMessageFunc = func(ctx context.Context) (*inboundMessage, error) {
- return &inboundMessage{}, nil
- }
- unmarshaller.unmarshalFunc = func(msg *inboundMessage) (ptrace.Traces, error) {
- return ptrace.NewTraces(), nil
- }
- receiveMessageComplete := make(chan error, 1)
- go func() {
- receiveMessageComplete <- receiver.receiveMessage(context.Background(), messagingService)
- }()
- select {
- case <-time.After(retryInterval * time.Duration(retryCount) / 2):
- // success
- case <-receiveMessageComplete:
- require.Fail(t, "Did not expect receiveMessage to return before delay interval")
- }
- validateMetric(t, receiver.metrics.views.flowControlStatus, flowControlStateControlled)
- // since we set the next consumer to a noop, this should succeed
- select {
- case <-time.After(2 * retryInterval * time.Duration(retryCount)):
- require.Fail(t, "receiveMessage did not return after some time")
- case err := <-receiveMessageComplete:
- assert.NoError(t, err)
- }
- assert.True(t, ackCalled)
- validateMetric(t, receiver.metrics.views.flowControlRecentRetries, retryCount)
- validateMetric(t, receiver.metrics.views.flowControlStatus, flowControlStateClear)
- validateMetric(t, receiver.metrics.views.flowControlTotal, 1)
- validateMetric(t, receiver.metrics.views.flowControlSingleSuccess, nil)
- }
- func newReceiver(t *testing.T) (*solaceTracesReceiver, *mockMessagingService, *mockUnmarshaller) {
- unmarshaller := &mockUnmarshaller{}
- service := &mockMessagingService{}
- messagingServiceFactory := func() messagingService {
- return service
- }
- metrics := newTestMetrics(t)
- receiver := &solaceTracesReceiver{
- settings: receivertest.NewNopCreateSettings(),
- config: &Config{
- Flow: FlowControl{
- DelayedRetry: &FlowControlDelayedRetry{
- Delay: 10 * time.Millisecond,
- },
- },
- },
- nextConsumer: consumertest.NewNop(),
- metrics: metrics,
- unmarshaller: unmarshaller,
- factory: messagingServiceFactory,
- shutdownWaitGroup: &sync.WaitGroup{},
- retryTimeout: 1 * time.Millisecond,
- terminating: &atomic.Bool{},
- }
- return receiver, service, unmarshaller
- }
- func validateReceiverMetrics(t *testing.T, receiver *solaceTracesReceiver, receivedMsgVal, droppedMsgVal, fatalUnmarshalling, reportedSpan any) {
- validateMetric(t, receiver.metrics.views.receivedSpanMessages, receivedMsgVal)
- validateMetric(t, receiver.metrics.views.droppedSpanMessages, droppedMsgVal)
- validateMetric(t, receiver.metrics.views.fatalUnmarshallingErrors, fatalUnmarshalling)
- validateMetric(t, receiver.metrics.views.reportedSpans, reportedSpan)
- }
- type mockMessagingService struct {
- dialFunc func(ctx context.Context) error
- closeFunc func(ctx context.Context)
- receiveMessageFunc func(ctx context.Context) (*inboundMessage, error)
- ackFunc func(ctx context.Context, msg *inboundMessage) error
- nackFunc func(ctx context.Context, msg *inboundMessage) error
- }
- func (m *mockMessagingService) dial(ctx context.Context) error {
- if m.dialFunc != nil {
- return m.dialFunc(ctx)
- }
- panic("did not expect dial to be called")
- }
- func (m *mockMessagingService) close(ctx context.Context) {
- if m.closeFunc != nil {
- m.closeFunc(ctx)
- return
- }
- panic("did not expect close to be called")
- }
- func (m *mockMessagingService) receiveMessage(ctx context.Context) (*inboundMessage, error) {
- if m.receiveMessageFunc != nil {
- return m.receiveMessageFunc(ctx)
- }
- panic("did not expect receiveMessage to be called")
- }
- func (m *mockMessagingService) accept(ctx context.Context, msg *inboundMessage) error {
- if m.ackFunc != nil {
- return m.ackFunc(ctx, msg)
- }
- panic("did not expect ack to be called")
- }
- func (m *mockMessagingService) failed(ctx context.Context, msg *inboundMessage) error {
- if m.nackFunc != nil {
- return m.nackFunc(ctx, msg)
- }
- panic("did not expect nack to be called")
- }
- type mockUnmarshaller struct {
- unmarshalFunc func(msg *inboundMessage) (ptrace.Traces, error)
- }
- func (m *mockUnmarshaller) unmarshal(message *inboundMessage) (ptrace.Traces, error) {
- if m.unmarshalFunc != nil {
- return m.unmarshalFunc(message)
- }
- panic("did not expect unmarshal to be called")
- }
- func newTestTracesWithSpans(spanCount int) ptrace.Traces {
- traces := ptrace.NewTraces()
- spans := traces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty()
- for i := 0; i < spanCount; i++ {
- spans.Spans().AppendEmpty()
- }
- return traces
- }
|