metrics_receiver_target_allocator_test.go 17 KB


  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. //go:build !race
  4. package prometheusreceiver
  5. import (
  6. "context"
  7. "encoding/json"
  8. "net/http"
  9. "net/http/httptest"
  10. "strings"
  11. "sync"
  12. "sync/atomic"
  13. "testing"
  14. "time"
  15. commonconfig "github.com/prometheus/common/config"
  16. "github.com/prometheus/common/model"
  17. promConfig "github.com/prometheus/prometheus/config"
  18. promHTTP "github.com/prometheus/prometheus/discovery/http"
  19. "github.com/stretchr/testify/require"
  20. "go.opentelemetry.io/collector/component/componenttest"
  21. "go.opentelemetry.io/collector/consumer/consumertest"
  22. "go.opentelemetry.io/collector/receiver/receivertest"
  23. )
  24. type MockTargetAllocator struct {
  25. mu sync.Mutex // mu protects the fields below.
  26. endpoints map[string][]mockTargetAllocatorResponse
  27. accessIndex map[string]*atomic.Int32
  28. wg *sync.WaitGroup
  29. srv *httptest.Server
  30. waitIndex map[string]int
  31. }
  32. type mockTargetAllocatorResponse struct {
  33. code int
  34. data []byte
  35. }
  36. type mockTargetAllocatorResponseRaw struct {
  37. code int
  38. data any
  39. }
  40. type hTTPSDResponse struct {
  41. Targets []string `json:"targets"`
  42. Labels map[model.LabelName]model.LabelValue `json:"labels"`
  43. }
  44. type expectedTestResultJobMap struct {
  45. Targets []string
  46. Labels model.LabelSet
  47. }
  48. type expectedTestResult struct {
  49. empty bool
  50. jobMap map[string]expectedTestResultJobMap
  51. }
  52. func (mta *MockTargetAllocator) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
  53. mta.mu.Lock()
  54. defer mta.mu.Unlock()
  55. iptr, ok := mta.accessIndex[req.URL.Path]
  56. if !ok {
  57. rw.WriteHeader(404)
  58. return
  59. }
  60. index := int(iptr.Load())
  61. iptr.Add(1)
  62. pages := mta.endpoints[req.URL.Path]
  63. if index >= len(pages) {
  64. rw.WriteHeader(404)
  65. return
  66. }
  67. rw.Header().Set("Content-Type", "application/json")
  68. rw.WriteHeader(pages[index].code)
  69. _, _ = rw.Write(pages[index].data)
  70. // release WaitGroup after all endpoints have been hit by Prometheus SD once. After that we will call them manually
  71. wait := mta.waitIndex[req.URL.Path]
  72. if index == wait {
  73. mta.wg.Done()
  74. }
  75. }
  76. func (mta *MockTargetAllocator) Start() {
  77. mta.srv.Start()
  78. }
  79. func (mta *MockTargetAllocator) Stop() {
  80. mta.srv.Close()
  81. }
  82. func transformTAResponseMap(rawResponses map[string][]mockTargetAllocatorResponseRaw) (map[string][]mockTargetAllocatorResponse, map[string]*atomic.Int32, error) {
  83. responsesMap := make(map[string][]mockTargetAllocatorResponse)
  84. responsesIndexMap := make(map[string]*atomic.Int32)
  85. for path, responsesRaw := range rawResponses {
  86. var responses []mockTargetAllocatorResponse
  87. for _, responseRaw := range responsesRaw {
  88. respBodyBytes, err := json.Marshal(responseRaw.data)
  89. if err != nil {
  90. return nil, nil, err
  91. }
  92. responses = append(responses, mockTargetAllocatorResponse{
  93. code: responseRaw.code,
  94. data: respBodyBytes,
  95. })
  96. }
  97. responsesMap[path] = responses
  98. v := &atomic.Int32{}
  99. responsesIndexMap[path] = v
  100. }
  101. return responsesMap, responsesIndexMap, nil
  102. }
  103. func setupMockTargetAllocator(responses Responses) (*MockTargetAllocator, error) {
  104. responsesMap, responsesIndexMap, err := transformTAResponseMap(responses.responses)
  105. if err != nil {
  106. return nil, err
  107. }
  108. mockTA := &MockTargetAllocator{
  109. endpoints: responsesMap,
  110. accessIndex: responsesIndexMap,
  111. waitIndex: responses.releaserMap,
  112. wg: &sync.WaitGroup{},
  113. }
  114. mockTA.srv = httptest.NewUnstartedServer(mockTA)
  115. mockTA.wg.Add(len(responsesMap))
  116. return mockTA, nil
  117. }
  118. func labelSetTargetsToList(sets []model.LabelSet) []string {
  119. result := make([]string, len(sets))
  120. for i, set := range sets {
  121. address := set["__address__"]
  122. result[i] = string(address)
  123. }
  124. return result
  125. }
  126. type Responses struct {
  127. releaserMap map[string]int
  128. responses map[string][]mockTargetAllocatorResponseRaw
  129. }
  130. func TestTargetAllocatorJobRetrieval(t *testing.T) {
  131. for _, tc := range []struct {
  132. desc string
  133. responses Responses
  134. cfg *Config
  135. want expectedTestResult
  136. }{
  137. {
  138. desc: "default",
  139. responses: Responses{
  140. responses: map[string][]mockTargetAllocatorResponseRaw{
  141. "/scrape_configs": {
  142. mockTargetAllocatorResponseRaw{code: 200, data: map[string]map[string]any{
  143. "job1": {
  144. "job_name": "job1",
  145. "scrape_interval": "30s",
  146. "scrape_timeout": "30s",
  147. "metrics_path": "/metrics",
  148. "scheme": "http",
  149. "relabel_configs": nil,
  150. "metric_relabel_configs": nil,
  151. },
  152. "job2": {
  153. "job_name": "job2",
  154. "scrape_interval": "30s",
  155. "scrape_timeout": "30s",
  156. "metrics_path": "/metrics",
  157. "scheme": "http",
  158. "relabel_configs": nil,
  159. "metric_relabel_configs": nil,
  160. },
  161. }},
  162. },
  163. "/jobs/job1/targets": {
  164. mockTargetAllocatorResponseRaw{code: 200, data: []hTTPSDResponse{
  165. {Targets: []string{"localhost:9090", "10.0.10.3:9100", "10.0.10.4:9100", "10.0.10.5:9100"},
  166. Labels: map[model.LabelName]model.LabelValue{
  167. "__meta_datacenter": "london",
  168. "__meta_prometheus_job": "node",
  169. }},
  170. }},
  171. mockTargetAllocatorResponseRaw{code: 200, data: []hTTPSDResponse{
  172. {Targets: []string{"localhost:9090", "10.0.10.3:9100", "10.0.10.4:9100", "10.0.10.5:9100"},
  173. Labels: map[model.LabelName]model.LabelValue{
  174. "__meta_datacenter": "london",
  175. "__meta_prometheus_job": "node",
  176. }},
  177. }},
  178. },
  179. "/jobs/job2/targets": {
  180. mockTargetAllocatorResponseRaw{code: 200, data: []hTTPSDResponse{
  181. {Targets: []string{"10.0.40.2:9100", "10.0.40.3:9100"},
  182. Labels: map[model.LabelName]model.LabelValue{
  183. "__meta_datacenter": "london",
  184. "__meta_prometheus_job": "alertmanager",
  185. }},
  186. }},
  187. mockTargetAllocatorResponseRaw{code: 200, data: []hTTPSDResponse{
  188. {Targets: []string{"10.0.40.2:9100", "10.0.40.3:9100"},
  189. Labels: map[model.LabelName]model.LabelValue{
  190. "__meta_datacenter": "london",
  191. "__meta_prometheus_job": "alertmanager",
  192. }},
  193. }},
  194. },
  195. },
  196. },
  197. cfg: &Config{
  198. PrometheusConfig: &promConfig.Config{},
  199. TargetAllocator: &targetAllocator{
  200. Interval: 10 * time.Second,
  201. CollectorID: "collector-1",
  202. HTTPSDConfig: &promHTTP.SDConfig{
  203. HTTPClientConfig: commonconfig.HTTPClientConfig{
  204. BasicAuth: &commonconfig.BasicAuth{
  205. Username: "user",
  206. Password: "aPassword",
  207. },
  208. },
  209. RefreshInterval: model.Duration(60 * time.Second),
  210. },
  211. },
  212. },
  213. want: expectedTestResult{
  214. empty: false,
  215. jobMap: map[string]expectedTestResultJobMap{
  216. "job1": {
  217. Targets: []string{"localhost:9090", "10.0.10.3:9100", "10.0.10.4:9100", "10.0.10.5:9100"},
  218. Labels: map[model.LabelName]model.LabelValue{
  219. "__meta_datacenter": "london",
  220. "__meta_prometheus_job": "node",
  221. },
  222. },
  223. "job2": {Targets: []string{"10.0.40.2:9100", "10.0.40.3:9100"},
  224. Labels: map[model.LabelName]model.LabelValue{
  225. "__meta_datacenter": "london",
  226. "__meta_prometheus_job": "alertmanager",
  227. }},
  228. },
  229. },
  230. },
  231. {
  232. desc: "update labels and targets",
  233. responses: Responses{
  234. responses: map[string][]mockTargetAllocatorResponseRaw{
  235. "/scrape_configs": {
  236. mockTargetAllocatorResponseRaw{code: 200, data: map[string]map[string]any{
  237. "job1": {
  238. "job_name": "job1",
  239. "scrape_interval": "30s",
  240. "scrape_timeout": "30s",
  241. "metrics_path": "/metrics",
  242. "scheme": "http",
  243. "relabel_configs": nil,
  244. "metric_relabel_configs": nil,
  245. },
  246. "job2": {
  247. "job_name": "job2",
  248. "scrape_interval": "30s",
  249. "scrape_timeout": "30s",
  250. "metrics_path": "/metrics",
  251. "scheme": "http",
  252. "relabel_configs": nil,
  253. "metric_relabel_configs": nil,
  254. },
  255. }},
  256. },
  257. "/jobs/job1/targets": {
  258. mockTargetAllocatorResponseRaw{code: 200, data: []hTTPSDResponse{
  259. {Targets: []string{"localhost:9090", "10.0.10.3:9100", "10.0.10.4:9100", "10.0.10.5:9100"},
  260. Labels: map[model.LabelName]model.LabelValue{
  261. "__meta_datacenter": "london",
  262. "__meta_prometheus_job": "node",
  263. }},
  264. }},
  265. mockTargetAllocatorResponseRaw{code: 200, data: []hTTPSDResponse{
  266. {Targets: []string{"localhost:9090"},
  267. Labels: map[model.LabelName]model.LabelValue{
  268. "__meta_datacenter": "london",
  269. "__meta_prometheus_job": "node",
  270. "test": "aTest",
  271. }},
  272. }},
  273. },
  274. "/jobs/job2/targets": {
  275. mockTargetAllocatorResponseRaw{code: 200, data: []hTTPSDResponse{
  276. {Targets: []string{"10.0.40.3:9100"},
  277. Labels: map[model.LabelName]model.LabelValue{
  278. "__meta_datacenter": "london",
  279. "__meta_prometheus_job": "alertmanager",
  280. }},
  281. }},
  282. mockTargetAllocatorResponseRaw{code: 200, data: []hTTPSDResponse{
  283. {Targets: []string{"10.0.40.2:9100", "10.0.40.3:9100"},
  284. Labels: map[model.LabelName]model.LabelValue{
  285. "__meta_datacenter": "london",
  286. }},
  287. }},
  288. },
  289. },
  290. },
  291. cfg: &Config{
  292. PrometheusConfig: &promConfig.Config{},
  293. TargetAllocator: &targetAllocator{
  294. Interval: 10 * time.Second,
  295. CollectorID: "collector-1",
  296. HTTPSDConfig: &promHTTP.SDConfig{
  297. HTTPClientConfig: commonconfig.HTTPClientConfig{},
  298. RefreshInterval: model.Duration(60 * time.Second),
  299. },
  300. },
  301. },
  302. want: expectedTestResult{
  303. empty: false,
  304. jobMap: map[string]expectedTestResultJobMap{
  305. "job1": {
  306. Targets: []string{"localhost:9090"},
  307. Labels: map[model.LabelName]model.LabelValue{
  308. "__meta_datacenter": "london",
  309. "__meta_prometheus_job": "node",
  310. "test": "aTest",
  311. },
  312. },
  313. "job2": {Targets: []string{"10.0.40.2:9100", "10.0.40.3:9100"},
  314. Labels: map[model.LabelName]model.LabelValue{
  315. "__meta_datacenter": "london",
  316. }},
  317. },
  318. },
  319. },
  320. {
  321. desc: "update job list",
  322. responses: Responses{
  323. releaserMap: map[string]int{
  324. "/scrape_configs": 1,
  325. },
  326. responses: map[string][]mockTargetAllocatorResponseRaw{
  327. "/scrape_configs": {
  328. mockTargetAllocatorResponseRaw{code: 200, data: map[string]map[string]any{
  329. "job1": {
  330. "job_name": "job1",
  331. "scrape_interval": "30s",
  332. "scrape_timeout": "30s",
  333. "metrics_path": "/metrics",
  334. "scheme": "http",
  335. "relabel_configs": nil,
  336. "metric_relabel_configs": nil,
  337. },
  338. "job2": {
  339. "job_name": "job2",
  340. "scrape_interval": "30s",
  341. "scrape_timeout": "30s",
  342. "metrics_path": "/metrics",
  343. "scheme": "http",
  344. "relabel_configs": nil,
  345. "metric_relabel_configs": nil,
  346. },
  347. }},
  348. mockTargetAllocatorResponseRaw{code: 200, data: map[string]map[string]any{
  349. "job1": {
  350. "job_name": "job1",
  351. "scrape_interval": "30s",
  352. "scrape_timeout": "30s",
  353. "metrics_path": "/metrics",
  354. "scheme": "http",
  355. "relabel_configs": nil,
  356. "metric_relabel_configs": nil,
  357. },
  358. "job3": {
  359. "job_name": "job3",
  360. "scrape_interval": "30s",
  361. "scrape_timeout": "30s",
  362. "metrics_path": "/metrics",
  363. "scheme": "http",
  364. "relabel_configs": nil,
  365. "metric_relabel_configs": nil,
  366. },
  367. }},
  368. },
  369. "/jobs/job1/targets": {
  370. mockTargetAllocatorResponseRaw{code: 200, data: []hTTPSDResponse{
  371. {Targets: []string{"localhost:9090"},
  372. Labels: map[model.LabelName]model.LabelValue{
  373. "__meta_datacenter": "london",
  374. "__meta_prometheus_job": "node",
  375. }},
  376. }},
  377. mockTargetAllocatorResponseRaw{code: 200, data: []hTTPSDResponse{
  378. {Targets: []string{"localhost:9090"},
  379. Labels: map[model.LabelName]model.LabelValue{
  380. "__meta_datacenter": "london",
  381. "__meta_prometheus_job": "node",
  382. }},
  383. }},
  384. },
  385. "/jobs/job3/targets": {
  386. mockTargetAllocatorResponseRaw{code: 200, data: []hTTPSDResponse{
  387. {Targets: []string{"10.0.40.3:9100"},
  388. Labels: map[model.LabelName]model.LabelValue{
  389. "__meta_datacenter": "london",
  390. "__meta_prometheus_job": "alertmanager",
  391. }},
  392. }},
  393. mockTargetAllocatorResponseRaw{code: 200, data: []hTTPSDResponse{
  394. {Targets: []string{"10.0.40.3:9100"},
  395. Labels: map[model.LabelName]model.LabelValue{
  396. "__meta_datacenter": "london",
  397. "__meta_prometheus_job": "alertmanager",
  398. }},
  399. }},
  400. },
  401. },
  402. },
  403. cfg: &Config{
  404. PrometheusConfig: &promConfig.Config{},
  405. TargetAllocator: &targetAllocator{
  406. Interval: 10 * time.Second,
  407. CollectorID: "collector-1",
  408. HTTPSDConfig: &promHTTP.SDConfig{
  409. HTTPClientConfig: commonconfig.HTTPClientConfig{},
  410. RefreshInterval: model.Duration(60 * time.Second),
  411. },
  412. },
  413. },
  414. want: expectedTestResult{
  415. empty: false,
  416. jobMap: map[string]expectedTestResultJobMap{
  417. "job1": {
  418. Targets: []string{"localhost:9090"},
  419. Labels: map[model.LabelName]model.LabelValue{
  420. "__meta_datacenter": "london",
  421. "__meta_prometheus_job": "node",
  422. },
  423. },
  424. "job3": {Targets: []string{"10.0.40.3:9100"},
  425. Labels: map[model.LabelName]model.LabelValue{
  426. "__meta_datacenter": "london",
  427. "__meta_prometheus_job": "alertmanager",
  428. }},
  429. },
  430. },
  431. },
  432. {
  433. desc: "endpoint is not reachable",
  434. responses: Responses{
  435. releaserMap: map[string]int{
  436. "/scrape_configs": 1, // we are too fast if we ignore the first wait a tick
  437. },
  438. responses: map[string][]mockTargetAllocatorResponseRaw{
  439. "/scrape_configs": {
  440. mockTargetAllocatorResponseRaw{code: 404, data: map[string]map[string]any{}},
  441. mockTargetAllocatorResponseRaw{code: 404, data: map[string]map[string]any{}},
  442. },
  443. },
  444. },
  445. cfg: &Config{
  446. PrometheusConfig: &promConfig.Config{},
  447. TargetAllocator: &targetAllocator{
  448. Interval: 50 * time.Millisecond,
  449. CollectorID: "collector-1",
  450. HTTPSDConfig: &promHTTP.SDConfig{
  451. HTTPClientConfig: commonconfig.HTTPClientConfig{},
  452. RefreshInterval: model.Duration(60 * time.Second),
  453. },
  454. },
  455. },
  456. want: expectedTestResult{
  457. empty: true,
  458. jobMap: map[string]expectedTestResultJobMap{},
  459. },
  460. },
  461. } {
  462. t.Run(tc.desc, func(t *testing.T) {
  463. ctx := context.Background()
  464. cms := new(consumertest.MetricsSink)
  465. allocator, err := setupMockTargetAllocator(tc.responses)
  466. require.NoError(t, err, "Failed to create allocator", tc.responses)
  467. allocator.Start()
  468. defer allocator.Stop()
  469. tc.cfg.TargetAllocator.Endpoint = allocator.srv.URL // set service URL with the automatic generated one
  470. receiver := newPrometheusReceiver(receivertest.NewNopCreateSettings(), tc.cfg, cms)
  471. require.NoError(t, receiver.Start(ctx, componenttest.NewNopHost()))
  472. allocator.wg.Wait()
  473. providers := receiver.discoveryManager.Providers()
  474. if tc.want.empty {
  475. // if no base config is supplied and the job retrieval fails then no configuration should be found
  476. require.Len(t, providers, 0)
  477. return
  478. }
  479. require.NotNil(t, providers)
  480. for _, provider := range providers {
  481. require.IsType(t, &promHTTP.Discovery{}, provider.Discoverer())
  482. httpDiscovery := provider.Discoverer().(*promHTTP.Discovery)
  483. refresh, err := httpDiscovery.Refresh(ctx)
  484. require.NoError(t, err)
  485. // are http configs applied?
  486. sdConfig := provider.Config().(*promHTTP.SDConfig)
  487. require.Equal(t, tc.cfg.TargetAllocator.HTTPSDConfig.HTTPClientConfig, sdConfig.HTTPClientConfig)
  488. for _, group := range refresh {
  489. found := false
  490. for job, s := range tc.want.jobMap {
  491. // find correct job to compare to.
  492. if !strings.Contains(group.Source, job) {
  493. continue
  494. }
  495. // compare targets
  496. require.Equal(t, s.Targets, labelSetTargetsToList(group.Targets))
  497. // compare labels and add __meta_url as this label gets automatically added by the SD.
  498. // which is identical to the source url
  499. s.Labels["__meta_url"] = model.LabelValue(sdConfig.URL)
  500. require.Equal(t, s.Labels, group.Labels)
  501. found = true
  502. }
  503. require.True(t, found, "Returned job is not defined in expected values", group)
  504. }
  505. }
  506. })
  507. }
  508. }