config_test.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package googlecloudpubsubexporter
  4. import (
  5. "path/filepath"
  6. "testing"
  7. "time"
  8. "github.com/stretchr/testify/assert"
  9. "github.com/stretchr/testify/require"
  10. "go.opentelemetry.io/collector/component"
  11. "go.opentelemetry.io/collector/confmap/confmaptest"
  12. "go.opentelemetry.io/collector/exporter/exporterhelper"
  13. "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/googlecloudpubsubexporter/internal/metadata"
  14. )
  15. func TestLoadConfig(t *testing.T) {
  16. cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
  17. require.NoError(t, err)
  18. factory := NewFactory()
  19. cfg := factory.CreateDefaultConfig()
  20. sub, err := cm.Sub(component.NewIDWithName(metadata.Type, "").String())
  21. require.NoError(t, err)
  22. require.NoError(t, component.UnmarshalConfig(sub, cfg))
  23. defaultConfig := factory.CreateDefaultConfig().(*Config)
  24. assert.Equal(t, cfg, defaultConfig)
  25. sub, err = cm.Sub(component.NewIDWithName(metadata.Type, "customname").String())
  26. require.NoError(t, err)
  27. require.NoError(t, component.UnmarshalConfig(sub, cfg))
  28. customConfig := factory.CreateDefaultConfig().(*Config)
  29. customConfig.ProjectID = "my-project"
  30. customConfig.UserAgent = "opentelemetry-collector-contrib {{version}}"
  31. customConfig.TimeoutSettings = exporterhelper.TimeoutSettings{
  32. Timeout: 20 * time.Second,
  33. }
  34. customConfig.Topic = "projects/my-project/topics/otlp-topic"
  35. customConfig.Compression = "gzip"
  36. customConfig.Watermark.Behavior = "earliest"
  37. customConfig.Watermark.AllowedDrift = time.Hour
  38. assert.Equal(t, cfg, customConfig)
  39. }
  40. func TestTopicConfigValidation(t *testing.T) {
  41. factory := NewFactory()
  42. c := factory.CreateDefaultConfig().(*Config)
  43. assert.Error(t, c.Validate())
  44. c.Topic = "projects/000project/topics/my-topic"
  45. assert.Error(t, c.Validate())
  46. c.Topic = "projects/my-project/subscriptions/my-subscription"
  47. assert.Error(t, c.Validate())
  48. c.Topic = "projects/my-project/topics/my-topic"
  49. assert.NoError(t, c.Validate())
  50. }
  51. func TestCompressionConfigValidation(t *testing.T) {
  52. factory := NewFactory()
  53. c := factory.CreateDefaultConfig().(*Config)
  54. c.Topic = "projects/my-project/topics/my-topic"
  55. assert.NoError(t, c.Validate())
  56. c.Compression = "xxx"
  57. assert.Error(t, c.Validate())
  58. c.Compression = "gzip"
  59. assert.NoError(t, c.Validate())
  60. c.Compression = "none"
  61. assert.Error(t, c.Validate())
  62. c.Compression = ""
  63. assert.NoError(t, c.Validate())
  64. }
  65. func TestWatermarkBehaviorConfigValidation(t *testing.T) {
  66. factory := NewFactory()
  67. c := factory.CreateDefaultConfig().(*Config)
  68. c.Topic = "projects/my-project/topics/my-topic"
  69. assert.NoError(t, c.Validate())
  70. c.Watermark.Behavior = "xxx"
  71. assert.Error(t, c.Validate())
  72. c.Watermark.Behavior = "earliest"
  73. assert.NoError(t, c.Validate())
  74. c.Watermark.Behavior = "none"
  75. assert.Error(t, c.Validate())
  76. c.Watermark.Behavior = "current"
  77. assert.NoError(t, c.Validate())
  78. }
  79. func TestWatermarkDefaultMaxDriftValidation(t *testing.T) {
  80. factory := NewFactory()
  81. c := factory.CreateDefaultConfig().(*Config)
  82. c.Topic = "projects/my-project/topics/my-topic"
  83. assert.NoError(t, c.Validate())
  84. c.Watermark.AllowedDrift = 0
  85. assert.Equal(t, time.Duration(0), c.Watermark.AllowedDrift)
  86. assert.NoError(t, c.Validate())
  87. assert.Equal(t, time.Duration(9223372036854775807), c.Watermark.AllowedDrift)
  88. }