batchperresourceattr.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package batchperresourceattr // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr"
  4. import (
  5. "context"
  6. "go.opentelemetry.io/collector/consumer"
  7. "go.opentelemetry.io/collector/pdata/plog"
  8. "go.opentelemetry.io/collector/pdata/pmetric"
  9. "go.opentelemetry.io/collector/pdata/ptrace"
  10. "go.uber.org/multierr"
  11. )
  12. type batchTraces struct {
  13. attrKey string
  14. next consumer.Traces
  15. }
  16. func NewBatchPerResourceTraces(attrKey string, next consumer.Traces) consumer.Traces {
  17. return &batchTraces{
  18. attrKey: attrKey,
  19. next: next,
  20. }
  21. }
  22. // Capabilities implements the consumer interface.
  23. func (bt *batchTraces) Capabilities() consumer.Capabilities {
  24. return consumer.Capabilities{MutatesData: false}
  25. }
  26. func (bt *batchTraces) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
  27. rss := td.ResourceSpans()
  28. lenRss := rss.Len()
  29. // If zero or one resource spans just call next.
  30. if lenRss <= 1 {
  31. return bt.next.ConsumeTraces(ctx, td)
  32. }
  33. indicesByAttr := make(map[string][]int)
  34. for i := 0; i < lenRss; i++ {
  35. rs := rss.At(i)
  36. var attrVal string
  37. if attributeValue, ok := rs.Resource().Attributes().Get(bt.attrKey); ok {
  38. attrVal = attributeValue.Str()
  39. }
  40. indicesByAttr[attrVal] = append(indicesByAttr[attrVal], i)
  41. }
  42. // If there is a single attribute value, then call next.
  43. if len(indicesByAttr) <= 1 {
  44. return bt.next.ConsumeTraces(ctx, td)
  45. }
  46. // Build the resource spans for each attribute value using CopyTo and call next for each one.
  47. var errs error
  48. for _, indices := range indicesByAttr {
  49. tracesForAttr := ptrace.NewTraces()
  50. for _, i := range indices {
  51. rs := rss.At(i)
  52. rs.CopyTo(tracesForAttr.ResourceSpans().AppendEmpty())
  53. }
  54. errs = multierr.Append(errs, bt.next.ConsumeTraces(ctx, tracesForAttr))
  55. }
  56. return errs
  57. }
  58. type batchMetrics struct {
  59. attrKey string
  60. next consumer.Metrics
  61. }
  62. func NewBatchPerResourceMetrics(attrKey string, next consumer.Metrics) consumer.Metrics {
  63. return &batchMetrics{
  64. attrKey: attrKey,
  65. next: next,
  66. }
  67. }
  68. // Capabilities implements the consumer interface.
  69. func (bt *batchMetrics) Capabilities() consumer.Capabilities {
  70. return consumer.Capabilities{MutatesData: false}
  71. }
  72. func (bt *batchMetrics) ConsumeMetrics(ctx context.Context, td pmetric.Metrics) error {
  73. rms := td.ResourceMetrics()
  74. lenRms := rms.Len()
  75. // If zero or one resource metrics just call next.
  76. if lenRms <= 1 {
  77. return bt.next.ConsumeMetrics(ctx, td)
  78. }
  79. indicesByAttr := make(map[string][]int)
  80. for i := 0; i < lenRms; i++ {
  81. rm := rms.At(i)
  82. var attrVal string
  83. if attributeValue, ok := rm.Resource().Attributes().Get(bt.attrKey); ok {
  84. attrVal = attributeValue.Str()
  85. }
  86. indicesByAttr[attrVal] = append(indicesByAttr[attrVal], i)
  87. }
  88. // If there is a single attribute value, then call next.
  89. if len(indicesByAttr) <= 1 {
  90. return bt.next.ConsumeMetrics(ctx, td)
  91. }
  92. // Build the resource metrics for each attribute value using CopyTo and call next for each one.
  93. var errs error
  94. for _, indices := range indicesByAttr {
  95. metricsForAttr := pmetric.NewMetrics()
  96. for _, i := range indices {
  97. rm := rms.At(i)
  98. rm.CopyTo(metricsForAttr.ResourceMetrics().AppendEmpty())
  99. }
  100. errs = multierr.Append(errs, bt.next.ConsumeMetrics(ctx, metricsForAttr))
  101. }
  102. return errs
  103. }
  104. type batchLogs struct {
  105. attrKey string
  106. next consumer.Logs
  107. }
  108. func NewBatchPerResourceLogs(attrKey string, next consumer.Logs) consumer.Logs {
  109. return &batchLogs{
  110. attrKey: attrKey,
  111. next: next,
  112. }
  113. }
  114. // Capabilities implements the consumer interface.
  115. func (bt *batchLogs) Capabilities() consumer.Capabilities {
  116. return consumer.Capabilities{MutatesData: false}
  117. }
  118. func (bt *batchLogs) ConsumeLogs(ctx context.Context, td plog.Logs) error {
  119. rls := td.ResourceLogs()
  120. lenRls := rls.Len()
  121. // If zero or one resource logs just call next.
  122. if lenRls <= 1 {
  123. return bt.next.ConsumeLogs(ctx, td)
  124. }
  125. indicesByAttr := make(map[string][]int)
  126. for i := 0; i < lenRls; i++ {
  127. rl := rls.At(i)
  128. var attrVal string
  129. if attributeValue, ok := rl.Resource().Attributes().Get(bt.attrKey); ok {
  130. attrVal = attributeValue.Str()
  131. }
  132. indicesByAttr[attrVal] = append(indicesByAttr[attrVal], i)
  133. }
  134. // If there is a single attribute value, then call next.
  135. if len(indicesByAttr) <= 1 {
  136. return bt.next.ConsumeLogs(ctx, td)
  137. }
  138. // Build the resource logs for each attribute value using CopyTo and call next for each one.
  139. var errs error
  140. for _, indices := range indicesByAttr {
  141. logsForAttr := plog.NewLogs()
  142. for _, i := range indices {
  143. rl := rls.At(i)
  144. rl.CopyTo(logsForAttr.ResourceLogs().AppendEmpty())
  145. }
  146. errs = multierr.Append(errs, bt.next.ConsumeLogs(ctx, logsForAttr))
  147. }
  148. return errs
  149. }