receiver.go

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package solacereceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver"

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/consumer/consumererror"
	"go.opentelemetry.io/collector/receiver"
	"go.uber.org/zap"
)

// solaceTracesReceiver uses the Azure AMQP library to consume and handle telemetry data from Solace. Implements receiver.Traces.
type solaceTracesReceiver struct {
	// config is the receiver.Config instance used to build the receiver
	config       *Config
	nextConsumer consumer.Traces
	settings     receiver.CreateSettings
	metrics      *opencensusMetrics
	unmarshaller tracesUnmarshaller
	// cancel is the function that will cancel the context associated with the main worker loop
	cancel            context.CancelFunc
	shutdownWaitGroup *sync.WaitGroup
	// factory is the constructor used to build new messagingService instances
	factory messagingServiceFactory
	// terminating is used to indicate that the receiver is terminating
	terminating *atomic.Bool
	// retryTimeout is the timeout between connection attempts
	retryTimeout time.Duration
}

// newTracesReceiver creates a new solaceTracesReceiver as a receiver.Traces
func newTracesReceiver(config *Config, set receiver.CreateSettings, nextConsumer consumer.Traces) (receiver.Traces, error) {
	if nextConsumer == nil {
		set.Logger.Warn("Next consumer in pipeline is null, stopping receiver")
		return nil, component.ErrNilNextConsumer
	}
	factory, err := newAMQPMessagingServiceFactory(config, set.Logger)
	if err != nil {
		set.Logger.Warn("Error validating messaging service configuration", zap.Any("error", err))
		return nil, err
	}
	metrics, err := newOpenCensusMetrics(set.ID.Name())
	if err != nil {
		set.Logger.Warn("Error registering metrics", zap.Any("error", err))
		return nil, err
	}
	unmarshaller := newTracesUnmarshaller(set.Logger, metrics)
	return &solaceTracesReceiver{
		config:            config,
		nextConsumer:      nextConsumer,
		settings:          set,
		metrics:           metrics,
		unmarshaller:      unmarshaller,
		shutdownWaitGroup: &sync.WaitGroup{},
		factory:           factory,
		retryTimeout:      1 * time.Second,
		terminating:       &atomic.Bool{},
	}, nil
}

// Start implements component.Receiver::Start
func (s *solaceTracesReceiver) Start(_ context.Context, _ component.Host) error {
	s.metrics.recordReceiverStatus(receiverStateStarting)
	s.metrics.recordFlowControlStatus(flowControlStateClear)
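	// The context passed to Start is not retained; the long-running reconnection loop
	// gets its own context derived from context.Background so that its lifetime is
	// controlled by s.cancel, which Shutdown invokes.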
	var cancelableContext context.Context
	cancelableContext, s.cancel = context.WithCancel(context.Background())
	s.settings.Logger.Info("Starting receiver")
	// start the reconnection loop with a cancellable context and a factory to build new messaging services
	go s.connectAndReceive(cancelableContext)
	s.settings.Logger.Info("Receiver successfully started")
	return nil
}

// Shutdown implements component.Receiver::Shutdown
func (s *solaceTracesReceiver) Shutdown(_ context.Context) error {
	s.terminating.Store(true)
	s.metrics.recordReceiverStatus(receiverStateTerminating)
	s.settings.Logger.Info("Shutdown waiting for all components to complete")
	s.cancel() // cancels the context passed to the reconnection loop
	s.shutdownWaitGroup.Wait()
	s.settings.Logger.Info("Receiver shutdown successfully")
	s.metrics.recordReceiverStatus(receiverStateTerminated)
	return nil
}
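
// connectAndReceive runs the receiver's reconnection loop: it dials a new messaging
// service, consumes messages until an error occurs, closes the service, and retries
// after retryTimeout. The loop exits when ctx is cancelled or when the receiver
// disables itself (for example, after an upgrade-required error).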
func (s *solaceTracesReceiver) connectAndReceive(ctx context.Context) {
	// indicate we are started in the reconnection loop
	s.shutdownWaitGroup.Add(1)
	defer func() {
		s.settings.Logger.Info("Reconnection loop completed successfully")
		s.shutdownWaitGroup.Done()
	}()
	s.settings.Logger.Info("Starting reconnection and consume loop")
	disable := false
	// indicate we are in connecting state at the start
	s.metrics.recordReceiverStatus(receiverStateConnecting)

reconnectionLoop:
	for !disable {
		// check that we are not shutting down prior to the dial attempt
		select {
		case <-ctx.Done():
			s.settings.Logger.Debug("Received loop shutdown request")
			break reconnectionLoop
		default:
		}
		// create a new connection within the closure to defer the service.close
		func() {
			defer func() {
				// if the receiver is disabled, record the idle state, otherwise record the connecting state
				if disable {
					s.recordConnectionState(receiverStateIdle)
				} else {
					s.recordConnectionState(receiverStateConnecting)
				}
			}()
			service := s.factory()
			defer service.close(ctx)
			if err := service.dial(ctx); err != nil {
				s.settings.Logger.Debug("Encountered error while connecting messaging service", zap.Error(err))
				s.metrics.recordFailedReconnection()
				return
			}
			// dial was successful, record the connected state
			s.recordConnectionState(receiverStateConnected)
			if err := s.receiveMessages(ctx, service); err != nil {
				s.settings.Logger.Debug("Encountered error while receiving messages", zap.Error(err))
				if errors.Is(err, errUpgradeRequired) {
					s.metrics.recordNeedUpgrade()
					disable = true
					return
				}
			}
		}()
		// sleep will be interrupted if ctx.Done() is closed
		sleep(ctx, s.retryTimeout)
	}
}

// recordConnectionState records the given connection state unless the receiver is terminating.
// This does not fully prevent the transition terminating->(state)->terminated, but it is a
// best effort without mutex protection or additional state tracking; in practice, if that
// transition did happen, it would be short lived.
func (s *solaceTracesReceiver) recordConnectionState(state receiverState) {
	if !s.terminating.Load() {
		s.metrics.recordReceiverStatus(state)
	}
}

// receiveMessages will continuously receive, unmarshal and propagate messages
func (s *solaceTracesReceiver) receiveMessages(ctx context.Context, service messagingService) error {
	for {
		select { // ctx.Done will be closed when we should terminate
		case <-ctx.Done():
			return nil
		default:
		}
		// any error encountered will be returned to caller
		if err := s.receiveMessage(ctx, service); err != nil {
			return err
		}
	}
}

// receiveMessage is the heart of the receiver's control flow. It receives a message,
// unmarshals it and forwards the resulting traces. It returns an error only when a fatal
// error occurs; any error returned is expected to cause a connection close.
func (s *solaceTracesReceiver) receiveMessage(ctx context.Context, service messagingService) (err error) {
	msg, err := service.receiveMessage(ctx)
	if err != nil {
		s.settings.Logger.Warn("Failed to receive message from messaging service", zap.Error(err))
		return err // propagate any receive message error up to caller
	}
	// only set the disposition action after we have received a message successfully
	disposition := service.accept
	defer func() { // on return of receiveMessage, we want to either ack or nack the message
		if disposition != nil {
			if actionErr := disposition(ctx, msg); err == nil && actionErr != nil {
				err = actionErr
			}
		}
	}()
	// message received successfully
	s.metrics.recordReceivedSpanMessages()
	// unmarshal the message. unmarshalling errors are not fatal unless the version is unknown
	traces, unmarshalErr := s.unmarshaller.unmarshal(msg)
	if unmarshalErr != nil {
		s.settings.Logger.Error("Encountered error while unmarshalling message", zap.Error(unmarshalErr))
		s.metrics.recordFatalUnmarshallingError()
		if errors.Is(unmarshalErr, errUpgradeRequired) {
			disposition = service.failed // if we don't know the version, reject the trace message since we will disable the receiver
			return unmarshalErr
		}
		s.metrics.recordDroppedSpanMessages() // if the error is some other unmarshalling error, we will ack the message and drop the content
		return nil // don't propagate error, but don't continue forwarding traces
	}
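
	// Forward the unmarshalled traces to the next consumer, retrying with a delay while the
	// consumer reports temporary (non-permanent) errors; a shutdown request interrupts the
	// retry and leaves the message unacknowledged so the broker can redeliver it.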
	var flowControlCount int64
flowControlLoop:
	for {
		// forward to next consumer. Forwarding errors are not fatal so are not propagated to the caller.
		// Temporary consumer errors will lead to redelivered messages; permanent errors will cause the message to be accepted and dropped.
		forwardErr := s.nextConsumer.ConsumeTraces(ctx, traces)
		if forwardErr != nil {
			if !consumererror.IsPermanent(forwardErr) {
				s.settings.Logger.Info("Encountered temporary error while forwarding traces to next receiver, will allow redelivery", zap.Error(forwardErr))
				// handle flow control metrics
				if flowControlCount == 0 {
					s.metrics.recordFlowControlStatus(flowControlStateControlled)
				}
				flowControlCount++
				s.metrics.recordFlowControlRecentRetries(flowControlCount)
				// Backpressure scenario. For now we only do a delayed retry; eventually we may need to handle this differently.
				delayTimer := time.NewTimer(s.config.Flow.DelayedRetry.Delay)
				select {
				case <-delayTimer.C:
					continue flowControlLoop
				case <-ctx.Done():
					s.settings.Logger.Info("Context was cancelled while attempting redelivery, exiting")
					disposition = nil // do not make any network requests, we are shutting down
					return fmt.Errorf("delayed retry interrupted by shutdown request")
				}
			} else { // error is permanent, we want to accept the message and increment the number of dropped messages
				s.settings.Logger.Warn("Encountered permanent error while forwarding traces to next receiver, will swallow trace", zap.Error(forwardErr))
				s.metrics.recordDroppedSpanMessages()
				break flowControlLoop
			}
		} else {
			// no forward error
			s.metrics.recordReportedSpans(int64(traces.SpanCount()))
			break flowControlLoop
		}
	}
	// Make sure to clear the stats no matter what, unless we were interrupted, in which case we preserve the last state
	if flowControlCount != 0 {
		s.metrics.recordFlowControlStatus(flowControlStateClear)
		s.metrics.recordFlowControlTotal()
		if flowControlCount == 1 {
			s.metrics.recordFlowControlSingleSuccess()
		}
	}
	return nil
}
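
// sleep waits for the given duration or until ctx is cancelled, whichever happens first.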
func sleep(ctx context.Context, d time.Duration) {
	timer := time.NewTimer(d)
	select {
	case <-ctx.Done():
		timer.Stop()
	case <-timer.C:
	}
}