123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package prometheusexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusexporter"
- import (
- "fmt"
- "sort"
- "strings"
- "sync"
- "time"
- "github.com/prometheus/common/model"
- "go.opentelemetry.io/collector/pdata/pcommon"
- "go.opentelemetry.io/collector/pdata/pmetric"
- "go.uber.org/zap"
- )
// accumulatedValue is one entry in lastValueAccumulator.registeredMetrics:
// the most recent datapoint seen for a single timeseries, plus the context
// needed to export it.
type accumulatedValue struct {
	// value contains a metric with exactly one aggregated datapoint.
	value pmetric.Metric

	// resourceAttrs contain the resource attributes. They are used to output instance and job labels.
	resourceAttrs pcommon.Map

	// updated indicates when metric was last changed; used by Collect to expire stale entries.
	updated time.Time

	// scope is the instrumentation scope the metric was reported under.
	scope pcommon.InstrumentationScope
}
// accumulator stores aggregated values of incoming metrics.
type accumulator interface {
	// Accumulate stores aggregated metric values and returns the number of
	// datapoints processed.
	Accumulate(resourceMetrics pmetric.ResourceMetrics) (processed int)

	// Collect returns a slice with relevant aggregated metrics and their resource attributes.
	// The number of metrics and attributes returned will be the same.
	Collect() (metrics []pmetric.Metric, resourceAttrs []pcommon.Map)
}
// lastValueAccumulator keeps the last value for accumulated metrics.
// It is safe for concurrent use: state is held in a sync.Map keyed by
// timeseries signature.
type lastValueAccumulator struct {
	logger *zap.Logger

	// registeredMetrics maps timeseries signature (string) to *accumulatedValue.
	registeredMetrics sync.Map

	// metricExpiration contains duration for which metric
	// should be served after it was updated.
	metricExpiration time.Duration
}
- // NewAccumulator returns LastValueAccumulator
- func newAccumulator(logger *zap.Logger, metricExpiration time.Duration) accumulator {
- return &lastValueAccumulator{
- logger: logger,
- metricExpiration: metricExpiration,
- }
- }
- // Accumulate stores one datapoint per metric
- func (a *lastValueAccumulator) Accumulate(rm pmetric.ResourceMetrics) (n int) {
- now := time.Now()
- ilms := rm.ScopeMetrics()
- resourceAttrs := rm.Resource().Attributes()
- for i := 0; i < ilms.Len(); i++ {
- ilm := ilms.At(i)
- metrics := ilm.Metrics()
- for j := 0; j < metrics.Len(); j++ {
- n += a.addMetric(metrics.At(j), ilm.Scope(), resourceAttrs, now)
- }
- }
- return
- }
- func (a *lastValueAccumulator) addMetric(metric pmetric.Metric, il pcommon.InstrumentationScope, resourceAttrs pcommon.Map, now time.Time) int {
- a.logger.Debug(fmt.Sprintf("accumulating metric: %s", metric.Name()))
- switch metric.Type() {
- case pmetric.MetricTypeGauge:
- return a.accumulateGauge(metric, il, resourceAttrs, now)
- case pmetric.MetricTypeSum:
- return a.accumulateSum(metric, il, resourceAttrs, now)
- case pmetric.MetricTypeHistogram:
- return a.accumulateDoubleHistogram(metric, il, resourceAttrs, now)
- case pmetric.MetricTypeSummary:
- return a.accumulateSummary(metric, il, resourceAttrs, now)
- default:
- a.logger.With(
- zap.String("data_type", string(metric.Type())),
- zap.String("metric_name", metric.Name()),
- ).Error("failed to translate metric")
- }
- return 0
- }
- func (a *lastValueAccumulator) accumulateSummary(metric pmetric.Metric, il pcommon.InstrumentationScope, resourceAttrs pcommon.Map, now time.Time) (n int) {
- dps := metric.Summary().DataPoints()
- for i := 0; i < dps.Len(); i++ {
- ip := dps.At(i)
- signature := timeseriesSignature(il.Name(), metric, ip.Attributes(), resourceAttrs)
- if ip.Flags().NoRecordedValue() {
- a.registeredMetrics.Delete(signature)
- return 0
- }
- v, ok := a.registeredMetrics.Load(signature)
- stalePoint := ok &&
- ip.Timestamp().AsTime().Before(v.(*accumulatedValue).value.Summary().DataPoints().At(0).Timestamp().AsTime())
- if stalePoint {
- // Only keep this datapoint if it has a later timestamp.
- continue
- }
- m := copyMetricMetadata(metric)
- ip.CopyTo(m.SetEmptySummary().DataPoints().AppendEmpty())
- a.registeredMetrics.Store(signature, &accumulatedValue{value: m, resourceAttrs: resourceAttrs, scope: il, updated: now})
- n++
- }
- return n
- }
- func (a *lastValueAccumulator) accumulateGauge(metric pmetric.Metric, il pcommon.InstrumentationScope, resourceAttrs pcommon.Map, now time.Time) (n int) {
- dps := metric.Gauge().DataPoints()
- for i := 0; i < dps.Len(); i++ {
- ip := dps.At(i)
- signature := timeseriesSignature(il.Name(), metric, ip.Attributes(), resourceAttrs)
- if ip.Flags().NoRecordedValue() {
- a.registeredMetrics.Delete(signature)
- return 0
- }
- v, ok := a.registeredMetrics.Load(signature)
- if !ok {
- m := copyMetricMetadata(metric)
- ip.CopyTo(m.SetEmptyGauge().DataPoints().AppendEmpty())
- a.registeredMetrics.Store(signature, &accumulatedValue{value: m, resourceAttrs: resourceAttrs, scope: il, updated: now})
- n++
- continue
- }
- mv := v.(*accumulatedValue)
- if ip.Timestamp().AsTime().Before(mv.value.Gauge().DataPoints().At(0).Timestamp().AsTime()) {
- // only keep datapoint with latest timestamp
- continue
- }
- m := copyMetricMetadata(metric)
- ip.CopyTo(m.SetEmptyGauge().DataPoints().AppendEmpty())
- a.registeredMetrics.Store(signature, &accumulatedValue{value: m, resourceAttrs: resourceAttrs, scope: il, updated: now})
- n++
- }
- return
- }
- func (a *lastValueAccumulator) accumulateSum(metric pmetric.Metric, il pcommon.InstrumentationScope, resourceAttrs pcommon.Map, now time.Time) (n int) {
- doubleSum := metric.Sum()
- // Drop metrics with unspecified aggregations
- if doubleSum.AggregationTemporality() == pmetric.AggregationTemporalityUnspecified {
- return
- }
- // Drop non-monotonic and non-cumulative metrics
- if doubleSum.AggregationTemporality() == pmetric.AggregationTemporalityDelta && !doubleSum.IsMonotonic() {
- return
- }
- dps := doubleSum.DataPoints()
- for i := 0; i < dps.Len(); i++ {
- ip := dps.At(i)
- signature := timeseriesSignature(il.Name(), metric, ip.Attributes(), resourceAttrs)
- if ip.Flags().NoRecordedValue() {
- a.registeredMetrics.Delete(signature)
- return 0
- }
- v, ok := a.registeredMetrics.Load(signature)
- if !ok {
- m := copyMetricMetadata(metric)
- m.SetEmptySum().SetIsMonotonic(metric.Sum().IsMonotonic())
- m.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
- ip.CopyTo(m.Sum().DataPoints().AppendEmpty())
- a.registeredMetrics.Store(signature, &accumulatedValue{value: m, resourceAttrs: resourceAttrs, scope: il, updated: now})
- n++
- continue
- }
- mv := v.(*accumulatedValue)
- if ip.Timestamp().AsTime().Before(mv.value.Sum().DataPoints().At(0).Timestamp().AsTime()) {
- // only keep datapoint with latest timestamp
- continue
- }
- // Delta-to-Cumulative
- if doubleSum.AggregationTemporality() == pmetric.AggregationTemporalityDelta && ip.StartTimestamp() == mv.value.Sum().DataPoints().At(0).Timestamp() {
- ip.SetStartTimestamp(mv.value.Sum().DataPoints().At(0).StartTimestamp())
- switch ip.ValueType() {
- case pmetric.NumberDataPointValueTypeInt:
- ip.SetIntValue(ip.IntValue() + mv.value.Sum().DataPoints().At(0).IntValue())
- case pmetric.NumberDataPointValueTypeDouble:
- ip.SetDoubleValue(ip.DoubleValue() + mv.value.Sum().DataPoints().At(0).DoubleValue())
- }
- }
- m := copyMetricMetadata(metric)
- m.SetEmptySum().SetIsMonotonic(metric.Sum().IsMonotonic())
- m.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
- ip.CopyTo(m.Sum().DataPoints().AppendEmpty())
- a.registeredMetrics.Store(signature, &accumulatedValue{value: m, resourceAttrs: resourceAttrs, scope: il, updated: now})
- n++
- }
- return
- }
- func (a *lastValueAccumulator) accumulateDoubleHistogram(metric pmetric.Metric, il pcommon.InstrumentationScope, resourceAttrs pcommon.Map, now time.Time) (n int) {
- doubleHistogram := metric.Histogram()
- // Drop metrics with non-cumulative aggregations
- if doubleHistogram.AggregationTemporality() != pmetric.AggregationTemporalityCumulative {
- return
- }
- dps := doubleHistogram.DataPoints()
- for i := 0; i < dps.Len(); i++ {
- ip := dps.At(i)
- signature := timeseriesSignature(il.Name(), metric, ip.Attributes(), resourceAttrs)
- if ip.Flags().NoRecordedValue() {
- a.registeredMetrics.Delete(signature)
- return 0
- }
- v, ok := a.registeredMetrics.Load(signature)
- if !ok {
- m := copyMetricMetadata(metric)
- ip.CopyTo(m.SetEmptyHistogram().DataPoints().AppendEmpty())
- a.registeredMetrics.Store(signature, &accumulatedValue{value: m, resourceAttrs: resourceAttrs, scope: il, updated: now})
- n++
- continue
- }
- mv := v.(*accumulatedValue)
- if ip.Timestamp().AsTime().Before(mv.value.Histogram().DataPoints().At(0).Timestamp().AsTime()) {
- // only keep datapoint with latest timestamp
- continue
- }
- m := copyMetricMetadata(metric)
- ip.CopyTo(m.SetEmptyHistogram().DataPoints().AppendEmpty())
- m.Histogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
- a.registeredMetrics.Store(signature, &accumulatedValue{value: m, resourceAttrs: resourceAttrs, scope: il, updated: now})
- n++
- }
- return
- }
- // Collect returns a slice with relevant aggregated metrics and their resource attributes.
- func (a *lastValueAccumulator) Collect() ([]pmetric.Metric, []pcommon.Map) {
- a.logger.Debug("Accumulator collect called")
- var metrics []pmetric.Metric
- var resourceAttrs []pcommon.Map
- expirationTime := time.Now().Add(-a.metricExpiration)
- a.registeredMetrics.Range(func(key, value any) bool {
- v := value.(*accumulatedValue)
- if expirationTime.After(v.updated) {
- a.logger.Debug(fmt.Sprintf("metric expired: %s", v.value.Name()))
- a.registeredMetrics.Delete(key)
- return true
- }
- metrics = append(metrics, v.value)
- resourceAttrs = append(resourceAttrs, v.resourceAttrs)
- return true
- })
- return metrics, resourceAttrs
- }
- func timeseriesSignature(ilmName string, metric pmetric.Metric, attributes pcommon.Map, resourceAttrs pcommon.Map) string {
- var b strings.Builder
- b.WriteString(metric.Type().String())
- b.WriteString("*" + ilmName)
- b.WriteString("*" + metric.Name())
- attrs := make([]string, 0, attributes.Len())
- attributes.Range(func(k string, v pcommon.Value) bool {
- attrs = append(attrs, k+"*"+v.AsString())
- return true
- })
- sort.Strings(attrs)
- b.WriteString("*" + strings.Join(attrs, "*"))
- if job, ok := extractJob(resourceAttrs); ok {
- b.WriteString("*" + model.JobLabel + "*" + job)
- }
- if instance, ok := extractInstance(resourceAttrs); ok {
- b.WriteString("*" + model.InstanceLabel + "*" + instance)
- }
- return b.String()
- }
- func copyMetricMetadata(metric pmetric.Metric) pmetric.Metric {
- m := pmetric.NewMetric()
- m.SetName(metric.Name())
- m.SetDescription(metric.Description())
- m.SetUnit(metric.Unit())
- return m
- }
|