123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- //go:build integration
- // +build integration
- package datasetexporter
- import (
- "context"
- "encoding/json"
- "fmt"
- "math/rand"
- "net/http"
- "net/http/httptest"
- "strings"
- "sync"
- "sync/atomic"
- "testing"
- "time"
- "github.com/stretchr/testify/assert"
- "go.opentelemetry.io/collector/component/componenttest"
- "go.opentelemetry.io/collector/exporter/exporterhelper"
- "go.opentelemetry.io/collector/exporter/exportertest"
- "go.opentelemetry.io/collector/pdata/pcommon"
- "go.opentelemetry.io/collector/pdata/plog"
- )
- func TestConsumeLogsManyLogsShouldSucceed(t *testing.T) {
- const maxDelay = 200 * time.Millisecond
- createSettings := exportertest.NewNopCreateSettings()
- const maxBatchCount = 20
- const logsPerBatch = 10000
- const expectedLogs = uint64(maxBatchCount * logsPerBatch)
- attempt := atomic.Uint64{}
- wasSuccessful := atomic.Bool{}
- processedEvents := atomic.Uint64{}
- seenKeys := make(map[string]int64)
- expectedKeys := make(map[string]int64)
- mutex := &sync.RWMutex{}
- server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
- attempt.Add(1)
- cer, err := extract(req)
- assert.NoError(t, err, "Error reading request: %v", err)
- for _, ev := range cer.Events {
- processedEvents.Add(1)
- key, found := ev.Attrs["body.str"]
- assert.True(t, found)
- mutex.Lock()
- sKey := key.(string)
- _, f := seenKeys[sKey]
- if !f {
- seenKeys[sKey] = 0
- }
- seenKeys[sKey]++
- mutex.Unlock()
- }
- wasSuccessful.Store(true)
- payload, err := json.Marshal(map[string]any{
- "status": "success",
- "bytesCharged": 42,
- })
- assert.NoError(t, err)
- l, err := w.Write(payload)
- assert.Greater(t, l, 1)
- assert.NoError(t, err)
- }))
- defer server.Close()
- config := &Config{
- DatasetURL: server.URL,
- APIKey: "key-lib",
- BufferSettings: BufferSettings{
- MaxLifetime: maxDelay,
- GroupBy: []string{"attributes.container_id"},
- RetryShutdownTimeout: time.Minute,
- },
- RetrySettings: exporterhelper.NewDefaultRetrySettings(),
- QueueSettings: exporterhelper.NewDefaultQueueSettings(),
- TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(),
- }
- logs, err := createLogsExporter(context.Background(), createSettings, config)
- waitingTime := time.Duration(0)
- if assert.NoError(t, err) {
- err = logs.Start(context.Background(), componenttest.NewNopHost())
- assert.NoError(t, err)
- for bI := 0; bI < maxBatchCount; bI++ {
- batch := plog.NewLogs()
- rL := batch.ResourceLogs().AppendEmpty()
- sL := rL.ScopeLogs().AppendEmpty()
- for lI := 0; lI < logsPerBatch; lI++ {
- key := fmt.Sprintf("%04d-%06d", bI, lI)
- log := sL.LogRecords().AppendEmpty()
- log.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
- log.Body().SetStr(key)
- log.Attributes().PutStr("key", key)
- log.Attributes().PutStr("p1", strings.Repeat("A", rand.Intn(2000)))
- expectedKeys[key] = 1
- }
- err = logs.ConsumeLogs(context.Background(), batch)
- assert.Nil(t, err)
- time.Sleep(time.Duration(float64(maxDelay.Nanoseconds()) * 0.7))
- }
- assert.NotNil(t, logs)
- time.Sleep(time.Second)
- err = logs.Shutdown(context.Background())
- assert.Nil(t, err)
- lastProcessed := uint64(0)
- sameNumber := 0
- for {
- t.Logf("Processed events: %d / %d", processedEvents.Load(), expectedLogs)
- if lastProcessed == processedEvents.Load() {
- sameNumber++
- }
- if processedEvents.Load() >= expectedLogs || sameNumber > 10 {
- break
- }
- lastProcessed = processedEvents.Load()
- time.Sleep(time.Second)
- waitingTime += time.Second
- }
- }
- time.Sleep(2 * time.Second)
- assert.True(t, wasSuccessful.Load())
- assert.Equal(t, seenKeys, expectedKeys)
- assert.Equal(t, expectedLogs, processedEvents.Load(), "processed items")
- assert.Equal(t, expectedLogs, uint64(len(seenKeys)), "unique items")
- }
|