replay_timer.go 1.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package filereceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/filereceiver"
  4. import (
  5. "context"
  6. "fmt"
  7. "time"
  8. "go.opentelemetry.io/collector/pdata/pcommon"
  9. )
  10. type replayTimer struct {
  11. sleepFunc func(ctx context.Context, d time.Duration) error
  12. prev pcommon.Timestamp
  13. throttle float64
  14. }
  15. func newReplayTimer(throttle float64) *replayTimer {
  16. return &replayTimer{
  17. throttle: throttle,
  18. sleepFunc: sleepWithContext,
  19. }
  20. }
  21. func (t *replayTimer) wait(ctx context.Context, next pcommon.Timestamp) error {
  22. if next == 0 {
  23. return nil
  24. }
  25. var sleepDuration pcommon.Timestamp
  26. if t.prev > 0 {
  27. sleepDuration = pcommon.Timestamp(float64(next-t.prev) * t.throttle)
  28. }
  29. t.prev = next
  30. err := t.sleepFunc(ctx, time.Duration(sleepDuration))
  31. if err != nil {
  32. return fmt.Errorf("context cancelled while waiting for replay timer: %w", err)
  33. }
  34. return nil
  35. }
  36. func sleepWithContext(ctx context.Context, d time.Duration) error {
  37. timer := time.NewTimer(d)
  38. defer timer.Stop()
  39. select {
  40. case <-timer.C:
  41. return nil
  42. case <-ctx.Done():
  43. return ctx.Err()
  44. }
  45. }