watcher.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package k8sclusterreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver"
  4. import (
  5. "context"
  6. "fmt"
  7. "reflect"
  8. "sync/atomic"
  9. "time"
  10. quotaclientset "github.com/openshift/client-go/quota/clientset/versioned"
  11. quotainformersv1 "github.com/openshift/client-go/quota/informers/externalversions"
  12. "go.opentelemetry.io/collector/component"
  13. "go.opentelemetry.io/collector/consumer"
  14. "go.opentelemetry.io/collector/pdata/pcommon"
  15. "go.opentelemetry.io/collector/receiver"
  16. "go.uber.org/zap"
  17. appsv1 "k8s.io/api/apps/v1"
  18. autoscalingv2 "k8s.io/api/autoscaling/v2"
  19. batchv1 "k8s.io/api/batch/v1"
  20. corev1 "k8s.io/api/core/v1"
  21. apierrors "k8s.io/apimachinery/pkg/api/errors"
  22. "k8s.io/apimachinery/pkg/runtime/schema"
  23. "k8s.io/client-go/informers"
  24. "k8s.io/client-go/kubernetes"
  25. "k8s.io/client-go/tools/cache"
  26. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
  27. "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata"
  28. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/cronjob"
  29. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/demonset"
  30. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/deployment"
  31. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/gvk"
  32. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/hpa"
  33. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/jobs"
  34. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata"
  35. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/node"
  36. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/pod"
  37. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/replicaset"
  38. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/replicationcontroller"
  39. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/statefulset"
  40. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/utils"
  41. )
