123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package azureeventhubreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureeventhubreceiver"
- import (
- "context"
- "errors"
- "go.opentelemetry.io/collector/component"
- "go.opentelemetry.io/collector/consumer"
- "go.opentelemetry.io/collector/receiver"
- "github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent"
- "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureeventhubreceiver/internal/metadata"
- )
- const (
- // The receiver scope name
- receiverScopeName = "otelcol/" + metadata.Type + "receiver"
- )
- var (
- errUnexpectedConfigurationType = errors.New("failed to cast configuration to azure event hub config")
- )
- type eventhubReceiverFactory struct {
- receivers *sharedcomponent.SharedComponents
- }
- // NewFactory creates a factory for the Azure Event Hub receiver.
- func NewFactory() receiver.Factory {
- f := &eventhubReceiverFactory{
- receivers: sharedcomponent.NewSharedComponents(),
- }
- return receiver.NewFactory(
- metadata.Type,
- createDefaultConfig,
- receiver.WithLogs(f.createLogsReceiver, metadata.LogsStability),
- receiver.WithMetrics(f.createMetricsReceiver, metadata.MetricsStability))
- }
- func createDefaultConfig() component.Config {
- return &Config{}
- }
- func (f *eventhubReceiverFactory) createLogsReceiver(
- _ context.Context,
- settings receiver.CreateSettings,
- cfg component.Config,
- nextConsumer consumer.Logs,
- ) (receiver.Logs, error) {
- receiver, err := f.getReceiver(component.DataTypeLogs, cfg, settings)
- if err != nil {
- return nil, err
- }
- receiver.(dataConsumer).setNextLogsConsumer(nextConsumer)
- return receiver, nil
- }
- func (f *eventhubReceiverFactory) createMetricsReceiver(
- _ context.Context,
- settings receiver.CreateSettings,
- cfg component.Config,
- nextConsumer consumer.Metrics,
- ) (receiver.Metrics, error) {
- receiver, err := f.getReceiver(component.DataTypeMetrics, cfg, settings)
- if err != nil {
- return nil, err
- }
- receiver.(dataConsumer).setNextMetricsConsumer(nextConsumer)
- return receiver, nil
- }
- func (f *eventhubReceiverFactory) getReceiver(
- receiverType component.Type,
- cfg component.Config,
- settings receiver.CreateSettings,
- ) (component.Component, error) {
- var err error
- r := f.receivers.GetOrAdd(cfg, func() component.Component {
- receiverConfig, ok := cfg.(*Config)
- if !ok {
- err = errUnexpectedConfigurationType
- return nil
- }
- var logsUnmarshaler eventLogsUnmarshaler
- var metricsUnmarshaler eventMetricsUnmarshaler
- switch receiverType {
- case component.DataTypeLogs:
- if logFormat(receiverConfig.Format) == rawLogFormat {
- logsUnmarshaler = newRawLogsUnmarshaler(settings.Logger)
- } else {
- logsUnmarshaler = newAzureResourceLogsUnmarshaler(settings.BuildInfo, settings.Logger)
- }
- case component.DataTypeMetrics:
- if logFormat(receiverConfig.Format) == rawLogFormat {
- metricsUnmarshaler = nil
- err = errors.New("raw format not supported for Metrics")
- } else {
- metricsUnmarshaler = newAzureResourceMetricsUnmarshaler(settings.BuildInfo, settings.Logger)
- }
- case component.DataTypeTraces:
- err = errors.New("unsupported traces data")
- }
- if err != nil {
- return nil
- }
- eventHandler := newEventhubHandler(receiverConfig, settings)
- var receiver component.Component
- receiver, err = newReceiver(receiverType, logsUnmarshaler, metricsUnmarshaler, eventHandler, settings)
- return receiver
- })
- if err != nil {
- return nil, err
- }
- return r.Unwrap(), err
- }
|