eventhubhandler_test.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package azureeventhubreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureeventhubreceiver"
  4. import (
  5. "context"
  6. "testing"
  7. "time"
  8. eventhub "github.com/Azure/azure-event-hubs-go/v3"
  9. "github.com/stretchr/testify/assert"
  10. "github.com/stretchr/testify/require"
  11. "go.opentelemetry.io/collector/component"
  12. "go.opentelemetry.io/collector/component/componenttest"
  13. "go.opentelemetry.io/collector/consumer"
  14. "go.opentelemetry.io/collector/consumer/consumertest"
  15. "go.opentelemetry.io/collector/receiver/receiverhelper"
  16. "go.opentelemetry.io/collector/receiver/receivertest"
  17. "go.uber.org/zap"
  18. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureeventhubreceiver/internal/metadata"
  19. )
  20. type mockHubWrapper struct {
  21. }
  22. func (m mockHubWrapper) GetRuntimeInformation(_ context.Context) (*eventhub.HubRuntimeInformation, error) {
  23. return &eventhub.HubRuntimeInformation{
  24. Path: "foo",
  25. CreatedAt: time.Now(),
  26. PartitionCount: 1,
  27. PartitionIDs: []string{"foo"},
  28. }, nil
  29. }
  30. func (m mockHubWrapper) Receive(ctx context.Context, _ string, _ eventhub.Handler, _ ...eventhub.ReceiveOption) (listerHandleWrapper, error) {
  31. return &mockListenerHandleWrapper{
  32. ctx: ctx,
  33. }, nil
  34. }
  35. func (m mockHubWrapper) Close(_ context.Context) error {
  36. return nil
  37. }
  38. type mockListenerHandleWrapper struct {
  39. ctx context.Context
  40. }
  41. func (m *mockListenerHandleWrapper) Done() <-chan struct{} {
  42. return m.ctx.Done()
  43. }
  44. func (m mockListenerHandleWrapper) Err() error {
  45. return nil
  46. }
  47. type mockDataConsumer struct {
  48. logsUnmarshaler eventLogsUnmarshaler
  49. nextLogsConsumer consumer.Logs
  50. obsrecv *receiverhelper.ObsReport
  51. }
  52. func (m *mockDataConsumer) setNextLogsConsumer(nextLogsConsumer consumer.Logs) {
  53. m.nextLogsConsumer = nextLogsConsumer
  54. }
  55. func (m *mockDataConsumer) setNextMetricsConsumer(_ consumer.Metrics) {}
  56. func (m *mockDataConsumer) consume(ctx context.Context, event *eventhub.Event) error {
  57. logsContext := m.obsrecv.StartLogsOp(ctx)
  58. logs, err := m.logsUnmarshaler.UnmarshalLogs(event)
  59. if err != nil {
  60. return err
  61. }
  62. err = m.nextLogsConsumer.ConsumeLogs(logsContext, logs)
  63. m.obsrecv.EndLogsOp(logsContext, metadata.Type, 1, err)
  64. return err
  65. }
  66. func TestEventhubHandler_Start(t *testing.T) {
  67. config := createDefaultConfig()
  68. config.(*Config).Connection = "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=superSecret1234=;EntityPath=hubName"
  69. ehHandler := &eventhubHandler{
  70. settings: receivertest.NewNopCreateSettings(),
  71. dataConsumer: &mockDataConsumer{},
  72. config: config.(*Config),
  73. }
  74. ehHandler.hub = &mockHubWrapper{}
  75. err := ehHandler.run(context.Background(), componenttest.NewNopHost())
  76. assert.NoError(t, err)
  77. err = ehHandler.close(context.Background())
  78. assert.NoError(t, err)
  79. }
  80. func TestEventhubHandler_newMessageHandler(t *testing.T) {
  81. config := createDefaultConfig()
  82. config.(*Config).Connection = "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=superSecret1234=;EntityPath=hubName"
  83. sink := new(consumertest.LogsSink)
  84. obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
  85. ReceiverID: component.NewID(metadata.Type),
  86. Transport: "",
  87. LongLivedCtx: false,
  88. ReceiverCreateSettings: receivertest.NewNopCreateSettings(),
  89. })
  90. require.NoError(t, err)
  91. ehHandler := &eventhubHandler{
  92. settings: receivertest.NewNopCreateSettings(),
  93. config: config.(*Config),
  94. dataConsumer: &mockDataConsumer{
  95. logsUnmarshaler: newRawLogsUnmarshaler(zap.NewNop()),
  96. nextLogsConsumer: sink,
  97. obsrecv: obsrecv,
  98. },
  99. }
  100. ehHandler.hub = &mockHubWrapper{}
  101. err = ehHandler.run(context.Background(), componenttest.NewNopHost())
  102. assert.NoError(t, err)
  103. now := time.Now()
  104. err = ehHandler.newMessageHandler(context.Background(), &eventhub.Event{
  105. Data: []byte("hello"),
  106. PartitionKey: nil,
  107. Properties: map[string]any{"foo": "bar"},
  108. ID: "11234",
  109. SystemProperties: &eventhub.SystemProperties{
  110. SequenceNumber: nil,
  111. EnqueuedTime: &now,
  112. Offset: nil,
  113. PartitionID: nil,
  114. PartitionKey: nil,
  115. Annotations: nil,
  116. },
  117. })
  118. assert.NoError(t, err)
  119. assert.Len(t, sink.AllLogs(), 1)
  120. assert.Equal(t, 1, sink.AllLogs()[0].LogRecordCount())
  121. assert.Equal(t, []byte("hello"), sink.AllLogs()[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().Bytes().AsRaw())
  122. read, ok := sink.AllLogs()[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().Get("foo")
  123. assert.True(t, ok)
  124. assert.Equal(t, "bar", read.AsString())
  125. }