wal_test.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package prometheusremotewriteexporter
  4. import (
  5. "context"
  6. "sort"
  7. "testing"
  8. "time"
  9. "github.com/prometheus/prometheus/prompb"
  10. "github.com/stretchr/testify/assert"
  11. "github.com/stretchr/testify/require"
  12. )
  13. func doNothingExportSink(_ context.Context, reqL []*prompb.WriteRequest) error {
  14. _ = reqL
  15. return nil
  16. }
  17. func TestWALCreation_nilConfig(t *testing.T) {
  18. config := (*WALConfig)(nil)
  19. pwal, err := newWAL(config, doNothingExportSink)
  20. require.Equal(t, err, errNilConfig)
  21. require.Nil(t, pwal)
  22. }
  23. func TestWALCreation_nonNilConfig(t *testing.T) {
  24. config := &WALConfig{Directory: t.TempDir()}
  25. pwal, err := newWAL(config, doNothingExportSink)
  26. require.NotNil(t, pwal)
  27. assert.Nil(t, err)
  28. assert.NoError(t, pwal.stop())
  29. }
  30. func orderByLabelValueForEach(reqL []*prompb.WriteRequest) {
  31. for _, req := range reqL {
  32. orderByLabelValue(req)
  33. }
  34. }
  35. func orderByLabelValue(wreq *prompb.WriteRequest) {
  36. // Sort the timeSeries by their labels.
  37. type byLabelMessage struct {
  38. label *prompb.Label
  39. sample *prompb.Sample
  40. }
  41. for _, timeSeries := range wreq.Timeseries {
  42. bMsgs := make([]*byLabelMessage, 0, len(wreq.Timeseries)*10)
  43. for i := range timeSeries.Labels {
  44. bMsgs = append(bMsgs, &byLabelMessage{
  45. label: &timeSeries.Labels[i],
  46. sample: &timeSeries.Samples[i],
  47. })
  48. }
  49. sort.Slice(bMsgs, func(i, j int) bool {
  50. return bMsgs[i].label.Value < bMsgs[j].label.Value
  51. })
  52. for i := range bMsgs {
  53. timeSeries.Labels[i] = *bMsgs[i].label
  54. timeSeries.Samples[i] = *bMsgs[i].sample
  55. }
  56. }
  57. // Now finally sort stably by timeseries value for
  58. // which just .String() is good enough for comparison.
  59. sort.Slice(wreq.Timeseries, func(i, j int) bool {
  60. ti, tj := wreq.Timeseries[i], wreq.Timeseries[j]
  61. return ti.String() < tj.String()
  62. })
  63. }
  64. func TestWALStopManyTimes(t *testing.T) {
  65. tempDir := t.TempDir()
  66. config := &WALConfig{
  67. Directory: tempDir,
  68. TruncateFrequency: 60 * time.Microsecond,
  69. BufferSize: 1,
  70. }
  71. pwal, err := newWAL(config, doNothingExportSink)
  72. require.Nil(t, err)
  73. require.NotNil(t, pwal)
  74. // Ensure that invoking .stop() multiple times doesn't cause a panic, but actually
  75. // First close should NOT return an error.
  76. err = pwal.stop()
  77. require.Nil(t, err)
  78. for i := 0; i < 4; i++ {
  79. // Every invocation to .stop() should return an errAlreadyClosed.
  80. err = pwal.stop()
  81. require.Equal(t, err, errAlreadyClosed)
  82. }
  83. }
  84. func TestWAL_persist(t *testing.T) {
  85. // Unit tests that requests written to the WAL persist.
  86. config := &WALConfig{Directory: t.TempDir()}
  87. pwal, err := newWAL(config, doNothingExportSink)
  88. require.Nil(t, err)
  89. // 1. Write out all the entries.
  90. reqL := []*prompb.WriteRequest{
  91. {
  92. Timeseries: []prompb.TimeSeries{
  93. {
  94. Labels: []prompb.Label{{Name: "ts1l1", Value: "ts1k1"}},
  95. Samples: []prompb.Sample{{Value: 1, Timestamp: 100}},
  96. },
  97. },
  98. },
  99. {
  100. Timeseries: []prompb.TimeSeries{
  101. {
  102. Labels: []prompb.Label{{Name: "ts2l1", Value: "ts2k1"}},
  103. Samples: []prompb.Sample{{Value: 2, Timestamp: 200}},
  104. },
  105. {
  106. Labels: []prompb.Label{{Name: "ts1l1", Value: "ts1k1"}},
  107. Samples: []prompb.Sample{{Value: 1, Timestamp: 100}},
  108. },
  109. },
  110. },
  111. }
  112. ctx := context.Background()
  113. err = pwal.retrieveWALIndices()
  114. require.Nil(t, err)
  115. t.Cleanup(func() {
  116. assert.NoError(t, pwal.stop())
  117. })
  118. err = pwal.persistToWAL(reqL)
  119. require.Nil(t, err)
  120. // 2. Read all the entries from the WAL itself, guided by the indices available,
  121. // and ensure that they are exactly in order as we'd expect them.
  122. wal := pwal.wal
  123. start, err := wal.FirstIndex()
  124. require.Nil(t, err)
  125. end, err := wal.LastIndex()
  126. require.Nil(t, err)
  127. var reqLFromWAL []*prompb.WriteRequest
  128. for i := start; i <= end; i++ {
  129. req, err := pwal.readPrompbFromWAL(ctx, i)
  130. require.Nil(t, err)
  131. reqLFromWAL = append(reqLFromWAL, req)
  132. }
  133. orderByLabelValueForEach(reqL)
  134. orderByLabelValueForEach(reqLFromWAL)
  135. require.Equal(t, reqLFromWAL[0], reqL[0])
  136. require.Equal(t, reqLFromWAL[1], reqL[1])
  137. }