logs_exporter_stress_test.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. //go:build integration
  4. // +build integration
  5. package datasetexporter
  6. import (
  7. "context"
  8. "encoding/json"
  9. "fmt"
  10. "math/rand"
  11. "net/http"
  12. "net/http/httptest"
  13. "strings"
  14. "sync"
  15. "sync/atomic"
  16. "testing"
  17. "time"
  18. "github.com/stretchr/testify/assert"
  19. "go.opentelemetry.io/collector/component/componenttest"
  20. "go.opentelemetry.io/collector/exporter/exporterhelper"
  21. "go.opentelemetry.io/collector/exporter/exportertest"
  22. "go.opentelemetry.io/collector/pdata/pcommon"
  23. "go.opentelemetry.io/collector/pdata/plog"
  24. )
  25. func TestConsumeLogsManyLogsShouldSucceed(t *testing.T) {
  26. const maxDelay = 200 * time.Millisecond
  27. createSettings := exportertest.NewNopCreateSettings()
  28. const maxBatchCount = 20
  29. const logsPerBatch = 10000
  30. const expectedLogs = uint64(maxBatchCount * logsPerBatch)
  31. attempt := atomic.Uint64{}
  32. wasSuccessful := atomic.Bool{}
  33. processedEvents := atomic.Uint64{}
  34. seenKeys := make(map[string]int64)
  35. expectedKeys := make(map[string]int64)
  36. mutex := &sync.RWMutex{}
  37. server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
  38. attempt.Add(1)
  39. cer, err := extract(req)
  40. assert.NoError(t, err, "Error reading request: %v", err)
  41. for _, ev := range cer.Events {
  42. processedEvents.Add(1)
  43. key, found := ev.Attrs["body.str"]
  44. assert.True(t, found)
  45. mutex.Lock()
  46. sKey := key.(string)
  47. _, f := seenKeys[sKey]
  48. if !f {
  49. seenKeys[sKey] = 0
  50. }
  51. seenKeys[sKey]++
  52. mutex.Unlock()
  53. }
  54. wasSuccessful.Store(true)
  55. payload, err := json.Marshal(map[string]any{
  56. "status": "success",
  57. "bytesCharged": 42,
  58. })
  59. assert.NoError(t, err)
  60. l, err := w.Write(payload)
  61. assert.Greater(t, l, 1)
  62. assert.NoError(t, err)
  63. }))
  64. defer server.Close()
  65. config := &Config{
  66. DatasetURL: server.URL,
  67. APIKey: "key-lib",
  68. BufferSettings: BufferSettings{
  69. MaxLifetime: maxDelay,
  70. GroupBy: []string{"attributes.container_id"},
  71. RetryShutdownTimeout: time.Minute,
  72. },
  73. RetrySettings: exporterhelper.NewDefaultRetrySettings(),
  74. QueueSettings: exporterhelper.NewDefaultQueueSettings(),
  75. TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(),
  76. }
  77. logs, err := createLogsExporter(context.Background(), createSettings, config)
  78. waitingTime := time.Duration(0)
  79. if assert.NoError(t, err) {
  80. err = logs.Start(context.Background(), componenttest.NewNopHost())
  81. assert.NoError(t, err)
  82. for bI := 0; bI < maxBatchCount; bI++ {
  83. batch := plog.NewLogs()
  84. rL := batch.ResourceLogs().AppendEmpty()
  85. sL := rL.ScopeLogs().AppendEmpty()
  86. for lI := 0; lI < logsPerBatch; lI++ {
  87. key := fmt.Sprintf("%04d-%06d", bI, lI)
  88. log := sL.LogRecords().AppendEmpty()
  89. log.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
  90. log.Body().SetStr(key)
  91. log.Attributes().PutStr("key", key)
  92. log.Attributes().PutStr("p1", strings.Repeat("A", rand.Intn(2000)))
  93. expectedKeys[key] = 1
  94. }
  95. err = logs.ConsumeLogs(context.Background(), batch)
  96. assert.Nil(t, err)
  97. time.Sleep(time.Duration(float64(maxDelay.Nanoseconds()) * 0.7))
  98. }
  99. assert.NotNil(t, logs)
  100. time.Sleep(time.Second)
  101. err = logs.Shutdown(context.Background())
  102. assert.Nil(t, err)
  103. lastProcessed := uint64(0)
  104. sameNumber := 0
  105. for {
  106. t.Logf("Processed events: %d / %d", processedEvents.Load(), expectedLogs)
  107. if lastProcessed == processedEvents.Load() {
  108. sameNumber++
  109. }
  110. if processedEvents.Load() >= expectedLogs || sameNumber > 10 {
  111. break
  112. }
  113. lastProcessed = processedEvents.Load()
  114. time.Sleep(time.Second)
  115. waitingTime += time.Second
  116. }
  117. }
  118. time.Sleep(2 * time.Second)
  119. assert.True(t, wasSuccessful.Load())
  120. assert.Equal(t, seenKeys, expectedKeys)
  121. assert.Equal(t, expectedLogs, processedEvents.Load(), "processed items")
  122. assert.Equal(t, expectedLogs, uint64(len(seenKeys)), "unique items")
  123. }