client.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package flinkmetricsreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/flinkmetricsreceiver"
  4. import (
  5. "context"
  6. "encoding/json"
  7. "fmt"
  8. "io"
  9. "net/http"
  10. "os"
  11. "strings"
  12. "go.opentelemetry.io/collector/component"
  13. "go.uber.org/zap"
  14. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/flinkmetricsreceiver/internal/models"
  15. )
  16. // The API endpoints required to collect metrics.
  17. const (
  18. // jobmanagerMetricEndpoint gets jobmanager metrics.
  19. jobmanagerMetricEndpoint = "/jobmanager/metrics"
  20. // taskmanagersEndpoint gets taskmanager IDs.
  21. taskmanagersEndpoint = "/taskmanagers"
  22. // taskmanagersMetricEndpoint gets taskmanager using a taskmanager ID.
  23. taskmanagersMetricEndpoint = "/taskmanagers/%s/metrics"
  24. // jobsEndpoint gets job IDs.
  25. jobsEndpoint = "/jobs"
  26. // jobsOverviewEndpoint gets job IDs with associated Job names.
  27. jobsOverviewEndpoint = "/jobs/overview"
  28. // jobsWithIDEndpoint gets vertex IDs using a job ID.
  29. jobsWithIDEndpoint = "/jobs/%s"
  30. // jobsMetricEndpoint gets job metrics using a job ID.
  31. jobsMetricEndpoint = "/jobs/%s/metrics"
  32. // verticesEndpoint gets subtask index's using a job and vertex ID.
  33. verticesEndpoint = "/jobs/%s/vertices/%s"
  34. // subtaskMetricEndpoint gets subtask metrics using a job ID, vertex ID and subtask index.
  35. subtaskMetricEndpoint = "/jobs/%s/vertices/%s/subtasks/%v/metrics"
  36. )
  37. type client interface {
  38. GetJobmanagerMetrics(ctx context.Context) (*models.JobmanagerMetrics, error)
  39. GetTaskmanagersMetrics(ctx context.Context) ([]*models.TaskmanagerMetrics, error)
  40. GetJobsMetrics(ctx context.Context) ([]*models.JobMetrics, error)
  41. GetSubtasksMetrics(ctx context.Context) ([]*models.SubtaskMetrics, error)
  42. }
  43. type flinkClient struct {
  44. client *http.Client
  45. hostEndpoint string
  46. hostName string
  47. logger *zap.Logger
  48. }
  49. func newClient(cfg *Config, host component.Host, settings component.TelemetrySettings, logger *zap.Logger) (client, error) {
  50. httpClient, err := cfg.ToClient(host, settings)
  51. if err != nil {
  52. return nil, fmt.Errorf("failed to create HTTP Client: %w", err)
  53. }
  54. hostName, err := getHostname()
  55. if err != nil {
  56. return nil, err
  57. }
  58. return &flinkClient{
  59. client: httpClient,
  60. hostName: hostName,
  61. hostEndpoint: cfg.Endpoint,
  62. logger: logger,
  63. }, nil
  64. }
  65. func (c *flinkClient) get(ctx context.Context, path string) ([]byte, error) {
  66. // Construct endpoint and create request
  67. url := c.hostEndpoint + path
  68. req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, http.NoBody)
  69. if err != nil {
  70. return nil, fmt.Errorf("failed to create get request for path %s: %w", path, err)
  71. }
  72. // Make request
  73. resp, err := c.client.Do(req)
  74. if err != nil {
  75. return nil, fmt.Errorf("failed to make http request: %w", err)
  76. }
  77. // Defer body close
  78. defer func() {
  79. if closeErr := resp.Body.Close(); closeErr != nil {
  80. c.logger.Warn("failed to close response body", zap.Error(closeErr))
  81. }
  82. }()
  83. // Check for OK status code
  84. if resp.StatusCode != http.StatusOK {
  85. c.logger.Debug("flink API non-200", zap.Error(err), zap.Int("status_code", resp.StatusCode))
  86. // Attempt to extract the error payload
  87. payloadData, err := io.ReadAll(resp.Body)
  88. if err != nil {
  89. c.logger.Debug("failed to read payload error message", zap.Error(err))
  90. } else {
  91. c.logger.Debug("flink API Error", zap.ByteString("api_error", payloadData))
  92. }
  93. return nil, fmt.Errorf("non 200 code returned %d", resp.StatusCode)
  94. }
  95. return io.ReadAll(resp.Body)
  96. }
  97. // getMetrics makes a request to a metric endpoint to get the metric names, the another request building a query to get the metric values.
  98. func (c *flinkClient) getMetrics(ctx context.Context, path string) (*models.MetricsResponse, error) {
  99. // Get the metric names
  100. var metrics *models.MetricsResponse
  101. body, err := c.get(ctx, path)
  102. if err != nil {
  103. c.logger.Debug("failed to retrieve metric names", zap.Error(err))
  104. return nil, err
  105. }
  106. // Populates the metric names
  107. err = json.Unmarshal(body, &metrics)
  108. if err != nil {
  109. return nil, fmt.Errorf("failed to unmarshal response body: %w", err)
  110. }
  111. // Construct a get query parameter using comma-separated list of string values to select specific metrics
  112. query := make([]string, len(*metrics))
  113. for i, metricName := range *metrics {
  114. query[i] = metricName.ID
  115. }
  116. metricsPath := path + "?get=" + strings.Join(query, ",")
  117. // Get the metric values using the query
  118. body, err = c.get(ctx, metricsPath)
  119. if err != nil {
  120. c.logger.Debug("failed to retrieve metric values", zap.Error(err))
  121. return nil, err
  122. }
  123. // Populates metric values
  124. err = json.Unmarshal(body, &metrics)
  125. if err != nil {
  126. return nil, fmt.Errorf("failed to unmarshal response body: %w", err)
  127. }
  128. return metrics, nil
  129. }
  130. // GetJobManagerMetrics gets the jobmanager metrics.
  131. func (c *flinkClient) GetJobmanagerMetrics(ctx context.Context) (*models.JobmanagerMetrics, error) {
  132. // Get the metric names and values for jobmanager
  133. metrics, err := c.getMetrics(ctx, jobmanagerMetricEndpoint)
  134. if err != nil {
  135. return nil, err
  136. }
  137. // Add a hostname used to identify between multiple jobmanager instances
  138. return &models.JobmanagerMetrics{
  139. Host: c.hostName,
  140. Metrics: *metrics,
  141. }, nil
  142. }
  143. // GetTaskmanagersMetrics gets the Taskmanager metrics for each taskmanager.
  144. func (c *flinkClient) GetTaskmanagersMetrics(ctx context.Context) ([]*models.TaskmanagerMetrics, error) {
  145. // Get the taskmanager id list
  146. var taskmanagerIDs *models.TaskmanagerIDsResponse
  147. body, err := c.get(ctx, taskmanagersEndpoint)
  148. if err != nil {
  149. c.logger.Debug("failed to retrieve taskmanager IDs", zap.Error(err))
  150. return nil, err
  151. }
  152. // Populates taskmanager id names
  153. err = json.Unmarshal(body, &taskmanagerIDs)
  154. if err != nil {
  155. return nil, fmt.Errorf("failed to unmarshal response body: %w", err)
  156. }
  157. // Get taskmanager metrics for each taskmanager id
  158. return c.getTaskmanagersMetricsByIDs(ctx, taskmanagerIDs)
  159. }
  160. // getTaskmanagersMetricsByIDs gets taskmanager metrics for each task manager id.
  161. func (c *flinkClient) getTaskmanagersMetricsByIDs(ctx context.Context, taskmanagerIDs *models.TaskmanagerIDsResponse) ([]*models.TaskmanagerMetrics, error) {
  162. taskmanagerInstances := make([]*models.TaskmanagerMetrics, len(taskmanagerIDs.Taskmanagers))
  163. for i, taskmanager := range taskmanagerIDs.Taskmanagers {
  164. query := fmt.Sprintf(taskmanagersMetricEndpoint, taskmanager.ID)
  165. metrics, err := c.getMetrics(ctx, query)
  166. if err != nil {
  167. return nil, err
  168. }
  169. taskmanagerInstance := &models.TaskmanagerMetrics{
  170. TaskmanagerID: getTaskmanagerID(taskmanager.ID),
  171. Host: getTaskmanagerHost(taskmanager.ID),
  172. Metrics: *metrics,
  173. }
  174. taskmanagerInstances[i] = taskmanagerInstance
  175. }
  176. return taskmanagerInstances, nil
  177. }
  178. // GetJobsMetrics gets the job metrics for each job.
  179. func (c *flinkClient) GetJobsMetrics(ctx context.Context) ([]*models.JobMetrics, error) {
  180. // Get the job id and name list
  181. var jobIDs *models.JobOverviewResponse
  182. body, err := c.get(ctx, jobsOverviewEndpoint)
  183. if err != nil {
  184. c.logger.Debug("failed to retrieve job IDs", zap.Error(err))
  185. return nil, err
  186. }
  187. // Populates job id and names
  188. err = json.Unmarshal(body, &jobIDs)
  189. if err != nil {
  190. return nil, fmt.Errorf("failed to unmarshal response body: %w", err)
  191. }
  192. // Get job metrics for each job id
  193. return c.getJobsMetricsByIDs(ctx, jobIDs)
  194. }
  195. // getJobsMetricsByIDs gets jobs metrics for each job id.
  196. func (c *flinkClient) getJobsMetricsByIDs(ctx context.Context, jobIDs *models.JobOverviewResponse) ([]*models.JobMetrics, error) {
  197. jobInstances := make([]*models.JobMetrics, len(jobIDs.Jobs))
  198. for i, job := range jobIDs.Jobs {
  199. query := fmt.Sprintf(jobsMetricEndpoint, job.Jid)
  200. metrics, err := c.getMetrics(ctx, query)
  201. if err != nil {
  202. return nil, err
  203. }
  204. jobInstance := models.JobMetrics{
  205. Host: c.hostName,
  206. JobName: job.Name,
  207. Metrics: *metrics,
  208. }
  209. jobInstances[i] = &jobInstance
  210. }
  211. return jobInstances, nil
  212. }
  213. // GetSubtasksMetrics gets subtask metrics for each job id, vertex id and subtask index.
  214. func (c *flinkClient) GetSubtasksMetrics(ctx context.Context) ([]*models.SubtaskMetrics, error) {
  215. // Get the job id's
  216. var jobsResponse *models.JobsResponse
  217. body, err := c.get(ctx, jobsEndpoint)
  218. if err != nil {
  219. c.logger.Debug("failed to retrieve job IDs", zap.Error(err))
  220. return nil, err
  221. }
  222. // Populates the job id
  223. err = json.Unmarshal(body, &jobsResponse)
  224. if err != nil {
  225. return nil, fmt.Errorf("failed to unmarshal response body: %w", err)
  226. }
  227. return c.getSubtasksMetricsByIDs(ctx, jobsResponse)
  228. }
  229. // getSubtasksMetricsByIDs gets subtask metrics for each job id, vertex id and subtask index.
  230. func (c *flinkClient) getSubtasksMetricsByIDs(ctx context.Context, jobsResponse *models.JobsResponse) ([]*models.SubtaskMetrics, error) {
  231. var subtaskInstances []*models.SubtaskMetrics
  232. // Get vertices for each job
  233. for _, job := range jobsResponse.Jobs {
  234. var jobsWithIDResponse *models.JobsWithIDResponse
  235. query := fmt.Sprintf(jobsWithIDEndpoint, job.ID)
  236. body, err := c.get(ctx, query)
  237. if err != nil {
  238. c.logger.Debug("failed to retrieve job with ID", zap.Error(err))
  239. return nil, err
  240. }
  241. // Populates the job response with vertices info
  242. err = json.Unmarshal(body, &jobsWithIDResponse)
  243. if err != nil {
  244. return nil, fmt.Errorf("failed to unmarshal response body: %w", err)
  245. }
  246. // Gets subtask info for each vertex id
  247. for _, vertex := range jobsWithIDResponse.Vertices {
  248. var vertexResponse *models.VerticesResponse
  249. query := fmt.Sprintf(verticesEndpoint, job.ID, vertex.ID)
  250. body, err = c.get(ctx, query)
  251. if err != nil {
  252. c.logger.Debug("failed to retrieve vertex with ID", zap.Error(err))
  253. return nil, err
  254. }
  255. // Populates the vertex response with subtask info
  256. err = json.Unmarshal(body, &vertexResponse)
  257. if err != nil {
  258. return nil, fmt.Errorf("failed to unmarshal response body: %w", err)
  259. }
  260. // Gets subtask metrics for each vertex id
  261. for _, subtask := range vertexResponse.Subtasks {
  262. query := fmt.Sprintf(subtaskMetricEndpoint, job.ID, vertex.ID, subtask.Subtask)
  263. subtaskMetrics, err := c.getMetrics(ctx, query)
  264. if err != nil {
  265. c.logger.Debug("failed to retrieve subtasks metrics", zap.Error(err))
  266. return nil, err
  267. }
  268. // Stores subtask info with additional attribute values to uniquely identify metrics
  269. subtaskInstances = append(subtaskInstances,
  270. &models.SubtaskMetrics{
  271. Host: getTaskmanagerHost(subtask.TaskmanagerID),
  272. TaskmanagerID: getTaskmanagerID(subtask.TaskmanagerID),
  273. JobName: jobsWithIDResponse.Name,
  274. TaskName: vertex.Name,
  275. SubtaskIndex: fmt.Sprintf("%v", subtask.Subtask),
  276. Metrics: *subtaskMetrics,
  277. })
  278. }
  279. }
  280. }
  281. return subtaskInstances, nil
  282. }
  283. // Override for testing
  284. var osHostname = os.Hostname
  285. func getHostname() (string, error) {
  286. host, err := osHostname()
  287. if err != nil {
  288. return "", err
  289. }
  290. return host, nil
  291. }
  292. // Override for testing
  293. var taskmanagerHost = strings.Split
  294. func getTaskmanagerHost(id string) string {
  295. host := taskmanagerHost(id, ":")
  296. return host[0]
  297. }
  298. func reflect(s string) string {
  299. return s
  300. }
  301. // Override for testing
  302. var taskmanagerID = reflect
  303. func getTaskmanagerID(id string) string {
  304. return taskmanagerID(id)
  305. }