kafka_exporter_test.go 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package kafkaexporter
  4. import (
  5. "context"
  6. "fmt"
  7. "testing"
  8. "github.com/IBM/sarama"
  9. "github.com/IBM/sarama/mocks"
  10. "github.com/stretchr/testify/assert"
  11. "github.com/stretchr/testify/require"
  12. "go.opentelemetry.io/collector/config/configtls"
  13. "go.opentelemetry.io/collector/exporter/exportertest"
  14. "go.opentelemetry.io/collector/pdata/plog"
  15. "go.opentelemetry.io/collector/pdata/pmetric"
  16. "go.opentelemetry.io/collector/pdata/ptrace"
  17. "go.uber.org/zap"
  18. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata"
  19. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
  20. )
  21. func TestNewExporter_err_version(t *testing.T) {
  22. c := Config{ProtocolVersion: "0.0.0", Encoding: defaultEncoding}
  23. texp, err := newTracesExporter(c, exportertest.NewNopCreateSettings(), tracesMarshalers())
  24. assert.Error(t, err)
  25. assert.Nil(t, texp)
  26. }
  27. func TestNewExporter_err_encoding(t *testing.T) {
  28. c := Config{Encoding: "foo"}
  29. texp, err := newTracesExporter(c, exportertest.NewNopCreateSettings(), tracesMarshalers())
  30. assert.EqualError(t, err, errUnrecognizedEncoding.Error())
  31. assert.Nil(t, texp)
  32. }
  33. func TestNewMetricsExporter_err_version(t *testing.T) {
  34. c := Config{ProtocolVersion: "0.0.0", Encoding: defaultEncoding}
  35. mexp, err := newMetricsExporter(c, exportertest.NewNopCreateSettings(), metricsMarshalers())
  36. assert.Error(t, err)
  37. assert.Nil(t, mexp)
  38. }
  39. func TestNewMetricsExporter_err_encoding(t *testing.T) {
  40. c := Config{Encoding: "bar"}
  41. mexp, err := newMetricsExporter(c, exportertest.NewNopCreateSettings(), metricsMarshalers())
  42. assert.EqualError(t, err, errUnrecognizedEncoding.Error())
  43. assert.Nil(t, mexp)
  44. }
  45. func TestNewMetricsExporter_err_traces_encoding(t *testing.T) {
  46. c := Config{Encoding: "jaeger_proto"}
  47. mexp, err := newMetricsExporter(c, exportertest.NewNopCreateSettings(), metricsMarshalers())
  48. assert.EqualError(t, err, errUnrecognizedEncoding.Error())
  49. assert.Nil(t, mexp)
  50. }
  51. func TestNewLogsExporter_err_version(t *testing.T) {
  52. c := Config{ProtocolVersion: "0.0.0", Encoding: defaultEncoding}
  53. mexp, err := newLogsExporter(c, exportertest.NewNopCreateSettings(), logsMarshalers())
  54. assert.Error(t, err)
  55. assert.Nil(t, mexp)
  56. }
  57. func TestNewLogsExporter_err_encoding(t *testing.T) {
  58. c := Config{Encoding: "bar"}
  59. mexp, err := newLogsExporter(c, exportertest.NewNopCreateSettings(), logsMarshalers())
  60. assert.EqualError(t, err, errUnrecognizedEncoding.Error())
  61. assert.Nil(t, mexp)
  62. }
  63. func TestNewLogsExporter_err_traces_encoding(t *testing.T) {
  64. c := Config{Encoding: "jaeger_proto"}
  65. mexp, err := newLogsExporter(c, exportertest.NewNopCreateSettings(), logsMarshalers())
  66. assert.EqualError(t, err, errUnrecognizedEncoding.Error())
  67. assert.Nil(t, mexp)
  68. }
  69. func TestNewExporter_err_auth_type(t *testing.T) {
  70. c := Config{
  71. ProtocolVersion: "2.0.0",
  72. Authentication: kafka.Authentication{
  73. TLS: &configtls.TLSClientSetting{
  74. TLSSetting: configtls.TLSSetting{
  75. CAFile: "/doesnotexist",
  76. },
  77. },
  78. },
  79. Encoding: defaultEncoding,
  80. Metadata: Metadata{
  81. Full: false,
  82. },
  83. Producer: Producer{
  84. Compression: "none",
  85. },
  86. }
  87. texp, err := newTracesExporter(c, exportertest.NewNopCreateSettings(), tracesMarshalers())
  88. assert.Error(t, err)
  89. assert.Contains(t, err.Error(), "failed to load TLS config")
  90. assert.Nil(t, texp)
  91. mexp, err := newMetricsExporter(c, exportertest.NewNopCreateSettings(), metricsMarshalers())
  92. assert.Error(t, err)
  93. assert.Contains(t, err.Error(), "failed to load TLS config")
  94. assert.Nil(t, mexp)
  95. lexp, err := newLogsExporter(c, exportertest.NewNopCreateSettings(), logsMarshalers())
  96. assert.Error(t, err)
  97. assert.Contains(t, err.Error(), "failed to load TLS config")
  98. assert.Nil(t, lexp)
  99. }
  100. func TestNewExporter_err_compression(t *testing.T) {
  101. c := Config{
  102. Encoding: defaultEncoding,
  103. Producer: Producer{
  104. Compression: "idk",
  105. },
  106. }
  107. texp, err := newTracesExporter(c, exportertest.NewNopCreateSettings(), tracesMarshalers())
  108. assert.Error(t, err)
  109. assert.Contains(t, err.Error(), "producer.compression should be one of 'none', 'gzip', 'snappy', 'lz4', or 'zstd'. configured value idk")
  110. assert.Nil(t, texp)
  111. }
  112. func TestTracesPusher(t *testing.T) {
  113. c := sarama.NewConfig()
  114. producer := mocks.NewSyncProducer(t, c)
  115. producer.ExpectSendMessageAndSucceed()
  116. p := kafkaTracesProducer{
  117. producer: producer,
  118. marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding),
  119. }
  120. t.Cleanup(func() {
  121. require.NoError(t, p.Close(context.Background()))
  122. })
  123. err := p.tracesPusher(context.Background(), testdata.GenerateTracesTwoSpansSameResource())
  124. require.NoError(t, err)
  125. }
  126. func TestTracesPusher_err(t *testing.T) {
  127. c := sarama.NewConfig()
  128. producer := mocks.NewSyncProducer(t, c)
  129. expErr := fmt.Errorf("failed to send")
  130. producer.ExpectSendMessageAndFail(expErr)
  131. p := kafkaTracesProducer{
  132. producer: producer,
  133. marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding),
  134. logger: zap.NewNop(),
  135. }
  136. t.Cleanup(func() {
  137. require.NoError(t, p.Close(context.Background()))
  138. })
  139. td := testdata.GenerateTracesTwoSpansSameResource()
  140. err := p.tracesPusher(context.Background(), td)
  141. assert.EqualError(t, err, expErr.Error())
  142. }
  143. func TestTracesPusher_marshal_error(t *testing.T) {
  144. expErr := fmt.Errorf("failed to marshal")
  145. p := kafkaTracesProducer{
  146. marshaler: &tracesErrorMarshaler{err: expErr},
  147. logger: zap.NewNop(),
  148. }
  149. td := testdata.GenerateTracesTwoSpansSameResource()
  150. err := p.tracesPusher(context.Background(), td)
  151. require.Error(t, err)
  152. assert.Contains(t, err.Error(), expErr.Error())
  153. }
  154. func TestMetricsDataPusher(t *testing.T) {
  155. c := sarama.NewConfig()
  156. producer := mocks.NewSyncProducer(t, c)
  157. producer.ExpectSendMessageAndSucceed()
  158. p := kafkaMetricsProducer{
  159. producer: producer,
  160. marshaler: newPdataMetricsMarshaler(&pmetric.ProtoMarshaler{}, defaultEncoding),
  161. }
  162. t.Cleanup(func() {
  163. require.NoError(t, p.Close(context.Background()))
  164. })
  165. err := p.metricsDataPusher(context.Background(), testdata.GenerateMetricsTwoMetrics())
  166. require.NoError(t, err)
  167. }
  168. func TestMetricsDataPusher_err(t *testing.T) {
  169. c := sarama.NewConfig()
  170. producer := mocks.NewSyncProducer(t, c)
  171. expErr := fmt.Errorf("failed to send")
  172. producer.ExpectSendMessageAndFail(expErr)
  173. p := kafkaMetricsProducer{
  174. producer: producer,
  175. marshaler: newPdataMetricsMarshaler(&pmetric.ProtoMarshaler{}, defaultEncoding),
  176. logger: zap.NewNop(),
  177. }
  178. t.Cleanup(func() {
  179. require.NoError(t, p.Close(context.Background()))
  180. })
  181. md := testdata.GenerateMetricsTwoMetrics()
  182. err := p.metricsDataPusher(context.Background(), md)
  183. assert.EqualError(t, err, expErr.Error())
  184. }
  185. func TestMetricsDataPusher_marshal_error(t *testing.T) {
  186. expErr := fmt.Errorf("failed to marshal")
  187. p := kafkaMetricsProducer{
  188. marshaler: &metricsErrorMarshaler{err: expErr},
  189. logger: zap.NewNop(),
  190. }
  191. md := testdata.GenerateMetricsTwoMetrics()
  192. err := p.metricsDataPusher(context.Background(), md)
  193. require.Error(t, err)
  194. assert.Contains(t, err.Error(), expErr.Error())
  195. }
  196. func TestLogsDataPusher(t *testing.T) {
  197. c := sarama.NewConfig()
  198. producer := mocks.NewSyncProducer(t, c)
  199. producer.ExpectSendMessageAndSucceed()
  200. p := kafkaLogsProducer{
  201. producer: producer,
  202. marshaler: newPdataLogsMarshaler(&plog.ProtoMarshaler{}, defaultEncoding),
  203. }
  204. t.Cleanup(func() {
  205. require.NoError(t, p.Close(context.Background()))
  206. })
  207. err := p.logsDataPusher(context.Background(), testdata.GenerateLogsOneLogRecord())
  208. require.NoError(t, err)
  209. }
  210. func TestLogsDataPusher_err(t *testing.T) {
  211. c := sarama.NewConfig()
  212. producer := mocks.NewSyncProducer(t, c)
  213. expErr := fmt.Errorf("failed to send")
  214. producer.ExpectSendMessageAndFail(expErr)
  215. p := kafkaLogsProducer{
  216. producer: producer,
  217. marshaler: newPdataLogsMarshaler(&plog.ProtoMarshaler{}, defaultEncoding),
  218. logger: zap.NewNop(),
  219. }
  220. t.Cleanup(func() {
  221. require.NoError(t, p.Close(context.Background()))
  222. })
  223. ld := testdata.GenerateLogsOneLogRecord()
  224. err := p.logsDataPusher(context.Background(), ld)
  225. assert.EqualError(t, err, expErr.Error())
  226. }
  227. func TestLogsDataPusher_marshal_error(t *testing.T) {
  228. expErr := fmt.Errorf("failed to marshal")
  229. p := kafkaLogsProducer{
  230. marshaler: &logsErrorMarshaler{err: expErr},
  231. logger: zap.NewNop(),
  232. }
  233. ld := testdata.GenerateLogsOneLogRecord()
  234. err := p.logsDataPusher(context.Background(), ld)
  235. require.Error(t, err)
  236. assert.Contains(t, err.Error(), expErr.Error())
  237. }
  238. type tracesErrorMarshaler struct {
  239. err error
  240. }
  241. type metricsErrorMarshaler struct {
  242. err error
  243. }
  244. type logsErrorMarshaler struct {
  245. err error
  246. }
  247. func (e metricsErrorMarshaler) Marshal(_ pmetric.Metrics, _ string) ([]*sarama.ProducerMessage, error) {
  248. return nil, e.err
  249. }
  250. func (e metricsErrorMarshaler) Encoding() string {
  251. panic("implement me")
  252. }
  253. var _ TracesMarshaler = (*tracesErrorMarshaler)(nil)
  254. func (e tracesErrorMarshaler) Marshal(_ ptrace.Traces, _ string) ([]*sarama.ProducerMessage, error) {
  255. return nil, e.err
  256. }
  257. func (e tracesErrorMarshaler) Encoding() string {
  258. panic("implement me")
  259. }
  260. func (e logsErrorMarshaler) Marshal(_ plog.Logs, _ string) ([]*sarama.ProducerMessage, error) {
  261. return nil, e.err
  262. }
  263. func (e logsErrorMarshaler) Encoding() string {
  264. panic("implement me")
  265. }