123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package kafkaexporter
- import (
- "context"
- "fmt"
- "testing"
- "github.com/IBM/sarama"
- "github.com/IBM/sarama/mocks"
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/require"
- "go.opentelemetry.io/collector/config/configtls"
- "go.opentelemetry.io/collector/exporter/exportertest"
- "go.opentelemetry.io/collector/pdata/plog"
- "go.opentelemetry.io/collector/pdata/pmetric"
- "go.opentelemetry.io/collector/pdata/ptrace"
- "go.uber.org/zap"
- "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata"
- "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
- )
- func TestNewExporter_err_version(t *testing.T) {
- c := Config{ProtocolVersion: "0.0.0", Encoding: defaultEncoding}
- texp, err := newTracesExporter(c, exportertest.NewNopCreateSettings(), tracesMarshalers())
- assert.Error(t, err)
- assert.Nil(t, texp)
- }
- func TestNewExporter_err_encoding(t *testing.T) {
- c := Config{Encoding: "foo"}
- texp, err := newTracesExporter(c, exportertest.NewNopCreateSettings(), tracesMarshalers())
- assert.EqualError(t, err, errUnrecognizedEncoding.Error())
- assert.Nil(t, texp)
- }
- func TestNewMetricsExporter_err_version(t *testing.T) {
- c := Config{ProtocolVersion: "0.0.0", Encoding: defaultEncoding}
- mexp, err := newMetricsExporter(c, exportertest.NewNopCreateSettings(), metricsMarshalers())
- assert.Error(t, err)
- assert.Nil(t, mexp)
- }
- func TestNewMetricsExporter_err_encoding(t *testing.T) {
- c := Config{Encoding: "bar"}
- mexp, err := newMetricsExporter(c, exportertest.NewNopCreateSettings(), metricsMarshalers())
- assert.EqualError(t, err, errUnrecognizedEncoding.Error())
- assert.Nil(t, mexp)
- }
- func TestNewMetricsExporter_err_traces_encoding(t *testing.T) {
- c := Config{Encoding: "jaeger_proto"}
- mexp, err := newMetricsExporter(c, exportertest.NewNopCreateSettings(), metricsMarshalers())
- assert.EqualError(t, err, errUnrecognizedEncoding.Error())
- assert.Nil(t, mexp)
- }
- func TestNewLogsExporter_err_version(t *testing.T) {
- c := Config{ProtocolVersion: "0.0.0", Encoding: defaultEncoding}
- mexp, err := newLogsExporter(c, exportertest.NewNopCreateSettings(), logsMarshalers())
- assert.Error(t, err)
- assert.Nil(t, mexp)
- }
- func TestNewLogsExporter_err_encoding(t *testing.T) {
- c := Config{Encoding: "bar"}
- mexp, err := newLogsExporter(c, exportertest.NewNopCreateSettings(), logsMarshalers())
- assert.EqualError(t, err, errUnrecognizedEncoding.Error())
- assert.Nil(t, mexp)
- }
- func TestNewLogsExporter_err_traces_encoding(t *testing.T) {
- c := Config{Encoding: "jaeger_proto"}
- mexp, err := newLogsExporter(c, exportertest.NewNopCreateSettings(), logsMarshalers())
- assert.EqualError(t, err, errUnrecognizedEncoding.Error())
- assert.Nil(t, mexp)
- }
- func TestNewExporter_err_auth_type(t *testing.T) {
- c := Config{
- ProtocolVersion: "2.0.0",
- Authentication: kafka.Authentication{
- TLS: &configtls.TLSClientSetting{
- TLSSetting: configtls.TLSSetting{
- CAFile: "/doesnotexist",
- },
- },
- },
- Encoding: defaultEncoding,
- Metadata: Metadata{
- Full: false,
- },
- Producer: Producer{
- Compression: "none",
- },
- }
- texp, err := newTracesExporter(c, exportertest.NewNopCreateSettings(), tracesMarshalers())
- assert.Error(t, err)
- assert.Contains(t, err.Error(), "failed to load TLS config")
- assert.Nil(t, texp)
- mexp, err := newMetricsExporter(c, exportertest.NewNopCreateSettings(), metricsMarshalers())
- assert.Error(t, err)
- assert.Contains(t, err.Error(), "failed to load TLS config")
- assert.Nil(t, mexp)
- lexp, err := newLogsExporter(c, exportertest.NewNopCreateSettings(), logsMarshalers())
- assert.Error(t, err)
- assert.Contains(t, err.Error(), "failed to load TLS config")
- assert.Nil(t, lexp)
- }
- func TestNewExporter_err_compression(t *testing.T) {
- c := Config{
- Encoding: defaultEncoding,
- Producer: Producer{
- Compression: "idk",
- },
- }
- texp, err := newTracesExporter(c, exportertest.NewNopCreateSettings(), tracesMarshalers())
- assert.Error(t, err)
- assert.Contains(t, err.Error(), "producer.compression should be one of 'none', 'gzip', 'snappy', 'lz4', or 'zstd'. configured value idk")
- assert.Nil(t, texp)
- }
- func TestTracesPusher(t *testing.T) {
- c := sarama.NewConfig()
- producer := mocks.NewSyncProducer(t, c)
- producer.ExpectSendMessageAndSucceed()
- p := kafkaTracesProducer{
- producer: producer,
- marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding),
- }
- t.Cleanup(func() {
- require.NoError(t, p.Close(context.Background()))
- })
- err := p.tracesPusher(context.Background(), testdata.GenerateTracesTwoSpansSameResource())
- require.NoError(t, err)
- }
- func TestTracesPusher_err(t *testing.T) {
- c := sarama.NewConfig()
- producer := mocks.NewSyncProducer(t, c)
- expErr := fmt.Errorf("failed to send")
- producer.ExpectSendMessageAndFail(expErr)
- p := kafkaTracesProducer{
- producer: producer,
- marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding),
- logger: zap.NewNop(),
- }
- t.Cleanup(func() {
- require.NoError(t, p.Close(context.Background()))
- })
- td := testdata.GenerateTracesTwoSpansSameResource()
- err := p.tracesPusher(context.Background(), td)
- assert.EqualError(t, err, expErr.Error())
- }
- func TestTracesPusher_marshal_error(t *testing.T) {
- expErr := fmt.Errorf("failed to marshal")
- p := kafkaTracesProducer{
- marshaler: &tracesErrorMarshaler{err: expErr},
- logger: zap.NewNop(),
- }
- td := testdata.GenerateTracesTwoSpansSameResource()
- err := p.tracesPusher(context.Background(), td)
- require.Error(t, err)
- assert.Contains(t, err.Error(), expErr.Error())
- }
- func TestMetricsDataPusher(t *testing.T) {
- c := sarama.NewConfig()
- producer := mocks.NewSyncProducer(t, c)
- producer.ExpectSendMessageAndSucceed()
- p := kafkaMetricsProducer{
- producer: producer,
- marshaler: newPdataMetricsMarshaler(&pmetric.ProtoMarshaler{}, defaultEncoding),
- }
- t.Cleanup(func() {
- require.NoError(t, p.Close(context.Background()))
- })
- err := p.metricsDataPusher(context.Background(), testdata.GenerateMetricsTwoMetrics())
- require.NoError(t, err)
- }
- func TestMetricsDataPusher_err(t *testing.T) {
- c := sarama.NewConfig()
- producer := mocks.NewSyncProducer(t, c)
- expErr := fmt.Errorf("failed to send")
- producer.ExpectSendMessageAndFail(expErr)
- p := kafkaMetricsProducer{
- producer: producer,
- marshaler: newPdataMetricsMarshaler(&pmetric.ProtoMarshaler{}, defaultEncoding),
- logger: zap.NewNop(),
- }
- t.Cleanup(func() {
- require.NoError(t, p.Close(context.Background()))
- })
- md := testdata.GenerateMetricsTwoMetrics()
- err := p.metricsDataPusher(context.Background(), md)
- assert.EqualError(t, err, expErr.Error())
- }
- func TestMetricsDataPusher_marshal_error(t *testing.T) {
- expErr := fmt.Errorf("failed to marshal")
- p := kafkaMetricsProducer{
- marshaler: &metricsErrorMarshaler{err: expErr},
- logger: zap.NewNop(),
- }
- md := testdata.GenerateMetricsTwoMetrics()
- err := p.metricsDataPusher(context.Background(), md)
- require.Error(t, err)
- assert.Contains(t, err.Error(), expErr.Error())
- }
- func TestLogsDataPusher(t *testing.T) {
- c := sarama.NewConfig()
- producer := mocks.NewSyncProducer(t, c)
- producer.ExpectSendMessageAndSucceed()
- p := kafkaLogsProducer{
- producer: producer,
- marshaler: newPdataLogsMarshaler(&plog.ProtoMarshaler{}, defaultEncoding),
- }
- t.Cleanup(func() {
- require.NoError(t, p.Close(context.Background()))
- })
- err := p.logsDataPusher(context.Background(), testdata.GenerateLogsOneLogRecord())
- require.NoError(t, err)
- }
- func TestLogsDataPusher_err(t *testing.T) {
- c := sarama.NewConfig()
- producer := mocks.NewSyncProducer(t, c)
- expErr := fmt.Errorf("failed to send")
- producer.ExpectSendMessageAndFail(expErr)
- p := kafkaLogsProducer{
- producer: producer,
- marshaler: newPdataLogsMarshaler(&plog.ProtoMarshaler{}, defaultEncoding),
- logger: zap.NewNop(),
- }
- t.Cleanup(func() {
- require.NoError(t, p.Close(context.Background()))
- })
- ld := testdata.GenerateLogsOneLogRecord()
- err := p.logsDataPusher(context.Background(), ld)
- assert.EqualError(t, err, expErr.Error())
- }
- func TestLogsDataPusher_marshal_error(t *testing.T) {
- expErr := fmt.Errorf("failed to marshal")
- p := kafkaLogsProducer{
- marshaler: &logsErrorMarshaler{err: expErr},
- logger: zap.NewNop(),
- }
- ld := testdata.GenerateLogsOneLogRecord()
- err := p.logsDataPusher(context.Background(), ld)
- require.Error(t, err)
- assert.Contains(t, err.Error(), expErr.Error())
- }
- type tracesErrorMarshaler struct {
- err error
- }
- type metricsErrorMarshaler struct {
- err error
- }
- type logsErrorMarshaler struct {
- err error
- }
- func (e metricsErrorMarshaler) Marshal(_ pmetric.Metrics, _ string) ([]*sarama.ProducerMessage, error) {
- return nil, e.err
- }
- func (e metricsErrorMarshaler) Encoding() string {
- panic("implement me")
- }
- var _ TracesMarshaler = (*tracesErrorMarshaler)(nil)
- func (e tracesErrorMarshaler) Marshal(_ ptrace.Traces, _ string) ([]*sarama.ProducerMessage, error) {
- return nil, e.err
- }
- func (e tracesErrorMarshaler) Encoding() string {
- panic("implement me")
- }
- func (e logsErrorMarshaler) Marshal(_ plog.Logs, _ string) ([]*sarama.ProducerMessage, error) {
- return nil, e.err
- }
- func (e logsErrorMarshaler) Encoding() string {
- panic("implement me")
- }
|