blobeventhandler_test.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package azureblobreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureblobreceiver"
  4. import (
  5. "context"
  6. "testing"
  7. eventhub "github.com/Azure/azure-event-hubs-go/v3"
  8. "github.com/stretchr/testify/assert"
  9. "github.com/stretchr/testify/require"
  10. "go.uber.org/zap/zaptest"
  11. )
  // eventHubString is the Event Hub connection string handed to
  // newBlobEventHandler by getBlobEventHandler. It is a fixture value only.
  const eventHubString = "Endpoint=sb://oteldata.servicebus.windows.net/;SharedAccessKeyName=oteldatahubpolicy;SharedAccessKey=sharedAccessKey;EntityPath=otelddatahub"
  // Event payload fixtures: each is a JSON array holding a single
  // Microsoft.Storage.BlobCreated event.
  var (
  	// logEventData references the blob "logs-1" in the "logs" container.
  	logEventData = []byte(`[{"topic":"someTopic","subject":"/blobServices/default/containers/logs/blobs/logs-1","eventType":"Microsoft.Storage.BlobCreated","id":"1","data":{"api":"PutBlob","clientRequestId":"1","requestId":"1","eTag":"1","contentType":"text","contentLength":10,"blobType":"BlockBlob","url":"https://oteldata.blob.core.windows.net/logs/logs-1","sequencer":"1","storageDiagnostics":{"batchId":"1"}},"dataVersion":"","metadataVersion":"1","eventTime":"2022-03-25T15:59:50.9251748Z"}]`)

  	// traceEventData references the blob "traces-1" in the "traces" container.
  	traceEventData = []byte(`[{"topic":"someTopic","subject":"/blobServices/default/containers/traces/blobs/traces-1","eventType":"Microsoft.Storage.BlobCreated","id":"1","data":{"api":"PutBlob","clientRequestId":"1","requestId":"1","eTag":"1","contentType":"text","contentLength":10,"blobType":"BlockBlob","url":"https://oteldata.blob.core.windows.net/traces/traces-1","sequencer":"1","storageDiagnostics":{"batchId":"1"}},"dataVersion":"","metadataVersion":"1","eventTime":"2022-03-25T15:59:50.9251748Z"}]`)
  )
  19. func TestNewBlobEventHandler(t *testing.T) {
  20. blobClient := newMockBlobClient()
  21. blobEventHandler := getBlobEventHandler(t, blobClient)
  22. require.NotNil(t, blobEventHandler)
  23. assert.Equal(t, blobClient, blobEventHandler.blobClient)
  24. }
  25. func TestNewMessageHangdler(t *testing.T) {
  26. blobClient := newMockBlobClient()
  27. blobEventHandler := getBlobEventHandler(t, blobClient)
  28. logsDataConsumer := newMockLogsDataConsumer()
  29. tracesDataConsumer := newMockTracesDataConsumer()
  30. blobEventHandler.setLogsDataConsumer(logsDataConsumer)
  31. blobEventHandler.setTracesDataConsumer(tracesDataConsumer)
  32. logEvent := getEvent(logEventData)
  33. err := blobEventHandler.newMessageHandler(context.Background(), logEvent)
  34. require.NoError(t, err)
  35. traceEvent := getEvent(traceEventData)
  36. err = blobEventHandler.newMessageHandler(context.Background(), traceEvent)
  37. require.NoError(t, err)
  38. logsDataConsumer.AssertNumberOfCalls(t, "consumeLogsJSON", 1)
  39. tracesDataConsumer.AssertNumberOfCalls(t, "consumeTracesJSON", 1)
  40. blobClient.AssertNumberOfCalls(t, "readBlob", 2)
  41. }
  42. func getEvent(eventData []byte) *eventhub.Event {
  43. return &eventhub.Event{Data: eventData}
  44. }
  45. func getBlobEventHandler(tb testing.TB, blobClient blobClient) *azureBlobEventHandler {
  46. blobEventHandler := newBlobEventHandler(eventHubString, logsContainerName, tracesContainerName, blobClient, zaptest.NewLogger(tb))
  47. return blobEventHandler
  48. }