1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package kafkareceiver
- import (
- "context"
- "errors"
- "fmt"
- "sync"
- "testing"
- "time"
- "github.com/IBM/sarama"
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/require"
- "go.opencensus.io/stats/view"
- "go.opentelemetry.io/collector/component/componenttest"
- "go.opentelemetry.io/collector/config/configtls"
- "go.opentelemetry.io/collector/consumer/consumertest"
- "go.opentelemetry.io/collector/pdata/plog"
- "go.opentelemetry.io/collector/pdata/pmetric"
- "go.opentelemetry.io/collector/pdata/ptrace"
- "go.opentelemetry.io/collector/receiver/receiverhelper"
- "go.opentelemetry.io/collector/receiver/receivertest"
- "go.uber.org/zap"
- "go.uber.org/zap/zapcore"
- "go.uber.org/zap/zaptest/observer"
- "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"
- "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata"
- "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/textutils"
- "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
- )
- func TestNewTracesReceiver_version_err(t *testing.T) {
- c := Config{
- Encoding: defaultEncoding,
- ProtocolVersion: "none",
- }
- r, err := newTracesReceiver(c, receivertest.NewNopCreateSettings(), defaultTracesUnmarshalers(), consumertest.NewNop())
- assert.Error(t, err)
- assert.Nil(t, r)
- }
- func TestNewTracesReceiver_encoding_err(t *testing.T) {
- c := Config{
- Encoding: "foo",
- }
- r, err := newTracesReceiver(c, receivertest.NewNopCreateSettings(), defaultTracesUnmarshalers(), consumertest.NewNop())
- require.Error(t, err)
- assert.Nil(t, r)
- assert.EqualError(t, err, errUnrecognizedEncoding.Error())
- }
- func TestNewTracesReceiver_err_auth_type(t *testing.T) {
- c := Config{
- ProtocolVersion: "2.0.0",
- Authentication: kafka.Authentication{
- TLS: &configtls.TLSClientSetting{
- TLSSetting: configtls.TLSSetting{
- CAFile: "/doesnotexist",
- },
- },
- },
- Encoding: defaultEncoding,
- Metadata: kafkaexporter.Metadata{
- Full: false,
- },
- }
- r, err := newTracesReceiver(c, receivertest.NewNopCreateSettings(), defaultTracesUnmarshalers(), consumertest.NewNop())
- assert.Error(t, err)
- assert.Contains(t, err.Error(), "failed to load TLS config")
- assert.Nil(t, r)
- }
- func TestNewTracesReceiver_initial_offset_err(t *testing.T) {
- c := Config{
- InitialOffset: "foo",
- Encoding: defaultEncoding,
- }
- r, err := newTracesReceiver(c, receivertest.NewNopCreateSettings(), defaultTracesUnmarshalers(), consumertest.NewNop())
- require.Error(t, err)
- assert.Nil(t, r)
- assert.EqualError(t, err, errInvalidInitialOffset.Error())
- }
- func TestTracesReceiverStart(t *testing.T) {
- c := kafkaTracesConsumer{
- nextConsumer: consumertest.NewNop(),
- settings: receivertest.NewNopCreateSettings(),
- consumerGroup: &testConsumerGroup{},
- }
- require.NoError(t, c.Start(context.Background(), componenttest.NewNopHost()))
- require.NoError(t, c.Shutdown(context.Background()))
- }
- func TestTracesReceiverStartConsume(t *testing.T) {
- c := kafkaTracesConsumer{
- nextConsumer: consumertest.NewNop(),
- settings: receivertest.NewNopCreateSettings(),
- consumerGroup: &testConsumerGroup{},
- }
- ctx, cancelFunc := context.WithCancel(context.Background())
- c.cancelConsumeLoop = cancelFunc
- require.NoError(t, c.Shutdown(context.Background()))
- err := c.consumeLoop(ctx, &tracesConsumerGroupHandler{
- ready: make(chan bool),
- })
- assert.EqualError(t, err, context.Canceled.Error())
- }
- func TestTracesReceiver_error(t *testing.T) {
- zcore, logObserver := observer.New(zapcore.ErrorLevel)
- logger := zap.New(zcore)
- settings := receivertest.NewNopCreateSettings()
- settings.Logger = logger
- expectedErr := errors.New("handler error")
- c := kafkaTracesConsumer{
- nextConsumer: consumertest.NewNop(),
- settings: settings,
- consumerGroup: &testConsumerGroup{err: expectedErr},
- }
- require.NoError(t, c.Start(context.Background(), componenttest.NewNopHost()))
- require.NoError(t, c.Shutdown(context.Background()))
- assert.Eventually(t, func() bool {
- return logObserver.FilterField(zap.Error(expectedErr)).Len() > 0
- }, 10*time.Second, time.Millisecond*100)
- }
- func TestTracesConsumerGroupHandler(t *testing.T) {
- view.Unregister(metricViews()...)
- views := metricViews()
- require.NoError(t, view.Register(views...))
- defer view.Unregister(views...)
- obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
- require.NoError(t, err)
- c := tracesConsumerGroupHandler{
- unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding),
- logger: zap.NewNop(),
- ready: make(chan bool),
- nextConsumer: consumertest.NewNop(),
- obsrecv: obsrecv,
- headerExtractor: &nopHeaderExtractor{},
- }
- testSession := testConsumerGroupSession{ctx: context.Background()}
- require.NoError(t, c.Setup(testSession))
- _, ok := <-c.ready
- assert.False(t, ok)
- viewData, err := view.RetrieveData(statPartitionStart.Name())
- require.NoError(t, err)
- assert.Equal(t, 1, len(viewData))
- distData := viewData[0].Data.(*view.SumData)
- assert.Equal(t, float64(1), distData.Value)
- require.NoError(t, c.Cleanup(testSession))
- viewData, err = view.RetrieveData(statPartitionClose.Name())
- require.NoError(t, err)
- assert.Equal(t, 1, len(viewData))
- distData = viewData[0].Data.(*view.SumData)
- assert.Equal(t, float64(1), distData.Value)
- groupClaim := testConsumerGroupClaim{
- messageChan: make(chan *sarama.ConsumerMessage),
- }
- wg := sync.WaitGroup{}
- wg.Add(1)
- go func() {
- require.NoError(t, c.ConsumeClaim(testSession, groupClaim))
- wg.Done()
- }()
- groupClaim.messageChan <- &sarama.ConsumerMessage{}
- close(groupClaim.messageChan)
- wg.Wait()
- }
- func TestTracesConsumerGroupHandler_session_done(t *testing.T) {
- view.Unregister(metricViews()...)
- views := metricViews()
- require.NoError(t, view.Register(views...))
- defer view.Unregister(views...)
- obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
- require.NoError(t, err)
- c := tracesConsumerGroupHandler{
- unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding),
- logger: zap.NewNop(),
- ready: make(chan bool),
- nextConsumer: consumertest.NewNop(),
- obsrecv: obsrecv,
- headerExtractor: &nopHeaderExtractor{},
- }
- ctx, cancelFunc := context.WithCancel(context.Background())
- testSession := testConsumerGroupSession{ctx: ctx}
- require.NoError(t, c.Setup(testSession))
- _, ok := <-c.ready
- assert.False(t, ok)
- viewData, err := view.RetrieveData(statPartitionStart.Name())
- require.NoError(t, err)
- assert.Equal(t, 1, len(viewData))
- distData := viewData[0].Data.(*view.SumData)
- assert.Equal(t, float64(1), distData.Value)
- require.NoError(t, c.Cleanup(testSession))
- viewData, err = view.RetrieveData(statPartitionClose.Name())
- require.NoError(t, err)
- assert.Equal(t, 1, len(viewData))
- distData = viewData[0].Data.(*view.SumData)
- assert.Equal(t, float64(1), distData.Value)
- groupClaim := testConsumerGroupClaim{
- messageChan: make(chan *sarama.ConsumerMessage),
- }
- defer close(groupClaim.messageChan)
- wg := sync.WaitGroup{}
- wg.Add(1)
- go func() {
- require.NoError(t, c.ConsumeClaim(testSession, groupClaim))
- wg.Done()
- }()
- groupClaim.messageChan <- &sarama.ConsumerMessage{}
- cancelFunc()
- wg.Wait()
- }
- func TestTracesConsumerGroupHandler_error_unmarshal(t *testing.T) {
- obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
- require.NoError(t, err)
- c := tracesConsumerGroupHandler{
- unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding),
- logger: zap.NewNop(),
- ready: make(chan bool),
- nextConsumer: consumertest.NewNop(),
- obsrecv: obsrecv,
- headerExtractor: &nopHeaderExtractor{},
- }
- wg := sync.WaitGroup{}
- wg.Add(1)
- groupClaim := &testConsumerGroupClaim{
- messageChan: make(chan *sarama.ConsumerMessage),
- }
- go func() {
- err := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim)
- require.Error(t, err)
- wg.Done()
- }()
- groupClaim.messageChan <- &sarama.ConsumerMessage{Value: []byte("!@#")}
- close(groupClaim.messageChan)
- wg.Wait()
- }
- func TestTracesConsumerGroupHandler_error_nextConsumer(t *testing.T) {
- consumerError := errors.New("failed to consume")
- obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
- require.NoError(t, err)
- c := tracesConsumerGroupHandler{
- unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding),
- logger: zap.NewNop(),
- ready: make(chan bool),
- nextConsumer: consumertest.NewErr(consumerError),
- obsrecv: obsrecv,
- headerExtractor: &nopHeaderExtractor{},
- }
- wg := sync.WaitGroup{}
- wg.Add(1)
- groupClaim := &testConsumerGroupClaim{
- messageChan: make(chan *sarama.ConsumerMessage),
- }
- go func() {
- e := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim)
- assert.EqualError(t, e, consumerError.Error())
- wg.Done()
- }()
- td := ptrace.NewTraces()
- td.ResourceSpans().AppendEmpty()
- unmarshaler := &ptrace.ProtoMarshaler{}
- bts, err := unmarshaler.MarshalTraces(td)
- require.NoError(t, err)
- groupClaim.messageChan <- &sarama.ConsumerMessage{Value: bts}
- close(groupClaim.messageChan)
- wg.Wait()
- }
- func TestNewMetricsReceiver_version_err(t *testing.T) {
- c := Config{
- Encoding: defaultEncoding,
- ProtocolVersion: "none",
- }
- r, err := newMetricsReceiver(c, receivertest.NewNopCreateSettings(), defaultMetricsUnmarshalers(), consumertest.NewNop())
- assert.Error(t, err)
- assert.Nil(t, r)
- }
- func TestNewMetricsReceiver_encoding_err(t *testing.T) {
- c := Config{
- Encoding: "foo",
- }
- r, err := newMetricsReceiver(c, receivertest.NewNopCreateSettings(), defaultMetricsUnmarshalers(), consumertest.NewNop())
- require.Error(t, err)
- assert.Nil(t, r)
- assert.EqualError(t, err, errUnrecognizedEncoding.Error())
- }
- func TestNewMetricsExporter_err_auth_type(t *testing.T) {
- c := Config{
- ProtocolVersion: "2.0.0",
- Authentication: kafka.Authentication{
- TLS: &configtls.TLSClientSetting{
- TLSSetting: configtls.TLSSetting{
- CAFile: "/doesnotexist",
- },
- },
- },
- Encoding: defaultEncoding,
- Metadata: kafkaexporter.Metadata{
- Full: false,
- },
- }
- r, err := newMetricsReceiver(c, receivertest.NewNopCreateSettings(), defaultMetricsUnmarshalers(), consumertest.NewNop())
- assert.Error(t, err)
- assert.Contains(t, err.Error(), "failed to load TLS config")
- assert.Nil(t, r)
- }
- func TestNewMetricsReceiver_initial_offset_err(t *testing.T) {
- c := Config{
- InitialOffset: "foo",
- Encoding: defaultEncoding,
- }
- r, err := newMetricsReceiver(c, receivertest.NewNopCreateSettings(), defaultMetricsUnmarshalers(), consumertest.NewNop())
- require.Error(t, err)
- assert.Nil(t, r)
- assert.EqualError(t, err, errInvalidInitialOffset.Error())
- }
- func TestMetricsReceiverStart(t *testing.T) {
- c := kafkaMetricsConsumer{
- nextConsumer: consumertest.NewNop(),
- settings: receivertest.NewNopCreateSettings(),
- consumerGroup: &testConsumerGroup{},
- }
- require.NoError(t, c.Start(context.Background(), componenttest.NewNopHost()))
- require.NoError(t, c.Shutdown(context.Background()))
- }
- func TestMetricsReceiverStartConsume(t *testing.T) {
- c := kafkaMetricsConsumer{
- nextConsumer: consumertest.NewNop(),
- settings: receivertest.NewNopCreateSettings(),
- consumerGroup: &testConsumerGroup{},
- }
- ctx, cancelFunc := context.WithCancel(context.Background())
- c.cancelConsumeLoop = cancelFunc
- require.NoError(t, c.Shutdown(context.Background()))
- err := c.consumeLoop(ctx, &logsConsumerGroupHandler{
- ready: make(chan bool),
- })
- assert.EqualError(t, err, context.Canceled.Error())
- }
- func TestMetricsReceiver_error(t *testing.T) {
- zcore, logObserver := observer.New(zapcore.ErrorLevel)
- logger := zap.New(zcore)
- settings := receivertest.NewNopCreateSettings()
- settings.Logger = logger
- expectedErr := errors.New("handler error")
- c := kafkaMetricsConsumer{
- nextConsumer: consumertest.NewNop(),
- settings: settings,
- consumerGroup: &testConsumerGroup{err: expectedErr},
- }
- require.NoError(t, c.Start(context.Background(), componenttest.NewNopHost()))
- require.NoError(t, c.Shutdown(context.Background()))
- assert.Eventually(t, func() bool {
- return logObserver.FilterField(zap.Error(expectedErr)).Len() > 0
- }, 10*time.Second, time.Millisecond*100)
- }
- func TestMetricsConsumerGroupHandler(t *testing.T) {
- view.Unregister(metricViews()...)
- views := metricViews()
- require.NoError(t, view.Register(views...))
- defer view.Unregister(views...)
- obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
- require.NoError(t, err)
- c := metricsConsumerGroupHandler{
- unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding),
- logger: zap.NewNop(),
- ready: make(chan bool),
- nextConsumer: consumertest.NewNop(),
- obsrecv: obsrecv,
- headerExtractor: &nopHeaderExtractor{},
- }
- testSession := testConsumerGroupSession{ctx: context.Background()}
- require.NoError(t, c.Setup(testSession))
- _, ok := <-c.ready
- assert.False(t, ok)
- viewData, err := view.RetrieveData(statPartitionStart.Name())
- require.NoError(t, err)
- assert.Equal(t, 1, len(viewData))
- distData := viewData[0].Data.(*view.SumData)
- assert.Equal(t, float64(1), distData.Value)
- require.NoError(t, c.Cleanup(testSession))
- viewData, err = view.RetrieveData(statPartitionClose.Name())
- require.NoError(t, err)
- assert.Equal(t, 1, len(viewData))
- distData = viewData[0].Data.(*view.SumData)
- assert.Equal(t, float64(1), distData.Value)
- groupClaim := testConsumerGroupClaim{
- messageChan: make(chan *sarama.ConsumerMessage),
- }
- wg := sync.WaitGroup{}
- wg.Add(1)
- go func() {
- require.NoError(t, c.ConsumeClaim(testSession, groupClaim))
- wg.Done()
- }()
- groupClaim.messageChan <- &sarama.ConsumerMessage{}
- close(groupClaim.messageChan)
- wg.Wait()
- }
- func TestMetricsConsumerGroupHandler_session_done(t *testing.T) {
- view.Unregister(metricViews()...)
- views := metricViews()
- require.NoError(t, view.Register(views...))
- defer view.Unregister(views...)
- obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
- require.NoError(t, err)
- c := metricsConsumerGroupHandler{
- unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding),
- logger: zap.NewNop(),
- ready: make(chan bool),
- nextConsumer: consumertest.NewNop(),
- obsrecv: obsrecv,
- headerExtractor: &nopHeaderExtractor{},
- }
- ctx, cancelFunc := context.WithCancel(context.Background())
- testSession := testConsumerGroupSession{ctx: ctx}
- require.NoError(t, c.Setup(testSession))
- _, ok := <-c.ready
- assert.False(t, ok)
- viewData, err := view.RetrieveData(statPartitionStart.Name())
- require.NoError(t, err)
- assert.Equal(t, 1, len(viewData))
- distData := viewData[0].Data.(*view.SumData)
- assert.Equal(t, float64(1), distData.Value)
- require.NoError(t, c.Cleanup(testSession))
- viewData, err = view.RetrieveData(statPartitionClose.Name())
- require.NoError(t, err)
- assert.Equal(t, 1, len(viewData))
- distData = viewData[0].Data.(*view.SumData)
- assert.Equal(t, float64(1), distData.Value)
- groupClaim := testConsumerGroupClaim{
- messageChan: make(chan *sarama.ConsumerMessage),
- }
- defer close(groupClaim.messageChan)
- wg := sync.WaitGroup{}
- wg.Add(1)
- go func() {
- require.NoError(t, c.ConsumeClaim(testSession, groupClaim))
- wg.Done()
- }()
- groupClaim.messageChan <- &sarama.ConsumerMessage{}
- cancelFunc()
- wg.Wait()
- }
- func TestMetricsConsumerGroupHandler_error_unmarshal(t *testing.T) {
- obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
- require.NoError(t, err)
- c := metricsConsumerGroupHandler{
- unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding),
- logger: zap.NewNop(),
- ready: make(chan bool),
- nextConsumer: consumertest.NewNop(),
- obsrecv: obsrecv,
- headerExtractor: &nopHeaderExtractor{},
- }
- wg := sync.WaitGroup{}
- wg.Add(1)
- groupClaim := &testConsumerGroupClaim{
- messageChan: make(chan *sarama.ConsumerMessage),
- }
- go func() {
- err := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim)
- require.Error(t, err)
- wg.Done()
- }()
- groupClaim.messageChan <- &sarama.ConsumerMessage{Value: []byte("!@#")}
- close(groupClaim.messageChan)
- wg.Wait()
- }
- func TestMetricsConsumerGroupHandler_error_nextConsumer(t *testing.T) {
- consumerError := errors.New("failed to consume")
- obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
- require.NoError(t, err)
- c := metricsConsumerGroupHandler{
- unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding),
- logger: zap.NewNop(),
- ready: make(chan bool),
- nextConsumer: consumertest.NewErr(consumerError),
- obsrecv: obsrecv,
- headerExtractor: &nopHeaderExtractor{},
- }
- wg := sync.WaitGroup{}
- wg.Add(1)
- groupClaim := &testConsumerGroupClaim{
- messageChan: make(chan *sarama.ConsumerMessage),
- }
- go func() {
- e := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim)
- assert.EqualError(t, e, consumerError.Error())
- wg.Done()
- }()
- ld := testdata.GenerateMetricsOneMetric()
- unmarshaler := &pmetric.ProtoMarshaler{}
- bts, err := unmarshaler.MarshalMetrics(ld)
- require.NoError(t, err)
- groupClaim.messageChan <- &sarama.ConsumerMessage{Value: bts}
- close(groupClaim.messageChan)
- wg.Wait()
- }
- func TestNewLogsReceiver_version_err(t *testing.T) {
- c := Config{
- Encoding: defaultEncoding,
- ProtocolVersion: "none",
- }
- r, err := newLogsReceiver(c, receivertest.NewNopCreateSettings(), defaultLogsUnmarshalers(), consumertest.NewNop())
- assert.Error(t, err)
- assert.Nil(t, r)
- }
- func TestNewLogsReceiver_encoding_err(t *testing.T) {
- c := Config{
- Encoding: "foo",
- }
- r, err := newLogsReceiver(c, receivertest.NewNopCreateSettings(), defaultLogsUnmarshalers(), consumertest.NewNop())
- require.Error(t, err)
- assert.Nil(t, r)
- assert.EqualError(t, err, errUnrecognizedEncoding.Error())
- }
- func TestNewLogsExporter_err_auth_type(t *testing.T) {
- c := Config{
- ProtocolVersion: "2.0.0",
- Authentication: kafka.Authentication{
- TLS: &configtls.TLSClientSetting{
- TLSSetting: configtls.TLSSetting{
- CAFile: "/doesnotexist",
- },
- },
- },
- Encoding: defaultEncoding,
- Metadata: kafkaexporter.Metadata{
- Full: false,
- },
- }
- r, err := newLogsReceiver(c, receivertest.NewNopCreateSettings(), defaultLogsUnmarshalers(), consumertest.NewNop())
- assert.Error(t, err)
- assert.Contains(t, err.Error(), "failed to load TLS config")
- assert.Nil(t, r)
- }
- func TestNewLogsReceiver_initial_offset_err(t *testing.T) {
- c := Config{
- InitialOffset: "foo",
- Encoding: defaultEncoding,
- }
- r, err := newLogsReceiver(c, receivertest.NewNopCreateSettings(), defaultLogsUnmarshalers(), consumertest.NewNop())
- require.Error(t, err)
- assert.Nil(t, r)
- assert.EqualError(t, err, errInvalidInitialOffset.Error())
- }
- func TestLogsReceiverStart(t *testing.T) {
- c := kafkaLogsConsumer{
- nextConsumer: consumertest.NewNop(),
- settings: receivertest.NewNopCreateSettings(),
- consumerGroup: &testConsumerGroup{},
- }
- require.NoError(t, c.Start(context.Background(), componenttest.NewNopHost()))
- require.NoError(t, c.Shutdown(context.Background()))
- }
- func TestLogsReceiverStartConsume(t *testing.T) {
- c := kafkaLogsConsumer{
- nextConsumer: consumertest.NewNop(),
- settings: receivertest.NewNopCreateSettings(),
- consumerGroup: &testConsumerGroup{},
- }
- ctx, cancelFunc := context.WithCancel(context.Background())
- c.cancelConsumeLoop = cancelFunc
- require.NoError(t, c.Shutdown(context.Background()))
- err := c.consumeLoop(ctx, &logsConsumerGroupHandler{
- ready: make(chan bool),
- })
- assert.EqualError(t, err, context.Canceled.Error())
- }
- func TestLogsReceiver_error(t *testing.T) {
- zcore, logObserver := observer.New(zapcore.ErrorLevel)
- logger := zap.New(zcore)
- settings := receivertest.NewNopCreateSettings()
- settings.Logger = logger
- expectedErr := errors.New("handler error")
- c := kafkaLogsConsumer{
- nextConsumer: consumertest.NewNop(),
- settings: settings,
- consumerGroup: &testConsumerGroup{err: expectedErr},
- }
- require.NoError(t, c.Start(context.Background(), componenttest.NewNopHost()))
- require.NoError(t, c.Shutdown(context.Background()))
- assert.Eventually(t, func() bool {
- return logObserver.FilterField(zap.Error(expectedErr)).Len() > 0
- }, 10*time.Second, time.Millisecond*100)
- }
- func TestLogsConsumerGroupHandler(t *testing.T) {
- view.Unregister(metricViews()...)
- views := metricViews()
- require.NoError(t, view.Register(views...))
- defer view.Unregister(views...)
- obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
- require.NoError(t, err)
- c := logsConsumerGroupHandler{
- unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding),
- logger: zap.NewNop(),
- ready: make(chan bool),
- nextConsumer: consumertest.NewNop(),
- obsrecv: obsrecv,
- headerExtractor: &nopHeaderExtractor{},
- }
- testSession := testConsumerGroupSession{ctx: context.Background()}
- require.NoError(t, c.Setup(testSession))
- _, ok := <-c.ready
- assert.False(t, ok)
- viewData, err := view.RetrieveData(statPartitionStart.Name())
- require.NoError(t, err)
- assert.Equal(t, 1, len(viewData))
- distData := viewData[0].Data.(*view.SumData)
- assert.Equal(t, float64(1), distData.Value)
- require.NoError(t, c.Cleanup(testSession))
- viewData, err = view.RetrieveData(statPartitionClose.Name())
- require.NoError(t, err)
- assert.Equal(t, 1, len(viewData))
- distData = viewData[0].Data.(*view.SumData)
- assert.Equal(t, float64(1), distData.Value)
- groupClaim := testConsumerGroupClaim{
- messageChan: make(chan *sarama.ConsumerMessage),
- }
- wg := sync.WaitGroup{}
- wg.Add(1)
- go func() {
- require.NoError(t, c.ConsumeClaim(testSession, groupClaim))
- wg.Done()
- }()
- groupClaim.messageChan <- &sarama.ConsumerMessage{}
- close(groupClaim.messageChan)
- wg.Wait()
- }
- func TestLogsConsumerGroupHandler_session_done(t *testing.T) {
- view.Unregister(metricViews()...)
- views := metricViews()
- require.NoError(t, view.Register(views...))
- defer view.Unregister(views...)
- obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
- require.NoError(t, err)
- c := logsConsumerGroupHandler{
- unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding),
- logger: zap.NewNop(),
- ready: make(chan bool),
- nextConsumer: consumertest.NewNop(),
- obsrecv: obsrecv,
- headerExtractor: &nopHeaderExtractor{},
- }
- ctx, cancelFunc := context.WithCancel(context.Background())
- testSession := testConsumerGroupSession{ctx: ctx}
- require.NoError(t, c.Setup(testSession))
- _, ok := <-c.ready
- assert.False(t, ok)
- viewData, err := view.RetrieveData(statPartitionStart.Name())
- require.NoError(t, err)
- assert.Equal(t, 1, len(viewData))
- distData := viewData[0].Data.(*view.SumData)
- assert.Equal(t, float64(1), distData.Value)
- require.NoError(t, c.Cleanup(testSession))
- viewData, err = view.RetrieveData(statPartitionClose.Name())
- require.NoError(t, err)
- assert.Equal(t, 1, len(viewData))
- distData = viewData[0].Data.(*view.SumData)
- assert.Equal(t, float64(1), distData.Value)
- groupClaim := testConsumerGroupClaim{
- messageChan: make(chan *sarama.ConsumerMessage),
- }
- defer close(groupClaim.messageChan)
- wg := sync.WaitGroup{}
- wg.Add(1)
- go func() {
- require.NoError(t, c.ConsumeClaim(testSession, groupClaim))
- wg.Done()
- }()
- groupClaim.messageChan <- &sarama.ConsumerMessage{}
- cancelFunc()
- wg.Wait()
- }
- func TestLogsConsumerGroupHandler_error_unmarshal(t *testing.T) {
- obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
- require.NoError(t, err)
- c := logsConsumerGroupHandler{
- unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding),
- logger: zap.NewNop(),
- ready: make(chan bool),
- nextConsumer: consumertest.NewNop(),
- obsrecv: obsrecv,
- headerExtractor: &nopHeaderExtractor{},
- }
- wg := sync.WaitGroup{}
- wg.Add(1)
- groupClaim := &testConsumerGroupClaim{
- messageChan: make(chan *sarama.ConsumerMessage),
- }
- go func() {
- err := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim)
- require.Error(t, err)
- wg.Done()
- }()
- groupClaim.messageChan <- &sarama.ConsumerMessage{Value: []byte("!@#")}
- close(groupClaim.messageChan)
- wg.Wait()
- }
- func TestLogsConsumerGroupHandler_error_nextConsumer(t *testing.T) {
- consumerError := errors.New("failed to consume")
- obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
- require.NoError(t, err)
- c := logsConsumerGroupHandler{
- unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding),
- logger: zap.NewNop(),
- ready: make(chan bool),
- nextConsumer: consumertest.NewErr(consumerError),
- obsrecv: obsrecv,
- headerExtractor: &nopHeaderExtractor{},
- }
- wg := sync.WaitGroup{}
- wg.Add(1)
- groupClaim := &testConsumerGroupClaim{
- messageChan: make(chan *sarama.ConsumerMessage),
- }
- go func() {
- e := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim)
- assert.EqualError(t, e, consumerError.Error())
- wg.Done()
- }()
- ld := testdata.GenerateLogsOneLogRecord()
- unmarshaler := &plog.ProtoMarshaler{}
- bts, err := unmarshaler.MarshalLogs(ld)
- require.NoError(t, err)
- groupClaim.messageChan <- &sarama.ConsumerMessage{Value: bts}
- close(groupClaim.messageChan)
- wg.Wait()
- }
- // Test unmarshaler for different charsets and encodings.
- func TestLogsConsumerGroupHandler_unmarshal_text(t *testing.T) {
- tests := []struct {
- name string
- text string
- enc string
- }{
- {
- name: "unmarshal test for Englist (ASCII characters) with text_utf8",
- text: "ASCII characters test",
- enc: "utf8",
- },
- {
- name: "unmarshal test for unicode with text_utf8",
- text: "UTF8 测试 測試 テスト 테스트 ☺️",
- enc: "utf8",
- },
- {
- name: "unmarshal test for Simplified Chinese with text_gbk",
- text: "GBK 简体中文解码测试",
- enc: "gbk",
- },
- {
- name: "unmarshal test for Japanese with text_shift_jis",
- text: "Shift_JIS 日本のデコードテスト",
- enc: "shift_jis",
- },
- {
- name: "unmarshal test for Korean with text_euc-kr",
- text: "EUC-KR 한국 디코딩 테스트",
- enc: "euc-kr",
- },
- }
- for _, test := range tests {
- t.Run(test.name, func(t *testing.T) {
- obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
- require.NoError(t, err)
- unmarshaler := newTextLogsUnmarshaler()
- unmarshaler, err = unmarshaler.WithEnc(test.enc)
- require.NoError(t, err)
- sink := &consumertest.LogsSink{}
- c := logsConsumerGroupHandler{
- unmarshaler: unmarshaler,
- logger: zap.NewNop(),
- ready: make(chan bool),
- nextConsumer: sink,
- obsrecv: obsrecv,
- headerExtractor: &nopHeaderExtractor{},
- }
- wg := sync.WaitGroup{}
- wg.Add(1)
- groupClaim := &testConsumerGroupClaim{
- messageChan: make(chan *sarama.ConsumerMessage),
- }
- go func() {
- err = c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim)
- assert.NoError(t, err)
- wg.Done()
- }()
- encCfg := textutils.NewEncodingConfig()
- encCfg.Encoding = test.enc
- enc, err := encCfg.Build()
- require.NoError(t, err)
- encoder := enc.Encoding.NewEncoder()
- encoded, err := encoder.Bytes([]byte(test.text))
- require.NoError(t, err)
- t1 := time.Now()
- groupClaim.messageChan <- &sarama.ConsumerMessage{Value: encoded}
- close(groupClaim.messageChan)
- wg.Wait()
- require.Equal(t, sink.LogRecordCount(), 1)
- log := sink.AllLogs()[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0)
- assert.Equal(t, log.Body().Str(), test.text)
- assert.LessOrEqual(t, t1, log.ObservedTimestamp().AsTime())
- assert.LessOrEqual(t, log.ObservedTimestamp().AsTime(), time.Now())
- })
- }
- }
- func TestGetLogsUnmarshaler_encoding_text(t *testing.T) {
- tests := []struct {
- name string
- encoding string
- }{
- {
- name: "default text encoding",
- encoding: "text",
- },
- {
- name: "utf-8 text encoding",
- encoding: "text_utf-8",
- },
- {
- name: "gbk text encoding",
- encoding: "text_gbk",
- },
- {
- name: "shift_jis text encoding, which contains an underline",
- encoding: "text_shift_jis",
- },
- }
- for _, test := range tests {
- t.Run(test.name, func(t *testing.T) {
- _, err := getLogsUnmarshaler(test.encoding, defaultLogsUnmarshalers())
- assert.NoError(t, err)
- })
- }
- }
- func TestGetLogsUnmarshaler_encoding_text_error(t *testing.T) {
- tests := []struct {
- name string
- encoding string
- }{
- {
- name: "text encoding has typo",
- encoding: "text_uft-8",
- },
- {
- name: "text encoding is a random string",
- encoding: "text_vnbqgoba156",
- },
- }
- for _, test := range tests {
- t.Run(test.name, func(t *testing.T) {
- _, err := getLogsUnmarshaler(test.encoding, defaultLogsUnmarshalers())
- assert.ErrorContains(t, err, fmt.Sprintf("unsupported encoding '%v'", test.encoding[5:]))
- })
- }
- }
- func TestCreateLogsReceiver_encoding_text_error(t *testing.T) {
- cfg := Config{
- Encoding: "text_uft-8",
- }
- _, err := newLogsReceiver(cfg, receivertest.NewNopCreateSettings(), defaultLogsUnmarshalers(), consumertest.NewNop())
- // encoding error comes first
- assert.Error(t, err, "unsupported encoding")
- }
- func TestToSaramaInitialOffset_earliest(t *testing.T) {
- saramaInitialOffset, err := toSaramaInitialOffset(offsetEarliest)
- require.NoError(t, err)
- assert.Equal(t, sarama.OffsetOldest, saramaInitialOffset)
- }
- func TestToSaramaInitialOffset_latest(t *testing.T) {
- saramaInitialOffset, err := toSaramaInitialOffset(offsetLatest)
- require.NoError(t, err)
- assert.Equal(t, sarama.OffsetNewest, saramaInitialOffset)
- }
- func TestToSaramaInitialOffset_default(t *testing.T) {
- saramaInitialOffset, err := toSaramaInitialOffset("")
- require.NoError(t, err)
- assert.Equal(t, sarama.OffsetNewest, saramaInitialOffset)
- }
- func TestToSaramaInitialOffset_invalid(t *testing.T) {
- _, err := toSaramaInitialOffset("other")
- assert.Equal(t, err, errInvalidInitialOffset)
- }
- type testConsumerGroupClaim struct {
- messageChan chan *sarama.ConsumerMessage
- }
- var _ sarama.ConsumerGroupClaim = (*testConsumerGroupClaim)(nil)
- const (
- testTopic = "otlp_spans"
- testPartition = 5
- testInitialOffset = 6
- testHighWatermarkOffset = 4
- )
- func (t testConsumerGroupClaim) Topic() string {
- return testTopic
- }
- func (t testConsumerGroupClaim) Partition() int32 {
- return testPartition
- }
- func (t testConsumerGroupClaim) InitialOffset() int64 {
- return testInitialOffset
- }
- func (t testConsumerGroupClaim) HighWaterMarkOffset() int64 {
- return testHighWatermarkOffset
- }
- func (t testConsumerGroupClaim) Messages() <-chan *sarama.ConsumerMessage {
- return t.messageChan
- }
- type testConsumerGroupSession struct {
- ctx context.Context
- }
- func (t testConsumerGroupSession) Commit() {
- }
- var _ sarama.ConsumerGroupSession = (*testConsumerGroupSession)(nil)
- func (t testConsumerGroupSession) Claims() map[string][]int32 {
- panic("implement me")
- }
- func (t testConsumerGroupSession) MemberID() string {
- panic("implement me")
- }
- func (t testConsumerGroupSession) GenerationID() int32 {
- panic("implement me")
- }
- func (t testConsumerGroupSession) MarkOffset(string, int32, int64, string) {
- }
- func (t testConsumerGroupSession) ResetOffset(string, int32, int64, string) {
- panic("implement me")
- }
- func (t testConsumerGroupSession) MarkMessage(*sarama.ConsumerMessage, string) {}
- func (t testConsumerGroupSession) Context() context.Context {
- return t.ctx
- }
- type testConsumerGroup struct {
- once sync.Once
- err error
- }
- var _ sarama.ConsumerGroup = (*testConsumerGroup)(nil)
- func (t *testConsumerGroup) Consume(ctx context.Context, _ []string, handler sarama.ConsumerGroupHandler) error {
- t.once.Do(func() {
- _ = handler.Setup(testConsumerGroupSession{ctx: ctx})
- })
- return t.err
- }
- func (t *testConsumerGroup) Errors() <-chan error {
- panic("implement me")
- }
- func (t *testConsumerGroup) Close() error {
- return nil
- }
- func (t *testConsumerGroup) Pause(_ map[string][]int32) {
- panic("implement me")
- }
- func (t *testConsumerGroup) PauseAll() {
- panic("implement me")
- }
- func (t *testConsumerGroup) Resume(_ map[string][]int32) {
- panic("implement me")
- }
- func (t *testConsumerGroup) ResumeAll() {
- panic("implement me")
- }
|