// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package kafkametricsreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver"

import (
	"context"
	"fmt"
	"regexp"
	"time"

	"github.com/IBM/sarama"
	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/pmetric"
	"go.opentelemetry.io/collector/receiver"
	"go.opentelemetry.io/collector/receiver/scraperhelper"
	"go.uber.org/multierr"

	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata"
)
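
// consumerScraper collects consumer-group metrics (member counts, committed
// offsets, and lag) from a Kafka cluster via a sarama client and cluster admin.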
type consumerScraper struct {
	client       sarama.Client
	settings     receiver.CreateSettings
	groupFilter  *regexp.Regexp
	topicFilter  *regexp.Regexp
	clusterAdmin sarama.ClusterAdmin
	saramaConfig *sarama.Config
	config       Config
	mb           *metadata.MetricsBuilder
}
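
// Name returns the name of the consumer scraper.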
func (s *consumerScraper) Name() string {
	return consumersScraperName
}
func (s *consumerScraper) start(_ context.Context, _ component.Host) error {
	s.mb = metadata.NewMetricsBuilder(s.config.MetricsBuilderConfig, s.settings)
	return nil
}
func (s *consumerScraper) shutdown(_ context.Context) error {
	if s.client != nil && !s.client.Closed() {
		return s.client.Close()
	}
	return nil
}
func (s *consumerScraper) scrape(context.Context) (pmetric.Metrics, error) {
	if s.client == nil {
		client, err := newSaramaClient(s.config.Brokers, s.saramaConfig)
		if err != nil {
			return pmetric.Metrics{}, fmt.Errorf("failed to create client in consumer scraper: %w", err)
		}
		clusterAdmin, err := newClusterAdmin(s.config.Brokers, s.saramaConfig)
		if err != nil {
			if client != nil {
				_ = client.Close()
			}
			return pmetric.Metrics{}, fmt.Errorf("failed to create cluster admin in consumer scraper: %w", err)
		}
		s.client = client
		s.clusterAdmin = clusterAdmin
	}
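
	// List consumer groups and topics, keeping only those that match the
	// configured group_match and topic_match filters.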
	cgs, listErr := s.clusterAdmin.ListConsumerGroups()
	if listErr != nil {
		return pmetric.Metrics{}, listErr
	}

	var matchedGrpIds []string
	for grpID := range cgs {
		if s.groupFilter.MatchString(grpID) {
			matchedGrpIds = append(matchedGrpIds, grpID)
		}
	}

	allTopics, listErr := s.clusterAdmin.ListTopics()
	if listErr != nil {
		return pmetric.Metrics{}, listErr
	}

	matchedTopics := map[string]sarama.TopicDetail{}
	for t, d := range allTopics {
		if s.topicFilter.MatchString(t) {
			matchedTopics[t] = d
		}
	}
	var scrapeError error
	// partition IDs for each matched topic
	topicPartitions := map[string][]int32{}
	// newest (current) broker offset for each partition of each matched topic
	topicPartitionOffset := map[string]map[int32]int64{}
	for topic := range matchedTopics {
		topicPartitionOffset[topic] = map[int32]int64{}
		partitions, err := s.client.Partitions(topic)
		if err != nil {
			scrapeError = multierr.Append(scrapeError, err)
			continue
		}
		for _, p := range partitions {
			var offset int64
			offset, err = s.client.GetOffset(topic, p, sarama.OffsetNewest)
			if err != nil {
				scrapeError = multierr.Append(scrapeError, err)
				continue
			}
			topicPartitions[topic] = append(topicPartitions[topic], p)
			topicPartitionOffset[topic][p] = offset
		}
	}
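
	// Describe the matched consumer groups and record per-group member,
	// offset, and lag metrics.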
	consumerGroups, listErr := s.clusterAdmin.DescribeConsumerGroups(matchedGrpIds)
	if listErr != nil {
		return pmetric.Metrics{}, listErr
	}

	now := pcommon.NewTimestampFromTime(time.Now())

	for _, group := range consumerGroups {
		s.mb.RecordKafkaConsumerGroupMembersDataPoint(now, int64(len(group.Members)), group.GroupId)
		groupOffsetFetchResponse, err := s.clusterAdmin.ListConsumerGroupOffsets(group.GroupId, topicPartitions)
		if err != nil {
			scrapeError = multierr.Append(scrapeError, err)
			continue
		}
		for topic, partitions := range groupOffsetFetchResponse.Blocks {
			// The group is considered to consume this topic only if at least
			// one partition block reports a committed offset (-1 means none).
			isConsumed := false
			for _, block := range partitions {
				if block.Offset != -1 {
					isConsumed = true
					break
				}
			}
			if isConsumed {
				var lagSum int64
				var offsetSum int64
				for partition, block := range partitions {
					consumerOffset := block.Offset
					offsetSum += consumerOffset
					s.mb.RecordKafkaConsumerGroupOffsetDataPoint(now, consumerOffset, group.GroupId, topic, int64(partition))
					// default to -1 to indicate no lag was measured.
					var consumerLag int64 = -1
					if partitionOffset, ok := topicPartitionOffset[topic][partition]; ok {
						// only consider partitions with a committed offset
						if block.Offset != -1 {
							consumerLag = partitionOffset - consumerOffset
							lagSum += consumerLag
						}
					}
					s.mb.RecordKafkaConsumerGroupLagDataPoint(now, consumerLag, group.GroupId, topic, int64(partition))
				}
				s.mb.RecordKafkaConsumerGroupOffsetSumDataPoint(now, offsetSum, group.GroupId, topic)
				s.mb.RecordKafkaConsumerGroupLagSumDataPoint(now, lagSum, group.GroupId, topic)
			}
		}
	}

	return s.mb.Emit(), scrapeError
}
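
// createConsumerScraper compiles the group and topic match expressions and
// wraps the consumer scraper in a scraperhelper.Scraper with start and
// shutdown hooks.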
func createConsumerScraper(_ context.Context, cfg Config, saramaConfig *sarama.Config,
	settings receiver.CreateSettings) (scraperhelper.Scraper, error) {
	groupFilter, err := regexp.Compile(cfg.GroupMatch)
	if err != nil {
		return nil, fmt.Errorf("failed to compile group_match: %w", err)
	}
	topicFilter, err := regexp.Compile(cfg.TopicMatch)
	if err != nil {
		return nil, fmt.Errorf("failed to compile topic_match: %w", err)
	}
	s := consumerScraper{
		settings:     settings,
		groupFilter:  groupFilter,
		topicFilter:  topicFilter,
		config:       cfg,
		saramaConfig: saramaConfig,
	}
	return scraperhelper.NewScraper(
		s.Name(),
		s.scrape,
		scraperhelper.WithStart(s.start),
		scraperhelper.WithShutdown(s.shutdown),
	)
}