file_reader.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package filereceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/filereceiver"
  4. import (
  5. "bufio"
  6. "context"
  7. "errors"
  8. "fmt"
  9. "io"
  10. "os"
  11. "go.opentelemetry.io/collector/consumer"
  12. "go.opentelemetry.io/collector/pdata/pcommon"
  13. "go.opentelemetry.io/collector/pdata/pmetric"
  14. )
  15. // stringReader is the only function we use from *bufio.Reader. We define it
  16. // so that it can be swapped out for testing.
  17. type stringReader interface {
  18. ReadString(delim byte) (string, error)
  19. }
  20. // fileReader
  21. type fileReader struct {
  22. stringReader stringReader
  23. unm pmetric.Unmarshaler
  24. consumer consumer.Metrics
  25. timer *replayTimer
  26. }
  27. func newFileReader(consumer consumer.Metrics, file *os.File, timer *replayTimer) fileReader {
  28. return fileReader{
  29. consumer: consumer,
  30. stringReader: bufio.NewReader(file),
  31. unm: &pmetric.JSONUnmarshaler{},
  32. timer: timer,
  33. }
  34. }
  35. // readAll calls readline for each line in the file until all lines have been
  36. // read or the context is cancelled.
  37. func (fr fileReader) readAll(ctx context.Context) error {
  38. for {
  39. select {
  40. case <-ctx.Done():
  41. return nil
  42. default:
  43. err := fr.readLine(ctx)
  44. if err != nil {
  45. if errors.Is(err, io.EOF) {
  46. return nil
  47. }
  48. return err
  49. }
  50. }
  51. }
  52. }
  53. // readLine reads the next line in the file, converting it into metrics and
  54. // passing it to the the consumer member.
  55. func (fr fileReader) readLine(ctx context.Context) error {
  56. line, err := fr.stringReader.ReadString('\n')
  57. if err != nil {
  58. return fmt.Errorf("failed to read line from input file: %w", err)
  59. }
  60. metrics, err := fr.unm.UnmarshalMetrics([]byte(line))
  61. if err != nil {
  62. return fmt.Errorf("failed to unmarshal metrics: %w", err)
  63. }
  64. err = fr.timer.wait(ctx, getFirstTimestamp(metrics))
  65. if err != nil {
  66. return fmt.Errorf("readLine interrupted while waiting for timer: %w", err)
  67. }
  68. return fr.consumer.ConsumeMetrics(ctx, metrics)
  69. }
  70. func getFirstTimestamp(metrics pmetric.Metrics) pcommon.Timestamp {
  71. resourceMetrics := metrics.ResourceMetrics()
  72. if resourceMetrics.Len() == 0 {
  73. return 0
  74. }
  75. scopeMetrics := resourceMetrics.At(0).ScopeMetrics()
  76. if scopeMetrics.Len() == 0 {
  77. return 0
  78. }
  79. metricSlice := scopeMetrics.At(0).Metrics()
  80. if metricSlice.Len() == 0 {
  81. return 0
  82. }
  83. return getFirstTimestampFromMetric(metricSlice.At(0))
  84. }
  85. func getFirstTimestampFromMetric(metric pmetric.Metric) pcommon.Timestamp {
  86. //exhaustive:enforce
  87. switch metric.Type() {
  88. case pmetric.MetricTypeGauge:
  89. dps := metric.Gauge().DataPoints()
  90. if dps.Len() == 0 {
  91. return 0
  92. }
  93. return dps.At(0).Timestamp()
  94. case pmetric.MetricTypeSum:
  95. dps := metric.Sum().DataPoints()
  96. if dps.Len() == 0 {
  97. return 0
  98. }
  99. return dps.At(0).Timestamp()
  100. case pmetric.MetricTypeSummary:
  101. dps := metric.Summary().DataPoints()
  102. if dps.Len() == 0 {
  103. return 0
  104. }
  105. return dps.At(0).Timestamp()
  106. case pmetric.MetricTypeHistogram:
  107. dps := metric.Histogram().DataPoints()
  108. if dps.Len() == 0 {
  109. return 0
  110. }
  111. return dps.At(0).Timestamp()
  112. case pmetric.MetricTypeExponentialHistogram:
  113. dps := metric.ExponentialHistogram().DataPoints()
  114. if dps.Len() == 0 {
  115. return 0
  116. }
  117. return dps.At(0).Timestamp()
  118. case pmetric.MetricTypeEmpty:
  119. return 0
  120. }
  121. return 0
  122. }