file_reader_test.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package filereceiver
  4. import (
  5. "context"
  6. "os"
  7. "path/filepath"
  8. "testing"
  9. "time"
  10. "github.com/stretchr/testify/assert"
  11. "github.com/stretchr/testify/require"
  12. "go.opentelemetry.io/collector/consumer/consumertest"
  13. "go.opentelemetry.io/collector/pdata/pmetric"
  14. )
  15. func TestFileReader_Readline(t *testing.T) {
  16. tc := testConsumer{}
  17. f, err := os.Open(filepath.Join("testdata", "metrics.json"))
  18. require.NoError(t, err)
  19. fr := newFileReader(&tc, f, newReplayTimer(0))
  20. err = fr.readLine(context.Background())
  21. require.NoError(t, err)
  22. assert.Equal(t, 1, len(tc.consumed))
  23. metrics := tc.consumed[0]
  24. assert.Equal(t, 26, metrics.MetricCount())
  25. byName := metricsByName(metrics)
  26. rcpMetric := byName["redis.commands.processed"]
  27. v := rcpMetric.Sum().DataPoints().At(0).IntValue()
  28. const testdataValue = 2076
  29. assert.EqualValues(t, testdataValue, v)
  30. }
  31. func TestFileReader_Cancellation(t *testing.T) {
  32. fr := fileReader{
  33. consumer: consumertest.NewNop(),
  34. stringReader: blockingStringReader{},
  35. }
  36. ctx, cancel := context.WithCancel(context.Background())
  37. errs := make(chan error)
  38. go func() {
  39. errs <- fr.readAll(ctx)
  40. }()
  41. cancel()
  42. require.Equal(t, 0, len(errs))
  43. }
  44. func TestFileReader_ReadAll(t *testing.T) {
  45. tc := testConsumer{}
  46. f, err := os.Open(filepath.Join("testdata", "metrics.json"))
  47. require.NoError(t, err)
  48. sleeper := &fakeSleeper{}
  49. rt := &replayTimer{
  50. throttle: 2,
  51. sleepFunc: sleeper.fakeSleep,
  52. }
  53. fr := newFileReader(&tc, f, rt)
  54. err = fr.readAll(context.Background())
  55. require.NoError(t, err)
  56. const expectedSleeps = 10
  57. assert.Len(t, sleeper.durations, expectedSleeps)
  58. assert.EqualValues(t, 0, sleeper.durations[0])
  59. for i := 1; i < expectedSleeps; i++ {
  60. expected := time.Second * 4
  61. actual := sleeper.durations[i]
  62. delta := time.Millisecond * 10
  63. assert.InDelta(t, float64(expected), float64(actual), float64(delta))
  64. }
  65. }
  // blockingStringReader is a field-less stub type. TestFileReader_Cancellation
  // installs it as the fileReader's stringReader.
  type blockingStringReader struct{}
  68. func (sr blockingStringReader) ReadString(byte) (string, error) {
  69. select {}
  70. }
  71. func metricsByName(pm pmetric.Metrics) map[string]pmetric.Metric {
  72. out := map[string]pmetric.Metric{}
  73. for i := 0; i < pm.ResourceMetrics().Len(); i++ {
  74. sms := pm.ResourceMetrics().At(i).ScopeMetrics()
  75. for j := 0; j < sms.Len(); j++ {
  76. ms := sms.At(j).Metrics()
  77. for k := 0; k < ms.Len(); k++ {
  78. metric := ms.At(k)
  79. out[metric.Name()] = metric
  80. }
  81. }
  82. }
  83. return out
  84. }