123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package prometheusremotewriteexporter
- import (
- "context"
- "sort"
- "testing"
- "time"
- "github.com/prometheus/prometheus/prompb"
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/require"
- )
- func doNothingExportSink(_ context.Context, reqL []*prompb.WriteRequest) error {
- _ = reqL
- return nil
- }
- func TestWALCreation_nilConfig(t *testing.T) {
- config := (*WALConfig)(nil)
- pwal, err := newWAL(config, doNothingExportSink)
- require.Equal(t, err, errNilConfig)
- require.Nil(t, pwal)
- }
- func TestWALCreation_nonNilConfig(t *testing.T) {
- config := &WALConfig{Directory: t.TempDir()}
- pwal, err := newWAL(config, doNothingExportSink)
- require.NotNil(t, pwal)
- assert.Nil(t, err)
- assert.NoError(t, pwal.stop())
- }
- func orderByLabelValueForEach(reqL []*prompb.WriteRequest) {
- for _, req := range reqL {
- orderByLabelValue(req)
- }
- }
- func orderByLabelValue(wreq *prompb.WriteRequest) {
- // Sort the timeSeries by their labels.
- type byLabelMessage struct {
- label *prompb.Label
- sample *prompb.Sample
- }
- for _, timeSeries := range wreq.Timeseries {
- bMsgs := make([]*byLabelMessage, 0, len(wreq.Timeseries)*10)
- for i := range timeSeries.Labels {
- bMsgs = append(bMsgs, &byLabelMessage{
- label: &timeSeries.Labels[i],
- sample: &timeSeries.Samples[i],
- })
- }
- sort.Slice(bMsgs, func(i, j int) bool {
- return bMsgs[i].label.Value < bMsgs[j].label.Value
- })
- for i := range bMsgs {
- timeSeries.Labels[i] = *bMsgs[i].label
- timeSeries.Samples[i] = *bMsgs[i].sample
- }
- }
- // Now finally sort stably by timeseries value for
- // which just .String() is good enough for comparison.
- sort.Slice(wreq.Timeseries, func(i, j int) bool {
- ti, tj := wreq.Timeseries[i], wreq.Timeseries[j]
- return ti.String() < tj.String()
- })
- }
- func TestWALStopManyTimes(t *testing.T) {
- tempDir := t.TempDir()
- config := &WALConfig{
- Directory: tempDir,
- TruncateFrequency: 60 * time.Microsecond,
- BufferSize: 1,
- }
- pwal, err := newWAL(config, doNothingExportSink)
- require.Nil(t, err)
- require.NotNil(t, pwal)
- // Ensure that invoking .stop() multiple times doesn't cause a panic, but actually
- // First close should NOT return an error.
- err = pwal.stop()
- require.Nil(t, err)
- for i := 0; i < 4; i++ {
- // Every invocation to .stop() should return an errAlreadyClosed.
- err = pwal.stop()
- require.Equal(t, err, errAlreadyClosed)
- }
- }
- func TestWAL_persist(t *testing.T) {
- // Unit tests that requests written to the WAL persist.
- config := &WALConfig{Directory: t.TempDir()}
- pwal, err := newWAL(config, doNothingExportSink)
- require.Nil(t, err)
- // 1. Write out all the entries.
- reqL := []*prompb.WriteRequest{
- {
- Timeseries: []prompb.TimeSeries{
- {
- Labels: []prompb.Label{{Name: "ts1l1", Value: "ts1k1"}},
- Samples: []prompb.Sample{{Value: 1, Timestamp: 100}},
- },
- },
- },
- {
- Timeseries: []prompb.TimeSeries{
- {
- Labels: []prompb.Label{{Name: "ts2l1", Value: "ts2k1"}},
- Samples: []prompb.Sample{{Value: 2, Timestamp: 200}},
- },
- {
- Labels: []prompb.Label{{Name: "ts1l1", Value: "ts1k1"}},
- Samples: []prompb.Sample{{Value: 1, Timestamp: 100}},
- },
- },
- },
- }
- ctx := context.Background()
- err = pwal.retrieveWALIndices()
- require.Nil(t, err)
- t.Cleanup(func() {
- assert.NoError(t, pwal.stop())
- })
- err = pwal.persistToWAL(reqL)
- require.Nil(t, err)
- // 2. Read all the entries from the WAL itself, guided by the indices available,
- // and ensure that they are exactly in order as we'd expect them.
- wal := pwal.wal
- start, err := wal.FirstIndex()
- require.Nil(t, err)
- end, err := wal.LastIndex()
- require.Nil(t, err)
- var reqLFromWAL []*prompb.WriteRequest
- for i := start; i <= end; i++ {
- req, err := pwal.readPrompbFromWAL(ctx, i)
- require.Nil(t, err)
- reqLFromWAL = append(reqLFromWAL, req)
- }
- orderByLabelValueForEach(reqL)
- orderByLabelValueForEach(reqLFromWAL)
- require.Equal(t, reqLFromWAL[0], reqL[0])
- require.Equal(t, reqLFromWAL[1], reqL[1])
- }
|