factory.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package azureeventhubreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureeventhubreceiver"
  4. import (
  5. "context"
  6. "errors"
  7. "go.opentelemetry.io/collector/component"
  8. "go.opentelemetry.io/collector/consumer"
  9. "go.opentelemetry.io/collector/receiver"
  10. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent"
  11. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureeventhubreceiver/internal/metadata"
  12. )
  13. const (
  14. // The receiver scope name
  15. receiverScopeName = "otelcol/" + metadata.Type + "receiver"
  16. )
  // errUnexpectedConfigurationType is reported when a component.Config handed
  // to the factory is not a *Config.
  var errUnexpectedConfigurationType = errors.New("failed to cast configuration to azure event hub config")
  20. type eventhubReceiverFactory struct {
  21. receivers *sharedcomponent.SharedComponents
  22. }
  23. // NewFactory creates a factory for the Azure Event Hub receiver.
  24. func NewFactory() receiver.Factory {
  25. f := &eventhubReceiverFactory{
  26. receivers: sharedcomponent.NewSharedComponents(),
  27. }
  28. return receiver.NewFactory(
  29. metadata.Type,
  30. createDefaultConfig,
  31. receiver.WithLogs(f.createLogsReceiver, metadata.LogsStability),
  32. receiver.WithMetrics(f.createMetricsReceiver, metadata.MetricsStability))
  33. }
  34. func createDefaultConfig() component.Config {
  35. return &Config{}
  36. }
  37. func (f *eventhubReceiverFactory) createLogsReceiver(
  38. _ context.Context,
  39. settings receiver.CreateSettings,
  40. cfg component.Config,
  41. nextConsumer consumer.Logs,
  42. ) (receiver.Logs, error) {
  43. receiver, err := f.getReceiver(component.DataTypeLogs, cfg, settings)
  44. if err != nil {
  45. return nil, err
  46. }
  47. receiver.(dataConsumer).setNextLogsConsumer(nextConsumer)
  48. return receiver, nil
  49. }
  50. func (f *eventhubReceiverFactory) createMetricsReceiver(
  51. _ context.Context,
  52. settings receiver.CreateSettings,
  53. cfg component.Config,
  54. nextConsumer consumer.Metrics,
  55. ) (receiver.Metrics, error) {
  56. receiver, err := f.getReceiver(component.DataTypeMetrics, cfg, settings)
  57. if err != nil {
  58. return nil, err
  59. }
  60. receiver.(dataConsumer).setNextMetricsConsumer(nextConsumer)
  61. return receiver, nil
  62. }
  63. func (f *eventhubReceiverFactory) getReceiver(
  64. receiverType component.Type,
  65. cfg component.Config,
  66. settings receiver.CreateSettings,
  67. ) (component.Component, error) {
  68. var err error
  69. r := f.receivers.GetOrAdd(cfg, func() component.Component {
  70. receiverConfig, ok := cfg.(*Config)
  71. if !ok {
  72. err = errUnexpectedConfigurationType
  73. return nil
  74. }
  75. var logsUnmarshaler eventLogsUnmarshaler
  76. var metricsUnmarshaler eventMetricsUnmarshaler
  77. switch receiverType {
  78. case component.DataTypeLogs:
  79. if logFormat(receiverConfig.Format) == rawLogFormat {
  80. logsUnmarshaler = newRawLogsUnmarshaler(settings.Logger)
  81. } else {
  82. logsUnmarshaler = newAzureResourceLogsUnmarshaler(settings.BuildInfo, settings.Logger)
  83. }
  84. case component.DataTypeMetrics:
  85. if logFormat(receiverConfig.Format) == rawLogFormat {
  86. metricsUnmarshaler = nil
  87. err = errors.New("raw format not supported for Metrics")
  88. } else {
  89. metricsUnmarshaler = newAzureResourceMetricsUnmarshaler(settings.BuildInfo, settings.Logger)
  90. }
  91. case component.DataTypeTraces:
  92. err = errors.New("unsupported traces data")
  93. }
  94. if err != nil {
  95. return nil
  96. }
  97. eventHandler := newEventhubHandler(receiverConfig, settings)
  98. var receiver component.Component
  99. receiver, err = newReceiver(receiverType, logsUnmarshaler, metricsUnmarshaler, eventHandler, settings)
  100. return receiver
  101. })
  102. if err != nil {
  103. return nil, err
  104. }
  105. return r.Unwrap(), err
  106. }