1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package flinkmetricsreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/flinkmetricsreceiver"
- import (
- "context"
- "errors"
- "fmt"
- "time"
- "go.opentelemetry.io/collector/component"
- "go.opentelemetry.io/collector/pdata/pcommon"
- "go.opentelemetry.io/collector/pdata/pmetric"
- "go.opentelemetry.io/collector/receiver"
- "go.opentelemetry.io/collector/receiver/scrapererror"
- "go.uber.org/zap"
- "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/flinkmetricsreceiver/internal/metadata"
- )
- var (
- errClientNotInit = errors.New("client not initialized")
- jobmanagerFailedFetch = "Failed to fetch jobmanager metrics"
- taskmanagerFailedFetch = "Failed to fetch taskmanager metrics"
- jobsFailedFetch = "Failed to fetch jobs metrics"
- subtasksFailedFetch = "Failed to fetch subtasks metrics"
- )
- type flinkmetricsScraper struct {
- client client
- cfg *Config
- settings component.TelemetrySettings
- mb *metadata.MetricsBuilder
- }
- func newflinkScraper(config *Config, settings receiver.CreateSettings) *flinkmetricsScraper {
- return &flinkmetricsScraper{
- settings: settings.TelemetrySettings,
- cfg: config,
- mb: metadata.NewMetricsBuilder(config.MetricsBuilderConfig, settings),
- }
- }
- func (s *flinkmetricsScraper) start(_ context.Context, host component.Host) error {
- httpClient, err := newClient(s.cfg, host, s.settings, s.settings.Logger)
- if err != nil {
- return fmt.Errorf("create client: %w", err)
- }
- s.client = httpClient
- return nil
- }
- func (s *flinkmetricsScraper) scrape(ctx context.Context) (pmetric.Metrics, error) {
- // Validate we don't attempt to scrape without initializing the client
- if s.client == nil {
- return pmetric.NewMetrics(), errClientNotInit
- }
- now := pcommon.NewTimestampFromTime(time.Now())
- var scraperErrors scrapererror.ScrapeErrors
- jobmanagerMetrics, err := s.client.GetJobmanagerMetrics(ctx)
- if err != nil {
- s.settings.Logger.Error(jobmanagerFailedFetch, zap.Error(err))
- scraperErrors.AddPartial(1, fmt.Errorf("%s %w", jobmanagerFailedFetch, err))
- }
- taskmanagersMetrics, err := s.client.GetTaskmanagersMetrics(ctx)
- if err != nil {
- s.settings.Logger.Error(taskmanagerFailedFetch, zap.Error(err))
- scraperErrors.AddPartial(1, fmt.Errorf("%s %w", taskmanagerFailedFetch, err))
- }
- jobsMetrics, err := s.client.GetJobsMetrics(ctx)
- if err != nil {
- s.settings.Logger.Error(jobsFailedFetch, zap.Error(err))
- scraperErrors.AddPartial(1, fmt.Errorf("%s %w", jobsFailedFetch, err))
- }
- subtasksMetrics, err := s.client.GetSubtasksMetrics(ctx)
- if err != nil {
- s.settings.Logger.Error(subtasksFailedFetch, zap.Error(err))
- scraperErrors.AddPartial(1, fmt.Errorf("%s %w", subtasksFailedFetch, err))
- }
- s.processJobmanagerMetrics(now, jobmanagerMetrics)
- s.processTaskmanagerMetrics(now, taskmanagersMetrics)
- s.processJobsMetrics(now, jobsMetrics)
- s.processSubtaskMetrics(now, subtasksMetrics)
- return s.mb.Emit(), scraperErrors.Combine()
- }
|