datapoint.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package awsemfexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter"
  4. import (
  5. "fmt"
  6. "math"
  7. "strconv"
  8. "time"
  9. "go.opentelemetry.io/collector/pdata/pcommon"
  10. "go.opentelemetry.io/collector/pdata/pmetric"
  11. "go.uber.org/zap"
  12. "golang.org/x/exp/maps"
  13. aws "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics"
  14. )
  15. const (
  16. summaryCountSuffix = "_count"
  17. summarySumSuffix = "_sum"
  18. )
  19. type emfCalculators struct {
  20. delta aws.MetricCalculator
  21. summary aws.MetricCalculator
  22. }
  23. func calculateSummaryDelta(prev *aws.MetricValue, val any, _ time.Time) (any, bool) {
  24. metricEntry := val.(summaryMetricEntry)
  25. summaryDelta := metricEntry.sum
  26. countDelta := metricEntry.count
  27. if prev != nil {
  28. prevSummaryEntry := prev.RawValue.(summaryMetricEntry)
  29. summaryDelta = metricEntry.sum - prevSummaryEntry.sum
  30. countDelta = metricEntry.count - prevSummaryEntry.count
  31. } else {
  32. return summaryMetricEntry{summaryDelta, countDelta}, false
  33. }
  34. return summaryMetricEntry{summaryDelta, countDelta}, true
  35. }
  36. // dataPoint represents a processed metric data point
  37. type dataPoint struct {
  38. name string
  39. value any
  40. labels map[string]string
  41. timestampMs int64
  42. }
  43. // dataPoints is a wrapper interface for:
  44. // - pmetric.NumberDataPointSlice
  45. // - pmetric.HistogramDataPointSlice
  46. // - pmetric.SummaryDataPointSlice
  47. type dataPoints interface {
  48. Len() int
  49. // CalculateDeltaDatapoints calculates the delta datapoint from the DataPointSlice at i-th index
  50. // for some type (Counter, Summary)
  51. // dataPoint: the adjusted data point
  52. // retained: indicates whether the data point is valid for further process
  53. // NOTE: It is an expensive call as it calculates the metric value.
  54. CalculateDeltaDatapoints(i int, instrumentationScopeName string, detailedMetrics bool, calculators *emfCalculators) (dataPoint []dataPoint, retained bool)
  55. // IsStaleOrNaN returns true if metric value has NoRecordedValue flag set or if any metric value contains a NaN.
  56. // When return value is true, IsStaleOrNaN also returns the attributes attached to the metric which can be used for
  57. // logging purposes.
  58. IsStaleOrNaN(i int) (bool, pcommon.Map)
  59. }
  60. // deltaMetricMetadata contains the metadata required to perform rate/delta calculation
  61. type deltaMetricMetadata struct {
  62. adjustToDelta bool
  63. retainInitialValueForDelta bool
  64. metricName string
  65. namespace string
  66. logGroup string
  67. logStream string
  68. }
  69. // numberDataPointSlice is a wrapper for pmetric.NumberDataPointSlice
  70. type numberDataPointSlice struct {
  71. deltaMetricMetadata
  72. pmetric.NumberDataPointSlice
  73. }
  74. // histogramDataPointSlice is a wrapper for pmetric.HistogramDataPointSlice
  75. type histogramDataPointSlice struct {
  76. // Todo:(khanhntd) Calculate delta value for count and sum value with histogram
  77. // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/18245
  78. deltaMetricMetadata
  79. pmetric.HistogramDataPointSlice
  80. }
  81. type exponentialHistogramDataPointSlice struct {
  82. // TODO: Calculate delta value for count and sum value with exponential histogram
  83. // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/18245
  84. deltaMetricMetadata
  85. pmetric.ExponentialHistogramDataPointSlice
  86. }
  87. // summaryDataPointSlice is a wrapper for pmetric.SummaryDataPointSlice
  88. type summaryDataPointSlice struct {
  89. deltaMetricMetadata
  90. pmetric.SummaryDataPointSlice
  91. }
  92. type summaryMetricEntry struct {
  93. sum float64
  94. count uint64
  95. }
  96. // CalculateDeltaDatapoints retrieves the NumberDataPoint at the given index and performs rate/delta calculation if necessary.
  97. func (dps numberDataPointSlice) CalculateDeltaDatapoints(i int, instrumentationScopeName string, _ bool, calculators *emfCalculators) ([]dataPoint, bool) {
  98. metric := dps.NumberDataPointSlice.At(i)
  99. labels := createLabels(metric.Attributes(), instrumentationScopeName)
  100. timestampMs := unixNanoToMilliseconds(metric.Timestamp())
  101. var metricVal float64
  102. switch metric.ValueType() {
  103. case pmetric.NumberDataPointValueTypeDouble:
  104. metricVal = metric.DoubleValue()
  105. case pmetric.NumberDataPointValueTypeInt:
  106. metricVal = float64(metric.IntValue())
  107. }
  108. retained := true
  109. if dps.adjustToDelta {
  110. var deltaVal any
  111. mKey := aws.NewKey(dps.deltaMetricMetadata, labels)
  112. deltaVal, retained = calculators.delta.Calculate(mKey, metricVal, metric.Timestamp().AsTime())
  113. // If a delta to the previous data point could not be computed use the current metric value instead
  114. if !retained && dps.retainInitialValueForDelta {
  115. retained = true
  116. deltaVal = metricVal
  117. }
  118. if !retained {
  119. return nil, retained
  120. }
  121. // It should not happen in practice that the previous metric value is smaller than the current one.
  122. // If it happens, we assume that the metric is reset for some reason.
  123. if deltaVal.(float64) >= 0 {
  124. metricVal = deltaVal.(float64)
  125. }
  126. }
  127. return []dataPoint{{name: dps.metricName, value: metricVal, labels: labels, timestampMs: timestampMs}}, retained
  128. }
  129. func (dps numberDataPointSlice) IsStaleOrNaN(i int) (bool, pcommon.Map) {
  130. metric := dps.NumberDataPointSlice.At(i)
  131. if metric.Flags().NoRecordedValue() {
  132. return true, metric.Attributes()
  133. }
  134. if metric.ValueType() == pmetric.NumberDataPointValueTypeDouble {
  135. return math.IsNaN(metric.DoubleValue()), metric.Attributes()
  136. }
  137. return false, pcommon.Map{}
  138. }
  139. // CalculateDeltaDatapoints retrieves the HistogramDataPoint at the given index.
  140. func (dps histogramDataPointSlice) CalculateDeltaDatapoints(i int, instrumentationScopeName string, _ bool, _ *emfCalculators) ([]dataPoint, bool) {
  141. metric := dps.HistogramDataPointSlice.At(i)
  142. labels := createLabels(metric.Attributes(), instrumentationScopeName)
  143. timestamp := unixNanoToMilliseconds(metric.Timestamp())
  144. return []dataPoint{{
  145. name: dps.metricName,
  146. value: &cWMetricStats{
  147. Count: metric.Count(),
  148. Sum: metric.Sum(),
  149. Max: metric.Max(),
  150. Min: metric.Min(),
  151. },
  152. labels: labels,
  153. timestampMs: timestamp,
  154. }}, true
  155. }
  156. func (dps histogramDataPointSlice) IsStaleOrNaN(i int) (bool, pcommon.Map) {
  157. metric := dps.HistogramDataPointSlice.At(i)
  158. if metric.Flags().NoRecordedValue() {
  159. return true, metric.Attributes()
  160. }
  161. if math.IsNaN(metric.Max()) || math.IsNaN(metric.Sum()) || math.IsNaN(metric.Min()) {
  162. return true, metric.Attributes()
  163. }
  164. return false, pcommon.Map{}
  165. }
  166. // CalculateDeltaDatapoints retrieves the ExponentialHistogramDataPoint at the given index.
  167. func (dps exponentialHistogramDataPointSlice) CalculateDeltaDatapoints(idx int, instrumentationScopeName string, _ bool, _ *emfCalculators) ([]dataPoint, bool) {
  168. metric := dps.ExponentialHistogramDataPointSlice.At(idx)
  169. scale := metric.Scale()
  170. base := math.Pow(2, math.Pow(2, float64(-scale)))
  171. arrayValues := []float64{}
  172. arrayCounts := []float64{}
  173. var bucketBegin float64
  174. var bucketEnd float64
  175. // Set mid-point of positive buckets in values/counts array.
  176. positiveBuckets := metric.Positive()
  177. positiveOffset := positiveBuckets.Offset()
  178. positiveBucketCounts := positiveBuckets.BucketCounts()
  179. bucketBegin = 0
  180. bucketEnd = 0
  181. for i := 0; i < positiveBucketCounts.Len(); i++ {
  182. index := i + int(positiveOffset)
  183. if bucketBegin == 0 {
  184. bucketBegin = math.Pow(base, float64(index))
  185. } else {
  186. bucketBegin = bucketEnd
  187. }
  188. bucketEnd = math.Pow(base, float64(index+1))
  189. metricVal := (bucketBegin + bucketEnd) / 2
  190. count := positiveBucketCounts.At(i)
  191. if count > 0 {
  192. arrayValues = append(arrayValues, metricVal)
  193. arrayCounts = append(arrayCounts, float64(count))
  194. }
  195. }
  196. // Set count of zero bucket in values/counts array.
  197. if metric.ZeroCount() > 0 {
  198. arrayValues = append(arrayValues, 0)
  199. arrayCounts = append(arrayCounts, float64(metric.ZeroCount()))
  200. }
  201. // Set mid-point of negative buckets in values/counts array.
  202. // According to metrics spec, the value in histogram is expected to be non-negative.
  203. // https://opentelemetry.io/docs/specs/otel/metrics/api/#histogram
  204. // However, the negative support is defined in metrics data model.
  205. // https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram
  206. // The negative is also supported but only verified with unit test.
  207. negativeBuckets := metric.Negative()
  208. negativeOffset := negativeBuckets.Offset()
  209. negativeBucketCounts := negativeBuckets.BucketCounts()
  210. bucketBegin = 0
  211. bucketEnd = 0
  212. for i := 0; i < negativeBucketCounts.Len(); i++ {
  213. index := i + int(negativeOffset)
  214. if bucketEnd == 0 {
  215. bucketEnd = -math.Pow(base, float64(index))
  216. } else {
  217. bucketEnd = bucketBegin
  218. }
  219. bucketBegin = -math.Pow(base, float64(index+1))
  220. metricVal := (bucketBegin + bucketEnd) / 2
  221. count := negativeBucketCounts.At(i)
  222. if count > 0 {
  223. arrayValues = append(arrayValues, metricVal)
  224. arrayCounts = append(arrayCounts, float64(count))
  225. }
  226. }
  227. return []dataPoint{{
  228. name: dps.metricName,
  229. value: &cWMetricHistogram{
  230. Values: arrayValues,
  231. Counts: arrayCounts,
  232. Count: metric.Count(),
  233. Sum: metric.Sum(),
  234. Max: metric.Max(),
  235. Min: metric.Min(),
  236. },
  237. labels: createLabels(metric.Attributes(), instrumentationScopeName),
  238. timestampMs: unixNanoToMilliseconds(metric.Timestamp()),
  239. }}, true
  240. }
  241. func (dps exponentialHistogramDataPointSlice) IsStaleOrNaN(i int) (bool, pcommon.Map) {
  242. metric := dps.ExponentialHistogramDataPointSlice.At(i)
  243. if metric.Flags().NoRecordedValue() {
  244. return true, metric.Attributes()
  245. }
  246. if math.IsNaN(metric.Max()) ||
  247. math.IsNaN(metric.Min()) ||
  248. math.IsNaN(metric.Sum()) {
  249. return true, metric.Attributes()
  250. }
  251. return false, pcommon.Map{}
  252. }
  253. // CalculateDeltaDatapoints retrieves the SummaryDataPoint at the given index and perform calculation with sum and count while retain the quantile value.
  254. func (dps summaryDataPointSlice) CalculateDeltaDatapoints(i int, instrumentationScopeName string, detailedMetrics bool, calculators *emfCalculators) ([]dataPoint, bool) {
  255. metric := dps.SummaryDataPointSlice.At(i)
  256. labels := createLabels(metric.Attributes(), instrumentationScopeName)
  257. timestampMs := unixNanoToMilliseconds(metric.Timestamp())
  258. sum := metric.Sum()
  259. count := metric.Count()
  260. retained := true
  261. datapoints := []dataPoint{}
  262. if dps.adjustToDelta {
  263. var delta any
  264. mKey := aws.NewKey(dps.deltaMetricMetadata, labels)
  265. delta, retained = calculators.summary.Calculate(mKey, summaryMetricEntry{sum, count}, metric.Timestamp().AsTime())
  266. // If a delta to the previous data point could not be computed use the current metric value instead
  267. if !retained && dps.retainInitialValueForDelta {
  268. retained = true
  269. delta = summaryMetricEntry{sum, count}
  270. }
  271. if !retained {
  272. return datapoints, retained
  273. }
  274. summaryMetricDelta := delta.(summaryMetricEntry)
  275. sum = summaryMetricDelta.sum
  276. count = summaryMetricDelta.count
  277. }
  278. if detailedMetrics {
  279. // Instead of sending metrics as a Statistical Set (contains min,max, count, sum), the emfexporter will enrich the
  280. // values by sending each quantile values as a datapoint (from quantile 0 ... 1)
  281. values := metric.QuantileValues()
  282. datapoints = append(datapoints, dataPoint{name: fmt.Sprint(dps.metricName, summarySumSuffix), value: sum, labels: labels, timestampMs: timestampMs})
  283. datapoints = append(datapoints, dataPoint{name: fmt.Sprint(dps.metricName, summaryCountSuffix), value: count, labels: labels, timestampMs: timestampMs})
  284. for i := 0; i < values.Len(); i++ {
  285. cLabels := maps.Clone(labels)
  286. quantile := values.At(i)
  287. cLabels["quantile"] = strconv.FormatFloat(quantile.Quantile(), 'g', -1, 64)
  288. datapoints = append(datapoints, dataPoint{name: dps.metricName, value: quantile.Value(), labels: cLabels, timestampMs: timestampMs})
  289. }
  290. } else {
  291. metricVal := &cWMetricStats{Count: count, Sum: sum}
  292. if quantileValues := metric.QuantileValues(); quantileValues.Len() > 0 {
  293. metricVal.Min = quantileValues.At(0).Value()
  294. metricVal.Max = quantileValues.At(quantileValues.Len() - 1).Value()
  295. }
  296. datapoints = append(datapoints, dataPoint{name: dps.metricName, value: metricVal, labels: labels, timestampMs: timestampMs})
  297. }
  298. return datapoints, retained
  299. }
  300. func (dps summaryDataPointSlice) IsStaleOrNaN(i int) (bool, pcommon.Map) {
  301. metric := dps.SummaryDataPointSlice.At(i)
  302. if metric.Flags().NoRecordedValue() {
  303. return true, metric.Attributes()
  304. }
  305. if math.IsNaN(metric.Sum()) {
  306. return true, metric.Attributes()
  307. }
  308. values := metric.QuantileValues()
  309. for i := 0; i < values.Len(); i++ {
  310. quantile := values.At(i)
  311. if math.IsNaN(quantile.Value()) || math.IsNaN(quantile.Quantile()) {
  312. return true, metric.Attributes()
  313. }
  314. }
  315. return false, metric.Attributes()
  316. }
  317. // createLabels converts OTel AttributesMap attributes to a map
  318. // and optionally adds in the OTel instrumentation library name
  319. func createLabels(attributes pcommon.Map, instrLibName string) map[string]string {
  320. labels := make(map[string]string, attributes.Len()+1)
  321. attributes.Range(func(k string, v pcommon.Value) bool {
  322. labels[k] = v.AsString()
  323. return true
  324. })
  325. // Add OTel instrumentation lib name as an additional label if it is defined
  326. if instrLibName != "" {
  327. labels[oTellibDimensionKey] = instrLibName
  328. }
  329. return labels
  330. }
  331. // getDataPoints retrieves data points from OT Metric.
  332. func getDataPoints(pmd pmetric.Metric, metadata cWMetricMetadata, logger *zap.Logger) dataPoints {
  333. metricMetadata := deltaMetricMetadata{
  334. adjustToDelta: false,
  335. retainInitialValueForDelta: metadata.retainInitialValueForDelta,
  336. metricName: pmd.Name(),
  337. namespace: metadata.namespace,
  338. logGroup: metadata.logGroup,
  339. logStream: metadata.logStream,
  340. }
  341. var dps dataPoints
  342. //exhaustive:enforce
  343. switch pmd.Type() {
  344. case pmetric.MetricTypeGauge:
  345. metric := pmd.Gauge()
  346. dps = numberDataPointSlice{
  347. metricMetadata,
  348. metric.DataPoints(),
  349. }
  350. case pmetric.MetricTypeSum:
  351. metric := pmd.Sum()
  352. metricMetadata.adjustToDelta = metric.AggregationTemporality() == pmetric.AggregationTemporalityCumulative
  353. dps = numberDataPointSlice{
  354. metricMetadata,
  355. metric.DataPoints(),
  356. }
  357. case pmetric.MetricTypeHistogram:
  358. metric := pmd.Histogram()
  359. dps = histogramDataPointSlice{
  360. metricMetadata,
  361. metric.DataPoints(),
  362. }
  363. case pmetric.MetricTypeExponentialHistogram:
  364. metric := pmd.ExponentialHistogram()
  365. dps = exponentialHistogramDataPointSlice{
  366. metricMetadata,
  367. metric.DataPoints(),
  368. }
  369. case pmetric.MetricTypeSummary:
  370. metric := pmd.Summary()
  371. // For summaries coming from the prometheus receiver, the sum and count are cumulative, whereas for summaries
  372. // coming from other sources, e.g. SDK, the sum and count are delta by being accumulated and reset periodically.
  373. // In order to ensure metrics are sent as deltas, we check the receiver attribute (which can be injected by
  374. // attribute processor) from resource metrics. If it exists, and equals to prometheus, the sum and count will be
  375. // converted.
  376. // For more information: https://github.com/open-telemetry/opentelemetry-collector/blob/main/receiver/prometheusreceiver/DESIGN.md#summary
  377. metricMetadata.adjustToDelta = metadata.receiver == prometheusReceiver
  378. dps = summaryDataPointSlice{
  379. metricMetadata,
  380. metric.DataPoints(),
  381. }
  382. default:
  383. logger.Warn("Unhandled metric data type.",
  384. zap.String("DataType", pmd.Type().String()),
  385. zap.String("Name", pmd.Name()),
  386. zap.String("Unit", pmd.Unit()),
  387. )
  388. }
  389. return dps
  390. }