receiver.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package azureeventhubreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureeventhubreceiver"
  4. import (
  5. "context"
  6. "errors"
  7. "fmt"
  8. eventhub "github.com/Azure/azure-event-hubs-go/v3"
  9. "go.opentelemetry.io/collector/component"
  10. "go.opentelemetry.io/collector/consumer"
  11. "go.opentelemetry.io/collector/pdata/plog"
  12. "go.opentelemetry.io/collector/pdata/pmetric"
  13. "go.opentelemetry.io/collector/receiver"
  14. "go.opentelemetry.io/collector/receiver/receiverhelper"
  15. "go.uber.org/zap"
  16. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureeventhubreceiver/internal/metadata"
  17. )
  18. type dataConsumer interface {
  19. consume(ctx context.Context, event *eventhub.Event) error
  20. setNextLogsConsumer(nextLogsConsumer consumer.Logs)
  21. setNextMetricsConsumer(nextLogsConsumer consumer.Metrics)
  22. }
  23. type eventLogsUnmarshaler interface {
  24. UnmarshalLogs(event *eventhub.Event) (plog.Logs, error)
  25. }
  26. type eventMetricsUnmarshaler interface {
  27. UnmarshalMetrics(event *eventhub.Event) (pmetric.Metrics, error)
  28. }
  29. type eventhubReceiver struct {
  30. eventHandler eventHandler
  31. dataType component.Type
  32. logger *zap.Logger
  33. logsUnmarshaler eventLogsUnmarshaler
  34. metricsUnmarshaler eventMetricsUnmarshaler
  35. nextLogsConsumer consumer.Logs
  36. nextMetricsConsumer consumer.Metrics
  37. obsrecv *receiverhelper.ObsReport
  38. }
  39. func (receiver *eventhubReceiver) Start(ctx context.Context, host component.Host) error {
  40. err := receiver.eventHandler.run(ctx, host)
  41. return err
  42. }
  43. func (receiver *eventhubReceiver) Shutdown(ctx context.Context) error {
  44. return receiver.eventHandler.close(ctx)
  45. }
  46. func (receiver *eventhubReceiver) setNextLogsConsumer(nextLogsConsumer consumer.Logs) {
  47. receiver.nextLogsConsumer = nextLogsConsumer
  48. }
  49. func (receiver *eventhubReceiver) setNextMetricsConsumer(nextMetricsConsumer consumer.Metrics) {
  50. receiver.nextMetricsConsumer = nextMetricsConsumer
  51. }
  52. func (receiver *eventhubReceiver) consume(ctx context.Context, event *eventhub.Event) error {
  53. switch receiver.dataType {
  54. case component.DataTypeLogs:
  55. return receiver.consumeLogs(ctx, event)
  56. case component.DataTypeMetrics:
  57. return receiver.consumeMetrics(ctx, event)
  58. case component.DataTypeTraces:
  59. fallthrough
  60. default:
  61. return fmt.Errorf("invalid data type: %v", receiver.dataType)
  62. }
  63. }
  64. func (receiver *eventhubReceiver) consumeLogs(ctx context.Context, event *eventhub.Event) error {
  65. if receiver.nextLogsConsumer == nil {
  66. return nil
  67. }
  68. if receiver.logsUnmarshaler == nil {
  69. return errors.New("unable to unmarshal logs with configured format")
  70. }
  71. logsContext := receiver.obsrecv.StartLogsOp(ctx)
  72. logs, err := receiver.logsUnmarshaler.UnmarshalLogs(event)
  73. if err != nil {
  74. return fmt.Errorf("failed to unmarshal logs: %w", err)
  75. }
  76. receiver.logger.Debug("Log Records", zap.Any("logs", logs))
  77. err = receiver.nextLogsConsumer.ConsumeLogs(logsContext, logs)
  78. receiver.obsrecv.EndLogsOp(logsContext, metadata.Type, 1, err)
  79. return err
  80. }
  81. func (receiver *eventhubReceiver) consumeMetrics(ctx context.Context, event *eventhub.Event) error {
  82. if receiver.nextMetricsConsumer == nil {
  83. return nil
  84. }
  85. if receiver.metricsUnmarshaler == nil {
  86. return errors.New("unable to unmarshal metrics with configured format")
  87. }
  88. metricsContext := receiver.obsrecv.StartMetricsOp(ctx)
  89. metrics, err := receiver.metricsUnmarshaler.UnmarshalMetrics(event)
  90. if err != nil {
  91. return fmt.Errorf("failed to unmarshal metrics: %w", err)
  92. }
  93. receiver.logger.Debug("Metric Records", zap.Any("metrics", metrics))
  94. err = receiver.nextMetricsConsumer.ConsumeMetrics(metricsContext, metrics)
  95. receiver.obsrecv.EndMetricsOp(metricsContext, metadata.Type, 1, err)
  96. return err
  97. }
  98. func newReceiver(
  99. receiverType component.Type,
  100. logsUnmarshaler eventLogsUnmarshaler,
  101. metricsUnmarshaler eventMetricsUnmarshaler,
  102. eventHandler eventHandler,
  103. settings receiver.CreateSettings,
  104. ) (component.Component, error) {
  105. obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
  106. ReceiverID: settings.ID,
  107. Transport: "event",
  108. ReceiverCreateSettings: settings,
  109. })
  110. if err != nil {
  111. return nil, err
  112. }
  113. eventhubReceiver := &eventhubReceiver{
  114. dataType: receiverType,
  115. eventHandler: eventHandler,
  116. logger: settings.Logger,
  117. logsUnmarshaler: logsUnmarshaler,
  118. metricsUnmarshaler: metricsUnmarshaler,
  119. obsrecv: obsrecv,
  120. }
  121. eventHandler.setDataConsumer(eventhubReceiver)
  122. return eventhubReceiver, nil
  123. }