receiver.go 1.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package filereceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/filereceiver"
  4. import (
  5. "context"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "os"
  10. "go.opentelemetry.io/collector/component"
  11. "go.opentelemetry.io/collector/consumer"
  12. "go.uber.org/zap"
  13. )
  14. type fileReceiver struct {
  15. consumer consumer.Metrics
  16. logger *zap.Logger
  17. cancel context.CancelFunc
  18. path string
  19. throttle float64
  20. }
  21. func (r *fileReceiver) Start(ctx context.Context, _ component.Host) error {
  22. ctx, r.cancel = context.WithCancel(ctx)
  23. file, err := os.Open(r.path)
  24. if err != nil {
  25. return fmt.Errorf("failed to open file %q: %w", r.path, err)
  26. }
  27. fr := newFileReader(r.consumer, file, newReplayTimer(r.throttle))
  28. go func() {
  29. err := fr.readAll(ctx)
  30. if err != nil {
  31. if errors.Is(err, io.EOF) {
  32. r.logger.Debug("EOF reached")
  33. } else {
  34. r.logger.Error("failed to read input file", zap.Error(err))
  35. }
  36. }
  37. }()
  38. return nil
  39. }
  40. func (r *fileReceiver) Shutdown(_ context.Context) error {
  41. if r.cancel != nil {
  42. r.cancel()
  43. }
  44. return nil
  45. }