normalize_timestamps.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package golden // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden"
  4. import (
  5. "sort"
  6. "time"
  7. "go.opentelemetry.io/collector/pdata/pcommon"
  8. "go.opentelemetry.io/collector/pdata/pmetric"
  9. "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil"
  10. )
  11. func normalizeTimestamps(metrics pmetric.Metrics) {
  12. rms := metrics.ResourceMetrics()
  13. for i := 0; i < rms.Len(); i++ {
  14. for j := 0; j < rms.At(i).ScopeMetrics().Len(); j++ {
  15. for k := 0; k < rms.At(i).ScopeMetrics().At(j).Metrics().Len(); k++ {
  16. m := rms.At(i).ScopeMetrics().At(j).Metrics().At(k)
  17. //exhaustive:enforce
  18. switch m.Type() {
  19. case pmetric.MetricTypeGauge:
  20. normalizeDataPointSlice(dataPointSlice[pmetric.NumberDataPoint](m.Gauge().DataPoints()))
  21. case pmetric.MetricTypeSum:
  22. normalizeDataPointSlice(dataPointSlice[pmetric.NumberDataPoint](m.Sum().DataPoints()))
  23. case pmetric.MetricTypeHistogram:
  24. normalizeDataPointSlice(dataPointSlice[pmetric.HistogramDataPoint](m.Histogram().DataPoints()))
  25. case pmetric.MetricTypeExponentialHistogram:
  26. normalizeDataPointSlice(dataPointSlice[pmetric.ExponentialHistogramDataPoint](m.ExponentialHistogram().DataPoints()))
  27. case pmetric.MetricTypeSummary:
  28. normalizeDataPointSlice(dataPointSlice[pmetric.SummaryDataPoint](m.Summary().DataPoints()))
  29. }
  30. }
  31. }
  32. }
  33. }
  34. // returns a map of the original timestamps with their corresponding normalized values.
  35. // normalization entails setting nonunique subsequent timestamps to the same value while incrementing unique timestamps by a set value of 1,000,000 ns
  36. func normalizeTimeSeries(timeSeries []pcommon.Timestamp) map[pcommon.Timestamp]pcommon.Timestamp {
  37. sort.Slice(timeSeries, func(i, j int) bool {
  38. return func(t1, t2 pcommon.Timestamp) int {
  39. if t1 < t2 {
  40. return -1
  41. } else if t1 > t2 {
  42. return 1
  43. }
  44. return 0
  45. }(timeSeries[i], timeSeries[j]) < 0
  46. })
  47. // normalize values
  48. normalizedTs := make(map[pcommon.Timestamp]pcommon.Timestamp)
  49. count := 0
  50. for _, v := range timeSeries {
  51. if v == 0 {
  52. continue
  53. }
  54. if _, ok := normalizedTs[v]; !ok {
  55. normalizedTs[v] = normalTime(count)
  56. count++
  57. }
  58. }
  59. return normalizedTs
  60. }
  61. func normalTime(timeSeriesIndex int) pcommon.Timestamp {
  62. return pcommon.NewTimestampFromTime(time.Unix(0, 0).Add(time.Duration(timeSeriesIndex+1) * 1000000 * time.Nanosecond))
  63. }
  64. type dataPointSlice[T dataPoint] interface {
  65. Len() int
  66. At(i int) T
  67. }
  68. type dataPoint interface {
  69. pmetric.NumberDataPoint | pmetric.HistogramDataPoint | pmetric.ExponentialHistogramDataPoint | pmetric.SummaryDataPoint
  70. Attributes() pcommon.Map
  71. StartTimestamp() pcommon.Timestamp
  72. SetStartTimestamp(pcommon.Timestamp)
  73. Timestamp() pcommon.Timestamp
  74. SetTimestamp(pcommon.Timestamp)
  75. }
  76. func normalizeDataPointSlice[T dataPoint](dps dataPointSlice[T]) {
  77. attrCache := make(map[[16]byte]bool)
  78. for i := 0; i < dps.Len(); i++ {
  79. attrHash := pdatautil.MapHash(dps.At(i).Attributes())
  80. if attrCache[attrHash] {
  81. continue
  82. }
  83. timeSeries := []pcommon.Timestamp{dps.At(i).StartTimestamp(), dps.At(i).Timestamp()}
  84. // Find any other data points in the time series
  85. for j := i + 1; j < dps.Len(); j++ {
  86. if pdatautil.MapHash(dps.At(j).Attributes()) != attrHash {
  87. continue
  88. }
  89. timeSeries = append(timeSeries, dps.At(j).StartTimestamp(), dps.At(j).Timestamp())
  90. }
  91. normalizedTs := normalizeTimeSeries(timeSeries)
  92. for k := 0; k < dps.Len(); k++ {
  93. if pdatautil.MapHash(dps.At(k).Attributes()) != attrHash {
  94. continue
  95. }
  96. dps.At(k).SetTimestamp(normalizedTs[dps.At(k).Timestamp()])
  97. dps.At(k).SetStartTimestamp(normalizedTs[dps.At(k).StartTimestamp()])
  98. }
  99. attrCache[attrHash] = true
  100. }
  101. }