123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package azureblobreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureblobreceiver"
- import (
- "context"
- "encoding/json"
- "strings"
- eventhub "github.com/Azure/azure-event-hubs-go/v3"
- "go.uber.org/zap"
- )
- type blobEventHandler interface {
- run(ctx context.Context) error
- close(ctx context.Context) error
- setLogsDataConsumer(logsDataConsumer logsDataConsumer)
- setTracesDataConsumer(tracesDataConsumer tracesDataConsumer)
- }
- type azureBlobEventHandler struct {
- blobClient blobClient
- logsDataConsumer logsDataConsumer
- tracesDataConsumer tracesDataConsumer
- logsContainerName string
- tracesContainerName string
- eventHubSonnectionString string
- hub *eventhub.Hub
- logger *zap.Logger
- }
- var _ blobEventHandler = (*azureBlobEventHandler)(nil)
- const (
- blobCreatedEventType = "Microsoft.Storage.BlobCreated"
- )
- func (p *azureBlobEventHandler) run(ctx context.Context) error {
- if p.hub != nil {
- return nil
- }
- hub, err := eventhub.NewHubFromConnectionString(p.eventHubSonnectionString)
- if err != nil {
- return err
- }
- p.hub = hub
- runtimeInfo, err := hub.GetRuntimeInformation(ctx)
- if err != nil {
- return err
- }
- for _, partitionID := range runtimeInfo.PartitionIDs {
- _, err := hub.Receive(ctx, partitionID, p.newMessageHandler, eventhub.ReceiveWithLatestOffset())
- if err != nil {
- return err
- }
- }
- return nil
- }
- func (p *azureBlobEventHandler) newMessageHandler(ctx context.Context, event *eventhub.Event) error {
- type eventData struct {
- Topic string
- Subject string
- EventType string
- ID string
- Data map[string]any
- DataVersion string
- MetadataVersion string
- EsventTime string
- }
- var eventDataSlice []eventData
- marshalErr := json.Unmarshal(event.Data, &eventDataSlice)
- if marshalErr != nil {
- return marshalErr
- }
- subject := eventDataSlice[0].Subject
- containerName := strings.Split(strings.Split(subject, "containers/")[1], "/")[0]
- eventType := eventDataSlice[0].EventType
- blobName := strings.Split(subject, "blobs/")[1]
- if eventType == blobCreatedEventType {
- blobData, err := p.blobClient.readBlob(ctx, containerName, blobName)
- if err != nil {
- return err
- }
- switch {
- case containerName == p.logsContainerName:
- err = p.logsDataConsumer.consumeLogsJSON(ctx, blobData.Bytes())
- if err != nil {
- return err
- }
- case containerName == p.tracesContainerName:
- err = p.tracesDataConsumer.consumeTracesJSON(ctx, blobData.Bytes())
- if err != nil {
- return err
- }
- default:
- p.logger.Debug("Unknown container name", zap.String("containerName", containerName))
- }
- }
- return nil
- }
- func (p *azureBlobEventHandler) close(ctx context.Context) error {
- if p.hub != nil {
- err := p.hub.Close(ctx)
- if err != nil {
- return err
- }
- p.hub = nil
- }
- return nil
- }
- func (p *azureBlobEventHandler) setLogsDataConsumer(logsDataConsumer logsDataConsumer) {
- p.logsDataConsumer = logsDataConsumer
- }
- func (p *azureBlobEventHandler) setTracesDataConsumer(tracesDataConsumer tracesDataConsumer) {
- p.tracesDataConsumer = tracesDataConsumer
- }
- func newBlobEventHandler(eventHubSonnectionString string, logsContainerName string, tracesContainerName string, blobClient blobClient, logger *zap.Logger) *azureBlobEventHandler {
- return &azureBlobEventHandler{
- blobClient: blobClient,
- logsContainerName: logsContainerName,
- tracesContainerName: tracesContainerName,
- eventHubSonnectionString: eventHubSonnectionString,
- logger: logger,
- }
- }
|