tcp_test.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package tcplogreceiver
  4. import (
  5. "context"
  6. "fmt"
  7. "net"
  8. "path/filepath"
  9. "testing"
  10. "time"
  11. "github.com/stretchr/testify/assert"
  12. "github.com/stretchr/testify/require"
  13. "go.opentelemetry.io/collector/component"
  14. "go.opentelemetry.io/collector/component/componenttest"
  15. "go.opentelemetry.io/collector/confmap/confmaptest"
  16. "go.opentelemetry.io/collector/consumer/consumertest"
  17. "go.opentelemetry.io/collector/receiver/receivertest"
  18. "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
  19. "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
  20. "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/tcp"
  21. )
  22. func TestTcp(t *testing.T) {
  23. testTCP(t, testdataConfigYaml())
  24. }
  25. func testTCP(t *testing.T, cfg *TCPLogConfig) {
  26. numLogs := 5
  27. f := NewFactory()
  28. sink := new(consumertest.LogsSink)
  29. rcvr, err := f.CreateLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, sink)
  30. require.NoError(t, err)
  31. require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost()))
  32. var conn net.Conn
  33. conn, err = net.Dial("tcp", "127.0.0.1:29018")
  34. require.NoError(t, err)
  35. for i := 0; i < numLogs; i++ {
  36. msg := fmt.Sprintf("<86>1 2021-02-28T00:0%d:02.003Z test msg %d\n", i, i)
  37. _, err = conn.Write([]byte(msg))
  38. require.NoError(t, err)
  39. }
  40. require.NoError(t, conn.Close())
  41. require.Eventually(t, expectNLogs(sink, numLogs), 2*time.Second, time.Millisecond)
  42. require.NoError(t, rcvr.Shutdown(context.Background()))
  43. require.Len(t, sink.AllLogs(), 1)
  44. resourceLogs := sink.AllLogs()[0].ResourceLogs().At(0)
  45. logs := resourceLogs.ScopeLogs().At(0).LogRecords()
  46. for i := 0; i < numLogs; i++ {
  47. log := logs.At(i)
  48. msg := log.Body()
  49. require.Equal(t, msg.Str(), fmt.Sprintf("<86>1 2021-02-28T00:0%d:02.003Z test msg %d", i, i))
  50. }
  51. }
  52. func TestLoadConfig(t *testing.T) {
  53. cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
  54. require.NoError(t, err)
  55. factory := NewFactory()
  56. cfg := factory.CreateDefaultConfig()
  57. sub, err := cm.Sub("tcplog")
  58. require.NoError(t, err)
  59. require.NoError(t, component.UnmarshalConfig(sub, cfg))
  60. assert.NoError(t, component.ValidateConfig(cfg))
  61. assert.Equal(t, testdataConfigYaml(), cfg)
  62. }
  63. func testdataConfigYaml() *TCPLogConfig {
  64. return &TCPLogConfig{
  65. BaseConfig: adapter.BaseConfig{
  66. Operators: []operator.Config{},
  67. },
  68. InputConfig: func() tcp.Config {
  69. c := tcp.NewConfig()
  70. c.ListenAddress = "127.0.0.1:29018"
  71. return *c
  72. }(),
  73. }
  74. }
  75. func TestDecodeInputConfigFailure(t *testing.T) {
  76. factory := NewFactory()
  77. badCfg := &TCPLogConfig{
  78. BaseConfig: adapter.BaseConfig{
  79. Operators: []operator.Config{},
  80. },
  81. InputConfig: func() tcp.Config {
  82. c := tcp.NewConfig()
  83. c.Encoding = "fake"
  84. return *c
  85. }(),
  86. }
  87. receiver, err := factory.CreateLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), badCfg, consumertest.NewNop())
  88. require.Error(t, err, "receiver creation should fail if input config isn't valid")
  89. require.Nil(t, receiver, "receiver creation should fail if input config isn't valid")
  90. }
  91. func expectNLogs(sink *consumertest.LogsSink, expected int) func() bool {
  92. return func() bool {
  93. return sink.LogRecordCount() == expected
  94. }
  95. }