stanza.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package datasenders // import "github.com/open-telemetry/opentelemetry-collector-contrib/testbed/datasenders"
  4. import (
  5. "context"
  6. "fmt"
  7. "net"
  8. "os"
  9. "strconv"
  10. "strings"
  11. "time"
  12. "go.opentelemetry.io/collector/consumer"
  13. "go.opentelemetry.io/collector/pdata/pcommon"
  14. "go.opentelemetry.io/collector/pdata/plog"
  15. "github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed"
  16. )
  17. type FileLogWriter struct {
  18. file *os.File
  19. }
  20. // Ensure FileLogWriter implements LogDataSender.
  21. var _ testbed.LogDataSender = (*FileLogWriter)(nil)
  22. // NewFileLogWriter creates a new data sender that will write log entries to a
  23. // file, to be tailed by FluentBit and sent to the collector.
  24. func NewFileLogWriter() *FileLogWriter {
  25. file, err := os.CreateTemp("", "perf-logs.log")
  26. if err != nil {
  27. panic("failed to create temp file")
  28. }
  29. f := &FileLogWriter{
  30. file: file,
  31. }
  32. return f
  33. }
  34. func (f *FileLogWriter) Capabilities() consumer.Capabilities {
  35. return consumer.Capabilities{MutatesData: false}
  36. }
  37. func (f *FileLogWriter) Start() error {
  38. return nil
  39. }
  40. func (f *FileLogWriter) ConsumeLogs(_ context.Context, logs plog.Logs) error {
  41. for i := 0; i < logs.ResourceLogs().Len(); i++ {
  42. for j := 0; j < logs.ResourceLogs().At(i).ScopeLogs().Len(); j++ {
  43. ills := logs.ResourceLogs().At(i).ScopeLogs().At(j)
  44. for k := 0; k < ills.LogRecords().Len(); k++ {
  45. _, err := f.file.Write(append(f.convertLogToTextLine(ills.LogRecords().At(k)), '\n'))
  46. if err != nil {
  47. return err
  48. }
  49. }
  50. }
  51. }
  52. return nil
  53. }
  54. func (f *FileLogWriter) convertLogToTextLine(lr plog.LogRecord) []byte {
  55. sb := strings.Builder{}
  56. // Timestamp
  57. sb.WriteString(time.Unix(0, int64(lr.Timestamp())).Format("2006-01-02"))
  58. // Severity
  59. sb.WriteString(" ")
  60. sb.WriteString(lr.SeverityText())
  61. sb.WriteString(" ")
  62. if lr.Body().Type() == pcommon.ValueTypeStr {
  63. sb.WriteString(lr.Body().Str())
  64. }
  65. lr.Attributes().Range(func(k string, v pcommon.Value) bool {
  66. sb.WriteString(" ")
  67. sb.WriteString(k)
  68. sb.WriteString("=")
  69. switch v.Type() {
  70. case pcommon.ValueTypeStr:
  71. sb.WriteString(v.Str())
  72. case pcommon.ValueTypeInt:
  73. sb.WriteString(strconv.FormatInt(v.Int(), 10))
  74. case pcommon.ValueTypeDouble:
  75. sb.WriteString(strconv.FormatFloat(v.Double(), 'f', -1, 64))
  76. case pcommon.ValueTypeBool:
  77. sb.WriteString(strconv.FormatBool(v.Bool()))
  78. default:
  79. panic("missing case")
  80. }
  81. return true
  82. })
  83. return []byte(sb.String())
  84. }
  85. func (f *FileLogWriter) Flush() {
  86. _ = f.file.Sync()
  87. }
  88. func (f *FileLogWriter) GenConfigYAMLStr() string {
  89. // Note that this generates a receiver config for agent.
  90. // We are testing stanza receiver here.
  91. return fmt.Sprintf(`
  92. filelog:
  93. include: [ %s ]
  94. start_at: beginning
  95. operators:
  96. - type: regex_parser
  97. regex: '^(?P<time>\d{4}-\d{2}-\d{2}) (?P<sev>[A-Z0-9]*) (?P<msg>.*)$'
  98. timestamp:
  99. parse_from: body.time
  100. layout: '%%Y-%%m-%%d'
  101. severity:
  102. parse_from: body.sev
  103. `, f.file.Name())
  104. }
  105. func (f *FileLogWriter) ProtocolName() string {
  106. return "filelog"
  107. }
  108. func (f *FileLogWriter) GetEndpoint() net.Addr {
  109. return nil
  110. }
  111. func NewLocalFileStorageExtension() map[string]string {
  112. tempDir, err := os.MkdirTemp("", "")
  113. if err != nil {
  114. panic("failed to create temp storage dir")
  115. }
  116. return map[string]string{
  117. "file_storage": fmt.Sprintf(`
  118. file_storage:
  119. directory: %s
  120. `, tempDir),
  121. }
  122. }