receiver.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package k8sclusterreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver"
  4. import (
  5. "context"
  6. "errors"
  7. "time"
  8. "go.opentelemetry.io/collector/component"
  9. "go.opentelemetry.io/collector/consumer"
  10. "go.opentelemetry.io/collector/receiver"
  11. "go.opentelemetry.io/collector/receiver/receiverhelper"
  12. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/collection"
  13. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata"
  14. )
  15. const (
  16. transport = "http"
  17. defaultInitialSyncTimeout = 10 * time.Minute
  18. )
  19. var _ receiver.Metrics = (*kubernetesReceiver)(nil)
  20. type kubernetesReceiver struct {
  21. dataCollector *collection.DataCollector
  22. resourceWatcher *resourceWatcher
  23. config *Config
  24. settings receiver.CreateSettings
  25. metricsConsumer consumer.Metrics
  26. cancel context.CancelFunc
  27. obsrecv *receiverhelper.ObsReport
  28. }
  29. func (kr *kubernetesReceiver) Start(ctx context.Context, host component.Host) error {
  30. ctx, kr.cancel = context.WithCancel(ctx)
  31. if err := kr.resourceWatcher.initialize(); err != nil {
  32. return err
  33. }
  34. exporters := host.GetExporters() //nolint:staticcheck
  35. if err := kr.resourceWatcher.setupMetadataExporters(
  36. exporters[component.DataTypeMetrics], kr.config.MetadataExporters); err != nil {
  37. return err
  38. }
  39. go func() {
  40. kr.settings.Logger.Info("Starting shared informers and wait for initial cache sync.")
  41. for _, informer := range kr.resourceWatcher.informerFactories {
  42. if informer == nil {
  43. continue
  44. }
  45. timedContextForInitialSync := kr.resourceWatcher.startWatchingResources(ctx, informer)
  46. // Wait till either the initial cache sync times out or until the cancel method
  47. // corresponding to this context is called.
  48. <-timedContextForInitialSync.Done()
  49. // If the context times out, set initialSyncTimedOut and report a fatal error. Currently
  50. // this timeout is 10 minutes, which appears to be long enough.
  51. if errors.Is(timedContextForInitialSync.Err(), context.DeadlineExceeded) {
  52. kr.resourceWatcher.initialSyncTimedOut.Store(true)
  53. kr.settings.Logger.Error("Timed out waiting for initial cache sync.")
  54. host.ReportFatalError(errors.New("failed to start receiver"))
  55. return
  56. }
  57. }
  58. kr.settings.Logger.Info("Completed syncing shared informer caches.")
  59. kr.resourceWatcher.initialSyncDone.Store(true)
  60. ticker := time.NewTicker(kr.config.CollectionInterval)
  61. defer ticker.Stop()
  62. for {
  63. select {
  64. case <-ticker.C:
  65. kr.dispatchMetrics(ctx)
  66. case <-ctx.Done():
  67. return
  68. }
  69. }
  70. }()
  71. return nil
  72. }
  73. func (kr *kubernetesReceiver) Shutdown(context.Context) error {
  74. if kr.cancel == nil {
  75. return nil
  76. }
  77. kr.cancel()
  78. return nil
  79. }
  80. func (kr *kubernetesReceiver) dispatchMetrics(ctx context.Context) {
  81. if kr.metricsConsumer == nil {
  82. // Metric collection is not enabled.
  83. return
  84. }
  85. mds := kr.dataCollector.CollectMetricData(time.Now())
  86. c := kr.obsrecv.StartMetricsOp(ctx)
  87. numPoints := mds.DataPointCount()
  88. err := kr.metricsConsumer.ConsumeMetrics(c, mds)
  89. kr.obsrecv.EndMetricsOp(c, metadata.Type, numPoints, err)
  90. }
  91. // newMetricsReceiver creates the Kubernetes cluster receiver with the given configuration.
  92. func newMetricsReceiver(
  93. ctx context.Context, set receiver.CreateSettings, cfg component.Config, consumer consumer.Metrics,
  94. ) (receiver.Metrics, error) {
  95. var err error
  96. r := receivers.GetOrAdd(
  97. cfg, func() component.Component {
  98. var rcv component.Component
  99. rcv, err = newReceiver(ctx, set, cfg)
  100. return rcv
  101. },
  102. )
  103. if err != nil {
  104. return nil, err
  105. }
  106. r.Unwrap().(*kubernetesReceiver).metricsConsumer = consumer
  107. return r, nil
  108. }
  109. // newMetricsReceiver creates the Kubernetes cluster receiver with the given configuration.
  110. func newLogsReceiver(
  111. ctx context.Context, set receiver.CreateSettings, cfg component.Config, consumer consumer.Logs,
  112. ) (receiver.Logs, error) {
  113. var err error
  114. r := receivers.GetOrAdd(
  115. cfg, func() component.Component {
  116. var rcv component.Component
  117. rcv, err = newReceiver(ctx, set, cfg)
  118. return rcv
  119. },
  120. )
  121. if err != nil {
  122. return nil, err
  123. }
  124. r.Unwrap().(*kubernetesReceiver).resourceWatcher.entityLogConsumer = consumer
  125. return r, nil
  126. }
  127. // newMetricsReceiver creates the Kubernetes cluster receiver with the given configuration.
  128. func newReceiver(_ context.Context, set receiver.CreateSettings, cfg component.Config) (component.Component, error) {
  129. rCfg := cfg.(*Config)
  130. obsrecv, err := receiverhelper.NewObsReport(
  131. receiverhelper.ObsReportSettings{
  132. ReceiverID: set.ID,
  133. Transport: transport,
  134. ReceiverCreateSettings: set,
  135. },
  136. )
  137. if err != nil {
  138. return nil, err
  139. }
  140. ms := metadata.NewStore()
  141. return &kubernetesReceiver{
  142. dataCollector: collection.NewDataCollector(set, ms, rCfg.MetricsBuilderConfig,
  143. rCfg.NodeConditionTypesToReport, rCfg.AllocatableTypesToReport),
  144. resourceWatcher: newResourceWatcher(set, rCfg, ms),
  145. settings: set,
  146. config: rCfg,
  147. obsrecv: obsrecv,
  148. }, nil
  149. }