metricfamily.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/internal"
  4. import (
  5. "encoding/hex"
  6. "fmt"
  7. "math"
  8. "sort"
  9. "strings"
  10. "github.com/prometheus/prometheus/model/exemplar"
  11. "github.com/prometheus/prometheus/model/labels"
  12. "github.com/prometheus/prometheus/model/value"
  13. "github.com/prometheus/prometheus/scrape"
  14. "go.opentelemetry.io/collector/pdata/pcommon"
  15. "go.opentelemetry.io/collector/pdata/pmetric"
  16. "go.uber.org/zap"
  17. "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus"
  18. )
// Exemplar label names that carry tracing context. convertExemplar compares
// the lower-cased label name against these keys.
const (
	// traceIDKey names the exemplar label whose value is hex-decoded into the trace ID.
	traceIDKey = "trace_id"
	// spanIDKey names the exemplar label whose value is hex-decoded into the span ID.
	spanIDKey = "span_id"
)
// metricFamily accumulates the metric groups that belong to one metric family
// until appendMetric emits them as a single pmetric.Metric.
type metricFamily struct {
	// mtype is the pdata metric type returned by convToMetricType for the family's metadata.
	mtype pmetric.MetricType
	// isMonotonic only applies to sums
	isMonotonic bool
	// groups indexes the family's groups by the key given to loadMetricGroupOrCreate.
	groups map[uint64]*metricGroup
	// name is the family name returned by metadataForMetric.
	name string
	// metadata provides the help text and unit that appendMetric writes to the output metric.
	metadata *scrape.MetricMetadata
	// groupOrders holds the groups in insertion order so the generated output is stable.
	groupOrders []*metricGroup
}
// metricGroup represents a single metric of a metric family. For example, a histogram metric is usually represented
// by several data points (the buckets plus count/sum); all points of a group share the same set of labels. For
// simple types like counter and gauge, each data point is a group of its own.
type metricGroup struct {
	// mtype is copied from the owning metricFamily when the group is created.
	mtype pmetric.MetricType
	// ts is the sample timestamp in milliseconds; it is converted with timestampFromMs on output.
	ts int64
	// ls is the label set supplied when the group was created.
	ls labels.Labels
	// count and hasCount hold the value of the series ending in metricsSuffixCount and whether it was seen.
	count float64
	hasCount bool
	// sum and hasSum hold the value of the series ending in metricsSuffixSum and whether it was seen.
	sum float64
	hasSum bool
	// created is the value of the series ending in metricSuffixCreated; zero means it was not reported.
	created float64
	// value is the sample value for non-histogram, non-summary metrics.
	value float64
	// complexValue collects the bucket (histogram) or quantile (summary) points of the group.
	complexValue []*dataPoint
	// exemplars collects exemplars added through addExemplar until they are moved into a data point.
	exemplars pmetric.ExemplarSlice
}
  48. func newMetricFamily(metricName string, mc scrape.MetricMetadataStore, logger *zap.Logger) *metricFamily {
  49. metadata, familyName := metadataForMetric(metricName, mc)
  50. mtype, isMonotonic := convToMetricType(metadata.Type)
  51. if mtype == pmetric.MetricTypeEmpty {
  52. logger.Debug(fmt.Sprintf("Unknown-typed metric : %s %+v", metricName, metadata))
  53. }
  54. return &metricFamily{
  55. mtype: mtype,
  56. isMonotonic: isMonotonic,
  57. groups: make(map[uint64]*metricGroup),
  58. name: familyName,
  59. metadata: metadata,
  60. }
  61. }
  62. // includesMetric returns true if the metric is part of the family
  63. func (mf *metricFamily) includesMetric(metricName string) bool {
  64. if mf.mtype != pmetric.MetricTypeGauge {
  65. // If it is a merged family type, then it should match the
  66. // family name when suffixes are trimmed.
  67. return normalizeMetricName(metricName) == mf.name
  68. }
  69. // If it isn't a merged type, the metricName and family name should match
  70. return metricName == mf.name
  71. }
  72. func (mg *metricGroup) sortPoints() {
  73. sort.Slice(mg.complexValue, func(i, j int) bool {
  74. return mg.complexValue[i].boundary < mg.complexValue[j].boundary
  75. })
  76. }
  77. func (mg *metricGroup) toDistributionPoint(dest pmetric.HistogramDataPointSlice) {
  78. if !mg.hasCount {
  79. return
  80. }
  81. mg.sortPoints()
  82. bucketCount := len(mg.complexValue) + 1
  83. // if the final bucket is +Inf, we ignore it
  84. if bucketCount > 1 && mg.complexValue[bucketCount-2].boundary == math.Inf(1) {
  85. bucketCount--
  86. }
  87. // for OTLP the bounds won't include +inf
  88. bounds := make([]float64, bucketCount-1)
  89. bucketCounts := make([]uint64, bucketCount)
  90. var adjustedCount float64
  91. pointIsStale := value.IsStaleNaN(mg.sum) || value.IsStaleNaN(mg.count)
  92. for i := 0; i < bucketCount-1; i++ {
  93. bounds[i] = mg.complexValue[i].boundary
  94. adjustedCount = mg.complexValue[i].value
  95. // Buckets still need to be sent to know to set them as stale,
  96. // but a staleness NaN converted to uint64 would be an extremely large number.
  97. // Setting to 0 instead.
  98. if pointIsStale {
  99. adjustedCount = 0
  100. } else if i != 0 {
  101. adjustedCount -= mg.complexValue[i-1].value
  102. }
  103. bucketCounts[i] = uint64(adjustedCount)
  104. }
  105. // Add the final bucket based on the total count
  106. adjustedCount = mg.count
  107. if pointIsStale {
  108. adjustedCount = 0
  109. } else if bucketCount > 1 {
  110. adjustedCount -= mg.complexValue[bucketCount-2].value
  111. }
  112. bucketCounts[bucketCount-1] = uint64(adjustedCount)
  113. point := dest.AppendEmpty()
  114. if pointIsStale {
  115. point.SetFlags(pmetric.DefaultDataPointFlags.WithNoRecordedValue(true))
  116. } else {
  117. point.SetCount(uint64(mg.count))
  118. if mg.hasSum {
  119. point.SetSum(mg.sum)
  120. }
  121. }
  122. point.ExplicitBounds().FromRaw(bounds)
  123. point.BucketCounts().FromRaw(bucketCounts)
  124. // The timestamp MUST be in retrieved from milliseconds and converted to nanoseconds.
  125. tsNanos := timestampFromMs(mg.ts)
  126. if mg.created != 0 {
  127. point.SetStartTimestamp(timestampFromFloat64(mg.created))
  128. } else {
  129. // metrics_adjuster adjusts the startTimestamp to the initial scrape timestamp
  130. point.SetStartTimestamp(tsNanos)
  131. }
  132. point.SetTimestamp(tsNanos)
  133. populateAttributes(pmetric.MetricTypeHistogram, mg.ls, point.Attributes())
  134. mg.setExemplars(point.Exemplars())
  135. }
  136. func (mg *metricGroup) setExemplars(exemplars pmetric.ExemplarSlice) {
  137. if mg == nil {
  138. return
  139. }
  140. if mg.exemplars.Len() > 0 {
  141. mg.exemplars.MoveAndAppendTo(exemplars)
  142. }
  143. }
  144. func (mg *metricGroup) toSummaryPoint(dest pmetric.SummaryDataPointSlice) {
  145. // expecting count to be provided, however, in the following two cases, they can be missed.
  146. // 1. data is corrupted
  147. // 2. ignored by startValue evaluation
  148. if !mg.hasCount {
  149. return
  150. }
  151. mg.sortPoints()
  152. point := dest.AppendEmpty()
  153. pointIsStale := value.IsStaleNaN(mg.sum) || value.IsStaleNaN(mg.count)
  154. if pointIsStale {
  155. point.SetFlags(pmetric.DefaultDataPointFlags.WithNoRecordedValue(true))
  156. } else {
  157. if mg.hasSum {
  158. point.SetSum(mg.sum)
  159. }
  160. point.SetCount(uint64(mg.count))
  161. }
  162. quantileValues := point.QuantileValues()
  163. for _, p := range mg.complexValue {
  164. quantile := quantileValues.AppendEmpty()
  165. // Quantiles still need to be sent to know to set them as stale,
  166. // but a staleness NaN converted to uint64 would be an extremely large number.
  167. // By not setting the quantile value, it will default to 0.
  168. if !pointIsStale {
  169. quantile.SetValue(p.value)
  170. }
  171. quantile.SetQuantile(p.boundary)
  172. }
  173. // Based on the summary description from https://prometheus.io/docs/concepts/metric_types/#summary
  174. // the quantiles are calculated over a sliding time window, however, the count is the total count of
  175. // observations and the corresponding sum is a sum of all observed values, thus the sum and count used
  176. // at the global level of the metricspb.SummaryValue
  177. // The timestamp MUST be in retrieved from milliseconds and converted to nanoseconds.
  178. tsNanos := timestampFromMs(mg.ts)
  179. point.SetTimestamp(tsNanos)
  180. if mg.created != 0 {
  181. point.SetStartTimestamp(timestampFromFloat64(mg.created))
  182. } else {
  183. // metrics_adjuster adjusts the startTimestamp to the initial scrape timestamp
  184. point.SetStartTimestamp(tsNanos)
  185. }
  186. populateAttributes(pmetric.MetricTypeSummary, mg.ls, point.Attributes())
  187. }
  188. func (mg *metricGroup) toNumberDataPoint(dest pmetric.NumberDataPointSlice) {
  189. tsNanos := timestampFromMs(mg.ts)
  190. point := dest.AppendEmpty()
  191. // gauge/undefined types have no start time.
  192. if mg.mtype == pmetric.MetricTypeSum {
  193. if mg.created != 0 {
  194. point.SetStartTimestamp(timestampFromFloat64(mg.created))
  195. } else {
  196. // metrics_adjuster adjusts the startTimestamp to the initial scrape timestamp
  197. point.SetStartTimestamp(tsNanos)
  198. }
  199. }
  200. point.SetTimestamp(tsNanos)
  201. if value.IsStaleNaN(mg.value) {
  202. point.SetFlags(pmetric.DefaultDataPointFlags.WithNoRecordedValue(true))
  203. } else {
  204. point.SetDoubleValue(mg.value)
  205. }
  206. populateAttributes(pmetric.MetricTypeGauge, mg.ls, point.Attributes())
  207. mg.setExemplars(point.Exemplars())
  208. }
  209. func populateAttributes(mType pmetric.MetricType, ls labels.Labels, dest pcommon.Map) {
  210. dest.EnsureCapacity(ls.Len())
  211. names := getSortedNotUsefulLabels(mType)
  212. j := 0
  213. for i := range ls {
  214. for j < len(names) && names[j] < ls[i].Name {
  215. j++
  216. }
  217. if j < len(names) && ls[i].Name == names[j] {
  218. continue
  219. }
  220. if ls[i].Value == "" {
  221. // empty label values should be omitted
  222. continue
  223. }
  224. dest.PutStr(ls[i].Name, ls[i].Value)
  225. }
  226. }
  227. func (mf *metricFamily) loadMetricGroupOrCreate(groupKey uint64, ls labels.Labels, ts int64) *metricGroup {
  228. mg, ok := mf.groups[groupKey]
  229. if !ok {
  230. mg = &metricGroup{
  231. mtype: mf.mtype,
  232. ts: ts,
  233. ls: ls,
  234. exemplars: pmetric.NewExemplarSlice(),
  235. }
  236. mf.groups[groupKey] = mg
  237. // maintaining data insertion order is helpful to generate stable/reproducible metric output
  238. mf.groupOrders = append(mf.groupOrders, mg)
  239. }
  240. return mg
  241. }
  242. func (mf *metricFamily) addSeries(seriesRef uint64, metricName string, ls labels.Labels, t int64, v float64) error {
  243. mg := mf.loadMetricGroupOrCreate(seriesRef, ls, t)
  244. if mg.ts != t {
  245. return fmt.Errorf("inconsistent timestamps on metric points for metric %v", metricName)
  246. }
  247. switch mf.mtype {
  248. case pmetric.MetricTypeHistogram, pmetric.MetricTypeSummary:
  249. switch {
  250. case strings.HasSuffix(metricName, metricsSuffixSum):
  251. mg.sum = v
  252. mg.hasSum = true
  253. case strings.HasSuffix(metricName, metricsSuffixCount):
  254. // always use the timestamp from count, because is the only required field for histograms and summaries.
  255. mg.ts = t
  256. mg.count = v
  257. mg.hasCount = true
  258. case strings.HasSuffix(metricName, metricSuffixCreated):
  259. mg.created = v
  260. default:
  261. boundary, err := getBoundary(mf.mtype, ls)
  262. if err != nil {
  263. return err
  264. }
  265. mg.complexValue = append(mg.complexValue, &dataPoint{value: v, boundary: boundary})
  266. }
  267. case pmetric.MetricTypeSum:
  268. if strings.HasSuffix(metricName, metricSuffixCreated) {
  269. mg.created = v
  270. } else {
  271. mg.value = v
  272. }
  273. case pmetric.MetricTypeEmpty, pmetric.MetricTypeGauge, pmetric.MetricTypeExponentialHistogram:
  274. fallthrough
  275. default:
  276. mg.value = v
  277. }
  278. return nil
  279. }
  280. func (mf *metricFamily) appendMetric(metrics pmetric.MetricSlice, trimSuffixes bool) {
  281. metric := pmetric.NewMetric()
  282. // Trims type and unit suffixes from metric name
  283. name := mf.name
  284. if trimSuffixes {
  285. name = prometheus.TrimPromSuffixes(name, mf.mtype, mf.metadata.Unit)
  286. }
  287. metric.SetName(name)
  288. metric.SetDescription(mf.metadata.Help)
  289. metric.SetUnit(prometheus.UnitWordToUCUM(mf.metadata.Unit))
  290. var pointCount int
  291. switch mf.mtype {
  292. case pmetric.MetricTypeHistogram:
  293. histogram := metric.SetEmptyHistogram()
  294. histogram.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
  295. hdpL := histogram.DataPoints()
  296. for _, mg := range mf.groupOrders {
  297. mg.toDistributionPoint(hdpL)
  298. }
  299. pointCount = hdpL.Len()
  300. case pmetric.MetricTypeSummary:
  301. summary := metric.SetEmptySummary()
  302. sdpL := summary.DataPoints()
  303. for _, mg := range mf.groupOrders {
  304. mg.toSummaryPoint(sdpL)
  305. }
  306. pointCount = sdpL.Len()
  307. case pmetric.MetricTypeSum:
  308. sum := metric.SetEmptySum()
  309. sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
  310. sum.SetIsMonotonic(mf.isMonotonic)
  311. sdpL := sum.DataPoints()
  312. for _, mg := range mf.groupOrders {
  313. mg.toNumberDataPoint(sdpL)
  314. }
  315. pointCount = sdpL.Len()
  316. case pmetric.MetricTypeEmpty, pmetric.MetricTypeGauge, pmetric.MetricTypeExponentialHistogram:
  317. fallthrough
  318. default: // Everything else should be set to a Gauge.
  319. gauge := metric.SetEmptyGauge()
  320. gdpL := gauge.DataPoints()
  321. for _, mg := range mf.groupOrders {
  322. mg.toNumberDataPoint(gdpL)
  323. }
  324. pointCount = gdpL.Len()
  325. }
  326. if pointCount == 0 {
  327. return
  328. }
  329. metric.MoveTo(metrics.AppendEmpty())
  330. }
  331. func (mf *metricFamily) addExemplar(seriesRef uint64, e exemplar.Exemplar) {
  332. mg := mf.groups[seriesRef]
  333. if mg == nil {
  334. return
  335. }
  336. es := mg.exemplars
  337. convertExemplar(e, es.AppendEmpty())
  338. }
  339. func convertExemplar(pe exemplar.Exemplar, e pmetric.Exemplar) {
  340. e.SetTimestamp(timestampFromMs(pe.Ts))
  341. e.SetDoubleValue(pe.Value)
  342. e.FilteredAttributes().EnsureCapacity(len(pe.Labels))
  343. for _, lb := range pe.Labels {
  344. switch strings.ToLower(lb.Name) {
  345. case traceIDKey:
  346. var tid [16]byte
  347. err := decodeAndCopyToLowerBytes(tid[:], []byte(lb.Value))
  348. if err == nil {
  349. e.SetTraceID(tid)
  350. } else {
  351. e.FilteredAttributes().PutStr(lb.Name, lb.Value)
  352. }
  353. case spanIDKey:
  354. var sid [8]byte
  355. err := decodeAndCopyToLowerBytes(sid[:], []byte(lb.Value))
  356. if err == nil {
  357. e.SetSpanID(sid)
  358. } else {
  359. e.FilteredAttributes().PutStr(lb.Name, lb.Value)
  360. }
  361. default:
  362. e.FilteredAttributes().PutStr(lb.Name, lb.Value)
  363. }
  364. }
  365. }
  366. /*
  367. decodeAndCopyToLowerBytes copies src to dst on lower bytes instead of higher
  368. 1. If len(src) > len(dst) -> copy first len(dst) bytes as it is. Example -> src = []byte{0xab,0xcd,0xef,0xgh,0xij}, dst = [2]byte, result dst = [2]byte{0xab, 0xcd}
  369. 2. If len(src) = len(dst) -> copy src to dst as it is
  370. 3. If len(src) < len(dst) -> prepend required 0s and then add src to dst. Example -> src = []byte{0xab, 0xcd}, dst = [8]byte, result dst = [8]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xab, 0xcd}
  371. */
  372. func decodeAndCopyToLowerBytes(dst []byte, src []byte) error {
  373. var err error
  374. decodedLen := hex.DecodedLen(len(src))
  375. if decodedLen >= len(dst) {
  376. _, err = hex.Decode(dst, src[:hex.EncodedLen(len(dst))])
  377. } else {
  378. _, err = hex.Decode(dst[len(dst)-decodedLen:], src)
  379. }
  380. return err
  381. }