1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package filereceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/filereceiver"
- import (
- "context"
- "fmt"
- "time"
- "go.opentelemetry.io/collector/pdata/pcommon"
- )
- type replayTimer struct {
- sleepFunc func(ctx context.Context, d time.Duration) error
- prev pcommon.Timestamp
- throttle float64
- }
- func newReplayTimer(throttle float64) *replayTimer {
- return &replayTimer{
- throttle: throttle,
- sleepFunc: sleepWithContext,
- }
- }
- func (t *replayTimer) wait(ctx context.Context, next pcommon.Timestamp) error {
- if next == 0 {
- return nil
- }
- var sleepDuration pcommon.Timestamp
- if t.prev > 0 {
- sleepDuration = pcommon.Timestamp(float64(next-t.prev) * t.throttle)
- }
- t.prev = next
- err := t.sleepFunc(ctx, time.Duration(sleepDuration))
- if err != nil {
- return fmt.Errorf("context cancelled while waiting for replay timer: %w", err)
- }
- return nil
- }
- func sleepWithContext(ctx context.Context, d time.Duration) error {
- timer := time.NewTimer(d)
- defer timer.Stop()
- select {
- case <-timer.C:
- return nil
- case <-ctx.Done():
- return ctx.Err()
- }
- }
|