// sharedInformer is the minimal informer-factory surface the watcher needs:
// starting the underlying informers and blocking until their caches report
// synced. Both the Kubernetes factory and the OpenShift quota factory
// satisfy it (see prepareSharedInformerFactory).
type sharedInformer interface {
	Start(<-chan struct{})
	WaitForCacheSync(<-chan struct{}) map[reflect.Type]bool
}
// resourceWatcher watches Kubernetes (and optionally OpenShift) resources via
// shared informers and fans resulting metadata out to the configured
// metadata consumers and entity-log consumer.
type resourceWatcher struct {
	client kubernetes.Interface
	// osQuotaClient is only set when config.Distribution is OpenShift.
	osQuotaClient quotaclientset.Interface
	// informerFactories holds every factory created in
	// prepareSharedInformerFactory so they can be started together.
	informerFactories []sharedInformer
	metadataStore     *metadata.Store
	logger            *zap.Logger
	// metadataConsumers are populated from config by setupMetadataExporters.
	metadataConsumers []metadataConsumer
	// initialTimeout bounds the initial cache sync in startWatchingResources.
	initialTimeout time.Duration
	// initialSyncDone / initialSyncTimedOut are polled by
	// waitForInitialInformerSync; set elsewhere (outside this file).
	initialSyncDone     *atomic.Bool
	initialSyncTimedOut *atomic.Bool
	config              *Config
	entityLogConsumer   consumer.Logs
	// For mocking.
	makeClient               func(apiConf k8sconfig.APIConfig) (kubernetes.Interface, error)
	makeOpenShiftQuotaClient func(apiConf k8sconfig.APIConfig) (quotaclientset.Interface, error)
}
// metadataConsumer delivers a batch of metadata updates to a single
// destination (e.g. an exporter's ConsumeMetadata method).
type metadataConsumer func(metadata []*experimentalmetricmetadata.MetadataUpdate) error
  63. // newResourceWatcher creates a Kubernetes resource watcher.
  64. func newResourceWatcher(set receiver.CreateSettings, cfg *Config, metadataStore *metadata.Store) *resourceWatcher {
  65. return &resourceWatcher{
  66. logger: set.Logger,
  67. metadataStore: metadataStore,
  68. initialSyncDone: &atomic.Bool{},
  69. initialSyncTimedOut: &atomic.Bool{},
  70. initialTimeout: defaultInitialSyncTimeout,
  71. config: cfg,
  72. makeClient: k8sconfig.MakeClient,
  73. makeOpenShiftQuotaClient: k8sconfig.MakeOpenShiftQuotaClient,
  74. }
  75. }
  76. func (rw *resourceWatcher) initialize() error {
  77. client, err := rw.makeClient(rw.config.APIConfig)
  78. if err != nil {
  79. return fmt.Errorf("Failed to create Kubernnetes client: %w", err)
  80. }
  81. rw.client = client
  82. if rw.config.Distribution == distributionOpenShift {
  83. rw.osQuotaClient, err = rw.makeOpenShiftQuotaClient(rw.config.APIConfig)
  84. if err != nil {
  85. return fmt.Errorf("Failed to create OpenShift quota API client: %w", err)
  86. }
  87. }
  88. err = rw.prepareSharedInformerFactory()
  89. if err != nil {
  90. return err
  91. }
  92. return nil
  93. }
  94. func (rw *resourceWatcher) prepareSharedInformerFactory() error {
  95. factory := informers.NewSharedInformerFactoryWithOptions(rw.client, rw.config.MetadataCollectionInterval)
  96. // Map of supported group version kinds by name of a kind.
  97. // If none of the group versions are supported by k8s server for a specific kind,
  98. // informer for that kind won't be set and a warning message is thrown.
  99. // This map should be kept in sync with what can be provided by the supported k8s server versions.
  100. supportedKinds := map[string][]schema.GroupVersionKind{
  101. "Pod": {gvk.Pod},
  102. "Node": {gvk.Node},
  103. "Namespace": {gvk.Namespace},
  104. "ReplicationController": {gvk.ReplicationController},
  105. "ResourceQuota": {gvk.ResourceQuota},
  106. "Service": {gvk.Service},
  107. "DaemonSet": {gvk.DaemonSet},
  108. "Deployment": {gvk.Deployment},
  109. "ReplicaSet": {gvk.ReplicaSet},
  110. "StatefulSet": {gvk.StatefulSet},
  111. "Job": {gvk.Job},
  112. "CronJob": {gvk.CronJob},
  113. "HorizontalPodAutoscaler": {gvk.HorizontalPodAutoscaler},
  114. }
  115. for kind, gvks := range supportedKinds {
  116. anySupported := false
  117. for _, gvk := range gvks {
  118. supported, err := rw.isKindSupported(gvk)
  119. if err != nil {
  120. return err
  121. }
  122. if supported {
  123. anySupported = true
  124. rw.setupInformerForKind(gvk, factory)
  125. }
  126. }
  127. if !anySupported {
  128. rw.logger.Warn("Server doesn't support any of the group versions defined for the kind",
  129. zap.String("kind", kind))
  130. }
  131. }
  132. if rw.osQuotaClient != nil {
  133. quotaFactory := quotainformersv1.NewSharedInformerFactory(rw.osQuotaClient, 0)
  134. rw.setupInformer(gvk.ClusterResourceQuota, quotaFactory.Quota().V1().ClusterResourceQuotas().Informer())
  135. rw.informerFactories = append(rw.informerFactories, quotaFactory)
  136. }
  137. rw.informerFactories = append(rw.informerFactories, factory)
  138. return nil
  139. }
  140. func (rw *resourceWatcher) isKindSupported(gvk schema.GroupVersionKind) (bool, error) {
  141. resources, err := rw.client.Discovery().ServerResourcesForGroupVersion(gvk.GroupVersion().String())
  142. if err != nil {
  143. if apierrors.IsNotFound(err) { // if the discovery endpoint isn't present, assume group version is not supported
  144. rw.logger.Debug("Group version is not supported", zap.String("group", gvk.GroupVersion().String()))
  145. return false, nil
  146. }
  147. return false, fmt.Errorf("failed to fetch group version details: %w", err)
  148. }
  149. for _, r := range resources.APIResources {
  150. if r.Kind == gvk.Kind {
  151. return true, nil
  152. }
  153. }
  154. return false, nil
  155. }
  156. func (rw *resourceWatcher) setupInformerForKind(kind schema.GroupVersionKind, factory informers.SharedInformerFactory) {
  157. switch kind {
  158. case gvk.Pod:
  159. rw.setupInformer(kind, factory.Core().V1().Pods().Informer())
  160. case gvk.Node:
  161. rw.setupInformer(kind, factory.Core().V1().Nodes().Informer())
  162. case gvk.Namespace:
  163. rw.setupInformer(kind, factory.Core().V1().Namespaces().Informer())
  164. case gvk.ReplicationController:
  165. rw.setupInformer(kind, factory.Core().V1().ReplicationControllers().Informer())
  166. case gvk.ResourceQuota:
  167. rw.setupInformer(kind, factory.Core().V1().ResourceQuotas().Informer())
  168. case gvk.Service:
  169. rw.setupInformer(kind, factory.Core().V1().Services().Informer())
  170. case gvk.DaemonSet:
  171. rw.setupInformer(kind, factory.Apps().V1().DaemonSets().Informer())
  172. case gvk.Deployment:
  173. rw.setupInformer(kind, factory.Apps().V1().Deployments().Informer())
  174. case gvk.ReplicaSet:
  175. rw.setupInformer(kind, factory.Apps().V1().ReplicaSets().Informer())
  176. case gvk.StatefulSet:
  177. rw.setupInformer(kind, factory.Apps().V1().StatefulSets().Informer())
  178. case gvk.Job:
  179. rw.setupInformer(kind, factory.Batch().V1().Jobs().Informer())
  180. case gvk.CronJob:
  181. rw.setupInformer(kind, factory.Batch().V1().CronJobs().Informer())
  182. case gvk.HorizontalPodAutoscaler:
  183. rw.setupInformer(kind, factory.Autoscaling().V2().HorizontalPodAutoscalers().Informer())
  184. default:
  185. rw.logger.Error("Could not setup an informer for provided group version kind",
  186. zap.String("group version kind", kind.String()))
  187. }
  188. }
  189. // startWatchingResources starts up all informers.
  190. func (rw *resourceWatcher) startWatchingResources(ctx context.Context, inf sharedInformer) context.Context {
  191. var cancel context.CancelFunc
  192. timedContextForInitialSync, cancel := context.WithTimeout(ctx, rw.initialTimeout)
  193. // Start off individual informers in the factory.
  194. inf.Start(ctx.Done())
  195. // Ensure cache is synced with initial state, once informers are started up.
  196. // Note that the event handler can start receiving events as soon as the informers
  197. // are started. So it's required to ensure that the receiver does not start
  198. // collecting data before the cache sync since all data may not be available.
  199. // This method will block either till the timeout set on the context, until
  200. // the initial sync is complete or the parent context is cancelled.
  201. inf.WaitForCacheSync(timedContextForInitialSync.Done())
  202. defer cancel()
  203. return timedContextForInitialSync
  204. }
  205. // setupInformer adds event handlers to informers and setups a metadataStore.
  206. func (rw *resourceWatcher) setupInformer(gvk schema.GroupVersionKind, informer cache.SharedIndexInformer) {
  207. err := informer.SetTransform(transformObject)
  208. if err != nil {
  209. rw.logger.Error("error setting informer transform function", zap.Error(err))
  210. }
  211. _, err = informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
  212. AddFunc: rw.onAdd,
  213. UpdateFunc: rw.onUpdate,
  214. })
  215. if err != nil {
  216. rw.logger.Error("error adding event handler to informer", zap.Error(err))
  217. }
  218. rw.metadataStore.Setup(gvk, informer.GetStore())
  219. }
  220. func (rw *resourceWatcher) onAdd(obj any) {
  221. rw.waitForInitialInformerSync()
  222. // Sync metadata only if there's at least one destination for it to sent.
  223. if !rw.hasDestination() {
  224. return
  225. }
  226. rw.syncMetadataUpdate(map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{}, rw.objMetadata(obj))
  227. }
  228. func (rw *resourceWatcher) hasDestination() bool {
  229. return len(rw.metadataConsumers) != 0 || rw.entityLogConsumer != nil
  230. }
  231. func (rw *resourceWatcher) onUpdate(oldObj, newObj any) {
  232. rw.waitForInitialInformerSync()
  233. // Sync metadata only if there's at least one destination for it to sent.
  234. if !rw.hasDestination() {
  235. return
  236. }
  237. rw.syncMetadataUpdate(rw.objMetadata(oldObj), rw.objMetadata(newObj))
  238. }
