// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/internal"

import (
	"encoding/hex"
	"fmt"
	"math"
	"sort"
	"strings"

	"github.com/prometheus/prometheus/model/exemplar"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/model/value"
	"github.com/prometheus/prometheus/scrape"
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/pmetric"
	"go.uber.org/zap"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus"
)

const (
	traceIDKey = "trace_id"
	spanIDKey  = "span_id"
)

type metricFamily struct {
	mtype pmetric.MetricType
	// isMonotonic only applies to sums
	isMonotonic bool
	groups      map[uint64]*metricGroup
	name        string
	metadata    *scrape.MetricMetadata
	groupOrders []*metricGroup
}

// metricGroup represents a single metric of a metric family. For example, a
// histogram metric is usually represented by several data points (the buckets
// plus count/sum), and all points in a group of a metric family share the same
// set of labels. For simple types like counter and gauge, each data point is a
// group of its own.
type metricGroup struct {
	mtype        pmetric.MetricType
	ts           int64
	ls           labels.Labels
	count        float64
	hasCount     bool
	sum          float64
	hasSum       bool
	created      float64
	value        float64
	complexValue []*dataPoint
	exemplars    pmetric.ExemplarSlice
}
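
// Illustrative sketch (the series below are assumptions for documentation,
// not data from this package): for a scraped histogram family such as
//
//	http_req_duration_bucket{le="0.5",job="api"} 10
//	http_req_duration_bucket{le="+Inf",job="api"} 20
//	http_req_duration_sum{job="api"} 7.5
//	http_req_duration_count{job="api"} 20
//
// all four series belong to the same group (they share the {job="api"} label
// set once the "le" label is excluded), so they populate a single
// metricGroup: the buckets land in complexValue while the sum/count samples
// fill the sum and count fields above.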

func newMetricFamily(metricName string, mc scrape.MetricMetadataStore, logger *zap.Logger) *metricFamily {
	metadata, familyName := metadataForMetric(metricName, mc)
	mtype, isMonotonic := convToMetricType(metadata.Type)
	if mtype == pmetric.MetricTypeEmpty {
		logger.Debug(fmt.Sprintf("Unknown-typed metric: %s %+v", metricName, metadata))
	}

	return &metricFamily{
		mtype:       mtype,
		isMonotonic: isMonotonic,
		groups:      make(map[uint64]*metricGroup),
		name:        familyName,
		metadata:    metadata,
	}
}

// includesMetric returns true if the metric is part of the family
func (mf *metricFamily) includesMetric(metricName string) bool {
	if mf.mtype != pmetric.MetricTypeGauge {
		// If it is a merged family type, then it should match the
		// family name when suffixes are trimmed.
		return normalizeMetricName(metricName) == mf.name
	}
	// If it isn't a merged type, the metricName and family name should match
	return metricName == mf.name
}
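
// For illustration (hypothetical family names): a summary family
// "rpc_latency" is a merged type, so includesMetric("rpc_latency_sum") and
// includesMetric("rpc_latency_count") both return true once the suffixes are
// normalized away, while a gauge family "queue_depth" only matches the exact
// name "queue_depth".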

func (mg *metricGroup) sortPoints() {
	sort.Slice(mg.complexValue, func(i, j int) bool {
		return mg.complexValue[i].boundary < mg.complexValue[j].boundary
	})
}

func (mg *metricGroup) toDistributionPoint(dest pmetric.HistogramDataPointSlice) {
	if !mg.hasCount {
		return
	}
	mg.sortPoints()

	bucketCount := len(mg.complexValue) + 1
	// if the final bucket is +Inf, we ignore it
	if bucketCount > 1 && mg.complexValue[bucketCount-2].boundary == math.Inf(1) {
		bucketCount--
	}

	// for OTLP the bounds won't include +inf
	bounds := make([]float64, bucketCount-1)
	bucketCounts := make([]uint64, bucketCount)
	var adjustedCount float64

	pointIsStale := value.IsStaleNaN(mg.sum) || value.IsStaleNaN(mg.count)

	for i := 0; i < bucketCount-1; i++ {
		bounds[i] = mg.complexValue[i].boundary
		adjustedCount = mg.complexValue[i].value
		// Buckets still need to be sent to know to set them as stale,
		// but a staleness NaN converted to uint64 would be an extremely large number.
		// Setting to 0 instead.
		if pointIsStale {
			adjustedCount = 0
		} else if i != 0 {
			adjustedCount -= mg.complexValue[i-1].value
		}
		bucketCounts[i] = uint64(adjustedCount)
	}

	// Add the final bucket based on the total count
	adjustedCount = mg.count
	if pointIsStale {
		adjustedCount = 0
	} else if bucketCount > 1 {
		adjustedCount -= mg.complexValue[bucketCount-2].value
	}
	bucketCounts[bucketCount-1] = uint64(adjustedCount)

	point := dest.AppendEmpty()

	if pointIsStale {
		point.SetFlags(pmetric.DefaultDataPointFlags.WithNoRecordedValue(true))
	} else {
		point.SetCount(uint64(mg.count))
		if mg.hasSum {
			point.SetSum(mg.sum)
		}
	}

	point.ExplicitBounds().FromRaw(bounds)
	point.BucketCounts().FromRaw(bucketCounts)

	// The timestamp MUST be converted from milliseconds to nanoseconds.
	tsNanos := timestampFromMs(mg.ts)
	if mg.created != 0 {
		point.SetStartTimestamp(timestampFromFloat64(mg.created))
	} else {
		// metrics_adjuster adjusts the startTimestamp to the initial scrape timestamp
		point.SetStartTimestamp(tsNanos)
	}
	point.SetTimestamp(tsNanos)
	populateAttributes(pmetric.MetricTypeHistogram, mg.ls, point.Attributes())
	mg.setExemplars(point.Exemplars())
}
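
// Worked example (the sample values are assumptions for illustration): given
// cumulative buckets le="0.5" -> 10, le="1" -> 15, le="+Inf" -> 20 with
// _count = 20, the +Inf bucket is dropped from the bounds, yielding
//
//	bounds       = [0.5, 1]
//	bucketCounts = [10, 5, 5]
//
// where each OTLP bucket is the delta between adjacent cumulative values and
// the last bucket is the total count minus the final finite cumulative value.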

func (mg *metricGroup) setExemplars(exemplars pmetric.ExemplarSlice) {
	if mg == nil {
		return
	}
	if mg.exemplars.Len() > 0 {
		mg.exemplars.MoveAndAppendTo(exemplars)
	}
}

func (mg *metricGroup) toSummaryPoint(dest pmetric.SummaryDataPointSlice) {
	// The count is expected to be provided; however, it can be missing in two cases:
	// 1. the data is corrupted
	// 2. the point was ignored by startValue evaluation
	if !mg.hasCount {
		return
	}
	mg.sortPoints()

	point := dest.AppendEmpty()
	pointIsStale := value.IsStaleNaN(mg.sum) || value.IsStaleNaN(mg.count)
	if pointIsStale {
		point.SetFlags(pmetric.DefaultDataPointFlags.WithNoRecordedValue(true))
	} else {
		if mg.hasSum {
			point.SetSum(mg.sum)
		}
		point.SetCount(uint64(mg.count))
	}

	quantileValues := point.QuantileValues()
	for _, p := range mg.complexValue {
		quantile := quantileValues.AppendEmpty()
		// Quantiles still need to be sent to know to set them as stale,
		// but a staleness NaN converted to uint64 would be an extremely large number.
		// By not setting the quantile value, it will default to 0.
		if !pointIsStale {
			quantile.SetValue(p.value)
		}
		quantile.SetQuantile(p.boundary)
	}

	// Based on the summary description from https://prometheus.io/docs/concepts/metric_types/#summary
	// the quantiles are calculated over a sliding time window, while the count is the total count of
	// observations and the corresponding sum is the sum of all observed values; thus the sum and count
	// are used at the global level of the metricspb.SummaryValue.
	// The timestamp MUST be converted from milliseconds to nanoseconds.
	tsNanos := timestampFromMs(mg.ts)
	point.SetTimestamp(tsNanos)
	if mg.created != 0 {
		point.SetStartTimestamp(timestampFromFloat64(mg.created))
	} else {
		// metrics_adjuster adjusts the startTimestamp to the initial scrape timestamp
		point.SetStartTimestamp(tsNanos)
	}
	populateAttributes(pmetric.MetricTypeSummary, mg.ls, point.Attributes())
}
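
// For illustration (hypothetical series and values): a Prometheus summary
//
//	rpc_latency{quantile="0.5"}  0.2
//	rpc_latency{quantile="0.99"} 1.4
//	rpc_latency_sum              52.7
//	rpc_latency_count            300
//
// becomes one SummaryDataPoint with Sum=52.7, Count=300, and two quantile
// values: (0.5, 0.2) and (0.99, 1.4). The "quantile" label is parsed into the
// boundary by getBoundary in addSeries rather than kept as an attribute.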

func (mg *metricGroup) toNumberDataPoint(dest pmetric.NumberDataPointSlice) {
	tsNanos := timestampFromMs(mg.ts)
	point := dest.AppendEmpty()
	// gauge/undefined types have no start time.
	if mg.mtype == pmetric.MetricTypeSum {
		if mg.created != 0 {
			point.SetStartTimestamp(timestampFromFloat64(mg.created))
		} else {
			// metrics_adjuster adjusts the startTimestamp to the initial scrape timestamp
			point.SetStartTimestamp(tsNanos)
		}
	}
	point.SetTimestamp(tsNanos)
	if value.IsStaleNaN(mg.value) {
		point.SetFlags(pmetric.DefaultDataPointFlags.WithNoRecordedValue(true))
	} else {
		point.SetDoubleValue(mg.value)
	}
	populateAttributes(pmetric.MetricTypeGauge, mg.ls, point.Attributes())
	mg.setExemplars(point.Exemplars())
}

func populateAttributes(mType pmetric.MetricType, ls labels.Labels, dest pcommon.Map) {
	dest.EnsureCapacity(ls.Len())
	names := getSortedNotUsefulLabels(mType)
	j := 0
	for i := range ls {
		for j < len(names) && names[j] < ls[i].Name {
			j++
		}
		if j < len(names) && ls[i].Name == names[j] {
			continue
		}
		if ls[i].Value == "" {
			// empty label values should be omitted
			continue
		}
		dest.PutStr(ls[i].Name, ls[i].Value)
	}
}
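
// Note on the loop above: it is a merge-join over two sorted sequences, so it
// relies on both ls and the slice returned by getSortedNotUsefulLabels being
// sorted by label name. For illustration (hypothetical label set): on a
// histogram point with labels {__name__, le, path}, __name__ and le are in
// the not-useful list and are skipped, while path is copied into the
// attribute map.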

func (mf *metricFamily) loadMetricGroupOrCreate(groupKey uint64, ls labels.Labels, ts int64) *metricGroup {
	mg, ok := mf.groups[groupKey]
	if !ok {
		mg = &metricGroup{
			mtype:     mf.mtype,
			ts:        ts,
			ls:        ls,
			exemplars: pmetric.NewExemplarSlice(),
		}
		mf.groups[groupKey] = mg
		// maintaining data insertion order is helpful to generate stable/reproducible metric output
		mf.groupOrders = append(mf.groupOrders, mg)
	}
	return mg
}

func (mf *metricFamily) addSeries(seriesRef uint64, metricName string, ls labels.Labels, t int64, v float64) error {
	mg := mf.loadMetricGroupOrCreate(seriesRef, ls, t)
	if mg.ts != t {
		return fmt.Errorf("inconsistent timestamps on metric points for metric %v", metricName)
	}
	switch mf.mtype {
	case pmetric.MetricTypeHistogram, pmetric.MetricTypeSummary:
		switch {
		case strings.HasSuffix(metricName, metricsSuffixSum):
			mg.sum = v
			mg.hasSum = true
		case strings.HasSuffix(metricName, metricsSuffixCount):
			// always use the timestamp from count, because it is the only required
			// series for histograms and summaries.
			mg.ts = t
			mg.count = v
			mg.hasCount = true
		case strings.HasSuffix(metricName, metricSuffixCreated):
			mg.created = v
		default:
			boundary, err := getBoundary(mf.mtype, ls)
			if err != nil {
				return err
			}
			mg.complexValue = append(mg.complexValue, &dataPoint{value: v, boundary: boundary})
		}
	case pmetric.MetricTypeSum:
		if strings.HasSuffix(metricName, metricSuffixCreated) {
			mg.created = v
		} else {
			mg.value = v
		}
	case pmetric.MetricTypeEmpty, pmetric.MetricTypeGauge, pmetric.MetricTypeExponentialHistogram:
		fallthrough
	default:
		mg.value = v
	}

	return nil
}
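
// Suffix routing at a glance (series names shown for a hypothetical family
// "x"): for a histogram or summary family,
//
//	x_sum     -> mg.sum
//	x_count   -> mg.count (and fixes the group timestamp)
//	x_created -> mg.created
//	any other series (x_bucket / quantile) -> mg.complexValue
//
// For a sum family, only the _created suffix is special; every other sample
// is the point value itself.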

func (mf *metricFamily) appendMetric(metrics pmetric.MetricSlice, trimSuffixes bool) {
	metric := pmetric.NewMetric()
	// Trims type and unit suffixes from the metric name
	name := mf.name
	if trimSuffixes {
		name = prometheus.TrimPromSuffixes(name, mf.mtype, mf.metadata.Unit)
	}
	metric.SetName(name)
	metric.SetDescription(mf.metadata.Help)
	metric.SetUnit(prometheus.UnitWordToUCUM(mf.metadata.Unit))

	var pointCount int

	switch mf.mtype {
	case pmetric.MetricTypeHistogram:
		histogram := metric.SetEmptyHistogram()
		histogram.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
		hdpL := histogram.DataPoints()
		for _, mg := range mf.groupOrders {
			mg.toDistributionPoint(hdpL)
		}
		pointCount = hdpL.Len()

	case pmetric.MetricTypeSummary:
		summary := metric.SetEmptySummary()
		sdpL := summary.DataPoints()
		for _, mg := range mf.groupOrders {
			mg.toSummaryPoint(sdpL)
		}
		pointCount = sdpL.Len()

	case pmetric.MetricTypeSum:
		sum := metric.SetEmptySum()
		sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
		sum.SetIsMonotonic(mf.isMonotonic)
		sdpL := sum.DataPoints()
		for _, mg := range mf.groupOrders {
			mg.toNumberDataPoint(sdpL)
		}
		pointCount = sdpL.Len()

	case pmetric.MetricTypeEmpty, pmetric.MetricTypeGauge, pmetric.MetricTypeExponentialHistogram:
		fallthrough

	default: // Everything else should be set to a Gauge.
		gauge := metric.SetEmptyGauge()
		gdpL := gauge.DataPoints()
		for _, mg := range mf.groupOrders {
			mg.toNumberDataPoint(gdpL)
		}
		pointCount = gdpL.Len()
	}

	if pointCount == 0 {
		return
	}

	metric.MoveTo(metrics.AppendEmpty())
}
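
// Assumed call sequence (a paraphrase of how the surrounding transaction
// drives a family, not code from this file): one mf.addSeries per scraped
// sample, one mf.addExemplar per exemplar, then a single
// mf.appendMetric(metrics, trimSuffixes) at commit time. A family whose
// groups produced no valid points appends nothing to the slice.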

func (mf *metricFamily) addExemplar(seriesRef uint64, e exemplar.Exemplar) {
	mg := mf.groups[seriesRef]
	if mg == nil {
		return
	}
	es := mg.exemplars
	convertExemplar(e, es.AppendEmpty())
}

func convertExemplar(pe exemplar.Exemplar, e pmetric.Exemplar) {
	e.SetTimestamp(timestampFromMs(pe.Ts))
	e.SetDoubleValue(pe.Value)
	e.FilteredAttributes().EnsureCapacity(len(pe.Labels))
	for _, lb := range pe.Labels {
		switch strings.ToLower(lb.Name) {
		case traceIDKey:
			var tid [16]byte
			err := decodeAndCopyToLowerBytes(tid[:], []byte(lb.Value))
			if err == nil {
				e.SetTraceID(tid)
			} else {
				e.FilteredAttributes().PutStr(lb.Name, lb.Value)
			}
		case spanIDKey:
			var sid [8]byte
			err := decodeAndCopyToLowerBytes(sid[:], []byte(lb.Value))
			if err == nil {
				e.SetSpanID(sid)
			} else {
				e.FilteredAttributes().PutStr(lb.Name, lb.Value)
			}
		default:
			e.FilteredAttributes().PutStr(lb.Name, lb.Value)
		}
	}
}
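
// For illustration (exemplar values are assumptions): an exemplar scraped as
//
//	{trace_id="0102030405060708090a0b0c0d0e0f10", span_id="0102030405060708"} 0.043
//
// is converted into a pmetric.Exemplar with those IDs set as binary TraceID
// and SpanID; a label whose value fails hex decoding is kept verbatim in
// FilteredAttributes instead.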

/*
decodeAndCopyToLowerBytes hex-decodes src into the low-order (rightmost) bytes of dst
instead of the high-order ones:
 1. If the decoded length exceeds len(dst), only the first len(dst) decoded bytes are kept.
    Example: src = []byte("abcdef1234"), dst = [2]byte, result dst = [2]byte{0xab, 0xcd}
 2. If the decoded length equals len(dst), src fills dst exactly.
 3. If the decoded length is shorter than len(dst), the decoded bytes are right-aligned
    and the leading bytes stay zero.
    Example: src = []byte("abcd"), dst = [8]byte, result dst = [8]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xab, 0xcd}
*/
func decodeAndCopyToLowerBytes(dst []byte, src []byte) error {
	var err error
	decodedLen := hex.DecodedLen(len(src))
	if decodedLen >= len(dst) {
		_, err = hex.Decode(dst, src[:hex.EncodedLen(len(dst))])
	} else {
		_, err = hex.Decode(dst[len(dst)-decodedLen:], src)
	}
	return err
}
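
// Usage sketch (inputs are assumptions): decoding a 16-hex-character span ID,
//
//	var sid [8]byte
//	_ = decodeAndCopyToLowerBytes(sid[:], []byte("0102030405060708"))
//	// sid == [8]byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}
//
// A shorter input such as "0708" fills only the last two bytes, leaving the
// rest zero.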