123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package flinkmetricsreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/flinkmetricsreceiver"
- import (
- "context"
- "encoding/json"
- "fmt"
- "io"
- "net/http"
- "os"
- "strings"
- "go.opentelemetry.io/collector/component"
- "go.uber.org/zap"
- "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/flinkmetricsreceiver/internal/models"
- )
// Flink REST API paths queried by the client. Paths containing format verbs
// are expanded with fmt.Sprintf before they are requested.
const (
	// jobmanagerMetricEndpoint lists the jobmanager's metrics.
	jobmanagerMetricEndpoint = "/jobmanager/metrics"

	// taskmanagersEndpoint lists the taskmanagers and their IDs.
	taskmanagersEndpoint = "/taskmanagers"
	// taskmanagersMetricEndpoint lists the metrics of one taskmanager; the
	// single verb is the taskmanager ID.
	taskmanagersMetricEndpoint = "/taskmanagers/%s/metrics"

	// jobsEndpoint lists the job IDs.
	jobsEndpoint = "/jobs"
	// jobsOverviewEndpoint lists the job IDs together with their job names.
	jobsOverviewEndpoint = "/jobs/overview"
	// jobsWithIDEndpoint describes one job, including its vertices; the verb
	// is the job ID.
	jobsWithIDEndpoint = "/jobs/%s"
	// jobsMetricEndpoint lists the metrics of one job; the verb is the job ID.
	jobsMetricEndpoint = "/jobs/%s/metrics"

	// verticesEndpoint describes one vertex, including its subtask indices;
	// the verbs are the job ID and the vertex ID.
	verticesEndpoint = "/jobs/%s/vertices/%s"
	// subtaskMetricEndpoint lists the metrics of one subtask; the verbs are
	// the job ID, the vertex ID and the subtask index.
	subtaskMetricEndpoint = "/jobs/%s/vertices/%s/subtasks/%v/metrics"
)
- type client interface {
- GetJobmanagerMetrics(ctx context.Context) (*models.JobmanagerMetrics, error)
- GetTaskmanagersMetrics(ctx context.Context) ([]*models.TaskmanagerMetrics, error)
- GetJobsMetrics(ctx context.Context) ([]*models.JobMetrics, error)
- GetSubtasksMetrics(ctx context.Context) ([]*models.SubtaskMetrics, error)
- }
- type flinkClient struct {
- client *http.Client
- hostEndpoint string
- hostName string
- logger *zap.Logger
- }
- func newClient(cfg *Config, host component.Host, settings component.TelemetrySettings, logger *zap.Logger) (client, error) {
- httpClient, err := cfg.ToClient(host, settings)
- if err != nil {
- return nil, fmt.Errorf("failed to create HTTP Client: %w", err)
- }
- hostName, err := getHostname()
- if err != nil {
- return nil, err
- }
- return &flinkClient{
- client: httpClient,
- hostName: hostName,
- hostEndpoint: cfg.Endpoint,
- logger: logger,
- }, nil
- }
- func (c *flinkClient) get(ctx context.Context, path string) ([]byte, error) {
- // Construct endpoint and create request
- url := c.hostEndpoint + path
- req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, http.NoBody)
- if err != nil {
- return nil, fmt.Errorf("failed to create get request for path %s: %w", path, err)
- }
- // Make request
- resp, err := c.client.Do(req)
- if err != nil {
- return nil, fmt.Errorf("failed to make http request: %w", err)
- }
- // Defer body close
- defer func() {
- if closeErr := resp.Body.Close(); closeErr != nil {
- c.logger.Warn("failed to close response body", zap.Error(closeErr))
- }
- }()
- // Check for OK status code
- if resp.StatusCode != http.StatusOK {
- c.logger.Debug("flink API non-200", zap.Error(err), zap.Int("status_code", resp.StatusCode))
- // Attempt to extract the error payload
- payloadData, err := io.ReadAll(resp.Body)
- if err != nil {
- c.logger.Debug("failed to read payload error message", zap.Error(err))
- } else {
- c.logger.Debug("flink API Error", zap.ByteString("api_error", payloadData))
- }
- return nil, fmt.Errorf("non 200 code returned %d", resp.StatusCode)
- }
- return io.ReadAll(resp.Body)
- }
- // getMetrics makes a request to a metric endpoint to get the metric names, the another request building a query to get the metric values.
- func (c *flinkClient) getMetrics(ctx context.Context, path string) (*models.MetricsResponse, error) {
- // Get the metric names
- var metrics *models.MetricsResponse
- body, err := c.get(ctx, path)
- if err != nil {
- c.logger.Debug("failed to retrieve metric names", zap.Error(err))
- return nil, err
- }
- // Populates the metric names
- err = json.Unmarshal(body, &metrics)
- if err != nil {
- return nil, fmt.Errorf("failed to unmarshal response body: %w", err)
- }
- // Construct a get query parameter using comma-separated list of string values to select specific metrics
- query := make([]string, len(*metrics))
- for i, metricName := range *metrics {
- query[i] = metricName.ID
- }
- metricsPath := path + "?get=" + strings.Join(query, ",")
- // Get the metric values using the query
- body, err = c.get(ctx, metricsPath)
- if err != nil {
- c.logger.Debug("failed to retrieve metric values", zap.Error(err))
- return nil, err
- }
- // Populates metric values
- err = json.Unmarshal(body, &metrics)
- if err != nil {
- return nil, fmt.Errorf("failed to unmarshal response body: %w", err)
- }
- return metrics, nil
- }
- // GetJobManagerMetrics gets the jobmanager metrics.
- func (c *flinkClient) GetJobmanagerMetrics(ctx context.Context) (*models.JobmanagerMetrics, error) {
- // Get the metric names and values for jobmanager
- metrics, err := c.getMetrics(ctx, jobmanagerMetricEndpoint)
- if err != nil {
- return nil, err
- }
- // Add a hostname used to identify between multiple jobmanager instances
- return &models.JobmanagerMetrics{
- Host: c.hostName,
- Metrics: *metrics,
- }, nil
- }
- // GetTaskmanagersMetrics gets the Taskmanager metrics for each taskmanager.
- func (c *flinkClient) GetTaskmanagersMetrics(ctx context.Context) ([]*models.TaskmanagerMetrics, error) {
- // Get the taskmanager id list
- var taskmanagerIDs *models.TaskmanagerIDsResponse
- body, err := c.get(ctx, taskmanagersEndpoint)
- if err != nil {
- c.logger.Debug("failed to retrieve taskmanager IDs", zap.Error(err))
- return nil, err
- }
- // Populates taskmanager id names
- err = json.Unmarshal(body, &taskmanagerIDs)
- if err != nil {
- return nil, fmt.Errorf("failed to unmarshal response body: %w", err)
- }
- // Get taskmanager metrics for each taskmanager id
- return c.getTaskmanagersMetricsByIDs(ctx, taskmanagerIDs)
- }
- // getTaskmanagersMetricsByIDs gets taskmanager metrics for each task manager id.
- func (c *flinkClient) getTaskmanagersMetricsByIDs(ctx context.Context, taskmanagerIDs *models.TaskmanagerIDsResponse) ([]*models.TaskmanagerMetrics, error) {
- taskmanagerInstances := make([]*models.TaskmanagerMetrics, len(taskmanagerIDs.Taskmanagers))
- for i, taskmanager := range taskmanagerIDs.Taskmanagers {
- query := fmt.Sprintf(taskmanagersMetricEndpoint, taskmanager.ID)
- metrics, err := c.getMetrics(ctx, query)
- if err != nil {
- return nil, err
- }
- taskmanagerInstance := &models.TaskmanagerMetrics{
- TaskmanagerID: getTaskmanagerID(taskmanager.ID),
- Host: getTaskmanagerHost(taskmanager.ID),
- Metrics: *metrics,
- }
- taskmanagerInstances[i] = taskmanagerInstance
- }
- return taskmanagerInstances, nil
- }
- // GetJobsMetrics gets the job metrics for each job.
- func (c *flinkClient) GetJobsMetrics(ctx context.Context) ([]*models.JobMetrics, error) {
- // Get the job id and name list
- var jobIDs *models.JobOverviewResponse
- body, err := c.get(ctx, jobsOverviewEndpoint)
- if err != nil {
- c.logger.Debug("failed to retrieve job IDs", zap.Error(err))
- return nil, err
- }
- // Populates job id and names
- err = json.Unmarshal(body, &jobIDs)
- if err != nil {
- return nil, fmt.Errorf("failed to unmarshal response body: %w", err)
- }
- // Get job metrics for each job id
- return c.getJobsMetricsByIDs(ctx, jobIDs)
- }
- // getJobsMetricsByIDs gets jobs metrics for each job id.
- func (c *flinkClient) getJobsMetricsByIDs(ctx context.Context, jobIDs *models.JobOverviewResponse) ([]*models.JobMetrics, error) {
- jobInstances := make([]*models.JobMetrics, len(jobIDs.Jobs))
- for i, job := range jobIDs.Jobs {
- query := fmt.Sprintf(jobsMetricEndpoint, job.Jid)
- metrics, err := c.getMetrics(ctx, query)
- if err != nil {
- return nil, err
- }
- jobInstance := models.JobMetrics{
- Host: c.hostName,
- JobName: job.Name,
- Metrics: *metrics,
- }
- jobInstances[i] = &jobInstance
- }
- return jobInstances, nil
- }
- // GetSubtasksMetrics gets subtask metrics for each job id, vertex id and subtask index.
- func (c *flinkClient) GetSubtasksMetrics(ctx context.Context) ([]*models.SubtaskMetrics, error) {
- // Get the job id's
- var jobsResponse *models.JobsResponse
- body, err := c.get(ctx, jobsEndpoint)
- if err != nil {
- c.logger.Debug("failed to retrieve job IDs", zap.Error(err))
- return nil, err
- }
- // Populates the job id
- err = json.Unmarshal(body, &jobsResponse)
- if err != nil {
- return nil, fmt.Errorf("failed to unmarshal response body: %w", err)
- }
- return c.getSubtasksMetricsByIDs(ctx, jobsResponse)
- }
- // getSubtasksMetricsByIDs gets subtask metrics for each job id, vertex id and subtask index.
- func (c *flinkClient) getSubtasksMetricsByIDs(ctx context.Context, jobsResponse *models.JobsResponse) ([]*models.SubtaskMetrics, error) {
- var subtaskInstances []*models.SubtaskMetrics
- // Get vertices for each job
- for _, job := range jobsResponse.Jobs {
- var jobsWithIDResponse *models.JobsWithIDResponse
- query := fmt.Sprintf(jobsWithIDEndpoint, job.ID)
- body, err := c.get(ctx, query)
- if err != nil {
- c.logger.Debug("failed to retrieve job with ID", zap.Error(err))
- return nil, err
- }
- // Populates the job response with vertices info
- err = json.Unmarshal(body, &jobsWithIDResponse)
- if err != nil {
- return nil, fmt.Errorf("failed to unmarshal response body: %w", err)
- }
- // Gets subtask info for each vertex id
- for _, vertex := range jobsWithIDResponse.Vertices {
- var vertexResponse *models.VerticesResponse
- query := fmt.Sprintf(verticesEndpoint, job.ID, vertex.ID)
- body, err = c.get(ctx, query)
- if err != nil {
- c.logger.Debug("failed to retrieve vertex with ID", zap.Error(err))
- return nil, err
- }
- // Populates the vertex response with subtask info
- err = json.Unmarshal(body, &vertexResponse)
- if err != nil {
- return nil, fmt.Errorf("failed to unmarshal response body: %w", err)
- }
- // Gets subtask metrics for each vertex id
- for _, subtask := range vertexResponse.Subtasks {
- query := fmt.Sprintf(subtaskMetricEndpoint, job.ID, vertex.ID, subtask.Subtask)
- subtaskMetrics, err := c.getMetrics(ctx, query)
- if err != nil {
- c.logger.Debug("failed to retrieve subtasks metrics", zap.Error(err))
- return nil, err
- }
- // Stores subtask info with additional attribute values to uniquely identify metrics
- subtaskInstances = append(subtaskInstances,
- &models.SubtaskMetrics{
- Host: getTaskmanagerHost(subtask.TaskmanagerID),
- TaskmanagerID: getTaskmanagerID(subtask.TaskmanagerID),
- JobName: jobsWithIDResponse.Name,
- TaskName: vertex.Name,
- SubtaskIndex: fmt.Sprintf("%v", subtask.Subtask),
- Metrics: *subtaskMetrics,
- })
- }
- }
- }
- return subtaskInstances, nil
- }
// osHostname resolves the local machine name. It is a variable so that tests
// can override it.
var osHostname = os.Hostname

// getHostname returns the name reported by osHostname. On failure it returns
// an empty string together with the lookup error, whatever name the lookup
// itself produced.
func getHostname() (string, error) {
	name, err := osHostname()
	if err == nil {
		return name, nil
	}
	return "", err
}
// taskmanagerHost splits a taskmanager ID into parts. It is a variable so that
// tests can override it.
var taskmanagerHost = strings.Split

// getTaskmanagerHost returns the first ":"-separated part of a taskmanager ID,
// or the whole ID when it contains no ":". NOTE(review): this assumes IDs look
// like "host:port-suffix". An IPv6 host containing ":" would be cut short;
// confirm against the Flink ID format in use.
func getTaskmanagerHost(id string) string {
	return taskmanagerHost(id, ":")[0]
}
// reflect is the identity function: it returns s unchanged. It is the default
// implementation of taskmanagerID.
func reflect(s string) string { return s }

// taskmanagerID maps a raw taskmanager ID to the value that is reported. It is
// a variable so that tests can override it.
var taskmanagerID = reflect

// getTaskmanagerID returns id as mapped by taskmanagerID. Unless overridden,
// this is id unchanged.
func getTaskmanagerID(id string) string {
	mapID := taskmanagerID
	return mapID(id)
}
|