scraper.go 3.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package flinkmetricsreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/flinkmetricsreceiver"
  4. import (
  5. "context"
  6. "errors"
  7. "fmt"
  8. "time"
  9. "go.opentelemetry.io/collector/component"
  10. "go.opentelemetry.io/collector/pdata/pcommon"
  11. "go.opentelemetry.io/collector/pdata/pmetric"
  12. "go.opentelemetry.io/collector/receiver"
  13. "go.opentelemetry.io/collector/receiver/scrapererror"
  14. "go.uber.org/zap"
  15. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/flinkmetricsreceiver/internal/metadata"
  16. )
  17. var (
  18. errClientNotInit = errors.New("client not initialized")
  19. jobmanagerFailedFetch = "Failed to fetch jobmanager metrics"
  20. taskmanagerFailedFetch = "Failed to fetch taskmanager metrics"
  21. jobsFailedFetch = "Failed to fetch jobs metrics"
  22. subtasksFailedFetch = "Failed to fetch subtasks metrics"
  23. )
  24. type flinkmetricsScraper struct {
  25. client client
  26. cfg *Config
  27. settings component.TelemetrySettings
  28. mb *metadata.MetricsBuilder
  29. }
  30. func newflinkScraper(config *Config, settings receiver.CreateSettings) *flinkmetricsScraper {
  31. return &flinkmetricsScraper{
  32. settings: settings.TelemetrySettings,
  33. cfg: config,
  34. mb: metadata.NewMetricsBuilder(config.MetricsBuilderConfig, settings),
  35. }
  36. }
  37. func (s *flinkmetricsScraper) start(_ context.Context, host component.Host) error {
  38. httpClient, err := newClient(s.cfg, host, s.settings, s.settings.Logger)
  39. if err != nil {
  40. return fmt.Errorf("create client: %w", err)
  41. }
  42. s.client = httpClient
  43. return nil
  44. }
  45. func (s *flinkmetricsScraper) scrape(ctx context.Context) (pmetric.Metrics, error) {
  46. // Validate we don't attempt to scrape without initializing the client
  47. if s.client == nil {
  48. return pmetric.NewMetrics(), errClientNotInit
  49. }
  50. now := pcommon.NewTimestampFromTime(time.Now())
  51. var scraperErrors scrapererror.ScrapeErrors
  52. jobmanagerMetrics, err := s.client.GetJobmanagerMetrics(ctx)
  53. if err != nil {
  54. s.settings.Logger.Error(jobmanagerFailedFetch, zap.Error(err))
  55. scraperErrors.AddPartial(1, fmt.Errorf("%s %w", jobmanagerFailedFetch, err))
  56. }
  57. taskmanagersMetrics, err := s.client.GetTaskmanagersMetrics(ctx)
  58. if err != nil {
  59. s.settings.Logger.Error(taskmanagerFailedFetch, zap.Error(err))
  60. scraperErrors.AddPartial(1, fmt.Errorf("%s %w", taskmanagerFailedFetch, err))
  61. }
  62. jobsMetrics, err := s.client.GetJobsMetrics(ctx)
  63. if err != nil {
  64. s.settings.Logger.Error(jobsFailedFetch, zap.Error(err))
  65. scraperErrors.AddPartial(1, fmt.Errorf("%s %w", jobsFailedFetch, err))
  66. }
  67. subtasksMetrics, err := s.client.GetSubtasksMetrics(ctx)
  68. if err != nil {
  69. s.settings.Logger.Error(subtasksFailedFetch, zap.Error(err))
  70. scraperErrors.AddPartial(1, fmt.Errorf("%s %w", subtasksFailedFetch, err))
  71. }
  72. s.processJobmanagerMetrics(now, jobmanagerMetrics)
  73. s.processTaskmanagerMetrics(now, taskmanagersMetrics)
  74. s.processJobsMetrics(now, jobsMetrics)
  75. s.processSubtaskMetrics(now, subtasksMetrics)
  76. return s.mb.Emit(), scraperErrors.Combine()
  77. }