client_test.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package flinkmetricsreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/flinkmetricsreceiver"
  4. import (
  5. "context"
  6. "encoding/json"
  7. "errors"
  8. "net/http"
  9. "net/http/httptest"
  10. "os"
  11. "path/filepath"
  12. "regexp"
  13. "testing"
  14. "github.com/stretchr/testify/require"
  15. "go.opentelemetry.io/collector/component"
  16. "go.opentelemetry.io/collector/component/componenttest"
  17. "go.opentelemetry.io/collector/config/confighttp"
  18. "go.opentelemetry.io/collector/config/configtls"
  19. "go.uber.org/zap"
  20. "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/flinkmetricsreceiver/internal/models"
  21. )
  22. const (
  23. // filenames for api responses
  24. jobsIDs = "jobs_ids.json"
  25. jobsMetricValues = "jobs_metric_values.json"
  26. jobsWithID = "jobs_with_id.json"
  27. subtaskMetricValues = "subtask_metric_values.json"
  28. vertices = "vertices.json"
  29. jobmanagerMetricValues = "jobmanager_metric_values.json"
  30. jobsOverview = "jobs_overview.json"
  31. taskmanagerIds = "taskmanager_ids.json"
  32. taskmanagerMetricValues = "taskmanager_metric_values.json"
  33. // regex for endpoint matching
  34. jobsWithIDRegex = "^/jobs/[a-z0-9]+$"
  35. taskmanagerMetricNamesRegex = "^/taskmanagers/[a-z0-9.:-]+/metrics$"
  36. verticesRegex = "^/jobs/[a-z0-9]+/vertices/[a-z0-9]+$"
  37. jobsMetricNamesRegex = "^/jobs/[a-z0-9]+/metrics$"
  38. subtaskMetricNamesRegex = "^/jobs/[a-z0-9]+/vertices/[a-z0-9]+/subtasks/[0-9]+/metrics$"
  39. taskmanagerIDsRegex = "^/taskmanagers$"
  40. apiResponses = "apiresponses"
  41. )
  42. func TestNewClient(t *testing.T) {
  43. testCase := []struct {
  44. desc string
  45. cfg *Config
  46. host component.Host
  47. settings component.TelemetrySettings
  48. logger *zap.Logger
  49. expectError error
  50. }{
  51. {
  52. desc: "Invalid HTTP config",
  53. cfg: &Config{
  54. HTTPClientSettings: confighttp.HTTPClientSettings{
  55. Endpoint: defaultEndpoint,
  56. TLSSetting: configtls.TLSClientSetting{
  57. TLSSetting: configtls.TLSSetting{
  58. CAFile: "/non/existent",
  59. },
  60. },
  61. },
  62. },
  63. host: componenttest.NewNopHost(),
  64. settings: componenttest.NewNopTelemetrySettings(),
  65. logger: zap.NewNop(),
  66. expectError: errors.New("failed to create HTTP Client"),
  67. },
  68. {
  69. desc: "Valid Configuration",
  70. cfg: &Config{
  71. HTTPClientSettings: confighttp.HTTPClientSettings{
  72. TLSSetting: configtls.TLSClientSetting{},
  73. Endpoint: defaultEndpoint,
  74. },
  75. },
  76. host: componenttest.NewNopHost(),
  77. settings: componenttest.NewNopTelemetrySettings(),
  78. logger: zap.NewNop(),
  79. expectError: nil,
  80. },
  81. }
  82. for _, tc := range testCase {
  83. t.Run(tc.desc, func(t *testing.T) {
  84. ac, err := newClient(tc.cfg, tc.host, tc.settings, tc.logger)
  85. if tc.expectError != nil {
  86. require.Nil(t, ac)
  87. require.Contains(t, err.Error(), tc.expectError.Error())
  88. } else {
  89. require.NoError(t, err)
  90. actualClient, ok := ac.(*flinkClient)
  91. require.True(t, ok)
  92. require.Equal(t, tc.cfg.Endpoint, actualClient.hostEndpoint)
  93. require.Equal(t, tc.logger, actualClient.logger)
  94. require.NotNil(t, actualClient.client)
  95. }
  96. })
  97. }
  98. }
  99. func createTestClient(t *testing.T, baseEndpoint string) client {
  100. t.Helper()
  101. cfg := createDefaultConfig().(*Config)
  102. cfg.Endpoint = baseEndpoint
  103. testClient, err := newClient(cfg, componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings(), zap.NewNop())
  104. require.NoError(t, err)
  105. return testClient
  106. }
  107. func TestGetJobmanagerMetrics(t *testing.T) {
  108. testCases := []struct {
  109. desc string
  110. testFunc func(*testing.T)
  111. }{
  112. {
  113. desc: "Non-200 Response",
  114. testFunc: func(t *testing.T) {
  115. ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  116. w.WriteHeader(http.StatusUnauthorized)
  117. }))
  118. defer ts.Close()
  119. tc := createTestClient(t, ts.URL)
  120. metrics, err := tc.GetJobmanagerMetrics(context.Background())
  121. require.Nil(t, metrics)
  122. require.EqualError(t, err, "non 200 code returned 401")
  123. },
  124. },
  125. {
  126. desc: "Bad payload returned",
  127. testFunc: func(t *testing.T) {
  128. ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  129. _, err := w.Write([]byte("{"))
  130. require.NoError(t, err)
  131. }))
  132. defer ts.Close()
  133. tc := createTestClient(t, ts.URL)
  134. metrics, err := tc.GetJobmanagerMetrics(context.Background())
  135. require.Nil(t, metrics)
  136. require.Contains(t, err.Error(), "failed to unmarshal response body")
  137. },
  138. },
  139. {
  140. desc: "Successful call",
  141. testFunc: func(t *testing.T) {
  142. jobmanagerMetricValuesData := loadAPIResponseData(t, apiResponses, jobmanagerMetricValues)
  143. ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  144. _, err := w.Write(jobmanagerMetricValuesData)
  145. require.NoError(t, err)
  146. }))
  147. defer ts.Close()
  148. tc := createTestClient(t, ts.URL)
  149. // Load the valid data into a struct to compare
  150. var expected *models.MetricsResponse
  151. err := json.Unmarshal(jobmanagerMetricValuesData, &expected)
  152. require.NoError(t, err)
  153. actual, err := tc.GetJobmanagerMetrics(context.Background())
  154. require.NoError(t, err)
  155. require.Equal(t, expected, &actual.Metrics)
  156. hostname, err := os.Hostname()
  157. require.Nil(t, err)
  158. require.EqualValues(t, hostname, actual.Host)
  159. },
  160. },
  161. }
  162. for _, tc := range testCases {
  163. t.Run(tc.desc, tc.testFunc)
  164. }
  165. }
  166. func TestGetTaskmanagersMetrics(t *testing.T) {
  167. testCases := []struct {
  168. desc string
  169. testFunc func(*testing.T)
  170. }{
  171. {
  172. desc: "Non-200 Response",
  173. testFunc: func(t *testing.T) {
  174. ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  175. w.WriteHeader(http.StatusUnauthorized)
  176. }))
  177. defer ts.Close()
  178. tc := createTestClient(t, ts.URL)
  179. metrics, err := tc.GetTaskmanagersMetrics(context.Background())
  180. require.Nil(t, metrics)
  181. require.EqualError(t, err, "non 200 code returned 401")
  182. },
  183. },
  184. {
  185. desc: "Bad taskmanagers payload returned",
  186. testFunc: func(t *testing.T) {
  187. ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  188. _, err := w.Write([]byte(`{`))
  189. require.NoError(t, err)
  190. }))
  191. defer ts.Close()
  192. tc := createTestClient(t, ts.URL)
  193. metrics, err := tc.GetTaskmanagersMetrics(context.Background())
  194. require.Nil(t, metrics)
  195. require.Contains(t, err.Error(), "failed to unmarshal response body:")
  196. },
  197. },
  198. {
  199. desc: "Bad taskmanagers metrics payload returned",
  200. testFunc: func(t *testing.T) {
  201. taskmanagerIDs := loadAPIResponseData(t, apiResponses, taskmanagerIds)
  202. ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  203. if match, _ := regexp.MatchString(taskmanagerIDsRegex, r.URL.Path); match {
  204. _, err := w.Write(taskmanagerIDs)
  205. require.NoError(t, err)
  206. return
  207. }
  208. _, err := w.Write([]byte("{"))
  209. require.NoError(t, err)
  210. }))
  211. defer ts.Close()
  212. tc := createTestClient(t, ts.URL)
  213. metrics, err := tc.GetTaskmanagersMetrics(context.Background())
  214. require.Nil(t, metrics)
  215. require.Contains(t, err.Error(), "failed to unmarshal response body:")
  216. },
  217. },
  218. {
  219. desc: "Successful call",
  220. testFunc: func(t *testing.T) {
  221. taskmanagerIDs := loadAPIResponseData(t, apiResponses, taskmanagerIds)
  222. taskmanagerMetricValuesData := loadAPIResponseData(t, apiResponses, taskmanagerMetricValues)
  223. ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  224. if match, _ := regexp.MatchString(taskmanagerIDsRegex, r.URL.Path); match {
  225. _, err := w.Write(taskmanagerIDs)
  226. require.NoError(t, err)
  227. return
  228. }
  229. if match, _ := regexp.MatchString(taskmanagerMetricNamesRegex, r.URL.Path); match {
  230. _, err := w.Write(taskmanagerMetricValuesData)
  231. require.NoError(t, err)
  232. return
  233. }
  234. }))
  235. defer ts.Close()
  236. tc := createTestClient(t, ts.URL)
  237. // Load the valid data into a struct to compare
  238. var expected *models.MetricsResponse
  239. err := json.Unmarshal(taskmanagerMetricValuesData, &expected)
  240. require.NoError(t, err)
  241. actual, err := tc.GetTaskmanagersMetrics(context.Background())
  242. require.NoError(t, err)
  243. require.Len(t, actual, 1)
  244. require.Equal(t, expected, &actual[0].Metrics)
  245. require.EqualValues(t, "172.26.0.3", actual[0].Host)
  246. require.EqualValues(t, "172.26.0.3:34457-7b2520", actual[0].TaskmanagerID)
  247. },
  248. },
  249. }
  250. for _, tc := range testCases {
  251. t.Run(tc.desc, tc.testFunc)
  252. }
  253. }
  254. func TestGetJobsMetrics(t *testing.T) {
  255. testCases := []struct {
  256. desc string
  257. testFunc func(*testing.T)
  258. }{
  259. {
  260. desc: "Non-200 Response",
  261. testFunc: func(t *testing.T) {
  262. ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  263. w.WriteHeader(http.StatusUnauthorized)
  264. }))
  265. defer ts.Close()
  266. tc := createTestClient(t, ts.URL)
  267. metrics, err := tc.GetJobsMetrics(context.Background())
  268. require.Nil(t, metrics)
  269. require.EqualError(t, err, "non 200 code returned 401")
  270. },
  271. },
  272. {
  273. desc: "Bad payload returned",
  274. testFunc: func(t *testing.T) {
  275. ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  276. _, err := w.Write([]byte(`{`))
  277. require.NoError(t, err)
  278. }))
  279. defer ts.Close()
  280. tc := createTestClient(t, ts.URL)
  281. metrics, err := tc.GetJobsMetrics(context.Background())
  282. require.Nil(t, metrics)
  283. require.Contains(t, err.Error(), "failed to unmarshal response body")
  284. },
  285. },
  286. {
  287. desc: "bad payload returned call",
  288. testFunc: func(t *testing.T) {
  289. jobsOverviewData := loadAPIResponseData(t, apiResponses, jobsOverview)
  290. ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  291. if r.URL.Path == jobsOverviewEndpoint {
  292. _, err := w.Write(jobsOverviewData)
  293. require.NoError(t, err)
  294. return
  295. }
  296. _, err := w.Write([]byte(`{`))
  297. require.NoError(t, err)
  298. }))
  299. defer ts.Close()
  300. tc := createTestClient(t, ts.URL)
  301. metrics, err := tc.GetJobsMetrics(context.Background())
  302. require.Nil(t, metrics)
  303. require.Contains(t, err.Error(), "failed to unmarshal response body")
  304. },
  305. },
  306. {
  307. desc: "Successful call",
  308. testFunc: func(t *testing.T) {
  309. jobsOverviewData := loadAPIResponseData(t, apiResponses, jobsOverview)
  310. jobsMetricValuesData := loadAPIResponseData(t, apiResponses, jobsMetricValues)
  311. ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  312. if r.URL.Path == jobsOverviewEndpoint {
  313. _, err := w.Write(jobsOverviewData)
  314. require.NoError(t, err)
  315. return
  316. }
  317. if match, _ := regexp.MatchString(jobsMetricNamesRegex, r.URL.Path); match {
  318. _, err := w.Write(jobsMetricValuesData)
  319. require.NoError(t, err)
  320. return
  321. }
  322. }))
  323. defer ts.Close()
  324. tc := createTestClient(t, ts.URL)
  325. // Load the valid data into a struct to compare
  326. var expected *models.MetricsResponse
  327. err := json.Unmarshal(jobsMetricValuesData, &expected)
  328. require.NoError(t, err)
  329. actual, err := tc.GetJobsMetrics(context.Background())
  330. require.NoError(t, err)
  331. require.Len(t, actual, 1)
  332. require.Equal(t, expected, &actual[0].Metrics)
  333. require.EqualValues(t, "State machine job", actual[0].JobName)
  334. hostname, err := os.Hostname()
  335. require.Nil(t, err)
  336. require.EqualValues(t, hostname, actual[0].Host)
  337. },
  338. },
  339. }
  340. for _, tc := range testCases {
  341. t.Run(tc.desc, tc.testFunc)
  342. }
  343. }
  344. func TestGetSubtasksMetrics(t *testing.T) {
  345. testCases := []struct {
  346. desc string
  347. testFunc func(*testing.T)
  348. }{
  349. {
  350. desc: "Non-200 Response",
  351. testFunc: func(t *testing.T) {
  352. ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  353. w.WriteHeader(http.StatusUnauthorized)
  354. }))
  355. defer ts.Close()
  356. tc := createTestClient(t, ts.URL)
  357. metrics, err := tc.GetSubtasksMetrics(context.Background())
  358. require.Nil(t, metrics)
  359. require.EqualError(t, err, "non 200 code returned 401")
  360. },
  361. },
  362. {
  363. desc: "Bad payload returned",
  364. testFunc: func(t *testing.T) {
  365. ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  366. _, err := w.Write([]byte("{"))
  367. require.NoError(t, err)
  368. }))
  369. defer ts.Close()
  370. tc := createTestClient(t, ts.URL)
  371. metrics, err := tc.GetSubtasksMetrics(context.Background())
  372. require.Nil(t, metrics)
  373. require.Contains(t, err.Error(), "failed to unmarshal response body")
  374. },
  375. },
  376. {
  377. desc: "Bad payload jobs IDs returned",
  378. testFunc: func(t *testing.T) {
  379. ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  380. jobsData := loadAPIResponseData(t, apiResponses, jobsIDs)
  381. if r.URL.Path == jobsEndpoint {
  382. _, err := w.Write(jobsData)
  383. require.NoError(t, err)
  384. return
  385. }
  386. _, err := w.Write([]byte("{"))
  387. require.NoError(t, err)
  388. }))
  389. defer ts.Close()
  390. tc := createTestClient(t, ts.URL)
  391. metrics, err := tc.GetSubtasksMetrics(context.Background())
  392. require.Nil(t, metrics)
  393. require.Contains(t, err.Error(), "failed to unmarshal response body")
  394. },
  395. },
  396. {
  397. desc: "Bad payload vertices IDs returned",
  398. testFunc: func(t *testing.T) {
  399. ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  400. jobsData := loadAPIResponseData(t, apiResponses, jobsIDs)
  401. jobsWithIDData := loadAPIResponseData(t, apiResponses, jobsWithID)
  402. if r.URL.Path == jobsEndpoint {
  403. _, err := w.Write(jobsData)
  404. require.NoError(t, err)
  405. return
  406. }
  407. if match, _ := regexp.MatchString(jobsWithIDRegex, r.URL.Path); match {
  408. _, err := w.Write(jobsWithIDData)
  409. require.NoError(t, err)
  410. return
  411. }
  412. _, err := w.Write([]byte("{"))
  413. require.NoError(t, err)
  414. }))
  415. defer ts.Close()
  416. tc := createTestClient(t, ts.URL)
  417. metrics, err := tc.GetSubtasksMetrics(context.Background())
  418. require.Nil(t, metrics)
  419. require.Contains(t, err.Error(), "failed to unmarshal response body")
  420. },
  421. },
  422. {
  423. desc: "Bad payload subtask metrics returned",
  424. testFunc: func(t *testing.T) {
  425. ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  426. jobsData := loadAPIResponseData(t, apiResponses, jobsIDs)
  427. jobsWithIDData := loadAPIResponseData(t, apiResponses, jobsWithID)
  428. verticesData := loadAPIResponseData(t, apiResponses, vertices)
  429. if r.URL.Path == jobsEndpoint {
  430. _, err := w.Write(jobsData)
  431. require.NoError(t, err)
  432. return
  433. }
  434. if match, _ := regexp.MatchString(jobsWithIDRegex, r.URL.Path); match {
  435. _, err := w.Write(jobsWithIDData)
  436. require.NoError(t, err)
  437. return
  438. }
  439. if match, _ := regexp.MatchString(verticesRegex, r.URL.Path); match {
  440. _, err := w.Write(verticesData)
  441. require.NoError(t, err)
  442. return
  443. }
  444. _, err := w.Write([]byte("{"))
  445. require.NoError(t, err)
  446. }))
  447. defer ts.Close()
  448. tc := createTestClient(t, ts.URL)
  449. metrics, err := tc.GetSubtasksMetrics(context.Background())
  450. require.Nil(t, metrics)
  451. require.Contains(t, err.Error(), "failed to unmarshal response body")
  452. },
  453. },
  454. {
  455. desc: "Successful call",
  456. testFunc: func(t *testing.T) {
  457. jobsData := loadAPIResponseData(t, apiResponses, jobsIDs)
  458. jobsWithIDData := loadAPIResponseData(t, apiResponses, jobsWithID)
  459. verticesData := loadAPIResponseData(t, apiResponses, vertices)
  460. subtaskMetricValuesData := loadAPIResponseData(t, apiResponses, subtaskMetricValues)
  461. ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  462. if r.URL.Path == jobsEndpoint {
  463. _, err := w.Write(jobsData)
  464. require.NoError(t, err)
  465. return
  466. }
  467. if match, _ := regexp.MatchString(jobsWithIDRegex, r.URL.Path); match {
  468. _, err := w.Write(jobsWithIDData)
  469. require.NoError(t, err)
  470. return
  471. }
  472. if match, _ := regexp.MatchString(verticesRegex, r.URL.Path); match {
  473. _, err := w.Write(verticesData)
  474. require.NoError(t, err)
  475. return
  476. }
  477. if match, _ := regexp.MatchString(subtaskMetricNamesRegex, r.URL.Path); match {
  478. _, err := w.Write(subtaskMetricValuesData)
  479. require.NoError(t, err)
  480. return
  481. }
  482. }))
  483. defer ts.Close()
  484. tc := createTestClient(t, ts.URL)
  485. var e *models.JobsResponse
  486. _ = json.Unmarshal(jobsData, &e)
  487. require.EqualValues(t, e.Jobs[0].ID, "54a5c6e527e00e1bb861272a39fe13e4")
  488. // Load the valid data into a struct to compare
  489. var expected *models.MetricsResponse
  490. err := json.Unmarshal(subtaskMetricValuesData, &expected)
  491. require.NoError(t, err)
  492. actual, err := tc.GetSubtasksMetrics(context.Background())
  493. require.NoError(t, err)
  494. require.Len(t, actual, 2)
  495. require.Equal(t, expected, &actual[0].Metrics)
  496. require.EqualValues(t, "State machine job", actual[0].JobName)
  497. require.EqualValues(t, "172.26.0.3", actual[0].Host)
  498. // require.EqualValues(t, "flink-worker", actual[0].Host)
  499. require.EqualValues(t, "172.26.0.3:34457-7b2520", actual[0].TaskmanagerID)
  500. require.EqualValues(t, "Source: Custom Source", actual[0].TaskName)
  501. require.EqualValues(t, "0", actual[0].SubtaskIndex)
  502. },
  503. },
  504. }
  505. for _, tc := range testCases {
  506. t.Run(tc.desc, tc.testFunc)
  507. }
  508. }
  509. func loadAPIResponseData(t *testing.T, folder, fileName string) []byte {
  510. t.Helper()
  511. fullPath := filepath.Join("testdata", folder, fileName)
  512. data, err := os.ReadFile(fullPath)
  513. require.NoError(t, err)
  514. return data
  515. }