123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package golden // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden"
- import (
- "sort"
- "time"
- "go.opentelemetry.io/collector/pdata/pcommon"
- "go.opentelemetry.io/collector/pdata/pmetric"
- "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil"
- )
- func normalizeTimestamps(metrics pmetric.Metrics) {
- rms := metrics.ResourceMetrics()
- for i := 0; i < rms.Len(); i++ {
- for j := 0; j < rms.At(i).ScopeMetrics().Len(); j++ {
- for k := 0; k < rms.At(i).ScopeMetrics().At(j).Metrics().Len(); k++ {
- m := rms.At(i).ScopeMetrics().At(j).Metrics().At(k)
- //exhaustive:enforce
- switch m.Type() {
- case pmetric.MetricTypeGauge:
- normalizeDataPointSlice(dataPointSlice[pmetric.NumberDataPoint](m.Gauge().DataPoints()))
- case pmetric.MetricTypeSum:
- normalizeDataPointSlice(dataPointSlice[pmetric.NumberDataPoint](m.Sum().DataPoints()))
- case pmetric.MetricTypeHistogram:
- normalizeDataPointSlice(dataPointSlice[pmetric.HistogramDataPoint](m.Histogram().DataPoints()))
- case pmetric.MetricTypeExponentialHistogram:
- normalizeDataPointSlice(dataPointSlice[pmetric.ExponentialHistogramDataPoint](m.ExponentialHistogram().DataPoints()))
- case pmetric.MetricTypeSummary:
- normalizeDataPointSlice(dataPointSlice[pmetric.SummaryDataPoint](m.Summary().DataPoints()))
- }
- }
- }
- }
- }
- // returns a map of the original timestamps with their corresponding normalized values.
- // normalization entails setting nonunique subsequent timestamps to the same value while incrementing unique timestamps by a set value of 1,000,000 ns
- func normalizeTimeSeries(timeSeries []pcommon.Timestamp) map[pcommon.Timestamp]pcommon.Timestamp {
- sort.Slice(timeSeries, func(i, j int) bool {
- return func(t1, t2 pcommon.Timestamp) int {
- if t1 < t2 {
- return -1
- } else if t1 > t2 {
- return 1
- }
- return 0
- }(timeSeries[i], timeSeries[j]) < 0
- })
- // normalize values
- normalizedTs := make(map[pcommon.Timestamp]pcommon.Timestamp)
- count := 0
- for _, v := range timeSeries {
- if v == 0 {
- continue
- }
- if _, ok := normalizedTs[v]; !ok {
- normalizedTs[v] = normalTime(count)
- count++
- }
- }
- return normalizedTs
- }
- func normalTime(timeSeriesIndex int) pcommon.Timestamp {
- return pcommon.NewTimestampFromTime(time.Unix(0, 0).Add(time.Duration(timeSeriesIndex+1) * 1000000 * time.Nanosecond))
- }
- type dataPointSlice[T dataPoint] interface {
- Len() int
- At(i int) T
- }
- type dataPoint interface {
- pmetric.NumberDataPoint | pmetric.HistogramDataPoint | pmetric.ExponentialHistogramDataPoint | pmetric.SummaryDataPoint
- Attributes() pcommon.Map
- StartTimestamp() pcommon.Timestamp
- SetStartTimestamp(pcommon.Timestamp)
- Timestamp() pcommon.Timestamp
- SetTimestamp(pcommon.Timestamp)
- }
- func normalizeDataPointSlice[T dataPoint](dps dataPointSlice[T]) {
- attrCache := make(map[[16]byte]bool)
- for i := 0; i < dps.Len(); i++ {
- attrHash := pdatautil.MapHash(dps.At(i).Attributes())
- if attrCache[attrHash] {
- continue
- }
- timeSeries := []pcommon.Timestamp{dps.At(i).StartTimestamp(), dps.At(i).Timestamp()}
- // Find any other data points in the time series
- for j := i + 1; j < dps.Len(); j++ {
- if pdatautil.MapHash(dps.At(j).Attributes()) != attrHash {
- continue
- }
- timeSeries = append(timeSeries, dps.At(j).StartTimestamp(), dps.At(j).Timestamp())
- }
- normalizedTs := normalizeTimeSeries(timeSeries)
- for k := 0; k < dps.Len(); k++ {
- if pdatautil.MapHash(dps.At(k).Attributes()) != attrHash {
- continue
- }
- dps.At(k).SetTimestamp(normalizedTs[dps.At(k).Timestamp()])
- dps.At(k).SetStartTimestamp(normalizedTs[dps.At(k).StartTimestamp()])
- }
- attrCache[attrHash] = true
- }
- }
|