receiver_test.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package statsdreceiver
  4. import (
  5. "context"
  6. "errors"
  7. "net"
  8. "strconv"
  9. "testing"
  10. "time"
  11. "github.com/stretchr/testify/assert"
  12. "github.com/stretchr/testify/require"
  13. "go.opentelemetry.io/collector/component"
  14. "go.opentelemetry.io/collector/component/componenttest"
  15. "go.opentelemetry.io/collector/config/confignet"
  16. "go.opentelemetry.io/collector/consumer"
  17. "go.opentelemetry.io/collector/consumer/consumertest"
  18. "go.opentelemetry.io/collector/pdata/pmetric"
  19. "go.opentelemetry.io/collector/receiver/receivertest"
  20. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
  21. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport"
  22. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport/client"
  23. )
  24. func Test_statsdreceiver_New(t *testing.T) {
  25. defaultConfig := createDefaultConfig().(*Config)
  26. type args struct {
  27. config Config
  28. nextConsumer consumer.Metrics
  29. }
  30. tests := []struct {
  31. name string
  32. args args
  33. wantErr error
  34. }{
  35. {
  36. name: "nil_nextConsumer",
  37. args: args{
  38. config: *defaultConfig,
  39. },
  40. wantErr: component.ErrNilNextConsumer,
  41. },
  42. }
  43. for _, tt := range tests {
  44. t.Run(tt.name, func(t *testing.T) {
  45. _, err := newReceiver(receivertest.NewNopCreateSettings(), tt.args.config, tt.args.nextConsumer)
  46. assert.Equal(t, tt.wantErr, err)
  47. })
  48. }
  49. }
  50. func Test_statsdreceiver_Start(t *testing.T) {
  51. type args struct {
  52. config Config
  53. nextConsumer consumer.Metrics
  54. }
  55. tests := []struct {
  56. name string
  57. args args
  58. wantErr error
  59. }{
  60. {
  61. name: "unsupported transport",
  62. args: args{
  63. config: Config{
  64. NetAddr: confignet.NetAddr{
  65. Endpoint: "localhost:8125",
  66. Transport: "unknown",
  67. },
  68. },
  69. nextConsumer: consumertest.NewNop(),
  70. },
  71. wantErr: errors.New("unsupported transport \"unknown\""),
  72. },
  73. }
  74. for _, tt := range tests {
  75. t.Run(tt.name, func(t *testing.T) {
  76. receiver, err := newReceiver(receivertest.NewNopCreateSettings(), tt.args.config, tt.args.nextConsumer)
  77. require.NoError(t, err)
  78. err = receiver.Start(context.Background(), componenttest.NewNopHost())
  79. assert.Equal(t, tt.wantErr, err)
  80. assert.NoError(t, receiver.Shutdown(context.Background()))
  81. })
  82. }
  83. }
  84. func TestStatsdReceiver_ShutdownBeforeStart(t *testing.T) {
  85. ctx := context.Background()
  86. cfg := createDefaultConfig().(*Config)
  87. nextConsumer := consumertest.NewNop()
  88. rcv, err := newReceiver(receivertest.NewNopCreateSettings(), *cfg, nextConsumer)
  89. assert.NoError(t, err)
  90. r := rcv.(*statsdReceiver)
  91. assert.NoError(t, r.Shutdown(ctx))
  92. }
  93. func TestStatsdReceiver_Flush(t *testing.T) {
  94. ctx := context.Background()
  95. cfg := createDefaultConfig().(*Config)
  96. nextConsumer := consumertest.NewNop()
  97. rcv, err := newReceiver(receivertest.NewNopCreateSettings(), *cfg, nextConsumer)
  98. assert.NoError(t, err)
  99. r := rcv.(*statsdReceiver)
  100. var metrics = pmetric.NewMetrics()
  101. assert.Nil(t, r.Flush(ctx, metrics, nextConsumer))
  102. assert.NoError(t, r.Start(ctx, componenttest.NewNopHost()))
  103. assert.NoError(t, r.Shutdown(ctx))
  104. }
  105. func Test_statsdreceiver_EndToEnd(t *testing.T) {
  106. addr := testutil.GetAvailableLocalAddress(t)
  107. host, portStr, err := net.SplitHostPort(addr)
  108. require.NoError(t, err)
  109. port, err := strconv.Atoi(portStr)
  110. require.NoError(t, err)
  111. tests := []struct {
  112. name string
  113. configFn func() *Config
  114. clientFn func(t *testing.T) *client.StatsD
  115. }{
  116. {
  117. name: "default_config with 4s interval",
  118. configFn: func() *Config {
  119. return &Config{
  120. NetAddr: confignet.NetAddr{
  121. Endpoint: defaultBindEndpoint,
  122. Transport: defaultTransport,
  123. },
  124. AggregationInterval: 4 * time.Second,
  125. }
  126. },
  127. clientFn: func(t *testing.T) *client.StatsD {
  128. c, err := client.NewStatsD(client.UDP, host, port)
  129. require.NoError(t, err)
  130. return c
  131. },
  132. },
  133. }
  134. for _, tt := range tests {
  135. t.Run(tt.name, func(t *testing.T) {
  136. cfg := tt.configFn()
  137. cfg.NetAddr.Endpoint = addr
  138. sink := new(consumertest.MetricsSink)
  139. rcv, err := newReceiver(receivertest.NewNopCreateSettings(), *cfg, sink)
  140. require.NoError(t, err)
  141. r := rcv.(*statsdReceiver)
  142. mr := transport.NewMockReporter(1)
  143. r.reporter = mr
  144. require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost()))
  145. defer func() {
  146. assert.NoError(t, r.Shutdown(context.Background()))
  147. }()
  148. statsdClient := tt.clientFn(t)
  149. statsdMetric := client.Metric{
  150. Name: "test.metric",
  151. Value: "42",
  152. Type: "c",
  153. }
  154. err = statsdClient.SendMetric(statsdMetric)
  155. require.NoError(t, err)
  156. time.Sleep(5 * time.Second)
  157. mdd := sink.AllMetrics()
  158. require.Len(t, mdd, 1)
  159. require.Equal(t, 1, mdd[0].ResourceMetrics().Len())
  160. require.Equal(t, 1, mdd[0].ResourceMetrics().At(0).ScopeMetrics().Len())
  161. require.Equal(t, 1, mdd[0].ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().Len())
  162. metric := mdd[0].ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0)
  163. assert.Equal(t, statsdMetric.Name, metric.Name())
  164. assert.Equal(t, pmetric.MetricTypeSum, metric.Type())
  165. require.Equal(t, 1, metric.Sum().DataPoints().Len())
  166. assert.NotEqual(t, 0, metric.Sum().DataPoints().At(0).Timestamp())
  167. assert.NotEqual(t, 0, metric.Sum().DataPoints().At(0).StartTimestamp())
  168. assert.Less(t, metric.Sum().DataPoints().At(0).StartTimestamp(), metric.Sum().DataPoints().At(0).Timestamp())
  169. // Send the same metric again to ensure that the timestamps of successive data points
  170. // are aligned.
  171. statsdMetric.Value = "43"
  172. err = statsdClient.SendMetric(statsdMetric)
  173. require.NoError(t, err)
  174. time.Sleep(5 * time.Second)
  175. mddAfter := sink.AllMetrics()
  176. require.Len(t, mddAfter, 2)
  177. metricAfter := mddAfter[1].ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0)
  178. require.Equal(t, metric.Sum().DataPoints().At(0).Timestamp(), metricAfter.Sum().DataPoints().At(0).StartTimestamp())
  179. })
  180. }
  181. }