reporter.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package statsdreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver"
  4. import (
  5. "context"
  6. "go.opencensus.io/trace"
  7. "go.opentelemetry.io/collector/receiver"
  8. "go.opentelemetry.io/collector/receiver/receiverhelper"
  9. "go.uber.org/zap"
  10. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport"
  11. )
  12. // reporter struct implements the transport.Reporter interface to give consistent
  13. // observability per Collector metric observability package.
  14. type reporter struct {
  15. logger *zap.Logger
  16. sugaredLogger *zap.SugaredLogger // Used for generic debug logging
  17. obsrecv *receiverhelper.ObsReport
  18. }
  19. var _ transport.Reporter = (*reporter)(nil)
  20. func newReporter(set receiver.CreateSettings) (transport.Reporter, error) {
  21. obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
  22. ReceiverID: set.ID,
  23. Transport: "tcp",
  24. ReceiverCreateSettings: set,
  25. })
  26. if err != nil {
  27. return nil, err
  28. }
  29. return &reporter{
  30. logger: set.Logger,
  31. sugaredLogger: set.Logger.Sugar(),
  32. obsrecv: obsrecv,
  33. }, nil
  34. }
  35. // OnDataReceived is called when a message or request is received from
  36. // a client. The returned context should be used in other calls to the same
  37. // reporter instance. The caller code should include a call to end the
  38. // returned span.
  39. func (r *reporter) OnDataReceived(ctx context.Context) context.Context {
  40. return r.obsrecv.StartMetricsOp(ctx)
  41. }
  42. // OnTranslationError is used to report a translation error from original
  43. // format to the internal format of the Collector. The context and span
  44. // passed to it should be the ones returned by OnDataReceived.
  45. func (r *reporter) OnTranslationError(ctx context.Context, err error) {
  46. if err == nil {
  47. return
  48. }
  49. r.logger.Debug("StatsD translation error", zap.Error(err))
  50. // Using annotations since multiple translation errors can happen in the
  51. // same client message/request. The time itself is not relevant.
  52. span := trace.FromContext(ctx)
  53. span.Annotate([]trace.Attribute{
  54. trace.StringAttribute("error", err.Error())},
  55. "translation",
  56. )
  57. }
  58. // OnMetricsProcessed is called when the received data is passed to next
  59. // consumer on the pipeline. The context and span passed to it should be the
  60. // ones returned by OnDataReceived. The error should be error returned by
  61. // the next consumer - the reporter is expected to handle nil error too.
  62. func (r *reporter) OnMetricsProcessed(
  63. ctx context.Context,
  64. numReceivedMessages int,
  65. err error,
  66. ) {
  67. if err != nil {
  68. r.logger.Debug(
  69. "StatsD receiver failed to push metrics into pipeline",
  70. zap.Int("numReceivedMessages", numReceivedMessages),
  71. zap.Error(err))
  72. span := trace.FromContext(ctx)
  73. span.SetStatus(trace.Status{
  74. Code: trace.StatusCodeUnknown,
  75. Message: err.Error(),
  76. })
  77. }
  78. r.obsrecv.EndMetricsOp(ctx, "statsd", numReceivedMessages, err)
  79. }
  80. func (r *reporter) OnDebugf(template string, args ...any) {
  81. if r.logger.Check(zap.DebugLevel, "debug") != nil {
  82. r.sugaredLogger.Debugf(template, args...)
  83. }
  84. }