123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package kafkaexporter
- import (
- "context"
- "errors"
- "net"
- "testing"
- "github.com/IBM/sarama"
- "github.com/stretchr/testify/assert"
- "go.opentelemetry.io/collector/component/componenttest"
- "go.opentelemetry.io/collector/exporter/exportertest"
- "go.opentelemetry.io/collector/pdata/plog"
- "go.opentelemetry.io/collector/pdata/pmetric"
- "go.opentelemetry.io/collector/pdata/ptrace"
- )
- // data is a simple means of allowing
- // interchangeability between the
- // different marshaller types
- type data interface {
- ptrace.Traces | plog.Logs | pmetric.Metrics
- }
- type mockMarshaler[Data data] struct {
- consume func(d Data, topic string) ([]*sarama.ProducerMessage, error)
- encoding string
- }
- func (mm *mockMarshaler[Data]) Encoding() string { return mm.encoding }
- func (mm *mockMarshaler[Data]) Marshal(d Data, topic string) ([]*sarama.ProducerMessage, error) {
- if mm.consume != nil {
- return mm.consume(d, topic)
- }
- return nil, errors.New("not implemented")
- }
- func newMockMarshaler[Data data](encoding string) *mockMarshaler[Data] {
- return &mockMarshaler[Data]{encoding: encoding}
- }
- // applyConfigOption is used to modify values of the
- // the default exporter config to make it easier to
- // use the return in a test table set up
- func applyConfigOption(option func(conf *Config)) *Config {
- conf := createDefaultConfig().(*Config)
- option(conf)
- return conf
- }
- func TestCreateDefaultConfig(t *testing.T) {
- cfg := createDefaultConfig().(*Config)
- assert.NotNil(t, cfg, "failed to create default config")
- assert.NoError(t, componenttest.CheckConfigStruct(cfg))
- assert.Equal(t, []string{defaultBroker}, cfg.Brokers)
- assert.Equal(t, "", cfg.Topic)
- }
- func TestCreateMetricExporter(t *testing.T) {
- t.Parallel()
- tests := []struct {
- name string
- conf *Config
- marshalers []MetricsMarshaler
- err error
- }{
- {
- name: "valid config (no validating broker)",
- conf: applyConfigOption(func(conf *Config) {
- // this disables contacting the broker so
- // we can successfully create the exporter
- conf.Metadata.Full = false
- conf.Brokers = []string{"invalid:9092"}
- conf.ProtocolVersion = "2.0.0"
- }),
- err: nil,
- },
- {
- name: "invalid config (validating broker)",
- conf: applyConfigOption(func(conf *Config) {
- conf.Brokers = []string{"invalid:9092"}
- conf.ProtocolVersion = "2.0.0"
- }),
- err: &net.DNSError{},
- },
- {
- name: "default_encoding",
- conf: applyConfigOption(func(conf *Config) {
- // Disabling broker check to ensure encoding work
- conf.Metadata.Full = false
- conf.Encoding = defaultEncoding
- }),
- marshalers: nil,
- err: nil,
- },
- {
- name: "custom_encoding",
- conf: applyConfigOption(func(conf *Config) {
- // Disabling broker check to ensure encoding work
- conf.Metadata.Full = false
- conf.Encoding = "custom"
- }),
- marshalers: []MetricsMarshaler{
- newMockMarshaler[pmetric.Metrics]("custom"),
- },
- err: nil,
- },
- }
- for _, tc := range tests {
- tc := tc
- t.Run(tc.name, func(t *testing.T) {
- t.Parallel()
- f := NewFactory(withMetricsMarshalers(tc.marshalers...))
- exporter, err := f.CreateMetricsExporter(
- context.Background(),
- exportertest.NewNopCreateSettings(),
- tc.conf,
- )
- if tc.err != nil {
- assert.ErrorAs(t, err, &tc.err, "Must match the expected error")
- assert.Nil(t, exporter, "Must return nil value for invalid exporter")
- return
- }
- assert.NoError(t, err, "Must not error")
- assert.NotNil(t, exporter, "Must return valid exporter when no error is returned")
- })
- }
- }
- func TestCreateLogExporter(t *testing.T) {
- t.Parallel()
- tests := []struct {
- name string
- conf *Config
- marshalers []LogsMarshaler
- err error
- }{
- {
- name: "valid config (no validating broker)",
- conf: applyConfigOption(func(conf *Config) {
- // this disables contacting the broker so
- // we can successfully create the exporter
- conf.Metadata.Full = false
- conf.Brokers = []string{"invalid:9092"}
- conf.ProtocolVersion = "2.0.0"
- }),
- err: nil,
- },
- {
- name: "invalid config (validating broker)",
- conf: applyConfigOption(func(conf *Config) {
- conf.Brokers = []string{"invalid:9092"}
- conf.ProtocolVersion = "2.0.0"
- }),
- err: &net.DNSError{},
- },
- {
- name: "default_encoding",
- conf: applyConfigOption(func(conf *Config) {
- // Disabling broker check to ensure encoding work
- conf.Metadata.Full = false
- conf.Encoding = defaultEncoding
- }),
- marshalers: nil,
- err: nil,
- },
- {
- name: "custom_encoding",
- conf: applyConfigOption(func(conf *Config) {
- // Disabling broker check to ensure encoding work
- conf.Metadata.Full = false
- conf.Encoding = "custom"
- }),
- marshalers: []LogsMarshaler{
- newMockMarshaler[plog.Logs]("custom"),
- },
- err: nil,
- },
- }
- for _, tc := range tests {
- tc := tc
- t.Run(tc.name, func(t *testing.T) {
- t.Parallel()
- f := NewFactory(withLogsMarshalers(tc.marshalers...))
- exporter, err := f.CreateLogsExporter(
- context.Background(),
- exportertest.NewNopCreateSettings(),
- tc.conf,
- )
- if tc.err != nil {
- assert.ErrorAs(t, err, &tc.err, "Must match the expected error")
- assert.Nil(t, exporter, "Must return nil value for invalid exporter")
- return
- }
- assert.NoError(t, err, "Must not error")
- assert.NotNil(t, exporter, "Must return valid exporter when no error is returned")
- })
- }
- }
- func TestCreateTraceExporter(t *testing.T) {
- t.Parallel()
- tests := []struct {
- name string
- conf *Config
- marshalers []TracesMarshaler
- err error
- }{
- {
- name: "valid config (no validating brokers)",
- conf: applyConfigOption(func(conf *Config) {
- conf.Metadata.Full = false
- conf.Brokers = []string{"invalid:9092"}
- conf.ProtocolVersion = "2.0.0"
- }),
- marshalers: nil,
- err: nil,
- },
- {
- name: "invalid config (validating brokers)",
- conf: applyConfigOption(func(conf *Config) {
- conf.Brokers = []string{"invalid:9092"}
- conf.ProtocolVersion = "2.0.0"
- }),
- marshalers: nil,
- err: &net.DNSError{},
- },
- {
- name: "default_encoding",
- conf: applyConfigOption(func(conf *Config) {
- // Disabling broker check to ensure encoding work
- conf.Metadata.Full = false
- conf.Encoding = defaultEncoding
- }),
- marshalers: nil,
- err: nil,
- },
- {
- name: "custom_encoding",
- conf: applyConfigOption(func(conf *Config) {
- // Disabling broker check to ensure encoding work
- conf.Metadata.Full = false
- conf.Encoding = "custom"
- }),
- marshalers: []TracesMarshaler{
- newMockMarshaler[ptrace.Traces]("custom"),
- },
- err: nil,
- },
- }
- for _, tc := range tests {
- tc := tc
- t.Run(tc.name, func(t *testing.T) {
- t.Parallel()
- f := NewFactory(withTracesMarshalers(tc.marshalers...))
- exporter, err := f.CreateTracesExporter(
- context.Background(),
- exportertest.NewNopCreateSettings(),
- tc.conf,
- )
- if tc.err != nil {
- assert.ErrorAs(t, err, &tc.err, "Must match the expected error")
- assert.Nil(t, exporter, "Must return nil value for invalid exporter")
- return
- }
- assert.NoError(t, err, "Must not error")
- assert.NotNil(t, exporter, "Must return valid exporter when no error is returned")
- })
- }
- }
|