handler.go

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal"

import (
	"context"
	"errors"
	"fmt"
	"io"
	"sync"
	"sync/atomic"
	"time"

	pubsub "cloud.google.com/go/pubsub/apiv1"
	"cloud.google.com/go/pubsub/apiv1/pubsubpb"
	"go.uber.org/zap"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)
// Time to wait before restarting when the stream has stopped.
const streamRecoveryBackoffPeriod = 250 * time.Millisecond

type StreamHandler struct {
	stream       pubsubpb.Subscriber_StreamingPullClient
	pushMessage  func(ctx context.Context, message *pubsubpb.ReceivedMessage) error
	acks         []string
	mutex        sync.Mutex
	client       *pubsub.SubscriberClient
	clientID     string
	subscription string
	cancel       context.CancelFunc
	// wait group for the send/receive functions
	streamWaitGroup sync.WaitGroup
	// wait group for the handler
	handlerWaitGroup sync.WaitGroup
	logger           *zap.Logger
	// time the acknowledge loop waits before acknowledging messages
	ackBatchWait time.Duration
	isRunning    atomic.Bool
}
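// ack buffers an ack ID; the buffered IDs are sent in batches by the
// acknowledge loop in requestStream.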
func (handler *StreamHandler) ack(ackID string) {
	handler.mutex.Lock()
	defer handler.mutex.Unlock()
	handler.acks = append(handler.acks, ackID)
}
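// NewHandler creates a StreamHandler and opens the initial StreamingPull
// stream. A minimal usage sketch (the logger, subscription name and callback
// below are placeholders):
//
//	client, _ := pubsub.NewSubscriberClient(ctx)
//	handler, err := NewHandler(ctx, logger, client, "client-id",
//		"projects/my-project/subscriptions/my-sub",
//		func(ctx context.Context, msg *pubsubpb.ReceivedMessage) error {
//			// process msg.Message.Data here
//			return nil
//		})
//	if err == nil {
//		handler.RecoverableStream(ctx)
//		defer handler.CancelNow()
//	}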
func NewHandler(
	ctx context.Context,
	logger *zap.Logger,
	client *pubsub.SubscriberClient,
	clientID string,
	subscription string,
	callback func(ctx context.Context, message *pubsubpb.ReceivedMessage) error) (*StreamHandler, error) {
	handler := StreamHandler{
		logger:       logger,
		client:       client,
		clientID:     clientID,
		subscription: subscription,
		pushMessage:  callback,
		ackBatchWait: 10 * time.Second,
	}
	return &handler, handler.initStream(ctx)
}
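// initStream opens a StreamingPull stream and sends the initial request that
// binds it to the subscription, sets the ack deadline and identifies the
// client.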
func (handler *StreamHandler) initStream(ctx context.Context) error {
	var err error
	// Create the stream with the receiver's context, as we don't want to
	// cancel an ongoing operation.
	handler.stream, err = handler.client.StreamingPull(ctx)
	if err != nil {
		return err
	}
	request := pubsubpb.StreamingPullRequest{
		Subscription:             handler.subscription,
		StreamAckDeadlineSeconds: 60,
		ClientId:                 handler.clientID,
	}
	if err := handler.stream.Send(&request); err != nil {
		_ = handler.stream.CloseSend()
		return err
	}
	return nil
}
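// RecoverableStream starts the recovery loop in a goroutine; the loop keeps
// a StreamingPull stream running until CancelNow is called.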
func (handler *StreamHandler) RecoverableStream(ctx context.Context) {
	handler.handlerWaitGroup.Add(1)
	handler.isRunning.Swap(true)
	var handlerCtx context.Context
	handlerCtx, handler.cancel = context.WithCancel(ctx)
	go handler.recoverableStream(handlerCtx)
}
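// recoverableStream runs the send and receive goroutines for one stream,
// waits for them to stop, then reinitializes the stream, looping until the
// handler is no longer running.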
func (handler *StreamHandler) recoverableStream(ctx context.Context) {
	for handler.isRunning.Load() {
		// Create a new cancelable context for this iteration, so we can
		// recover the stream when either side of it fails.
		var loopCtx context.Context
		loopCtx, cancel := context.WithCancel(ctx)
		handler.logger.Info("Starting Streaming Pull")
		handler.streamWaitGroup.Add(2)
		go handler.requestStream(loopCtx, cancel)
		go handler.responseStream(loopCtx, cancel)
		select {
		case <-loopCtx.Done():
			handler.streamWaitGroup.Wait()
		case <-ctx.Done():
			cancel()
			handler.streamWaitGroup.Wait()
		}
		if handler.isRunning.Load() {
			err := handler.initStream(ctx)
			if err != nil {
				handler.logger.Error("Failed to recover stream.")
			}
		}
		handler.logger.Warn("End of recovery loop, restarting.")
		time.Sleep(streamRecoveryBackoffPeriod)
	}
	handler.logger.Warn("Shutting down recovery loop.")
	handler.handlerWaitGroup.Done()
}
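// CancelNow stops the recovery loop, cancels the running stream and waits
// for all handler goroutines to finish.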
func (handler *StreamHandler) CancelNow() {
	handler.isRunning.Swap(false)
	if handler.cancel != nil {
		handler.cancel()
		handler.Wait()
	}
}

func (handler *StreamHandler) Wait() {
	handler.handlerWaitGroup.Wait()
}
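// acknowledgeMessages sends all buffered ack IDs on the stream in a single
// request and clears the buffer.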
func (handler *StreamHandler) acknowledgeMessages() error {
	handler.mutex.Lock()
	defer handler.mutex.Unlock()
	if len(handler.acks) == 0 {
		return nil
	}
	request := pubsubpb.StreamingPullRequest{
		AckIds: handler.acks,
	}
	handler.acks = nil
	return handler.stream.Send(&request)
}
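// requestStream is the send loop: it flushes the buffered acknowledgements
// every ackBatchWait, and once more on cancellation, before closing the send
// side of the stream.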
func (handler *StreamHandler) requestStream(ctx context.Context, cancel context.CancelFunc) {
	timer := time.NewTimer(handler.ackBatchWait)
	for {
		if err := handler.acknowledgeMessages(); err != nil {
			if errors.Is(err, io.EOF) {
				handler.logger.Warn("EOF reached")
				break
			}
			handler.logger.Error(fmt.Sprintf("Failed to acknowledge messages with error %v", err))
			break
		}
		select {
		case <-ctx.Done():
			handler.logger.Warn("requestStream <-ctx.Done()")
		case <-timer.C:
			timer.Reset(handler.ackBatchWait)
		}
		if errors.Is(ctx.Err(), context.Canceled) {
			_ = handler.acknowledgeMessages()
			timer.Stop()
			break
		}
	}
	cancel()
	handler.logger.Warn("Request Stream loop ended.")
	_ = handler.stream.CloseSend()
	handler.streamWaitGroup.Done()
}
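// responseStream is the receive loop: it pulls message batches off the
// stream, pushes each message into the pipeline, and buffers an ack for
// every message that was handled successfully. It exits on EOF, on gRPC
// errors, or when the context is canceled.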
func (handler *StreamHandler) responseStream(ctx context.Context, cancel context.CancelFunc) {
	activeStreaming := true
	for activeStreaming {
		// block until the next message or timeout expires
		resp, err := handler.stream.Recv()
		if err == nil {
			// handle all the messages in the response, could be one or more
			for _, message := range resp.ReceivedMessages {
				err = handler.pushMessage(context.Background(), message)
				if err == nil {
					// Only acknowledge messages that made it through the
					// pipeline. Failed messages are not acked, so Pub/Sub
					// redelivers them and handles the flow control for us.
					handler.ack(message.AckId)
				}
			}
		} else {
			var s, grpcStatus = status.FromError(err)
			switch {
			case errors.Is(err, io.EOF):
				activeStreaming = false
			case !grpcStatus:
				handler.logger.Warn("response stream breaking on error",
					zap.Error(err))
				activeStreaming = false
			case s.Code() == codes.Unavailable:
				handler.logger.Info("response stream breaking on gRPC status 'Unavailable'")
				activeStreaming = false
			case s.Code() == codes.NotFound:
				handler.logger.Error("resource doesn't exist, waiting 60 seconds before restarting the stream")
				time.Sleep(time.Second * 60)
				activeStreaming = false
			default:
				handler.logger.Warn(fmt.Sprintf("response stream breaking on gRPC status %s", s.Message()),
					zap.String("status", s.Message()),
					zap.Error(err))
				activeStreaming = false
			}
		}
		if errors.Is(ctx.Err(), context.Canceled) {
			// Canceling the loop, the collector is probably stopping.
			handler.logger.Warn("response stream ctx.Err() == context.Canceled")
			break
		}
	}
	cancel()
	handler.logger.Warn("Response Stream loop ended.")
	handler.streamWaitGroup.Done()
}