123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package filelogreceiver
- import (
- "context"
- "fmt"
- "log"
- "os"
- "path/filepath"
- "testing"
- "time"
- "github.com/stretchr/testify/require"
- "go.opentelemetry.io/collector/consumer/consumertest"
- "go.opentelemetry.io/collector/receiver/receivertest"
- "github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/storagetest"
- )
- func TestStorage(t *testing.T) {
- t.Parallel()
- const baseLog = "This is a simple log line with the number %3d"
- ctx := context.Background()
- logsDir := t.TempDir()
- storageDir := t.TempDir()
- extID := storagetest.NewFileBackedStorageExtension("test", storageDir).ID
- f := NewFactory()
- cfg := rotationTestConfig(logsDir)
- cfg.Operators = nil // not testing processing, just read the lines
- cfg.StorageID = &extID
- logger := newRecallLogger(t, logsDir)
- ext := storagetest.NewFileBackedStorageExtension("test", storageDir)
- host := storagetest.NewStorageHost().WithExtension(ext.ID, ext)
- sink := new(consumertest.LogsSink)
- rcvr, err := f.CreateLogsReceiver(ctx, receivertest.NewNopCreateSettings(), cfg, sink)
- require.NoError(t, err, "failed to create receiver")
- require.NoError(t, rcvr.Start(ctx, host))
- // Write 2 logs
- logger.log(fmt.Sprintf(baseLog, 0))
- logger.log(fmt.Sprintf(baseLog, 1))
- // Expect them now, since the receiver is running
- require.Eventually(t,
- expectLogs(sink, logger.recall()),
- 5*time.Second,
- 10*time.Millisecond,
- "expected 2 but got %d logs",
- sink.LogRecordCount(),
- )
- // Shut down the components
- require.NoError(t, rcvr.Shutdown(ctx))
- for _, e := range host.GetExtensions() {
- require.NoError(t, e.Shutdown(ctx))
- }
- // Write 3 more logs while the collector is not running
- logger.log(fmt.Sprintf(baseLog, 2))
- logger.log(fmt.Sprintf(baseLog, 3))
- logger.log(fmt.Sprintf(baseLog, 4))
- // Start the components again
- ext = storagetest.NewFileBackedStorageExtension("test", storageDir)
- host = storagetest.NewStorageHost().WithExtension(ext.ID, ext)
- rcvr, err = f.CreateLogsReceiver(ctx, receivertest.NewNopCreateSettings(), cfg, sink)
- require.NoError(t, err, "failed to create receiver")
- require.NoError(t, rcvr.Start(ctx, host))
- sink.Reset()
- // Expect only the new 3
- require.Eventually(t,
- expectLogs(sink, logger.recall()),
- time.Second,
- 10*time.Millisecond,
- "expected 3 but got %d logs",
- sink.LogRecordCount(),
- )
- sink.Reset()
- // Write 100 more, to ensure we're past the fingerprint size
- for i := 100; i < 200; i++ {
- logger.log(fmt.Sprintf(baseLog, i))
- }
- // Expect the new 100
- require.Eventually(t,
- expectLogs(sink, logger.recall()),
- time.Second,
- 10*time.Millisecond,
- "expected 100 but got %d logs",
- sink.LogRecordCount(),
- )
- // Shut down the components
- require.NoError(t, rcvr.Shutdown(ctx))
- for _, e := range host.GetExtensions() {
- require.NoError(t, e.Shutdown(ctx))
- }
- // Write 5 more logs while the collector is not running
- logger.log(fmt.Sprintf(baseLog, 5))
- logger.log(fmt.Sprintf(baseLog, 6))
- logger.log(fmt.Sprintf(baseLog, 7))
- logger.log(fmt.Sprintf(baseLog, 8))
- logger.log(fmt.Sprintf(baseLog, 9))
- // Start the components again
- ext = storagetest.NewFileBackedStorageExtension("test", storageDir)
- host = storagetest.NewStorageHost().WithExtension(ext.ID, ext)
- rcvr, err = f.CreateLogsReceiver(ctx, receivertest.NewNopCreateSettings(), cfg, sink)
- require.NoError(t, err, "failed to create receiver")
- require.NoError(t, rcvr.Start(ctx, host))
- sink.Reset()
- // Expect only the new 5
- require.Eventually(t,
- expectLogs(sink, logger.recall()),
- time.Second,
- 10*time.Millisecond,
- "expected 5 but got %d logs",
- sink.LogRecordCount(),
- )
- // Shut down the components
- require.NoError(t, rcvr.Shutdown(ctx))
- for _, e := range host.GetExtensions() {
- require.NoError(t, e.Shutdown(ctx))
- }
- require.NoError(t, logger.close())
- }
- type recallLogger struct {
- logFile *os.File
- *log.Logger
- written []string
- }
- func newRecallLogger(t *testing.T, tempDir string) *recallLogger {
- path := filepath.Join(tempDir, "test.log")
- logFile, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600)
- require.NoError(t, err)
- return &recallLogger{
- logFile: logFile,
- Logger: log.New(logFile, "", 0),
- written: []string{},
- }
- }
- func (l *recallLogger) log(s string) {
- l.written = append(l.written, s)
- l.Logger.Println(s)
- }
- func (l *recallLogger) recall() []string {
- defer func() { l.written = []string{} }()
- return l.written
- }
- func (l *recallLogger) close() error {
- return l.logFile.Close()
- }
- func expectLogs(sink *consumertest.LogsSink, expected []string) func() bool {
- return func() bool {
- if sink.LogRecordCount() != len(expected) {
- return false
- }
- found := make(map[string]bool)
- for _, e := range expected {
- found[e] = false
- }
- for _, logs := range sink.AllLogs() {
- rl := logs.ResourceLogs()
- for i := 0; i < rl.Len(); i++ {
- sl := rl.At(i).ScopeLogs()
- for j := 0; j < sl.Len(); j++ {
- lrs := sl.At(j).LogRecords()
- for k := 0; k < lrs.Len(); k++ {
- body := lrs.At(k).Body().Str()
- found[body] = true
- }
- }
- }
- }
- for _, v := range found {
- if !v {
- return false
- }
- }
- return true
- }
- }
|