accumulator.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package prometheusexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusexporter"
  4. import (
  5. "fmt"
  6. "sort"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/prometheus/common/model"
  11. "go.opentelemetry.io/collector/pdata/pcommon"
  12. "go.opentelemetry.io/collector/pdata/pmetric"
  13. "go.uber.org/zap"
  14. )
  15. type accumulatedValue struct {
  16. // value contains a metric with exactly one aggregated datapoint.
  17. value pmetric.Metric
  18. // resourceAttrs contain the resource attributes. They are used to output instance and job labels.
  19. resourceAttrs pcommon.Map
  20. // updated indicates when metric was last changed.
  21. updated time.Time
  22. scope pcommon.InstrumentationScope
  23. }
  24. // accumulator stores aggragated values of incoming metrics
  25. type accumulator interface {
  26. // Accumulate stores aggragated metric values
  27. Accumulate(resourceMetrics pmetric.ResourceMetrics) (processed int)
  28. // Collect returns a slice with relevant aggregated metrics and their resource attributes.
  29. // The number or metrics and attributes returned will be the same.
  30. Collect() (metrics []pmetric.Metric, resourceAttrs []pcommon.Map)
  31. }
  32. // LastValueAccumulator keeps last value for accumulated metrics
  33. type lastValueAccumulator struct {
  34. logger *zap.Logger
  35. registeredMetrics sync.Map
  36. // metricExpiration contains duration for which metric
  37. // should be served after it was updated
  38. metricExpiration time.Duration
  39. }
  40. // NewAccumulator returns LastValueAccumulator
  41. func newAccumulator(logger *zap.Logger, metricExpiration time.Duration) accumulator {
  42. return &lastValueAccumulator{
  43. logger: logger,
  44. metricExpiration: metricExpiration,
  45. }
  46. }
  47. // Accumulate stores one datapoint per metric
  48. func (a *lastValueAccumulator) Accumulate(rm pmetric.ResourceMetrics) (n int) {
  49. now := time.Now()
  50. ilms := rm.ScopeMetrics()
  51. resourceAttrs := rm.Resource().Attributes()
  52. for i := 0; i < ilms.Len(); i++ {
  53. ilm := ilms.At(i)
  54. metrics := ilm.Metrics()
  55. for j := 0; j < metrics.Len(); j++ {
  56. n += a.addMetric(metrics.At(j), ilm.Scope(), resourceAttrs, now)
  57. }
  58. }
  59. return
  60. }
  61. func (a *lastValueAccumulator) addMetric(metric pmetric.Metric, il pcommon.InstrumentationScope, resourceAttrs pcommon.Map, now time.Time) int {
  62. a.logger.Debug(fmt.Sprintf("accumulating metric: %s", metric.Name()))
  63. switch metric.Type() {
  64. case pmetric.MetricTypeGauge:
  65. return a.accumulateGauge(metric, il, resourceAttrs, now)
  66. case pmetric.MetricTypeSum:
  67. return a.accumulateSum(metric, il, resourceAttrs, now)
  68. case pmetric.MetricTypeHistogram:
  69. return a.accumulateDoubleHistogram(metric, il, resourceAttrs, now)
  70. case pmetric.MetricTypeSummary:
  71. return a.accumulateSummary(metric, il, resourceAttrs, now)
  72. default:
  73. a.logger.With(
  74. zap.String("data_type", string(metric.Type())),
  75. zap.String("metric_name", metric.Name()),
  76. ).Error("failed to translate metric")
  77. }
  78. return 0
  79. }
  80. func (a *lastValueAccumulator) accumulateSummary(metric pmetric.Metric, il pcommon.InstrumentationScope, resourceAttrs pcommon.Map, now time.Time) (n int) {
  81. dps := metric.Summary().DataPoints()
  82. for i := 0; i < dps.Len(); i++ {
  83. ip := dps.At(i)
  84. signature := timeseriesSignature(il.Name(), metric, ip.Attributes(), resourceAttrs)
  85. if ip.Flags().NoRecordedValue() {
  86. a.registeredMetrics.Delete(signature)
  87. return 0
  88. }
  89. v, ok := a.registeredMetrics.Load(signature)
  90. stalePoint := ok &&
  91. ip.Timestamp().AsTime().Before(v.(*accumulatedValue).value.Summary().DataPoints().At(0).Timestamp().AsTime())
  92. if stalePoint {
  93. // Only keep this datapoint if it has a later timestamp.
  94. continue
  95. }
  96. m := copyMetricMetadata(metric)
  97. ip.CopyTo(m.SetEmptySummary().DataPoints().AppendEmpty())
  98. a.registeredMetrics.Store(signature, &accumulatedValue{value: m, resourceAttrs: resourceAttrs, scope: il, updated: now})
  99. n++
  100. }
  101. return n
  102. }
  103. func (a *lastValueAccumulator) accumulateGauge(metric pmetric.Metric, il pcommon.InstrumentationScope, resourceAttrs pcommon.Map, now time.Time) (n int) {
  104. dps := metric.Gauge().DataPoints()
  105. for i := 0; i < dps.Len(); i++ {
  106. ip := dps.At(i)
  107. signature := timeseriesSignature(il.Name(), metric, ip.Attributes(), resourceAttrs)
  108. if ip.Flags().NoRecordedValue() {
  109. a.registeredMetrics.Delete(signature)
  110. return 0
  111. }
  112. v, ok := a.registeredMetrics.Load(signature)
  113. if !ok {
  114. m := copyMetricMetadata(metric)
  115. ip.CopyTo(m.SetEmptyGauge().DataPoints().AppendEmpty())
  116. a.registeredMetrics.Store(signature, &accumulatedValue{value: m, resourceAttrs: resourceAttrs, scope: il, updated: now})
  117. n++
  118. continue
  119. }
  120. mv := v.(*accumulatedValue)
  121. if ip.Timestamp().AsTime().Before(mv.value.Gauge().DataPoints().At(0).Timestamp().AsTime()) {
  122. // only keep datapoint with latest timestamp
  123. continue
  124. }
  125. m := copyMetricMetadata(metric)
  126. ip.CopyTo(m.SetEmptyGauge().DataPoints().AppendEmpty())
  127. a.registeredMetrics.Store(signature, &accumulatedValue{value: m, resourceAttrs: resourceAttrs, scope: il, updated: now})
  128. n++
  129. }
  130. return
  131. }
  132. func (a *lastValueAccumulator) accumulateSum(metric pmetric.Metric, il pcommon.InstrumentationScope, resourceAttrs pcommon.Map, now time.Time) (n int) {
  133. doubleSum := metric.Sum()
  134. // Drop metrics with unspecified aggregations
  135. if doubleSum.AggregationTemporality() == pmetric.AggregationTemporalityUnspecified {
  136. return
  137. }
  138. // Drop non-monotonic and non-cumulative metrics
  139. if doubleSum.AggregationTemporality() == pmetric.AggregationTemporalityDelta && !doubleSum.IsMonotonic() {
  140. return
  141. }
  142. dps := doubleSum.DataPoints()
  143. for i := 0; i < dps.Len(); i++ {
  144. ip := dps.At(i)
  145. signature := timeseriesSignature(il.Name(), metric, ip.Attributes(), resourceAttrs)
  146. if ip.Flags().NoRecordedValue() {
  147. a.registeredMetrics.Delete(signature)
  148. return 0
  149. }
  150. v, ok := a.registeredMetrics.Load(signature)
  151. if !ok {
  152. m := copyMetricMetadata(metric)
  153. m.SetEmptySum().SetIsMonotonic(metric.Sum().IsMonotonic())
  154. m.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
  155. ip.CopyTo(m.Sum().DataPoints().AppendEmpty())
  156. a.registeredMetrics.Store(signature, &accumulatedValue{value: m, resourceAttrs: resourceAttrs, scope: il, updated: now})
  157. n++
  158. continue
  159. }
  160. mv := v.(*accumulatedValue)
  161. if ip.Timestamp().AsTime().Before(mv.value.Sum().DataPoints().At(0).Timestamp().AsTime()) {
  162. // only keep datapoint with latest timestamp
  163. continue
  164. }
  165. // Delta-to-Cumulative
  166. if doubleSum.AggregationTemporality() == pmetric.AggregationTemporalityDelta && ip.StartTimestamp() == mv.value.Sum().DataPoints().At(0).Timestamp() {
  167. ip.SetStartTimestamp(mv.value.Sum().DataPoints().At(0).StartTimestamp())
  168. switch ip.ValueType() {
  169. case pmetric.NumberDataPointValueTypeInt:
  170. ip.SetIntValue(ip.IntValue() + mv.value.Sum().DataPoints().At(0).IntValue())
  171. case pmetric.NumberDataPointValueTypeDouble:
  172. ip.SetDoubleValue(ip.DoubleValue() + mv.value.Sum().DataPoints().At(0).DoubleValue())
  173. }
  174. }
  175. m := copyMetricMetadata(metric)
  176. m.SetEmptySum().SetIsMonotonic(metric.Sum().IsMonotonic())
  177. m.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
  178. ip.CopyTo(m.Sum().DataPoints().AppendEmpty())
  179. a.registeredMetrics.Store(signature, &accumulatedValue{value: m, resourceAttrs: resourceAttrs, scope: il, updated: now})
  180. n++
  181. }
  182. return
  183. }
  184. func (a *lastValueAccumulator) accumulateDoubleHistogram(metric pmetric.Metric, il pcommon.InstrumentationScope, resourceAttrs pcommon.Map, now time.Time) (n int) {
  185. doubleHistogram := metric.Histogram()
  186. // Drop metrics with non-cumulative aggregations
  187. if doubleHistogram.AggregationTemporality() != pmetric.AggregationTemporalityCumulative {
  188. return
  189. }
  190. dps := doubleHistogram.DataPoints()
  191. for i := 0; i < dps.Len(); i++ {
  192. ip := dps.At(i)
  193. signature := timeseriesSignature(il.Name(), metric, ip.Attributes(), resourceAttrs)
  194. if ip.Flags().NoRecordedValue() {
  195. a.registeredMetrics.Delete(signature)
  196. return 0
  197. }
  198. v, ok := a.registeredMetrics.Load(signature)
  199. if !ok {
  200. m := copyMetricMetadata(metric)
  201. ip.CopyTo(m.SetEmptyHistogram().DataPoints().AppendEmpty())
  202. a.registeredMetrics.Store(signature, &accumulatedValue{value: m, resourceAttrs: resourceAttrs, scope: il, updated: now})
  203. n++
  204. continue
  205. }
  206. mv := v.(*accumulatedValue)
  207. if ip.Timestamp().AsTime().Before(mv.value.Histogram().DataPoints().At(0).Timestamp().AsTime()) {
  208. // only keep datapoint with latest timestamp
  209. continue
  210. }
  211. m := copyMetricMetadata(metric)
  212. ip.CopyTo(m.SetEmptyHistogram().DataPoints().AppendEmpty())
  213. m.Histogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
  214. a.registeredMetrics.Store(signature, &accumulatedValue{value: m, resourceAttrs: resourceAttrs, scope: il, updated: now})
  215. n++
  216. }
  217. return
  218. }
  219. // Collect returns a slice with relevant aggregated metrics and their resource attributes.
  220. func (a *lastValueAccumulator) Collect() ([]pmetric.Metric, []pcommon.Map) {
  221. a.logger.Debug("Accumulator collect called")
  222. var metrics []pmetric.Metric
  223. var resourceAttrs []pcommon.Map
  224. expirationTime := time.Now().Add(-a.metricExpiration)
  225. a.registeredMetrics.Range(func(key, value any) bool {
  226. v := value.(*accumulatedValue)
  227. if expirationTime.After(v.updated) {
  228. a.logger.Debug(fmt.Sprintf("metric expired: %s", v.value.Name()))
  229. a.registeredMetrics.Delete(key)
  230. return true
  231. }
  232. metrics = append(metrics, v.value)
  233. resourceAttrs = append(resourceAttrs, v.resourceAttrs)
  234. return true
  235. })
  236. return metrics, resourceAttrs
  237. }
  238. func timeseriesSignature(ilmName string, metric pmetric.Metric, attributes pcommon.Map, resourceAttrs pcommon.Map) string {
  239. var b strings.Builder
  240. b.WriteString(metric.Type().String())
  241. b.WriteString("*" + ilmName)
  242. b.WriteString("*" + metric.Name())
  243. attrs := make([]string, 0, attributes.Len())
  244. attributes.Range(func(k string, v pcommon.Value) bool {
  245. attrs = append(attrs, k+"*"+v.AsString())
  246. return true
  247. })
  248. sort.Strings(attrs)
  249. b.WriteString("*" + strings.Join(attrs, "*"))
  250. if job, ok := extractJob(resourceAttrs); ok {
  251. b.WriteString("*" + model.JobLabel + "*" + job)
  252. }
  253. if instance, ok := extractInstance(resourceAttrs); ok {
  254. b.WriteString("*" + model.InstanceLabel + "*" + instance)
  255. }
  256. return b.String()
  257. }
  258. func copyMetricMetadata(metric pmetric.Metric) pmetric.Metric {
  259. m := pmetric.NewMetric()
  260. m.SetName(metric.Name())
  261. m.SetDescription(metric.Description())
  262. m.SetUnit(metric.Unit())
  263. return m
  264. }