watcher_test.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package k8sclusterreceiver
  4. import (
  5. "testing"
  6. "time"
  7. "github.com/stretchr/testify/assert"
  8. "github.com/stretchr/testify/require"
  9. "go.opentelemetry.io/collector/component"
  10. "go.opentelemetry.io/collector/consumer/consumertest"
  11. "go.opentelemetry.io/collector/receiver/receivertest"
  12. "go.uber.org/zap"
  13. "go.uber.org/zap/zapcore"
  14. "go.uber.org/zap/zaptest/observer"
  15. corev1 "k8s.io/api/core/v1"
  16. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  17. "k8s.io/apimachinery/pkg/runtime/schema"
  18. "k8s.io/apimachinery/pkg/types"
  19. "k8s.io/client-go/informers"
  20. "k8s.io/client-go/kubernetes/fake"
  21. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/maps"
  22. "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata"
  23. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/gvk"
  24. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata"
  25. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/testutils"
  26. )
// commonPodMetadata is the baseline metadata expected on every pod built by
// the test helpers in this package; table-driven cases layer case-specific
// entries on top of it via allPodMetadata.
var commonPodMetadata = map[string]string{
	"foo":                    "bar",
	"foo1":                   "",
	"pod.creation_timestamp": "0001-01-01T00:00:00Z",
}
  32. func TestSetupMetadataExporters(t *testing.T) {
  33. type fields struct {
  34. metadataConsumers []metadataConsumer
  35. }
  36. type args struct {
  37. exporters map[component.ID]component.Component
  38. metadataExportersFromConfig []string
  39. }
  40. tests := []struct {
  41. name string
  42. fields fields
  43. args args
  44. wantErr bool
  45. }{
  46. {
  47. "Unsupported exporter",
  48. fields{},
  49. args{
  50. exporters: map[component.ID]component.Component{
  51. component.NewID("nop"): MockExporter{},
  52. },
  53. metadataExportersFromConfig: []string{"nop"},
  54. },
  55. true,
  56. },
  57. {
  58. "Supported exporter",
  59. fields{
  60. metadataConsumers: []metadataConsumer{(&mockExporterWithK8sMetadata{}).ConsumeMetadata},
  61. },
  62. args{exporters: map[component.ID]component.Component{
  63. component.NewID("nop"): mockExporterWithK8sMetadata{},
  64. },
  65. metadataExportersFromConfig: []string{"nop"},
  66. },
  67. false,
  68. },
  69. {
  70. "Non-existent exporter",
  71. fields{
  72. metadataConsumers: []metadataConsumer{},
  73. },
  74. args{exporters: map[component.ID]component.Component{
  75. component.NewID("nop"): mockExporterWithK8sMetadata{},
  76. },
  77. metadataExportersFromConfig: []string{"nop/1"},
  78. },
  79. true,
  80. },
  81. }
  82. for _, tt := range tests {
  83. t.Run(tt.name, func(t *testing.T) {
  84. rw := &resourceWatcher{
  85. logger: zap.NewNop(),
  86. }
  87. if err := rw.setupMetadataExporters(tt.args.exporters, tt.args.metadataExportersFromConfig); (err != nil) != tt.wantErr {
  88. t.Errorf("setupMetadataExporters() error = %v, wantErr %v", err, tt.wantErr)
  89. }
  90. require.Equal(t, len(tt.fields.metadataConsumers), len(rw.metadataConsumers))
  91. })
  92. }
  93. }
  94. func TestIsKindSupported(t *testing.T) {
  95. var tests = []struct {
  96. name string
  97. client *fake.Clientset
  98. gvk schema.GroupVersionKind
  99. expected bool
  100. }{
  101. {
  102. name: "nothing_supported",
  103. client: fake.NewSimpleClientset(),
  104. gvk: gvk.Pod,
  105. expected: false,
  106. },
  107. {
  108. name: "all_kinds_supported",
  109. client: newFakeClientWithAllResources(),
  110. gvk: gvk.Pod,
  111. expected: true,
  112. },
  113. }
  114. for _, tt := range tests {
  115. t.Run(tt.name, func(t *testing.T) {
  116. rw := &resourceWatcher{
  117. client: tt.client,
  118. logger: zap.NewNop(),
  119. }
  120. supported, err := rw.isKindSupported(tt.gvk)
  121. assert.NoError(t, err)
  122. assert.Equal(t, tt.expected, supported)
  123. })
  124. }
  125. }
  126. func TestPrepareSharedInformerFactory(t *testing.T) {
  127. var tests = []struct {
  128. name string
  129. client *fake.Clientset
  130. }{
  131. {
  132. name: "new_server_version",
  133. client: newFakeClientWithAllResources(),
  134. },
  135. {
  136. name: "old_server_version", // With no batch/v1.CronJob support.
  137. client: func() *fake.Clientset {
  138. client := fake.NewSimpleClientset()
  139. client.Resources = []*metav1.APIResourceList{
  140. {
  141. GroupVersion: "v1",
  142. APIResources: []metav1.APIResource{
  143. gvkToAPIResource(gvk.Pod),
  144. gvkToAPIResource(gvk.Node),
  145. gvkToAPIResource(gvk.Namespace),
  146. gvkToAPIResource(gvk.ReplicationController),
  147. gvkToAPIResource(gvk.ResourceQuota),
  148. gvkToAPIResource(gvk.Service),
  149. },
  150. },
  151. {
  152. GroupVersion: "apps/v1",
  153. APIResources: []metav1.APIResource{
  154. gvkToAPIResource(gvk.DaemonSet),
  155. gvkToAPIResource(gvk.Deployment),
  156. gvkToAPIResource(gvk.ReplicaSet),
  157. gvkToAPIResource(gvk.StatefulSet),
  158. },
  159. },
  160. {
  161. GroupVersion: "batch/v1",
  162. APIResources: []metav1.APIResource{
  163. gvkToAPIResource(gvk.Job),
  164. },
  165. },
  166. {
  167. GroupVersion: "autoscaling/v2",
  168. APIResources: []metav1.APIResource{
  169. gvkToAPIResource(gvk.HorizontalPodAutoscaler),
  170. },
  171. },
  172. }
  173. return client
  174. }(),
  175. },
  176. }
  177. for _, tt := range tests {
  178. t.Run(tt.name, func(t *testing.T) {
  179. obs, logs := observer.New(zap.WarnLevel)
  180. obsLogger := zap.New(obs)
  181. rw := &resourceWatcher{
  182. client: newFakeClientWithAllResources(),
  183. logger: obsLogger,
  184. metadataStore: metadata.NewStore(),
  185. config: &Config{},
  186. }
  187. assert.NoError(t, rw.prepareSharedInformerFactory())
  188. // Make sure no warning or error logs are raised
  189. assert.Equal(t, 0, logs.Len())
  190. })
  191. }
  192. }
  193. func TestSetupInformerForKind(t *testing.T) {
  194. obs, logs := observer.New(zap.WarnLevel)
  195. obsLogger := zap.New(obs)
  196. rw := &resourceWatcher{
  197. client: newFakeClientWithAllResources(),
  198. logger: obsLogger,
  199. }
  200. factory := informers.NewSharedInformerFactoryWithOptions(rw.client, 0)
  201. rw.setupInformerForKind(schema.GroupVersionKind{Group: "", Version: "v1", Kind: "WrongKind"}, factory)
  202. assert.Equal(t, 1, logs.Len())
  203. assert.Equal(t, "Could not setup an informer for provided group version kind", logs.All()[0].Entry.Message)
  204. }
