123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package kafkametricsreceiver
- import (
- "context"
- "fmt"
- "regexp"
- "testing"
- "github.com/IBM/sarama"
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/require"
- "go.opentelemetry.io/collector/component/componenttest"
- "go.opentelemetry.io/collector/receiver/receivertest"
- )
- func TestTopicShutdown(t *testing.T) {
- client := newMockClient()
- client.closed = false
- client.close = nil
- client.Mock.
- On("Close").Return(nil).
- On("Closed").Return(false)
- scraper := brokerScraper{
- client: client,
- settings: receivertest.NewNopCreateSettings(),
- config: Config{},
- }
- _ = scraper.shutdown(context.Background())
- client.AssertExpectations(t)
- }
- func TestTopicShutdown_closed(t *testing.T) {
- client := newMockClient()
- client.closed = true
- client.Mock.
- On("Closed").Return(true)
- scraper := topicScraper{
- client: client,
- settings: receivertest.NewNopCreateSettings(),
- config: Config{},
- }
- _ = scraper.shutdown(context.Background())
- client.AssertExpectations(t)
- }
- func TestTopicScraper_Name(t *testing.T) {
- s := topicScraper{}
- assert.Equal(t, s.Name(), topicsScraperName)
- }
- func TestTopicScraper_createsScraper(t *testing.T) {
- sc := sarama.NewConfig()
- newSaramaClient = mockNewSaramaClient
- ms, err := createTopicsScraper(context.Background(), Config{}, sc, receivertest.NewNopCreateSettings())
- assert.NoError(t, err)
- assert.NotNil(t, ms)
- }
- func TestTopicScraper_ScrapeHandlesError(t *testing.T) {
- newSaramaClient = func(addrs []string, conf *sarama.Config) (sarama.Client, error) {
- return nil, fmt.Errorf("no scraper here")
- }
- sc := sarama.NewConfig()
- ms, err := createTopicsScraper(context.Background(), Config{}, sc, receivertest.NewNopCreateSettings())
- assert.NotNil(t, ms)
- assert.Nil(t, err)
- _, err = ms.Scrape(context.Background())
- assert.Error(t, err)
- }
- func TestTopicScraper_ShutdownHandlesNilClient(t *testing.T) {
- newSaramaClient = func(addrs []string, conf *sarama.Config) (sarama.Client, error) {
- return nil, fmt.Errorf("no scraper here")
- }
- sc := sarama.NewConfig()
- ms, err := createTopicsScraper(context.Background(), Config{}, sc, receivertest.NewNopCreateSettings())
- assert.NotNil(t, ms)
- assert.Nil(t, err)
- err = ms.Shutdown(context.Background())
- assert.NoError(t, err)
- }
- func TestTopicScraper_startScraperCreatesClient(t *testing.T) {
- newSaramaClient = mockNewSaramaClient
- sc := sarama.NewConfig()
- ms, err := createTopicsScraper(context.Background(), Config{}, sc, receivertest.NewNopCreateSettings())
- assert.NotNil(t, ms)
- assert.NoError(t, err)
- err = ms.Start(context.Background(), nil)
- assert.NoError(t, err)
- }
- func TestTopicScraper_createScraperHandles_invalid_topicMatch(t *testing.T) {
- newSaramaClient = mockNewSaramaClient
- sc := sarama.NewConfig()
- ms, err := createTopicsScraper(context.Background(), Config{
- TopicMatch: "[",
- }, sc, receivertest.NewNopCreateSettings())
- assert.Error(t, err)
- assert.Nil(t, ms)
- }
- func TestTopicScraper_scrapes(t *testing.T) {
- client := newMockClient()
- var testOffset int64 = 5
- client.offset = testOffset
- config := createDefaultConfig().(*Config)
- match := regexp.MustCompile(config.TopicMatch)
- scraper := topicScraper{
- client: client,
- settings: receivertest.NewNopCreateSettings(),
- config: *config,
- topicFilter: match,
- }
- require.NoError(t, scraper.start(context.Background(), componenttest.NewNopHost()))
- md, err := scraper.scrape(context.Background())
- assert.NoError(t, err)
- require.Equal(t, 1, md.ResourceMetrics().Len())
- require.Equal(t, 1, md.ResourceMetrics().At(0).ScopeMetrics().Len())
- ms := md.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics()
- for i := 0; i < ms.Len(); i++ {
- m := ms.At(i)
- switch m.Name() {
- case "kafka.topic.partitions":
- assert.Equal(t, m.Sum().DataPoints().At(0).IntValue(), int64(len(testPartitions)))
- case "kafka.partition.current_offset":
- assert.Equal(t, m.Gauge().DataPoints().At(0).IntValue(), testOffset)
- case "kafka.partition.oldest_offset":
- assert.Equal(t, m.Gauge().DataPoints().At(0).IntValue(), testOffset)
- case "kafka.partition.replicas":
- assert.Equal(t, m.Sum().DataPoints().At(0).IntValue(), int64(len(testReplicas)))
- case "kafka.partition.replicas_in_sync":
- assert.Equal(t, m.Sum().DataPoints().At(0).IntValue(), int64(len(testReplicas)))
- }
- }
- }
- func TestTopicScraper_scrape_handlesTopicError(t *testing.T) {
- client := newMockClient()
- client.topics = nil
- config := createDefaultConfig().(*Config)
- match := regexp.MustCompile(config.TopicMatch)
- scraper := topicScraper{
- client: client,
- settings: receivertest.NewNopCreateSettings(),
- topicFilter: match,
- }
- _, err := scraper.scrape(context.Background())
- assert.Error(t, err)
- }
- func TestTopicScraper_scrape_handlesPartitionError(t *testing.T) {
- client := newMockClient()
- client.partitions = nil
- config := createDefaultConfig().(*Config)
- match := regexp.MustCompile(config.TopicMatch)
- scraper := topicScraper{
- client: client,
- settings: receivertest.NewNopCreateSettings(),
- topicFilter: match,
- }
- require.NoError(t, scraper.start(context.Background(), componenttest.NewNopHost()))
- _, err := scraper.scrape(context.Background())
- assert.Error(t, err)
- }
- func TestTopicScraper_scrape_handlesPartialScrapeErrors(t *testing.T) {
- client := newMockClient()
- client.replicas = nil
- client.inSyncReplicas = nil
- client.replicas = nil
- client.offset = -1
- config := createDefaultConfig().(*Config)
- match := regexp.MustCompile(config.TopicMatch)
- scraper := topicScraper{
- client: client,
- settings: receivertest.NewNopCreateSettings(),
- topicFilter: match,
- }
- require.NoError(t, scraper.start(context.Background(), componenttest.NewNopHost()))
- _, err := scraper.scrape(context.Background())
- assert.Error(t, err)
- }
|