123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package solacereceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver"
- import (
- "context"
- "errors"
- "fmt"
- "sync"
- "sync/atomic"
- "time"
- "go.opentelemetry.io/collector/component"
- "go.opentelemetry.io/collector/consumer"
- "go.opentelemetry.io/collector/consumer/consumererror"
- "go.opentelemetry.io/collector/receiver"
- "go.uber.org/zap"
- )
- // solaceTracesReceiver uses azure AMQP to consume and handle telemetry data from SOlace. Implements receiver.Traces
- type solaceTracesReceiver struct {
- // config is the receiver.Config instance used to build the receiver
- config *Config
- nextConsumer consumer.Traces
- settings receiver.CreateSettings
- metrics *opencensusMetrics
- unmarshaller tracesUnmarshaller
- // cancel is the function that will cancel the context associated with the main worker loop
- cancel context.CancelFunc
- shutdownWaitGroup *sync.WaitGroup
- // newFactory is the constructor to use to build new messagingServiceFactory instances
- factory messagingServiceFactory
- // terminating is used to indicate that the receiver is terminating
- terminating *atomic.Bool
- // retryTimeout is the timeout between connection attempts
- retryTimeout time.Duration
- }
- // newTracesReceiver creates a new solaceTraceReceiver as a receiver.Traces
- func newTracesReceiver(config *Config, set receiver.CreateSettings, nextConsumer consumer.Traces) (receiver.Traces, error) {
- if nextConsumer == nil {
- set.Logger.Warn("Next consumer in pipeline is null, stopping receiver")
- return nil, component.ErrNilNextConsumer
- }
- factory, err := newAMQPMessagingServiceFactory(config, set.Logger)
- if err != nil {
- set.Logger.Warn("Error validating messaging service configuration", zap.Any("error", err))
- return nil, err
- }
- metrics, err := newOpenCensusMetrics(set.ID.Name())
- if err != nil {
- set.Logger.Warn("Error registering metrics", zap.Any("error", err))
- return nil, err
- }
- unmarshaller := newTracesUnmarshaller(set.Logger, metrics)
- return &solaceTracesReceiver{
- config: config,
- nextConsumer: nextConsumer,
- settings: set,
- metrics: metrics,
- unmarshaller: unmarshaller,
- shutdownWaitGroup: &sync.WaitGroup{},
- factory: factory,
- retryTimeout: 1 * time.Second,
- terminating: &atomic.Bool{},
- }, nil
- }
- // Start implements component.Receiver::Start
- func (s *solaceTracesReceiver) Start(_ context.Context, _ component.Host) error {
- s.metrics.recordReceiverStatus(receiverStateStarting)
- s.metrics.recordFlowControlStatus(flowControlStateClear)
- var cancelableContext context.Context
- cancelableContext, s.cancel = context.WithCancel(context.Background())
- s.settings.Logger.Info("Starting receiver")
- // start the reconnection loop with a cancellable context and a factory to build new messaging services
- go s.connectAndReceive(cancelableContext)
- s.settings.Logger.Info("Receiver successfully started")
- return nil
- }
- // Shutdown implements component.Receiver::Shutdown
- func (s *solaceTracesReceiver) Shutdown(_ context.Context) error {
- s.terminating.Store(true)
- s.metrics.recordReceiverStatus(receiverStateTerminating)
- s.settings.Logger.Info("Shutdown waiting for all components to complete")
- s.cancel() // cancels the context passed to the reconneciton loop
- s.shutdownWaitGroup.Wait()
- s.settings.Logger.Info("Receiver shutdown successfully")
- s.metrics.recordReceiverStatus(receiverStateTerminated)
- return nil
- }
- func (s *solaceTracesReceiver) connectAndReceive(ctx context.Context) {
- // indicate we are started in the reconnection loop
- s.shutdownWaitGroup.Add(1)
- defer func() {
- s.settings.Logger.Info("Reconnection loop completed successfully")
- s.shutdownWaitGroup.Done()
- }()
- s.settings.Logger.Info("Starting reconnection and consume loop")
- disable := false
- // indicate we are in connecting state at the start
- s.metrics.recordReceiverStatus(receiverStateConnecting)
- reconnectionLoop:
- for !disable {
- // check that we are not shutting down prior to the dial attempt
- select {
- case <-ctx.Done():
- s.settings.Logger.Debug("Received loop shutdown request")
- break reconnectionLoop
- default:
- }
- // create a new connection within the closure to defer the service.close
- func() {
- defer func() {
- // if the receiver is disabled, record the idle state, otherwise record the connecting state
- if disable {
- s.recordConnectionState(receiverStateIdle)
- } else {
- s.recordConnectionState(receiverStateConnecting)
- }
- }()
- service := s.factory()
- defer service.close(ctx)
- if err := service.dial(ctx); err != nil {
- s.settings.Logger.Debug("Encountered error while connecting messaging service", zap.Error(err))
- s.metrics.recordFailedReconnection()
- return
- }
- // dial was successful, record the connected state
- s.recordConnectionState(receiverStateConnected)
- if err := s.receiveMessages(ctx, service); err != nil {
- s.settings.Logger.Debug("Encountered error while receiving messages", zap.Error(err))
- if errors.Is(err, errUpgradeRequired) {
- s.metrics.recordNeedUpgrade()
- disable = true
- return
- }
- }
- }()
- // sleep will be interrupted if ctx.Done() is closed
- sleep(ctx, s.retryTimeout)
- }
- }
- // recordConnectionState will record the given connection state unless in the terminating state.
- // This does not fully prevent the state transitions terminating->(state)->terminated but
- // is a best effort without mutex protection and additional state tracking, and in reality if
- // this state transition were to happen, it would be short lived.
- func (s *solaceTracesReceiver) recordConnectionState(state receiverState) {
- if !s.terminating.Load() {
- s.metrics.recordReceiverStatus(state)
- }
- }
- // receiveMessages will continuously receive, unmarshal and propagate messages
- func (s *solaceTracesReceiver) receiveMessages(ctx context.Context, service messagingService) error {
- for {
- select { // ctx.Done will be closed when we should terminate
- case <-ctx.Done():
- return nil
- default:
- }
- // any error encountered will be returned to caller
- if err := s.receiveMessage(ctx, service); err != nil {
- return err
- }
- }
- }
- // receiveMessage is the heart of the receiver's control flow. It will receive messages, unmarshal the message and forward the trace.
- // Will return an error if a fatal error occurs. It is expected that any error returned will cause a connection close.
- func (s *solaceTracesReceiver) receiveMessage(ctx context.Context, service messagingService) (err error) {
- msg, err := service.receiveMessage(ctx)
- if err != nil {
- s.settings.Logger.Warn("Failed to receive message from messaging service", zap.Error(err))
- return err // propagate any receive message error up to caller
- }
- // only set the disposition action after we have received a message successfully
- disposition := service.accept
- defer func() { // on return of receiveMessage, we want to either ack or nack the message
- if disposition != nil {
- if actionErr := disposition(ctx, msg); err == nil && actionErr != nil {
- err = actionErr
- }
- }
- }()
- // message received successfully
- s.metrics.recordReceivedSpanMessages()
- // unmarshal the message. unmarshalling errors are not fatal unless the version is unknown
- traces, unmarshalErr := s.unmarshaller.unmarshal(msg)
- if unmarshalErr != nil {
- s.settings.Logger.Error("Encountered error while unmarshalling message", zap.Error(unmarshalErr))
- s.metrics.recordFatalUnmarshallingError()
- if errors.Is(unmarshalErr, errUpgradeRequired) {
- disposition = service.failed // if we don't know the version, reject the trace message since we will disable the receiver
- return unmarshalErr
- }
- s.metrics.recordDroppedSpanMessages() // if the error is some other unmarshalling error, we will ack the message and drop the content
- return nil // don't propagate error, but don't continue forwarding traces
- }
- var flowControlCount int64
- flowControlLoop:
- for {
- // forward to next consumer. Forwarding errors are not fatal so are not propagated to the caller.
- // Temporary consumer errors will lead to redelivered messages, permanent will be accepted
- forwardErr := s.nextConsumer.ConsumeTraces(ctx, traces)
- if forwardErr != nil {
- if !consumererror.IsPermanent(forwardErr) {
- s.settings.Logger.Info("Encountered temporary error while forwarding traces to next receiver, will allow redelivery", zap.Error(forwardErr))
- // handle flow control metrics
- if flowControlCount == 0 {
- s.metrics.recordFlowControlStatus(flowControlStateControlled)
- }
- flowControlCount++
- s.metrics.recordFlowControlRecentRetries(flowControlCount)
- // Backpressure scenario. For now, we are only delayed retry, eventually we may need to handle this
- delayTimer := time.NewTimer(s.config.Flow.DelayedRetry.Delay)
- select {
- case <-delayTimer.C:
- continue flowControlLoop
- case <-ctx.Done():
- s.settings.Logger.Info("Context was cancelled while attempting redelivery, exiting")
- disposition = nil // do not make any network requests, we are shutting down
- return fmt.Errorf("delayed retry interrupted by shutdown request")
- }
- } else { // error is permanent, we want to accept the message and increment the number of dropped messages
- s.settings.Logger.Warn("Encountered permanent error while forwarding traces to next receiver, will swallow trace", zap.Error(forwardErr))
- s.metrics.recordDroppedSpanMessages()
- break flowControlLoop
- }
- } else {
- // no forward error
- s.metrics.recordReportedSpans(int64(traces.SpanCount()))
- break flowControlLoop
- }
- }
- // Make sure to clear the stats no matter what, unless we were interrupted in which case we should preserve the last state
- if flowControlCount != 0 {
- s.metrics.recordFlowControlStatus(flowControlStateClear)
- s.metrics.recordFlowControlTotal()
- if flowControlCount == 1 {
- s.metrics.recordFlowControlSingleSuccess()
- }
- }
- return nil
- }
// sleep blocks until d has elapsed or ctx is done, whichever happens first.
func sleep(ctx context.Context, d time.Duration) {
	wait := time.NewTimer(d)
	// stopping a timer that has already fired is a no-op
	defer wait.Stop()

	select {
	case <-wait.C:
	case <-ctx.Done():
	}
}
|