exporter_test.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package syslogexporter
  4. import (
  5. "context"
  6. "errors"
  7. "io"
  8. "net"
  9. "strconv"
  10. "testing"
  11. "time"
  12. "github.com/stretchr/testify/assert"
  13. "github.com/stretchr/testify/require"
  14. "go.opentelemetry.io/collector/component"
  15. "go.opentelemetry.io/collector/consumer/consumererror"
  16. "go.opentelemetry.io/collector/exporter"
  17. "go.opentelemetry.io/collector/pdata/pcommon"
  18. "go.opentelemetry.io/collector/pdata/plog"
  19. "go.uber.org/zap"
  20. )
  21. var expectedForm = "<165>1 2003-08-24T12:14:15Z 192.0.2.1 myproc 8710 - - It's time to make the do-nuts.\n"
  22. var originalForm = "<165>1 2003-08-24T05:14:15-07:00 192.0.2.1 myproc 8710 - - It's time to make the do-nuts."
  23. type exporterTest struct {
  24. srv net.TCPListener
  25. exp *syslogexporter
  26. }
  27. func exampleLog(t *testing.T) plog.LogRecord {
  28. buffer := plog.NewLogRecord()
  29. buffer.Body().SetStr(originalForm)
  30. timestamp := "2003-08-24T05:14:15-07:00"
  31. timeStr, err := time.Parse(time.RFC3339, timestamp)
  32. require.NoError(t, err, "failed to start test syslog server")
  33. ts := pcommon.NewTimestampFromTime(timeStr)
  34. buffer.SetTimestamp(ts)
  35. attrMap := map[string]any{"proc_id": "8710", "message": "It's time to make the do-nuts.",
  36. "appname": "myproc", "hostname": "192.0.2.1", "priority": int64(165),
  37. "version": int64(1)}
  38. for k, v := range attrMap {
  39. if _, ok := v.(string); ok {
  40. buffer.Attributes().PutStr(k, v.(string))
  41. } else {
  42. buffer.Attributes().PutInt(k, v.(int64))
  43. }
  44. }
  45. return buffer
  46. }
  47. func logRecordsToLogs(record plog.LogRecord) plog.Logs {
  48. logs := plog.NewLogs()
  49. logsSlice := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords()
  50. ls := logsSlice.AppendEmpty()
  51. record.CopyTo(ls)
  52. return logs
  53. }
  54. func createExporterCreateSettings() exporter.CreateSettings {
  55. return exporter.CreateSettings{
  56. TelemetrySettings: component.TelemetrySettings{
  57. Logger: zap.NewNop(),
  58. },
  59. }
  60. }
  61. func TestInitExporter(t *testing.T) {
  62. _, err := initExporter(&Config{Endpoint: "test.com",
  63. Network: "tcp",
  64. Port: 514,
  65. Protocol: "rfc5424"}, createExporterCreateSettings())
  66. assert.NoError(t, err)
  67. }
  68. func buildValidExporter(t *testing.T, server net.TCPListener, cfg *Config) (*syslogexporter, error) {
  69. var port string
  70. var err error
  71. hostPort := server.Addr().String()
  72. cfg.Endpoint, port, err = net.SplitHostPort(hostPort)
  73. require.NoError(t, err, "could not parse port")
  74. cfg.Port, err = strconv.Atoi(port)
  75. require.NoError(t, err, "type error")
  76. exp, err := initExporter(cfg, createExporterCreateSettings())
  77. require.NoError(t, err)
  78. return exp, err
  79. }
  80. func buildInvalidExporter(t *testing.T, server net.TCPListener, cfg *Config) (*syslogexporter, error) {
  81. var port string
  82. var err error
  83. hostPort := server.Addr().String()
  84. cfg.Endpoint, port, err = net.SplitHostPort(hostPort)
  85. require.NoError(t, err, "could not parse endpoint")
  86. require.NotNil(t, port)
  87. invalidPort := "112" // Assign invalid port
  88. cfg.Port, err = strconv.Atoi(invalidPort)
  89. require.NoError(t, err, "type error")
  90. exp, err := initExporter(cfg, createExporterCreateSettings())
  91. require.NoError(t, err)
  92. return exp, err
  93. }
  94. func createServer() (net.TCPListener, error) {
  95. var addr net.TCPAddr
  96. addr.IP = net.IP{127, 0, 0, 1}
  97. addr.Port = 0
  98. testServer, err := net.ListenTCP("tcp", &addr)
  99. return *testServer, err
  100. }
  101. func prepareExporterTest(t *testing.T, cfg *Config, invalidExporter bool) *exporterTest {
  102. // Start a test syslog server
  103. var err error
  104. testServer, err := createServer()
  105. require.NoError(t, err, "failed to start test syslog server")
  106. var exp *syslogexporter
  107. if invalidExporter {
  108. exp, err = buildInvalidExporter(t, testServer, cfg)
  109. } else {
  110. exp, err = buildValidExporter(t, testServer, cfg)
  111. }
  112. require.NoError(t, err, "Error building exporter")
  113. require.NotNil(t, exp)
  114. return &exporterTest{
  115. srv: testServer,
  116. exp: exp,
  117. }
  118. }
  119. func createTestConfig() *Config {
  120. config := createDefaultConfig().(*Config)
  121. config.Network = "tcp"
  122. config.TLSSetting.Insecure = true
  123. return config
  124. }
  125. func TestSyslogExportSuccess(t *testing.T) {
  126. test := prepareExporterTest(t, createTestConfig(), false)
  127. require.NotNil(t, test.exp)
  128. defer test.srv.Close()
  129. go func() {
  130. buffer := exampleLog(t)
  131. logs := logRecordsToLogs(buffer)
  132. err := test.exp.pushLogsData(context.Background(), logs)
  133. require.NoError(t, err, "could not send message")
  134. }()
  135. err := test.srv.SetDeadline(time.Now().Add(time.Second * 1))
  136. require.NoError(t, err, "cannot set deadline")
  137. conn, err := test.srv.AcceptTCP()
  138. require.NoError(t, err, "could not accept connection")
  139. defer conn.Close()
  140. b, err := io.ReadAll(conn)
  141. require.NoError(t, err, "could not read all")
  142. assert.Equal(t, string(b), expectedForm)
  143. }
  144. func TestSyslogExportFail(t *testing.T) {
  145. test := prepareExporterTest(t, createTestConfig(), true)
  146. defer test.srv.Close()
  147. buffer := exampleLog(t)
  148. logs := logRecordsToLogs(buffer)
  149. consumerErr := test.exp.pushLogsData(context.Background(), logs)
  150. var consumerErrorLogs consumererror.Logs
  151. ok := errors.As(consumerErr, &consumerErrorLogs)
  152. assert.Equal(t, ok, true)
  153. consumerLogs := consumererror.Logs.Data(consumerErrorLogs)
  154. rls := consumerLogs.ResourceLogs()
  155. require.Equal(t, 1, rls.Len())
  156. scl := rls.At(0).ScopeLogs()
  157. require.Equal(t, 1, scl.Len())
  158. lrs := scl.At(0).LogRecords()
  159. require.Equal(t, 1, lrs.Len())
  160. droppedLog := lrs.At(0).Body().AsString()
  161. err := test.srv.SetDeadline(time.Now().Add(time.Second * 1))
  162. require.NoError(t, err, "cannot set deadline")
  163. conn, err := test.srv.AcceptTCP()
  164. require.ErrorContains(t, err, "i/o timeout")
  165. require.Nil(t, conn)
  166. assert.ErrorContains(t, consumerErr, "dial tcp 127.0.0.1:112: connect")
  167. assert.Equal(t, droppedLog, originalForm)
  168. }