receiver.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package awscontainerinsightreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver"
  4. import (
  5. "context"
  6. "errors"
  7. "time"
  8. "go.opentelemetry.io/collector/component"
  9. "go.opentelemetry.io/collector/consumer"
  10. "go.opentelemetry.io/collector/pdata/pmetric"
  11. "go.opentelemetry.io/collector/receiver"
  12. "go.uber.org/zap"
  13. ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
  14. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor"
  15. ecsinfo "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/ecsInfo"
  16. hostInfo "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/host"
  17. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8sapiserver"
  18. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores"
  19. )
  20. var _ receiver.Metrics = (*awsContainerInsightReceiver)(nil)
  21. type metricsProvider interface {
  22. GetMetrics() []pmetric.Metrics
  23. Shutdown() error
  24. }
  25. // awsContainerInsightReceiver implements the receiver.Metrics
  26. type awsContainerInsightReceiver struct {
  27. settings component.TelemetrySettings
  28. nextConsumer consumer.Metrics
  29. config *Config
  30. cancel context.CancelFunc
  31. cadvisor metricsProvider
  32. k8sapiserver metricsProvider
  33. }
  34. // newAWSContainerInsightReceiver creates the aws container insight receiver with the given parameters.
  35. func newAWSContainerInsightReceiver(
  36. settings component.TelemetrySettings,
  37. config *Config,
  38. nextConsumer consumer.Metrics) (receiver.Metrics, error) {
  39. if nextConsumer == nil {
  40. return nil, component.ErrNilNextConsumer
  41. }
  42. r := &awsContainerInsightReceiver{
  43. settings: settings,
  44. nextConsumer: nextConsumer,
  45. config: config,
  46. }
  47. return r, nil
  48. }
  49. // Start collecting metrics from cadvisor and k8s api server (if it is an elected leader)
  50. func (acir *awsContainerInsightReceiver) Start(ctx context.Context, host component.Host) error {
  51. ctx, acir.cancel = context.WithCancel(ctx)
  52. hostinfo, err := hostInfo.NewInfo(acir.config.ContainerOrchestrator, acir.config.CollectionInterval, acir.settings.Logger)
  53. if err != nil {
  54. return err
  55. }
  56. if acir.config.ContainerOrchestrator == ci.EKS {
  57. k8sDecorator, err := stores.NewK8sDecorator(ctx, acir.config.TagService, acir.config.PrefFullPodName, acir.config.AddFullPodNameMetricLabel, acir.settings.Logger)
  58. if err != nil {
  59. return err
  60. }
  61. decoratorOption := cadvisor.WithDecorator(k8sDecorator)
  62. acir.cadvisor, err = cadvisor.New(acir.config.ContainerOrchestrator, hostinfo, acir.settings.Logger, decoratorOption)
  63. if err != nil {
  64. return err
  65. }
  66. acir.k8sapiserver, err = k8sapiserver.New(hostinfo, acir.settings.Logger)
  67. if err != nil {
  68. return err
  69. }
  70. }
  71. if acir.config.ContainerOrchestrator == ci.ECS {
  72. ecsInfo, err := ecsinfo.NewECSInfo(acir.config.CollectionInterval, hostinfo, host, acir.settings)
  73. if err != nil {
  74. return err
  75. }
  76. ecsOption := cadvisor.WithECSInfoCreator(ecsInfo)
  77. acir.cadvisor, err = cadvisor.New(acir.config.ContainerOrchestrator, hostinfo, acir.settings.Logger, ecsOption)
  78. if err != nil {
  79. return err
  80. }
  81. }
  82. go func() {
  83. // cadvisor collects data at dynamical intervals (from 1 to 15 seconds). If the ticker happens
  84. // at beginning of a minute, it might read the data collected at end of last minute. To avoid this,
  85. // we want to wait until at least two cadvisor collection intervals happens before collecting the metrics
  86. secondsInMin := time.Now().Second()
  87. if secondsInMin < 30 {
  88. time.Sleep(time.Duration(30-secondsInMin) * time.Second)
  89. }
  90. ticker := time.NewTicker(acir.config.CollectionInterval)
  91. defer ticker.Stop()
  92. for {
  93. select {
  94. case <-ticker.C:
  95. _ = acir.collectData(ctx)
  96. case <-ctx.Done():
  97. return
  98. }
  99. }
  100. }()
  101. return nil
  102. }
  103. // Shutdown stops the awsContainerInsightReceiver receiver.
  104. func (acir *awsContainerInsightReceiver) Shutdown(context.Context) error {
  105. if acir.cancel == nil {
  106. return nil
  107. }
  108. acir.cancel()
  109. var errs error
  110. if acir.k8sapiserver != nil {
  111. errs = errors.Join(errs, acir.k8sapiserver.Shutdown())
  112. }
  113. if acir.cadvisor != nil {
  114. errs = errors.Join(errs, acir.cadvisor.Shutdown())
  115. }
  116. return errs
  117. }
  118. // collectData collects container stats from Amazon ECS Task Metadata Endpoint
  119. func (acir *awsContainerInsightReceiver) collectData(ctx context.Context) error {
  120. var mds []pmetric.Metrics
  121. if acir.cadvisor == nil && acir.k8sapiserver == nil {
  122. err := errors.New("both cadvisor and k8sapiserver failed to start")
  123. acir.settings.Logger.Error("Failed to collect stats", zap.Error(err))
  124. return err
  125. }
  126. if acir.cadvisor != nil {
  127. mds = append(mds, acir.cadvisor.GetMetrics()...)
  128. }
  129. if acir.k8sapiserver != nil {
  130. mds = append(mds, acir.k8sapiserver.GetMetrics()...)
  131. }
  132. for _, md := range mds {
  133. err := acir.nextConsumer.ConsumeMetrics(ctx, md)
  134. if err != nil {
  135. return err
  136. }
  137. }
  138. return nil
  139. }