config_test.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package kafkareceiver
  4. import (
  5. "path/filepath"
  6. "testing"
  7. "time"
  8. "github.com/stretchr/testify/assert"
  9. "github.com/stretchr/testify/require"
  10. "go.opentelemetry.io/collector/component"
  11. "go.opentelemetry.io/collector/config/configtls"
  12. "go.opentelemetry.io/collector/confmap/confmaptest"
  13. "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"
  14. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
  15. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver/internal/metadata"
  16. )
  17. func TestLoadConfig(t *testing.T) {
  18. t.Parallel()
  19. cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
  20. require.NoError(t, err)
  21. tests := []struct {
  22. id component.ID
  23. expected component.Config
  24. expectedErr error
  25. }{
  26. {
  27. id: component.NewIDWithName(metadata.Type, ""),
  28. expected: &Config{
  29. Topic: "spans",
  30. Encoding: "otlp_proto",
  31. Brokers: []string{"foo:123", "bar:456"},
  32. ResolveCanonicalBootstrapServersOnly: true,
  33. ClientID: "otel-collector",
  34. GroupID: "otel-collector",
  35. InitialOffset: "latest",
  36. Authentication: kafka.Authentication{
  37. TLS: &configtls.TLSClientSetting{
  38. TLSSetting: configtls.TLSSetting{
  39. CAFile: "ca.pem",
  40. CertFile: "cert.pem",
  41. KeyFile: "key.pem",
  42. },
  43. },
  44. },
  45. Metadata: kafkaexporter.Metadata{
  46. Full: true,
  47. Retry: kafkaexporter.MetadataRetry{
  48. Max: 10,
  49. Backoff: time.Second * 5,
  50. },
  51. },
  52. AutoCommit: AutoCommit{
  53. Enable: true,
  54. Interval: 1 * time.Second,
  55. },
  56. },
  57. },
  58. {
  59. id: component.NewIDWithName(metadata.Type, "logs"),
  60. expected: &Config{
  61. Topic: "logs",
  62. Encoding: "direct",
  63. Brokers: []string{"coffee:123", "foobar:456"},
  64. ClientID: "otel-collector",
  65. GroupID: "otel-collector",
  66. InitialOffset: "earliest",
  67. Authentication: kafka.Authentication{
  68. TLS: &configtls.TLSClientSetting{
  69. TLSSetting: configtls.TLSSetting{
  70. CAFile: "ca.pem",
  71. CertFile: "cert.pem",
  72. KeyFile: "key.pem",
  73. },
  74. },
  75. },
  76. Metadata: kafkaexporter.Metadata{
  77. Full: true,
  78. Retry: kafkaexporter.MetadataRetry{
  79. Max: 10,
  80. Backoff: time.Second * 5,
  81. },
  82. },
  83. AutoCommit: AutoCommit{
  84. Enable: true,
  85. Interval: 1 * time.Second,
  86. },
  87. },
  88. },
  89. }
  90. for _, tt := range tests {
  91. t.Run(tt.id.String(), func(t *testing.T) {
  92. factory := NewFactory()
  93. cfg := factory.CreateDefaultConfig()
  94. sub, err := cm.Sub(tt.id.String())
  95. require.NoError(t, err)
  96. require.NoError(t, component.UnmarshalConfig(sub, cfg))
  97. assert.NoError(t, component.ValidateConfig(cfg))
  98. assert.Equal(t, tt.expected, cfg)
  99. })
  100. }
  101. }