file_exporter.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package fileexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/fileexporter"
  4. import (
  5. "context"
  6. "encoding/binary"
  7. "io"
  8. "sync"
  9. "time"
  10. "go.opentelemetry.io/collector/component"
  11. "go.opentelemetry.io/collector/pdata/plog"
  12. "go.opentelemetry.io/collector/pdata/pmetric"
  13. "go.opentelemetry.io/collector/pdata/ptrace"
  14. )
  15. // Marshaler configuration used for marhsaling Protobuf
  16. var tracesMarshalers = map[string]ptrace.Marshaler{
  17. formatTypeJSON: &ptrace.JSONMarshaler{},
  18. formatTypeProto: &ptrace.ProtoMarshaler{},
  19. }
  20. var metricsMarshalers = map[string]pmetric.Marshaler{
  21. formatTypeJSON: &pmetric.JSONMarshaler{},
  22. formatTypeProto: &pmetric.ProtoMarshaler{},
  23. }
  24. var logsMarshalers = map[string]plog.Marshaler{
  25. formatTypeJSON: &plog.JSONMarshaler{},
  26. formatTypeProto: &plog.ProtoMarshaler{},
  27. }
  28. // exportFunc defines how to export encoded telemetry data.
  29. type exportFunc func(e *fileExporter, buf []byte) error
  30. // fileExporter is the implementation of file exporter that writes telemetry data to a file
  31. type fileExporter struct {
  32. path string
  33. file io.WriteCloser
  34. mutex sync.Mutex
  35. tracesMarshaler ptrace.Marshaler
  36. metricsMarshaler pmetric.Marshaler
  37. logsMarshaler plog.Marshaler
  38. compression string
  39. compressor compressFunc
  40. formatType string
  41. exporter exportFunc
  42. flushInterval time.Duration
  43. flushTicker *time.Ticker
  44. stopTicker chan struct{}
  45. }
  46. func (e *fileExporter) consumeTraces(_ context.Context, td ptrace.Traces) error {
  47. buf, err := e.tracesMarshaler.MarshalTraces(td)
  48. if err != nil {
  49. return err
  50. }
  51. buf = e.compressor(buf)
  52. return e.exporter(e, buf)
  53. }
  54. func (e *fileExporter) consumeMetrics(_ context.Context, md pmetric.Metrics) error {
  55. buf, err := e.metricsMarshaler.MarshalMetrics(md)
  56. if err != nil {
  57. return err
  58. }
  59. buf = e.compressor(buf)
  60. return e.exporter(e, buf)
  61. }
  62. func (e *fileExporter) consumeLogs(_ context.Context, ld plog.Logs) error {
  63. buf, err := e.logsMarshaler.MarshalLogs(ld)
  64. if err != nil {
  65. return err
  66. }
  67. buf = e.compressor(buf)
  68. return e.exporter(e, buf)
  69. }
  70. func exportMessageAsLine(e *fileExporter, buf []byte) error {
  71. // Ensure only one write operation happens at a time.
  72. e.mutex.Lock()
  73. defer e.mutex.Unlock()
  74. if _, err := e.file.Write(buf); err != nil {
  75. return err
  76. }
  77. if _, err := io.WriteString(e.file, "\n"); err != nil {
  78. return err
  79. }
  80. return nil
  81. }
  82. func exportMessageAsBuffer(e *fileExporter, buf []byte) error {
  83. // Ensure only one write operation happens at a time.
  84. e.mutex.Lock()
  85. defer e.mutex.Unlock()
  86. // write the size of each message before writing the message itself. https://developers.google.com/protocol-buffers/docs/techniques
  87. // each encoded object is preceded by 4 bytes (an unsigned 32 bit integer)
  88. data := make([]byte, 4, 4+len(buf))
  89. binary.BigEndian.PutUint32(data, uint32(len(buf)))
  90. return binary.Write(e.file, binary.BigEndian, append(data, buf...))
  91. }
  92. // startFlusher starts the flusher.
  93. // It does not check the flushInterval
  94. func (e *fileExporter) startFlusher() {
  95. e.mutex.Lock()
  96. defer e.mutex.Unlock()
  97. ff, ok := e.file.(interface{ flush() error })
  98. if !ok {
  99. // Just in case.
  100. return
  101. }
  102. // Create the stop channel.
  103. e.stopTicker = make(chan struct{})
  104. // Start the ticker.
  105. e.flushTicker = time.NewTicker(e.flushInterval)
  106. go func() {
  107. for {
  108. select {
  109. case <-e.flushTicker.C:
  110. e.mutex.Lock()
  111. ff.flush()
  112. e.mutex.Unlock()
  113. case <-e.stopTicker:
  114. return
  115. }
  116. }
  117. }()
  118. }
  119. // Start starts the flush timer if set.
  120. func (e *fileExporter) Start(context.Context, component.Host) error {
  121. if e.flushInterval > 0 {
  122. e.startFlusher()
  123. }
  124. return nil
  125. }
  126. // Shutdown stops the exporter and is invoked during shutdown.
  127. // It stops the flush ticker if set.
  128. func (e *fileExporter) Shutdown(context.Context) error {
  129. e.mutex.Lock()
  130. defer e.mutex.Unlock()
  131. // Stop the flush ticker.
  132. if e.flushTicker != nil {
  133. e.flushTicker.Stop()
  134. // Stop the go routine.
  135. close(e.stopTicker)
  136. }
  137. return e.file.Close()
  138. }
  139. func buildExportFunc(cfg *Config) func(e *fileExporter, buf []byte) error {
  140. if cfg.FormatType == formatTypeProto {
  141. return exportMessageAsBuffer
  142. }
  143. // if the data format is JSON and needs to be compressed, telemetry data can't be written to file in JSON format.
  144. if cfg.FormatType == formatTypeJSON && cfg.Compression != "" {
  145. return exportMessageAsBuffer
  146. }
  147. return exportMessageAsLine
  148. }