topic_scraper_test.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package kafkametricsreceiver
  4. import (
  5. "context"
  6. "fmt"
  7. "regexp"
  8. "testing"
  9. "github.com/IBM/sarama"
  10. "github.com/stretchr/testify/assert"
  11. "github.com/stretchr/testify/require"
  12. "go.opentelemetry.io/collector/component/componenttest"
  13. "go.opentelemetry.io/collector/receiver/receivertest"
  14. )
  15. func TestTopicShutdown(t *testing.T) {
  16. client := newMockClient()
  17. client.closed = false
  18. client.close = nil
  19. client.Mock.
  20. On("Close").Return(nil).
  21. On("Closed").Return(false)
  22. scraper := brokerScraper{
  23. client: client,
  24. settings: receivertest.NewNopCreateSettings(),
  25. config: Config{},
  26. }
  27. _ = scraper.shutdown(context.Background())
  28. client.AssertExpectations(t)
  29. }
  30. func TestTopicShutdown_closed(t *testing.T) {
  31. client := newMockClient()
  32. client.closed = true
  33. client.Mock.
  34. On("Closed").Return(true)
  35. scraper := topicScraper{
  36. client: client,
  37. settings: receivertest.NewNopCreateSettings(),
  38. config: Config{},
  39. }
  40. _ = scraper.shutdown(context.Background())
  41. client.AssertExpectations(t)
  42. }
  43. func TestTopicScraper_Name(t *testing.T) {
  44. s := topicScraper{}
  45. assert.Equal(t, s.Name(), topicsScraperName)
  46. }
  47. func TestTopicScraper_createsScraper(t *testing.T) {
  48. sc := sarama.NewConfig()
  49. newSaramaClient = mockNewSaramaClient
  50. ms, err := createTopicsScraper(context.Background(), Config{}, sc, receivertest.NewNopCreateSettings())
  51. assert.NoError(t, err)
  52. assert.NotNil(t, ms)
  53. }
  54. func TestTopicScraper_ScrapeHandlesError(t *testing.T) {
  55. newSaramaClient = func(addrs []string, conf *sarama.Config) (sarama.Client, error) {
  56. return nil, fmt.Errorf("no scraper here")
  57. }
  58. sc := sarama.NewConfig()
  59. ms, err := createTopicsScraper(context.Background(), Config{}, sc, receivertest.NewNopCreateSettings())
  60. assert.NotNil(t, ms)
  61. assert.Nil(t, err)
  62. _, err = ms.Scrape(context.Background())
  63. assert.Error(t, err)
  64. }
  65. func TestTopicScraper_ShutdownHandlesNilClient(t *testing.T) {
  66. newSaramaClient = func(addrs []string, conf *sarama.Config) (sarama.Client, error) {
  67. return nil, fmt.Errorf("no scraper here")
  68. }
  69. sc := sarama.NewConfig()
  70. ms, err := createTopicsScraper(context.Background(), Config{}, sc, receivertest.NewNopCreateSettings())
  71. assert.NotNil(t, ms)
  72. assert.Nil(t, err)
  73. err = ms.Shutdown(context.Background())
  74. assert.NoError(t, err)
  75. }
  76. func TestTopicScraper_startScraperCreatesClient(t *testing.T) {
  77. newSaramaClient = mockNewSaramaClient
  78. sc := sarama.NewConfig()
  79. ms, err := createTopicsScraper(context.Background(), Config{}, sc, receivertest.NewNopCreateSettings())
  80. assert.NotNil(t, ms)
  81. assert.NoError(t, err)
  82. err = ms.Start(context.Background(), nil)
  83. assert.NoError(t, err)
  84. }
  85. func TestTopicScraper_createScraperHandles_invalid_topicMatch(t *testing.T) {
  86. newSaramaClient = mockNewSaramaClient
  87. sc := sarama.NewConfig()
  88. ms, err := createTopicsScraper(context.Background(), Config{
  89. TopicMatch: "[",
  90. }, sc, receivertest.NewNopCreateSettings())
  91. assert.Error(t, err)
  92. assert.Nil(t, ms)
  93. }
  94. func TestTopicScraper_scrapes(t *testing.T) {
  95. client := newMockClient()
  96. var testOffset int64 = 5
  97. client.offset = testOffset
  98. config := createDefaultConfig().(*Config)
  99. match := regexp.MustCompile(config.TopicMatch)
  100. scraper := topicScraper{
  101. client: client,
  102. settings: receivertest.NewNopCreateSettings(),
  103. config: *config,
  104. topicFilter: match,
  105. }
  106. require.NoError(t, scraper.start(context.Background(), componenttest.NewNopHost()))
  107. md, err := scraper.scrape(context.Background())
  108. assert.NoError(t, err)
  109. require.Equal(t, 1, md.ResourceMetrics().Len())
  110. require.Equal(t, 1, md.ResourceMetrics().At(0).ScopeMetrics().Len())
  111. ms := md.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics()
  112. for i := 0; i < ms.Len(); i++ {
  113. m := ms.At(i)
  114. switch m.Name() {
  115. case "kafka.topic.partitions":
  116. assert.Equal(t, m.Sum().DataPoints().At(0).IntValue(), int64(len(testPartitions)))
  117. case "kafka.partition.current_offset":
  118. assert.Equal(t, m.Gauge().DataPoints().At(0).IntValue(), testOffset)
  119. case "kafka.partition.oldest_offset":
  120. assert.Equal(t, m.Gauge().DataPoints().At(0).IntValue(), testOffset)
  121. case "kafka.partition.replicas":
  122. assert.Equal(t, m.Sum().DataPoints().At(0).IntValue(), int64(len(testReplicas)))
  123. case "kafka.partition.replicas_in_sync":
  124. assert.Equal(t, m.Sum().DataPoints().At(0).IntValue(), int64(len(testReplicas)))
  125. }
  126. }
  127. }
  128. func TestTopicScraper_scrape_handlesTopicError(t *testing.T) {
  129. client := newMockClient()
  130. client.topics = nil
  131. config := createDefaultConfig().(*Config)
  132. match := regexp.MustCompile(config.TopicMatch)
  133. scraper := topicScraper{
  134. client: client,
  135. settings: receivertest.NewNopCreateSettings(),
  136. topicFilter: match,
  137. }
  138. _, err := scraper.scrape(context.Background())
  139. assert.Error(t, err)
  140. }
  141. func TestTopicScraper_scrape_handlesPartitionError(t *testing.T) {
  142. client := newMockClient()
  143. client.partitions = nil
  144. config := createDefaultConfig().(*Config)
  145. match := regexp.MustCompile(config.TopicMatch)
  146. scraper := topicScraper{
  147. client: client,
  148. settings: receivertest.NewNopCreateSettings(),
  149. topicFilter: match,
  150. }
  151. require.NoError(t, scraper.start(context.Background(), componenttest.NewNopHost()))
  152. _, err := scraper.scrape(context.Background())
  153. assert.Error(t, err)
  154. }
  155. func TestTopicScraper_scrape_handlesPartialScrapeErrors(t *testing.T) {
  156. client := newMockClient()
  157. client.replicas = nil
  158. client.inSyncReplicas = nil
  159. client.replicas = nil
  160. client.offset = -1
  161. config := createDefaultConfig().(*Config)
  162. match := regexp.MustCompile(config.TopicMatch)
  163. scraper := topicScraper{
  164. client: client,
  165. settings: receivertest.NewNopCreateSettings(),
  166. topicFilter: match,
  167. }
  168. require.NoError(t, scraper.start(context.Background(), componenttest.NewNopHost()))
  169. _, err := scraper.scrape(context.Background())
  170. assert.Error(t, err)
  171. }