receiver.go 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package awscloudwatchmetricsreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscloudwatchmetricsreceiver"
  4. import (
  5. "context"
  6. "sync"
  7. "time"
  8. "go.opentelemetry.io/collector/component"
  9. "go.opentelemetry.io/collector/consumer"
  10. "go.uber.org/zap"
  11. )
  12. type metricReceiver struct {
  13. region string
  14. profile string
  15. imdsEndpoint string
  16. pollInterval time.Duration
  17. nextStartTime time.Time
  18. logger *zap.Logger
  19. consumer consumer.Metrics
  20. wg *sync.WaitGroup
  21. doneChan chan bool
  22. }
  23. func newMetricReceiver(cfg *Config, logger *zap.Logger, consumer consumer.Metrics) *metricReceiver {
  24. return &metricReceiver{
  25. region: cfg.Region,
  26. profile: cfg.Profile,
  27. imdsEndpoint: cfg.IMDSEndpoint,
  28. pollInterval: cfg.PollInterval,
  29. nextStartTime: time.Now().Add(-cfg.PollInterval),
  30. logger: logger,
  31. wg: &sync.WaitGroup{},
  32. consumer: consumer,
  33. doneChan: make(chan bool),
  34. }
  35. }
  36. func (m *metricReceiver) Start(ctx context.Context, _ component.Host) error {
  37. m.logger.Debug("starting to poll for CloudWatch metrics")
  38. m.wg.Add(1)
  39. go m.startPolling(ctx)
  40. return nil
  41. }
  42. func (m *metricReceiver) Shutdown(_ context.Context) error {
  43. m.logger.Debug("shutting down awscloudwatchmetrics receiver")
  44. close(m.doneChan)
  45. m.wg.Wait()
  46. return nil
  47. }
  48. func (m *metricReceiver) startPolling(ctx context.Context) {
  49. defer m.wg.Done()
  50. for {
  51. select {
  52. case <-ctx.Done():
  53. return
  54. case <-m.doneChan:
  55. return
  56. }
  57. }
  58. }