receiver_test.go 6.5 KB


  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package carbonreceiver
  4. import (
  5. "context"
  6. "errors"
  7. "runtime"
  8. "testing"
  9. "time"
  10. "github.com/stretchr/testify/assert"
  11. "github.com/stretchr/testify/require"
  12. "go.opentelemetry.io/collector/component"
  13. "go.opentelemetry.io/collector/component/componenttest"
  14. "go.opentelemetry.io/collector/config/confignet"
  15. "go.opentelemetry.io/collector/consumer"
  16. "go.opentelemetry.io/collector/consumer/consumertest"
  17. "go.opentelemetry.io/collector/receiver/receivertest"
  18. sdktrace "go.opentelemetry.io/otel/sdk/trace"
  19. "go.opentelemetry.io/otel/sdk/trace/tracetest"
  20. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
  21. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/protocol"
  22. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/transport/client"
  23. )
  24. func Test_carbonreceiver_New(t *testing.T) {
  25. defaultConfig := createDefaultConfig().(*Config)
  26. type args struct {
  27. config Config
  28. nextConsumer consumer.Metrics
  29. }
  30. tests := []struct {
  31. name string
  32. args args
  33. wantErr error
  34. }{
  35. {
  36. name: "default_config",
  37. args: args{
  38. config: *defaultConfig,
  39. nextConsumer: consumertest.NewNop(),
  40. },
  41. },
  42. {
  43. name: "zero_value_parser",
  44. args: args{
  45. config: Config{
  46. NetAddr: confignet.NetAddr{
  47. Endpoint: defaultConfig.Endpoint,
  48. Transport: defaultConfig.Transport,
  49. },
  50. TCPIdleTimeout: defaultConfig.TCPIdleTimeout,
  51. },
  52. nextConsumer: consumertest.NewNop(),
  53. },
  54. },
  55. {
  56. name: "nil_nextConsumer",
  57. args: args{
  58. config: *defaultConfig,
  59. },
  60. wantErr: component.ErrNilNextConsumer,
  61. },
  62. {
  63. name: "empty_endpoint",
  64. args: args{
  65. config: Config{},
  66. nextConsumer: consumertest.NewNop(),
  67. },
  68. wantErr: errEmptyEndpoint,
  69. },
  70. {
  71. name: "regex_parser",
  72. args: args{
  73. config: Config{
  74. NetAddr: confignet.NetAddr{
  75. Endpoint: "localhost:2003",
  76. Transport: "tcp",
  77. },
  78. Parser: &protocol.Config{
  79. Type: "regex",
  80. Config: &protocol.RegexParserConfig{
  81. Rules: []*protocol.RegexRule{
  82. {
  83. Regexp: `(?P<key_root>[^.]*)\.test`,
  84. },
  85. },
  86. },
  87. },
  88. },
  89. nextConsumer: consumertest.NewNop(),
  90. },
  91. },
  92. }
  93. for _, tt := range tests {
  94. t.Run(tt.name, func(t *testing.T) {
  95. got, err := newMetricsReceiver(receivertest.NewNopCreateSettings(), tt.args.config, tt.args.nextConsumer)
  96. assert.Equal(t, tt.wantErr, err)
  97. if err == nil {
  98. require.NotNil(t, got)
  99. assert.NoError(t, got.Shutdown(context.Background()))
  100. } else {
  101. assert.Nil(t, got)
  102. }
  103. })
  104. }
  105. }
  106. func Test_carbonreceiver_Start(t *testing.T) {
  107. type args struct {
  108. config Config
  109. nextConsumer consumer.Metrics
  110. }
  111. tests := []struct {
  112. name string
  113. args args
  114. wantErr error
  115. }{
  116. {
  117. name: "invalid_transport",
  118. args: args{
  119. config: Config{
  120. NetAddr: confignet.NetAddr{
  121. Endpoint: "localhost:2003",
  122. Transport: "unknown_transp",
  123. },
  124. Parser: &protocol.Config{
  125. Type: "plaintext",
  126. Config: &protocol.PlaintextConfig{},
  127. },
  128. },
  129. nextConsumer: consumertest.NewNop(),
  130. },
  131. wantErr: errors.New("unsupported transport \"unknown_transp\""),
  132. },
  133. {
  134. name: "negative_tcp_idle_timeout",
  135. args: args{
  136. config: Config{
  137. NetAddr: confignet.NetAddr{
  138. Endpoint: "localhost:2003",
  139. Transport: "tcp",
  140. },
  141. TCPIdleTimeout: -1 * time.Second,
  142. Parser: &protocol.Config{
  143. Type: "plaintext",
  144. Config: &protocol.PlaintextConfig{},
  145. },
  146. },
  147. nextConsumer: consumertest.NewNop(),
  148. },
  149. wantErr: errors.New("invalid idle timeout: -1s"),
  150. },
  151. }
  152. for _, tt := range tests {
  153. t.Run(tt.name, func(t *testing.T) {
  154. got, err := newMetricsReceiver(receivertest.NewNopCreateSettings(), tt.args.config, tt.args.nextConsumer)
  155. require.NoError(t, err)
  156. err = got.Start(context.Background(), componenttest.NewNopHost())
  157. assert.Equal(t, tt.wantErr, err)
  158. assert.NoError(t, got.Shutdown(context.Background()))
  159. })
  160. }
  161. }
  162. func Test_carbonreceiver_EndToEnd(t *testing.T) {
  163. addr := testutil.GetAvailableLocalAddress(t)
  164. tests := []struct {
  165. name string
  166. configFn func() *Config
  167. clientFn func(t *testing.T) func(client.Metric) error
  168. }{
  169. {
  170. name: "default_config",
  171. configFn: func() *Config {
  172. return createDefaultConfig().(*Config)
  173. },
  174. clientFn: func(t *testing.T) func(client.Metric) error {
  175. c, err := client.NewGraphite(client.TCP, addr)
  176. require.NoError(t, err)
  177. return c.SendMetric
  178. },
  179. },
  180. {
  181. name: "tcp_reconnect",
  182. configFn: func() *Config {
  183. return createDefaultConfig().(*Config)
  184. },
  185. clientFn: func(t *testing.T) func(client.Metric) error {
  186. c, err := client.NewGraphite(client.TCP, addr)
  187. require.NoError(t, err)
  188. return c.SputterThenSendMetric
  189. },
  190. },
  191. {
  192. name: "default_config_udp",
  193. configFn: func() *Config {
  194. cfg := createDefaultConfig().(*Config)
  195. cfg.Transport = "udp"
  196. return cfg
  197. },
  198. clientFn: func(t *testing.T) func(client.Metric) error {
  199. c, err := client.NewGraphite(client.UDP, addr)
  200. require.NoError(t, err)
  201. return c.SendMetric
  202. },
  203. },
  204. }
  205. for _, tt := range tests {
  206. t.Run(tt.name, func(t *testing.T) {
  207. cfg := tt.configFn()
  208. cfg.Endpoint = addr
  209. sink := new(consumertest.MetricsSink)
  210. recorder := tracetest.NewSpanRecorder()
  211. rt := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(recorder))
  212. cs := receivertest.NewNopCreateSettings()
  213. cs.TracerProvider = rt
  214. rcv, err := newMetricsReceiver(cs, *cfg, sink)
  215. require.NoError(t, err)
  216. r := rcv.(*carbonReceiver)
  217. mr, err := newReporter(cs)
  218. require.NoError(t, err)
  219. r.reporter = mr
  220. require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost()))
  221. runtime.Gosched()
  222. defer func() {
  223. require.NoError(t, r.Shutdown(context.Background()))
  224. }()
  225. snd := tt.clientFn(t)
  226. ts := time.Now()
  227. carbonMetric := client.Metric{
  228. Name: "tst_dbl",
  229. Value: 1.23,
  230. Timestamp: ts,
  231. }
  232. err = snd(carbonMetric)
  233. require.NoError(t, err)
  234. require.Eventually(t, func() bool {
  235. return len(recorder.Ended()) == 1
  236. }, 30*time.Second, 100*time.Millisecond)
  237. mdd := sink.AllMetrics()
  238. require.Len(t, mdd, 1)
  239. require.Equal(t, 1, mdd[0].MetricCount())
  240. m := mdd[0].ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0)
  241. assert.Equal(t, carbonMetric.Name, m.Name())
  242. require.Equal(t, 1, m.Gauge().DataPoints().Len())
  243. require.Equal(t, len(recorder.Ended()), len(recorder.Started()))
  244. })
  245. }
  246. }