factory.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package azureblobreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureblobreceiver"
  4. import (
  5. "context"
  6. "errors"
  7. "go.opentelemetry.io/collector/component"
  8. "go.opentelemetry.io/collector/consumer"
  9. "go.opentelemetry.io/collector/receiver"
  10. "go.uber.org/zap"
  11. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent"
  12. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureblobreceiver/internal/metadata"
  13. )
  // logsContainerName is the container name that createDefaultConfig assigns
  // to the logs section of the configuration.
  const logsContainerName = "logs"

  // tracesContainerName is the container name that createDefaultConfig assigns
  // to the traces section of the configuration.
  const tracesContainerName = "traces"
  // errUnexpectedConfigurationType is reported by getReceiver when the supplied
  // component.Config is not a *Config.
  var errUnexpectedConfigurationType = errors.New("failed to cast configuration to Azure Blob Config")
  21. type blobReceiverFactory struct {
  22. receivers *sharedcomponent.SharedComponents
  23. }
  24. // NewFactory returns a factory for Azure Blob receiver.
  25. func NewFactory() receiver.Factory {
  26. f := &blobReceiverFactory{
  27. receivers: sharedcomponent.NewSharedComponents(),
  28. }
  29. return receiver.NewFactory(
  30. metadata.Type,
  31. f.createDefaultConfig,
  32. receiver.WithTraces(f.createTracesReceiver, metadata.TracesStability),
  33. receiver.WithLogs(f.createLogsReceiver, metadata.LogsStability))
  34. }
  35. func (f *blobReceiverFactory) createDefaultConfig() component.Config {
  36. return &Config{
  37. Logs: LogsConfig{ContainerName: logsContainerName},
  38. Traces: TracesConfig{ContainerName: tracesContainerName},
  39. }
  40. }
  41. func (f *blobReceiverFactory) createLogsReceiver(
  42. _ context.Context,
  43. set receiver.CreateSettings,
  44. cfg component.Config,
  45. nextConsumer consumer.Logs,
  46. ) (receiver.Logs, error) {
  47. receiver, err := f.getReceiver(set, cfg)
  48. if err != nil {
  49. set.Logger.Error(err.Error())
  50. return nil, err
  51. }
  52. receiver.(logsDataConsumer).setNextLogsConsumer(nextConsumer)
  53. return receiver, nil
  54. }
  55. func (f *blobReceiverFactory) createTracesReceiver(
  56. _ context.Context,
  57. set receiver.CreateSettings,
  58. cfg component.Config,
  59. nextConsumer consumer.Traces,
  60. ) (receiver.Traces, error) {
  61. receiver, err := f.getReceiver(set, cfg)
  62. if err != nil {
  63. set.Logger.Error(err.Error())
  64. return nil, err
  65. }
  66. receiver.(tracesDataConsumer).setNextTracesConsumer(nextConsumer)
  67. return receiver, nil
  68. }
  69. func (f *blobReceiverFactory) getReceiver(
  70. set receiver.CreateSettings,
  71. cfg component.Config) (component.Component, error) {
  72. var err error
  73. r := f.receivers.GetOrAdd(cfg, func() component.Component {
  74. receiverConfig, ok := cfg.(*Config)
  75. if !ok {
  76. err = errUnexpectedConfigurationType
  77. return nil
  78. }
  79. var beh blobEventHandler
  80. beh, err = f.getBlobEventHandler(receiverConfig, set.Logger)
  81. if err != nil {
  82. return nil
  83. }
  84. var receiver component.Component
  85. receiver, err = newReceiver(set, beh)
  86. return receiver
  87. })
  88. if err != nil {
  89. return nil, err
  90. }
  91. return r.Unwrap(), err
  92. }
  93. func (f *blobReceiverFactory) getBlobEventHandler(cfg *Config, logger *zap.Logger) (blobEventHandler, error) {
  94. bc, err := newBlobClient(cfg.ConnectionString, logger)
  95. if err != nil {
  96. return nil, err
  97. }
  98. return newBlobEventHandler(cfg.EventHub.EndPoint, cfg.Logs.ContainerName, cfg.Traces.ContainerName, bc, logger),
  99. nil
  100. }