123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package googlecloudpubsubreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver"
- import (
- "bytes"
- "compress/gzip"
- "context"
- "errors"
- "fmt"
- "io"
- "strings"
- "sync"
- pubsub "cloud.google.com/go/pubsub/apiv1"
- "cloud.google.com/go/pubsub/apiv1/pubsubpb"
- "go.opentelemetry.io/collector/component"
- "go.opentelemetry.io/collector/consumer"
- "go.opentelemetry.io/collector/pdata/pcommon"
- "go.opentelemetry.io/collector/pdata/plog"
- "go.opentelemetry.io/collector/pdata/pmetric"
- "go.opentelemetry.io/collector/pdata/ptrace"
- "go.opentelemetry.io/collector/receiver/receiverhelper"
- "go.uber.org/zap"
- "google.golang.org/api/option"
- "google.golang.org/grpc"
- "google.golang.org/grpc/credentials/insecure"
- "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal"
- )
- // https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#streamingpullrequest
- type pubsubReceiver struct {
- logger *zap.Logger
- obsrecv *receiverhelper.ObsReport
- tracesConsumer consumer.Traces
- metricsConsumer consumer.Metrics
- logsConsumer consumer.Logs
- userAgent string
- config *Config
- client *pubsub.SubscriberClient
- tracesUnmarshaler ptrace.Unmarshaler
- metricsUnmarshaler pmetric.Unmarshaler
- logsUnmarshaler plog.Unmarshaler
- handler *internal.StreamHandler
- startOnce sync.Once
- }
// encoding identifies how the payload of a Pub/Sub message must be decoded.
type encoding int

// All constants share the encoding type through implicit repetition of the
// first expression; the numeric values (0..4) are unchanged.
const (
	// unknown means no encoding could be derived from the message or the config.
	unknown encoding = iota
	// otlpProtoTrace is an OTLP protobuf trace payload.
	otlpProtoTrace
	// otlpProtoMetric is an OTLP protobuf metric payload.
	otlpProtoMetric
	// otlpProtoLog is an OTLP protobuf log payload.
	otlpProtoLog
	// rawTextLog is a plain text payload that becomes a single log record.
	rawTextLog
)
// compression identifies how the payload of a Pub/Sub message is compressed.
type compression int

