123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package azureblobreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureblobreceiver"
- import (
- "context"
- "errors"
- "go.opentelemetry.io/collector/component"
- "go.opentelemetry.io/collector/consumer"
- "go.opentelemetry.io/collector/receiver"
- "go.uber.org/zap"
- "github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent"
- "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureblobreceiver/internal/metadata"
- )
- const (
- logsContainerName = "logs"
- tracesContainerName = "traces"
- )
- var (
- errUnexpectedConfigurationType = errors.New("failed to cast configuration to Azure Blob Config")
- )
- type blobReceiverFactory struct {
- receivers *sharedcomponent.SharedComponents
- }
- // NewFactory returns a factory for Azure Blob receiver.
- func NewFactory() receiver.Factory {
- f := &blobReceiverFactory{
- receivers: sharedcomponent.NewSharedComponents(),
- }
- return receiver.NewFactory(
- metadata.Type,
- f.createDefaultConfig,
- receiver.WithTraces(f.createTracesReceiver, metadata.TracesStability),
- receiver.WithLogs(f.createLogsReceiver, metadata.LogsStability))
- }
- func (f *blobReceiverFactory) createDefaultConfig() component.Config {
- return &Config{
- Logs: LogsConfig{ContainerName: logsContainerName},
- Traces: TracesConfig{ContainerName: tracesContainerName},
- }
- }
- func (f *blobReceiverFactory) createLogsReceiver(
- _ context.Context,
- set receiver.CreateSettings,
- cfg component.Config,
- nextConsumer consumer.Logs,
- ) (receiver.Logs, error) {
- receiver, err := f.getReceiver(set, cfg)
- if err != nil {
- set.Logger.Error(err.Error())
- return nil, err
- }
- receiver.(logsDataConsumer).setNextLogsConsumer(nextConsumer)
- return receiver, nil
- }
- func (f *blobReceiverFactory) createTracesReceiver(
- _ context.Context,
- set receiver.CreateSettings,
- cfg component.Config,
- nextConsumer consumer.Traces,
- ) (receiver.Traces, error) {
- receiver, err := f.getReceiver(set, cfg)
- if err != nil {
- set.Logger.Error(err.Error())
- return nil, err
- }
- receiver.(tracesDataConsumer).setNextTracesConsumer(nextConsumer)
- return receiver, nil
- }
- func (f *blobReceiverFactory) getReceiver(
- set receiver.CreateSettings,
- cfg component.Config) (component.Component, error) {
- var err error
- r := f.receivers.GetOrAdd(cfg, func() component.Component {
- receiverConfig, ok := cfg.(*Config)
- if !ok {
- err = errUnexpectedConfigurationType
- return nil
- }
- var beh blobEventHandler
- beh, err = f.getBlobEventHandler(receiverConfig, set.Logger)
- if err != nil {
- return nil
- }
- var receiver component.Component
- receiver, err = newReceiver(set, beh)
- return receiver
- })
- if err != nil {
- return nil, err
- }
- return r.Unwrap(), err
- }
- func (f *blobReceiverFactory) getBlobEventHandler(cfg *Config, logger *zap.Logger) (blobEventHandler, error) {
- bc, err := newBlobClient(cfg.ConnectionString, logger)
- if err != nil {
- return nil, err
- }
- return newBlobEventHandler(cfg.EventHub.EndPoint, cfg.Logs.ContainerName, cfg.Traces.ContainerName, bc, logger),
- nil
- }
|