broker_scraper.go 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package kafkametricsreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver"
  4. import (
  5. "context"
  6. "fmt"
  7. "time"
  8. "github.com/IBM/sarama"
  9. "go.opentelemetry.io/collector/component"
  10. "go.opentelemetry.io/collector/pdata/pcommon"
  11. "go.opentelemetry.io/collector/pdata/pmetric"
  12. "go.opentelemetry.io/collector/receiver"
  13. "go.opentelemetry.io/collector/receiver/scraperhelper"
  14. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata"
  15. )
  16. type brokerScraper struct {
  17. client sarama.Client
  18. settings receiver.CreateSettings
  19. config Config
  20. saramaConfig *sarama.Config
  21. mb *metadata.MetricsBuilder
  22. }
  23. func (s *brokerScraper) Name() string {
  24. return brokersScraperName
  25. }
  26. func (s *brokerScraper) start(_ context.Context, _ component.Host) error {
  27. s.mb = metadata.NewMetricsBuilder(s.config.MetricsBuilderConfig, s.settings)
  28. return nil
  29. }
  30. func (s *brokerScraper) shutdown(context.Context) error {
  31. if s.client != nil && !s.client.Closed() {
  32. return s.client.Close()
  33. }
  34. return nil
  35. }
  36. func (s *brokerScraper) scrape(context.Context) (pmetric.Metrics, error) {
  37. if s.client == nil {
  38. client, err := newSaramaClient(s.config.Brokers, s.saramaConfig)
  39. if err != nil {
  40. return pmetric.Metrics{}, fmt.Errorf("failed to create client in brokers scraper: %w", err)
  41. }
  42. s.client = client
  43. }
  44. brokers := s.client.Brokers()
  45. s.mb.RecordKafkaBrokersDataPoint(pcommon.NewTimestampFromTime(time.Now()), int64(len(brokers)))
  46. return s.mb.Emit(), nil
  47. }
  48. func createBrokerScraper(_ context.Context, cfg Config, saramaConfig *sarama.Config,
  49. settings receiver.CreateSettings) (scraperhelper.Scraper, error) {
  50. s := brokerScraper{
  51. settings: settings,
  52. config: cfg,
  53. saramaConfig: saramaConfig,
  54. }
  55. return scraperhelper.NewScraper(
  56. s.Name(),
  57. s.scrape,
  58. scraperhelper.WithStart(s.start),
  59. scraperhelper.WithShutdown(s.shutdown),
  60. )
  61. }