123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package awsemfexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter"
- import (
- "fmt"
- "math"
- "strconv"
- "time"
- "go.opentelemetry.io/collector/pdata/pcommon"
- "go.opentelemetry.io/collector/pdata/pmetric"
- "go.uber.org/zap"
- "golang.org/x/exp/maps"
- aws "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics"
- )
const (
	// Suffixes appended to the base metric name when a summary's count and sum
	// are emitted as separate data points (detailed-metrics mode).
	summaryCountSuffix = "_count"
	summarySumSuffix   = "_sum"
)
// emfCalculators bundles the stateful calculators used to turn cumulative
// metric streams into deltas: one for plain number metrics and one for
// summary (sum/count) metrics.
type emfCalculators struct {
	delta   aws.MetricCalculator // delta calculator for number data points
	summary aws.MetricCalculator // delta calculator for summary sum/count pairs
}
- func calculateSummaryDelta(prev *aws.MetricValue, val any, _ time.Time) (any, bool) {
- metricEntry := val.(summaryMetricEntry)
- summaryDelta := metricEntry.sum
- countDelta := metricEntry.count
- if prev != nil {
- prevSummaryEntry := prev.RawValue.(summaryMetricEntry)
- summaryDelta = metricEntry.sum - prevSummaryEntry.sum
- countDelta = metricEntry.count - prevSummaryEntry.count
- } else {
- return summaryMetricEntry{summaryDelta, countDelta}, false
- }
- return summaryMetricEntry{summaryDelta, countDelta}, true
- }
// dataPoint represents a processed metric data point
type dataPoint struct {
	// name is the metric name to emit.
	name string
	// value holds the metric value: a scalar (float64/uint64) or a structured
	// value such as *cWMetricStats or *cWMetricHistogram.
	value any
	// labels are the flattened data point attributes, optionally including the
	// instrumentation scope name (see createLabels).
	labels map[string]string
	// timestampMs is the data point timestamp in milliseconds since the Unix epoch.
	timestampMs int64
}
// dataPoints is a wrapper interface for:
//   - pmetric.NumberDataPointSlice
//   - pmetric.HistogramDataPointSlice
//   - pmetric.SummaryDataPointSlice
type dataPoints interface {
	// Len returns the number of data points in the underlying slice.
	Len() int
	// CalculateDeltaDatapoints calculates the delta datapoint from the DataPointSlice at i-th index
	// for some type (Counter, Summary)
	// dataPoint: the adjusted data point
	// retained: indicates whether the data point is valid for further process
	// NOTE: It is an expensive call as it calculates the metric value.
	CalculateDeltaDatapoints(i int, instrumentationScopeName string, detailedMetrics bool, calculators *emfCalculators) (dataPoint []dataPoint, retained bool)
	// IsStaleOrNaN returns true if metric value has NoRecordedValue flag set or if any metric value contains a NaN.
	// When return value is true, IsStaleOrNaN also returns the attributes attached to the metric which can be used for
	// logging purposes.
	IsStaleOrNaN(i int) (bool, pcommon.Map)
}
// deltaMetricMetadata contains the metadata required to perform rate/delta calculation
type deltaMetricMetadata struct {
	// adjustToDelta indicates whether cumulative values must be converted to deltas.
	adjustToDelta bool
	// retainInitialValueForDelta keeps the first observed value as the data point
	// instead of dropping it when no previous value exists to diff against.
	retainInitialValueForDelta bool
	metricName                 string
	namespace                  string
	logGroup                   string
	logStream                  string
}
// numberDataPointSlice is a wrapper for pmetric.NumberDataPointSlice
type numberDataPointSlice struct {
	deltaMetricMetadata
	pmetric.NumberDataPointSlice
}
// histogramDataPointSlice is a wrapper for pmetric.HistogramDataPointSlice
type histogramDataPointSlice struct {
	// Todo:(khanhntd) Calculate delta value for count and sum value with histogram
	// https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/18245
	deltaMetricMetadata
	pmetric.HistogramDataPointSlice
}
// exponentialHistogramDataPointSlice is a wrapper for pmetric.ExponentialHistogramDataPointSlice.
type exponentialHistogramDataPointSlice struct {
	// TODO: Calculate delta value for count and sum value with exponential histogram
	// https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/18245
	deltaMetricMetadata
	pmetric.ExponentialHistogramDataPointSlice
}
// summaryDataPointSlice is a wrapper for pmetric.SummaryDataPointSlice
type summaryDataPointSlice struct {
	deltaMetricMetadata
	pmetric.SummaryDataPointSlice
}
// summaryMetricEntry is the raw sum/count pair of a summary data point; it is
// the value type stored in (and produced by) the summary delta calculator.
type summaryMetricEntry struct {
	sum   float64
	count uint64
}
- // CalculateDeltaDatapoints retrieves the NumberDataPoint at the given index and performs rate/delta calculation if necessary.
- func (dps numberDataPointSlice) CalculateDeltaDatapoints(i int, instrumentationScopeName string, _ bool, calculators *emfCalculators) ([]dataPoint, bool) {
- metric := dps.NumberDataPointSlice.At(i)
- labels := createLabels(metric.Attributes(), instrumentationScopeName)
- timestampMs := unixNanoToMilliseconds(metric.Timestamp())
- var metricVal float64
- switch metric.ValueType() {
- case pmetric.NumberDataPointValueTypeDouble:
- metricVal = metric.DoubleValue()
- case pmetric.NumberDataPointValueTypeInt:
- metricVal = float64(metric.IntValue())
- }
- retained := true
- if dps.adjustToDelta {
- var deltaVal any
- mKey := aws.NewKey(dps.deltaMetricMetadata, labels)
- deltaVal, retained = calculators.delta.Calculate(mKey, metricVal, metric.Timestamp().AsTime())
- // If a delta to the previous data point could not be computed use the current metric value instead
- if !retained && dps.retainInitialValueForDelta {
- retained = true
- deltaVal = metricVal
- }
- if !retained {
- return nil, retained
- }
- // It should not happen in practice that the previous metric value is smaller than the current one.
- // If it happens, we assume that the metric is reset for some reason.
- if deltaVal.(float64) >= 0 {
- metricVal = deltaVal.(float64)
- }
- }
- return []dataPoint{{name: dps.metricName, value: metricVal, labels: labels, timestampMs: timestampMs}}, retained
- }
- func (dps numberDataPointSlice) IsStaleOrNaN(i int) (bool, pcommon.Map) {
- metric := dps.NumberDataPointSlice.At(i)
- if metric.Flags().NoRecordedValue() {
- return true, metric.Attributes()
- }
- if metric.ValueType() == pmetric.NumberDataPointValueTypeDouble {
- return math.IsNaN(metric.DoubleValue()), metric.Attributes()
- }
- return false, pcommon.Map{}
- }
- // CalculateDeltaDatapoints retrieves the HistogramDataPoint at the given index.
- func (dps histogramDataPointSlice) CalculateDeltaDatapoints(i int, instrumentationScopeName string, _ bool, _ *emfCalculators) ([]dataPoint, bool) {
- metric := dps.HistogramDataPointSlice.At(i)
- labels := createLabels(metric.Attributes(), instrumentationScopeName)
- timestamp := unixNanoToMilliseconds(metric.Timestamp())
- return []dataPoint{{
- name: dps.metricName,
- value: &cWMetricStats{
- Count: metric.Count(),
- Sum: metric.Sum(),
- Max: metric.Max(),
- Min: metric.Min(),
- },
- labels: labels,
- timestampMs: timestamp,
- }}, true
- }
- func (dps histogramDataPointSlice) IsStaleOrNaN(i int) (bool, pcommon.Map) {
- metric := dps.HistogramDataPointSlice.At(i)
- if metric.Flags().NoRecordedValue() {
- return true, metric.Attributes()
- }
- if math.IsNaN(metric.Max()) || math.IsNaN(metric.Sum()) || math.IsNaN(metric.Min()) {
- return true, metric.Attributes()
- }
- return false, pcommon.Map{}
- }
// CalculateDeltaDatapoints retrieves the ExponentialHistogramDataPoint at the given index.
//
// The exponential buckets are flattened into parallel Values/Counts arrays,
// where each value is the mid-point of its bucket. Per the OpenTelemetry
// exponential histogram data model, bucket boundaries are powers of
// base = 2^(2^-scale): bucket `index` spans (base^index, base^(index+1)].
// The point is always retained (no delta adjustment is performed here).
func (dps exponentialHistogramDataPointSlice) CalculateDeltaDatapoints(idx int, instrumentationScopeName string, _ bool, _ *emfCalculators) ([]dataPoint, bool) {
	metric := dps.ExponentialHistogramDataPointSlice.At(idx)
	scale := metric.Scale()
	// base = 2^(2^-scale); larger scales give finer-grained buckets.
	base := math.Pow(2, math.Pow(2, float64(-scale)))
	arrayValues := []float64{}
	arrayCounts := []float64{}
	var bucketBegin float64
	var bucketEnd float64
	// Set mid-point of positive buckets in values/counts array.
	positiveBuckets := metric.Positive()
	positiveOffset := positiveBuckets.Offset()
	positiveBucketCounts := positiveBuckets.BucketCounts()
	bucketBegin = 0
	bucketEnd = 0
	for i := 0; i < positiveBucketCounts.Len(); i++ {
		index := i + int(positiveOffset)
		// On the first iteration compute the lower bound from the offset;
		// afterwards each bucket begins where the previous one ended.
		if bucketBegin == 0 {
			bucketBegin = math.Pow(base, float64(index))
		} else {
			bucketBegin = bucketEnd
		}
		bucketEnd = math.Pow(base, float64(index+1))
		metricVal := (bucketBegin + bucketEnd) / 2
		count := positiveBucketCounts.At(i)
		// Skip empty buckets to keep the arrays compact.
		if count > 0 {
			arrayValues = append(arrayValues, metricVal)
			arrayCounts = append(arrayCounts, float64(count))
		}
	}
	// Set count of zero bucket in values/counts array.
	if metric.ZeroCount() > 0 {
		arrayValues = append(arrayValues, 0)
		arrayCounts = append(arrayCounts, float64(metric.ZeroCount()))
	}
	// Set mid-point of negative buckets in values/counts array.
	// According to metrics spec, the value in histogram is expected to be non-negative.
	// https://opentelemetry.io/docs/specs/otel/metrics/api/#histogram
	// However, the negative support is defined in metrics data model.
	// https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram
	// The negative is also supported but only verified with unit test.
	negativeBuckets := metric.Negative()
	negativeOffset := negativeBuckets.Offset()
	negativeBucketCounts := negativeBuckets.BucketCounts()
	bucketBegin = 0
	bucketEnd = 0
	for i := 0; i < negativeBucketCounts.Len(); i++ {
		index := i + int(negativeOffset)
		// Mirror image of the positive loop: bucketEnd is the bound nearer to
		// zero (-base^index) and bucketBegin the farther bound (-base^(index+1)).
		if bucketEnd == 0 {
			bucketEnd = -math.Pow(base, float64(index))
		} else {
			bucketEnd = bucketBegin
		}
		bucketBegin = -math.Pow(base, float64(index+1))
		metricVal := (bucketBegin + bucketEnd) / 2
		count := negativeBucketCounts.At(i)
		if count > 0 {
			arrayValues = append(arrayValues, metricVal)
			arrayCounts = append(arrayCounts, float64(count))
		}
	}
	return []dataPoint{{
		name: dps.metricName,
		value: &cWMetricHistogram{
			Values: arrayValues,
			Counts: arrayCounts,
			Count:  metric.Count(),
			Sum:    metric.Sum(),
			Max:    metric.Max(),
			Min:    metric.Min(),
		},
		labels:      createLabels(metric.Attributes(), instrumentationScopeName),
		timestampMs: unixNanoToMilliseconds(metric.Timestamp()),
	}}, true
}
- func (dps exponentialHistogramDataPointSlice) IsStaleOrNaN(i int) (bool, pcommon.Map) {
- metric := dps.ExponentialHistogramDataPointSlice.At(i)
- if metric.Flags().NoRecordedValue() {
- return true, metric.Attributes()
- }
- if math.IsNaN(metric.Max()) ||
- math.IsNaN(metric.Min()) ||
- math.IsNaN(metric.Sum()) {
- return true, metric.Attributes()
- }
- return false, pcommon.Map{}
- }
- // CalculateDeltaDatapoints retrieves the SummaryDataPoint at the given index and perform calculation with sum and count while retain the quantile value.
- func (dps summaryDataPointSlice) CalculateDeltaDatapoints(i int, instrumentationScopeName string, detailedMetrics bool, calculators *emfCalculators) ([]dataPoint, bool) {
- metric := dps.SummaryDataPointSlice.At(i)
- labels := createLabels(metric.Attributes(), instrumentationScopeName)
- timestampMs := unixNanoToMilliseconds(metric.Timestamp())
- sum := metric.Sum()
- count := metric.Count()
- retained := true
- datapoints := []dataPoint{}
- if dps.adjustToDelta {
- var delta any
- mKey := aws.NewKey(dps.deltaMetricMetadata, labels)
- delta, retained = calculators.summary.Calculate(mKey, summaryMetricEntry{sum, count}, metric.Timestamp().AsTime())
- // If a delta to the previous data point could not be computed use the current metric value instead
- if !retained && dps.retainInitialValueForDelta {
- retained = true
- delta = summaryMetricEntry{sum, count}
- }
- if !retained {
- return datapoints, retained
- }
- summaryMetricDelta := delta.(summaryMetricEntry)
- sum = summaryMetricDelta.sum
- count = summaryMetricDelta.count
- }
- if detailedMetrics {
- // Instead of sending metrics as a Statistical Set (contains min,max, count, sum), the emfexporter will enrich the
- // values by sending each quantile values as a datapoint (from quantile 0 ... 1)
- values := metric.QuantileValues()
- datapoints = append(datapoints, dataPoint{name: fmt.Sprint(dps.metricName, summarySumSuffix), value: sum, labels: labels, timestampMs: timestampMs})
- datapoints = append(datapoints, dataPoint{name: fmt.Sprint(dps.metricName, summaryCountSuffix), value: count, labels: labels, timestampMs: timestampMs})
- for i := 0; i < values.Len(); i++ {
- cLabels := maps.Clone(labels)
- quantile := values.At(i)
- cLabels["quantile"] = strconv.FormatFloat(quantile.Quantile(), 'g', -1, 64)
- datapoints = append(datapoints, dataPoint{name: dps.metricName, value: quantile.Value(), labels: cLabels, timestampMs: timestampMs})
- }
- } else {
- metricVal := &cWMetricStats{Count: count, Sum: sum}
- if quantileValues := metric.QuantileValues(); quantileValues.Len() > 0 {
- metricVal.Min = quantileValues.At(0).Value()
- metricVal.Max = quantileValues.At(quantileValues.Len() - 1).Value()
- }
- datapoints = append(datapoints, dataPoint{name: dps.metricName, value: metricVal, labels: labels, timestampMs: timestampMs})
- }
- return datapoints, retained
- }
- func (dps summaryDataPointSlice) IsStaleOrNaN(i int) (bool, pcommon.Map) {
- metric := dps.SummaryDataPointSlice.At(i)
- if metric.Flags().NoRecordedValue() {
- return true, metric.Attributes()
- }
- if math.IsNaN(metric.Sum()) {
- return true, metric.Attributes()
- }
- values := metric.QuantileValues()
- for i := 0; i < values.Len(); i++ {
- quantile := values.At(i)
- if math.IsNaN(quantile.Value()) || math.IsNaN(quantile.Quantile()) {
- return true, metric.Attributes()
- }
- }
- return false, metric.Attributes()
- }
- // createLabels converts OTel AttributesMap attributes to a map
- // and optionally adds in the OTel instrumentation library name
- func createLabels(attributes pcommon.Map, instrLibName string) map[string]string {
- labels := make(map[string]string, attributes.Len()+1)
- attributes.Range(func(k string, v pcommon.Value) bool {
- labels[k] = v.AsString()
- return true
- })
- // Add OTel instrumentation lib name as an additional label if it is defined
- if instrLibName != "" {
- labels[oTellibDimensionKey] = instrLibName
- }
- return labels
- }
// getDataPoints retrieves data points from OT Metric.
// It wraps the metric's data point slice in the matching dataPoints
// implementation, configuring delta adjustment per metric type. Returns nil
// (after logging a warning) for unhandled metric types.
func getDataPoints(pmd pmetric.Metric, metadata cWMetricMetadata, logger *zap.Logger) dataPoints {
	metricMetadata := deltaMetricMetadata{
		adjustToDelta:              false,
		retainInitialValueForDelta: metadata.retainInitialValueForDelta,
		metricName:                 pmd.Name(),
		namespace:                  metadata.namespace,
		logGroup:                   metadata.logGroup,
		logStream:                  metadata.logStream,
	}
	var dps dataPoints
	//exhaustive:enforce
	switch pmd.Type() {
	case pmetric.MetricTypeGauge:
		metric := pmd.Gauge()
		dps = numberDataPointSlice{
			metricMetadata,
			metric.DataPoints(),
		}
	case pmetric.MetricTypeSum:
		metric := pmd.Sum()
		// Cumulative sums are converted to deltas; delta sums pass through unchanged.
		metricMetadata.adjustToDelta = metric.AggregationTemporality() == pmetric.AggregationTemporalityCumulative
		dps = numberDataPointSlice{
			metricMetadata,
			metric.DataPoints(),
		}
	case pmetric.MetricTypeHistogram:
		metric := pmd.Histogram()
		dps = histogramDataPointSlice{
			metricMetadata,
			metric.DataPoints(),
		}
	case pmetric.MetricTypeExponentialHistogram:
		metric := pmd.ExponentialHistogram()
		dps = exponentialHistogramDataPointSlice{
			metricMetadata,
			metric.DataPoints(),
		}
	case pmetric.MetricTypeSummary:
		metric := pmd.Summary()
		// For summaries coming from the prometheus receiver, the sum and count are cumulative, whereas for summaries
		// coming from other sources, e.g. SDK, the sum and count are delta by being accumulated and reset periodically.
		// In order to ensure metrics are sent as deltas, we check the receiver attribute (which can be injected by
		// attribute processor) from resource metrics. If it exists, and equals to prometheus, the sum and count will be
		// converted.
		// For more information: https://github.com/open-telemetry/opentelemetry-collector/blob/main/receiver/prometheusreceiver/DESIGN.md#summary
		metricMetadata.adjustToDelta = metadata.receiver == prometheusReceiver
		dps = summaryDataPointSlice{
			metricMetadata,
			metric.DataPoints(),
		}
	default:
		logger.Warn("Unhandled metric data type.",
			zap.String("DataType", pmd.Type().String()),
			zap.String("Name", pmd.Name()),
			zap.String("Unit", pmd.Unit()),
		)
	}
	return dps
}
|