123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package batchperresourceattr
- import (
- "context"
- "errors"
- "math/rand"
- "sort"
- "strconv"
- "testing"
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/require"
- "go.opentelemetry.io/collector/consumer/consumertest"
- "go.opentelemetry.io/collector/pdata/pcommon"
- "go.opentelemetry.io/collector/pdata/plog"
- "go.opentelemetry.io/collector/pdata/pmetric"
- "go.opentelemetry.io/collector/pdata/ptrace"
- )
- func TestSplitTracesOneResourceSpans(t *testing.T) {
- inBatch := ptrace.NewTraces()
- fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "attr_key", "1")
- sink := new(consumertest.TracesSink)
- bpr := NewBatchPerResourceTraces("attr_key", sink)
- assert.NoError(t, bpr.ConsumeTraces(context.Background(), inBatch))
- outBatches := sink.AllTraces()
- require.Len(t, outBatches, 1)
- assert.Equal(t, inBatch, outBatches[0])
- }
- func TestOriginalResourceSpansUnchanged(t *testing.T) {
- inBatch := ptrace.NewTraces()
- fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "attr_key", "1")
- fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "attr_key", "1")
- sink := new(consumertest.TracesSink)
- bpr := NewBatchPerResourceTraces("attr_key", sink)
- assert.NoError(t, bpr.ConsumeTraces(context.Background(), inBatch))
- outBatches := sink.AllTraces()
- require.Len(t, outBatches, 1)
- assert.Equal(t, inBatch, outBatches[0])
- }
- func TestSplitTracesReturnError(t *testing.T) {
- inBatch := ptrace.NewTraces()
- fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "attr_key", "1")
- fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "attr_key", "1")
- err := errors.New("test_error")
- bpr := NewBatchPerResourceTraces("attr_key", consumertest.NewErr(err))
- assert.Equal(t, err, bpr.ConsumeTraces(context.Background(), inBatch))
- }
- func TestSplitTracesSameResource(t *testing.T) {
- inBatch := ptrace.NewTraces()
- fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "same_attr_val", "1")
- fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "same_attr_val", "1")
- fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "same_attr_val", "1")
- fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "same_attr_val", "1")
- expected := ptrace.NewTraces()
- inBatch.CopyTo(expected)
- sink := new(consumertest.TracesSink)
- bpr := NewBatchPerResourceTraces("same_attr_val", sink)
- assert.NoError(t, bpr.ConsumeTraces(context.Background(), inBatch))
- outBatches := sink.AllTraces()
- require.Len(t, outBatches, 1)
- assert.Equal(t, expected, outBatches[0])
- }
- func TestSplitTracesIntoDifferentBatches(t *testing.T) {
- inBatch := ptrace.NewTraces()
- fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "attr_key", "1")
- fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "attr_key", "2")
- fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "attr_key", "3")
- fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "attr_key", "4")
- fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "attr_key", "1")
- fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "attr_key", "2")
- fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "attr_key", "3")
- fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "attr_key", "4")
- fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "diff_attr_key", "1")
- expected := ptrace.NewTraces()
- inBatch.CopyTo(expected)
- sink := new(consumertest.TracesSink)
- bpr := NewBatchPerResourceTraces("attr_key", sink)
- assert.NoError(t, bpr.ConsumeTraces(context.Background(), inBatch))
- outBatches := sink.AllTraces()
- require.Len(t, outBatches, 5)
- sortTraces(outBatches, "attr_key")
- assert.Equal(t, newTraces(expected.ResourceSpans().At(8)), outBatches[0])
- assert.Equal(t, newTraces(expected.ResourceSpans().At(0), expected.ResourceSpans().At(4)), outBatches[1])
- assert.Equal(t, newTraces(expected.ResourceSpans().At(1), expected.ResourceSpans().At(5)), outBatches[2])
- assert.Equal(t, newTraces(expected.ResourceSpans().At(2), expected.ResourceSpans().At(6)), outBatches[3])
- assert.Equal(t, newTraces(expected.ResourceSpans().At(3), expected.ResourceSpans().At(7)), outBatches[4])
- }
- func TestSplitMetricsOneResourceMetrics(t *testing.T) {
- inBatch := pmetric.NewMetrics()
- fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "1")
- expected := pmetric.NewMetrics()
- inBatch.CopyTo(expected)
- sink := new(consumertest.MetricsSink)
- bpr := NewBatchPerResourceMetrics("attr_key", sink)
- assert.NoError(t, bpr.ConsumeMetrics(context.Background(), inBatch))
- outBatches := sink.AllMetrics()
- require.Len(t, outBatches, 1)
- assert.Equal(t, expected, outBatches[0])
- }
- func TestOriginalResourceMetricsUnchanged(t *testing.T) {
- inBatch := pmetric.NewMetrics()
- fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "1")
- fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "1")
- sink := new(consumertest.MetricsSink)
- bpr := NewBatchPerResourceMetrics("attr_key", sink)
- assert.NoError(t, bpr.ConsumeMetrics(context.Background(), inBatch))
- outBatches := sink.AllMetrics()
- require.Len(t, outBatches, 1)
- assert.Equal(t, inBatch, outBatches[0])
- }
- func TestSplitMetricsReturnError(t *testing.T) {
- inBatch := pmetric.NewMetrics()
- fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "1")
- fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "1")
- err := errors.New("test_error")
- bpr := NewBatchPerResourceMetrics("attr_key", consumertest.NewErr(err))
- assert.Equal(t, err, bpr.ConsumeMetrics(context.Background(), inBatch))
- }
- func TestSplitMetricsSameResource(t *testing.T) {
- inBatch := pmetric.NewMetrics()
- fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "same_attr_val", "1")
- fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "same_attr_val", "1")
- fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "same_attr_val", "1")
- fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "same_attr_val", "1")
- expected := pmetric.NewMetrics()
- inBatch.CopyTo(expected)
- sink := new(consumertest.MetricsSink)
- bpr := NewBatchPerResourceMetrics("same_attr_val", sink)
- assert.NoError(t, bpr.ConsumeMetrics(context.Background(), inBatch))
- outBatches := sink.AllMetrics()
- require.Len(t, outBatches, 1)
- assert.Equal(t, expected, outBatches[0])
- }
- func TestSplitMetricsIntoDifferentBatches(t *testing.T) {
- inBatch := pmetric.NewMetrics()
- fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "1")
- fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "2")
- fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "3")
- fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "4")
- fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "1")
- fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "2")
- fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "3")
- fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "4")
- fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "diff_attr_key", "1")
- expected := pmetric.NewMetrics()
- inBatch.CopyTo(expected)
- sink := new(consumertest.MetricsSink)
- bpr := NewBatchPerResourceMetrics("attr_key", sink)
- assert.NoError(t, bpr.ConsumeMetrics(context.Background(), inBatch))
- outBatches := sink.AllMetrics()
- require.Len(t, outBatches, 5)
- sortMetrics(outBatches, "attr_key")
- assert.Equal(t, newMetrics(expected.ResourceMetrics().At(8)), outBatches[0])
- assert.Equal(t, newMetrics(expected.ResourceMetrics().At(0), expected.ResourceMetrics().At(4)), outBatches[1])
- assert.Equal(t, newMetrics(expected.ResourceMetrics().At(1), expected.ResourceMetrics().At(5)), outBatches[2])
- assert.Equal(t, newMetrics(expected.ResourceMetrics().At(2), expected.ResourceMetrics().At(6)), outBatches[3])
- assert.Equal(t, newMetrics(expected.ResourceMetrics().At(3), expected.ResourceMetrics().At(7)), outBatches[4])
- }
- func TestSplitLogsOneResourceLogs(t *testing.T) {
- inBatch := plog.NewLogs()
- fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "1")
- expected := plog.NewLogs()
- inBatch.CopyTo(expected)
- sink := new(consumertest.LogsSink)
- bpr := NewBatchPerResourceLogs("attr_key", sink)
- assert.NoError(t, bpr.ConsumeLogs(context.Background(), inBatch))
- outBatches := sink.AllLogs()
- require.Len(t, outBatches, 1)
- assert.Equal(t, expected, outBatches[0])
- }
- func TestOriginalResourceLogsUnchanged(t *testing.T) {
- inBatch := plog.NewLogs()
- fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "1")
- fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "1")
- sink := new(consumertest.LogsSink)
- bpr := NewBatchPerResourceLogs("attr_key", sink)
- assert.NoError(t, bpr.ConsumeLogs(context.Background(), inBatch))
- outBatches := sink.AllLogs()
- require.Len(t, outBatches, 1)
- assert.Equal(t, inBatch, outBatches[0])
- }
- func TestSplitLogsReturnError(t *testing.T) {
- inBatch := plog.NewLogs()
- fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "1")
- fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "1")
- err := errors.New("test_error")
- bpr := NewBatchPerResourceLogs("attr_key", consumertest.NewErr(err))
- assert.Equal(t, err, bpr.ConsumeLogs(context.Background(), inBatch))
- }
- func TestSplitLogsSameResource(t *testing.T) {
- inBatch := plog.NewLogs()
- fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "same_attr_val", "1")
- fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "same_attr_val", "1")
- fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "same_attr_val", "1")
- fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "same_attr_val", "1")
- expected := plog.NewLogs()
- inBatch.CopyTo(expected)
- sink := new(consumertest.LogsSink)
- bpr := NewBatchPerResourceLogs("same_attr_val", sink)
- assert.NoError(t, bpr.ConsumeLogs(context.Background(), inBatch))
- outBatches := sink.AllLogs()
- require.Len(t, outBatches, 1)
- assert.Equal(t, expected, outBatches[0])
- }
- func TestSplitLogsIntoDifferentBatches(t *testing.T) {
- inBatch := plog.NewLogs()
- fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "1")
- fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "2")
- fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "3")
- fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "4")
- fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "1")
- fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "2")
- fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "3")
- fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "4")
- fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "diff_attr_key", "1")
- expected := plog.NewLogs()
- inBatch.CopyTo(expected)
- sink := new(consumertest.LogsSink)
- bpr := NewBatchPerResourceLogs("attr_key", sink)
- assert.NoError(t, bpr.ConsumeLogs(context.Background(), inBatch))
- outBatches := sink.AllLogs()
- require.Len(t, outBatches, 5)
- sortLogs(outBatches, "attr_key")
- assert.Equal(t, newLogs(expected.ResourceLogs().At(8)), outBatches[0])
- assert.Equal(t, newLogs(expected.ResourceLogs().At(0), expected.ResourceLogs().At(4)), outBatches[1])
- assert.Equal(t, newLogs(expected.ResourceLogs().At(1), expected.ResourceLogs().At(5)), outBatches[2])
- assert.Equal(t, newLogs(expected.ResourceLogs().At(2), expected.ResourceLogs().At(6)), outBatches[3])
- assert.Equal(t, newLogs(expected.ResourceLogs().At(3), expected.ResourceLogs().At(7)), outBatches[4])
- }
- func newTraces(rss ...ptrace.ResourceSpans) ptrace.Traces {
- td := ptrace.NewTraces()
- for _, rs := range rss {
- rs.CopyTo(td.ResourceSpans().AppendEmpty())
- }
- return td
- }
- func sortTraces(tds []ptrace.Traces, attrKey string) {
- sort.Slice(tds, func(i, j int) bool {
- valI := ""
- if av, ok := tds[i].ResourceSpans().At(0).Resource().Attributes().Get(attrKey); ok {
- valI = av.Str()
- }
- valJ := ""
- if av, ok := tds[j].ResourceSpans().At(0).Resource().Attributes().Get(attrKey); ok {
- valJ = av.Str()
- }
- return valI < valJ
- })
- }
- func fillResourceSpans(rs ptrace.ResourceSpans, key string, val string) {
- rs.Resource().Attributes().PutStr(key, val)
- rs.Resource().Attributes().PutInt("__other_key__", 123)
- ils := rs.ScopeSpans().AppendEmpty()
- firstSpan := ils.Spans().AppendEmpty()
- firstSpan.SetName("first-span")
- firstSpan.SetTraceID(pcommon.TraceID([16]byte{byte(rand.Int())}))
- secondSpan := ils.Spans().AppendEmpty()
- secondSpan.SetName("second-span")
- secondSpan.SetTraceID(pcommon.TraceID([16]byte{byte(rand.Int())}))
- }
- func newMetrics(rms ...pmetric.ResourceMetrics) pmetric.Metrics {
- md := pmetric.NewMetrics()
- for _, rm := range rms {
- rm.CopyTo(md.ResourceMetrics().AppendEmpty())
- }
- return md
- }
- func sortMetrics(tds []pmetric.Metrics, attrKey string) {
- sort.Slice(tds, func(i, j int) bool {
- valI := ""
- if av, ok := tds[i].ResourceMetrics().At(0).Resource().Attributes().Get(attrKey); ok {
- valI = av.Str()
- }
- valJ := ""
- if av, ok := tds[j].ResourceMetrics().At(0).Resource().Attributes().Get(attrKey); ok {
- valJ = av.Str()
- }
- return valI < valJ
- })
- }
- func fillResourceMetrics(rs pmetric.ResourceMetrics, key string, val string) {
- rs.Resource().Attributes().PutStr(key, val)
- rs.Resource().Attributes().PutInt("__other_key__", 123)
- ils := rs.ScopeMetrics().AppendEmpty()
- firstMetric := ils.Metrics().AppendEmpty()
- firstMetric.SetName("first-metric")
- firstMetric.SetEmptyGauge()
- secondMetric := ils.Metrics().AppendEmpty()
- secondMetric.SetName("second-metric")
- secondMetric.SetEmptySum()
- }
- func newLogs(rls ...plog.ResourceLogs) plog.Logs {
- ld := plog.NewLogs()
- for _, rl := range rls {
- rl.CopyTo(ld.ResourceLogs().AppendEmpty())
- }
- return ld
- }
- func sortLogs(tds []plog.Logs, attrKey string) {
- sort.Slice(tds, func(i, j int) bool {
- valI := ""
- if av, ok := tds[i].ResourceLogs().At(0).Resource().Attributes().Get(attrKey); ok {
- valI = av.Str()
- }
- valJ := ""
- if av, ok := tds[j].ResourceLogs().At(0).Resource().Attributes().Get(attrKey); ok {
- valJ = av.Str()
- }
- return valI < valJ
- })
- }
- func fillResourceLogs(rs plog.ResourceLogs, key string, val string) {
- rs.Resource().Attributes().PutStr(key, val)
- rs.Resource().Attributes().PutInt("__other_key__", 123)
- ils := rs.ScopeLogs().AppendEmpty()
- firstLogRecord := ils.LogRecords().AppendEmpty()
- firstLogRecord.SetFlags(plog.LogRecordFlags(rand.Int31()))
- secondLogRecord := ils.LogRecords().AppendEmpty()
- secondLogRecord.SetFlags(plog.LogRecordFlags(rand.Int31()))
- }
- func BenchmarkBatchPerResourceTraces(b *testing.B) {
- inBatch := ptrace.NewTraces()
- rss := inBatch.ResourceSpans()
- rss.EnsureCapacity(64)
- for i := 0; i < 64; i++ {
- fillResourceSpans(rss.AppendEmpty(), "attr_key", strconv.Itoa(i%8))
- }
- bpr := NewBatchPerResourceTraces("attr_key", consumertest.NewNop())
- b.ReportAllocs()
- b.ResetTimer()
- for n := 0; n < b.N; n++ {
- if err := bpr.ConsumeTraces(context.Background(), inBatch); err != nil {
- b.Fail()
- }
- }
- }
- func BenchmarkBatchPerResourceMetrics(b *testing.B) {
- inBatch := pmetric.NewMetrics()
- inBatch.ResourceMetrics().EnsureCapacity(64)
- for i := 0; i < 64; i++ {
- fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", strconv.Itoa(i%8))
- }
- bpr := NewBatchPerResourceMetrics("attr_key", consumertest.NewNop())
- b.ReportAllocs()
- b.ResetTimer()
- for n := 0; n < b.N; n++ {
- if err := bpr.ConsumeMetrics(context.Background(), inBatch); err != nil {
- b.Fail()
- }
- }
- }
- func BenchmarkBatchPerResourceLogs(b *testing.B) {
- inBatch := plog.NewLogs()
- inBatch.ResourceLogs().EnsureCapacity(64)
- for i := 0; i < 64; i++ {
- fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", strconv.Itoa(i%8))
- }
- bpr := NewBatchPerResourceLogs("attr_key", consumertest.NewNop())
- b.ReportAllocs()
- b.ResetTimer()
- for n := 0; n < b.N; n++ {
- if err := bpr.ConsumeLogs(context.Background(), inBatch); err != nil {
- b.Fail()
- }
- }
- }
|