config.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package googlecloudpubsubexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/googlecloudpubsubexporter"
  4. import (
  5. "fmt"
  6. "regexp"
  7. "time"
  8. "go.opentelemetry.io/collector/exporter/exporterhelper"
  9. )
  10. var topicMatcher = regexp.MustCompile(`^projects/[a-z][a-z0-9\-]*/topics/`)
  11. type Config struct {
  12. // Timeout for all API calls. If not set, defaults to 12 seconds.
  13. exporterhelper.TimeoutSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
  14. exporterhelper.QueueSettings `mapstructure:"sending_queue"`
  15. exporterhelper.RetrySettings `mapstructure:"retry_on_failure"`
  16. // Google Cloud Project ID where the Pubsub client will connect to
  17. ProjectID string `mapstructure:"project"`
  18. // User agent that will be used by the Pubsub client to connect to the service
  19. UserAgent string `mapstructure:"user_agent"`
  20. // Override of the Pubsub endpoint, for testing only
  21. endpoint string
  22. // Only has effect if Endpoint is not ""
  23. insecure bool
  24. // The fully qualified resource name of the Pubsub topic
  25. Topic string `mapstructure:"topic"`
  26. // Compression of the payload (only gzip or is supported, no compression is the default)
  27. Compression string `mapstructure:"compression"`
  28. // Watermark defines the watermark (the ce-time attribute on the message) behavior
  29. Watermark WatermarkConfig `mapstructure:"watermark"`
  30. }
  31. // WatermarkConfig customizes the behavior of the watermark
  32. type WatermarkConfig struct {
  33. // Behavior of the watermark. Currently, only of the message (none, earliest and current, current being the default)
  34. // will set the timestamp on pubsub based on timestamps of the events inside the message
  35. Behavior string `mapstructure:"behavior"`
  36. // Indication on how much the timestamp can drift from the current time, the timestamp will be capped to the allowed
  37. // maximum. A duration of 0 is the same as maximum duration
  38. AllowedDrift time.Duration `mapstructure:"allowed_drift"`
  39. }
  40. func (config *Config) Validate() error {
  41. if !topicMatcher.MatchString(config.Topic) {
  42. return fmt.Errorf("topic '%s' is not a valid format, use 'projects/<project_id>/topics/<name>'", config.Topic)
  43. }
  44. _, err := config.parseCompression()
  45. if err != nil {
  46. return err
  47. }
  48. return config.Watermark.validate()
  49. }
  50. func (config *WatermarkConfig) validate() error {
  51. if config.AllowedDrift == 0 {
  52. config.AllowedDrift = 1<<63 - 1
  53. }
  54. _, err := config.parseWatermarkBehavior()
  55. return err
  56. }
  57. func (config *Config) parseCompression() (compression, error) {
  58. switch config.Compression {
  59. case "gzip":
  60. return gZip, nil
  61. case "":
  62. return uncompressed, nil
  63. }
  64. return uncompressed, fmt.Errorf("compression %v is not supported. supported compression formats include [gzip]", config.Compression)
  65. }
  66. func (config *WatermarkConfig) parseWatermarkBehavior() (WatermarkBehavior, error) {
  67. switch config.Behavior {
  68. case "earliest":
  69. return earliest, nil
  70. case "current":
  71. return current, nil
  72. case "":
  73. return current, nil
  74. }
  75. return current, fmt.Errorf("behavior %v is not supported. supported compression formats include [current,earliest]", config.Behavior)
  76. }