// objMetadata returns the metadata for the given object, dispatching on its
// concrete type to the per-kind extractor. Objects of any other type
// (e.g. Namespace, Service, ResourceQuota, which have informers but no
// extractor here) yield nil.
func (rw *resourceWatcher) objMetadata(obj any) map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata {
	switch o := obj.(type) {
	case *corev1.Pod:
		// Pod is the only extractor that also takes the shared store and logger.
		return pod.GetMetadata(o, rw.metadataStore, rw.logger)
	case *corev1.Node:
		return node.GetMetadata(o)
	case *corev1.ReplicationController:
		return replicationcontroller.GetMetadata(o)
	case *appsv1.Deployment:
		return deployment.GetMetadata(o)
	case *appsv1.ReplicaSet:
		return replicaset.GetMetadata(o)
	case *appsv1.DaemonSet:
		// NOTE: the package is spelled "demonset" upstream (pre-existing typo
		// in the import path, not fixable from this file).
		return demonset.GetMetadata(o)
	case *appsv1.StatefulSet:
		return statefulset.GetMetadata(o)
	case *batchv1.Job:
		return jobs.GetMetadata(o)
	case *batchv1.CronJob:
		return cronjob.GetMetadata(o)
	case *autoscalingv2.HorizontalPodAutoscaler:
		return hpa.GetMetadata(o)
	}
	return nil
}
  265. func (rw *resourceWatcher) waitForInitialInformerSync() {
  266. if rw.initialSyncDone.Load() || rw.initialSyncTimedOut.Load() {
  267. return
  268. }
  269. // Wait till initial sync is complete or timeout.
  270. for !rw.initialSyncDone.Load() {
  271. if rw.initialSyncTimedOut.Load() {
  272. return
  273. }
  274. time.Sleep(100 * time.Millisecond)
  275. }
  276. }
  277. func (rw *resourceWatcher) setupMetadataExporters(
  278. exporters map[component.ID]component.Component,
  279. metadataExportersFromConfig []string,
  280. ) error {
  281. var out []metadataConsumer
  282. metadataExportersSet := utils.StringSliceToMap(metadataExportersFromConfig)
  283. if err := validateMetadataExporters(metadataExportersSet, exporters); err != nil {
  284. return fmt.Errorf("failed to configure metadata_exporters: %w", err)
  285. }
  286. for cfg, exp := range exporters {
  287. if !metadataExportersSet[cfg.String()] {
  288. continue
  289. }
  290. kme, ok := exp.(experimentalmetricmetadata.MetadataExporter)
  291. if !ok {
  292. return fmt.Errorf("%s exporter does not implement MetadataExporter", cfg.Name())
  293. }
  294. out = append(out, kme.ConsumeMetadata)
  295. rw.logger.Info("Configured Kubernetes MetadataExporter",
  296. zap.String("exporter_name", cfg.String()),
  297. )
  298. }
  299. rw.metadataConsumers = out
  300. return nil
  301. }
  302. func validateMetadataExporters(metadataExporters map[string]bool, exporters map[component.ID]component.Component) error {
  303. configuredExporters := map[string]bool{}
  304. for cfg := range exporters {
  305. configuredExporters[cfg.String()] = true
  306. }
  307. for e := range metadataExporters {
  308. if !configuredExporters[e] {
  309. return fmt.Errorf("%s exporter is not in collector config", e)
  310. }
  311. }
  312. return nil
  313. }
