// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package apachesparkreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/apachesparkreceiver"

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"strconv"

	"go.opentelemetry.io/collector/component"
	"go.uber.org/zap"

	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/apachesparkreceiver/internal/models"
)

const (
	metricsPath      = "/metrics/json"
	applicationsPath = "/api/v1/applications"
)
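
// client is the interface for retrieving cluster, application, stage, executor, and job data
// from the Spark REST API.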
type client interface {
	Get(path string) ([]byte, error)
	ClusterStats() (*models.ClusterProperties, error)
	Applications() ([]models.Application, error)
	StageStats(appID string) ([]models.Stage, error)
	ExecutorStats(appID string) ([]models.Executor, error)
	JobStats(appID string) ([]models.Job, error)
}

var _ client = (*apacheSparkClient)(nil)

type apacheSparkClient struct {
	client *http.Client
	cfg    *Config
	logger *zap.Logger
}

// newApacheSparkClient creates a new client to make requests for the Apache Spark receiver.
func newApacheSparkClient(cfg *Config, host component.Host, settings component.TelemetrySettings) (client, error) {
	client, err := cfg.ToClient(host, settings)
	if err != nil {
		return nil, fmt.Errorf("failed to create HTTP Client: %w", err)
	}

	return &apacheSparkClient{
		client: client,
		cfg:    cfg,
		logger: settings.Logger,
	}, nil
}

// Get issues an authorized GET request to the specified path and returns the raw response body.
func (c *apacheSparkClient) Get(path string) ([]byte, error) {
	req, err := c.buildReq(path)
	if err != nil {
		return nil, err
	}

	resp, err := c.client.Do(req)
	if err != nil {
		return nil, err
	}
	defer func() {
		if err = resp.Body.Close(); err != nil {
			c.logger.Warn("failed to close response body", zap.Error(err))
		}
	}()

	if resp.StatusCode != http.StatusOK {
		if resp.StatusCode >= 400 {
			c.logger.Error("apachespark", zap.String("status_code", strconv.Itoa(resp.StatusCode)))
		}
		return nil, fmt.Errorf("request GET %s failed - %q", req.URL.String(), resp.Status)
	}

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("failed to read response body: %w", err)
	}

	return body, nil
}
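
// ClusterStats retrieves the cluster-level metrics snapshot from the /metrics/json endpoint.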
func (c *apacheSparkClient) ClusterStats() (*models.ClusterProperties, error) {
	body, err := c.Get(metricsPath)
	if err != nil {
		return nil, err
	}

	var clusterStats *models.ClusterProperties
	err = json.Unmarshal(body, &clusterStats)
	if err != nil {
		return nil, err
	}

	return clusterStats, nil
}
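
// Applications retrieves the list of applications known to the Spark REST API.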
func (c *apacheSparkClient) Applications() ([]models.Application, error) {
	body, err := c.Get(applicationsPath)
	if err != nil {
		return nil, err
	}

	var apps []models.Application
	err = json.Unmarshal(body, &apps)
	if err != nil {
		return nil, err
	}

	return apps, nil
}
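
// StageStats retrieves stage information for the application with the given ID.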
func (c *apacheSparkClient) StageStats(appID string) ([]models.Stage, error) {
	stagePath := fmt.Sprintf("%s/%s/stages", applicationsPath, appID)
	body, err := c.Get(stagePath)
	if err != nil {
		return nil, err
	}

	var stageStats []models.Stage
	err = json.Unmarshal(body, &stageStats)
	if err != nil {
		return nil, err
	}

	return stageStats, nil
}
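
// ExecutorStats retrieves executor information for the application with the given ID.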
func (c *apacheSparkClient) ExecutorStats(appID string) ([]models.Executor, error) {
	executorPath := fmt.Sprintf("%s/%s/executors", applicationsPath, appID)
	body, err := c.Get(executorPath)
	if err != nil {
		return nil, err
	}

	var executorStats []models.Executor
	err = json.Unmarshal(body, &executorStats)
	if err != nil {
		return nil, err
	}

	return executorStats, nil
}
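
// JobStats retrieves job information for the application with the given ID.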
func (c *apacheSparkClient) JobStats(appID string) ([]models.Job, error) {
	jobPath := fmt.Sprintf("%s/%s/jobs", applicationsPath, appID)
	body, err := c.Get(jobPath)
	if err != nil {
		return nil, err
	}

	var jobStats []models.Job
	err = json.Unmarshal(body, &jobStats)
	if err != nil {
		return nil, err
	}

	return jobStats, nil
}
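
// buildReq constructs a GET request against the configured endpoint joined with the given path.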
func (c *apacheSparkClient) buildReq(path string) (*http.Request, error) {
	url := c.cfg.Endpoint + path
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}
	return req, nil
}