watermark.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package googlecloudpubsubexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/googlecloudpubsubexporter"
  4. import (
  5. "time"
  6. "go.opentelemetry.io/collector/pdata/pcommon"
  7. "go.opentelemetry.io/collector/pdata/plog"
  8. "go.opentelemetry.io/collector/pdata/pmetric"
  9. "go.opentelemetry.io/collector/pdata/ptrace"
  10. )
  11. type metricsWatermarkFunc func(metrics pmetric.Metrics, processingTime time.Time, allowedDrift time.Duration) time.Time
  12. type logsWatermarkFunc func(logs plog.Logs, processingTime time.Time, allowedDrift time.Duration) time.Time
  13. type tracesWatermarkFunc func(traces ptrace.Traces, processingTime time.Time, allowedDrift time.Duration) time.Time
  14. type collectFunc func(timestamp pcommon.Timestamp) bool
  15. // collector helps traverse the OTLP tree to calculate the final time to set to the ce-time attribute
  16. type collector struct {
  17. // the current system clock time, set at the start of the tree traversal
  18. processingTime time.Time
  19. // maximum allowed difference for the processingTime
  20. allowedDrift time.Duration
  21. // calculated time, that can be set each time a timestamp is given to a calculation function
  22. calculatedTime time.Time
  23. }
  24. // add a new timestamp, and set the calculated time if it's earlier then the current calculated,
  25. // taking into account the allowedDrift
  26. func (c *collector) earliest(timestamp pcommon.Timestamp) bool {
  27. t := timestamp.AsTime()
  28. if t.Before(c.calculatedTime) {
  29. min := c.processingTime.Add(-c.allowedDrift)
  30. if t.Before(min) {
  31. c.calculatedTime = min
  32. return true
  33. }
  34. c.calculatedTime = t
  35. }
  36. return false
  37. }
  38. // function that doesn't traverse the metric data, return the processingTime
  39. func currentMetricsWatermark(_ pmetric.Metrics, processingTime time.Time, _ time.Duration) time.Time {
  40. return processingTime
  41. }
  42. // function that traverse the metric data, and returns the earliest timestamp (within limits of the allowedDrift)
  43. func earliestMetricsWatermark(metrics pmetric.Metrics, processingTime time.Time, allowedDrift time.Duration) time.Time {
  44. collector := &collector{
  45. processingTime: processingTime,
  46. allowedDrift: allowedDrift,
  47. calculatedTime: processingTime,
  48. }
  49. traverseMetrics(metrics, collector.earliest)
  50. return collector.calculatedTime
  51. }
  52. // traverse the metric data, with a collectFunc
  53. func traverseMetrics(metrics pmetric.Metrics, collect collectFunc) {
  54. for rix := 0; rix < metrics.ResourceMetrics().Len(); rix++ {
  55. r := metrics.ResourceMetrics().At(rix)
  56. for lix := 0; lix < r.ScopeMetrics().Len(); lix++ {
  57. l := r.ScopeMetrics().At(lix)
  58. for dix := 0; dix < l.Metrics().Len(); dix++ {
  59. d := l.Metrics().At(dix)
  60. //exhaustive:enforce
  61. switch d.Type() {
  62. case pmetric.MetricTypeHistogram:
  63. for pix := 0; pix < d.Histogram().DataPoints().Len(); pix++ {
  64. p := d.Histogram().DataPoints().At(pix)
  65. if collect(p.Timestamp()) {
  66. return
  67. }
  68. }
  69. case pmetric.MetricTypeExponentialHistogram:
  70. for pix := 0; pix < d.ExponentialHistogram().DataPoints().Len(); pix++ {
  71. p := d.ExponentialHistogram().DataPoints().At(pix)
  72. if collect(p.Timestamp()) {
  73. return
  74. }
  75. }
  76. case pmetric.MetricTypeSum:
  77. for pix := 0; pix < d.Sum().DataPoints().Len(); pix++ {
  78. p := d.Sum().DataPoints().At(pix)
  79. if collect(p.Timestamp()) {
  80. return
  81. }
  82. }
  83. case pmetric.MetricTypeGauge:
  84. for pix := 0; pix < d.Gauge().DataPoints().Len(); pix++ {
  85. p := d.Gauge().DataPoints().At(pix)
  86. if collect(p.Timestamp()) {
  87. return
  88. }
  89. }
  90. case pmetric.MetricTypeSummary:
  91. for pix := 0; pix < d.Summary().DataPoints().Len(); pix++ {
  92. p := d.Summary().DataPoints().At(pix)
  93. if collect(p.Timestamp()) {
  94. return
  95. }
  96. }
  97. }
  98. }
  99. }
  100. }
  101. }
  102. // function that doesn't traverse the log data, return the processingTime
  103. func currentLogsWatermark(_ plog.Logs, processingTime time.Time, _ time.Duration) time.Time {
  104. return processingTime
  105. }
  106. // function that traverse the log data, and returns the earliest timestamp (within limits of the allowedDrift)
  107. func earliestLogsWatermark(logs plog.Logs, processingTime time.Time, allowedDrift time.Duration) time.Time {
  108. c := collector{
  109. processingTime: processingTime,
  110. allowedDrift: allowedDrift,
  111. calculatedTime: processingTime,
  112. }
  113. traverseLogs(logs, c.earliest)
  114. return c.calculatedTime
  115. }
  116. // traverse the log data, with a collectFunc
  117. func traverseLogs(logs plog.Logs, collect collectFunc) {
  118. for rix := 0; rix < logs.ResourceLogs().Len(); rix++ {
  119. r := logs.ResourceLogs().At(rix)
  120. for lix := 0; lix < r.ScopeLogs().Len(); lix++ {
  121. l := r.ScopeLogs().At(lix)
  122. for dix := 0; dix < l.LogRecords().Len(); dix++ {
  123. d := l.LogRecords().At(dix)
  124. if collect(d.Timestamp()) {
  125. return
  126. }
  127. }
  128. }
  129. }
  130. }
  131. // function that doesn't traverse the trace data, return the processingTime
  132. func currentTracesWatermark(_ ptrace.Traces, processingTime time.Time, _ time.Duration) time.Time {
  133. return processingTime
  134. }
  135. // function that traverse the trace data, and returns the earliest timestamp (within limits of the allowedDrift)
  136. func earliestTracesWatermark(traces ptrace.Traces, processingTime time.Time, allowedDrift time.Duration) time.Time {
  137. c := collector{
  138. processingTime: processingTime,
  139. allowedDrift: allowedDrift,
  140. calculatedTime: processingTime,
  141. }
  142. traverseTraces(traces, c.earliest)
  143. return c.calculatedTime
  144. }
  145. // traverse the trace data, with a collectFunc
  146. func traverseTraces(traces ptrace.Traces, collect collectFunc) {
  147. for rix := 0; rix < traces.ResourceSpans().Len(); rix++ {
  148. r := traces.ResourceSpans().At(rix)
  149. for lix := 0; lix < r.ScopeSpans().Len(); lix++ {
  150. l := r.ScopeSpans().At(lix)
  151. for dix := 0; dix < l.Spans().Len(); dix++ {
  152. d := l.Spans().At(dix)
  153. if collect(d.StartTimestamp()) {
  154. return
  155. }
  156. }
  157. }
  158. }
  159. }