storage_test.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package filelogreceiver
  4. import (
  5. "context"
  6. "fmt"
  7. "log"
  8. "os"
  9. "path/filepath"
  10. "testing"
  11. "time"
  12. "github.com/stretchr/testify/require"
  13. "go.opentelemetry.io/collector/consumer/consumertest"
  14. "go.opentelemetry.io/collector/receiver/receivertest"
  15. "github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/storagetest"
  16. )
  17. func TestStorage(t *testing.T) {
  18. t.Parallel()
  19. const baseLog = "This is a simple log line with the number %3d"
  20. ctx := context.Background()
  21. logsDir := t.TempDir()
  22. storageDir := t.TempDir()
  23. extID := storagetest.NewFileBackedStorageExtension("test", storageDir).ID
  24. f := NewFactory()
  25. cfg := rotationTestConfig(logsDir)
  26. cfg.Operators = nil // not testing processing, just read the lines
  27. cfg.StorageID = &extID
  28. logger := newRecallLogger(t, logsDir)
  29. ext := storagetest.NewFileBackedStorageExtension("test", storageDir)
  30. host := storagetest.NewStorageHost().WithExtension(ext.ID, ext)
  31. sink := new(consumertest.LogsSink)
  32. rcvr, err := f.CreateLogsReceiver(ctx, receivertest.NewNopCreateSettings(), cfg, sink)
  33. require.NoError(t, err, "failed to create receiver")
  34. require.NoError(t, rcvr.Start(ctx, host))
  35. // Write 2 logs
  36. logger.log(fmt.Sprintf(baseLog, 0))
  37. logger.log(fmt.Sprintf(baseLog, 1))
  38. // Expect them now, since the receiver is running
  39. require.Eventually(t,
  40. expectLogs(sink, logger.recall()),
  41. 5*time.Second,
  42. 10*time.Millisecond,
  43. "expected 2 but got %d logs",
  44. sink.LogRecordCount(),
  45. )
  46. // Shut down the components
  47. require.NoError(t, rcvr.Shutdown(ctx))
  48. for _, e := range host.GetExtensions() {
  49. require.NoError(t, e.Shutdown(ctx))
  50. }
  51. // Write 3 more logs while the collector is not running
  52. logger.log(fmt.Sprintf(baseLog, 2))
  53. logger.log(fmt.Sprintf(baseLog, 3))
  54. logger.log(fmt.Sprintf(baseLog, 4))
  55. // Start the components again
  56. ext = storagetest.NewFileBackedStorageExtension("test", storageDir)
  57. host = storagetest.NewStorageHost().WithExtension(ext.ID, ext)
  58. rcvr, err = f.CreateLogsReceiver(ctx, receivertest.NewNopCreateSettings(), cfg, sink)
  59. require.NoError(t, err, "failed to create receiver")
  60. require.NoError(t, rcvr.Start(ctx, host))
  61. sink.Reset()
  62. // Expect only the new 3
  63. require.Eventually(t,
  64. expectLogs(sink, logger.recall()),
  65. time.Second,
  66. 10*time.Millisecond,
  67. "expected 3 but got %d logs",
  68. sink.LogRecordCount(),
  69. )
  70. sink.Reset()
  71. // Write 100 more, to ensure we're past the fingerprint size
  72. for i := 100; i < 200; i++ {
  73. logger.log(fmt.Sprintf(baseLog, i))
  74. }
  75. // Expect the new 100
  76. require.Eventually(t,
  77. expectLogs(sink, logger.recall()),
  78. time.Second,
  79. 10*time.Millisecond,
  80. "expected 100 but got %d logs",
  81. sink.LogRecordCount(),
  82. )
  83. // Shut down the components
  84. require.NoError(t, rcvr.Shutdown(ctx))
  85. for _, e := range host.GetExtensions() {
  86. require.NoError(t, e.Shutdown(ctx))
  87. }
  88. // Write 5 more logs while the collector is not running
  89. logger.log(fmt.Sprintf(baseLog, 5))
  90. logger.log(fmt.Sprintf(baseLog, 6))
  91. logger.log(fmt.Sprintf(baseLog, 7))
  92. logger.log(fmt.Sprintf(baseLog, 8))
  93. logger.log(fmt.Sprintf(baseLog, 9))
  94. // Start the components again
  95. ext = storagetest.NewFileBackedStorageExtension("test", storageDir)
  96. host = storagetest.NewStorageHost().WithExtension(ext.ID, ext)
  97. rcvr, err = f.CreateLogsReceiver(ctx, receivertest.NewNopCreateSettings(), cfg, sink)
  98. require.NoError(t, err, "failed to create receiver")
  99. require.NoError(t, rcvr.Start(ctx, host))
  100. sink.Reset()
  101. // Expect only the new 5
  102. require.Eventually(t,
  103. expectLogs(sink, logger.recall()),
  104. time.Second,
  105. 10*time.Millisecond,
  106. "expected 5 but got %d logs",
  107. sink.LogRecordCount(),
  108. )
  109. // Shut down the components
  110. require.NoError(t, rcvr.Shutdown(ctx))
  111. for _, e := range host.GetExtensions() {
  112. require.NoError(t, e.Shutdown(ctx))
  113. }
  114. require.NoError(t, logger.close())
  115. }
  116. type recallLogger struct {
  117. logFile *os.File
  118. *log.Logger
  119. written []string
  120. }
  121. func newRecallLogger(t *testing.T, tempDir string) *recallLogger {
  122. path := filepath.Join(tempDir, "test.log")
  123. logFile, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600)
  124. require.NoError(t, err)
  125. return &recallLogger{
  126. logFile: logFile,
  127. Logger: log.New(logFile, "", 0),
  128. written: []string{},
  129. }
  130. }
  131. func (l *recallLogger) log(s string) {
  132. l.written = append(l.written, s)
  133. l.Logger.Println(s)
  134. }
  135. func (l *recallLogger) recall() []string {
  136. defer func() { l.written = []string{} }()
  137. return l.written
  138. }
  139. func (l *recallLogger) close() error {
  140. return l.logFile.Close()
  141. }
  142. func expectLogs(sink *consumertest.LogsSink, expected []string) func() bool {
  143. return func() bool {
  144. if sink.LogRecordCount() != len(expected) {
  145. return false
  146. }
  147. found := make(map[string]bool)
  148. for _, e := range expected {
  149. found[e] = false
  150. }
  151. for _, logs := range sink.AllLogs() {
  152. rl := logs.ResourceLogs()
  153. for i := 0; i < rl.Len(); i++ {
  154. sl := rl.At(i).ScopeLogs()
  155. for j := 0; j < sl.Len(); j++ {
  156. lrs := sl.At(j).LogRecords()
  157. for k := 0; k < lrs.Len(); k++ {
  158. body := lrs.At(k).Body().Str()
  159. found[body] = true
  160. }
  161. }
  162. }
  163. }
  164. for _, v := range found {
  165. if !v {
  166. return false
  167. }
  168. }
  169. return true
  170. }
  171. }