blobeventhandler.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package azureblobreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureblobreceiver"
  4. import (
  5. "context"
  6. "encoding/json"
  7. "strings"
  8. eventhub "github.com/Azure/azure-event-hubs-go/v3"
  9. "go.uber.org/zap"
  10. )
  11. type blobEventHandler interface {
  12. run(ctx context.Context) error
  13. close(ctx context.Context) error
  14. setLogsDataConsumer(logsDataConsumer logsDataConsumer)
  15. setTracesDataConsumer(tracesDataConsumer tracesDataConsumer)
  16. }
  17. type azureBlobEventHandler struct {
  18. blobClient blobClient
  19. logsDataConsumer logsDataConsumer
  20. tracesDataConsumer tracesDataConsumer
  21. logsContainerName string
  22. tracesContainerName string
  23. eventHubSonnectionString string
  24. hub *eventhub.Hub
  25. logger *zap.Logger
  26. }
  27. var _ blobEventHandler = (*azureBlobEventHandler)(nil)
  28. const (
  29. blobCreatedEventType = "Microsoft.Storage.BlobCreated"
  30. )
  31. func (p *azureBlobEventHandler) run(ctx context.Context) error {
  32. if p.hub != nil {
  33. return nil
  34. }
  35. hub, err := eventhub.NewHubFromConnectionString(p.eventHubSonnectionString)
  36. if err != nil {
  37. return err
  38. }
  39. p.hub = hub
  40. runtimeInfo, err := hub.GetRuntimeInformation(ctx)
  41. if err != nil {
  42. return err
  43. }
  44. for _, partitionID := range runtimeInfo.PartitionIDs {
  45. _, err := hub.Receive(ctx, partitionID, p.newMessageHandler, eventhub.ReceiveWithLatestOffset())
  46. if err != nil {
  47. return err
  48. }
  49. }
  50. return nil
  51. }
  52. func (p *azureBlobEventHandler) newMessageHandler(ctx context.Context, event *eventhub.Event) error {
  53. type eventData struct {
  54. Topic string
  55. Subject string
  56. EventType string
  57. ID string
  58. Data map[string]any
  59. DataVersion string
  60. MetadataVersion string
  61. EsventTime string
  62. }
  63. var eventDataSlice []eventData
  64. marshalErr := json.Unmarshal(event.Data, &eventDataSlice)
  65. if marshalErr != nil {
  66. return marshalErr
  67. }
  68. subject := eventDataSlice[0].Subject
  69. containerName := strings.Split(strings.Split(subject, "containers/")[1], "/")[0]
  70. eventType := eventDataSlice[0].EventType
  71. blobName := strings.Split(subject, "blobs/")[1]
  72. if eventType == blobCreatedEventType {
  73. blobData, err := p.blobClient.readBlob(ctx, containerName, blobName)
  74. if err != nil {
  75. return err
  76. }
  77. switch {
  78. case containerName == p.logsContainerName:
  79. err = p.logsDataConsumer.consumeLogsJSON(ctx, blobData.Bytes())
  80. if err != nil {
  81. return err
  82. }
  83. case containerName == p.tracesContainerName:
  84. err = p.tracesDataConsumer.consumeTracesJSON(ctx, blobData.Bytes())
  85. if err != nil {
  86. return err
  87. }
  88. default:
  89. p.logger.Debug("Unknown container name", zap.String("containerName", containerName))
  90. }
  91. }
  92. return nil
  93. }
  94. func (p *azureBlobEventHandler) close(ctx context.Context) error {
  95. if p.hub != nil {
  96. err := p.hub.Close(ctx)
  97. if err != nil {
  98. return err
  99. }
  100. p.hub = nil
  101. }
  102. return nil
  103. }
  104. func (p *azureBlobEventHandler) setLogsDataConsumer(logsDataConsumer logsDataConsumer) {
  105. p.logsDataConsumer = logsDataConsumer
  106. }
  107. func (p *azureBlobEventHandler) setTracesDataConsumer(tracesDataConsumer tracesDataConsumer) {
  108. p.tracesDataConsumer = tracesDataConsumer
  109. }
  110. func newBlobEventHandler(eventHubSonnectionString string, logsContainerName string, tracesContainerName string, blobClient blobClient, logger *zap.Logger) *azureBlobEventHandler {
  111. return &azureBlobEventHandler{
  112. blobClient: blobClient,
  113. logsContainerName: logsContainerName,
  114. tracesContainerName: tracesContainerName,
  115. eventHubSonnectionString: eventHubSonnectionString,
  116. logger: logger,
  117. }
  118. }