// Both constants share the compression type through implicit repetition of
// the first expression; the numeric values (0 and 1) are unchanged.
const (
	// uncompressed means the payload is used as-is.
	uncompressed compression = iota
	// gZip means the payload is gzip-compressed.
	gZip
)
- func (receiver *pubsubReceiver) generateClientOptions() (copts []option.ClientOption) {
- if receiver.userAgent != "" {
- copts = append(copts, option.WithUserAgent(receiver.userAgent))
- }
- if receiver.config.Endpoint != "" {
- if receiver.config.Insecure {
- var dialOpts []grpc.DialOption
- if receiver.userAgent != "" {
- dialOpts = append(dialOpts, grpc.WithUserAgent(receiver.userAgent))
- }
- conn, _ := grpc.Dial(receiver.config.Endpoint, append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))...)
- copts = append(copts, option.WithGRPCConn(conn))
- } else {
- copts = append(copts, option.WithEndpoint(receiver.config.Endpoint))
- }
- }
- return copts
- }
- func (receiver *pubsubReceiver) Start(ctx context.Context, _ component.Host) error {
- if receiver.tracesConsumer == nil && receiver.metricsConsumer == nil && receiver.logsConsumer == nil {
- return errors.New("cannot start receiver: no consumers were specified")
- }
- var startErr error
- receiver.startOnce.Do(func() {
- copts := receiver.generateClientOptions()
- client, err := pubsub.NewSubscriberClient(ctx, copts...)
- if err != nil {
- startErr = fmt.Errorf("failed creating the gRPC client to Pubsub: %w", err)
- return
- }
- receiver.client = client
- err = receiver.createReceiverHandler(ctx)
- if err != nil {
- startErr = fmt.Errorf("failed to create ReceiverHandler: %w", err)
- return
- }
- })
- receiver.tracesUnmarshaler = &ptrace.ProtoUnmarshaler{}
- receiver.metricsUnmarshaler = &pmetric.ProtoUnmarshaler{}
- receiver.logsUnmarshaler = &plog.ProtoUnmarshaler{}
- return startErr
- }
- func (receiver *pubsubReceiver) Shutdown(_ context.Context) error {
- receiver.logger.Info("Stopping Google Pubsub receiver")
- receiver.handler.CancelNow()
- receiver.logger.Info("Stopped Google Pubsub receiver")
- return nil
- }
- func (receiver *pubsubReceiver) handleLogStrings(ctx context.Context, message *pubsubpb.ReceivedMessage) error {
- if receiver.logsConsumer == nil {
- return nil
- }
- data := string(message.Message.Data)
- timestamp := message.GetMessage().PublishTime
- out := plog.NewLogs()
- logs := out.ResourceLogs()
- rls := logs.AppendEmpty()
- ills := rls.ScopeLogs().AppendEmpty()
- lr := ills.LogRecords().AppendEmpty()
- lr.Body().SetStr(data)
- lr.SetTimestamp(pcommon.NewTimestampFromTime(timestamp.AsTime()))
- return receiver.logsConsumer.ConsumeLogs(ctx, out)
- }
- func decompress(payload []byte, compression compression) ([]byte, error) {
- if compression == gZip {
- reader, err := gzip.NewReader(bytes.NewReader(payload))
- if err != nil {
- return nil, err
- }
- return io.ReadAll(reader)
- }
- return payload, nil
- }
- func (receiver *pubsubReceiver) handleTrace(ctx context.Context, payload []byte, compression compression) error {
- payload, err := decompress(payload, compression)
- if err != nil {
- return err
- }
- otlpData, err := receiver.tracesUnmarshaler.UnmarshalTraces(payload)
- count := otlpData.SpanCount()
- if err != nil {
- return err
- }
- ctx = receiver.obsrecv.StartTracesOp(ctx)
- err = receiver.tracesConsumer.ConsumeTraces(ctx, otlpData)
- receiver.obsrecv.EndTracesOp(ctx, reportFormatProtobuf, count, err)
- return nil
- }
- func (receiver *pubsubReceiver) handleMetric(ctx context.Context, payload []byte, compression compression) error {
- payload, err := decompress(payload, compression)
- if err != nil {
- return err
- }
- otlpData, err := receiver.metricsUnmarshaler.UnmarshalMetrics(payload)
- count := otlpData.MetricCount()
- if err != nil {
- return err
- }
- ctx = receiver.obsrecv.StartMetricsOp(ctx)
- err = receiver.metricsConsumer.ConsumeMetrics(ctx, otlpData)
- receiver.obsrecv.EndMetricsOp(ctx, reportFormatProtobuf, count, err)
- return nil
- }
- func (receiver *pubsubReceiver) handleLog(ctx context.Context, payload []byte, compression compression) error {
- payload, err := decompress(payload, compression)
- if err != nil {
- return err
- }
- otlpData, err := receiver.logsUnmarshaler.UnmarshalLogs(payload)
- count := otlpData.LogRecordCount()
- if err != nil {
- return err
- }
- ctx = receiver.obsrecv.StartLogsOp(ctx)
- err = receiver.logsConsumer.ConsumeLogs(ctx, otlpData)
- receiver.obsrecv.EndLogsOp(ctx, reportFormatProtobuf, count, err)
- return nil
- }
- func (receiver *pubsubReceiver) detectEncoding(attributes map[string]string) (encoding, compression) {
- otlpEncoding := unknown
- otlpCompression := uncompressed
- ceType := attributes["ce-type"]
- ceContentType := attributes["content-type"]
- if strings.HasSuffix(ceContentType, "application/protobuf") {
- switch ceType {
- case "org.opentelemetry.otlp.traces.v1":
- otlpEncoding = otlpProtoTrace
- case "org.opentelemetry.otlp.metrics.v1":
- otlpEncoding = otlpProtoMetric
- case "org.opentelemetry.otlp.logs.v1":
- otlpEncoding = otlpProtoLog
- }
- } else if strings.HasSuffix(ceContentType, "text/plain") {
- otlpEncoding = rawTextLog
- }
- if otlpEncoding == unknown && receiver.config.Encoding != "" {
- switch receiver.config.Encoding {
- case "otlp_proto_trace":
- otlpEncoding = otlpProtoTrace
- case "otlp_proto_metric":
- otlpEncoding = otlpProtoMetric
- case "otlp_proto_log":
- otlpEncoding = otlpProtoLog
- case "raw_text":
- otlpEncoding = rawTextLog
- }
- }
- ceContentEncoding := attributes["content-encoding"]
- if ceContentEncoding == "gzip" {
- otlpCompression = gZip
- }
- if otlpCompression == uncompressed && receiver.config.Compression != "" {
- if receiver.config.Compression == "gzip" {
- otlpCompression = gZip
- }
- }
- return otlpEncoding, otlpCompression
- }
- func (receiver *pubsubReceiver) createReceiverHandler(ctx context.Context) error {
- var err error
- receiver.handler, err = internal.NewHandler(
- ctx,
- receiver.logger,
- receiver.client,
- receiver.config.ClientID,
- receiver.config.Subscription,
- func(ctx context.Context, message *pubsubpb.ReceivedMessage) error {
- payload := message.Message.Data
- encoding, compression := receiver.detectEncoding(message.Message.Attributes)
- switch encoding {
- case otlpProtoTrace:
- if receiver.tracesConsumer != nil {
- return receiver.handleTrace(ctx, payload, compression)
- }
- case otlpProtoMetric:
- if receiver.metricsConsumer != nil {
- return receiver.handleMetric(ctx, payload, compression)
- }
- case otlpProtoLog:
- if receiver.logsConsumer != nil {
- return receiver.handleLog(ctx, payload, compression)
- }
- case rawTextLog:
- return receiver.handleLogStrings(ctx, message)
- case unknown:
- return errors.New("unknown encoding")
- }
- return errors.New("unknown encoding")
- })
- if err != nil {
- return err
- }
- receiver.handler.RecoverableStream(ctx)
- return nil
- }
|