kafka_receiver_test.go 33 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package kafkareceiver
  4. import (
  5. "context"
  6. "errors"
  7. "fmt"
  8. "sync"
  9. "testing"
  10. "time"
  11. "github.com/IBM/sarama"
  12. "github.com/stretchr/testify/assert"
  13. "github.com/stretchr/testify/require"
  14. "go.opencensus.io/stats/view"
  15. "go.opentelemetry.io/collector/component/componenttest"
  16. "go.opentelemetry.io/collector/config/configtls"
  17. "go.opentelemetry.io/collector/consumer/consumertest"
  18. "go.opentelemetry.io/collector/pdata/plog"
  19. "go.opentelemetry.io/collector/pdata/pmetric"
  20. "go.opentelemetry.io/collector/pdata/ptrace"
  21. "go.opentelemetry.io/collector/receiver/receiverhelper"
  22. "go.opentelemetry.io/collector/receiver/receivertest"
  23. "go.uber.org/zap"
  24. "go.uber.org/zap/zapcore"
  25. "go.uber.org/zap/zaptest/observer"
  26. "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"
  27. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata"
  28. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/textutils"
  29. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
  30. )
  31. func TestNewTracesReceiver_version_err(t *testing.T) {
  32. c := Config{
  33. Encoding: defaultEncoding,
  34. ProtocolVersion: "none",
  35. }
  36. r, err := newTracesReceiver(c, receivertest.NewNopCreateSettings(), defaultTracesUnmarshalers(), consumertest.NewNop())
  37. assert.Error(t, err)
  38. assert.Nil(t, r)
  39. }
  40. func TestNewTracesReceiver_encoding_err(t *testing.T) {
  41. c := Config{
  42. Encoding: "foo",
  43. }
  44. r, err := newTracesReceiver(c, receivertest.NewNopCreateSettings(), defaultTracesUnmarshalers(), consumertest.NewNop())
  45. require.Error(t, err)
  46. assert.Nil(t, r)
  47. assert.EqualError(t, err, errUnrecognizedEncoding.Error())
  48. }
  49. func TestNewTracesReceiver_err_auth_type(t *testing.T) {
  50. c := Config{
  51. ProtocolVersion: "2.0.0",
  52. Authentication: kafka.Authentication{
  53. TLS: &configtls.TLSClientSetting{
  54. TLSSetting: configtls.TLSSetting{
  55. CAFile: "/doesnotexist",
  56. },
  57. },
  58. },
  59. Encoding: defaultEncoding,
  60. Metadata: kafkaexporter.Metadata{
  61. Full: false,
  62. },
  63. }
  64. r, err := newTracesReceiver(c, receivertest.NewNopCreateSettings(), defaultTracesUnmarshalers(), consumertest.NewNop())
  65. assert.Error(t, err)
  66. assert.Contains(t, err.Error(), "failed to load TLS config")
  67. assert.Nil(t, r)
  68. }
  69. func TestNewTracesReceiver_initial_offset_err(t *testing.T) {
  70. c := Config{
  71. InitialOffset: "foo",
  72. Encoding: defaultEncoding,
  73. }
  74. r, err := newTracesReceiver(c, receivertest.NewNopCreateSettings(), defaultTracesUnmarshalers(), consumertest.NewNop())
  75. require.Error(t, err)
  76. assert.Nil(t, r)
  77. assert.EqualError(t, err, errInvalidInitialOffset.Error())
  78. }
  79. func TestTracesReceiverStart(t *testing.T) {
  80. c := kafkaTracesConsumer{
  81. nextConsumer: consumertest.NewNop(),
  82. settings: receivertest.NewNopCreateSettings(),
  83. consumerGroup: &testConsumerGroup{},
  84. }
  85. require.NoError(t, c.Start(context.Background(), componenttest.NewNopHost()))
  86. require.NoError(t, c.Shutdown(context.Background()))
  87. }
  88. func TestTracesReceiverStartConsume(t *testing.T) {
  89. c := kafkaTracesConsumer{
  90. nextConsumer: consumertest.NewNop(),
  91. settings: receivertest.NewNopCreateSettings(),
  92. consumerGroup: &testConsumerGroup{},
  93. }
  94. ctx, cancelFunc := context.WithCancel(context.Background())
  95. c.cancelConsumeLoop = cancelFunc
  96. require.NoError(t, c.Shutdown(context.Background()))
  97. err := c.consumeLoop(ctx, &tracesConsumerGroupHandler{
  98. ready: make(chan bool),
  99. })
  100. assert.EqualError(t, err, context.Canceled.Error())
  101. }
  102. func TestTracesReceiver_error(t *testing.T) {
  103. zcore, logObserver := observer.New(zapcore.ErrorLevel)
  104. logger := zap.New(zcore)
  105. settings := receivertest.NewNopCreateSettings()
  106. settings.Logger = logger
  107. expectedErr := errors.New("handler error")
  108. c := kafkaTracesConsumer{
  109. nextConsumer: consumertest.NewNop(),
  110. settings: settings,
  111. consumerGroup: &testConsumerGroup{err: expectedErr},
  112. }
  113. require.NoError(t, c.Start(context.Background(), componenttest.NewNopHost()))
  114. require.NoError(t, c.Shutdown(context.Background()))
  115. assert.Eventually(t, func() bool {
  116. return logObserver.FilterField(zap.Error(expectedErr)).Len() > 0
  117. }, 10*time.Second, time.Millisecond*100)
  118. }
  119. func TestTracesConsumerGroupHandler(t *testing.T) {
  120. view.Unregister(metricViews()...)
  121. views := metricViews()
  122. require.NoError(t, view.Register(views...))
  123. defer view.Unregister(views...)
  124. obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
  125. require.NoError(t, err)
  126. c := tracesConsumerGroupHandler{
  127. unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding),
  128. logger: zap.NewNop(),
  129. ready: make(chan bool),
  130. nextConsumer: consumertest.NewNop(),
  131. obsrecv: obsrecv,
  132. headerExtractor: &nopHeaderExtractor{},
  133. }
  134. testSession := testConsumerGroupSession{ctx: context.Background()}
  135. require.NoError(t, c.Setup(testSession))
  136. _, ok := <-c.ready
  137. assert.False(t, ok)
  138. viewData, err := view.RetrieveData(statPartitionStart.Name())
  139. require.NoError(t, err)
  140. assert.Equal(t, 1, len(viewData))
  141. distData := viewData[0].Data.(*view.SumData)
  142. assert.Equal(t, float64(1), distData.Value)
  143. require.NoError(t, c.Cleanup(testSession))
  144. viewData, err = view.RetrieveData(statPartitionClose.Name())
  145. require.NoError(t, err)
  146. assert.Equal(t, 1, len(viewData))
  147. distData = viewData[0].Data.(*view.SumData)
  148. assert.Equal(t, float64(1), distData.Value)
  149. groupClaim := testConsumerGroupClaim{
  150. messageChan: make(chan *sarama.ConsumerMessage),
  151. }
  152. wg := sync.WaitGroup{}
  153. wg.Add(1)
  154. go func() {
  155. require.NoError(t, c.ConsumeClaim(testSession, groupClaim))
  156. wg.Done()
  157. }()
  158. groupClaim.messageChan <- &sarama.ConsumerMessage{}
  159. close(groupClaim.messageChan)
  160. wg.Wait()
  161. }
  162. func TestTracesConsumerGroupHandler_session_done(t *testing.T) {
  163. view.Unregister(metricViews()...)
  164. views := metricViews()
  165. require.NoError(t, view.Register(views...))
  166. defer view.Unregister(views...)
  167. obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
  168. require.NoError(t, err)
  169. c := tracesConsumerGroupHandler{
  170. unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding),
  171. logger: zap.NewNop(),
  172. ready: make(chan bool),
  173. nextConsumer: consumertest.NewNop(),
  174. obsrecv: obsrecv,
  175. headerExtractor: &nopHeaderExtractor{},
  176. }
  177. ctx, cancelFunc := context.WithCancel(context.Background())
  178. testSession := testConsumerGroupSession{ctx: ctx}
  179. require.NoError(t, c.Setup(testSession))
  180. _, ok := <-c.ready
  181. assert.False(t, ok)
  182. viewData, err := view.RetrieveData(statPartitionStart.Name())
  183. require.NoError(t, err)
  184. assert.Equal(t, 1, len(viewData))
  185. distData := viewData[0].Data.(*view.SumData)
  186. assert.Equal(t, float64(1), distData.Value)
  187. require.NoError(t, c.Cleanup(testSession))
  188. viewData, err = view.RetrieveData(statPartitionClose.Name())
  189. require.NoError(t, err)
  190. assert.Equal(t, 1, len(viewData))
  191. distData = viewData[0].Data.(*view.SumData)
  192. assert.Equal(t, float64(1), distData.Value)
  193. groupClaim := testConsumerGroupClaim{
  194. messageChan: make(chan *sarama.ConsumerMessage),
  195. }
  196. defer close(groupClaim.messageChan)
  197. wg := sync.WaitGroup{}
  198. wg.Add(1)
  199. go func() {
  200. require.NoError(t, c.ConsumeClaim(testSession, groupClaim))
  201. wg.Done()
  202. }()
  203. groupClaim.messageChan <- &sarama.ConsumerMessage{}
  204. cancelFunc()
  205. wg.Wait()
  206. }
  207. func TestTracesConsumerGroupHandler_error_unmarshal(t *testing.T) {
  208. obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
  209. require.NoError(t, err)
  210. c := tracesConsumerGroupHandler{
  211. unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding),
  212. logger: zap.NewNop(),
  213. ready: make(chan bool),
  214. nextConsumer: consumertest.NewNop(),
  215. obsrecv: obsrecv,
  216. headerExtractor: &nopHeaderExtractor{},
  217. }
  218. wg := sync.WaitGroup{}
  219. wg.Add(1)
  220. groupClaim := &testConsumerGroupClaim{
  221. messageChan: make(chan *sarama.ConsumerMessage),
  222. }
  223. go func() {
  224. err := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim)
  225. require.Error(t, err)
  226. wg.Done()
  227. }()
  228. groupClaim.messageChan <- &sarama.ConsumerMessage{Value: []byte("!@#")}
  229. close(groupClaim.messageChan)
  230. wg.Wait()
  231. }
  232. func TestTracesConsumerGroupHandler_error_nextConsumer(t *testing.T) {
  233. consumerError := errors.New("failed to consume")
  234. obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
  235. require.NoError(t, err)
  236. c := tracesConsumerGroupHandler{
  237. unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding),
  238. logger: zap.NewNop(),
  239. ready: make(chan bool),
  240. nextConsumer: consumertest.NewErr(consumerError),
  241. obsrecv: obsrecv,
  242. headerExtractor: &nopHeaderExtractor{},
  243. }
  244. wg := sync.WaitGroup{}
  245. wg.Add(1)
  246. groupClaim := &testConsumerGroupClaim{
  247. messageChan: make(chan *sarama.ConsumerMessage),
  248. }
  249. go func() {
  250. e := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim)
  251. assert.EqualError(t, e, consumerError.Error())
  252. wg.Done()
  253. }()
  254. td := ptrace.NewTraces()
  255. td.ResourceSpans().AppendEmpty()
  256. unmarshaler := &ptrace.ProtoMarshaler{}
  257. bts, err := unmarshaler.MarshalTraces(td)
  258. require.NoError(t, err)
  259. groupClaim.messageChan <- &sarama.ConsumerMessage{Value: bts}
  260. close(groupClaim.messageChan)
  261. wg.Wait()
  262. }
  263. func TestNewMetricsReceiver_version_err(t *testing.T) {
  264. c := Config{
  265. Encoding: defaultEncoding,
  266. ProtocolVersion: "none",
  267. }
  268. r, err := newMetricsReceiver(c, receivertest.NewNopCreateSettings(), defaultMetricsUnmarshalers(), consumertest.NewNop())
  269. assert.Error(t, err)
  270. assert.Nil(t, r)
  271. }
  272. func TestNewMetricsReceiver_encoding_err(t *testing.T) {
  273. c := Config{
  274. Encoding: "foo",
  275. }
  276. r, err := newMetricsReceiver(c, receivertest.NewNopCreateSettings(), defaultMetricsUnmarshalers(), consumertest.NewNop())
  277. require.Error(t, err)
  278. assert.Nil(t, r)
  279. assert.EqualError(t, err, errUnrecognizedEncoding.Error())
  280. }
  281. func TestNewMetricsExporter_err_auth_type(t *testing.T) {
  282. c := Config{
  283. ProtocolVersion: "2.0.0",
  284. Authentication: kafka.Authentication{
  285. TLS: &configtls.TLSClientSetting{
  286. TLSSetting: configtls.TLSSetting{
  287. CAFile: "/doesnotexist",
  288. },
  289. },
  290. },
  291. Encoding: defaultEncoding,
  292. Metadata: kafkaexporter.Metadata{
  293. Full: false,
  294. },
  295. }
  296. r, err := newMetricsReceiver(c, receivertest.NewNopCreateSettings(), defaultMetricsUnmarshalers(), consumertest.NewNop())
  297. assert.Error(t, err)
  298. assert.Contains(t, err.Error(), "failed to load TLS config")
  299. assert.Nil(t, r)
  300. }
  301. func TestNewMetricsReceiver_initial_offset_err(t *testing.T) {
  302. c := Config{
  303. InitialOffset: "foo",
  304. Encoding: defaultEncoding,
  305. }
  306. r, err := newMetricsReceiver(c, receivertest.NewNopCreateSettings(), defaultMetricsUnmarshalers(), consumertest.NewNop())
  307. require.Error(t, err)
  308. assert.Nil(t, r)
  309. assert.EqualError(t, err, errInvalidInitialOffset.Error())
  310. }
  311. func TestMetricsReceiverStart(t *testing.T) {
  312. c := kafkaMetricsConsumer{
  313. nextConsumer: consumertest.NewNop(),
  314. settings: receivertest.NewNopCreateSettings(),
  315. consumerGroup: &testConsumerGroup{},
  316. }
  317. require.NoError(t, c.Start(context.Background(), componenttest.NewNopHost()))
  318. require.NoError(t, c.Shutdown(context.Background()))
  319. }
  320. func TestMetricsReceiverStartConsume(t *testing.T) {
  321. c := kafkaMetricsConsumer{
  322. nextConsumer: consumertest.NewNop(),
  323. settings: receivertest.NewNopCreateSettings(),
  324. consumerGroup: &testConsumerGroup{},
  325. }
  326. ctx, cancelFunc := context.WithCancel(context.Background())
  327. c.cancelConsumeLoop = cancelFunc
  328. require.NoError(t, c.Shutdown(context.Background()))
  329. err := c.consumeLoop(ctx, &logsConsumerGroupHandler{
  330. ready: make(chan bool),
  331. })
  332. assert.EqualError(t, err, context.Canceled.Error())
  333. }
  334. func TestMetricsReceiver_error(t *testing.T) {
  335. zcore, logObserver := observer.New(zapcore.ErrorLevel)
  336. logger := zap.New(zcore)
  337. settings := receivertest.NewNopCreateSettings()
  338. settings.Logger = logger
  339. expectedErr := errors.New("handler error")
  340. c := kafkaMetricsConsumer{
  341. nextConsumer: consumertest.NewNop(),
  342. settings: settings,
  343. consumerGroup: &testConsumerGroup{err: expectedErr},
  344. }
  345. require.NoError(t, c.Start(context.Background(), componenttest.NewNopHost()))
  346. require.NoError(t, c.Shutdown(context.Background()))
  347. assert.Eventually(t, func() bool {
  348. return logObserver.FilterField(zap.Error(expectedErr)).Len() > 0
  349. }, 10*time.Second, time.Millisecond*100)
  350. }
  351. func TestMetricsConsumerGroupHandler(t *testing.T) {
  352. view.Unregister(metricViews()...)
  353. views := metricViews()
  354. require.NoError(t, view.Register(views...))
  355. defer view.Unregister(views...)
  356. obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
  357. require.NoError(t, err)
  358. c := metricsConsumerGroupHandler{
  359. unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding),
  360. logger: zap.NewNop(),
  361. ready: make(chan bool),
  362. nextConsumer: consumertest.NewNop(),
  363. obsrecv: obsrecv,
  364. headerExtractor: &nopHeaderExtractor{},
  365. }
  366. testSession := testConsumerGroupSession{ctx: context.Background()}
  367. require.NoError(t, c.Setup(testSession))
  368. _, ok := <-c.ready
  369. assert.False(t, ok)
  370. viewData, err := view.RetrieveData(statPartitionStart.Name())
  371. require.NoError(t, err)
  372. assert.Equal(t, 1, len(viewData))
  373. distData := viewData[0].Data.(*view.SumData)
  374. assert.Equal(t, float64(1), distData.Value)
  375. require.NoError(t, c.Cleanup(testSession))
  376. viewData, err = view.RetrieveData(statPartitionClose.Name())
  377. require.NoError(t, err)
  378. assert.Equal(t, 1, len(viewData))
  379. distData = viewData[0].Data.(*view.SumData)
  380. assert.Equal(t, float64(1), distData.Value)
  381. groupClaim := testConsumerGroupClaim{
  382. messageChan: make(chan *sarama.ConsumerMessage),
  383. }
  384. wg := sync.WaitGroup{}
  385. wg.Add(1)
  386. go func() {
  387. require.NoError(t, c.ConsumeClaim(testSession, groupClaim))
  388. wg.Done()
  389. }()
  390. groupClaim.messageChan <- &sarama.ConsumerMessage{}
  391. close(groupClaim.messageChan)
  392. wg.Wait()
  393. }
  394. func TestMetricsConsumerGroupHandler_session_done(t *testing.T) {
  395. view.Unregister(metricViews()...)
  396. views := metricViews()
  397. require.NoError(t, view.Register(views...))
  398. defer view.Unregister(views...)
  399. obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
  400. require.NoError(t, err)
  401. c := metricsConsumerGroupHandler{
  402. unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding),
  403. logger: zap.NewNop(),
  404. ready: make(chan bool),
  405. nextConsumer: consumertest.NewNop(),
  406. obsrecv: obsrecv,
  407. headerExtractor: &nopHeaderExtractor{},
  408. }
  409. ctx, cancelFunc := context.WithCancel(context.Background())
  410. testSession := testConsumerGroupSession{ctx: ctx}
  411. require.NoError(t, c.Setup(testSession))
  412. _, ok := <-c.ready
  413. assert.False(t, ok)
  414. viewData, err := view.RetrieveData(statPartitionStart.Name())
  415. require.NoError(t, err)
  416. assert.Equal(t, 1, len(viewData))
  417. distData := viewData[0].Data.(*view.SumData)
  418. assert.Equal(t, float64(1), distData.Value)
  419. require.NoError(t, c.Cleanup(testSession))
  420. viewData, err = view.RetrieveData(statPartitionClose.Name())
  421. require.NoError(t, err)
  422. assert.Equal(t, 1, len(viewData))
  423. distData = viewData[0].Data.(*view.SumData)
  424. assert.Equal(t, float64(1), distData.Value)
  425. groupClaim := testConsumerGroupClaim{
  426. messageChan: make(chan *sarama.ConsumerMessage),
  427. }
  428. defer close(groupClaim.messageChan)
  429. wg := sync.WaitGroup{}
  430. wg.Add(1)
  431. go func() {
  432. require.NoError(t, c.ConsumeClaim(testSession, groupClaim))
  433. wg.Done()
  434. }()
  435. groupClaim.messageChan <- &sarama.ConsumerMessage{}
  436. cancelFunc()
  437. wg.Wait()
  438. }
  439. func TestMetricsConsumerGroupHandler_error_unmarshal(t *testing.T) {
  440. obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
  441. require.NoError(t, err)
  442. c := metricsConsumerGroupHandler{
  443. unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding),
  444. logger: zap.NewNop(),
  445. ready: make(chan bool),
  446. nextConsumer: consumertest.NewNop(),
  447. obsrecv: obsrecv,
  448. headerExtractor: &nopHeaderExtractor{},
  449. }
  450. wg := sync.WaitGroup{}
  451. wg.Add(1)
  452. groupClaim := &testConsumerGroupClaim{
  453. messageChan: make(chan *sarama.ConsumerMessage),
  454. }
  455. go func() {
  456. err := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim)
  457. require.Error(t, err)
  458. wg.Done()
  459. }()
  460. groupClaim.messageChan <- &sarama.ConsumerMessage{Value: []byte("!@#")}
  461. close(groupClaim.messageChan)
  462. wg.Wait()
  463. }
  464. func TestMetricsConsumerGroupHandler_error_nextConsumer(t *testing.T) {
  465. consumerError := errors.New("failed to consume")
  466. obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
  467. require.NoError(t, err)
  468. c := metricsConsumerGroupHandler{
  469. unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding),
  470. logger: zap.NewNop(),
  471. ready: make(chan bool),
  472. nextConsumer: consumertest.NewErr(consumerError),
  473. obsrecv: obsrecv,
  474. headerExtractor: &nopHeaderExtractor{},
  475. }
  476. wg := sync.WaitGroup{}
  477. wg.Add(1)
  478. groupClaim := &testConsumerGroupClaim{
  479. messageChan: make(chan *sarama.ConsumerMessage),
  480. }
  481. go func() {
  482. e := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim)
  483. assert.EqualError(t, e, consumerError.Error())
  484. wg.Done()
  485. }()
  486. ld := testdata.GenerateMetricsOneMetric()
  487. unmarshaler := &pmetric.ProtoMarshaler{}
  488. bts, err := unmarshaler.MarshalMetrics(ld)
  489. require.NoError(t, err)
  490. groupClaim.messageChan <- &sarama.ConsumerMessage{Value: bts}
  491. close(groupClaim.messageChan)
  492. wg.Wait()
  493. }
  494. func TestNewLogsReceiver_version_err(t *testing.T) {
  495. c := Config{
  496. Encoding: defaultEncoding,
  497. ProtocolVersion: "none",
  498. }
  499. r, err := newLogsReceiver(c, receivertest.NewNopCreateSettings(), defaultLogsUnmarshalers(), consumertest.NewNop())
  500. assert.Error(t, err)
  501. assert.Nil(t, r)
  502. }
  503. func TestNewLogsReceiver_encoding_err(t *testing.T) {
  504. c := Config{
  505. Encoding: "foo",
  506. }
  507. r, err := newLogsReceiver(c, receivertest.NewNopCreateSettings(), defaultLogsUnmarshalers(), consumertest.NewNop())
  508. require.Error(t, err)
  509. assert.Nil(t, r)
  510. assert.EqualError(t, err, errUnrecognizedEncoding.Error())
  511. }
  512. func TestNewLogsExporter_err_auth_type(t *testing.T) {
  513. c := Config{
  514. ProtocolVersion: "2.0.0",
  515. Authentication: kafka.Authentication{
  516. TLS: &configtls.TLSClientSetting{
  517. TLSSetting: configtls.TLSSetting{
  518. CAFile: "/doesnotexist",
  519. },
  520. },
  521. },
  522. Encoding: defaultEncoding,
  523. Metadata: kafkaexporter.Metadata{
  524. Full: false,
  525. },
  526. }
  527. r, err := newLogsReceiver(c, receivertest.NewNopCreateSettings(), defaultLogsUnmarshalers(), consumertest.NewNop())
  528. assert.Error(t, err)
  529. assert.Contains(t, err.Error(), "failed to load TLS config")
  530. assert.Nil(t, r)
  531. }
  532. func TestNewLogsReceiver_initial_offset_err(t *testing.T) {
  533. c := Config{
  534. InitialOffset: "foo",
  535. Encoding: defaultEncoding,
  536. }
  537. r, err := newLogsReceiver(c, receivertest.NewNopCreateSettings(), defaultLogsUnmarshalers(), consumertest.NewNop())
  538. require.Error(t, err)
  539. assert.Nil(t, r)
  540. assert.EqualError(t, err, errInvalidInitialOffset.Error())
  541. }
  542. func TestLogsReceiverStart(t *testing.T) {
  543. c := kafkaLogsConsumer{
  544. nextConsumer: consumertest.NewNop(),
  545. settings: receivertest.NewNopCreateSettings(),
  546. consumerGroup: &testConsumerGroup{},
  547. }
  548. require.NoError(t, c.Start(context.Background(), componenttest.NewNopHost()))
  549. require.NoError(t, c.Shutdown(context.Background()))
  550. }
  551. func TestLogsReceiverStartConsume(t *testing.T) {
  552. c := kafkaLogsConsumer{
  553. nextConsumer: consumertest.NewNop(),
  554. settings: receivertest.NewNopCreateSettings(),
  555. consumerGroup: &testConsumerGroup{},
  556. }
  557. ctx, cancelFunc := context.WithCancel(context.Background())
  558. c.cancelConsumeLoop = cancelFunc
  559. require.NoError(t, c.Shutdown(context.Background()))
  560. err := c.consumeLoop(ctx, &logsConsumerGroupHandler{
  561. ready: make(chan bool),
  562. })
  563. assert.EqualError(t, err, context.Canceled.Error())
  564. }
  565. func TestLogsReceiver_error(t *testing.T) {
  566. zcore, logObserver := observer.New(zapcore.ErrorLevel)
  567. logger := zap.New(zcore)
  568. settings := receivertest.NewNopCreateSettings()
  569. settings.Logger = logger
  570. expectedErr := errors.New("handler error")
  571. c := kafkaLogsConsumer{
  572. nextConsumer: consumertest.NewNop(),
  573. settings: settings,
  574. consumerGroup: &testConsumerGroup{err: expectedErr},
  575. }
  576. require.NoError(t, c.Start(context.Background(), componenttest.NewNopHost()))
  577. require.NoError(t, c.Shutdown(context.Background()))
  578. assert.Eventually(t, func() bool {
  579. return logObserver.FilterField(zap.Error(expectedErr)).Len() > 0
  580. }, 10*time.Second, time.Millisecond*100)
  581. }
  582. func TestLogsConsumerGroupHandler(t *testing.T) {
  583. view.Unregister(metricViews()...)
  584. views := metricViews()
  585. require.NoError(t, view.Register(views...))
  586. defer view.Unregister(views...)
  587. obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
  588. require.NoError(t, err)
  589. c := logsConsumerGroupHandler{
  590. unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding),
  591. logger: zap.NewNop(),
  592. ready: make(chan bool),
  593. nextConsumer: consumertest.NewNop(),
  594. obsrecv: obsrecv,
  595. headerExtractor: &nopHeaderExtractor{},
  596. }
  597. testSession := testConsumerGroupSession{ctx: context.Background()}
  598. require.NoError(t, c.Setup(testSession))
  599. _, ok := <-c.ready
  600. assert.False(t, ok)
  601. viewData, err := view.RetrieveData(statPartitionStart.Name())
  602. require.NoError(t, err)
  603. assert.Equal(t, 1, len(viewData))
  604. distData := viewData[0].Data.(*view.SumData)
  605. assert.Equal(t, float64(1), distData.Value)
  606. require.NoError(t, c.Cleanup(testSession))
  607. viewData, err = view.RetrieveData(statPartitionClose.Name())
  608. require.NoError(t, err)
  609. assert.Equal(t, 1, len(viewData))
  610. distData = viewData[0].Data.(*view.SumData)
  611. assert.Equal(t, float64(1), distData.Value)
  612. groupClaim := testConsumerGroupClaim{
  613. messageChan: make(chan *sarama.ConsumerMessage),
  614. }
  615. wg := sync.WaitGroup{}
  616. wg.Add(1)
  617. go func() {
  618. require.NoError(t, c.ConsumeClaim(testSession, groupClaim))
  619. wg.Done()
  620. }()
  621. groupClaim.messageChan <- &sarama.ConsumerMessage{}
  622. close(groupClaim.messageChan)
  623. wg.Wait()
  624. }
  625. func TestLogsConsumerGroupHandler_session_done(t *testing.T) {
  626. view.Unregister(metricViews()...)
  627. views := metricViews()
  628. require.NoError(t, view.Register(views...))
  629. defer view.Unregister(views...)
  630. obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
  631. require.NoError(t, err)
  632. c := logsConsumerGroupHandler{
  633. unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding),
  634. logger: zap.NewNop(),
  635. ready: make(chan bool),
  636. nextConsumer: consumertest.NewNop(),
  637. obsrecv: obsrecv,
  638. headerExtractor: &nopHeaderExtractor{},
  639. }
  640. ctx, cancelFunc := context.WithCancel(context.Background())
  641. testSession := testConsumerGroupSession{ctx: ctx}
  642. require.NoError(t, c.Setup(testSession))
  643. _, ok := <-c.ready
  644. assert.False(t, ok)
  645. viewData, err := view.RetrieveData(statPartitionStart.Name())
  646. require.NoError(t, err)
  647. assert.Equal(t, 1, len(viewData))
  648. distData := viewData[0].Data.(*view.SumData)
  649. assert.Equal(t, float64(1), distData.Value)
  650. require.NoError(t, c.Cleanup(testSession))
  651. viewData, err = view.RetrieveData(statPartitionClose.Name())
  652. require.NoError(t, err)
  653. assert.Equal(t, 1, len(viewData))
  654. distData = viewData[0].Data.(*view.SumData)
  655. assert.Equal(t, float64(1), distData.Value)
  656. groupClaim := testConsumerGroupClaim{
  657. messageChan: make(chan *sarama.ConsumerMessage),
  658. }
  659. defer close(groupClaim.messageChan)
  660. wg := sync.WaitGroup{}
  661. wg.Add(1)
  662. go func() {
  663. require.NoError(t, c.ConsumeClaim(testSession, groupClaim))
  664. wg.Done()
  665. }()
  666. groupClaim.messageChan <- &sarama.ConsumerMessage{}
  667. cancelFunc()
  668. wg.Wait()
  669. }
  670. func TestLogsConsumerGroupHandler_error_unmarshal(t *testing.T) {
  671. obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
  672. require.NoError(t, err)
  673. c := logsConsumerGroupHandler{
  674. unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding),
  675. logger: zap.NewNop(),
  676. ready: make(chan bool),
  677. nextConsumer: consumertest.NewNop(),
  678. obsrecv: obsrecv,
  679. headerExtractor: &nopHeaderExtractor{},
  680. }
  681. wg := sync.WaitGroup{}
  682. wg.Add(1)
  683. groupClaim := &testConsumerGroupClaim{
  684. messageChan: make(chan *sarama.ConsumerMessage),
  685. }
  686. go func() {
  687. err := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim)
  688. require.Error(t, err)
  689. wg.Done()
  690. }()
  691. groupClaim.messageChan <- &sarama.ConsumerMessage{Value: []byte("!@#")}
  692. close(groupClaim.messageChan)
  693. wg.Wait()
  694. }
  695. func TestLogsConsumerGroupHandler_error_nextConsumer(t *testing.T) {
  696. consumerError := errors.New("failed to consume")
  697. obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
  698. require.NoError(t, err)
  699. c := logsConsumerGroupHandler{
  700. unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding),
  701. logger: zap.NewNop(),
  702. ready: make(chan bool),
  703. nextConsumer: consumertest.NewErr(consumerError),
  704. obsrecv: obsrecv,
  705. headerExtractor: &nopHeaderExtractor{},
  706. }
  707. wg := sync.WaitGroup{}
  708. wg.Add(1)
  709. groupClaim := &testConsumerGroupClaim{
  710. messageChan: make(chan *sarama.ConsumerMessage),
  711. }
  712. go func() {
  713. e := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim)
  714. assert.EqualError(t, e, consumerError.Error())
  715. wg.Done()
  716. }()
  717. ld := testdata.GenerateLogsOneLogRecord()
  718. unmarshaler := &plog.ProtoMarshaler{}
  719. bts, err := unmarshaler.MarshalLogs(ld)
  720. require.NoError(t, err)
  721. groupClaim.messageChan <- &sarama.ConsumerMessage{Value: bts}
  722. close(groupClaim.messageChan)
  723. wg.Wait()
  724. }
  725. // Test unmarshaler for different charsets and encodings.
  726. func TestLogsConsumerGroupHandler_unmarshal_text(t *testing.T) {
  727. tests := []struct {
  728. name string
  729. text string
  730. enc string
  731. }{
  732. {
  733. name: "unmarshal test for Englist (ASCII characters) with text_utf8",
  734. text: "ASCII characters test",
  735. enc: "utf8",
  736. },
  737. {
  738. name: "unmarshal test for unicode with text_utf8",
  739. text: "UTF8 测试 測試 テスト 테스트 ☺️",
  740. enc: "utf8",
  741. },
  742. {
  743. name: "unmarshal test for Simplified Chinese with text_gbk",
  744. text: "GBK 简体中文解码测试",
  745. enc: "gbk",
  746. },
  747. {
  748. name: "unmarshal test for Japanese with text_shift_jis",
  749. text: "Shift_JIS 日本のデコードテスト",
  750. enc: "shift_jis",
  751. },
  752. {
  753. name: "unmarshal test for Korean with text_euc-kr",
  754. text: "EUC-KR 한국 디코딩 테스트",
  755. enc: "euc-kr",
  756. },
  757. }
  758. for _, test := range tests {
  759. t.Run(test.name, func(t *testing.T) {
  760. obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
  761. require.NoError(t, err)
  762. unmarshaler := newTextLogsUnmarshaler()
  763. unmarshaler, err = unmarshaler.WithEnc(test.enc)
  764. require.NoError(t, err)
  765. sink := &consumertest.LogsSink{}
  766. c := logsConsumerGroupHandler{
  767. unmarshaler: unmarshaler,
  768. logger: zap.NewNop(),
  769. ready: make(chan bool),
  770. nextConsumer: sink,
  771. obsrecv: obsrecv,
  772. headerExtractor: &nopHeaderExtractor{},
  773. }
  774. wg := sync.WaitGroup{}
  775. wg.Add(1)
  776. groupClaim := &testConsumerGroupClaim{
  777. messageChan: make(chan *sarama.ConsumerMessage),
  778. }
  779. go func() {
  780. err = c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim)
  781. assert.NoError(t, err)
  782. wg.Done()
  783. }()
  784. encCfg := textutils.NewEncodingConfig()
  785. encCfg.Encoding = test.enc
  786. enc, err := encCfg.Build()
  787. require.NoError(t, err)
  788. encoder := enc.Encoding.NewEncoder()
  789. encoded, err := encoder.Bytes([]byte(test.text))
  790. require.NoError(t, err)
  791. t1 := time.Now()
  792. groupClaim.messageChan <- &sarama.ConsumerMessage{Value: encoded}
  793. close(groupClaim.messageChan)
  794. wg.Wait()
  795. require.Equal(t, sink.LogRecordCount(), 1)
  796. log := sink.AllLogs()[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0)
  797. assert.Equal(t, log.Body().Str(), test.text)
  798. assert.LessOrEqual(t, t1, log.ObservedTimestamp().AsTime())
  799. assert.LessOrEqual(t, log.ObservedTimestamp().AsTime(), time.Now())
  800. })
  801. }
  802. }
  803. func TestGetLogsUnmarshaler_encoding_text(t *testing.T) {
  804. tests := []struct {
  805. name string
  806. encoding string
  807. }{
  808. {
  809. name: "default text encoding",
  810. encoding: "text",
  811. },
  812. {
  813. name: "utf-8 text encoding",
  814. encoding: "text_utf-8",
  815. },
  816. {
  817. name: "gbk text encoding",
  818. encoding: "text_gbk",
  819. },
  820. {
  821. name: "shift_jis text encoding, which contains an underline",
  822. encoding: "text_shift_jis",
  823. },
  824. }
  825. for _, test := range tests {
  826. t.Run(test.name, func(t *testing.T) {
  827. _, err := getLogsUnmarshaler(test.encoding, defaultLogsUnmarshalers())
  828. assert.NoError(t, err)
  829. })
  830. }
  831. }
  832. func TestGetLogsUnmarshaler_encoding_text_error(t *testing.T) {
  833. tests := []struct {
  834. name string
  835. encoding string
  836. }{
  837. {
  838. name: "text encoding has typo",
  839. encoding: "text_uft-8",
  840. },
  841. {
  842. name: "text encoding is a random string",
  843. encoding: "text_vnbqgoba156",
  844. },
  845. }
  846. for _, test := range tests {
  847. t.Run(test.name, func(t *testing.T) {
  848. _, err := getLogsUnmarshaler(test.encoding, defaultLogsUnmarshalers())
  849. assert.ErrorContains(t, err, fmt.Sprintf("unsupported encoding '%v'", test.encoding[5:]))
  850. })
  851. }
  852. }
  853. func TestCreateLogsReceiver_encoding_text_error(t *testing.T) {
  854. cfg := Config{
  855. Encoding: "text_uft-8",
  856. }
  857. _, err := newLogsReceiver(cfg, receivertest.NewNopCreateSettings(), defaultLogsUnmarshalers(), consumertest.NewNop())
  858. // encoding error comes first
  859. assert.Error(t, err, "unsupported encoding")
  860. }
  861. func TestToSaramaInitialOffset_earliest(t *testing.T) {
  862. saramaInitialOffset, err := toSaramaInitialOffset(offsetEarliest)
  863. require.NoError(t, err)
  864. assert.Equal(t, sarama.OffsetOldest, saramaInitialOffset)
  865. }
  866. func TestToSaramaInitialOffset_latest(t *testing.T) {
  867. saramaInitialOffset, err := toSaramaInitialOffset(offsetLatest)
  868. require.NoError(t, err)
  869. assert.Equal(t, sarama.OffsetNewest, saramaInitialOffset)
  870. }
  871. func TestToSaramaInitialOffset_default(t *testing.T) {
  872. saramaInitialOffset, err := toSaramaInitialOffset("")
  873. require.NoError(t, err)
  874. assert.Equal(t, sarama.OffsetNewest, saramaInitialOffset)
  875. }
  876. func TestToSaramaInitialOffset_invalid(t *testing.T) {
  877. _, err := toSaramaInitialOffset("other")
  878. assert.Equal(t, err, errInvalidInitialOffset)
  879. }
  880. type testConsumerGroupClaim struct {
  881. messageChan chan *sarama.ConsumerMessage
  882. }
  883. var _ sarama.ConsumerGroupClaim = (*testConsumerGroupClaim)(nil)
  884. const (
  885. testTopic = "otlp_spans"
  886. testPartition = 5
  887. testInitialOffset = 6
  888. testHighWatermarkOffset = 4
  889. )
  890. func (t testConsumerGroupClaim) Topic() string {
  891. return testTopic
  892. }
  893. func (t testConsumerGroupClaim) Partition() int32 {
  894. return testPartition
  895. }
  896. func (t testConsumerGroupClaim) InitialOffset() int64 {
  897. return testInitialOffset
  898. }
  899. func (t testConsumerGroupClaim) HighWaterMarkOffset() int64 {
  900. return testHighWatermarkOffset
  901. }
  902. func (t testConsumerGroupClaim) Messages() <-chan *sarama.ConsumerMessage {
  903. return t.messageChan
  904. }
  905. type testConsumerGroupSession struct {
  906. ctx context.Context
  907. }
  908. func (t testConsumerGroupSession) Commit() {
  909. }
  910. var _ sarama.ConsumerGroupSession = (*testConsumerGroupSession)(nil)
  911. func (t testConsumerGroupSession) Claims() map[string][]int32 {
  912. panic("implement me")
  913. }
  914. func (t testConsumerGroupSession) MemberID() string {
  915. panic("implement me")
  916. }
  917. func (t testConsumerGroupSession) GenerationID() int32 {
  918. panic("implement me")
  919. }
  920. func (t testConsumerGroupSession) MarkOffset(string, int32, int64, string) {
  921. }
  922. func (t testConsumerGroupSession) ResetOffset(string, int32, int64, string) {
  923. panic("implement me")
  924. }
  925. func (t testConsumerGroupSession) MarkMessage(*sarama.ConsumerMessage, string) {}
  926. func (t testConsumerGroupSession) Context() context.Context {
  927. return t.ctx
  928. }
  929. type testConsumerGroup struct {
  930. once sync.Once
  931. err error
  932. }
  933. var _ sarama.ConsumerGroup = (*testConsumerGroup)(nil)
  934. func (t *testConsumerGroup) Consume(ctx context.Context, _ []string, handler sarama.ConsumerGroupHandler) error {
  935. t.once.Do(func() {
  936. _ = handler.Setup(testConsumerGroupSession{ctx: ctx})
  937. })
  938. return t.err
  939. }
  940. func (t *testConsumerGroup) Errors() <-chan error {
  941. panic("implement me")
  942. }
  943. func (t *testConsumerGroup) Close() error {
  944. return nil
  945. }
  946. func (t *testConsumerGroup) Pause(_ map[string][]int32) {
  947. panic("implement me")
  948. }
  949. func (t *testConsumerGroup) PauseAll() {
  950. panic("implement me")
  951. }
  952. func (t *testConsumerGroup) Resume(_ map[string][]int32) {
  953. panic("implement me")
  954. }
  955. func (t *testConsumerGroup) ResumeAll() {
  956. panic("implement me")
  957. }