// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package k8sclusterreceiver

import (
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/consumer/consumertest"
	"go.opentelemetry.io/collector/receiver/receivertest"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
	"go.uber.org/zap/zaptest/observer"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"

	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/maps"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata"
	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/gvk"
	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata"
	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/testutils"
)

var commonPodMetadata = map[string]string{
	"foo":                    "bar",
	"foo1":                   "",
	"pod.creation_timestamp": "0001-01-01T00:00:00Z",
}

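// TestSetupMetadataExporters verifies that metadata consumers are registered
// only for exporters that implement the k8s metadata consumer interface, and
// that referencing an exporter that is not in the pipeline returns an error.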
func TestSetupMetadataExporters(t *testing.T) {
	type fields struct {
		metadataConsumers []metadataConsumer
	}
	type args struct {
		exporters                   map[component.ID]component.Component
		metadataExportersFromConfig []string
	}
	tests := []struct {
		name    string
		fields  fields
		args    args
		wantErr bool
	}{
		{
			"Unsupported exporter",
			fields{},
			args{
				exporters: map[component.ID]component.Component{
					component.NewID("nop"): MockExporter{},
				},
				metadataExportersFromConfig: []string{"nop"},
			},
			true,
		},
		{
			"Supported exporter",
			fields{
				metadataConsumers: []metadataConsumer{(&mockExporterWithK8sMetadata{}).ConsumeMetadata},
			},
			args{
				exporters: map[component.ID]component.Component{
					component.NewID("nop"): mockExporterWithK8sMetadata{},
				},
				metadataExportersFromConfig: []string{"nop"},
			},
			false,
		},
		{
			"Non-existent exporter",
			fields{
				metadataConsumers: []metadataConsumer{},
			},
			args{
				exporters: map[component.ID]component.Component{
					component.NewID("nop"): mockExporterWithK8sMetadata{},
				},
				metadataExportersFromConfig: []string{"nop/1"},
			},
			true,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			rw := &resourceWatcher{
				logger: zap.NewNop(),
			}

			if err := rw.setupMetadataExporters(tt.args.exporters, tt.args.metadataExportersFromConfig); (err != nil) != tt.wantErr {
				t.Errorf("setupMetadataExporters() error = %v, wantErr %v", err, tt.wantErr)
			}

			require.Equal(t, len(tt.fields.metadataConsumers), len(rw.metadataConsumers))
		})
	}
}

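// TestIsKindSupported checks that kind support detection follows the API
// resources advertised by the fake clientset.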
func TestIsKindSupported(t *testing.T) {
	var tests = []struct {
		name     string
		client   *fake.Clientset
		gvk      schema.GroupVersionKind
		expected bool
	}{
		{
			name:     "nothing_supported",
			client:   fake.NewSimpleClientset(),
			gvk:      gvk.Pod,
			expected: false,
		},
		{
			name:     "all_kinds_supported",
			client:   newFakeClientWithAllResources(),
			gvk:      gvk.Pod,
			expected: true,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			rw := &resourceWatcher{
				client: tt.client,
				logger: zap.NewNop(),
			}
			supported, err := rw.isKindSupported(tt.gvk)
			assert.NoError(t, err)
			assert.Equal(t, tt.expected, supported)
		})
	}
}

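// TestPrepareSharedInformerFactory prepares the shared informer factory and
// asserts that no warning or error logs are produced.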
func TestPrepareSharedInformerFactory(t *testing.T) {
	var tests = []struct {
		name   string
		client *fake.Clientset
	}{
		{
			name:   "new_server_version",
			client: newFakeClientWithAllResources(),
		},
		{
			name: "old_server_version", // With no batch/v1.CronJob support.
			client: func() *fake.Clientset {
				client := fake.NewSimpleClientset()
				client.Resources = []*metav1.APIResourceList{
					{
						GroupVersion: "v1",
						APIResources: []metav1.APIResource{
							gvkToAPIResource(gvk.Pod),
							gvkToAPIResource(gvk.Node),
							gvkToAPIResource(gvk.Namespace),
							gvkToAPIResource(gvk.ReplicationController),
							gvkToAPIResource(gvk.ResourceQuota),
							gvkToAPIResource(gvk.Service),
						},
					},
					{
						GroupVersion: "apps/v1",
						APIResources: []metav1.APIResource{
							gvkToAPIResource(gvk.DaemonSet),
							gvkToAPIResource(gvk.Deployment),
							gvkToAPIResource(gvk.ReplicaSet),
							gvkToAPIResource(gvk.StatefulSet),
						},
					},
					{
						GroupVersion: "batch/v1",
						APIResources: []metav1.APIResource{
							gvkToAPIResource(gvk.Job),
						},
					},
					{
						GroupVersion: "autoscaling/v2",
						APIResources: []metav1.APIResource{
							gvkToAPIResource(gvk.HorizontalPodAutoscaler),
						},
					},
				}
				return client
			}(),
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			obs, logs := observer.New(zap.WarnLevel)
			obsLogger := zap.New(obs)
			rw := &resourceWatcher{
				client:        newFakeClientWithAllResources(),
				logger:        obsLogger,
				metadataStore: metadata.NewStore(),
				config:        &Config{},
			}

			assert.NoError(t, rw.prepareSharedInformerFactory())

			// Make sure no warning or error logs are raised
			assert.Equal(t, 0, logs.Len())
		})
	}
}

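// TestSetupInformerForKind asserts that an unknown group/version/kind results
// in a warning log.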
func TestSetupInformerForKind(t *testing.T) {
	obs, logs := observer.New(zap.WarnLevel)
	obsLogger := zap.New(obs)
	rw := &resourceWatcher{
		client: newFakeClientWithAllResources(),
		logger: obsLogger,
	}

	factory := informers.NewSharedInformerFactoryWithOptions(rw.client, 0)
	rw.setupInformerForKind(schema.GroupVersionKind{Group: "", Version: "v1", Kind: "WrongKind"}, factory)

	assert.Equal(t, 1, logs.Len())
	assert.Equal(t, "Could not setup an informer for provided group version kind", logs.All()[0].Entry.Message)
}

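// TestSyncMetadataAndEmitEntityEvents drives a pod through create, update,
// no-op update, revert, and delete, and checks that every metadata sync emits
// the expected entity event log record within the corresponding time window.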
func TestSyncMetadataAndEmitEntityEvents(t *testing.T) {
	client := newFakeClientWithAllResources()

	logsConsumer := new(consumertest.LogsSink)

	// Setup k8s resources.
	pods := createPods(t, client, 1)

	origPod := pods[0]
	updatedPod := getUpdatedPod(origPod)

	rw := newResourceWatcher(receivertest.NewNopCreateSettings(), &Config{}, metadata.NewStore())
	rw.entityLogConsumer = logsConsumer

	step1 := time.Now()

	// Make some changes to the pod. Each change should result in an entity event
	// represented as a log record.

	// Pod is created.
	rw.syncMetadataUpdate(nil, rw.objMetadata(origPod))
	step2 := time.Now()

	// Pod is updated.
	rw.syncMetadataUpdate(rw.objMetadata(origPod), rw.objMetadata(updatedPod))
	step3 := time.Now()

	// Pod is updated again, but nothing changed in the pod.
	// Should still result in an entity event because they are emitted even
	// if the entity is not changed.
	rw.syncMetadataUpdate(rw.objMetadata(updatedPod), rw.objMetadata(updatedPod))
	step4 := time.Now()

	// Change the pod's state back to the original.
	rw.syncMetadataUpdate(rw.objMetadata(updatedPod), rw.objMetadata(origPod))
	step5 := time.Now()

	// Delete the pod.
	rw.syncMetadataUpdate(rw.objMetadata(origPod), nil)
	step6 := time.Now()

	// Must have 5 entity events.
	require.EqualValues(t, 5, logsConsumer.LogRecordCount())

	// Event 1 should contain the initial state of the pod.
	lr := logsConsumer.AllLogs()[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0)
	expected := map[string]any{
		"otel.entity.event.type": "entity_state",
		"otel.entity.type":       "k8s.pod",
		"otel.entity.id":         map[string]any{"k8s.pod.uid": "pod0"},
		"otel.entity.attributes": map[string]any{"pod.creation_timestamp": "0001-01-01T00:00:00Z"},
	}
	assert.EqualValues(t, expected, lr.Attributes().AsRaw())
	assert.WithinRange(t, lr.Timestamp().AsTime(), step1, step2)

	// Event 2 should contain the updated state of the pod.
	lr = logsConsumer.AllLogs()[1].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0)
	attrs := expected["otel.entity.attributes"].(map[string]any)
	attrs["key"] = "value"
	assert.EqualValues(t, expected, lr.Attributes().AsRaw())
	assert.WithinRange(t, lr.Timestamp().AsTime(), step2, step3)

	// Event 3 should be identical to the previous one since the pod state didn't change.
	lr = logsConsumer.AllLogs()[2].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0)
	assert.EqualValues(t, expected, lr.Attributes().AsRaw())
	assert.WithinRange(t, lr.Timestamp().AsTime(), step3, step4)

	// Event 4 should contain the reverted state of the pod.
	lr = logsConsumer.AllLogs()[3].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0)
	attrs = expected["otel.entity.attributes"].(map[string]any)
	delete(attrs, "key")
	assert.EqualValues(t, expected, lr.Attributes().AsRaw())
	assert.WithinRange(t, lr.Timestamp().AsTime(), step4, step5)

	// Event 5 should indicate pod deletion.
	lr = logsConsumer.AllLogs()[4].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0)
	expected = map[string]any{
		"otel.entity.event.type": "entity_delete",
		"otel.entity.id":         map[string]any{"k8s.pod.uid": "pod0"},
	}
	assert.EqualValues(t, expected, lr.Attributes().AsRaw())
	assert.WithinRange(t, lr.Timestamp().AsTime(), step5, step6)
}

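// TestObjMetadata covers metadata extraction for the supported Kubernetes
// object kinds.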
func TestObjMetadata(t *testing.T) {
	tests := []struct {
		name          string
		metadataStore *metadata.Store
		resource      any
		want          map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata
	}{
		{
			name:          "Pod and container metadata simple case",
			metadataStore: metadata.NewStore(),
			resource: testutils.NewPodWithContainer(
				"0",
				testutils.NewPodSpecWithContainer("container-name"),
				testutils.NewPodStatusWithContainer("container-name", "container-id"),
			),
			want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{
				experimentalmetricmetadata.ResourceID("test-pod-0-uid"): {
					EntityType:    "k8s.pod",
					ResourceIDKey: "k8s.pod.uid",
					ResourceID:    "test-pod-0-uid",
					Metadata:      commonPodMetadata,
				},
				experimentalmetricmetadata.ResourceID("container-id"): {
					EntityType:    "container",
					ResourceIDKey: "container.id",
					ResourceID:    "container-id",
					Metadata: map[string]string{
						"container.status": "running",
					},
				},
			},
		},
		{
			name:          "Pod with Owner Reference",
			metadataStore: metadata.NewStore(),
			resource: testutils.WithOwnerReferences([]metav1.OwnerReference{
				{
					Kind: "StatefulSet",
					Name: "test-statefulset-0",
					UID:  "test-statefulset-0-uid",
				},
			}, testutils.NewPodWithContainer("0", &corev1.PodSpec{}, &corev1.PodStatus{})),
			want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{
				experimentalmetricmetadata.ResourceID("test-pod-0-uid"): {
					EntityType:    "k8s.pod",
					ResourceIDKey: "k8s.pod.uid",
					ResourceID:    "test-pod-0-uid",
					Metadata: allPodMetadata(map[string]string{
						"k8s.workload.kind":    "StatefulSet",
						"k8s.workload.name":    "test-statefulset-0",
						"k8s.statefulset.name": "test-statefulset-0",
						"k8s.statefulset.uid":  "test-statefulset-0-uid",
					}),
				},
			},
		},
		{
			name: "Pod with Service metadata",
			metadataStore: func() *metadata.Store {
				ms := metadata.NewStore()
				ms.Setup(gvk.Service, &testutils.MockStore{
					Cache: map[string]any{
						"test-namespace/test-service": &corev1.Service{
							ObjectMeta: metav1.ObjectMeta{
								Name:      "test-service",
								Namespace: "test-namespace",
								UID:       "test-service-uid",
							},
							Spec: corev1.ServiceSpec{
								Selector: map[string]string{
									"k8s-app": "my-app",
								},
							},
						},
					},
				})
				return ms
			}(),
			resource: podWithAdditionalLabels(
				map[string]string{"k8s-app": "my-app"},
				testutils.NewPodWithContainer("0", &corev1.PodSpec{}, &corev1.PodStatus{}),
			),
			want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{
				experimentalmetricmetadata.ResourceID("test-pod-0-uid"): {
					EntityType:    "k8s.pod",
					ResourceIDKey: "k8s.pod.uid",
					ResourceID:    "test-pod-0-uid",
					Metadata: allPodMetadata(map[string]string{
						"k8s.service.test-service": "",
						"k8s-app":                  "my-app",
					}),
				},
			},
		},
		{
			name:          "Daemonset simple case",
			metadataStore: &metadata.Store{},
			resource:      testutils.NewDaemonset("1"),
			want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{
				experimentalmetricmetadata.ResourceID("test-daemonset-1-uid"): {
					EntityType:    "k8s.daemonset",
					ResourceIDKey: "k8s.daemonset.uid",
					ResourceID:    "test-daemonset-1-uid",
					Metadata: map[string]string{
						"k8s.workload.kind":            "DaemonSet",
						"k8s.workload.name":            "test-daemonset-1",
						"daemonset.creation_timestamp": "0001-01-01T00:00:00Z",
					},
				},
			},
		},
		{
			name:          "Deployment simple case",
			metadataStore: &metadata.Store{},
			resource:      testutils.NewDeployment("1"),
			want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{
				experimentalmetricmetadata.ResourceID("test-deployment-1-uid"): {
					EntityType:    "k8s.deployment",
					ResourceIDKey: "k8s.deployment.uid",
					ResourceID:    "test-deployment-1-uid",
					Metadata: map[string]string{
						"k8s.workload.kind":             "Deployment",
						"k8s.workload.name":             "test-deployment-1",
						"k8s.deployment.name":           "test-deployment-1",
						"deployment.creation_timestamp": "0001-01-01T00:00:00Z",
					},
				},
			},
		},
		{
			name:          "HPA simple case",
			metadataStore: &metadata.Store{},
			resource:      testutils.NewHPA("1"),
			want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{
				experimentalmetricmetadata.ResourceID("test-hpa-1-uid"): {
					EntityType:    "k8s.hpa",
					ResourceIDKey: "k8s.hpa.uid",
					ResourceID:    "test-hpa-1-uid",
					Metadata: map[string]string{
						"k8s.workload.kind":      "HPA",
						"k8s.workload.name":      "test-hpa-1",
						"hpa.creation_timestamp": "0001-01-01T00:00:00Z",
					},
				},
			},
		},
		{
			name:          "Job simple case",
			metadataStore: &metadata.Store{},
			resource:      testutils.NewJob("1"),
			want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{
				experimentalmetricmetadata.ResourceID("test-job-1-uid"): {
					EntityType:    "k8s.job",
					ResourceIDKey: "k8s.job.uid",
					ResourceID:    "test-job-1-uid",
					Metadata: map[string]string{
						"foo":                    "bar",
						"foo1":                   "",
						"k8s.workload.kind":      "Job",
						"k8s.workload.name":      "test-job-1",
						"job.creation_timestamp": "0001-01-01T00:00:00Z",
					},
				},
			},
		},
		{
			name:          "Node simple case",
			metadataStore: &metadata.Store{},
			resource:      testutils.NewNode("1"),
			want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{
				experimentalmetricmetadata.ResourceID("test-node-1-uid"): {
					EntityType:    "k8s.node",
					ResourceIDKey: "k8s.node.uid",
					ResourceID:    "test-node-1-uid",
					Metadata: map[string]string{
						"foo":                     "bar",
						"foo1":                    "",
						"k8s.node.name":           "test-node-1",
						"node.creation_timestamp": "0001-01-01T00:00:00Z",
					},
				},
			},
		},
		{
			name:          "ReplicaSet simple case",
			metadataStore: &metadata.Store{},
			resource:      testutils.NewReplicaSet("1"),
			want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{
				experimentalmetricmetadata.ResourceID("test-replicaset-1-uid"): {
					EntityType:    "k8s.replicaset",
					ResourceIDKey: "k8s.replicaset.uid",
					ResourceID:    "test-replicaset-1-uid",
					Metadata: map[string]string{
						"foo":                           "bar",
						"foo1":                          "",
						"k8s.workload.kind":             "ReplicaSet",
						"k8s.workload.name":             "test-replicaset-1",
						"replicaset.creation_timestamp": "0001-01-01T00:00:00Z",
					},
				},
			},
		},
		{
			name:          "ReplicationController simple case",
			metadataStore: &metadata.Store{},
			resource: &corev1.ReplicationController{
				ObjectMeta: metav1.ObjectMeta{
					Name:      "test-replicationcontroller-1",
					Namespace: "test-namespace",
					UID:       types.UID("test-replicationcontroller-1-uid"),
				},
			},
			want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{
				experimentalmetricmetadata.ResourceID("test-replicationcontroller-1-uid"): {
					EntityType:    "k8s.replicationcontroller",
					ResourceIDKey: "k8s.replicationcontroller.uid",
					ResourceID:    "test-replicationcontroller-1-uid",
					Metadata: map[string]string{
						"k8s.workload.kind": "ReplicationController",
						"k8s.workload.name": "test-replicationcontroller-1",
						"replicationcontroller.creation_timestamp": "0001-01-01T00:00:00Z",
					},
				},
			},
		},
	}

	for _, tt := range tests {
		observedLogger, _ := observer.New(zapcore.WarnLevel)
		set := receivertest.NewNopCreateSettings()
		set.TelemetrySettings.Logger = zap.New(observedLogger)
		t.Run(tt.name, func(t *testing.T) {
			dc := &resourceWatcher{metadataStore: tt.metadataStore}

			actual := dc.objMetadata(tt.resource)
			require.Equal(t, len(tt.want), len(actual))

			for key, item := range tt.want {
				got, exists := actual[key]
				require.True(t, exists)
				require.Equal(t, *item, *got)
			}
		})
	}
}

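// allPodMetadata merges the provided metadata with the commonPodMetadata
// shared by the pod test cases.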
var allPodMetadata = func(metadata map[string]string) map[string]string {
	out := maps.MergeStringMaps(metadata, commonPodMetadata)
	return out
}

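// podWithAdditionalLabels returns the given pod with the provided labels added.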
func podWithAdditionalLabels(labels map[string]string, pod *corev1.Pod) any {
	if pod.Labels == nil {
		pod.Labels = make(map[string]string, len(labels))
	}

	for k, v := range labels {
		pod.Labels[k] = v
	}

	return pod
}