receiver.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package statsdreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver"
  4. import (
  5. "context"
  6. "errors"
  7. "fmt"
  8. "net"
  9. "strings"
  10. "time"
  11. "go.opentelemetry.io/collector/client"
  12. "go.opentelemetry.io/collector/component"
  13. "go.opentelemetry.io/collector/consumer"
  14. "go.opentelemetry.io/collector/pdata/pmetric"
  15. "go.opentelemetry.io/collector/receiver"
  16. "go.uber.org/zap"
  17. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/protocol"
  18. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport"
  19. )
  20. var _ receiver.Metrics = (*statsdReceiver)(nil)
  21. // statsdReceiver implements the receiver.Metrics for StatsD protocol.
  22. type statsdReceiver struct {
  23. settings receiver.CreateSettings
  24. config *Config
  25. server transport.Server
  26. reporter transport.Reporter
  27. parser protocol.Parser
  28. nextConsumer consumer.Metrics
  29. cancel context.CancelFunc
  30. }
  31. // newReceiver creates the StatsD receiver with the given parameters.
  32. func newReceiver(
  33. set receiver.CreateSettings,
  34. config Config,
  35. nextConsumer consumer.Metrics,
  36. ) (receiver.Metrics, error) {
  37. if nextConsumer == nil {
  38. return nil, component.ErrNilNextConsumer
  39. }
  40. if config.NetAddr.Endpoint == "" {
  41. config.NetAddr.Endpoint = "localhost:8125"
  42. }
  43. rep, err := newReporter(set)
  44. if err != nil {
  45. return nil, err
  46. }
  47. r := &statsdReceiver{
  48. settings: set,
  49. config: &config,
  50. nextConsumer: nextConsumer,
  51. reporter: rep,
  52. parser: &protocol.StatsDParser{
  53. BuildInfo: set.BuildInfo,
  54. },
  55. }
  56. return r, nil
  57. }
  58. func buildTransportServer(config Config) (transport.Server, error) {
  59. // TODO: Add TCP/unix socket transport implementations
  60. switch strings.ToLower(config.NetAddr.Transport) {
  61. case "", "udp":
  62. return transport.NewUDPServer(config.NetAddr.Endpoint)
  63. case "tcp":
  64. return transport.NewTCPServer(config.NetAddr.Endpoint)
  65. }
  66. return nil, fmt.Errorf("unsupported transport %q", config.NetAddr.Transport)
  67. }
  68. // Start starts a UDP server that can process StatsD messages.
  69. func (r *statsdReceiver) Start(ctx context.Context, host component.Host) error {
  70. ctx, r.cancel = context.WithCancel(ctx)
  71. server, err := buildTransportServer(*r.config)
  72. if err != nil {
  73. return err
  74. }
  75. r.server = server
  76. transferChan := make(chan transport.Metric, 10)
  77. ticker := time.NewTicker(r.config.AggregationInterval)
  78. err = r.parser.Initialize(
  79. r.config.EnableMetricType,
  80. r.config.IsMonotonicCounter,
  81. r.config.TimerHistogramMapping,
  82. )
  83. if err != nil {
  84. return err
  85. }
  86. go func() {
  87. if err := r.server.ListenAndServe(r.parser, r.nextConsumer, r.reporter, transferChan); err != nil {
  88. if !errors.Is(err, net.ErrClosed) {
  89. host.ReportFatalError(err)
  90. }
  91. }
  92. }()
  93. go func() {
  94. for {
  95. select {
  96. case <-ticker.C:
  97. batchMetrics := r.parser.GetMetrics()
  98. for _, batch := range batchMetrics {
  99. batchCtx := client.NewContext(ctx, batch.Info)
  100. if err := r.Flush(batchCtx, batch.Metrics, r.nextConsumer); err != nil {
  101. r.reporter.OnDebugf("Error flushing metrics", zap.Error(err))
  102. }
  103. }
  104. case metric := <-transferChan:
  105. if err := r.parser.Aggregate(metric.Raw, metric.Addr); err != nil {
  106. r.reporter.OnDebugf("Error aggregating metric", zap.Error(err))
  107. }
  108. case <-ctx.Done():
  109. ticker.Stop()
  110. return
  111. }
  112. }
  113. }()
  114. return nil
  115. }
  116. // Shutdown stops the StatsD receiver.
  117. func (r *statsdReceiver) Shutdown(context.Context) error {
  118. if r.cancel == nil || r.server == nil {
  119. return nil
  120. }
  121. err := r.server.Close()
  122. r.cancel()
  123. return err
  124. }
  125. func (r *statsdReceiver) Flush(ctx context.Context, metrics pmetric.Metrics, nextConsumer consumer.Metrics) error {
  126. return nextConsumer.ConsumeMetrics(ctx, metrics)
  127. }