consumer_scraper.go

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package kafkametricsreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver"

import (
	"context"
	"fmt"
	"regexp"
	"time"

	"github.com/IBM/sarama"
	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/pmetric"
	"go.opentelemetry.io/collector/receiver"
	"go.opentelemetry.io/collector/receiver/scraperhelper"
	"go.uber.org/multierr"

	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata"
)
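
// consumerScraper collects Kafka consumer-group metrics (member count,
// per-partition committed offsets, and lag) using sarama's client and
// cluster-admin APIs.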
type consumerScraper struct {
	client       sarama.Client
	settings     receiver.CreateSettings
	groupFilter  *regexp.Regexp
	topicFilter  *regexp.Regexp
	clusterAdmin sarama.ClusterAdmin
	saramaConfig *sarama.Config
	config       Config
	mb           *metadata.MetricsBuilder
}
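
// Name returns the name under which this scraper is registered.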
func (s *consumerScraper) Name() string {
	return consumersScraperName
}
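
// start initializes the metrics builder. The sarama client itself is created
// lazily on the first call to scrape.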
func (s *consumerScraper) start(_ context.Context, _ component.Host) error {
	s.mb = metadata.NewMetricsBuilder(s.config.MetricsBuilderConfig, s.settings)
	return nil
}
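
// shutdown closes the sarama client if one was ever opened.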
func (s *consumerScraper) shutdown(_ context.Context) error {
	if s.client != nil && !s.client.Closed() {
		return s.client.Close()
	}
	return nil
}
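
// scrape lists consumer groups and topics, filters both against the
// configured regexes, reads the newest offset of every matched partition,
// and then records member, offset, and lag metrics for each matched group.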
func (s *consumerScraper) scrape(context.Context) (pmetric.Metrics, error) {
	if s.client == nil {
		client, err := newSaramaClient(s.config.Brokers, s.saramaConfig)
		if err != nil {
			return pmetric.Metrics{}, fmt.Errorf("failed to create client in consumer scraper: %w", err)
		}
		clusterAdmin, err := newClusterAdmin(s.config.Brokers, s.saramaConfig)
		if err != nil {
			if client != nil {
				_ = client.Close()
			}
			return pmetric.Metrics{}, fmt.Errorf("failed to create cluster admin in consumer scraper: %w", err)
		}
		s.client = client
		s.clusterAdmin = clusterAdmin
	}
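	// Keep only the consumer groups and topics that match the configured filters.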
	cgs, listErr := s.clusterAdmin.ListConsumerGroups()
	if listErr != nil {
		return pmetric.Metrics{}, listErr
	}
	var matchedGrpIds []string
	for grpID := range cgs {
		if s.groupFilter.MatchString(grpID) {
			matchedGrpIds = append(matchedGrpIds, grpID)
		}
	}
	allTopics, listErr := s.clusterAdmin.ListTopics()
	if listErr != nil {
		return pmetric.Metrics{}, listErr
	}
	matchedTopics := map[string]sarama.TopicDetail{}
	for t, d := range allTopics {
		if s.topicFilter.MatchString(t) {
			matchedTopics[t] = d
		}
	}
	var scrapeError error
	// partition IDs for each matched topic
	topicPartitions := map[string][]int32{}
	// newest (log-end) offset for each partition of each matched topic
	topicPartitionOffset := map[string]map[int32]int64{}
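	// Fetch the newest offset of every matched partition; consumer lag below
	// is computed against these high-water marks.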
	for topic := range matchedTopics {
		topicPartitionOffset[topic] = map[int32]int64{}
		partitions, err := s.client.Partitions(topic)
		if err != nil {
			scrapeError = multierr.Append(scrapeError, err)
			continue
		}
		for _, p := range partitions {
			var offset int64
			offset, err = s.client.GetOffset(topic, p, sarama.OffsetNewest)
			if err != nil {
				scrapeError = multierr.Append(scrapeError, err)
				continue
			}
			topicPartitions[topic] = append(topicPartitions[topic], p)
			topicPartitionOffset[topic][p] = offset
		}
	}
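	// Describe each matched group and record member, offset, and lag metrics.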
	consumerGroups, listErr := s.clusterAdmin.DescribeConsumerGroups(matchedGrpIds)
	if listErr != nil {
		return pmetric.Metrics{}, listErr
	}
	now := pcommon.NewTimestampFromTime(time.Now())
	for _, group := range consumerGroups {
		s.mb.RecordKafkaConsumerGroupMembersDataPoint(now, int64(len(group.Members)), group.GroupId)
		groupOffsetFetchResponse, err := s.clusterAdmin.ListConsumerGroupOffsets(group.GroupId, topicPartitions)
		if err != nil {
			scrapeError = multierr.Append(scrapeError, err)
			continue
		}
		for topic, partitions := range groupOffsetFetchResponse.Blocks {
			// A topic counts as consumed by this group if any of its
			// partition blocks reports a committed offset.
			isConsumed := false
			for _, block := range partitions {
				if block.Offset != -1 {
					isConsumed = true
					break
				}
			}
			if isConsumed {
				var lagSum int64
				var offsetSum int64
				for partition, block := range partitions {
					consumerOffset := block.Offset
					offsetSum += consumerOffset
					s.mb.RecordKafkaConsumerGroupOffsetDataPoint(now, consumerOffset, group.GroupId, topic, int64(partition))
					// default -1 to indicate no lag measured.
					var consumerLag int64 = -1
					if partitionOffset, ok := topicPartitionOffset[topic][partition]; ok {
						// only consider partitions with a committed offset
						if block.Offset != -1 {
							consumerLag = partitionOffset - consumerOffset
							lagSum += consumerLag
						}
					}
					s.mb.RecordKafkaConsumerGroupLagDataPoint(now, consumerLag, group.GroupId, topic, int64(partition))
				}
				s.mb.RecordKafkaConsumerGroupOffsetSumDataPoint(now, offsetSum, group.GroupId, topic)
				s.mb.RecordKafkaConsumerGroupLagSumDataPoint(now, lagSum, group.GroupId, topic)
			}
		}
	}
	return s.mb.Emit(), scrapeError
}
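
// createConsumerScraper compiles the group and topic match regexes and wraps
// the consumer scraper in a scraperhelper.Scraper with start and shutdown hooks.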
func createConsumerScraper(_ context.Context, cfg Config, saramaConfig *sarama.Config,
	settings receiver.CreateSettings) (scraperhelper.Scraper, error) {
	groupFilter, err := regexp.Compile(cfg.GroupMatch)
	if err != nil {
		return nil, fmt.Errorf("failed to compile group_match: %w", err)
	}
	topicFilter, err := regexp.Compile(cfg.TopicMatch)
	if err != nil {
		return nil, fmt.Errorf("failed to compile topic_match: %w", err)
	}
	s := consumerScraper{
		settings:     settings,
		groupFilter:  groupFilter,
		topicFilter:  topicFilter,
		config:       cfg,
		saramaConfig: saramaConfig,
	}
	return scraperhelper.NewScraper(
		s.Name(),
		s.scrape,
		scraperhelper.WithStart(s.start),
		scraperhelper.WithShutdown(s.shutdown),
	)
}
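
// Example wiring (a sketch, not verbatim from this package): the receiver
// factory is expected to create this scraper and attach it to a scraper
// controller, roughly as follows. The surrounding variable names (ctx, cfg,
// opts) are illustrative.
//
//	scraper, err := createConsumerScraper(ctx, cfg, saramaConfig, settings)
//	if err != nil {
//		return nil, err
//	}
//	opts = append(opts, scraperhelper.AddScraper(scraper))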