- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/internal"
- import (
- "errors"
- "sync"
- "time"
- "go.opentelemetry.io/collector/pdata/pcommon"
- "go.opentelemetry.io/collector/pdata/pmetric"
- semconv "go.opentelemetry.io/collector/semconv/v1.6.1"
- "go.uber.org/zap"
- "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil"
- )
- // Notes on garbage collection (gc):
- //
- // Job-level gc:
- // The Prometheus receiver will likely execute in a long running service whose lifetime may exceed
- // the lifetimes of many of the jobs that it is collecting from. In order to keep the JobsMap from
- // leaking memory for entries of no-longer existing jobs, the JobsMap needs to remove entries that
- // haven't been accessed for a long period of time.
- //
- // Timeseries-level gc:
- // Some jobs that the Prometheus receiver is collecting from may export timeseries based on metrics
- // from other jobs (e.g. cAdvisor). In order to keep the timeseriesMap from leaking memory for entries
- // of no-longer existing jobs, the timeseriesMap for each job needs to remove entries that haven't
- // been accessed for a long period of time.
- //
- // The gc strategy uses a standard mark-and-sweep approach - each time a timeseriesMap is accessed,
- // it is marked. Similarly, each time a timeseriesInfo is accessed, it is also marked.
- //
- // At the end of each JobsMap.get(), if the time since the JobsMap was last gc'd exceeds the 'gcInterval',
- // the JobsMap is locked and any timeseriesMaps that are unmarked are removed from the JobsMap;
- // otherwise the timeseriesMap itself is gc'd.
- //
- // The gc for the timeseriesMap is straightforward - the map is locked and, for each timeseriesInfo
- // in the map, if it has not been marked, it is removed otherwise it is unmarked.
- //
- // Alternative Strategies
- // 1. If the job-level gc doesn't run often enough, or runs too often, a separate goroutine can
- // be spawned at JobsMap creation time that gc's at periodic intervals. This approach potentially
- // adds more contention and latency to each scrape, so the current approach is used. Note that
- // such a goroutine would need to be cancelled upon Shutdown().
- // 2. If the gc of each timeseriesMap during the gc of the JobsMap causes too much contention,
- // the gc of timeseriesMaps can be moved to the end of MetricsAdjuster.AdjustMetrics(). This
- // approach requires adding a 'lastGC' Time and (potentially) a gcInterval duration to
- // timeseriesMap, so the current approach is used instead.
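- // A minimal sketch (hypothetical, not part of the receiver) of the flow described above:
- // get() marks the job's timeseriesMap, and the deferred maybeGC() may kick off a sweep of
- // any maps left unmarked since the previous gc pass. The name exampleGCFlow is illustrative only.
- func exampleGCFlow() {
- jm := NewJobsMap(10 * time.Minute)
- // get() marks the returned timeseriesMap and calls maybeGC() before returning.
- tsm := jm.get("my-job", "my-instance")
- _ = tsm
- }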
- // timeseriesInfo contains the information necessary to adjust from the initial point and to detect resets.
- type timeseriesInfo struct {
- mark bool
- number numberInfo
- histogram histogramInfo
- summary summaryInfo
- }
- type numberInfo struct {
- startTime pcommon.Timestamp
- previousValue float64
- }
- type histogramInfo struct {
- startTime pcommon.Timestamp
- previousCount uint64
- previousSum float64
- }
- type summaryInfo struct {
- startTime pcommon.Timestamp
- previousCount uint64
- previousSum float64
- }
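- // timeseriesKey uniquely identifies a timeseries: metric name, hashed attributes and,
- // for histograms, the aggregation temporality.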
- type timeseriesKey struct {
- name string
- attributes [16]byte
- aggTemporality pmetric.AggregationTemporality
- }
- // timeseriesMap maps from a timeseries instance (metric * label values) to the timeseries info for
- // the instance.
- type timeseriesMap struct {
- sync.RWMutex
- // The mutex is used to protect access to the member fields. It is acquired for the entirety of
- // AdjustMetrics() and also acquired by gc().
- mark bool
- tsiMap map[timeseriesKey]*timeseriesInfo
- }
- // Get the timeseriesInfo for the timeseries associated with the metric and label values.
- func (tsm *timeseriesMap) get(metric pmetric.Metric, kv pcommon.Map) (*timeseriesInfo, bool) {
- // This should only be invoked by functions called (directly or indirectly) by AdjustMetrics().
- // The lock protecting tsm.tsiMap is acquired there.
- name := metric.Name()
- key := timeseriesKey{
- name: name,
- attributes: getAttributesSignature(kv),
- }
- if metric.Type() == pmetric.MetricTypeHistogram {
- // There are 2 types of Histograms whose aggregation temporality needs distinguishing:
- // * CumulativeHistogram
- // * GaugeHistogram
- key.aggTemporality = metric.Histogram().AggregationTemporality()
- }
- tsm.mark = true
- tsi, ok := tsm.tsiMap[key]
- if !ok {
- tsi = &timeseriesInfo{}
- tsm.tsiMap[key] = tsi
- }
- tsi.mark = true
- return tsi, ok
- }
- // Create a unique hash signature for the attribute values, independent of attribute order; attributes with empty string values are dropped.
- func getAttributesSignature(m pcommon.Map) [16]byte {
- clearedMap := pcommon.NewMap()
- m.Range(func(k string, attrValue pcommon.Value) bool {
- value := attrValue.Str()
- if value != "" {
- clearedMap.PutStr(k, value)
- }
- return true
- })
- return pdatautil.MapHash(clearedMap)
- }
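- // Hypothetical illustration (not used by the receiver) of the signature semantics above:
- // insertion order does not affect the hash and empty string values are dropped, so both
- // maps below are expected to produce the same signature.
- func exampleAttributesSignature() bool {
- a := pcommon.NewMap()
- a.PutStr("method", "GET")
- a.PutStr("status", "200")
- a.PutStr("unused", "")
- b := pcommon.NewMap()
- b.PutStr("status", "200")
- b.PutStr("method", "GET")
- // [16]byte values are comparable with ==.
- return getAttributesSignature(a) == getAttributesSignature(b)
- }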
- // Remove timeseries that have aged out.
- func (tsm *timeseriesMap) gc() {
- tsm.Lock()
- defer tsm.Unlock()
- // this shouldn't happen under the current gc() strategy
- if !tsm.mark {
- return
- }
- for ts, tsi := range tsm.tsiMap {
- if !tsi.mark {
- delete(tsm.tsiMap, ts)
- } else {
- tsi.mark = false
- }
- }
- tsm.mark = false
- }
- func newTimeseriesMap() *timeseriesMap {
- return &timeseriesMap{mark: true, tsiMap: map[timeseriesKey]*timeseriesInfo{}}
- }
- // JobsMap maps from a job instance to a map of timeseries instances for the job.
- type JobsMap struct {
- sync.RWMutex
- // The mutex is used to protect access to the member fields. It is acquired for most of
- // get() and also acquired by gc().
- gcInterval time.Duration
- lastGC time.Time
- jobsMap map[string]*timeseriesMap
- }
- // NewJobsMap creates a new (empty) JobsMap.
- func NewJobsMap(gcInterval time.Duration) *JobsMap {
- return &JobsMap{gcInterval: gcInterval, lastGC: time.Now(), jobsMap: make(map[string]*timeseriesMap)}
- }
- // Remove jobs and timeseries that have aged out.
- func (jm *JobsMap) gc() {
- jm.Lock()
- defer jm.Unlock()
- // once the structure is locked, confirm that gc() is still necessary
- if time.Since(jm.lastGC) > jm.gcInterval {
- for sig, tsm := range jm.jobsMap {
- tsm.RLock()
- tsmNotMarked := !tsm.mark
- // take a read lock here, no need to get a full lock as we have a lock on the JobsMap
- tsm.RUnlock()
- if tsmNotMarked {
- delete(jm.jobsMap, sig)
- } else {
- // a full lock will be obtained in here, if required.
- tsm.gc()
- }
- }
- jm.lastGC = time.Now()
- }
- }
- func (jm *JobsMap) maybeGC() {
- // speculatively check if gc() is necessary, recheck once the structure is locked
- jm.RLock()
- defer jm.RUnlock()
- if time.Since(jm.lastGC) > jm.gcInterval {
- go jm.gc()
- }
- }
- func (jm *JobsMap) get(job, instance string) *timeseriesMap {
- sig := job + ":" + instance
- // a read lock is taken here as we will not need to modify jobsMap if the target timeseriesMap is available.
- jm.RLock()
- tsm, ok := jm.jobsMap[sig]
- jm.RUnlock()
- defer jm.maybeGC()
- if ok {
- return tsm
- }
- jm.Lock()
- defer jm.Unlock()
- // Now that we've got an exclusive lock, check once more to ensure an entry wasn't created in the interim
- // and then create a new timeseriesMap if required.
- tsm2, ok2 := jm.jobsMap[sig]
- if ok2 {
- return tsm2
- }
- tsm2 = newTimeseriesMap()
- jm.jobsMap[sig] = tsm2
- return tsm2
- }
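- // MetricsAdjuster adjusts the start times of metrics based on previously observed points.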
- type MetricsAdjuster interface {
- AdjustMetrics(metrics pmetric.Metrics) error
- }
- // initialPointAdjuster takes a map from a metric instance to the initial point in the metrics instance
- // and provides AdjustMetrics, which takes a sequence of metrics and adjusts their start times based on
- // the initial points.
- type initialPointAdjuster struct {
- jobsMap *JobsMap
- logger *zap.Logger
- useCreatedMetric bool
- }
- // NewInitialPointAdjuster returns a new MetricsAdjuster that adjusts metrics' start times based on the initial received points.
- func NewInitialPointAdjuster(logger *zap.Logger, gcInterval time.Duration, useCreatedMetric bool) MetricsAdjuster {
- return &initialPointAdjuster{
- jobsMap: NewJobsMap(gcInterval),
- logger: logger,
- useCreatedMetric: useCreatedMetric,
- }
- }
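- // Hypothetical usage sketch (illustrative only, not part of the receiver): the adjuster
- // requires service.name and service.instance.id resource attributes on the incoming
- // metrics, otherwise AdjustMetrics returns an error. Metric and attribute values below are made up.
- func exampleAdjusterUsage(logger *zap.Logger) error {
- adjuster := NewInitialPointAdjuster(logger, 10*time.Minute, true)
- metrics := pmetric.NewMetrics()
- rm := metrics.ResourceMetrics().AppendEmpty()
- rm.Resource().Attributes().PutStr(semconv.AttributeServiceName, "my-job")
- rm.Resource().Attributes().PutStr(semconv.AttributeServiceInstanceID, "host:9090")
- m := rm.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
- m.SetName("http_requests_total")
- m.SetEmptySum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
- dp := m.Sum().DataPoints().AppendEmpty()
- dp.SetDoubleValue(42)
- dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
- return adjuster.AdjustMetrics(metrics)
- }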
- // AdjustMetrics takes a sequence of metrics and adjusts their start times based on the initial and
- // previous points in the timeseriesMap.
- func (a *initialPointAdjuster) AdjustMetrics(metrics pmetric.Metrics) error {
- // By contract, metrics will have at least 1 data point, so there will be at least one ResourceMetrics.
- job, found := metrics.ResourceMetrics().At(0).Resource().Attributes().Get(semconv.AttributeServiceName)
- if !found {
- return errors.New("adjusting metrics without job")
- }
- instance, found := metrics.ResourceMetrics().At(0).Resource().Attributes().Get(semconv.AttributeServiceInstanceID)
- if !found {
- return errors.New("adjusting metrics without instance")
- }
- tsm := a.jobsMap.get(job.Str(), instance.Str())
- // The lock on the relevant timeseriesMap is held throughout the adjustment process to ensure that
- // nothing else can modify the data used for adjustment.
- tsm.Lock()
- defer tsm.Unlock()
- for i := 0; i < metrics.ResourceMetrics().Len(); i++ {
- rm := metrics.ResourceMetrics().At(i)
- for j := 0; j < rm.ScopeMetrics().Len(); j++ {
- ilm := rm.ScopeMetrics().At(j)
- for k := 0; k < ilm.Metrics().Len(); k++ {
- metric := ilm.Metrics().At(k)
- switch dataType := metric.Type(); dataType {
- case pmetric.MetricTypeGauge:
- // gauges don't need to be adjusted so no additional processing is necessary
- case pmetric.MetricTypeHistogram:
- a.adjustMetricHistogram(tsm, metric)
- case pmetric.MetricTypeSummary:
- a.adjustMetricSummary(tsm, metric)
- case pmetric.MetricTypeSum:
- a.adjustMetricSum(tsm, metric)
- case pmetric.MetricTypeEmpty, pmetric.MetricTypeExponentialHistogram:
- fallthrough
- default:
- // this shouldn't happen
- a.logger.Info("Adjust - skipping unexpected point", zap.String("type", dataType.String()))
- }
- }
- }
- }
- return nil
- }
- func (a *initialPointAdjuster) adjustMetricHistogram(tsm *timeseriesMap, current pmetric.Metric) {
- histogram := current.Histogram()
- if histogram.AggregationTemporality() != pmetric.AggregationTemporalityCumulative {
- // Only dealing with CumulativeDistributions.
- return
- }
- currentPoints := histogram.DataPoints()
- for i := 0; i < currentPoints.Len(); i++ {
- currentDist := currentPoints.At(i)
- // start timestamp was set from _created
- if a.useCreatedMetric &&
- !currentDist.Flags().NoRecordedValue() &&
- currentDist.StartTimestamp() < currentDist.Timestamp() {
- continue
- }
- tsi, found := tsm.get(current, currentDist.Attributes())
- if !found {
- // initialize everything.
- tsi.histogram.startTime = currentDist.StartTimestamp()
- tsi.histogram.previousCount = currentDist.Count()
- tsi.histogram.previousSum = currentDist.Sum()
- continue
- }
- if currentDist.Flags().NoRecordedValue() {
- // TODO: Investigate why this does not reset.
- currentDist.SetStartTimestamp(tsi.histogram.startTime)
- continue
- }
- if currentDist.Count() < tsi.histogram.previousCount || currentDist.Sum() < tsi.histogram.previousSum {
- // reset detected: re-initialize everything.
- tsi.histogram.startTime = currentDist.StartTimestamp()
- tsi.histogram.previousCount = currentDist.Count()
- tsi.histogram.previousSum = currentDist.Sum()
- continue
- }
- // Update only previous values.
- tsi.histogram.previousCount = currentDist.Count()
- tsi.histogram.previousSum = currentDist.Sum()
- currentDist.SetStartTimestamp(tsi.histogram.startTime)
- }
- }
- func (a *initialPointAdjuster) adjustMetricSum(tsm *timeseriesMap, current pmetric.Metric) {
- currentPoints := current.Sum().DataPoints()
- for i := 0; i < currentPoints.Len(); i++ {
- currentSum := currentPoints.At(i)
- // start timestamp was set from _created
- if a.useCreatedMetric &&
- !currentSum.Flags().NoRecordedValue() &&
- currentSum.StartTimestamp() < currentSum.Timestamp() {
- continue
- }
- tsi, found := tsm.get(current, currentSum.Attributes())
- if !found {
- // initialize everything.
- tsi.number.startTime = currentSum.StartTimestamp()
- tsi.number.previousValue = currentSum.DoubleValue()
- continue
- }
- if currentSum.Flags().NoRecordedValue() {
- // TODO: Investigate why this does not reset.
- currentSum.SetStartTimestamp(tsi.number.startTime)
- continue
- }
- if currentSum.DoubleValue() < tsi.number.previousValue {
- // reset detected: re-initialize everything.
- tsi.number.startTime = currentSum.StartTimestamp()
- tsi.number.previousValue = currentSum.DoubleValue()
- continue
- }
- // Update only previous values.
- tsi.number.previousValue = currentSum.DoubleValue()
- currentSum.SetStartTimestamp(tsi.number.startTime)
- }
- }
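- // Hypothetical sketch (not part of the receiver) of the reset handling above for cumulative
- // sums: the first pass seeds the cached start time and value, and a later, lower value is
- // treated as a counter reset, so the point keeps its own new start timestamp instead of the
- // cached one. The name exampleSumReset is illustrative only.
- func exampleSumReset() {
- a := &initialPointAdjuster{jobsMap: NewJobsMap(time.Minute), logger: zap.NewNop(), useCreatedMetric: false}
- tsm := newTimeseriesMap()
- metric := pmetric.NewMetric()
- metric.SetName("requests_total")
- metric.SetEmptySum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
- dp := metric.Sum().DataPoints().AppendEmpty()
- dp.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Now()))
- dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
- dp.SetDoubleValue(10)
- // tsm.get() expects the timeseriesMap lock to be held by the caller.
- tsm.Lock()
- a.adjustMetricSum(tsm, metric) // first point: caches the start time and value 10
- dp.SetDoubleValue(3) // value dropped: treated as a reset on the next pass
- a.adjustMetricSum(tsm, metric)
- tsm.Unlock()
- }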
- func (a *initialPointAdjuster) adjustMetricSummary(tsm *timeseriesMap, current pmetric.Metric) {
- currentPoints := current.Summary().DataPoints()
- for i := 0; i < currentPoints.Len(); i++ {
- currentSummary := currentPoints.At(i)
- // start timestamp was set from _created
- if a.useCreatedMetric &&
- !currentSummary.Flags().NoRecordedValue() &&
- currentSummary.StartTimestamp() < currentSummary.Timestamp() {
- continue
- }
- tsi, found := tsm.get(current, currentSummary.Attributes())
- if !found {
- // initialize everything.
- tsi.summary.startTime = currentSummary.StartTimestamp()
- tsi.summary.previousCount = currentSummary.Count()
- tsi.summary.previousSum = currentSummary.Sum()
- continue
- }
- if currentSummary.Flags().NoRecordedValue() {
- // TODO: Investigate why this does not reset.
- currentSummary.SetStartTimestamp(tsi.summary.startTime)
- continue
- }
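- // A reset is only flagged when both the current and the cached count (or sum) are
- // non-zero and the current value is lower than the cached one.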
- if (currentSummary.Count() != 0 &&
- tsi.summary.previousCount != 0 &&
- currentSummary.Count() < tsi.summary.previousCount) ||
- (currentSummary.Sum() != 0 &&
- tsi.summary.previousSum != 0 &&
- currentSummary.Sum() < tsi.summary.previousSum) {
- // reset detected: re-initialize everything.
- tsi.summary.startTime = currentSummary.StartTimestamp()
- tsi.summary.previousCount = currentSummary.Count()
- tsi.summary.previousSum = currentSummary.Sum()
- continue
- }
- // Update only previous values.
- tsi.summary.previousCount = currentSummary.Count()
- tsi.summary.previousSum = currentSummary.Sum()
- currentSummary.SetStartTimestamp(tsi.summary.startTime)
- }
- }