fluent.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package datasenders // import "github.com/open-telemetry/opentelemetry-collector-contrib/testbed/datasenders"
  4. import (
  5. "context"
  6. "encoding/json"
  7. "fmt"
  8. "os"
  9. "strconv"
  10. "testing"
  11. "time"
  12. "github.com/fluent/fluent-logger-golang/fluent"
  13. "github.com/stretchr/testify/require"
  14. "go.opentelemetry.io/collector/consumer"
  15. "go.opentelemetry.io/collector/pdata/pcommon"
  16. "go.opentelemetry.io/collector/pdata/plog"
  17. "github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed"
  18. )
  19. const (
  20. fluentDatafileVar = "FLUENT_DATA_SENDER_DATA_FILE"
  21. fluentPortVar = "FLUENT_DATA_SENDER_RECEIVER_PORT"
  22. )
  23. // FluentLogsForwarder forwards logs to fluent forwader
  24. type FluentLogsForwarder struct {
  25. testbed.DataSenderBase
  26. fluentLogger *fluent.Fluent
  27. dataFile *os.File
  28. }
  29. // Ensure FluentLogsForwarder implements LogDataSender.
  30. var _ testbed.LogDataSender = (*FluentLogsForwarder)(nil)
  31. func NewFluentLogsForwarder(t *testing.T, port int) *FluentLogsForwarder {
  32. var err error
  33. portOverride := os.Getenv(fluentPortVar)
  34. if portOverride != "" {
  35. port, err = strconv.Atoi(portOverride)
  36. require.NoError(t, err)
  37. }
  38. f := &FluentLogsForwarder{DataSenderBase: testbed.DataSenderBase{Port: port}}
  39. // When FLUENT_DATA_SENDER_DATA_FILE is set, the data sender, writes to a
  40. // file. This enables users to optionally run the e2e test against a real
  41. // fluentd/fluentbit agent rather than using the fluent writer the data sender
  42. // uses by default. In case, one is looking to point a real fluentd/fluentbit agent
  43. // to the e2e test, they can do so by configuring the fluent agent to read from the
  44. // file FLUENT_DATA_SENDER_DATA_FILE and forward data to FLUENT_DATA_SENDER_RECEIVER_PORT
  45. // on 127.0.0.1.
  46. if dataFileName := os.Getenv(fluentDatafileVar); dataFileName != "" {
  47. f.dataFile, err = os.OpenFile(dataFileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
  48. require.NoError(t, err)
  49. } else {
  50. logger, err := fluent.New(fluent.Config{FluentPort: port, Async: true})
  51. require.NoError(t, err)
  52. f.fluentLogger = logger
  53. }
  54. return f
  55. }
  56. func (f *FluentLogsForwarder) Capabilities() consumer.Capabilities {
  57. return consumer.Capabilities{MutatesData: false}
  58. }
  59. func (f *FluentLogsForwarder) Start() error {
  60. return nil
  61. }
  62. func (f *FluentLogsForwarder) Stop() error {
  63. return f.fluentLogger.Close()
  64. }
  65. func (f *FluentLogsForwarder) ConsumeLogs(_ context.Context, logs plog.Logs) error {
  66. for i := 0; i < logs.ResourceLogs().Len(); i++ {
  67. for j := 0; j < logs.ResourceLogs().At(i).ScopeLogs().Len(); j++ {
  68. ills := logs.ResourceLogs().At(i).ScopeLogs().At(j)
  69. for k := 0; k < ills.LogRecords().Len(); k++ {
  70. if f.dataFile == nil {
  71. if err := f.fluentLogger.Post("", f.convertLogToMap(ills.LogRecords().At(k))); err != nil {
  72. return err
  73. }
  74. } else {
  75. if _, err := f.dataFile.Write(append(f.convertLogToJSON(ills.LogRecords().At(k)), '\n')); err != nil {
  76. return err
  77. }
  78. }
  79. }
  80. }
  81. }
  82. return nil
  83. }
  84. func (f *FluentLogsForwarder) convertLogToMap(lr plog.LogRecord) map[string]string {
  85. out := map[string]string{}
  86. if lr.Body().Type() == pcommon.ValueTypeStr {
  87. out["log"] = lr.Body().Str()
  88. }
  89. lr.Attributes().Range(func(k string, v pcommon.Value) bool {
  90. switch v.Type() {
  91. case pcommon.ValueTypeStr:
  92. out[k] = v.Str()
  93. case pcommon.ValueTypeInt:
  94. out[k] = strconv.FormatInt(v.Int(), 10)
  95. case pcommon.ValueTypeDouble:
  96. out[k] = strconv.FormatFloat(v.Double(), 'f', -1, 64)
  97. case pcommon.ValueTypeBool:
  98. out[k] = strconv.FormatBool(v.Bool())
  99. default:
  100. panic("missing case")
  101. }
  102. return true
  103. })
  104. return out
  105. }
  106. func (f *FluentLogsForwarder) convertLogToJSON(lr plog.LogRecord) []byte {
  107. rec := map[string]string{
  108. "time": time.Unix(0, int64(lr.Timestamp())).Format("02/01/2006:15:04:05Z"),
  109. }
  110. rec["log"] = lr.Body().Str()
  111. lr.Attributes().Range(func(k string, v pcommon.Value) bool {
  112. switch v.Type() {
  113. case pcommon.ValueTypeStr:
  114. rec[k] = v.Str()
  115. case pcommon.ValueTypeInt:
  116. rec[k] = strconv.FormatInt(v.Int(), 10)
  117. case pcommon.ValueTypeDouble:
  118. rec[k] = strconv.FormatFloat(v.Double(), 'f', -1, 64)
  119. case pcommon.ValueTypeBool:
  120. rec[k] = strconv.FormatBool(v.Bool())
  121. default:
  122. panic("missing case")
  123. }
  124. return true
  125. })
  126. b, err := json.Marshal(rec)
  127. if err != nil {
  128. panic("failed to write log: " + err.Error())
  129. }
  130. return b
  131. }
  132. func (f *FluentLogsForwarder) Flush() {
  133. _ = f.dataFile.Sync()
  134. }
  135. func (f *FluentLogsForwarder) GenConfigYAMLStr() string {
  136. return fmt.Sprintf(`
  137. fluentforward:
  138. endpoint: 127.0.0.1:%d`, f.Port)
  139. }
  140. func (f *FluentLogsForwarder) ProtocolName() string {
  141. return "fluentforward"
  142. }