// syncMetadataUpdate computes the delta between oldMetadata and newMetadata
// and fans it out: metadata updates go to every configured metadata consumer
// (best-effort, errors dropped), and — when an entity-log consumer is set —
// the same delta is emitted as entity events converted to logs.
func (rw *resourceWatcher) syncMetadataUpdate(oldMetadata, newMetadata map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata) {
	timestamp := pcommon.NewTimestampFromTime(time.Now())
	metadataUpdate := metadata.GetMetadataUpdate(oldMetadata, newMetadata)
	if len(metadataUpdate) != 0 {
		for _, consume := range rw.metadataConsumers {
			// Best-effort delivery: consumer errors are deliberately ignored.
			_ = consume(metadataUpdate)
		}
	}
	if rw.entityLogConsumer != nil {
		// Represent metadata update as entity events.
		entityEvents := metadata.GetEntityEvents(oldMetadata, newMetadata, timestamp)
		// Convert entity events to log representation.
		logs := entityEvents.ConvertAndMoveToLogs()
		if logs.LogRecordCount() != 0 {
			err := rw.entityLogConsumer.ConsumeLogs(context.Background(), logs)
			if err != nil {
				rw.logger.Error("Error sending entity events to the consumer", zap.Error(err))
				// Note: receiver contract says that we need to retry sending if the
				// returned error is not Permanent. However, we are not doing it here.
				// Instead, we rely on the fact the metadata is collected periodically
				// and the entity events will be delivered on the next cycle. This is
				// fine because we deliver cumulative entity state.
				// This allows us to avoid stressing the Collector or its destination
				// unnecessarily (typically non-Permanent errors happen in stressed conditions).
				// The periodic collection will be implemented later, see
				// https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/24413
			}
		}
	}
}