udp_test.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package udplogreceiver
  4. import (
  5. "context"
  6. "fmt"
  7. "net"
  8. "path/filepath"
  9. "testing"
  10. "time"
  11. "github.com/stretchr/testify/assert"
  12. "github.com/stretchr/testify/require"
  13. "go.opentelemetry.io/collector/component"
  14. "go.opentelemetry.io/collector/component/componenttest"
  15. "go.opentelemetry.io/collector/confmap/confmaptest"
  16. "go.opentelemetry.io/collector/consumer/consumertest"
  17. "go.opentelemetry.io/collector/receiver/receivertest"
  18. "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
  19. "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
  20. "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/udp"
  21. )
  22. func TestUdp(t *testing.T) {
  23. listenAddress := "127.0.0.1:29018"
  24. testUDP(t, testdataConfigYaml(listenAddress), listenAddress)
  25. }
  26. func TestUdpAsync(t *testing.T) {
  27. listenAddress := "127.0.0.1:29019"
  28. cfg := testdataConfigYaml(listenAddress)
  29. cfg.InputConfig.AsyncConfig = &udp.AsyncConfig{
  30. Readers: 2,
  31. Processors: 2,
  32. MaxQueueLength: 100,
  33. }
  34. cfg.InputConfig.AsyncConfig.Readers = 2
  35. testUDP(t, testdataConfigYaml(listenAddress), listenAddress)
  36. }
  37. func testUDP(t *testing.T, cfg *UDPLogConfig, listenAddress string) {
  38. numLogs := 5
  39. f := NewFactory()
  40. sink := new(consumertest.LogsSink)
  41. rcvr, err := f.CreateLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, sink)
  42. require.NoError(t, err)
  43. require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost()))
  44. var conn net.Conn
  45. conn, err = net.Dial("udp", listenAddress)
  46. require.NoError(t, err)
  47. for i := 0; i < numLogs; i++ {
  48. msg := fmt.Sprintf("<86>1 2021-02-28T00:0%d:02.003Z test msg %d\n", i, i)
  49. _, err = conn.Write([]byte(msg))
  50. require.NoError(t, err)
  51. }
  52. require.NoError(t, conn.Close())
  53. require.Eventually(t, expectNLogs(sink, numLogs), 2*time.Second, time.Millisecond)
  54. require.NoError(t, rcvr.Shutdown(context.Background()))
  55. require.Len(t, sink.AllLogs(), 1)
  56. resourceLogs := sink.AllLogs()[0].ResourceLogs().At(0)
  57. logs := resourceLogs.ScopeLogs().At(0).LogRecords()
  58. require.Equal(t, logs.Len(), numLogs)
  59. expectedLogs := make([]string, numLogs)
  60. for i := 0; i < numLogs; i++ {
  61. expectedLogs[i] = fmt.Sprintf("<86>1 2021-02-28T00:0%d:02.003Z test msg %d", i, i)
  62. }
  63. for i := 0; i < numLogs; i++ {
  64. assert.Contains(t, expectedLogs, logs.At(i).Body().Str())
  65. }
  66. }
  67. func TestLoadConfig(t *testing.T) {
  68. cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
  69. require.NoError(t, err)
  70. factory := NewFactory()
  71. cfg := factory.CreateDefaultConfig()
  72. sub, err := cm.Sub("udplog")
  73. require.NoError(t, err)
  74. require.NoError(t, component.UnmarshalConfig(sub, cfg))
  75. assert.NoError(t, component.ValidateConfig(cfg))
  76. assert.Equal(t, testdataConfigYaml("127.0.0.1:29018"), cfg)
  77. }
  78. func testdataConfigYaml(listenAddress string) *UDPLogConfig {
  79. return &UDPLogConfig{
  80. BaseConfig: adapter.BaseConfig{
  81. Operators: []operator.Config{},
  82. },
  83. InputConfig: func() udp.Config {
  84. c := udp.NewConfig()
  85. c.ListenAddress = listenAddress
  86. return *c
  87. }(),
  88. }
  89. }
  90. func TestDecodeInputConfigFailure(t *testing.T) {
  91. sink := new(consumertest.LogsSink)
  92. factory := NewFactory()
  93. badCfg := &UDPLogConfig{
  94. BaseConfig: adapter.BaseConfig{
  95. Operators: []operator.Config{},
  96. },
  97. InputConfig: func() udp.Config {
  98. c := udp.NewConfig()
  99. c.Encoding = "fake"
  100. return *c
  101. }(),
  102. }
  103. receiver, err := factory.CreateLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), badCfg, sink)
  104. require.Error(t, err, "receiver creation should fail if input config isn't valid")
  105. require.Nil(t, receiver, "receiver creation should fail if input config isn't valid")
  106. }
  107. func expectNLogs(sink *consumertest.LogsSink, expected int) func() bool {
  108. return func() bool {
  109. return sink.LogRecordCount() == expected
  110. }
  111. }