e2e_test.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. //go:build e2e
  4. // +build e2e
  5. package k8sclusterreceiver
  6. import (
  7. "context"
  8. "path/filepath"
  9. "strings"
  10. "testing"
  11. "time"
  12. "github.com/google/uuid"
  13. "github.com/stretchr/testify/assert"
  14. "github.com/stretchr/testify/require"
  15. "go.opentelemetry.io/collector/component/componenttest"
  16. "go.opentelemetry.io/collector/consumer/consumertest"
  17. "go.opentelemetry.io/collector/pdata/pmetric"
  18. "go.opentelemetry.io/collector/receiver/otlpreceiver"
  19. "go.opentelemetry.io/collector/receiver/receivertest"
  20. "k8s.io/client-go/dynamic"
  21. "k8s.io/client-go/tools/clientcmd"
  22. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8stest"
  23. "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden"
  24. "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
  25. )
// testKubeConfig is the kubeconfig path passed to clientcmd.BuildConfigFromFlags
// by TestE2E to reach the cluster under test.
const testKubeConfig = "/tmp/kube-config-otelcol-e2e-testing"
  27. // TestE2E tests the k8s cluster receiver with a real k8s cluster.
  28. // The test requires a prebuilt otelcontribcol image uploaded to a kind k8s cluster defined in
  29. // `/tmp/kube-config-otelcol-e2e-testing`. Run the following command prior to running the test locally:
  30. //
  31. // kind create cluster --kubeconfig=/tmp/kube-config-otelcol-e2e-testing
  32. // make docker-otelcontribcol
  33. // KUBECONFIG=/tmp/kube-config-otelcol-e2e-testing kind load docker-image otelcontribcol:latest
  34. func TestE2E(t *testing.T) {
  35. var expected pmetric.Metrics
  36. expectedFile := filepath.Join("testdata", "e2e", "expected.yaml")
  37. expected, err := golden.ReadMetrics(expectedFile)
  38. require.NoError(t, err)
  39. kubeConfig, err := clientcmd.BuildConfigFromFlags("", testKubeConfig)
  40. require.NoError(t, err)
  41. dynamicClient, err := dynamic.NewForConfig(kubeConfig)
  42. require.NoError(t, err)
  43. testID := uuid.NewString()[:8]
  44. collectorObjs := k8stest.CreateCollectorObjects(t, dynamicClient, testID)
  45. defer func() {
  46. for _, obj := range append(collectorObjs) {
  47. require.NoErrorf(t, k8stest.DeleteObject(dynamicClient, obj), "failed to delete object %s", obj.GetName())
  48. }
  49. }()
  50. metricsConsumer := new(consumertest.MetricsSink)
  51. wantEntries := 10 // Minimal number of metrics to wait for.
  52. waitForData(t, wantEntries, metricsConsumer)
  53. replaceWithStar := func(string) string { return "*" }
  54. shortenNames := func(value string) string {
  55. if strings.HasPrefix(value, "kube-proxy") {
  56. return "kube-proxy"
  57. }
  58. if strings.HasPrefix(value, "local-path-provisioner") {
  59. return "local-path-provisioner"
  60. }
  61. if strings.HasPrefix(value, "kindnet") {
  62. return "kindnet"
  63. }
  64. if strings.HasPrefix(value, "coredns") {
  65. return "coredns"
  66. }
  67. if strings.HasPrefix(value, "otelcol") {
  68. return "otelcol"
  69. }
  70. return value
  71. }
  72. containerImageShorten := func(value string) string {
  73. return value[(strings.LastIndex(value, "/") + 1):]
  74. }
  75. require.NoError(t, pmetrictest.CompareMetrics(expected, metricsConsumer.AllMetrics()[len(metricsConsumer.AllMetrics())-1],
  76. pmetrictest.IgnoreTimestamp(),
  77. pmetrictest.IgnoreStartTimestamp(),
  78. pmetrictest.IgnoreMetricValues("k8s.deployment.desired", "k8s.deployment.available", "k8s.container.restarts", "k8s.container.cpu_request", "k8s.container.memory_request", "k8s.container.memory_limit"),
  79. pmetrictest.ChangeResourceAttributeValue("k8s.deployment.name", shortenNames),
  80. pmetrictest.ChangeResourceAttributeValue("k8s.pod.name", shortenNames),
  81. pmetrictest.ChangeResourceAttributeValue("k8s.replicaset.name", shortenNames),
  82. pmetrictest.ChangeResourceAttributeValue("k8s.deployment.uid", replaceWithStar),
  83. pmetrictest.ChangeResourceAttributeValue("k8s.pod.uid", replaceWithStar),
  84. pmetrictest.ChangeResourceAttributeValue("k8s.replicaset.uid", replaceWithStar),
  85. pmetrictest.ChangeResourceAttributeValue("container.id", replaceWithStar),
  86. pmetrictest.ChangeResourceAttributeValue("container.image.tag", replaceWithStar),
  87. pmetrictest.ChangeResourceAttributeValue("k8s.node.uid", replaceWithStar),
  88. pmetrictest.ChangeResourceAttributeValue("k8s.namespace.uid", replaceWithStar),
  89. pmetrictest.ChangeResourceAttributeValue("k8s.daemonset.uid", replaceWithStar),
  90. pmetrictest.ChangeResourceAttributeValue("container.image.name", containerImageShorten),
  91. pmetrictest.IgnoreScopeVersion(),
  92. pmetrictest.IgnoreResourceMetricsOrder(),
  93. pmetrictest.IgnoreMetricsOrder(),
  94. pmetrictest.IgnoreScopeMetricsOrder(),
  95. pmetrictest.IgnoreMetricDataPointsOrder(),
  96. ),
  97. )
  98. }
  99. func waitForData(t *testing.T, entriesNum int, mc *consumertest.MetricsSink) {
  100. f := otlpreceiver.NewFactory()
  101. cfg := f.CreateDefaultConfig().(*otlpreceiver.Config)
  102. rcvr, err := f.CreateMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, mc)
  103. require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost()))
  104. require.NoError(t, err, "failed creating metrics receiver")
  105. defer func() {
  106. assert.NoError(t, rcvr.Shutdown(context.Background()))
  107. }()
  108. timeoutMinutes := 3
  109. require.Eventuallyf(t, func() bool {
  110. return len(mc.AllMetrics()) > entriesNum
  111. }, time.Duration(timeoutMinutes)*time.Minute, 1*time.Second,
  112. "failed to receive %d entries, received %d metrics in %d minutes", entriesNum,
  113. len(mc.AllMetrics()), timeoutMinutes)
  114. }