metrics_adjuster.go

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/internal"

import (
	"errors"
	"sync"
	"time"

	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/pmetric"
	semconv "go.opentelemetry.io/collector/semconv/v1.6.1"
	"go.uber.org/zap"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil"
)

// Notes on garbage collection (gc):
//
// Job-level gc:
// The Prometheus receiver will likely execute in a long-running service whose lifetime may exceed
// the lifetimes of many of the jobs that it is collecting from. In order to keep the JobsMap from
// leaking memory for entries of no-longer existing jobs, the JobsMap needs to remove entries that
// haven't been accessed for a long period of time.
//
// Timeseries-level gc:
// Some jobs that the Prometheus receiver is collecting from may export timeseries based on metrics
// from other jobs (e.g. cAdvisor). In order to keep the timeseriesMap from leaking memory for entries
// of no-longer existing jobs, the timeseriesMap for each job needs to remove entries that haven't
// been accessed for a long period of time.
//
// The gc strategy uses a standard mark-and-sweep approach: each time a timeseriesMap is accessed,
// it is marked. Similarly, each time a timeseriesInfo is accessed, it is also marked.
//
// At the end of each JobsMap.get(), if the time since the JobsMap was last gc'd exceeds the
// 'gcInterval', the JobsMap is locked and any timeseriesMaps that are unmarked are removed from
// the JobsMap; otherwise the timeseriesMap is gc'd.
//
// The gc for the timeseriesMap is straightforward: the map is locked and, for each timeseriesInfo
// in the map, if it has not been marked, it is removed; otherwise it is unmarked.
//
// Alternative Strategies
// 1. If the job-level gc doesn't run often enough, or runs too often, a separate goroutine can
//    be spawned at JobsMap creation time that gc's at periodic intervals (see the sketch below).
//    This approach potentially adds more contention and latency to each scrape, so the current
//    approach is used. Note that the goroutine would need to be cancelled upon Shutdown().
// 2. If the gc of each timeseriesMap during the gc of the JobsMap causes too much contention,
//    the gc of timeseriesMaps can be moved to the end of MetricsAdjuster().AdjustMetricSlice(). This
//    approach requires adding a 'lastGC' Time and (potentially) a gcInterval duration to
//    timeseriesMap, so the current approach is used instead.
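
// For illustration only: a hedged sketch (not part of this package) of alternative strategy 1
// above, in which a background goroutine gc's the JobsMap at periodic intervals and is cancelled
// via a context on Shutdown(). The function name startPeriodicGC is hypothetical.
//
//	func startPeriodicGC(ctx context.Context, jm *JobsMap, interval time.Duration) {
//		go func() {
//			ticker := time.NewTicker(interval)
//			defer ticker.Stop()
//			for {
//				select {
//				case <-ctx.Done():
//					return // cancelled on Shutdown()
//				case <-ticker.C:
//					jm.gc() // gc() itself rechecks gcInterval under the lock
//				}
//			}
//		}()
//	}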

// timeseriesInfo contains the information necessary to adjust from the initial point and to detect resets.
type timeseriesInfo struct {
	mark      bool
	number    numberInfo
	histogram histogramInfo
	summary   summaryInfo
}

type numberInfo struct {
	startTime     pcommon.Timestamp
	previousValue float64
}

type histogramInfo struct {
	startTime     pcommon.Timestamp
	previousCount uint64
	previousSum   float64
}

type summaryInfo struct {
	startTime     pcommon.Timestamp
	previousCount uint64
	previousSum   float64
}

type timeseriesKey struct {
	name           string
	attributes     [16]byte
	aggTemporality pmetric.AggregationTemporality
}

// timeseriesMap maps from a timeseries instance (metric * label values) to the timeseries info for
// the instance.
type timeseriesMap struct {
	sync.RWMutex
	// The mutex is used to protect access to the member fields. It is acquired for the entirety of
	// AdjustMetricSlice() and also acquired by gc().
	mark   bool
	tsiMap map[timeseriesKey]*timeseriesInfo
}

// Get the timeseriesInfo for the timeseries associated with the metric and label values.
func (tsm *timeseriesMap) get(metric pmetric.Metric, kv pcommon.Map) (*timeseriesInfo, bool) {
	// This should only be invoked by functions called (directly or indirectly) by AdjustMetricSlice().
	// The lock protecting tsm.tsiMap is acquired there.
	name := metric.Name()
	key := timeseriesKey{
		name:       name,
		attributes: getAttributesSignature(kv),
	}
	if metric.Type() == pmetric.MetricTypeHistogram {
		// There are 2 types of Histograms whose aggregation temporality needs distinguishing:
		// * CumulativeHistogram
		// * GaugeHistogram
		key.aggTemporality = metric.Histogram().AggregationTemporality()
	}

	tsm.mark = true
	tsi, ok := tsm.tsiMap[key]
	if !ok {
		tsi = &timeseriesInfo{}
		tsm.tsiMap[key] = tsi
	}
	tsi.mark = true
	return tsi, ok
}

// Create a unique signature (a hash) for the attribute values, keyed by sorted attribute keys.
func getAttributesSignature(m pcommon.Map) [16]byte {
	clearedMap := pcommon.NewMap()
	m.Range(func(k string, attrValue pcommon.Value) bool {
		value := attrValue.Str()
		if value != "" {
			clearedMap.PutStr(k, value)
		}
		return true
	})
	return pdatautil.MapHash(clearedMap)
}
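
// For illustration only: a hedged sketch of how the signature behaves (the variable names are
// hypothetical). Attributes with empty string values are dropped before hashing, so these two
// maps produce the same signature and therefore the same timeseriesKey.
//
//	m1 := pcommon.NewMap()
//	m1.PutStr("a", "1")
//	m1.PutStr("b", "")
//	m2 := pcommon.NewMap()
//	m2.PutStr("a", "1")
//	_ = getAttributesSignature(m1) == getAttributesSignature(m2) // true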

// Remove timeseries that have aged out.
func (tsm *timeseriesMap) gc() {
	tsm.Lock()
	defer tsm.Unlock()
	// this shouldn't happen under the current gc() strategy
	if !tsm.mark {
		return
	}
	for ts, tsi := range tsm.tsiMap {
		if !tsi.mark {
			delete(tsm.tsiMap, ts)
		} else {
			tsi.mark = false
		}
	}
	tsm.mark = false
}

func newTimeseriesMap() *timeseriesMap {
	return &timeseriesMap{mark: true, tsiMap: map[timeseriesKey]*timeseriesInfo{}}
}

// JobsMap maps from a job instance to a map of timeseries instances for the job.
type JobsMap struct {
	sync.RWMutex
	// The mutex is used to protect access to the member fields. It is acquired for most of
	// get() and also acquired by gc().
	gcInterval time.Duration
	lastGC     time.Time
	jobsMap    map[string]*timeseriesMap
}

// NewJobsMap creates a new (empty) JobsMap.
func NewJobsMap(gcInterval time.Duration) *JobsMap {
	return &JobsMap{gcInterval: gcInterval, lastGC: time.Now(), jobsMap: make(map[string]*timeseriesMap)}
}
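
// For illustration only: a hedged sketch of how code in this package uses a JobsMap. The job,
// instance, and interval values are arbitrary examples.
//
//	jobsMap := NewJobsMap(30 * time.Minute)
//	tsm := jobsMap.get("my-job", "localhost:9090") // creates the entry on first access
//	tsm.Lock()
//	// ... look up and adjust data points via tsm.get(metric, attributes) ...
//	tsm.Unlock()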

// Remove jobs and timeseries that have aged out.
func (jm *JobsMap) gc() {
	jm.Lock()
	defer jm.Unlock()
	// once the structure is locked, confirm that gc() is still necessary
	if time.Since(jm.lastGC) > jm.gcInterval {
		for sig, tsm := range jm.jobsMap {
			tsm.RLock()
			tsmNotMarked := !tsm.mark
			// take a read lock here, no need to get a full lock as we have a lock on the JobsMap
			tsm.RUnlock()
			if tsmNotMarked {
				delete(jm.jobsMap, sig)
			} else {
				// a full lock will be obtained in here, if required.
				tsm.gc()
			}
		}
		jm.lastGC = time.Now()
	}
}

func (jm *JobsMap) maybeGC() {
	// speculatively check if gc() is necessary, recheck once the structure is locked
	jm.RLock()
	defer jm.RUnlock()
	if time.Since(jm.lastGC) > jm.gcInterval {
		go jm.gc()
	}
}

func (jm *JobsMap) get(job, instance string) *timeseriesMap {
	sig := job + ":" + instance
	// a read lock is taken here as we will not need to modify jobsMap if the target timeseriesMap is available.
	jm.RLock()
	tsm, ok := jm.jobsMap[sig]
	jm.RUnlock()
	defer jm.maybeGC()
	if ok {
		return tsm
	}

	jm.Lock()
	defer jm.Unlock()
	// Now that we've got an exclusive lock, check once more to ensure an entry wasn't created in the interim
	// and then create a new timeseriesMap if required.
	tsm2, ok2 := jm.jobsMap[sig]
	if ok2 {
		return tsm2
	}
	tsm2 = newTimeseriesMap()
	jm.jobsMap[sig] = tsm2
	return tsm2
}

// MetricsAdjuster adjusts the start times of a sequence of metrics.
type MetricsAdjuster interface {
	AdjustMetrics(metrics pmetric.Metrics) error
}

// initialPointAdjuster takes a map from a metric instance to the initial point in the metrics instance
// and provides AdjustMetricSlice, which takes a sequence of metrics and adjusts their start times based on
// the initial points.
type initialPointAdjuster struct {
	jobsMap          *JobsMap
	logger           *zap.Logger
	useCreatedMetric bool
}

// NewInitialPointAdjuster returns a new MetricsAdjuster that adjusts metrics' start times based on the initial received points.
func NewInitialPointAdjuster(logger *zap.Logger, gcInterval time.Duration, useCreatedMetric bool) MetricsAdjuster {
	return &initialPointAdjuster{
		jobsMap:          NewJobsMap(gcInterval),
		logger:           logger,
		useCreatedMetric: useCreatedMetric,
	}
}
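
// For illustration only: a hedged usage sketch (not part of this package) of driving the adjuster.
// The metrics value is assumed to carry the service.name and service.instance.id resource
// attributes that AdjustMetrics requires; the logger and interval are arbitrary examples.
//
//	adjuster := NewInitialPointAdjuster(zap.NewNop(), 30*time.Minute, true)
//	if err := adjuster.AdjustMetrics(metrics); err != nil {
//		// the metrics were missing the job/instance resource attributes
//	}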

// AdjustMetrics takes a sequence of metrics and adjusts their start times based on the initial and
// previous points in the timeseriesMap.
func (a *initialPointAdjuster) AdjustMetrics(metrics pmetric.Metrics) error {
	// By contract metrics will have at least 1 data point, so for sure will have at least one ResourceMetrics.
	job, found := metrics.ResourceMetrics().At(0).Resource().Attributes().Get(semconv.AttributeServiceName)
	if !found {
		return errors.New("adjusting metrics without job")
	}

	instance, found := metrics.ResourceMetrics().At(0).Resource().Attributes().Get(semconv.AttributeServiceInstanceID)
	if !found {
		return errors.New("adjusting metrics without instance")
	}
	tsm := a.jobsMap.get(job.Str(), instance.Str())

	// The lock on the relevant timeseriesMap is held throughout the adjustment process to ensure that
	// nothing else can modify the data used for adjustment.
	tsm.Lock()
	defer tsm.Unlock()
	for i := 0; i < metrics.ResourceMetrics().Len(); i++ {
		rm := metrics.ResourceMetrics().At(i)
		for j := 0; j < rm.ScopeMetrics().Len(); j++ {
			ilm := rm.ScopeMetrics().At(j)
			for k := 0; k < ilm.Metrics().Len(); k++ {
				metric := ilm.Metrics().At(k)
				switch dataType := metric.Type(); dataType {
				case pmetric.MetricTypeGauge:
					// gauges don't need to be adjusted so no additional processing is necessary
				case pmetric.MetricTypeHistogram:
					a.adjustMetricHistogram(tsm, metric)
				case pmetric.MetricTypeSummary:
					a.adjustMetricSummary(tsm, metric)
				case pmetric.MetricTypeSum:
					a.adjustMetricSum(tsm, metric)
				case pmetric.MetricTypeEmpty, pmetric.MetricTypeExponentialHistogram:
					fallthrough
				default:
					// this shouldn't happen
					a.logger.Info("Adjust - skipping unexpected point", zap.String("type", dataType.String()))
				}
			}
		}
	}
	return nil
}

func (a *initialPointAdjuster) adjustMetricHistogram(tsm *timeseriesMap, current pmetric.Metric) {
	histogram := current.Histogram()
	if histogram.AggregationTemporality() != pmetric.AggregationTemporalityCumulative {
		// Only dealing with CumulativeDistributions.
		return
	}

	currentPoints := histogram.DataPoints()
	for i := 0; i < currentPoints.Len(); i++ {
		currentDist := currentPoints.At(i)

		// start timestamp was set from _created
		if a.useCreatedMetric &&
			!currentDist.Flags().NoRecordedValue() &&
			currentDist.StartTimestamp() < currentDist.Timestamp() {
			continue
		}

		tsi, found := tsm.get(current, currentDist.Attributes())
		if !found {
			// initialize everything.
			tsi.histogram.startTime = currentDist.StartTimestamp()
			tsi.histogram.previousCount = currentDist.Count()
			tsi.histogram.previousSum = currentDist.Sum()
			continue
		}

		if currentDist.Flags().NoRecordedValue() {
			// TODO: Investigate why this does not reset.
			currentDist.SetStartTimestamp(tsi.histogram.startTime)
			continue
		}

		if currentDist.Count() < tsi.histogram.previousCount || currentDist.Sum() < tsi.histogram.previousSum {
			// reset detected: re-initialize everything.
			tsi.histogram.startTime = currentDist.StartTimestamp()
			tsi.histogram.previousCount = currentDist.Count()
			tsi.histogram.previousSum = currentDist.Sum()
			continue
		}

		// Update only previous values.
		tsi.histogram.previousCount = currentDist.Count()
		tsi.histogram.previousSum = currentDist.Sum()
		currentDist.SetStartTimestamp(tsi.histogram.startTime)
	}
}

func (a *initialPointAdjuster) adjustMetricSum(tsm *timeseriesMap, current pmetric.Metric) {
	currentPoints := current.Sum().DataPoints()
	for i := 0; i < currentPoints.Len(); i++ {
		currentSum := currentPoints.At(i)

		// start timestamp was set from _created
		if a.useCreatedMetric &&
			!currentSum.Flags().NoRecordedValue() &&
			currentSum.StartTimestamp() < currentSum.Timestamp() {
			continue
		}

		tsi, found := tsm.get(current, currentSum.Attributes())
		if !found {
			// initialize everything.
			tsi.number.startTime = currentSum.StartTimestamp()
			tsi.number.previousValue = currentSum.DoubleValue()
			continue
		}

		if currentSum.Flags().NoRecordedValue() {
			// TODO: Investigate why this does not reset.
			currentSum.SetStartTimestamp(tsi.number.startTime)
			continue
		}

		if currentSum.DoubleValue() < tsi.number.previousValue {
			// reset detected: re-initialize everything.
			tsi.number.startTime = currentSum.StartTimestamp()
			tsi.number.previousValue = currentSum.DoubleValue()
			continue
		}

		// Update only previous values.
		tsi.number.previousValue = currentSum.DoubleValue()
		currentSum.SetStartTimestamp(tsi.number.startTime)
	}
}
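
// For illustration only: a hedged worked example of the reset logic above (the values are
// hypothetical). On the first scrape a cumulative sum arrives with value 100; its start time and
// value are recorded. On the next scrape the value is 160 (>= 100), so the point keeps the
// recorded start time and 160 becomes the new previous value. If a later scrape reports 15
// (< 160), the target is assumed to have restarted: that point's own start time becomes the new
// baseline and 15 the new previous value.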

func (a *initialPointAdjuster) adjustMetricSummary(tsm *timeseriesMap, current pmetric.Metric) {
	currentPoints := current.Summary().DataPoints()
	for i := 0; i < currentPoints.Len(); i++ {
		currentSummary := currentPoints.At(i)

		// start timestamp was set from _created
		if a.useCreatedMetric &&
			!currentSummary.Flags().NoRecordedValue() &&
			currentSummary.StartTimestamp() < currentSummary.Timestamp() {
			continue
		}

		tsi, found := tsm.get(current, currentSummary.Attributes())
		if !found {
			// initialize everything.
			tsi.summary.startTime = currentSummary.StartTimestamp()
			tsi.summary.previousCount = currentSummary.Count()
			tsi.summary.previousSum = currentSummary.Sum()
			continue
		}

		if currentSummary.Flags().NoRecordedValue() {
			// TODO: Investigate why this does not reset.
			currentSummary.SetStartTimestamp(tsi.summary.startTime)
			continue
		}

		if (currentSummary.Count() != 0 &&
			tsi.summary.previousCount != 0 &&
			currentSummary.Count() < tsi.summary.previousCount) ||
			(currentSummary.Sum() != 0 &&
				tsi.summary.previousSum != 0 &&
				currentSummary.Sum() < tsi.summary.previousSum) {
			// reset detected: re-initialize everything.
			tsi.summary.startTime = currentSummary.StartTimestamp()
			tsi.summary.previousCount = currentSummary.Count()
			tsi.summary.previousSum = currentSummary.Sum()
			continue
		}

		// Update only previous values.
		tsi.summary.previousCount = currentSummary.Count()
		tsi.summary.previousSum = currentSummary.Sum()
		currentSummary.SetStartTimestamp(tsi.summary.startTime)
	}
}