factory.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package googlecloudpubsubreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver"
  4. import (
  5. "context"
  6. "strings"
  7. "go.opentelemetry.io/collector/component"
  8. "go.opentelemetry.io/collector/consumer"
  9. "go.opentelemetry.io/collector/receiver"
  10. "go.opentelemetry.io/collector/receiver/receiverhelper"
  11. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal/metadata"
  12. )
  13. const (
  14. reportTransport = "pubsub"
  15. reportFormatProtobuf = "protobuf"
  16. )
  17. func NewFactory() receiver.Factory {
  18. f := &pubsubReceiverFactory{
  19. receivers: make(map[*Config]*pubsubReceiver),
  20. }
  21. return receiver.NewFactory(
  22. metadata.Type,
  23. f.CreateDefaultConfig,
  24. receiver.WithTraces(f.CreateTracesReceiver, metadata.TracesStability),
  25. receiver.WithMetrics(f.CreateMetricsReceiver, metadata.MetricsStability),
  26. receiver.WithLogs(f.CreateLogsReceiver, metadata.LogsStability),
  27. )
  28. }
  29. type pubsubReceiverFactory struct {
  30. receivers map[*Config]*pubsubReceiver
  31. }
  32. func (factory *pubsubReceiverFactory) CreateDefaultConfig() component.Config {
  33. return &Config{}
  34. }
  35. func (factory *pubsubReceiverFactory) ensureReceiver(params receiver.CreateSettings, config component.Config) (*pubsubReceiver, error) {
  36. receiver := factory.receivers[config.(*Config)]
  37. if receiver != nil {
  38. return receiver, nil
  39. }
  40. rconfig := config.(*Config)
  41. obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
  42. ReceiverID: params.ID,
  43. Transport: reportTransport,
  44. ReceiverCreateSettings: params,
  45. })
  46. if err != nil {
  47. return nil, err
  48. }
  49. receiver = &pubsubReceiver{
  50. logger: params.Logger,
  51. obsrecv: obsrecv,
  52. userAgent: strings.ReplaceAll(rconfig.UserAgent, "{{version}}", params.BuildInfo.Version),
  53. config: rconfig,
  54. }
  55. factory.receivers[config.(*Config)] = receiver
  56. return receiver, nil
  57. }
  58. func (factory *pubsubReceiverFactory) CreateTracesReceiver(
  59. _ context.Context,
  60. params receiver.CreateSettings,
  61. cfg component.Config,
  62. consumer consumer.Traces) (receiver.Traces, error) {
  63. if consumer == nil {
  64. return nil, component.ErrNilNextConsumer
  65. }
  66. err := cfg.(*Config).validateForTrace()
  67. if err != nil {
  68. return nil, err
  69. }
  70. receiver, err := factory.ensureReceiver(params, cfg)
  71. if err != nil {
  72. return nil, err
  73. }
  74. receiver.tracesConsumer = consumer
  75. return receiver, nil
  76. }
  77. func (factory *pubsubReceiverFactory) CreateMetricsReceiver(
  78. _ context.Context,
  79. params receiver.CreateSettings,
  80. cfg component.Config,
  81. consumer consumer.Metrics) (receiver.Metrics, error) {
  82. if consumer == nil {
  83. return nil, component.ErrNilNextConsumer
  84. }
  85. err := cfg.(*Config).validateForMetric()
  86. if err != nil {
  87. return nil, err
  88. }
  89. receiver, err := factory.ensureReceiver(params, cfg)
  90. if err != nil {
  91. return nil, err
  92. }
  93. receiver.metricsConsumer = consumer
  94. return receiver, nil
  95. }
  96. func (factory *pubsubReceiverFactory) CreateLogsReceiver(
  97. _ context.Context,
  98. params receiver.CreateSettings,
  99. cfg component.Config,
  100. consumer consumer.Logs) (receiver.Logs, error) {
  101. if consumer == nil {
  102. return nil, component.ErrNilNextConsumer
  103. }
  104. err := cfg.(*Config).validateForLog()
  105. if err != nil {
  106. return nil, err
  107. }
  108. receiver, err := factory.ensureReceiver(params, cfg)
  109. if err != nil {
  110. return nil, err
  111. }
  112. receiver.logsConsumer = consumer
  113. return receiver, nil
  114. }