123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package udplogreceiver
- import (
- "context"
- "fmt"
- "net"
- "path/filepath"
- "testing"
- "time"
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/require"
- "go.opentelemetry.io/collector/component"
- "go.opentelemetry.io/collector/component/componenttest"
- "go.opentelemetry.io/collector/confmap/confmaptest"
- "go.opentelemetry.io/collector/consumer/consumertest"
- "go.opentelemetry.io/collector/receiver/receivertest"
- "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
- "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
- "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/udp"
- )
- func TestUdp(t *testing.T) {
- listenAddress := "127.0.0.1:29018"
- testUDP(t, testdataConfigYaml(listenAddress), listenAddress)
- }
- func TestUdpAsync(t *testing.T) {
- listenAddress := "127.0.0.1:29019"
- cfg := testdataConfigYaml(listenAddress)
- cfg.InputConfig.AsyncConfig = &udp.AsyncConfig{
- Readers: 2,
- Processors: 2,
- MaxQueueLength: 100,
- }
- cfg.InputConfig.AsyncConfig.Readers = 2
- testUDP(t, testdataConfigYaml(listenAddress), listenAddress)
- }
- func testUDP(t *testing.T, cfg *UDPLogConfig, listenAddress string) {
- numLogs := 5
- f := NewFactory()
- sink := new(consumertest.LogsSink)
- rcvr, err := f.CreateLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, sink)
- require.NoError(t, err)
- require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost()))
- var conn net.Conn
- conn, err = net.Dial("udp", listenAddress)
- require.NoError(t, err)
- for i := 0; i < numLogs; i++ {
- msg := fmt.Sprintf("<86>1 2021-02-28T00:0%d:02.003Z test msg %d\n", i, i)
- _, err = conn.Write([]byte(msg))
- require.NoError(t, err)
- }
- require.NoError(t, conn.Close())
- require.Eventually(t, expectNLogs(sink, numLogs), 2*time.Second, time.Millisecond)
- require.NoError(t, rcvr.Shutdown(context.Background()))
- require.Len(t, sink.AllLogs(), 1)
- resourceLogs := sink.AllLogs()[0].ResourceLogs().At(0)
- logs := resourceLogs.ScopeLogs().At(0).LogRecords()
- require.Equal(t, logs.Len(), numLogs)
- expectedLogs := make([]string, numLogs)
- for i := 0; i < numLogs; i++ {
- expectedLogs[i] = fmt.Sprintf("<86>1 2021-02-28T00:0%d:02.003Z test msg %d", i, i)
- }
- for i := 0; i < numLogs; i++ {
- assert.Contains(t, expectedLogs, logs.At(i).Body().Str())
- }
- }
- func TestLoadConfig(t *testing.T) {
- cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
- require.NoError(t, err)
- factory := NewFactory()
- cfg := factory.CreateDefaultConfig()
- sub, err := cm.Sub("udplog")
- require.NoError(t, err)
- require.NoError(t, component.UnmarshalConfig(sub, cfg))
- assert.NoError(t, component.ValidateConfig(cfg))
- assert.Equal(t, testdataConfigYaml("127.0.0.1:29018"), cfg)
- }
- func testdataConfigYaml(listenAddress string) *UDPLogConfig {
- return &UDPLogConfig{
- BaseConfig: adapter.BaseConfig{
- Operators: []operator.Config{},
- },
- InputConfig: func() udp.Config {
- c := udp.NewConfig()
- c.ListenAddress = listenAddress
- return *c
- }(),
- }
- }
- func TestDecodeInputConfigFailure(t *testing.T) {
- sink := new(consumertest.LogsSink)
- factory := NewFactory()
- badCfg := &UDPLogConfig{
- BaseConfig: adapter.BaseConfig{
- Operators: []operator.Config{},
- },
- InputConfig: func() udp.Config {
- c := udp.NewConfig()
- c.Encoding = "fake"
- return *c
- }(),
- }
- receiver, err := factory.CreateLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), badCfg, sink)
- require.Error(t, err, "receiver creation should fail if input config isn't valid")
- require.Nil(t, receiver, "receiver creation should fail if input config isn't valid")
- }
- func expectNLogs(sink *consumertest.LogsSink, expected int) func() bool {
- return func() bool {
- return sink.LogRecordCount() == expected
- }
- }
|