123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package datasenders // import "github.com/open-telemetry/opentelemetry-collector-contrib/testbed/datasenders"
- import (
- "context"
- "fmt"
- "net"
- "os"
- "strconv"
- "strings"
- "time"
- "go.opentelemetry.io/collector/consumer"
- "go.opentelemetry.io/collector/pdata/pcommon"
- "go.opentelemetry.io/collector/pdata/plog"
- "github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed"
- )
// FileLogWriter is a testbed log data sender that writes each log record it
// receives as a line of text to a file on disk.
type FileLogWriter struct {
	// file is the destination that log lines are appended to.
	file *os.File
}
- // Ensure FileLogWriter implements LogDataSender.
- var _ testbed.LogDataSender = (*FileLogWriter)(nil)
- // NewFileLogWriter creates a new data sender that will write log entries to a
- // file, to be tailed by FluentBit and sent to the collector.
- func NewFileLogWriter() *FileLogWriter {
- file, err := os.CreateTemp("", "perf-logs.log")
- if err != nil {
- panic("failed to create temp file")
- }
- f := &FileLogWriter{
- file: file,
- }
- return f
- }
- func (f *FileLogWriter) Capabilities() consumer.Capabilities {
- return consumer.Capabilities{MutatesData: false}
- }
- func (f *FileLogWriter) Start() error {
- return nil
- }
- func (f *FileLogWriter) ConsumeLogs(_ context.Context, logs plog.Logs) error {
- for i := 0; i < logs.ResourceLogs().Len(); i++ {
- for j := 0; j < logs.ResourceLogs().At(i).ScopeLogs().Len(); j++ {
- ills := logs.ResourceLogs().At(i).ScopeLogs().At(j)
- for k := 0; k < ills.LogRecords().Len(); k++ {
- _, err := f.file.Write(append(f.convertLogToTextLine(ills.LogRecords().At(k)), '\n'))
- if err != nil {
- return err
- }
- }
- }
- }
- return nil
- }
- func (f *FileLogWriter) convertLogToTextLine(lr plog.LogRecord) []byte {
- sb := strings.Builder{}
- // Timestamp
- sb.WriteString(time.Unix(0, int64(lr.Timestamp())).Format("2006-01-02"))
- // Severity
- sb.WriteString(" ")
- sb.WriteString(lr.SeverityText())
- sb.WriteString(" ")
- if lr.Body().Type() == pcommon.ValueTypeStr {
- sb.WriteString(lr.Body().Str())
- }
- lr.Attributes().Range(func(k string, v pcommon.Value) bool {
- sb.WriteString(" ")
- sb.WriteString(k)
- sb.WriteString("=")
- switch v.Type() {
- case pcommon.ValueTypeStr:
- sb.WriteString(v.Str())
- case pcommon.ValueTypeInt:
- sb.WriteString(strconv.FormatInt(v.Int(), 10))
- case pcommon.ValueTypeDouble:
- sb.WriteString(strconv.FormatFloat(v.Double(), 'f', -1, 64))
- case pcommon.ValueTypeBool:
- sb.WriteString(strconv.FormatBool(v.Bool()))
- default:
- panic("missing case")
- }
- return true
- })
- return []byte(sb.String())
- }
- func (f *FileLogWriter) Flush() {
- _ = f.file.Sync()
- }
- func (f *FileLogWriter) GenConfigYAMLStr() string {
- // Note that this generates a receiver config for agent.
- // We are testing stanza receiver here.
- return fmt.Sprintf(`
- filelog:
- include: [ %s ]
- start_at: beginning
- operators:
- - type: regex_parser
- regex: '^(?P<time>\d{4}-\d{2}-\d{2}) (?P<sev>[A-Z0-9]*) (?P<msg>.*)$'
- timestamp:
- parse_from: body.time
- layout: '%%Y-%%m-%%d'
- severity:
- parse_from: body.sev
- `, f.file.Name())
- }
- func (f *FileLogWriter) ProtocolName() string {
- return "filelog"
- }
- func (f *FileLogWriter) GetEndpoint() net.Addr {
- return nil
- }
// NewLocalFileStorageExtension creates a fresh temporary directory and returns
// a single-entry map whose key is "file_storage" and whose value is the YAML
// configuration of a file_storage extension pointing at that directory.
//
// It panics if the directory cannot be created; the panic message includes
// the underlying error so the failure can be diagnosed.
func NewLocalFileStorageExtension() map[string]string {
	tempDir, err := os.MkdirTemp("", "")
	if err != nil {
		panic(fmt.Sprintf("failed to create temp storage dir: %v", err))
	}

	return map[string]string{
		"file_storage": fmt.Sprintf(`
  file_storage:
    directory: %s
`, tempDir),
	}
}
|