authentication.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package kafka // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
  4. import (
  5. "crypto/sha256"
  6. "crypto/sha512"
  7. "fmt"
  8. "github.com/IBM/sarama"
  9. "go.opentelemetry.io/collector/config/configtls"
  10. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/awsmsk"
  11. )
  12. // Authentication defines authentication.
  13. type Authentication struct {
  14. PlainText *PlainTextConfig `mapstructure:"plain_text"`
  15. SASL *SASLConfig `mapstructure:"sasl"`
  16. TLS *configtls.TLSClientSetting `mapstructure:"tls"`
  17. Kerberos *KerberosConfig `mapstructure:"kerberos"`
  18. }
  // PlainTextConfig carries the credentials for plaintext authentication.
  type PlainTextConfig struct {
  	// Username is used as the SASL user.
  	Username string `mapstructure:"username"`
  	// Password is used as the SASL password.
  	Password string `mapstructure:"password"`
  }
  24. // SASLConfig defines the configuration for the SASL authentication.
  25. type SASLConfig struct {
  26. // Username to be used on authentication
  27. Username string `mapstructure:"username"`
  28. // Password to be used on authentication
  29. Password string `mapstructure:"password"`
  30. // SASL Mechanism to be used, possible values are: (PLAIN, AWS_MSK_IAM, SCRAM-SHA-256 or SCRAM-SHA-512).
  31. Mechanism string `mapstructure:"mechanism"`
  32. // SASL Protocol Version to be used, possible values are: (0, 1). Defaults to 0.
  33. Version int `mapstructure:"version"`
  34. AWSMSK AWSMSKConfig `mapstructure:"aws_msk"`
  35. }
  // AWSMSKConfig holds the additional SASL settings required by the
  // AWS_MSK_IAM mechanism.
  type AWSMSKConfig struct {
  	// Region is the AWS region in which the MSK cluster is located.
  	Region string `mapstructure:"region"`
  	// BrokerAddr is the address of the broker the client connects to in order
  	// to perform the required authentication.
  	BrokerAddr string `mapstructure:"broker_addr"`
  }
  // KerberosConfig defines the Kerberos configuration.
  type KerberosConfig struct {
  	// ServiceName is passed to sarama as the GSSAPI service name.
  	ServiceName string `mapstructure:"service_name"`
  	// Realm is the Kerberos realm.
  	Realm string `mapstructure:"realm"`
  	// UseKeyTab selects keytab authentication (using KeyTabPath) when true,
  	// and user authentication (using Password) when false.
  	UseKeyTab bool `mapstructure:"use_keytab"`
  	// Username is the Kerberos user name.
  	Username string `mapstructure:"username"`
  	// Password is only applied when UseKeyTab is false. It is excluded from
  	// JSON encoding.
  	Password string `mapstructure:"password" json:"-"`
  	// ConfigPath is the path of the Kerberos configuration file.
  	ConfigPath string `mapstructure:"config_file"`
  	// KeyTabPath is the path of the keytab file; only applied when UseKeyTab is true.
  	KeyTabPath string `mapstructure:"keytab_file"`
  }
  54. // ConfigureAuthentication configures authentication in sarama.Config.
  55. func ConfigureAuthentication(config Authentication, saramaConfig *sarama.Config) error {
  56. if config.PlainText != nil {
  57. configurePlaintext(*config.PlainText, saramaConfig)
  58. }
  59. if config.TLS != nil {
  60. if err := configureTLS(*config.TLS, saramaConfig); err != nil {
  61. return err
  62. }
  63. }
  64. if config.SASL != nil {
  65. if err := configureSASL(*config.SASL, saramaConfig); err != nil {
  66. return err
  67. }
  68. }
  69. if config.Kerberos != nil {
  70. configureKerberos(*config.Kerberos, saramaConfig)
  71. }
  72. return nil
  73. }
  74. func configurePlaintext(config PlainTextConfig, saramaConfig *sarama.Config) {
  75. saramaConfig.Net.SASL.Enable = true
  76. saramaConfig.Net.SASL.User = config.Username
  77. saramaConfig.Net.SASL.Password = config.Password
  78. }
  79. func configureSASL(config SASLConfig, saramaConfig *sarama.Config) error {
  80. if config.Username == "" {
  81. return fmt.Errorf("username have to be provided")
  82. }
  83. if config.Password == "" {
  84. return fmt.Errorf("password have to be provided")
  85. }
  86. saramaConfig.Net.SASL.Enable = true
  87. saramaConfig.Net.SASL.User = config.Username
  88. saramaConfig.Net.SASL.Password = config.Password
  89. switch config.Mechanism {
  90. case "SCRAM-SHA-512":
  91. saramaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: sha512.New} }
  92. saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
  93. case "SCRAM-SHA-256":
  94. saramaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: sha256.New} }
  95. saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
  96. case "PLAIN":
  97. saramaConfig.Net.SASL.Mechanism = sarama.SASLTypePlaintext
  98. case "AWS_MSK_IAM":
  99. saramaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
  100. return awsmsk.NewIAMSASLClient(config.AWSMSK.BrokerAddr, config.AWSMSK.Region, saramaConfig.ClientID)
  101. }
  102. saramaConfig.Net.SASL.Mechanism = awsmsk.Mechanism
  103. default:
  104. return fmt.Errorf(`invalid SASL Mechanism %q: can be either "PLAIN", "AWS_MSK_IAM", "SCRAM-SHA-256" or "SCRAM-SHA-512"`, config.Mechanism)
  105. }
  106. switch config.Version {
  107. case 0:
  108. saramaConfig.Net.SASL.Version = sarama.SASLHandshakeV0
  109. case 1:
  110. saramaConfig.Net.SASL.Version = sarama.SASLHandshakeV1
  111. default:
  112. return fmt.Errorf(`invalid SASL Protocol Version %d: can be either 0 or 1`, config.Version)
  113. }
  114. return nil
  115. }
  116. func configureTLS(config configtls.TLSClientSetting, saramaConfig *sarama.Config) error {
  117. tlsConfig, err := config.LoadTLSConfig()
  118. if err != nil {
  119. return fmt.Errorf("error loading tls config: %w", err)
  120. }
  121. saramaConfig.Net.TLS.Enable = true
  122. saramaConfig.Net.TLS.Config = tlsConfig
  123. return nil
  124. }
  125. func configureKerberos(config KerberosConfig, saramaConfig *sarama.Config) {
  126. saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeGSSAPI
  127. saramaConfig.Net.SASL.Enable = true
  128. if config.UseKeyTab {
  129. saramaConfig.Net.SASL.GSSAPI.KeyTabPath = config.KeyTabPath
  130. saramaConfig.Net.SASL.GSSAPI.AuthType = sarama.KRB5_KEYTAB_AUTH
  131. } else {
  132. saramaConfig.Net.SASL.GSSAPI.AuthType = sarama.KRB5_USER_AUTH
  133. saramaConfig.Net.SASL.GSSAPI.Password = config.Password
  134. }
  135. saramaConfig.Net.SASL.GSSAPI.KerberosConfigPath = config.ConfigPath
  136. saramaConfig.Net.SASL.GSSAPI.Username = config.Username
  137. saramaConfig.Net.SASL.GSSAPI.Realm = config.Realm
  138. saramaConfig.Net.SASL.GSSAPI.ServiceName = config.ServiceName
  139. }