syslog_integration_test.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package tests // import "github.com/open-telemetry/opentelemetry-collector-contrib/testbed/tests"
  4. import (
  5. "fmt"
  6. "net"
  7. "testing"
  8. "time"
  9. "github.com/stretchr/testify/assert"
  10. "github.com/stretchr/testify/require"
  11. "go.opentelemetry.io/collector/otelcol"
  12. "go.opentelemetry.io/collector/pdata/pcommon"
  13. "go.opentelemetry.io/collector/pdata/plog"
  14. "github.com/open-telemetry/opentelemetry-collector-contrib/testbed/datareceivers"
  15. "github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed"
  16. )
  17. type expectedDataType struct {
  18. message string
  19. severityNumber plog.SeverityNumber
  20. severityText string
  21. timestamp pcommon.Timestamp
  22. attributes map[string]any
  23. }
  24. func TestSyslogComplementaryRFC5424(t *testing.T) {
  25. expectedData := []expectedDataType{
  26. {
  27. message: "<165>1 2003-10-11T22:14:15.003Z mymachine.example.com eventslog - ID47 [exampleSDID@32473 iut=\"3\"] Some message",
  28. severityNumber: 10,
  29. severityText: "notice",
  30. timestamp: 1065910455003000000,
  31. attributes: map[string]any{
  32. "message": "Some message",
  33. "msg_id": "ID47",
  34. "structured_data": map[string]any{
  35. "exampleSDID@32473": map[string]any{
  36. "iut": "3",
  37. },
  38. },
  39. "hostname": "mymachine.example.com",
  40. "appname": "eventslog",
  41. "priority": int64(165),
  42. "version": int64(1),
  43. "facility": int64(20),
  44. },
  45. },
  46. {
  47. message: "<17>3 2003-10-11T22:14:15.008Z - - - - -",
  48. severityNumber: 19,
  49. severityText: "alert",
  50. timestamp: 1065910455008000000,
  51. attributes: map[string]any{
  52. "priority": int64(17),
  53. "version": int64(3),
  54. "facility": int64(2),
  55. },
  56. },
  57. }
  58. complementaryTest(t, "rfc5424", expectedData)
  59. }
  60. func TestSyslogComplementaryRFC3164(t *testing.T) {
  61. expectedData := []expectedDataType{
  62. {
  63. message: "<34>Oct 11 22:14:15 mymachine su: 'su root' failed for lonvick on /dev/pts/8",
  64. timestamp: 1697062455000000000,
  65. severityNumber: 18,
  66. severityText: "crit",
  67. attributes: map[string]any{
  68. "message": "'su root' failed for lonvick on /dev/pts/8",
  69. "hostname": "mymachine",
  70. "appname": "su",
  71. "priority": int64(34),
  72. "facility": int64(4),
  73. },
  74. },
  75. {
  76. message: "<19>Oct 11 22:14:15 - -",
  77. timestamp: 1697062455000000000,
  78. severityNumber: 17,
  79. severityText: "err",
  80. attributes: map[string]any{
  81. "message": "-",
  82. "priority": int64(19),
  83. "facility": int64(2),
  84. },
  85. },
  86. }
  87. complementaryTest(t, "rfc3164", expectedData)
  88. }
  89. func componentFactories(t *testing.T) otelcol.Factories {
  90. factories, err := testbed.Components()
  91. require.NoError(t, err)
  92. return factories
  93. }
  94. func complementaryTest(t *testing.T, rfc string, expectedData []expectedDataType) {
  95. // Prepare ports
  96. port := testbed.GetAvailablePort(t)
  97. inputPort := testbed.GetAvailablePort(t)
  98. // Start SyslogDataReceiver
  99. syslogReceiver := datareceivers.NewSyslogDataReceiver(rfc, port)
  100. backend := testbed.NewMockBackend("mockbackend.log", syslogReceiver)
  101. require.NoError(t, backend.Start())
  102. backend.EnableRecording()
  103. // Prepare and run collector
  104. config := `
  105. receivers:
  106. syslog/client:
  107. protocol: %s
  108. tcp:
  109. listen_address: '127.0.0.1:%d'
  110. exporters:
  111. syslog/client:
  112. endpoint: 127.0.0.1
  113. network: tcp
  114. protocol: %s
  115. port: %d
  116. tls:
  117. insecure: true
  118. service:
  119. pipelines:
  120. logs/client:
  121. receivers:
  122. - syslog/client
  123. exporters:
  124. - syslog/client`
  125. collector := testbed.NewInProcessCollector(componentFactories(t))
  126. _, err := collector.PrepareConfig(fmt.Sprintf(config, rfc, inputPort, rfc, port))
  127. require.NoError(t, err)
  128. err = collector.Start(testbed.StartParams{
  129. Name: "Agent",
  130. })
  131. require.NoError(t, err)
  132. t.Cleanup(func() {
  133. stopped, e := collector.Stop()
  134. require.NoError(t, e)
  135. require.True(t, stopped)
  136. })
  137. // prepare data
  138. message := ""
  139. expectedAttributes := []map[string]any{}
  140. expectedLogs := plog.NewLogs()
  141. rl := expectedLogs.ResourceLogs().AppendEmpty()
  142. lrs := rl.ScopeLogs().AppendEmpty().LogRecords()
  143. for _, e := range expectedData {
  144. lr := lrs.AppendEmpty()
  145. lr.Body().SetStr(e.message)
  146. lr.SetSeverityNumber(e.severityNumber)
  147. lr.SetSeverityText(e.severityText)
  148. lr.SetTimestamp(e.timestamp)
  149. expectedAttributes = append(expectedAttributes, e.attributes)
  150. message += e.message + "\n"
  151. }
  152. // Prepare client
  153. conn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", inputPort))
  154. require.NoError(t, err)
  155. // Write requests
  156. fmt.Fprint(conn, message)
  157. // Wait for all messages
  158. for len(backend.ReceivedLogs) < 1 {
  159. time.Sleep(100 * time.Millisecond)
  160. }
  161. require.Equal(t, len(backend.ReceivedLogs), 1)
  162. require.Equal(t, backend.ReceivedLogs[0].ResourceLogs().Len(), 1)
  163. require.Equal(t, backend.ReceivedLogs[0].ResourceLogs().At(0).ScopeLogs().Len(), 1)
  164. require.Equal(t, backend.ReceivedLogs[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().Len(), len(expectedData))
  165. // Clean received logs
  166. attributes := []map[string]any{}
  167. lrs = backend.ReceivedLogs[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords()
  168. for i := 0; i < lrs.Len(); i++ {
  169. lrs.At(i).SetObservedTimestamp(0)
  170. attributes = append(attributes, lrs.At(i).Attributes().AsRaw())
  171. lrs.At(i).Attributes().Clear()
  172. }
  173. // Assert
  174. assert.Equal(t, expectedLogs, backend.ReceivedLogs[0])
  175. assert.Equal(t, expectedAttributes, attributes)
  176. }