receiver_test.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package k8sclusterreceiver
  4. import (
  5. "context"
  6. "sync/atomic"
  7. "testing"
  8. "time"
  9. quotaclientset "github.com/openshift/client-go/quota/clientset/versioned"
  10. fakeQuota "github.com/openshift/client-go/quota/clientset/versioned/fake"
  11. "github.com/stretchr/testify/require"
  12. "go.opentelemetry.io/collector/component"
  13. "go.opentelemetry.io/collector/component/componenttest"
  14. "go.opentelemetry.io/collector/consumer"
  15. "go.opentelemetry.io/collector/consumer/consumertest"
  16. "go.opentelemetry.io/collector/obsreport/obsreporttest"
  17. "go.opentelemetry.io/collector/receiver"
  18. corev1 "k8s.io/api/core/v1"
  19. v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  20. "k8s.io/apimachinery/pkg/runtime/schema"
  21. "k8s.io/client-go/kubernetes"
  22. "k8s.io/client-go/kubernetes/fake"
  23. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
  24. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/gvk"
  25. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata"
  26. )
  27. func TestReceiver(t *testing.T) {
  28. tt, err := obsreporttest.SetupTelemetry(component.NewID(metadata.Type))
  29. require.NoError(t, err)
  30. defer func() {
  31. require.NoError(t, tt.Shutdown(context.Background()))
  32. }()
  33. client := newFakeClientWithAllResources()
  34. osQuotaClient := fakeQuota.NewSimpleClientset()
  35. sink := new(consumertest.MetricsSink)
  36. r := setupReceiver(client, osQuotaClient, sink, nil, 10*time.Second, tt)
  37. // Setup k8s resources.
  38. numPods := 2
  39. numNodes := 1
  40. numQuotas := 2
  41. numClusterQuotaMetrics := numQuotas * 4
  42. createPods(t, client, numPods)
  43. createNodes(t, client, numNodes)
  44. createClusterQuota(t, osQuotaClient, 2)
  45. ctx := context.Background()
  46. require.NoError(t, r.Start(ctx, componenttest.NewNopHost()))
  47. // Expects metric data from nodes and pods where each metric data
  48. // struct corresponds to one resource.
  49. expectedNumMetrics := numPods + numNodes + numClusterQuotaMetrics
  50. var initialDataPointCount int
  51. require.Eventually(t, func() bool {
  52. initialDataPointCount = sink.DataPointCount()
  53. return initialDataPointCount == expectedNumMetrics
  54. }, 10*time.Second, 100*time.Millisecond,
  55. "metrics not collected")
  56. numPodsToDelete := 1
  57. deletePods(t, client, numPodsToDelete)
  58. // Expects metric data from a node, since other resources were deleted.
  59. expectedNumMetrics = (numPods - numPodsToDelete) + numNodes + numClusterQuotaMetrics
  60. var metricsCountDelta int
  61. require.Eventually(t, func() bool {
  62. metricsCountDelta = sink.DataPointCount() - initialDataPointCount
  63. return metricsCountDelta == expectedNumMetrics
  64. }, 10*time.Second, 100*time.Millisecond,
  65. "updated metrics not collected")
  66. require.NoError(t, r.Shutdown(ctx))
  67. }
  68. func TestReceiverTimesOutAfterStartup(t *testing.T) {
  69. tt, err := obsreporttest.SetupTelemetry(component.NewID(metadata.Type))
  70. require.NoError(t, err)
  71. defer func() {
  72. require.NoError(t, tt.Shutdown(context.Background()))
  73. }()
  74. client := newFakeClientWithAllResources()
  75. // Mock initial cache sync timing out, using a small timeout.
  76. r := setupReceiver(client, nil, consumertest.NewNop(), nil, 1*time.Millisecond, tt)
  77. createPods(t, client, 1)
  78. ctx := context.Background()
  79. require.NoError(t, r.Start(ctx, componenttest.NewNopHost()))
  80. require.Eventually(t, func() bool {
  81. return r.resourceWatcher.initialSyncTimedOut.Load()
  82. }, 10*time.Second, 100*time.Millisecond)
  83. require.NoError(t, r.Shutdown(ctx))
  84. }
  85. func TestReceiverWithManyResources(t *testing.T) {
  86. tt, err := obsreporttest.SetupTelemetry(component.NewID(metadata.Type))
  87. require.NoError(t, err)
  88. defer func() {
  89. require.NoError(t, tt.Shutdown(context.Background()))
  90. }()
  91. client := newFakeClientWithAllResources()
  92. osQuotaClient := fakeQuota.NewSimpleClientset()
  93. sink := new(consumertest.MetricsSink)
  94. r := setupReceiver(client, osQuotaClient, sink, nil, 10*time.Second, tt)
  95. numPods := 1000
  96. numQuotas := 2
  97. numExpectedMetrics := numPods + numQuotas*4
  98. createPods(t, client, numPods)
  99. createClusterQuota(t, osQuotaClient, 2)
  100. ctx := context.Background()
  101. require.NoError(t, r.Start(ctx, componenttest.NewNopHost()))
  102. require.Eventually(t, func() bool {
  103. // 4 points from the cluster quota.
  104. return sink.DataPointCount() == numExpectedMetrics
  105. }, 10*time.Second, 100*time.Millisecond,
  106. "metrics not collected")
  107. require.NoError(t, r.Shutdown(ctx))
  108. }
  109. var numCalls *atomic.Int32
  110. var consumeMetadataInvocation = func() {
  111. if numCalls != nil {
  112. numCalls.Add(1)
  113. }
  114. }
  115. func TestReceiverWithMetadata(t *testing.T) {
  116. tt, err := obsreporttest.SetupTelemetry(component.NewID(metadata.Type))
  117. require.NoError(t, err)
  118. defer func() {
  119. require.NoError(t, tt.Shutdown(context.Background()))
  120. }()
  121. client := newFakeClientWithAllResources()
  122. metricsConsumer := &mockExporterWithK8sMetadata{MetricsSink: new(consumertest.MetricsSink)}
  123. numCalls = &atomic.Int32{}
  124. logsConsumer := new(consumertest.LogsSink)
  125. r := setupReceiver(client, nil, metricsConsumer, logsConsumer, 10*time.Second, tt)
  126. r.config.MetadataExporters = []string{"nop/withmetadata"}
  127. // Setup k8s resources.
  128. pods := createPods(t, client, 1)
  129. ctx := context.Background()
  130. require.NoError(t, r.Start(ctx, newNopHostWithExporters()))
  131. // Mock an update on the Pod object. It appears that the fake clientset
  132. // does not pass on events for updates to resources.
  133. require.Len(t, pods, 1)
  134. updatedPod := getUpdatedPod(pods[0])
  135. r.resourceWatcher.onUpdate(pods[0], updatedPod)
  136. // Should not result in ConsumerKubernetesMetadata invocation since the pod
  137. // is not changed. Should result in entity event because they are emitted even
  138. // if the entity is not changed.
  139. r.resourceWatcher.onUpdate(updatedPod, updatedPod)
  140. deletePods(t, client, 1)
  141. // Ensure ConsumeKubernetesMetadata is called twice, once for the add and
  142. // then for the update. Note the second update does not result in metatada call
  143. // since the pod is not changed.
  144. require.Eventually(t, func() bool {
  145. return int(numCalls.Load()) == 2
  146. }, 10*time.Second, 100*time.Millisecond,
  147. "metadata not collected")
  148. // Must have 3 entity events: once for the add, followed by an update and
  149. // then another update, which unlike metadata calls actually happens since
  150. // even unchanged entities trigger an event.
  151. require.Eventually(t, func() bool {
  152. return logsConsumer.LogRecordCount() == 3
  153. }, 10*time.Second, 100*time.Millisecond,
  154. "entity events not collected")
  155. require.NoError(t, r.Shutdown(ctx))
  156. }
  157. func getUpdatedPod(pod *corev1.Pod) any {
  158. return &corev1.Pod{
  159. ObjectMeta: v1.ObjectMeta{
  160. Name: pod.Name,
  161. Namespace: pod.Namespace,
  162. UID: pod.UID,
  163. Labels: map[string]string{
  164. "key": "value",
  165. },
  166. },
  167. }
  168. }
  169. func setupReceiver(
  170. client *fake.Clientset,
  171. osQuotaClient quotaclientset.Interface,
  172. metricsConsumer consumer.Metrics,
  173. logsConsumer consumer.Logs,
  174. initialSyncTimeout time.Duration,
  175. tt obsreporttest.TestTelemetry) *kubernetesReceiver {
  176. distribution := distributionKubernetes
  177. if osQuotaClient != nil {
  178. distribution = distributionOpenShift
  179. }
  180. config := &Config{
  181. CollectionInterval: 1 * time.Second,
  182. NodeConditionTypesToReport: []string{"Ready"},
  183. AllocatableTypesToReport: []string{"cpu", "memory"},
  184. Distribution: distribution,
  185. MetricsBuilderConfig: metadata.DefaultMetricsBuilderConfig(),
  186. }
  187. r, _ := newReceiver(context.Background(), receiver.CreateSettings{ID: component.NewID(metadata.Type), TelemetrySettings: tt.TelemetrySettings, BuildInfo: component.NewDefaultBuildInfo()}, config)
  188. kr := r.(*kubernetesReceiver)
  189. kr.metricsConsumer = metricsConsumer
  190. kr.resourceWatcher.makeClient = func(_ k8sconfig.APIConfig) (kubernetes.Interface, error) {
  191. return client, nil
  192. }
  193. kr.resourceWatcher.makeOpenShiftQuotaClient = func(_ k8sconfig.APIConfig) (quotaclientset.Interface, error) {
  194. return osQuotaClient, nil
  195. }
  196. kr.resourceWatcher.initialTimeout = initialSyncTimeout
  197. kr.resourceWatcher.entityLogConsumer = logsConsumer
  198. return kr
  199. }
  200. func newFakeClientWithAllResources() *fake.Clientset {
  201. client := fake.NewSimpleClientset()
  202. client.Resources = []*v1.APIResourceList{
  203. {
  204. GroupVersion: "v1",
  205. APIResources: []v1.APIResource{
  206. gvkToAPIResource(gvk.Pod),
  207. gvkToAPIResource(gvk.Node),
  208. gvkToAPIResource(gvk.Namespace),
  209. gvkToAPIResource(gvk.ReplicationController),
  210. gvkToAPIResource(gvk.ResourceQuota),
  211. gvkToAPIResource(gvk.Service),
  212. },
  213. },
  214. {
  215. GroupVersion: "apps/v1",
  216. APIResources: []v1.APIResource{
  217. gvkToAPIResource(gvk.DaemonSet),
  218. gvkToAPIResource(gvk.Deployment),
  219. gvkToAPIResource(gvk.ReplicaSet),
  220. gvkToAPIResource(gvk.StatefulSet),
  221. },
  222. },
  223. {
  224. GroupVersion: "batch/v1",
  225. APIResources: []v1.APIResource{
  226. gvkToAPIResource(gvk.Job),
  227. gvkToAPIResource(gvk.CronJob),
  228. },
  229. },
  230. {
  231. GroupVersion: "autoscaling/v2",
  232. APIResources: []v1.APIResource{
  233. gvkToAPIResource(gvk.HorizontalPodAutoscaler),
  234. },
  235. },
  236. }
  237. return client
  238. }
  239. func gvkToAPIResource(gvk schema.GroupVersionKind) v1.APIResource {
  240. return v1.APIResource{
  241. Group: gvk.Group,
  242. Version: gvk.Version,
  243. Kind: gvk.Kind,
  244. }
  245. }