// TestSyncMetadataAndEmitEntityEvents drives a pod through its lifecycle
// (create, update, no-op update, revert, delete) via syncMetadataUpdate and
// asserts that each transition emits exactly one entity event as a log
// record, with the expected attributes and a timestamp inside the window in
// which the transition occurred.
func TestSyncMetadataAndEmitEntityEvents(t *testing.T) {
	client := newFakeClientWithAllResources()
	logsConsumer := new(consumertest.LogsSink)

	// Setup k8s resources.
	pods := createPods(t, client, 1)

	origPod := pods[0]
	updatedPod := getUpdatedPod(origPod)

	rw := newResourceWatcher(receivertest.NewNopCreateSettings(), &Config{}, metadata.NewStore())
	rw.entityLogConsumer = logsConsumer

	step1 := time.Now()

	// Make some changes to the pod. Each change should result in an entity event represented
	// as a log record.

	// Pod is created.
	rw.syncMetadataUpdate(nil, rw.objMetadata(origPod))
	step2 := time.Now()

	// Pod is updated.
	rw.syncMetadataUpdate(rw.objMetadata(origPod), rw.objMetadata(updatedPod))
	step3 := time.Now()

	// Pod is updated again, but nothing changed in the pod.
	// Should still result in entity event because they are emitted even
	// if the entity is not changed.
	rw.syncMetadataUpdate(rw.objMetadata(updatedPod), rw.objMetadata(updatedPod))
	step4 := time.Now()

	// Change pod's state back to original.
	rw.syncMetadataUpdate(rw.objMetadata(updatedPod), rw.objMetadata(origPod))
	step5 := time.Now()

	// Delete the pod.
	rw.syncMetadataUpdate(rw.objMetadata(origPod), nil)
	step6 := time.Now()

	// Must have 5 entity events.
	require.EqualValues(t, 5, logsConsumer.LogRecordCount())

	// Event 1 should contain the initial state of the pod.
	lr := logsConsumer.AllLogs()[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0)
	expected := map[string]any{
		"otel.entity.event.type": "entity_state",
		"otel.entity.type":       "k8s.pod",
		"otel.entity.id":         map[string]any{"k8s.pod.uid": "pod0"},
		"otel.entity.attributes": map[string]any{"pod.creation_timestamp": "0001-01-01T00:00:00Z"},
	}
	assert.EqualValues(t, expected, lr.Attributes().AsRaw())
	assert.WithinRange(t, lr.Timestamp().AsTime(), step1, step2)

	// Event 2 should contain the updated state of the pod.
	// NOTE(review): `expected` is mutated in place via the `attrs` alias;
	// the "key"="value" entry presumably matches what getUpdatedPod adds —
	// confirm against getUpdatedPod if this assertion ever fails.
	lr = logsConsumer.AllLogs()[1].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0)
	attrs := expected["otel.entity.attributes"].(map[string]any)
	attrs["key"] = "value"
	assert.EqualValues(t, expected, lr.Attributes().AsRaw())
	assert.WithinRange(t, lr.Timestamp().AsTime(), step2, step3)

	// Event 3 should be identical to the previous one since pod state didn't change.
	lr = logsConsumer.AllLogs()[2].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0)
	assert.EqualValues(t, expected, lr.Attributes().AsRaw())
	assert.WithinRange(t, lr.Timestamp().AsTime(), step3, step4)

	// Event 4 should contain the reverted state of the pod.
	lr = logsConsumer.AllLogs()[3].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0)
	attrs = expected["otel.entity.attributes"].(map[string]any)
	delete(attrs, "key")
	assert.EqualValues(t, expected, lr.Attributes().AsRaw())
	assert.WithinRange(t, lr.Timestamp().AsTime(), step4, step5)

	// Event 5 should indicate pod deletion: only the event type and entity id
	// are present, no entity type or attributes.
	lr = logsConsumer.AllLogs()[4].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0)
	expected = map[string]any{
		"otel.entity.event.type": "entity_delete",
		"otel.entity.id":         map[string]any{"k8s.pod.uid": "pod0"},
	}
	assert.EqualValues(t, expected, lr.Attributes().AsRaw())
	assert.WithinRange(t, lr.Timestamp().AsTime(), step5, step6)
}
  271. func TestObjMetadata(t *testing.T) {
  272. tests := []struct {
  273. name string
  274. metadataStore *metadata.Store
  275. resource any
  276. want map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata
  277. }{
  278. {
  279. name: "Pod and container metadata simple case",
  280. metadataStore: metadata.NewStore(),
  281. resource: testutils.NewPodWithContainer(
  282. "0",
  283. testutils.NewPodSpecWithContainer("container-name"),
  284. testutils.NewPodStatusWithContainer("container-name", "container-id"),
  285. ),
  286. want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{
  287. experimentalmetricmetadata.ResourceID("test-pod-0-uid"): {
  288. EntityType: "k8s.pod",
  289. ResourceIDKey: "k8s.pod.uid",
  290. ResourceID: "test-pod-0-uid",
  291. Metadata: commonPodMetadata,
  292. },
  293. experimentalmetricmetadata.ResourceID("container-id"): {
  294. EntityType: "container",
  295. ResourceIDKey: "container.id",
  296. ResourceID: "container-id",
  297. Metadata: map[string]string{
  298. "container.status": "running",
  299. },
  300. },
  301. },
  302. },
  303. {
  304. name: "Pod with Owner Reference",
  305. metadataStore: metadata.NewStore(),
  306. resource: testutils.WithOwnerReferences([]metav1.OwnerReference{
  307. {
  308. Kind: "StatefulSet",
  309. Name: "test-statefulset-0",
  310. UID: "test-statefulset-0-uid",
  311. },
  312. }, testutils.NewPodWithContainer("0", &corev1.PodSpec{}, &corev1.PodStatus{})),
  313. want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{
  314. experimentalmetricmetadata.ResourceID("test-pod-0-uid"): {
  315. EntityType: "k8s.pod",
  316. ResourceIDKey: "k8s.pod.uid",
  317. ResourceID: "test-pod-0-uid",
  318. Metadata: allPodMetadata(map[string]string{
  319. "k8s.workload.kind": "StatefulSet",
  320. "k8s.workload.name": "test-statefulset-0",
  321. "k8s.statefulset.name": "test-statefulset-0",
  322. "k8s.statefulset.uid": "test-statefulset-0-uid",
  323. }),
  324. },
  325. },
  326. },
  327. {
  328. name: "Pod with Service metadata",
  329. metadataStore: func() *metadata.Store {
  330. ms := metadata.NewStore()
  331. ms.Setup(gvk.Service, &testutils.MockStore{
  332. Cache: map[string]any{
  333. "test-namespace/test-service": &corev1.Service{
  334. ObjectMeta: metav1.ObjectMeta{
  335. Name: "test-service",
  336. Namespace: "test-namespace",
  337. UID: "test-service-uid",
  338. },
  339. Spec: corev1.ServiceSpec{
  340. Selector: map[string]string{
  341. "k8s-app": "my-app",
  342. },
  343. },
  344. },
  345. },
  346. })
  347. return ms
  348. }(),
  349. resource: podWithAdditionalLabels(
  350. map[string]string{"k8s-app": "my-app"},
  351. testutils.NewPodWithContainer("0", &corev1.PodSpec{}, &corev1.PodStatus{}),
  352. ),
  353. want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{
  354. experimentalmetricmetadata.ResourceID("test-pod-0-uid"): {
  355. EntityType: "k8s.pod",
  356. ResourceIDKey: "k8s.pod.uid",
  357. ResourceID: "test-pod-0-uid",
  358. Metadata: allPodMetadata(map[string]string{
  359. "k8s.service.test-service": "",
  360. "k8s-app": "my-app",
  361. }),
  362. },
  363. },
  364. },
  365. {
  366. name: "Daemonset simple case",
  367. metadataStore: &metadata.Store{},
  368. resource: testutils.NewDaemonset("1"),
  369. want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{
  370. experimentalmetricmetadata.ResourceID("test-daemonset-1-uid"): {
  371. EntityType: "k8s.daemonset",
  372. ResourceIDKey: "k8s.daemonset.uid",
  373. ResourceID: "test-daemonset-1-uid",
  374. Metadata: map[string]string{
  375. "k8s.workload.kind": "DaemonSet",
  376. "k8s.workload.name": "test-daemonset-1",
  377. "daemonset.creation_timestamp": "0001-01-01T00:00:00Z",
  378. },
  379. },
  380. },
  381. },
  382. {
  383. name: "Deployment simple case",
  384. metadataStore: &metadata.Store{},
  385. resource: testutils.NewDeployment("1"),
  386. want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{
  387. experimentalmetricmetadata.ResourceID("test-deployment-1-uid"): {
  388. EntityType: "k8s.deployment",
  389. ResourceIDKey: "k8s.deployment.uid",
  390. ResourceID: "test-deployment-1-uid",
  391. Metadata: map[string]string{
  392. "k8s.workload.kind": "Deployment",
  393. "k8s.workload.name": "test-deployment-1",
  394. "k8s.deployment.name": "test-deployment-1",
  395. "deployment.creation_timestamp": "0001-01-01T00:00:00Z",
  396. },
  397. },
  398. },
  399. },
  400. {
  401. name: "HPA simple case",
  402. metadataStore: &metadata.Store{},
  403. resource: testutils.NewHPA("1"),
  404. want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{
  405. experimentalmetricmetadata.ResourceID("test-hpa-1-uid"): {
  406. EntityType: "k8s.hpa",
  407. ResourceIDKey: "k8s.hpa.uid",
  408. ResourceID: "test-hpa-1-uid",
  409. Metadata: map[string]string{
  410. "k8s.workload.kind": "HPA",
  411. "k8s.workload.name": "test-hpa-1",
  412. "hpa.creation_timestamp": "0001-01-01T00:00:00Z",
  413. },
  414. },
  415. },
  416. },
  417. {
  418. name: "Job simple case",
  419. metadataStore: &metadata.Store{},
  420. resource: testutils.NewJob("1"),
  421. want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{
  422. experimentalmetricmetadata.ResourceID("test-job-1-uid"): {
  423. EntityType: "k8s.job",
  424. ResourceIDKey: "k8s.job.uid",
  425. ResourceID: "test-job-1-uid",
  426. Metadata: map[string]string{
  427. "foo": "bar",
  428. "foo1": "",
  429. "k8s.workload.kind": "Job",
  430. "k8s.workload.name": "test-job-1",
  431. "job.creation_timestamp": "0001-01-01T00:00:00Z",
  432. },
  433. },
  434. },
  435. },
  436. {
  437. name: "Node simple case",
  438. metadataStore: &metadata.Store{},
  439. resource: testutils.NewNode("1"),
  440. want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{
  441. experimentalmetricmetadata.ResourceID("test-node-1-uid"): {
  442. EntityType: "k8s.node",
  443. ResourceIDKey: "k8s.node.uid",
  444. ResourceID: "test-node-1-uid",
  445. Metadata: map[string]string{
  446. "foo": "bar",
  447. "foo1": "",
  448. "k8s.node.name": "test-node-1",
  449. "node.creation_timestamp": "0001-01-01T00:00:00Z",
  450. },
  451. },
  452. },
  453. },
  454. {
  455. name: "ReplicaSet simple case",
  456. metadataStore: &metadata.Store{},
  457. resource: testutils.NewReplicaSet("1"),
  458. want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{
  459. experimentalmetricmetadata.ResourceID("test-replicaset-1-uid"): {
  460. EntityType: "k8s.replicaset",
  461. ResourceIDKey: "k8s.replicaset.uid",
  462. ResourceID: "test-replicaset-1-uid",
  463. Metadata: map[string]string{
  464. "foo": "bar",
  465. "foo1": "",
  466. "k8s.workload.kind": "ReplicaSet",
  467. "k8s.workload.name": "test-replicaset-1",
  468. "replicaset.creation_timestamp": "0001-01-01T00:00:00Z",
  469. },
  470. },
  471. },
  472. },
  473. {
  474. name: "ReplicationController simple case",
  475. metadataStore: &metadata.Store{},
  476. resource: &corev1.ReplicationController{
  477. ObjectMeta: metav1.ObjectMeta{
  478. Name: "test-replicationcontroller-1",
  479. Namespace: "test-namespace",
  480. UID: types.UID("test-replicationcontroller-1-uid"),
  481. },
  482. },
  483. want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{
  484. experimentalmetricmetadata.ResourceID("test-replicationcontroller-1-uid"): {
  485. EntityType: "k8s.replicationcontroller",
  486. ResourceIDKey: "k8s.replicationcontroller.uid",
  487. ResourceID: "test-replicationcontroller-1-uid",
  488. Metadata: map[string]string{
  489. "k8s.workload.kind": "ReplicationController",
  490. "k8s.workload.name": "test-replicationcontroller-1",
  491. "replicationcontroller.creation_timestamp": "0001-01-01T00:00:00Z",
  492. },
  493. },
  494. },
  495. },
  496. }
  497. for _, tt := range tests {
  498. observedLogger, _ := observer.New(zapcore.WarnLevel)
  499. set := receivertest.NewNopCreateSettings()
  500. set.TelemetrySettings.Logger = zap.New(observedLogger)
  501. t.Run(tt.name, func(t *testing.T) {
  502. dc := &resourceWatcher{metadataStore: tt.metadataStore}
  503. actual := dc.objMetadata(tt.resource)
  504. require.Equal(t, len(tt.want), len(actual))
  505. for key, item := range tt.want {
  506. got, exists := actual[key]
  507. require.True(t, exists)
  508. require.Equal(t, *item, *got)
  509. }
  510. })
  511. }
  512. }
  513. var allPodMetadata = func(metadata map[string]string) map[string]string {
  514. out := maps.MergeStringMaps(metadata, commonPodMetadata)
  515. return out
  516. }
  517. func podWithAdditionalLabels(labels map[string]string, pod *corev1.Pod) any {
  518. if pod.Labels == nil {
  519. pod.Labels = make(map[string]string, len(labels))
  520. }
  521. for k, v := range labels {
  522. pod.Labels[k] = v
  523. }
  524. return pod
  525. }