batchperresourceattr_test.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package batchperresourceattr
  4. import (
  5. "context"
  6. "errors"
  7. "math/rand"
  8. "sort"
  9. "strconv"
  10. "testing"
  11. "github.com/stretchr/testify/assert"
  12. "github.com/stretchr/testify/require"
  13. "go.opentelemetry.io/collector/consumer/consumertest"
  14. "go.opentelemetry.io/collector/pdata/pcommon"
  15. "go.opentelemetry.io/collector/pdata/plog"
  16. "go.opentelemetry.io/collector/pdata/pmetric"
  17. "go.opentelemetry.io/collector/pdata/ptrace"
  18. )
  19. func TestSplitTracesOneResourceSpans(t *testing.T) {
  20. inBatch := ptrace.NewTraces()
  21. fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "attr_key", "1")
  22. sink := new(consumertest.TracesSink)
  23. bpr := NewBatchPerResourceTraces("attr_key", sink)
  24. assert.NoError(t, bpr.ConsumeTraces(context.Background(), inBatch))
  25. outBatches := sink.AllTraces()
  26. require.Len(t, outBatches, 1)
  27. assert.Equal(t, inBatch, outBatches[0])
  28. }
  29. func TestOriginalResourceSpansUnchanged(t *testing.T) {
  30. inBatch := ptrace.NewTraces()
  31. fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "attr_key", "1")
  32. fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "attr_key", "1")
  33. sink := new(consumertest.TracesSink)
  34. bpr := NewBatchPerResourceTraces("attr_key", sink)
  35. assert.NoError(t, bpr.ConsumeTraces(context.Background(), inBatch))
  36. outBatches := sink.AllTraces()
  37. require.Len(t, outBatches, 1)
  38. assert.Equal(t, inBatch, outBatches[0])
  39. }
  40. func TestSplitTracesReturnError(t *testing.T) {
  41. inBatch := ptrace.NewTraces()
  42. fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "attr_key", "1")
  43. fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "attr_key", "1")
  44. err := errors.New("test_error")
  45. bpr := NewBatchPerResourceTraces("attr_key", consumertest.NewErr(err))
  46. assert.Equal(t, err, bpr.ConsumeTraces(context.Background(), inBatch))
  47. }
  48. func TestSplitTracesSameResource(t *testing.T) {
  49. inBatch := ptrace.NewTraces()
  50. fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "same_attr_val", "1")
  51. fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "same_attr_val", "1")
  52. fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "same_attr_val", "1")
  53. fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "same_attr_val", "1")
  54. expected := ptrace.NewTraces()
  55. inBatch.CopyTo(expected)
  56. sink := new(consumertest.TracesSink)
  57. bpr := NewBatchPerResourceTraces("same_attr_val", sink)
  58. assert.NoError(t, bpr.ConsumeTraces(context.Background(), inBatch))
  59. outBatches := sink.AllTraces()
  60. require.Len(t, outBatches, 1)
  61. assert.Equal(t, expected, outBatches[0])
  62. }
  63. func TestSplitTracesIntoDifferentBatches(t *testing.T) {
  64. inBatch := ptrace.NewTraces()
  65. fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "attr_key", "1")
  66. fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "attr_key", "2")
  67. fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "attr_key", "3")
  68. fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "attr_key", "4")
  69. fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "attr_key", "1")
  70. fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "attr_key", "2")
  71. fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "attr_key", "3")
  72. fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "attr_key", "4")
  73. fillResourceSpans(inBatch.ResourceSpans().AppendEmpty(), "diff_attr_key", "1")
  74. expected := ptrace.NewTraces()
  75. inBatch.CopyTo(expected)
  76. sink := new(consumertest.TracesSink)
  77. bpr := NewBatchPerResourceTraces("attr_key", sink)
  78. assert.NoError(t, bpr.ConsumeTraces(context.Background(), inBatch))
  79. outBatches := sink.AllTraces()
  80. require.Len(t, outBatches, 5)
  81. sortTraces(outBatches, "attr_key")
  82. assert.Equal(t, newTraces(expected.ResourceSpans().At(8)), outBatches[0])
  83. assert.Equal(t, newTraces(expected.ResourceSpans().At(0), expected.ResourceSpans().At(4)), outBatches[1])
  84. assert.Equal(t, newTraces(expected.ResourceSpans().At(1), expected.ResourceSpans().At(5)), outBatches[2])
  85. assert.Equal(t, newTraces(expected.ResourceSpans().At(2), expected.ResourceSpans().At(6)), outBatches[3])
  86. assert.Equal(t, newTraces(expected.ResourceSpans().At(3), expected.ResourceSpans().At(7)), outBatches[4])
  87. }
  88. func TestSplitMetricsOneResourceMetrics(t *testing.T) {
  89. inBatch := pmetric.NewMetrics()
  90. fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "1")
  91. expected := pmetric.NewMetrics()
  92. inBatch.CopyTo(expected)
  93. sink := new(consumertest.MetricsSink)
  94. bpr := NewBatchPerResourceMetrics("attr_key", sink)
  95. assert.NoError(t, bpr.ConsumeMetrics(context.Background(), inBatch))
  96. outBatches := sink.AllMetrics()
  97. require.Len(t, outBatches, 1)
  98. assert.Equal(t, expected, outBatches[0])
  99. }
  100. func TestOriginalResourceMetricsUnchanged(t *testing.T) {
  101. inBatch := pmetric.NewMetrics()
  102. fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "1")
  103. fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "1")
  104. sink := new(consumertest.MetricsSink)
  105. bpr := NewBatchPerResourceMetrics("attr_key", sink)
  106. assert.NoError(t, bpr.ConsumeMetrics(context.Background(), inBatch))
  107. outBatches := sink.AllMetrics()
  108. require.Len(t, outBatches, 1)
  109. assert.Equal(t, inBatch, outBatches[0])
  110. }
  111. func TestSplitMetricsReturnError(t *testing.T) {
  112. inBatch := pmetric.NewMetrics()
  113. fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "1")
  114. fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "1")
  115. err := errors.New("test_error")
  116. bpr := NewBatchPerResourceMetrics("attr_key", consumertest.NewErr(err))
  117. assert.Equal(t, err, bpr.ConsumeMetrics(context.Background(), inBatch))
  118. }
  119. func TestSplitMetricsSameResource(t *testing.T) {
  120. inBatch := pmetric.NewMetrics()
  121. fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "same_attr_val", "1")
  122. fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "same_attr_val", "1")
  123. fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "same_attr_val", "1")
  124. fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "same_attr_val", "1")
  125. expected := pmetric.NewMetrics()
  126. inBatch.CopyTo(expected)
  127. sink := new(consumertest.MetricsSink)
  128. bpr := NewBatchPerResourceMetrics("same_attr_val", sink)
  129. assert.NoError(t, bpr.ConsumeMetrics(context.Background(), inBatch))
  130. outBatches := sink.AllMetrics()
  131. require.Len(t, outBatches, 1)
  132. assert.Equal(t, expected, outBatches[0])
  133. }
  134. func TestSplitMetricsIntoDifferentBatches(t *testing.T) {
  135. inBatch := pmetric.NewMetrics()
  136. fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "1")
  137. fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "2")
  138. fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "3")
  139. fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "4")
  140. fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "1")
  141. fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "2")
  142. fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "3")
  143. fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", "4")
  144. fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "diff_attr_key", "1")
  145. expected := pmetric.NewMetrics()
  146. inBatch.CopyTo(expected)
  147. sink := new(consumertest.MetricsSink)
  148. bpr := NewBatchPerResourceMetrics("attr_key", sink)
  149. assert.NoError(t, bpr.ConsumeMetrics(context.Background(), inBatch))
  150. outBatches := sink.AllMetrics()
  151. require.Len(t, outBatches, 5)
  152. sortMetrics(outBatches, "attr_key")
  153. assert.Equal(t, newMetrics(expected.ResourceMetrics().At(8)), outBatches[0])
  154. assert.Equal(t, newMetrics(expected.ResourceMetrics().At(0), expected.ResourceMetrics().At(4)), outBatches[1])
  155. assert.Equal(t, newMetrics(expected.ResourceMetrics().At(1), expected.ResourceMetrics().At(5)), outBatches[2])
  156. assert.Equal(t, newMetrics(expected.ResourceMetrics().At(2), expected.ResourceMetrics().At(6)), outBatches[3])
  157. assert.Equal(t, newMetrics(expected.ResourceMetrics().At(3), expected.ResourceMetrics().At(7)), outBatches[4])
  158. }
  159. func TestSplitLogsOneResourceLogs(t *testing.T) {
  160. inBatch := plog.NewLogs()
  161. fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "1")
  162. expected := plog.NewLogs()
  163. inBatch.CopyTo(expected)
  164. sink := new(consumertest.LogsSink)
  165. bpr := NewBatchPerResourceLogs("attr_key", sink)
  166. assert.NoError(t, bpr.ConsumeLogs(context.Background(), inBatch))
  167. outBatches := sink.AllLogs()
  168. require.Len(t, outBatches, 1)
  169. assert.Equal(t, expected, outBatches[0])
  170. }
  171. func TestOriginalResourceLogsUnchanged(t *testing.T) {
  172. inBatch := plog.NewLogs()
  173. fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "1")
  174. fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "1")
  175. sink := new(consumertest.LogsSink)
  176. bpr := NewBatchPerResourceLogs("attr_key", sink)
  177. assert.NoError(t, bpr.ConsumeLogs(context.Background(), inBatch))
  178. outBatches := sink.AllLogs()
  179. require.Len(t, outBatches, 1)
  180. assert.Equal(t, inBatch, outBatches[0])
  181. }
  182. func TestSplitLogsReturnError(t *testing.T) {
  183. inBatch := plog.NewLogs()
  184. fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "1")
  185. fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "1")
  186. err := errors.New("test_error")
  187. bpr := NewBatchPerResourceLogs("attr_key", consumertest.NewErr(err))
  188. assert.Equal(t, err, bpr.ConsumeLogs(context.Background(), inBatch))
  189. }
  190. func TestSplitLogsSameResource(t *testing.T) {
  191. inBatch := plog.NewLogs()
  192. fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "same_attr_val", "1")
  193. fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "same_attr_val", "1")
  194. fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "same_attr_val", "1")
  195. fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "same_attr_val", "1")
  196. expected := plog.NewLogs()
  197. inBatch.CopyTo(expected)
  198. sink := new(consumertest.LogsSink)
  199. bpr := NewBatchPerResourceLogs("same_attr_val", sink)
  200. assert.NoError(t, bpr.ConsumeLogs(context.Background(), inBatch))
  201. outBatches := sink.AllLogs()
  202. require.Len(t, outBatches, 1)
  203. assert.Equal(t, expected, outBatches[0])
  204. }
  205. func TestSplitLogsIntoDifferentBatches(t *testing.T) {
  206. inBatch := plog.NewLogs()
  207. fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "1")
  208. fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "2")
  209. fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "3")
  210. fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "4")
  211. fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "1")
  212. fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "2")
  213. fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "3")
  214. fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", "4")
  215. fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "diff_attr_key", "1")
  216. expected := plog.NewLogs()
  217. inBatch.CopyTo(expected)
  218. sink := new(consumertest.LogsSink)
  219. bpr := NewBatchPerResourceLogs("attr_key", sink)
  220. assert.NoError(t, bpr.ConsumeLogs(context.Background(), inBatch))
  221. outBatches := sink.AllLogs()
  222. require.Len(t, outBatches, 5)
  223. sortLogs(outBatches, "attr_key")
  224. assert.Equal(t, newLogs(expected.ResourceLogs().At(8)), outBatches[0])
  225. assert.Equal(t, newLogs(expected.ResourceLogs().At(0), expected.ResourceLogs().At(4)), outBatches[1])
  226. assert.Equal(t, newLogs(expected.ResourceLogs().At(1), expected.ResourceLogs().At(5)), outBatches[2])
  227. assert.Equal(t, newLogs(expected.ResourceLogs().At(2), expected.ResourceLogs().At(6)), outBatches[3])
  228. assert.Equal(t, newLogs(expected.ResourceLogs().At(3), expected.ResourceLogs().At(7)), outBatches[4])
  229. }
  230. func newTraces(rss ...ptrace.ResourceSpans) ptrace.Traces {
  231. td := ptrace.NewTraces()
  232. for _, rs := range rss {
  233. rs.CopyTo(td.ResourceSpans().AppendEmpty())
  234. }
  235. return td
  236. }
  237. func sortTraces(tds []ptrace.Traces, attrKey string) {
  238. sort.Slice(tds, func(i, j int) bool {
  239. valI := ""
  240. if av, ok := tds[i].ResourceSpans().At(0).Resource().Attributes().Get(attrKey); ok {
  241. valI = av.Str()
  242. }
  243. valJ := ""
  244. if av, ok := tds[j].ResourceSpans().At(0).Resource().Attributes().Get(attrKey); ok {
  245. valJ = av.Str()
  246. }
  247. return valI < valJ
  248. })
  249. }
  250. func fillResourceSpans(rs ptrace.ResourceSpans, key string, val string) {
  251. rs.Resource().Attributes().PutStr(key, val)
  252. rs.Resource().Attributes().PutInt("__other_key__", 123)
  253. ils := rs.ScopeSpans().AppendEmpty()
  254. firstSpan := ils.Spans().AppendEmpty()
  255. firstSpan.SetName("first-span")
  256. firstSpan.SetTraceID(pcommon.TraceID([16]byte{byte(rand.Int())}))
  257. secondSpan := ils.Spans().AppendEmpty()
  258. secondSpan.SetName("second-span")
  259. secondSpan.SetTraceID(pcommon.TraceID([16]byte{byte(rand.Int())}))
  260. }
  261. func newMetrics(rms ...pmetric.ResourceMetrics) pmetric.Metrics {
  262. md := pmetric.NewMetrics()
  263. for _, rm := range rms {
  264. rm.CopyTo(md.ResourceMetrics().AppendEmpty())
  265. }
  266. return md
  267. }
  268. func sortMetrics(tds []pmetric.Metrics, attrKey string) {
  269. sort.Slice(tds, func(i, j int) bool {
  270. valI := ""
  271. if av, ok := tds[i].ResourceMetrics().At(0).Resource().Attributes().Get(attrKey); ok {
  272. valI = av.Str()
  273. }
  274. valJ := ""
  275. if av, ok := tds[j].ResourceMetrics().At(0).Resource().Attributes().Get(attrKey); ok {
  276. valJ = av.Str()
  277. }
  278. return valI < valJ
  279. })
  280. }
  281. func fillResourceMetrics(rs pmetric.ResourceMetrics, key string, val string) {
  282. rs.Resource().Attributes().PutStr(key, val)
  283. rs.Resource().Attributes().PutInt("__other_key__", 123)
  284. ils := rs.ScopeMetrics().AppendEmpty()
  285. firstMetric := ils.Metrics().AppendEmpty()
  286. firstMetric.SetName("first-metric")
  287. firstMetric.SetEmptyGauge()
  288. secondMetric := ils.Metrics().AppendEmpty()
  289. secondMetric.SetName("second-metric")
  290. secondMetric.SetEmptySum()
  291. }
  292. func newLogs(rls ...plog.ResourceLogs) plog.Logs {
  293. ld := plog.NewLogs()
  294. for _, rl := range rls {
  295. rl.CopyTo(ld.ResourceLogs().AppendEmpty())
  296. }
  297. return ld
  298. }
  299. func sortLogs(tds []plog.Logs, attrKey string) {
  300. sort.Slice(tds, func(i, j int) bool {
  301. valI := ""
  302. if av, ok := tds[i].ResourceLogs().At(0).Resource().Attributes().Get(attrKey); ok {
  303. valI = av.Str()
  304. }
  305. valJ := ""
  306. if av, ok := tds[j].ResourceLogs().At(0).Resource().Attributes().Get(attrKey); ok {
  307. valJ = av.Str()
  308. }
  309. return valI < valJ
  310. })
  311. }
  312. func fillResourceLogs(rs plog.ResourceLogs, key string, val string) {
  313. rs.Resource().Attributes().PutStr(key, val)
  314. rs.Resource().Attributes().PutInt("__other_key__", 123)
  315. ils := rs.ScopeLogs().AppendEmpty()
  316. firstLogRecord := ils.LogRecords().AppendEmpty()
  317. firstLogRecord.SetFlags(plog.LogRecordFlags(rand.Int31()))
  318. secondLogRecord := ils.LogRecords().AppendEmpty()
  319. secondLogRecord.SetFlags(plog.LogRecordFlags(rand.Int31()))
  320. }
  321. func BenchmarkBatchPerResourceTraces(b *testing.B) {
  322. inBatch := ptrace.NewTraces()
  323. rss := inBatch.ResourceSpans()
  324. rss.EnsureCapacity(64)
  325. for i := 0; i < 64; i++ {
  326. fillResourceSpans(rss.AppendEmpty(), "attr_key", strconv.Itoa(i%8))
  327. }
  328. bpr := NewBatchPerResourceTraces("attr_key", consumertest.NewNop())
  329. b.ReportAllocs()
  330. b.ResetTimer()
  331. for n := 0; n < b.N; n++ {
  332. if err := bpr.ConsumeTraces(context.Background(), inBatch); err != nil {
  333. b.Fail()
  334. }
  335. }
  336. }
  337. func BenchmarkBatchPerResourceMetrics(b *testing.B) {
  338. inBatch := pmetric.NewMetrics()
  339. inBatch.ResourceMetrics().EnsureCapacity(64)
  340. for i := 0; i < 64; i++ {
  341. fillResourceMetrics(inBatch.ResourceMetrics().AppendEmpty(), "attr_key", strconv.Itoa(i%8))
  342. }
  343. bpr := NewBatchPerResourceMetrics("attr_key", consumertest.NewNop())
  344. b.ReportAllocs()
  345. b.ResetTimer()
  346. for n := 0; n < b.N; n++ {
  347. if err := bpr.ConsumeMetrics(context.Background(), inBatch); err != nil {
  348. b.Fail()
  349. }
  350. }
  351. }
  352. func BenchmarkBatchPerResourceLogs(b *testing.B) {
  353. inBatch := plog.NewLogs()
  354. inBatch.ResourceLogs().EnsureCapacity(64)
  355. for i := 0; i < 64; i++ {
  356. fillResourceLogs(inBatch.ResourceLogs().AppendEmpty(), "attr_key", strconv.Itoa(i%8))
  357. }
  358. bpr := NewBatchPerResourceLogs("attr_key", consumertest.NewNop())
  359. b.ReportAllocs()
  360. b.ResetTimer()
  361. for n := 0; n < b.N; n++ {
  362. if err := bpr.ConsumeLogs(context.Background(), inBatch); err != nil {
  363. b.Fail()
  364. }
  365. }
  366. }