123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package carbonreceiver
- import (
- "context"
- "errors"
- "runtime"
- "testing"
- "time"
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/require"
- "go.opentelemetry.io/collector/component"
- "go.opentelemetry.io/collector/component/componenttest"
- "go.opentelemetry.io/collector/config/confignet"
- "go.opentelemetry.io/collector/consumer"
- "go.opentelemetry.io/collector/consumer/consumertest"
- "go.opentelemetry.io/collector/receiver/receivertest"
- sdktrace "go.opentelemetry.io/otel/sdk/trace"
- "go.opentelemetry.io/otel/sdk/trace/tracetest"
- "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
- "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/protocol"
- "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/transport/client"
- )
- func Test_carbonreceiver_New(t *testing.T) {
- defaultConfig := createDefaultConfig().(*Config)
- type args struct {
- config Config
- nextConsumer consumer.Metrics
- }
- tests := []struct {
- name string
- args args
- wantErr error
- }{
- {
- name: "default_config",
- args: args{
- config: *defaultConfig,
- nextConsumer: consumertest.NewNop(),
- },
- },
- {
- name: "zero_value_parser",
- args: args{
- config: Config{
- NetAddr: confignet.NetAddr{
- Endpoint: defaultConfig.Endpoint,
- Transport: defaultConfig.Transport,
- },
- TCPIdleTimeout: defaultConfig.TCPIdleTimeout,
- },
- nextConsumer: consumertest.NewNop(),
- },
- },
- {
- name: "nil_nextConsumer",
- args: args{
- config: *defaultConfig,
- },
- wantErr: component.ErrNilNextConsumer,
- },
- {
- name: "empty_endpoint",
- args: args{
- config: Config{},
- nextConsumer: consumertest.NewNop(),
- },
- wantErr: errEmptyEndpoint,
- },
- {
- name: "regex_parser",
- args: args{
- config: Config{
- NetAddr: confignet.NetAddr{
- Endpoint: "localhost:2003",
- Transport: "tcp",
- },
- Parser: &protocol.Config{
- Type: "regex",
- Config: &protocol.RegexParserConfig{
- Rules: []*protocol.RegexRule{
- {
- Regexp: `(?P<key_root>[^.]*)\.test`,
- },
- },
- },
- },
- },
- nextConsumer: consumertest.NewNop(),
- },
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- got, err := newMetricsReceiver(receivertest.NewNopCreateSettings(), tt.args.config, tt.args.nextConsumer)
- assert.Equal(t, tt.wantErr, err)
- if err == nil {
- require.NotNil(t, got)
- assert.NoError(t, got.Shutdown(context.Background()))
- } else {
- assert.Nil(t, got)
- }
- })
- }
- }
- func Test_carbonreceiver_Start(t *testing.T) {
- type args struct {
- config Config
- nextConsumer consumer.Metrics
- }
- tests := []struct {
- name string
- args args
- wantErr error
- }{
- {
- name: "invalid_transport",
- args: args{
- config: Config{
- NetAddr: confignet.NetAddr{
- Endpoint: "localhost:2003",
- Transport: "unknown_transp",
- },
- Parser: &protocol.Config{
- Type: "plaintext",
- Config: &protocol.PlaintextConfig{},
- },
- },
- nextConsumer: consumertest.NewNop(),
- },
- wantErr: errors.New("unsupported transport \"unknown_transp\""),
- },
- {
- name: "negative_tcp_idle_timeout",
- args: args{
- config: Config{
- NetAddr: confignet.NetAddr{
- Endpoint: "localhost:2003",
- Transport: "tcp",
- },
- TCPIdleTimeout: -1 * time.Second,
- Parser: &protocol.Config{
- Type: "plaintext",
- Config: &protocol.PlaintextConfig{},
- },
- },
- nextConsumer: consumertest.NewNop(),
- },
- wantErr: errors.New("invalid idle timeout: -1s"),
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- got, err := newMetricsReceiver(receivertest.NewNopCreateSettings(), tt.args.config, tt.args.nextConsumer)
- require.NoError(t, err)
- err = got.Start(context.Background(), componenttest.NewNopHost())
- assert.Equal(t, tt.wantErr, err)
- assert.NoError(t, got.Shutdown(context.Background()))
- })
- }
- }
- func Test_carbonreceiver_EndToEnd(t *testing.T) {
- addr := testutil.GetAvailableLocalAddress(t)
- tests := []struct {
- name string
- configFn func() *Config
- clientFn func(t *testing.T) func(client.Metric) error
- }{
- {
- name: "default_config",
- configFn: func() *Config {
- return createDefaultConfig().(*Config)
- },
- clientFn: func(t *testing.T) func(client.Metric) error {
- c, err := client.NewGraphite(client.TCP, addr)
- require.NoError(t, err)
- return c.SendMetric
- },
- },
- {
- name: "tcp_reconnect",
- configFn: func() *Config {
- return createDefaultConfig().(*Config)
- },
- clientFn: func(t *testing.T) func(client.Metric) error {
- c, err := client.NewGraphite(client.TCP, addr)
- require.NoError(t, err)
- return c.SputterThenSendMetric
- },
- },
- {
- name: "default_config_udp",
- configFn: func() *Config {
- cfg := createDefaultConfig().(*Config)
- cfg.Transport = "udp"
- return cfg
- },
- clientFn: func(t *testing.T) func(client.Metric) error {
- c, err := client.NewGraphite(client.UDP, addr)
- require.NoError(t, err)
- return c.SendMetric
- },
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- cfg := tt.configFn()
- cfg.Endpoint = addr
- sink := new(consumertest.MetricsSink)
- recorder := tracetest.NewSpanRecorder()
- rt := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(recorder))
- cs := receivertest.NewNopCreateSettings()
- cs.TracerProvider = rt
- rcv, err := newMetricsReceiver(cs, *cfg, sink)
- require.NoError(t, err)
- r := rcv.(*carbonReceiver)
- mr, err := newReporter(cs)
- require.NoError(t, err)
- r.reporter = mr
- require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost()))
- runtime.Gosched()
- defer func() {
- require.NoError(t, r.Shutdown(context.Background()))
- }()
- snd := tt.clientFn(t)
- ts := time.Now()
- carbonMetric := client.Metric{
- Name: "tst_dbl",
- Value: 1.23,
- Timestamp: ts,
- }
- err = snd(carbonMetric)
- require.NoError(t, err)
- require.Eventually(t, func() bool {
- return len(recorder.Ended()) == 1
- }, 30*time.Second, 100*time.Millisecond)
- mdd := sink.AllMetrics()
- require.Len(t, mdd, 1)
- require.Equal(t, 1, mdd[0].MetricCount())
- m := mdd[0].ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0)
- assert.Equal(t, carbonMetric.Name, m.Name())
- require.Equal(t, 1, m.Gauge().DataPoints().Len())
- require.Equal(t, len(recorder.Ended()), len(recorder.Started()))
- })
- }
- }
|