factory_test.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package kafkaexporter
  4. import (
  5. "context"
  6. "errors"
  7. "net"
  8. "testing"
  9. "github.com/IBM/sarama"
  10. "github.com/stretchr/testify/assert"
  11. "go.opentelemetry.io/collector/component/componenttest"
  12. "go.opentelemetry.io/collector/exporter/exportertest"
  13. "go.opentelemetry.io/collector/pdata/plog"
  14. "go.opentelemetry.io/collector/pdata/pmetric"
  15. "go.opentelemetry.io/collector/pdata/ptrace"
  16. )
  17. // data is a simple means of allowing
  18. // interchangeability between the
  19. // different marshaller types
  20. type data interface {
  21. ptrace.Traces | plog.Logs | pmetric.Metrics
  22. }
  23. type mockMarshaler[Data data] struct {
  24. consume func(d Data, topic string) ([]*sarama.ProducerMessage, error)
  25. encoding string
  26. }
  27. func (mm *mockMarshaler[Data]) Encoding() string { return mm.encoding }
  28. func (mm *mockMarshaler[Data]) Marshal(d Data, topic string) ([]*sarama.ProducerMessage, error) {
  29. if mm.consume != nil {
  30. return mm.consume(d, topic)
  31. }
  32. return nil, errors.New("not implemented")
  33. }
  34. func newMockMarshaler[Data data](encoding string) *mockMarshaler[Data] {
  35. return &mockMarshaler[Data]{encoding: encoding}
  36. }
  37. // applyConfigOption is used to modify values of the
  38. // the default exporter config to make it easier to
  39. // use the return in a test table set up
  40. func applyConfigOption(option func(conf *Config)) *Config {
  41. conf := createDefaultConfig().(*Config)
  42. option(conf)
  43. return conf
  44. }
  45. func TestCreateDefaultConfig(t *testing.T) {
  46. cfg := createDefaultConfig().(*Config)
  47. assert.NotNil(t, cfg, "failed to create default config")
  48. assert.NoError(t, componenttest.CheckConfigStruct(cfg))
  49. assert.Equal(t, []string{defaultBroker}, cfg.Brokers)
  50. assert.Equal(t, "", cfg.Topic)
  51. }
  52. func TestCreateMetricExporter(t *testing.T) {
  53. t.Parallel()
  54. tests := []struct {
  55. name string
  56. conf *Config
  57. marshalers []MetricsMarshaler
  58. err error
  59. }{
  60. {
  61. name: "valid config (no validating broker)",
  62. conf: applyConfigOption(func(conf *Config) {
  63. // this disables contacting the broker so
  64. // we can successfully create the exporter
  65. conf.Metadata.Full = false
  66. conf.Brokers = []string{"invalid:9092"}
  67. conf.ProtocolVersion = "2.0.0"
  68. }),
  69. err: nil,
  70. },
  71. {
  72. name: "invalid config (validating broker)",
  73. conf: applyConfigOption(func(conf *Config) {
  74. conf.Brokers = []string{"invalid:9092"}
  75. conf.ProtocolVersion = "2.0.0"
  76. }),
  77. err: &net.DNSError{},
  78. },
  79. {
  80. name: "default_encoding",
  81. conf: applyConfigOption(func(conf *Config) {
  82. // Disabling broker check to ensure encoding work
  83. conf.Metadata.Full = false
  84. conf.Encoding = defaultEncoding
  85. }),
  86. marshalers: nil,
  87. err: nil,
  88. },
  89. {
  90. name: "custom_encoding",
  91. conf: applyConfigOption(func(conf *Config) {
  92. // Disabling broker check to ensure encoding work
  93. conf.Metadata.Full = false
  94. conf.Encoding = "custom"
  95. }),
  96. marshalers: []MetricsMarshaler{
  97. newMockMarshaler[pmetric.Metrics]("custom"),
  98. },
  99. err: nil,
  100. },
  101. }
  102. for _, tc := range tests {
  103. tc := tc
  104. t.Run(tc.name, func(t *testing.T) {
  105. t.Parallel()
  106. f := NewFactory(withMetricsMarshalers(tc.marshalers...))
  107. exporter, err := f.CreateMetricsExporter(
  108. context.Background(),
  109. exportertest.NewNopCreateSettings(),
  110. tc.conf,
  111. )
  112. if tc.err != nil {
  113. assert.ErrorAs(t, err, &tc.err, "Must match the expected error")
  114. assert.Nil(t, exporter, "Must return nil value for invalid exporter")
  115. return
  116. }
  117. assert.NoError(t, err, "Must not error")
  118. assert.NotNil(t, exporter, "Must return valid exporter when no error is returned")
  119. })
  120. }
  121. }
  122. func TestCreateLogExporter(t *testing.T) {
  123. t.Parallel()
  124. tests := []struct {
  125. name string
  126. conf *Config
  127. marshalers []LogsMarshaler
  128. err error
  129. }{
  130. {
  131. name: "valid config (no validating broker)",
  132. conf: applyConfigOption(func(conf *Config) {
  133. // this disables contacting the broker so
  134. // we can successfully create the exporter
  135. conf.Metadata.Full = false
  136. conf.Brokers = []string{"invalid:9092"}
  137. conf.ProtocolVersion = "2.0.0"
  138. }),
  139. err: nil,
  140. },
  141. {
  142. name: "invalid config (validating broker)",
  143. conf: applyConfigOption(func(conf *Config) {
  144. conf.Brokers = []string{"invalid:9092"}
  145. conf.ProtocolVersion = "2.0.0"
  146. }),
  147. err: &net.DNSError{},
  148. },
  149. {
  150. name: "default_encoding",
  151. conf: applyConfigOption(func(conf *Config) {
  152. // Disabling broker check to ensure encoding work
  153. conf.Metadata.Full = false
  154. conf.Encoding = defaultEncoding
  155. }),
  156. marshalers: nil,
  157. err: nil,
  158. },
  159. {
  160. name: "custom_encoding",
  161. conf: applyConfigOption(func(conf *Config) {
  162. // Disabling broker check to ensure encoding work
  163. conf.Metadata.Full = false
  164. conf.Encoding = "custom"
  165. }),
  166. marshalers: []LogsMarshaler{
  167. newMockMarshaler[plog.Logs]("custom"),
  168. },
  169. err: nil,
  170. },
  171. }
  172. for _, tc := range tests {
  173. tc := tc
  174. t.Run(tc.name, func(t *testing.T) {
  175. t.Parallel()
  176. f := NewFactory(withLogsMarshalers(tc.marshalers...))
  177. exporter, err := f.CreateLogsExporter(
  178. context.Background(),
  179. exportertest.NewNopCreateSettings(),
  180. tc.conf,
  181. )
  182. if tc.err != nil {
  183. assert.ErrorAs(t, err, &tc.err, "Must match the expected error")
  184. assert.Nil(t, exporter, "Must return nil value for invalid exporter")
  185. return
  186. }
  187. assert.NoError(t, err, "Must not error")
  188. assert.NotNil(t, exporter, "Must return valid exporter when no error is returned")
  189. })
  190. }
  191. }
  192. func TestCreateTraceExporter(t *testing.T) {
  193. t.Parallel()
  194. tests := []struct {
  195. name string
  196. conf *Config
  197. marshalers []TracesMarshaler
  198. err error
  199. }{
  200. {
  201. name: "valid config (no validating brokers)",
  202. conf: applyConfigOption(func(conf *Config) {
  203. conf.Metadata.Full = false
  204. conf.Brokers = []string{"invalid:9092"}
  205. conf.ProtocolVersion = "2.0.0"
  206. }),
  207. marshalers: nil,
  208. err: nil,
  209. },
  210. {
  211. name: "invalid config (validating brokers)",
  212. conf: applyConfigOption(func(conf *Config) {
  213. conf.Brokers = []string{"invalid:9092"}
  214. conf.ProtocolVersion = "2.0.0"
  215. }),
  216. marshalers: nil,
  217. err: &net.DNSError{},
  218. },
  219. {
  220. name: "default_encoding",
  221. conf: applyConfigOption(func(conf *Config) {
  222. // Disabling broker check to ensure encoding work
  223. conf.Metadata.Full = false
  224. conf.Encoding = defaultEncoding
  225. }),
  226. marshalers: nil,
  227. err: nil,
  228. },
  229. {
  230. name: "custom_encoding",
  231. conf: applyConfigOption(func(conf *Config) {
  232. // Disabling broker check to ensure encoding work
  233. conf.Metadata.Full = false
  234. conf.Encoding = "custom"
  235. }),
  236. marshalers: []TracesMarshaler{
  237. newMockMarshaler[ptrace.Traces]("custom"),
  238. },
  239. err: nil,
  240. },
  241. }
  242. for _, tc := range tests {
  243. tc := tc
  244. t.Run(tc.name, func(t *testing.T) {
  245. t.Parallel()
  246. f := NewFactory(withTracesMarshalers(tc.marshalers...))
  247. exporter, err := f.CreateTracesExporter(
  248. context.Background(),
  249. exportertest.NewNopCreateSettings(),
  250. tc.conf,
  251. )
  252. if tc.err != nil {
  253. assert.ErrorAs(t, err, &tc.err, "Must match the expected error")
  254. assert.Nil(t, exporter, "Must return nil value for invalid exporter")
  255. return
  256. }
  257. assert.NoError(t, err, "Must not error")
  258. assert.NotNil(t, exporter, "Must return valid exporter when no error is returned")
  259. })
  260. }
  261. }