messaging_service.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package solacereceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver"
  4. import (
  5. "context"
  6. "crypto/tls"
  7. "fmt"
  8. "time"
  9. "github.com/Azure/go-amqp"
  10. "go.uber.org/zap"
  11. )
  12. // inboundMessage is an alias for amqp.Message
  13. type inboundMessage = amqp.Message
  14. // messagingService abstracts out the AMQP transport capabilities for unit testing
  15. type messagingService interface {
  16. dial(ctx context.Context) error
  17. close(ctx context.Context)
  18. receiveMessage(ctx context.Context) (*inboundMessage, error)
  19. accept(ctx context.Context, msg *inboundMessage) error
  20. failed(ctx context.Context, msg *inboundMessage) error
  21. }
  22. // messagingServiceFactory is a factory to create new messagingService instances
  23. type messagingServiceFactory func() messagingService
  24. // newAMQPMessagingServiceFactory creates a new messagingServiceFactory backed by AMQP
  25. func newAMQPMessagingServiceFactory(cfg *Config, logger *zap.Logger) (messagingServiceFactory, error) {
  26. saslConnOption, authErr := toAMQPAuthentication(cfg)
  27. if authErr != nil {
  28. return nil, authErr
  29. }
  30. // Use the default load config for TLS. Note that in the case where "insecure" is true and no
  31. // ca file is provided, tlsConfig will be nil representing a plaintext connection.
  32. loadedTLSConfig, err := cfg.TLS.LoadTLSConfig()
  33. if err != nil {
  34. return nil, err
  35. }
  36. broker := cfg.Broker[0]
  37. // If the TLS config is nil, insecure is true and we should use amqp rather than amqps
  38. scheme := "amqp"
  39. if loadedTLSConfig != nil {
  40. scheme = "amqps"
  41. }
  42. amqpHostAddress := fmt.Sprintf("%s://%s", scheme, broker)
  43. connectConfig := &amqpConnectConfig{
  44. addr: amqpHostAddress,
  45. tlsConfig: loadedTLSConfig,
  46. saslConfig: saslConnOption,
  47. }
  48. receiverConfig := &amqpReceiverConfig{
  49. queue: cfg.Queue,
  50. maxUnacked: cfg.MaxUnacked,
  51. batchMaxAge: 1 * time.Second,
  52. }
  53. return func() messagingService {
  54. return &amqpMessagingService{
  55. connectConfig: connectConfig,
  56. receiverConfig: receiverConfig,
  57. logger: logger,
  58. }
  59. }, nil
  60. }
  61. type amqpConnectConfig struct {
  62. // conenct config
  63. addr string
  64. saslConfig amqp.SASLType
  65. tlsConfig *tls.Config
  66. }
  67. type amqpReceiverConfig struct {
  68. queue string
  69. maxUnacked int32
  70. batchMaxAge time.Duration
  71. }
  72. type amqpMessagingService struct {
  73. // factory fields
  74. connectConfig *amqpConnectConfig
  75. receiverConfig *amqpReceiverConfig
  76. logger *zap.Logger
  77. // runtime fields
  78. client *amqp.Conn
  79. session *amqp.Session
  80. receiver *amqp.Receiver
  81. }
  82. // dialFunc is abstracted out into a variable in order for substitutions
  83. var dialFunc = amqp.Dial
  84. // telemetryLinkName will be used to create the single receiver link in order to standardize the connection.
  85. // Mainly useful for testing to mock amqp frames.
  86. const telemetryLinkName = "rx"
  87. func (m *amqpMessagingService) dial(ctx context.Context) (err error) {
  88. opts := &amqp.ConnOptions{}
  89. opts.SASLType = m.connectConfig.saslConfig
  90. if m.connectConfig.tlsConfig != nil {
  91. opts.TLSConfig = m.connectConfig.tlsConfig
  92. }
  93. m.logger.Debug("Dialing AMQP", zap.String("addr", m.connectConfig.addr))
  94. m.client, err = dialFunc(ctx, m.connectConfig.addr, opts)
  95. if err != nil {
  96. m.logger.Debug("Dial AMQP failure", zap.Error(err))
  97. return err
  98. }
  99. m.logger.Debug("Creating new AMQP Session")
  100. m.session, err = m.client.NewSession(ctx, &amqp.SessionOptions{})
  101. if err != nil {
  102. m.logger.Debug("Create AMQP Session failure", zap.Error(err))
  103. return err
  104. }
  105. m.logger.Debug("Creating new AMQP Receive Link", zap.String("source", m.receiverConfig.queue))
  106. m.receiver, err = m.session.NewReceiver(ctx, m.receiverConfig.queue, &amqp.ReceiverOptions{
  107. Credit: m.receiverConfig.maxUnacked,
  108. Name: telemetryLinkName,
  109. })
  110. if err != nil {
  111. m.logger.Debug("Create AMQP Receiver Link failure", zap.Error(err))
  112. return err
  113. }
  114. return nil
  115. }
  116. func (m *amqpMessagingService) close(ctx context.Context) {
  117. if m.receiver != nil {
  118. m.logger.Debug("Closing AMQP Receiver")
  119. err := m.receiver.Close(ctx)
  120. if err != nil {
  121. m.logger.Debug("Receiver close failed", zap.Error(err))
  122. }
  123. }
  124. if m.session != nil {
  125. m.logger.Debug("Closing AMQP Session")
  126. err := m.session.Close(ctx)
  127. if err != nil {
  128. m.logger.Debug("Session closed failed", zap.Error(err))
  129. }
  130. }
  131. if m.client != nil {
  132. m.logger.Debug("Closing AMQP Client")
  133. err := m.client.Close()
  134. if err != nil {
  135. m.logger.Debug("Client closed failed", zap.Error(err))
  136. }
  137. }
  138. }
  139. func (m *amqpMessagingService) receiveMessage(ctx context.Context) (*inboundMessage, error) {
  140. return m.receiver.Receive(ctx, &amqp.ReceiveOptions{})
  141. }
  142. func (m *amqpMessagingService) accept(ctx context.Context, msg *inboundMessage) error {
  143. return m.receiver.AcceptMessage(ctx, msg)
  144. }
  145. func (m *amqpMessagingService) failed(ctx context.Context, msg *inboundMessage) error {
  146. return m.receiver.ModifyMessage(ctx, msg, &amqp.ModifyMessageOptions{
  147. DeliveryFailed: true,
  148. UndeliverableHere: false,
  149. Annotations: nil,
  150. })
  151. }
  152. // Allow for substitution in testing to assert correct data is passed to AMQP
  153. // Due to the way that AMQP authentication is configured in Azure/amqp, we
  154. // need to monkey substitute here since ConnSASL<auth> returns a function that
  155. // acts on a private struct meaning we cannot meaningfully assert validity otherwise.
  156. var (
  157. connSASLPlain func(username, password string) amqp.SASLType = amqp.SASLTypePlain
  158. connSASLXOAUTH2 func(username, bearer string, maxFrameSizeOverride uint32) amqp.SASLType = amqp.SASLTypeXOAUTH2
  159. connSASLExternal func(resp string) amqp.SASLType = amqp.SASLTypeExternal
  160. )
  161. // toAMQPAuthentication configures authentication in amqp.ConnOption slice
  162. func toAMQPAuthentication(config *Config) (amqp.SASLType, error) {
  163. if config.Auth.PlainText != nil {
  164. plaintext := config.Auth.PlainText
  165. if plaintext.Password == "" || plaintext.Username == "" {
  166. return nil, errMissingPlainTextParams
  167. }
  168. return connSASLPlain(plaintext.Username, string(plaintext.Password)), nil
  169. }
  170. if config.Auth.XAuth2 != nil {
  171. xauth := config.Auth.XAuth2
  172. if xauth.Bearer == "" || xauth.Username == "" {
  173. return nil, errMissingXauth2Params
  174. }
  175. return connSASLXOAUTH2(xauth.Username, xauth.Bearer, saslMaxInitFrameSizeOverride), nil
  176. }
  177. if config.Auth.External != nil {
  178. return connSASLExternal(""), nil
  179. }
  180. return nil, errMissingAuthDetails
  181. }