receiver.go 8.2 KB


  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package googlecloudpubsubreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver"
  4. import (
  5. "bytes"
  6. "compress/gzip"
  7. "context"
  8. "errors"
  9. "fmt"
  10. "io"
  11. "strings"
  12. "sync"
  13. pubsub "cloud.google.com/go/pubsub/apiv1"
  14. "cloud.google.com/go/pubsub/apiv1/pubsubpb"
  15. "go.opentelemetry.io/collector/component"
  16. "go.opentelemetry.io/collector/consumer"
  17. "go.opentelemetry.io/collector/pdata/pcommon"
  18. "go.opentelemetry.io/collector/pdata/plog"
  19. "go.opentelemetry.io/collector/pdata/pmetric"
  20. "go.opentelemetry.io/collector/pdata/ptrace"
  21. "go.opentelemetry.io/collector/receiver/receiverhelper"
  22. "go.uber.org/zap"
  23. "google.golang.org/api/option"
  24. "google.golang.org/grpc"
  25. "google.golang.org/grpc/credentials/insecure"
  26. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal"
  27. )
  28. // https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#streamingpullrequest
  29. type pubsubReceiver struct {
  30. logger *zap.Logger
  31. obsrecv *receiverhelper.ObsReport
  32. tracesConsumer consumer.Traces
  33. metricsConsumer consumer.Metrics
  34. logsConsumer consumer.Logs
  35. userAgent string
  36. config *Config
  37. client *pubsub.SubscriberClient
  38. tracesUnmarshaler ptrace.Unmarshaler
  39. metricsUnmarshaler pmetric.Unmarshaler
  40. logsUnmarshaler plog.Unmarshaler
  41. handler *internal.StreamHandler
  42. startOnce sync.Once
  43. }
  // encoding identifies the payload format detected for a received Pub/Sub
  // message.
  type encoding int

  // Only the first constant spells out the type and iota; the following lines
  // implicitly repeat both, so every value has type encoding. Writing "= iota"
  // on each line (without the type) would make them untyped integer constants.
  const (
  	// unknown means no encoding could be determined.
  	unknown encoding = iota
  	// otlpProtoTrace is an OTLP protobuf traces payload.
  	otlpProtoTrace
  	// otlpProtoMetric is an OTLP protobuf metrics payload.
  	otlpProtoMetric
  	// otlpProtoLog is an OTLP protobuf logs payload.
  	otlpProtoLog
  	// rawTextLog is a plain-text payload that becomes the body of a log record.
  	rawTextLog
  )
  // compression identifies how the payload of a received Pub/Sub message is
  // compressed.
  type compression int

  // The type and iota are stated once; gZip repeats them implicitly and is
  // therefore a typed compression value rather than an untyped integer.
  const (
  	// uncompressed means the payload is used as-is.
  	uncompressed compression = iota
  	// gZip means the payload is gzip-compressed.
  	gZip
  )
  57. func (receiver *pubsubReceiver) generateClientOptions() (copts []option.ClientOption) {
  58. if receiver.userAgent != "" {
  59. copts = append(copts, option.WithUserAgent(receiver.userAgent))
  60. }
  61. if receiver.config.Endpoint != "" {
  62. if receiver.config.Insecure {
  63. var dialOpts []grpc.DialOption
  64. if receiver.userAgent != "" {
  65. dialOpts = append(dialOpts, grpc.WithUserAgent(receiver.userAgent))
  66. }
  67. conn, _ := grpc.Dial(receiver.config.Endpoint, append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))...)
  68. copts = append(copts, option.WithGRPCConn(conn))
  69. } else {
  70. copts = append(copts, option.WithEndpoint(receiver.config.Endpoint))
  71. }
  72. }
  73. return copts
  74. }
  75. func (receiver *pubsubReceiver) Start(ctx context.Context, _ component.Host) error {
  76. if receiver.tracesConsumer == nil && receiver.metricsConsumer == nil && receiver.logsConsumer == nil {
  77. return errors.New("cannot start receiver: no consumers were specified")
  78. }
  79. var startErr error
  80. receiver.startOnce.Do(func() {
  81. copts := receiver.generateClientOptions()
  82. client, err := pubsub.NewSubscriberClient(ctx, copts...)
  83. if err != nil {
  84. startErr = fmt.Errorf("failed creating the gRPC client to Pubsub: %w", err)
  85. return
  86. }
  87. receiver.client = client
  88. err = receiver.createReceiverHandler(ctx)
  89. if err != nil {
  90. startErr = fmt.Errorf("failed to create ReceiverHandler: %w", err)
  91. return
  92. }
  93. })
  94. receiver.tracesUnmarshaler = &ptrace.ProtoUnmarshaler{}
  95. receiver.metricsUnmarshaler = &pmetric.ProtoUnmarshaler{}
  96. receiver.logsUnmarshaler = &plog.ProtoUnmarshaler{}
  97. return startErr
  98. }
  99. func (receiver *pubsubReceiver) Shutdown(_ context.Context) error {
  100. receiver.logger.Info("Stopping Google Pubsub receiver")
  101. receiver.handler.CancelNow()
  102. receiver.logger.Info("Stopped Google Pubsub receiver")
  103. return nil
  104. }
  105. func (receiver *pubsubReceiver) handleLogStrings(ctx context.Context, message *pubsubpb.ReceivedMessage) error {
  106. if receiver.logsConsumer == nil {
  107. return nil
  108. }
  109. data := string(message.Message.Data)
  110. timestamp := message.GetMessage().PublishTime
  111. out := plog.NewLogs()
  112. logs := out.ResourceLogs()
  113. rls := logs.AppendEmpty()
  114. ills := rls.ScopeLogs().AppendEmpty()
  115. lr := ills.LogRecords().AppendEmpty()
  116. lr.Body().SetStr(data)
  117. lr.SetTimestamp(pcommon.NewTimestampFromTime(timestamp.AsTime()))
  118. return receiver.logsConsumer.ConsumeLogs(ctx, out)
  119. }
  120. func decompress(payload []byte, compression compression) ([]byte, error) {
  121. if compression == gZip {
  122. reader, err := gzip.NewReader(bytes.NewReader(payload))
  123. if err != nil {
  124. return nil, err
  125. }
  126. return io.ReadAll(reader)
  127. }
  128. return payload, nil
  129. }
  130. func (receiver *pubsubReceiver) handleTrace(ctx context.Context, payload []byte, compression compression) error {
  131. payload, err := decompress(payload, compression)
  132. if err != nil {
  133. return err
  134. }
  135. otlpData, err := receiver.tracesUnmarshaler.UnmarshalTraces(payload)
  136. count := otlpData.SpanCount()
  137. if err != nil {
  138. return err
  139. }
  140. ctx = receiver.obsrecv.StartTracesOp(ctx)
  141. err = receiver.tracesConsumer.ConsumeTraces(ctx, otlpData)
  142. receiver.obsrecv.EndTracesOp(ctx, reportFormatProtobuf, count, err)
  143. return nil
  144. }
  145. func (receiver *pubsubReceiver) handleMetric(ctx context.Context, payload []byte, compression compression) error {
  146. payload, err := decompress(payload, compression)
  147. if err != nil {
  148. return err
  149. }
  150. otlpData, err := receiver.metricsUnmarshaler.UnmarshalMetrics(payload)
  151. count := otlpData.MetricCount()
  152. if err != nil {
  153. return err
  154. }
  155. ctx = receiver.obsrecv.StartMetricsOp(ctx)
  156. err = receiver.metricsConsumer.ConsumeMetrics(ctx, otlpData)
  157. receiver.obsrecv.EndMetricsOp(ctx, reportFormatProtobuf, count, err)
  158. return nil
  159. }
  160. func (receiver *pubsubReceiver) handleLog(ctx context.Context, payload []byte, compression compression) error {
  161. payload, err := decompress(payload, compression)
  162. if err != nil {
  163. return err
  164. }
  165. otlpData, err := receiver.logsUnmarshaler.UnmarshalLogs(payload)
  166. count := otlpData.LogRecordCount()
  167. if err != nil {
  168. return err
  169. }
  170. ctx = receiver.obsrecv.StartLogsOp(ctx)
  171. err = receiver.logsConsumer.ConsumeLogs(ctx, otlpData)
  172. receiver.obsrecv.EndLogsOp(ctx, reportFormatProtobuf, count, err)
  173. return nil
  174. }
  175. func (receiver *pubsubReceiver) detectEncoding(attributes map[string]string) (encoding, compression) {
  176. otlpEncoding := unknown
  177. otlpCompression := uncompressed
  178. ceType := attributes["ce-type"]
  179. ceContentType := attributes["content-type"]
  180. if strings.HasSuffix(ceContentType, "application/protobuf") {
  181. switch ceType {
  182. case "org.opentelemetry.otlp.traces.v1":
  183. otlpEncoding = otlpProtoTrace
  184. case "org.opentelemetry.otlp.metrics.v1":
  185. otlpEncoding = otlpProtoMetric
  186. case "org.opentelemetry.otlp.logs.v1":
  187. otlpEncoding = otlpProtoLog
  188. }
  189. } else if strings.HasSuffix(ceContentType, "text/plain") {
  190. otlpEncoding = rawTextLog
  191. }
  192. if otlpEncoding == unknown && receiver.config.Encoding != "" {
  193. switch receiver.config.Encoding {
  194. case "otlp_proto_trace":
  195. otlpEncoding = otlpProtoTrace
  196. case "otlp_proto_metric":
  197. otlpEncoding = otlpProtoMetric
  198. case "otlp_proto_log":
  199. otlpEncoding = otlpProtoLog
  200. case "raw_text":
  201. otlpEncoding = rawTextLog
  202. }
  203. }
  204. ceContentEncoding := attributes["content-encoding"]
  205. if ceContentEncoding == "gzip" {
  206. otlpCompression = gZip
  207. }
  208. if otlpCompression == uncompressed && receiver.config.Compression != "" {
  209. if receiver.config.Compression == "gzip" {
  210. otlpCompression = gZip
  211. }
  212. }
  213. return otlpEncoding, otlpCompression
  214. }
  215. func (receiver *pubsubReceiver) createReceiverHandler(ctx context.Context) error {
  216. var err error
  217. receiver.handler, err = internal.NewHandler(
  218. ctx,
  219. receiver.logger,
  220. receiver.client,
  221. receiver.config.ClientID,
  222. receiver.config.Subscription,
  223. func(ctx context.Context, message *pubsubpb.ReceivedMessage) error {
  224. payload := message.Message.Data
  225. encoding, compression := receiver.detectEncoding(message.Message.Attributes)
  226. switch encoding {
  227. case otlpProtoTrace:
  228. if receiver.tracesConsumer != nil {
  229. return receiver.handleTrace(ctx, payload, compression)
  230. }
  231. case otlpProtoMetric:
  232. if receiver.metricsConsumer != nil {
  233. return receiver.handleMetric(ctx, payload, compression)
  234. }
  235. case otlpProtoLog:
  236. if receiver.logsConsumer != nil {
  237. return receiver.handleLog(ctx, payload, compression)
  238. }
  239. case rawTextLog:
  240. return receiver.handleLogStrings(ctx, message)
  241. case unknown:
  242. return errors.New("unknown encoding")
  243. }
  244. return errors.New("unknown encoding")
  245. })
  246. if err != nil {
  247. return err
  248. }
  249. receiver.handler.RecoverableStream(ctx)
  250. return nil
  251. }