|
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package kafkareceiver
- import (
- "context"
- "errors"
- "fmt"
- "sync"
- "testing"
- "time"
- "github.com/IBM/sarama"
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/require"
- "go.opencensus.io/stats/view"
- "go.opentelemetry.io/collector/component/componenttest"
- "go.opentelemetry.io/collector/config/configtls"
- "go.opentelemetry.io/collector/consumer/consumertest"
- "go.opentelemetry.io/collector/pdata/plog"
- "go.opentelemetry.io/collector/pdata/pmetric"
- "go.opentelemetry.io/collector/pdata/ptrace"
- "go.opentelemetry.io/collector/receiver/receiverhelper"
- "go.opentelemetry.io/collector/receiver/receivertest"
- "go.uber.org/zap"
- "go.uber.org/zap/zapcore"
- "go.uber.org/zap/zaptest/observer"
- "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"
- "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata"
- "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/textutils"
- "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
- )
- func TestNewTracesReceiver_version_err(t *testing.T) {
- c := Config{
- Encoding: defaultEncoding,
- ProtocolVersion: "none",
- }
- r, err := newTracesReceiver(c, receivertest.NewNopCreateSettings(), defaultTracesUnmarshalers(), consumertest.NewNop())
- assert.Error(t, err)
- assert.Nil(t, r)
- }
- func TestNewTracesReceiver_encoding_err(t *testing.T) {
- c := Config{
- Encoding: "foo",
- }
- r, err := newTracesReceiver(c, receivertest.NewNopCreateSettings(), defaultTracesUnmarshalers(), consumertest.NewNop())
- require.Error(t, err)
- assert.Nil(t, r)
- assert.EqualError(t, err, errUnrecognizedEncoding.Error())
- }
- func TestNewTracesReceiver_err_auth_type(t *testing.T) {
- c := Config{
- ProtocolVersion: "2.0.0",
- Authentication: kafka.Authentication{
- TLS: &configtls.TLSClientSetting{
- TLSSetting: configtls.TLSSetting{
- CAFile: "/doesnotexist",
- },
- },
- },
- Encoding: defaultEncoding,
- Metadata: kafkaexporter.Metadata{
- Full: false,
- },
- }
- r, err := newTracesReceiver(c, receivertest.NewNopCreateSettings(), defaultTracesUnmarshalers(), consumertest.NewNop())
- assert.Error(t, err)
- assert.Contains(t, err.Error(), "failed to load TLS config")
- assert.Nil(t, r)
- }
- func TestNewTracesReceiver_initial_offset_err(t *testing.T) {
- c := Config{
- InitialOffset: "foo",
- Encoding: defaultEncoding,
- }
- r, err := newTracesReceiver(c, receivertest.NewNopCreateSettings(), defaultTracesUnmarshalers(), consumertest.NewNop())
- require.Error(t, err)
- assert.Nil(t, r)
- assert.EqualError(t, err, errInvalidInitialOffset.Error())
- }
- func TestTracesReceiverStart(t *testing.T) {
- c := kafkaTracesConsumer{
- nextConsumer: consumertest.NewNop(),
- settings: receivertest.NewNopCreateSettings(),
- consumerGroup: &testConsumerGroup{},
- }
- require.NoError(t, c.Start(context.Background(), componenttest.NewNopHost()))
- require.NoError(t, c.Shutdown(context.Background()))
- }
- func TestTracesReceiverStartConsume(t *testing.T) {
- c := kafkaTracesConsumer{
- nextConsumer: consumertest.NewNop(),
- settings: receivertest.NewNopCreateSettings(),
- consumerGroup: &testConsumerGroup{},
- }
- ctx, cancelFunc := context.WithCancel(context.Background())
- c.cancelConsumeLoop = cancelFunc
- require.NoError(t, c.Shutdown(context.Background()))
- err := c.consumeLoop(ctx, &tracesConsumerGroupHandler{
- ready: make(chan bool),
- })
- assert.EqualError(t, err, context.Canceled.Error())
- }
- func TestTracesReceiver_error(t *testing.T) {
- zcore, logObserver := observer.New(zapcore.ErrorLevel)
- logger := zap.New(zcore)
- settings := receivertest.NewNopCreateSettings()
- settings.Logger = logger
- expectedErr := errors.New("handler error")
- c := kafkaTracesConsumer{
- nextConsumer: consumertest.NewNop(),
- settings: settings,
- consumerGroup: &testConsumerGroup{err: expectedErr},
- }
- require.NoError(t, c.Start(context.Background(), componenttest.NewNopHost()))
- require.NoError(t, c.Shutdown(context.Background()))
- assert.Eventually(t, func() bool {
- return logObserver.FilterField(zap.Error(expectedErr)).Len() > 0
- }, 10*time.Second, time.Millisecond*100)
- }
- func TestTracesConsumerGroupHandler(t *testing.T) {
- view.Unregister(metricViews()...)
- views := metricViews()
- require.NoError(t, view.Register(views...))
- defer view.Unregister(views...)
- obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
- require.NoError(t, err)
- c := tracesConsumerGroupHandler{
- unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding),
- logger: zap.NewNop(),
- ready: make(chan bool),
- nextConsumer: consumertest.NewNop(),
- obsrecv: obsrecv,
- headerExtractor: &nopHeaderExtractor{},
- }
- testSession := testConsumerGroupSession{ctx: context.Background()}
- require.NoError(t, c.Setup(testSession))
- _, ok := <-c.ready
- assert.False(t, ok)
- viewData, err := view.RetrieveData(statPartitionStart.Name())
- require.NoError(t, err)
- assert.Equal(t, 1, len(viewData))
- distData := viewData[0].Data.(*view.SumData)
- assert.Equal(t, float64(1), distData.Value)
- require.NoError(t, c.Cleanup(testSession))
- viewData, err = view.RetrieveData(statPartitionClose.Name())
- require.NoError(t, err)
- assert.Equal(t, 1, len(viewData))
- distData = viewData[0].Data.(*view.SumData)
- assert.Equal(t, float64(1), distData.Value)
- groupClaim := testConsumerGroupClaim{
- messageChan: make(chan *sarama.ConsumerMessage),
- }
- wg := sync.WaitGroup{}
- wg.Add(1)
- go func() {
- require.NoError(t, c.ConsumeClaim(testSession, groupClaim))
- wg.Done()
- }()
- groupClaim.messageChan <- &sarama.ConsumerMessage{}
- close(groupClaim.messageChan)
- wg.Wait()
- }
- func TestTracesConsumerGroupHandler_session_done(t *testing.T) {
- view.Unregister(metricViews()...)
- views := metricViews()
- require.NoError(t, view.Register(views...))
- defer view.Unregister(views...)
- obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
- require.NoError(t, err)
- c := tracesConsumerGroupHandler{
- unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding),
- logger: zap.NewNop(),
- ready: make(chan bool),
- nextConsumer: consumertest.NewNop(),
- obsrecv: obsrecv,
- headerExtractor: &nopHeaderExtractor{},
- }
- ctx, cancelFunc := context.WithCancel(context.Background())
- testSession := testConsumerGroupSession{ctx: ctx}
- require.NoError(t, c.Setup(testSession))
- _, ok := <-c.ready
- assert.False(t, ok)
- viewData, err := view.RetrieveData(statPartitionStart.Name())
- require.NoError(t, err)
- assert.Equal(t, 1, len(viewData))
- distData := viewData[0].Data.(*view.SumData)
- assert.Equal(t, float64(1), distData.Value)
- require.NoError(t, c.Cleanup(testSession))
- viewData, err = view.RetrieveData(statPartitionClose.Name())
- require.NoError(t, err)
- assert.Equal(t, 1, len(viewData))
- distData = viewData[0].Data.(*view.SumData)
- assert.Equal(t, float64(1), distData.Value)
- groupClaim := testConsumerGroupClaim{
- messageChan: make(chan *sarama.ConsumerMessage),
- }
- defer close(groupClaim.messageChan)
- wg := sync.WaitGroup{}
- wg.Add(1)
- go func() {
- require.NoError(t, c.ConsumeClaim(testSession, groupClaim))
- wg.Done()
- }()
- groupClaim.messageChan <- &sarama.ConsumerMessage{}
- cancelFunc()
- wg.Wait()
- }
- func TestTracesConsumerGroupHandler_error_unmarshal(t *testing.T) {
- obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
- require.NoError(t, err)
- c := tracesConsumerGroupHandler{
- unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding),
- logger: zap.NewNop(),
- ready: make(chan bool),
- nextConsumer: consumertest.NewNop(),
- obsrecv: obsrecv,
- headerExtractor: &nopHeaderExtractor{},
- }
- wg := sync.WaitGroup{}
- wg.Add(1)
- groupClaim := &testConsumerGroupClaim{
- messageChan: make(chan *sarama.ConsumerMessage),
- }
- go func() {
- err := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim)
- require.Error(t, err)
- wg.Done()
- }()
- groupClaim.messageChan <- &sarama.ConsumerMessage{Value: []byte("!@#")}
- close(groupClaim.messageChan)
- wg.Wait()
- }
- func TestTracesConsumerGroupHandler_error_nextConsumer(t *testing.T) {
- consumerError := errors.New("failed to consume")
- obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
- require.NoError(t, err)
- c := tracesConsumerGroupHandler{
- unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding),
- logger: zap.NewNop(),
- ready: make(chan bool),
- nextConsumer: consumertest.NewErr(consumerError),
- obsrecv: obsrecv,
- headerExtractor: &nopHeaderExtractor{},
- }
- wg := sync.WaitGroup{}
- wg.Add(1)
- groupClaim := &testConsumerGroupClaim{
- messageChan: make(chan *sarama.ConsumerMessage),
- }
- go func() {
- e := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim)
- assert.EqualError(t, e, consumerError.Error())
- wg.Done()
- }()
- td := ptrace.NewTraces()
- td.ResourceSpans().AppendEmpty()
- unmarshaler := &ptrace.ProtoMarshaler{}
- bts, err := unmarshaler.MarshalTraces(td)
- require.NoError(t, err)
- groupClaim.messageChan <- &sarama.ConsumerMessage{Value: bts}
- close(groupClaim.messageChan)
- wg.Wait()
- }
- func TestNewMetricsReceiver_version_err(t *testing.T) {
- c := Config{
- Encoding: defaultEncoding,
- ProtocolVersion: "none",
- }
- r, err := newMetricsReceiver(c, receivertest.NewNopCreateSettings(), defaultMetricsUnmarshalers(), consumertest.NewNop())
- assert.Error(t, err)
- assert.Nil(t, r)
- }
- func TestNewMetricsReceiver_encoding_err(t *testing.T) {
- c := Config{
- Encoding: "foo",
- }
- r, err := newMetricsReceiver(c, receivertest.NewNopCreateSettings(), defaultMetricsUnmarshalers(), consumertest.NewNop())
- require.Error(t, err)
- assert.Nil(t, r)
- assert.EqualError(t, err, errUnrecognizedEncoding.Error())
- }
- func TestNewMetricsExporter_err_auth_type(t *testing.T) {
- c := Config{
- ProtocolVersion: "2.0.0",
- Authentication: kafka.Authentication{
- TLS: &configtls.TLSClientSetting{
- TLSSetting: configtls.TLSSetting{
- CAFile: "/doesnotexist",
- },
- },
- },
- Encoding: defaultEncoding,
- Metadata: kafkaexporter.Metadata{
- Full: false,
- },
- }
- r, err := newMetricsReceiver(c, receivertest.NewNopCreateSettings(), defaultMetricsUnmarshalers(), consumertest.NewNop())
- assert.Error(t, err)
- assert.Contains(t, err.Error(), "failed to load TLS config")
- assert.Nil(t, r)
- }
- func TestNewMetricsReceiver_initial_offset_err(t *testing.T) {
- c := Config{
- InitialOffset: "foo",
- Encoding: defaultEncoding,
- }
- r, err := newMetricsReceiver(c, receivertest.NewNopCreateSettings(), defaultMetricsUnmarshalers(), consumertest.NewNop())
- require.Error(t, err)
- assert.Nil(t, r)
- assert.EqualError(t, err, errInvalidInitialOffset.Error())
- }
- func TestMetricsReceiverStart(t *testing.T) {
- c := kafkaMetricsConsumer{
- nextConsumer: consumertest.NewNop(),
- settings: receivertest.NewNopCreateSettings(),
- consumerGroup: &testConsumerGroup{},
- }
- require.NoError(t, c.Start(context.Background(), componenttest.NewNopHost()))
- require.NoError(t, c.Shutdown(context.Background()))
- }
- func TestMetricsReceiverStartConsume(t *testing.T) {
- c := kafkaMetricsConsumer{
- nextConsumer: consumertest.NewNop(),
- settings: receivertest.NewNopCreateSettings(),
- consumerGroup: &testConsumerGroup{},
- }
- ctx, cancelFunc := context.WithCancel(context.Background())
- c.cancelConsumeLoop = cancelFunc
- require.NoError(t, c.Shutdown(context.Background()))
- err := c.consumeLoop(ctx, &logsConsumerGroupHandler{
- ready: make(chan bool),
- })
- assert.EqualError(t, err, context.Canceled.Error())
- }
- func TestMetricsReceiver_error(t *testing.T) {
- zcore, logObserver := observer.New(zapcore.ErrorLevel)
- logger := zap.New(zcore)
- settings := receivertest.NewNopCreateSettings()
- settings.Logger = logger
- expectedErr := errors.New("handler error")
- c := kafkaMetricsConsumer{
- nextConsumer: consumertest.NewNop(),
- settings: settings,
- consumerGroup: &testConsumerGroup{err: expectedErr},
- }
- require.NoError(t, c.Start(context.Background(), componenttest.NewNopHost()))
- require.NoError(t, c.Shutdown(context.Background()))
- assert.Eventually(t, func() bool {
- return logObserver.FilterField(zap.Error(expectedErr)).Len() > 0
- }, 10*time.Second, time.Millisecond*100)
- }
- func TestMetricsConsumerGroupHandler(t *testing.T) {
- view.Unregister(metricViews()...)
- views := metricViews()
- require.NoError(t, view.Register(views...))
- defer view.Unregister(views...)
- obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
- require.NoError(t, err)
- c := metricsConsumerGroupHandler{
- unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding),
- logger: zap.NewNop(),
- ready: make(chan bool),
- nextConsumer: consumertest.NewNop(),
- obsrecv: obsrecv,
- headerExtractor: &nopHeaderExtractor{},
- }
- testSession := testConsumerGroupSession{ctx: context.Background()}
- require.NoError(t, c.Setup(testSession))
- _, ok := <-c.ready
- assert.False(t, ok)
- viewData, err := view.RetrieveData(statPartitionStart.Name())
- require.NoError(t, err)
- assert.Equal(t, 1, len(viewData))
- distData := viewData[0].Data.(*view.SumData)
- assert.Equal(t, float64(1), distData.Value)
- require.NoError(t, c.Cleanup(testSession))
- viewData, err = view.RetrieveData(statPartitionClose.Name())
- require.NoError(t, err)
- assert.Equal(t, 1, len(viewData))
- distData = viewData[0].Data.(*view.SumData)
- assert.Equal(t, float64(1), distData.Value)
- groupClaim := testConsumerGroupClaim{
- messageChan: make(chan *sarama.ConsumerMessage),
- }
- wg := sync.WaitGroup{}
- wg.Add(1)
- go func() {
- require.NoError(t, c.ConsumeClaim(testSession, groupClaim))
- wg.Done()
- }()
- groupClaim.messageChan <- &sarama.ConsumerMessage{}
- close(groupClaim.messageChan)
- wg.Wait()
- }
- func TestMetricsConsumerGroupHandler_session_done(t *testing.T) {
- view.Unregister(metricViews()...)
- views := metricViews()
- require.NoError(t, view.Register(views...))
- defer view.Unregister(views...)
- obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
- require.NoError(t, err)
- c := metricsConsumerGroupHandler{
- unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding),
- logger: zap.NewNop(),
- ready: make(chan bool),
- nextConsumer: consumertest.NewNop(),
- obsrecv: obsrecv,
- headerExtractor: &nopHeaderExtractor{},
- }
- ctx, cancelFunc := context.WithCancel(context.Background())
- testSession := testConsumerGroupSession{ctx: ctx}
- require.NoError(t, c.Setup(testSession))
- _, ok := <-c.ready
- assert.False(t, ok)
- viewData, err := view.RetrieveData(statPartitionStart.Name())
- require.NoError(t, err)
- assert.Equal(t, 1, len(viewData))
- distData := viewData[0].Data.(*view.SumData)
- assert.Equal(t, float64(1), distData.Value)
- require.NoError(t, c.Cleanup(testSession))
- viewData, err = view.RetrieveData(statPartitionClose.Name())
- require.NoError(t, err)
- assert.Equal(t, 1, len(viewData))
- distData = viewData[0].Data.(*view.SumData)
- assert.Equal(t, float64(1), distData.Value)
- groupClaim := testConsumerGroupClaim{
- messageChan: make(chan *sarama.ConsumerMessage),
- }
- defer close(groupClaim.messageChan)
- wg := sync.WaitGroup{}
- wg.Add(1)
- go func() {
- require.NoError(t, c.ConsumeClaim(testSession, groupClaim))
- wg.Done()
- }()
- groupClaim.messageChan <- &sarama.ConsumerMessage{}
- cancelFunc()
- wg.Wait()
- }
- func TestMetricsConsumerGroupHandler_error_unmarshal(t *testing.T) {
- obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
- require.NoError(t, err)
- c := metricsConsumerGroupHandler{
- unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding),
- logger: zap.NewNop(),
- ready: make(chan bool),
- nextConsumer: consumertest.NewNop(),
- obsrecv: obsrecv,
- headerExtractor: &nopHeaderExtractor{},
- }
- wg := sync.WaitGroup{}
- wg.Add(1)
- groupClaim := &testConsumerGroupClaim{
- messageChan: make(chan *sarama.ConsumerMessage),
- }
- go func() {
- err := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim)
- require.Error(t, err)
- wg.Done()
- }()
- groupClaim.messageChan <- &sarama.ConsumerMessage{Value: []byte("!@#")}
- close(groupClaim.messageChan)
- wg.Wait()
- }
- func TestMetricsConsumerGroupHandler_error_nextConsumer(t *testing.T) {
- consumerError := errors.New("failed to consume")
- obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
- require.NoError(t, err)
- c := metricsConsumerGroupHandler{
- unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding),
- logger: zap.NewNop(),
- ready: make(chan bool),
- nextConsumer: consumertest.NewErr(consumerError),
- obsrecv: obsrecv,
- headerExtractor: &nopHeaderExtractor{},
- }
- wg := sync.WaitGroup{}
- wg.Add(1)
- groupClaim := &testConsumerGroupClaim{
- messageChan: make(chan *sarama.ConsumerMessage),
- }
- go func() {
- e := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim)
- assert.EqualError(t, e, consumerError.Error())
- wg.Done()
- }()
- ld := testdata.GenerateMetricsOneMetric()
- unmarshaler := &pmetric.ProtoMarshaler{}
- bts, err := unmarshaler.MarshalMetrics(ld)
- require.NoError(t, err)
- groupClaim.messageChan <- &sarama.ConsumerMessage{Value: bts}
- close(groupClaim.messageChan)
- wg.Wait()
- }
- func TestNewLogsReceiver_version_err(t *testing.T) {
- c := Config{
- Encoding: defaultEncoding,
- ProtocolVersion: "none",
- }
- r, err := newLogsReceiver(c, receivertest.NewNopCreateSettings(), defaultLogsUnmarshalers(), consumertest.NewNop())
- assert.Error(t, err)
- assert.Nil(t, r)
- }
- func TestNewLogsReceiver_encoding_err(t *testing.T) {
- c := Config{
- Encoding: "foo",
- }
- r, err := newLogsReceiver(c, receivertest.NewNopCreateSettings(), defaultLogsUnmarshalers(), consumertest.NewNop())
- require.Error(t, err)
- assert.Nil(t, r)
- assert.EqualError(t, err, errUnrecognizedEncoding.Error())
- }
- func TestNewLogsExporter_err_auth_type(t *testing.T) {
- c := Config{
- ProtocolVersion: "2.0.0",
- Authentication: kafka.Authentication{
- TLS: &configtls.TLSClientSetting{
- TLSSetting: configtls.TLSSetting{
- CAFile: "/doesnotexist",
- },
- },
- },
- Encoding: defaultEncoding,
- Metadata: kafkaexporter.Metadata{
- Full: false,
- },
- }
- r, err := newLogsReceiver(c, receivertest.NewNopCreateSettings(), defaultLogsUnmarshalers(), consumertest.NewNop())
- assert.Error(t, err)
- assert.Contains(t, err.Error(), "failed to load TLS config")
- assert.Nil(t, r)
- }
- func TestNewLogsReceiver_initial_offset_err(t *testing.T) {
- c := Config{
- InitialOffset: "foo",
- Encoding: defaultEncoding,
- }
- r, err := newLogsReceiver(c, receivertest.NewNopCreateSettings(), defaultLogsUnmarshalers(), consumertest.NewNop())
- require.Error(t, err)
- assert.Nil(t, r)
- assert.EqualError(t, err, errInvalidInitialOffset.Error())
- }
- func TestLogsReceiverStart(t *testing.T) {
- c := kafkaLogsConsumer{
- nextConsumer: consumertest.NewNop(),
- settings: receivertest.NewNopCreateSettings(),
- consumerGroup: &testConsumerGroup{},
- }
- require.NoError(t, c.Start(context.Background(), componenttest.NewNopHost()))
- require.NoError(t, c.Shutdown(context.Background()))
- }
- func TestLogsReceiverStartConsume(t *testing.T) {
- c := kafkaLogsConsumer{
- nextConsumer: consumertest.NewNop(),
- settings: receivertest.NewNopCreateSettings(),
- consumerGroup: &testConsumerGroup{},
- }
- ctx, cancelFunc := context.WithCancel(context.Background())
- c.cancelConsumeLoop = cancelFunc
- require.NoError(t, c.Shutdown(context.Background()))
- err := c.consumeLoop(ctx, &logsConsumerGroupHandler{
- ready: make(chan bool),
- })
- assert.EqualError(t, err, context.Canceled.Error())
- }
- func TestLogsReceiver_error(t *testing.T) {
- zcore, logObserver := observer.New(zapcore.ErrorLevel)
- logger := zap.New(zcore)
- settings := receivertest.NewNopCreateSettings()
- settings.Logger = logger
- expectedErr := errors.New("handler error")
- c := kafkaLogsConsumer{
- nextConsumer: consumertest.NewNop(),
- settings: settings,
- consumerGroup: &testConsumerGroup{err: expectedErr},
- }
- require.NoError(t, c.Start(context.Background(), componenttest.NewNopHost()))
- require.NoError(t, c.Shutdown(context.Background()))
- assert.Eventually(t, func() bool {
- return logObserver.FilterField(zap.Error(expectedErr)).Len() > 0
- }, 10*time.Second, time.Millisecond*100)
- }
- func TestLogsConsumerGroupHandler(t *testing.T) {
- view.Unregister(metricViews()...)
- views := metricViews()
- require.NoError(t, view.Register(views...))
- defer view.Unregister(views...)
- obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
- require.NoError(t, err)
- c := logsConsumerGroupHandler{
- unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding),
- logger: zap.NewNop(),
- ready: make(chan bool),
- nextConsumer: consumertest.NewNop(),
- obsrecv: obsrecv,
- headerExtractor: &nopHeaderExtractor{},
- }
- testSession := testConsumerGroupSession{ctx: context.Background()}
- require.NoError(t, c.Setup(testSession))
- _, ok := <-c.ready
- assert.False(t, ok)
- viewData, err := view.RetrieveData(statPartitionStart.Name())
- require.NoError(t, err)
- assert.Equal(t, 1, len(viewData))
- distData := viewData[0].Data.(*view.SumData)
- assert.Equal(t, float64(1), distData.Value)
- require.NoError(t, c.Cleanup(testSession))
- viewData, err = view.RetrieveData(statPartitionClose.Name())
- require.NoError(t, err)
- assert.Equal(t, 1, len(viewData))
- distData = viewData[0].Data.(*view.SumData)
- assert.Equal(t, float64(1), distData.Value)
- groupClaim := testConsumerGroupClaim{
- messageChan: make(chan *sarama.ConsumerMessage),
- }
- wg := sync.WaitGroup{}
- wg.Add(1)
- go func() {
- require.NoError(t, c.ConsumeClaim(testSession, groupClaim))
- wg.Done()
- }()
- groupClaim.messageChan <- &sarama.ConsumerMessage{}
- close(groupClaim.messageChan)
- wg.Wait()
- }
- func TestLogsConsumerGroupHandler_session_done(t *testing.T) {
- view.Unregister(metricViews()...)
- views := metricViews()
- require.NoError(t, view.Register(views...))
- defer view.Unregister(views...)
- obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
- require.NoError(t, err)
- c := logsConsumerGroupHandler{
- unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding),
- logger: zap.NewNop(),
- ready: make(chan bool),
- nextConsumer: consumertest.NewNop(),
- obsrecv: obsrecv,
- headerExtractor: &nopHeaderExtractor{},
- }
- ctx, cancelFunc := context.WithCancel(context.Background())
- testSession := testConsumerGroupSession{ctx: ctx}
- require.NoError(t, c.Setup(testSession))
- _, ok := <-c.ready
- assert.False(t, ok)
- viewData, err := view.RetrieveData(statPartitionStart.Name())
- require.NoError(t, err)
- assert.Equal(t, 1, len(viewData))
- distData := viewData[0].Data.(*view.SumData)
- assert.Equal(t, float64(1), distData.Value)
- require.NoError(t, c.Cleanup(testSession))
- viewData, err = view.RetrieveData(statPartitionClose.Name())
- require.NoError(t, err)
- assert.Equal(t, 1, len(viewData))
- distData = viewData[0].Data.(*view.SumData)
- assert.Equal(t, float64(1), distData.Value)
- groupClaim := testConsumerGroupClaim{
- messageChan: make(chan *sarama.ConsumerMessage),
- }
- defer close(groupClaim.messageChan)
- wg := sync.WaitGroup{}
- wg.Add(1)
- go func() {
- require.NoError(t, c.ConsumeClaim(testSession, groupClaim))
- wg.Done()
- }()
- groupClaim.messageChan <- &sarama.ConsumerMessage{}
- cancelFunc()
- wg.Wait()
- }
- func TestLogsConsumerGroupHandler_error_unmarshal(t *testing.T) {
- obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
- require.NoError(t, err)
- c := logsConsumerGroupHandler{
- unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding),
- logger: zap.NewNop(),
- ready: make(chan bool),
- nextConsumer: consumertest.NewNop(),
- obsrecv: obsrecv,
- headerExtractor: &nopHeaderExtractor{},
- }
- wg := sync.WaitGroup{}
- wg.Add(1)
- groupClaim := &testConsumerGroupClaim{
- messageChan: make(chan *sarama.ConsumerMessage),
- }
- go func() {
- err := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim)
- require.Error(t, err)
- wg.Done()
- }()
- groupClaim.messageChan <- &sarama.ConsumerMessage{Value: []byte("!@#")}
- close(groupClaim.messageChan)
- wg.Wait()
- }
- func TestLogsConsumerGroupHandler_error_nextConsumer(t *testing.T) {
- consumerError := errors.New("failed to consume")
- obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
- require.NoError(t, err)
- c := logsConsumerGroupHandler{
- unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding),
- logger: zap.NewNop(),
- ready: make(chan bool),
- nextConsumer: consumertest.NewErr(consumerError),
- obsrecv: obsrecv,
- headerExtractor: &nopHeaderExtractor{},
- }
- wg := sync.WaitGroup{}
- wg.Add(1)
- groupClaim := &testConsumerGroupClaim{
- messageChan: make(chan *sarama.ConsumerMessage),
- }
- go func() {
- e := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim)
- assert.EqualError(t, e, consumerError.Error())
- wg.Done()
- }()
- ld := testdata.GenerateLogsOneLogRecord()
- unmarshaler := &plog.ProtoMarshaler{}
- bts, err := unmarshaler.MarshalLogs(ld)
- require.NoError(t, err)
- groupClaim.messageChan <- &sarama.ConsumerMessage{Value: bts}
- close(groupClaim.messageChan)
- wg.Wait()
- }
- // Test unmarshaler for different charsets and encodings.
- func TestLogsConsumerGroupHandler_unmarshal_text(t *testing.T) {
- tests := []struct {
- name string
- text string
- enc string
- }{
- {
- name: "unmarshal test for Englist (ASCII characters) with text_utf8",
- text: "ASCII characters test",
- enc: "utf8",
- },
- {
- name: "unmarshal test for unicode with text_utf8",
- text: "UTF8 测试 測試 テスト 테스트 ☺️",
- enc: "utf8",
- },
- {
- name: "unmarshal test for Simplified Chinese with text_gbk",
- text: "GBK 简体中文解码测试",
- enc: "gbk",
- },
- {
- name: "unmarshal test for Japanese with text_shift_jis",
- text: "Shift_JIS 日本のデコードテスト",
- enc: "shift_jis",
- },
- {
- name: "unmarshal test for Korean with text_euc-kr",
- text: "EUC-KR 한국 디코딩 테스트",
- enc: "euc-kr",
- },
- }
- for _, test := range tests {
- t.Run(test.name, func(t *testing.T) {
- obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
- require.NoError(t, err)
- unmarshaler := newTextLogsUnmarshaler()
- unmarshaler, err = unmarshaler.WithEnc(test.enc)
- require.NoError(t, err)
- sink := &consumertest.LogsSink{}
- c := logsConsumerGroupHandler{
- unmarshaler: unmarshaler,
- logger: zap.NewNop(),
- ready: make(chan bool),
- nextConsumer: sink,
- obsrecv: obsrecv,
- headerExtractor: &nopHeaderExtractor{},
- }
- wg := sync.WaitGroup{}
- wg.Add(1)
- groupClaim := &testConsumerGroupClaim{
- messageChan: make(chan *sarama.ConsumerMessage),
- }
- go func() {
- err = c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim)
- assert.NoError(t, err)
- wg.Done()
- }()
- encCfg := textutils.NewEncodingConfig()
- encCfg.Encoding = test.enc
- enc, err := encCfg.Build()
- require.NoError(t, err)
- encoder := enc.Encoding.NewEncoder()
- encoded, err := encoder.Bytes([]byte(test.text))
- require.NoError(t, err)
- t1 := time.Now()
- groupClaim.messageChan <- &sarama.ConsumerMessage{Value: encoded}
- close(groupClaim.messageChan)
- wg.Wait()
- require.Equal(t, sink.LogRecordCount(), 1)
- log := sink.AllLogs()[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0)
- assert.Equal(t, log.Body().Str(), test.text)
- assert.LessOrEqual(t, t1, log.ObservedTimestamp().AsTime())
- assert.LessOrEqual(t, log.ObservedTimestamp().AsTime(), time.Now())
- })
- }
- }
- func TestGetLogsUnmarshaler_encoding_text(t *testing.T) {
- tests := []struct {
- name string
- encoding string
- }{
- {
- name: "default text encoding",
- encoding: "text",
- },
- {
- name: "utf-8 text encoding",
- encoding: "text_utf-8",
- },
- {
- name: "gbk text encoding",
- encoding: "text_gbk",
- },
- {
- name: "shift_jis text encoding, which contains an underline",
- encoding: "text_shift_jis",
- },
- }
- for _, test := range tests {
- t.Run(test.name, func(t *testing.T) {
- _, err := getLogsUnmarshaler(test.encoding, defaultLogsUnmarshalers())
- assert.NoError(t, err)
- })
- }
- }
- func TestGetLogsUnmarshaler_encoding_text_error(t *testing.T) {
- tests := []struct {
- name string
- encoding string
- }{
- {
- name: "text encoding has typo",
- encoding: "text_uft-8",
- },
- {
- name: "text encoding is a random string",
- encoding: "text_vnbqgoba156",
- },
- }
- for _, test := range tests {
- t.Run(test.name, func(t *testing.T) {
- _, err := getLogsUnmarshaler(test.encoding, defaultLogsUnmarshalers())
- assert.ErrorContains(t, err, fmt.Sprintf("unsupported encoding '%v'", test.encoding[5:]))
- })
- }
- }
- func TestCreateLogsReceiver_encoding_text_error(t *testing.T) {
- cfg := Config{
- Encoding: "text_uft-8",
- }
- _, err := newLogsReceiver(cfg, receivertest.NewNopCreateSettings(), defaultLogsUnmarshalers(), consumertest.NewNop())
- // encoding error comes first
- assert.Error(t, err, "unsupported encoding")
- }
- func TestToSaramaInitialOffset_earliest(t *testing.T) {
- saramaInitialOffset, err := toSaramaInitialOffset(offsetEarliest)
- require.NoError(t, err)
- assert.Equal(t, sarama.OffsetOldest, saramaInitialOffset)
- }
- func TestToSaramaInitialOffset_latest(t *testing.T) {
- saramaInitialOffset, err := toSaramaInitialOffset(offsetLatest)
- require.NoError(t, err)
- assert.Equal(t, sarama.OffsetNewest, saramaInitialOffset)
- }
- func TestToSaramaInitialOffset_default(t *testing.T) {
- saramaInitialOffset, err := toSaramaInitialOffset("")
- require.NoError(t, err)
- assert.Equal(t, sarama.OffsetNewest, saramaInitialOffset)
- }
- func TestToSaramaInitialOffset_invalid(t *testing.T) {
- _, err := toSaramaInitialOffset("other")
- assert.Equal(t, err, errInvalidInitialOffset)
- }
- type testConsumerGroupClaim struct {
- messageChan chan *sarama.ConsumerMessage
- }
- var _ sarama.ConsumerGroupClaim = (*testConsumerGroupClaim)(nil)
- const (
- testTopic = "otlp_spans"
- testPartition = 5
- testInitialOffset = 6
- testHighWatermarkOffset = 4
- )
- func (t testConsumerGroupClaim) Topic() string {
- return testTopic
- }
- func (t testConsumerGroupClaim) Partition() int32 {
- return testPartition
- }
- func (t testConsumerGroupClaim) InitialOffset() int64 {
- return testInitialOffset
- }
- func (t testConsumerGroupClaim) HighWaterMarkOffset() int64 {
- return testHighWatermarkOffset
- }
- func (t testConsumerGroupClaim) Messages() <-chan *sarama.ConsumerMessage {
- return t.messageChan
- }
- type testConsumerGroupSession struct {
- ctx context.Context
- }
- func (t testConsumerGroupSession) Commit() {
- }
- var _ sarama.ConsumerGroupSession = (*testConsumerGroupSession)(nil)
- func (t testConsumerGroupSession) Claims() map[string][]int32 {
- panic("implement me")
- }
- func (t testConsumerGroupSession) MemberID() string {
- panic("implement me")
- }
- func (t testConsumerGroupSession) GenerationID() int32 {
- panic("implement me")
- }
- func (t testConsumerGroupSession) MarkOffset(string, int32, int64, string) {
- }
- func (t testConsumerGroupSession) ResetOffset(string, int32, int64, string) {
- panic("implement me")
- }
- func (t testConsumerGroupSession) MarkMessage(*sarama.ConsumerMessage, string) {}
- func (t testConsumerGroupSession) Context() context.Context {
- return t.ctx
- }
- type testConsumerGroup struct {
- once sync.Once
- err error
- }
- var _ sarama.ConsumerGroup = (*testConsumerGroup)(nil)
- func (t *testConsumerGroup) Consume(ctx context.Context, _ []string, handler sarama.ConsumerGroupHandler) error {
- t.once.Do(func() {
- _ = handler.Setup(testConsumerGroupSession{ctx: ctx})
- })
- return t.err
- }
- func (t *testConsumerGroup) Errors() <-chan error {
- panic("implement me")
- }
- func (t *testConsumerGroup) Close() error {
- return nil
- }
- func (t *testConsumerGroup) Pause(_ map[string][]int32) {
- panic("implement me")
- }
- func (t *testConsumerGroup) PauseAll() {
- panic("implement me")
- }
- func (t *testConsumerGroup) Resume(_ map[string][]int32) {
- panic("implement me")
- }
- func (t *testConsumerGroup) ResumeAll() {
- panic("implement me")
- }
|