metrics_receiver_helper_test.go

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package prometheusreceiver

import (
    "bytes"
    "context"
    "encoding/binary"
    "fmt"
    "log"
    "math"
    "net/http"
    "net/http/httptest"
    "net/url"
    "sync"
    "sync/atomic"
    "testing"
    "time"

    gokitlog "github.com/go-kit/log"
    "github.com/gogo/protobuf/proto"
    promcfg "github.com/prometheus/prometheus/config"
    "github.com/prometheus/prometheus/model/labels"
    "github.com/prometheus/prometheus/model/value"
    dto "github.com/prometheus/prometheus/prompb/io/prometheus/client"
    "github.com/prometheus/prometheus/scrape"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
    "go.opentelemetry.io/collector/component/componenttest"
    "go.opentelemetry.io/collector/consumer/consumertest"
    "go.opentelemetry.io/collector/pdata/pcommon"
    "go.opentelemetry.io/collector/pdata/pmetric"
    "go.opentelemetry.io/collector/receiver/receivertest"
    "gopkg.in/yaml.v2"

    "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/internal"
)
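
// mockPrometheusResponse is a single canned scrape response served by the mock server: the
// HTTP status code plus the payload, either as plain text or OpenMetrics exposition
// (data, useOpenMetrics) or as a length-delimited protobuf body (useProtoBuf, buf).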
type mockPrometheusResponse struct {
    code           int
    data           string
    useOpenMetrics bool
    useProtoBuf    bool // This overrides data and useOpenMetrics above
    buf            []byte
}

type mockPrometheus struct {
    mu          sync.Mutex // mu protects the fields below.
    endpoints   map[string][]mockPrometheusResponse
    accessIndex map[string]*atomic.Int32
    wg          *sync.WaitGroup
    srv         *httptest.Server
}

func newMockPrometheus(endpoints map[string][]mockPrometheusResponse) *mockPrometheus {
    accessIndex := make(map[string]*atomic.Int32)
    wg := &sync.WaitGroup{}
    wg.Add(len(endpoints))
    for k := range endpoints {
        accessIndex[k] = &atomic.Int32{}
    }
    mp := &mockPrometheus{
        wg:          wg,
        accessIndex: accessIndex,
        endpoints:   endpoints,
    }
    srv := httptest.NewServer(mp)
    mp.srv = srv
    return mp
}
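
// ServeHTTP serves the configured pages for a target path in order, one page per request. Once
// a path has been requested more times than it has pages, it answers 404 and, on the first such
// request, signals the WaitGroup so tests know the endpoint has been fully consumed.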
func (mp *mockPrometheus) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
    mp.mu.Lock()
    defer mp.mu.Unlock()
    iptr, ok := mp.accessIndex[req.URL.Path]
    if !ok {
        rw.WriteHeader(404)
        return
    }
    index := int(iptr.Load())
    iptr.Add(1)
    pages := mp.endpoints[req.URL.Path]
    if index >= len(pages) {
        if index == len(pages) {
            mp.wg.Done()
        }
        rw.WriteHeader(404)
        return
    }
    switch {
    case pages[index].useProtoBuf:
        rw.Header().Set("Content-Type", "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited")
    case pages[index].useOpenMetrics:
        rw.Header().Set("Content-Type", "application/openmetrics-text")
    }
    rw.WriteHeader(pages[index].code)
    if pages[index].useProtoBuf {
        _, _ = rw.Write(pages[index].buf)
    } else {
        _, _ = rw.Write([]byte(pages[index].data))
    }
}

func (mp *mockPrometheus) Close() {
    mp.srv.Close()
}

// -------------------------
// EndToEnd Test and related
// -------------------------

var (
    expectedScrapeMetricCount      = 5
    expectedExtraScrapeMetricCount = 8
)
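
// testData describes one scrape target for an end-to-end test: the job name, the pages the mock
// server should return, the expected resource attributes, and the validation function that is
// run against the resulting ResourceMetrics.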
type testData struct {
    name            string
    relabeledJob    string // Used when relabeling or honor_labels changes the target to something other than 'name'.
    pages           []mockPrometheusResponse
    attributes      pcommon.Map
    validateScrapes bool
    normalizedName  bool
    validateFunc    func(t *testing.T, td *testData, result []pmetric.ResourceMetrics)
}

// setupMockPrometheus creates a mock Prometheus server for the given targets and returns the
// server together with a Prometheus scrape config that points at it.
func setupMockPrometheus(tds ...*testData) (*mockPrometheus, *promcfg.Config, error) {
    jobs := make([]map[string]any, 0, len(tds))
    endpoints := make(map[string][]mockPrometheusResponse)
    metricPaths := make([]string, len(tds))
    for i, t := range tds {
        metricPath := fmt.Sprintf("/%s/metrics", t.name)
        endpoints[metricPath] = t.pages
        metricPaths[i] = metricPath
    }
    mp := newMockPrometheus(endpoints)
    u, _ := url.Parse(mp.srv.URL)
    for i := 0; i < len(tds); i++ {
        job := make(map[string]any)
        job["job_name"] = tds[i].name
        job["metrics_path"] = metricPaths[i]
        job["scrape_interval"] = "1s"
        job["scrape_timeout"] = "500ms"
        job["static_configs"] = []map[string]any{{"targets": []string{u.Host}}}
        jobs = append(jobs, job)
    }
    if len(jobs) != len(tds) {
        log.Fatal("len(jobs) != len(targets), make sure job names are unique")
    }
    configP := make(map[string]any)
    configP["scrape_configs"] = jobs
    cfg, err := yaml.Marshal(&configP)
    if err != nil {
        return mp, nil, err
    }
    // Update the expected attributes on each target; they are used later for validation.
    l := []labels.Label{{Name: "__scheme__", Value: "http"}}
    for _, t := range tds {
        t.attributes = internal.CreateResource(t.name, u.Host, l).Attributes()
    }
    pCfg, err := promcfg.Load(string(cfg), false, gokitlog.NewNopLogger())
    return mp, pCfg, err
}
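
// As an illustrative sketch (the target name and exposition below are hypothetical), a test
// describes its target and pages and lets setupMockPrometheus wire up the server and scrape
// config:
//
//    td := &testData{
//        name: "target1",
//        pages: []mockPrometheusResponse{
//            {code: 200, data: "go_goroutines 42\n"},
//        },
//    }
//    mp, cfg, err := setupMockPrometheus(td)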

func waitForScrapeResults(t *testing.T, targets []*testData, cms *consumertest.MetricsSink) {
    assert.Eventually(t, func() bool {
        // This is the receiver's view of what should have been collected from the server.
        metrics := cms.AllMetrics()
        pResults := splitMetricsByTarget(metrics)
        for _, target := range targets {
            want := 0
            name := target.name
            if target.relabeledJob != "" {
                name = target.relabeledJob
            }
            scrapes := pResults[name]
            // Count the number of pages we expect for a target endpoint: only pages that are
            // not 404, matching the response logic in the mock's ServeHTTP.
            for _, p := range target.pages {
                if p.code != 404 {
                    want++
                }
            }
            if len(scrapes) < want {
                // We don't have enough scrapes yet; return false and wait for another tick.
                return false
            }
        }
        return true
    }, 30*time.Second, 500*time.Millisecond)
}

func verifyNumValidScrapeResults(t *testing.T, td *testData, resourceMetrics []pmetric.ResourceMetrics) {
    want := 0
    for _, p := range td.pages {
        if p.code == 200 {
            want++
        }
    }
    require.LessOrEqual(t, want, len(resourceMetrics), "want at least %d valid scrapes, but got %d", want, len(resourceMetrics))
}

func verifyNumTotalScrapeResults(t *testing.T, td *testData, resourceMetrics []pmetric.ResourceMetrics) {
    want := 0
    for _, p := range td.pages {
        if p.code == 200 || p.code == 500 {
            want++
        }
    }
    require.LessOrEqual(t, want, len(resourceMetrics), "want at least %d total scrapes, but got %d", want, len(resourceMetrics))
}

func getMetrics(rm pmetric.ResourceMetrics) []pmetric.Metric {
    var metrics []pmetric.Metric
    ilms := rm.ScopeMetrics()
    for j := 0; j < ilms.Len(); j++ {
        metricSlice := ilms.At(j).Metrics()
        for i := 0; i < metricSlice.Len(); i++ {
            metrics = append(metrics, metricSlice.At(i))
        }
    }
    return metrics
}

func metricsCount(resourceMetric pmetric.ResourceMetrics) int {
    metricsCount := 0
    ilms := resourceMetric.ScopeMetrics()
    for j := 0; j < ilms.Len(); j++ {
        ilm := ilms.At(j)
        metricsCount += ilm.Metrics().Len()
    }
    return metricsCount
}
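
// getValidScrapes keeps only the resource metrics that correspond to a successful scrape: a
// batch must contain the expected number of internal scrape metrics plus at least one real
// metric, and it must not be the first failed (stale-marked) scrape. Kept batches are also
// checked to have up=1, everything else up=0.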
func getValidScrapes(t *testing.T, rms []pmetric.ResourceMetrics, normalizedNames bool) []pmetric.ResourceMetrics {
    var out []pmetric.ResourceMetrics
    // rms will include failed scrapes and scrapes that received no metrics but still carry the
    // internal scrape metrics; filter those out.
    for i := 0; i < len(rms); i++ {
        allMetrics := getMetrics(rms[i])
        if expectedScrapeMetricCount < len(allMetrics) && countScrapeMetrics(allMetrics, normalizedNames) == expectedScrapeMetricCount ||
            expectedExtraScrapeMetricCount < len(allMetrics) && countScrapeMetrics(allMetrics, normalizedNames) == expectedExtraScrapeMetricCount {
            if isFirstFailedScrape(allMetrics, normalizedNames) {
                continue
            }
            assertUp(t, 1, allMetrics)
            out = append(out, rms[i])
        } else {
            assertUp(t, 0, allMetrics)
        }
    }
    return out
}
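
// isFirstFailedScrape reports whether this batch looks like the first failed scrape after a
// successful one: up is 0 and every non-scrape metric carries only staleness markers
// (the NoRecordedValue flag) rather than real values.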
func isFirstFailedScrape(metrics []pmetric.Metric, normalizedNames bool) bool {
    for _, m := range metrics {
        if m.Name() == "up" {
            if m.Gauge().DataPoints().At(0).DoubleValue() == 1 { // assumed up will not have multiple datapoints
                return false
            }
        }
    }
    for _, m := range metrics {
        if isDefaultMetrics(m, normalizedNames) || isExtraScrapeMetrics(m) {
            continue
        }
        switch m.Type() {
        case pmetric.MetricTypeGauge:
            for i := 0; i < m.Gauge().DataPoints().Len(); i++ {
                if !m.Gauge().DataPoints().At(i).Flags().NoRecordedValue() {
                    return false
                }
            }
        case pmetric.MetricTypeSum:
            for i := 0; i < m.Sum().DataPoints().Len(); i++ {
                if !m.Sum().DataPoints().At(i).Flags().NoRecordedValue() {
                    return false
                }
            }
        case pmetric.MetricTypeHistogram:
            for i := 0; i < m.Histogram().DataPoints().Len(); i++ {
                if !m.Histogram().DataPoints().At(i).Flags().NoRecordedValue() {
                    return false
                }
            }
        case pmetric.MetricTypeSummary:
            for i := 0; i < m.Summary().DataPoints().Len(); i++ {
                if !m.Summary().DataPoints().At(i).Flags().NoRecordedValue() {
                    return false
                }
            }
        case pmetric.MetricTypeEmpty, pmetric.MetricTypeExponentialHistogram:
        }
    }
    return true
}

func assertUp(t *testing.T, expected float64, metrics []pmetric.Metric) {
    for _, m := range metrics {
        if m.Name() == "up" {
            assert.Equal(t, expected, m.Gauge().DataPoints().At(0).DoubleValue()) // (assumed up will not have multiple datapoints)
            return
        }
    }
    t.Error("No 'up' metric found")
}

func countScrapeMetricsRM(got pmetric.ResourceMetrics, normalizedNames bool) int {
    n := 0
    ilms := got.ScopeMetrics()
    for j := 0; j < ilms.Len(); j++ {
        ilm := ilms.At(j)
        for i := 0; i < ilm.Metrics().Len(); i++ {
            if isDefaultMetrics(ilm.Metrics().At(i), normalizedNames) {
                n++
            }
        }
    }
    return n
}

func countScrapeMetrics(metrics []pmetric.Metric, normalizedNames bool) int {
    n := 0
    for _, m := range metrics {
        if isDefaultMetrics(m, normalizedNames) || isExtraScrapeMetrics(m) {
            n++
        }
    }
    return n
}
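
// isDefaultMetrics reports whether m is one of the internal metrics Prometheus emits for every
// scrape (up, scrape_duration, scrape_samples_scraped, ...). When normalizedNames is true, the
// receiver is expected to have trimmed the "_seconds" unit suffix from scrape_duration.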
func isDefaultMetrics(m pmetric.Metric, normalizedNames bool) bool {
    switch m.Name() {
    case "up", "scrape_samples_scraped", "scrape_samples_post_metric_relabeling", "scrape_series_added":
        return true
    // if normalizedNames is true, we expect unit `_seconds` to be trimmed.
    case "scrape_duration_seconds":
        return !normalizedNames
    case "scrape_duration":
        return normalizedNames
    default:
    }
    return false
}

func isExtraScrapeMetrics(m pmetric.Metric) bool {
    switch m.Name() {
    case "scrape_body_size_bytes", "scrape_sample_limit", "scrape_timeout_seconds":
        return true
    default:
        return false
    }
}

type metricTypeComparator func(*testing.T, pmetric.Metric)
type numberPointComparator func(*testing.T, pmetric.NumberDataPoint)
type histogramPointComparator func(*testing.T, pmetric.HistogramDataPoint)
type summaryPointComparator func(*testing.T, pmetric.SummaryDataPoint)

type dataPointExpectation struct {
    numberPointComparator    []numberPointComparator
    histogramPointComparator []histogramPointComparator
    summaryPointComparator   []summaryPointComparator
}

type testExpectation func(*testing.T, pmetric.ResourceMetrics)

func doCompare(t *testing.T, name string, want pcommon.Map, got pmetric.ResourceMetrics, expectations []testExpectation) {
    doCompareNormalized(t, name, want, got, expectations, false)
}

func doCompareNormalized(t *testing.T, name string, want pcommon.Map, got pmetric.ResourceMetrics, expectations []testExpectation, normalizedNames bool) {
    t.Run(name, func(t *testing.T) {
        assert.Equal(t, expectedScrapeMetricCount, countScrapeMetricsRM(got, normalizedNames))
        assert.Equal(t, want.Len(), got.Resource().Attributes().Len())
        for k, v := range want.AsRaw() {
            val, ok := got.Resource().Attributes().Get(k)
            assert.True(t, ok, "%q attribute is missing", k)
            if ok {
                assert.EqualValues(t, v, val.AsString())
            }
        }
        for _, e := range expectations {
            e(t, got)
        }
    })
}

func assertMetricPresent(name string, metricTypeExpectations metricTypeComparator, metricUnitExpectations metricTypeComparator, dataPointExpectations []dataPointExpectation) testExpectation {
    return func(t *testing.T, rm pmetric.ResourceMetrics) {
        allMetrics := getMetrics(rm)
        var present bool
        for _, m := range allMetrics {
            if name != m.Name() {
                continue
            }
            present = true
            metricTypeExpectations(t, m)
            metricUnitExpectations(t, m)
            for i, de := range dataPointExpectations {
                switch m.Type() {
                case pmetric.MetricTypeGauge:
                    for _, npc := range de.numberPointComparator {
                        require.Equal(t, m.Gauge().DataPoints().Len(), len(dataPointExpectations), "Expected number of data points in Gauge metric '%s' does not match the testdata", name)
                        npc(t, m.Gauge().DataPoints().At(i))
                    }
                case pmetric.MetricTypeSum:
                    for _, npc := range de.numberPointComparator {
                        require.Equal(t, m.Sum().DataPoints().Len(), len(dataPointExpectations), "Expected number of data points in Sum metric '%s' does not match the testdata", name)
                        npc(t, m.Sum().DataPoints().At(i))
                    }
                case pmetric.MetricTypeHistogram:
                    for _, hpc := range de.histogramPointComparator {
                        require.Equal(t, m.Histogram().DataPoints().Len(), len(dataPointExpectations), "Expected number of data points in Histogram metric '%s' does not match the testdata", name)
                        hpc(t, m.Histogram().DataPoints().At(i))
                    }
                case pmetric.MetricTypeSummary:
                    for _, spc := range de.summaryPointComparator {
                        require.Equal(t, m.Summary().DataPoints().Len(), len(dataPointExpectations), "Expected number of data points in Summary metric '%s' does not match the testdata", name)
                        spc(t, m.Summary().DataPoints().At(i))
                    }
                case pmetric.MetricTypeEmpty, pmetric.MetricTypeExponentialHistogram:
                }
            }
        }
        require.True(t, present, "expected metric '%s' is not present", name)
    }
}
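
// As an illustrative sketch (the metric name and values are hypothetical), expectations are
// composed from the comparators below and passed to doCompare:
//
//    doCompare(t, "scrape-example", td.attributes, rms[0], []testExpectation{
//        assertMetricPresent("http_requests_total",
//            compareMetricType(pmetric.MetricTypeSum),
//            compareMetricUnit(""),
//            []dataPointExpectation{{
//                numberPointComparator: []numberPointComparator{compareDoubleValue(10)},
//            }}),
//    })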

func assertMetricAbsent(name string) testExpectation {
    return func(t *testing.T, rm pmetric.ResourceMetrics) {
        allMetrics := getMetrics(rm)
        for _, m := range allMetrics {
            assert.NotEqual(t, name, m.Name(), "Metric is present, but was expected absent")
        }
    }
}

func compareMetricType(typ pmetric.MetricType) metricTypeComparator {
    return func(t *testing.T, metric pmetric.Metric) {
        assert.Equal(t, typ.String(), metric.Type().String(), "Metric type does not match")
    }
}

func compareMetricUnit(unit string) metricTypeComparator {
    return func(t *testing.T, metric pmetric.Metric) {
        assert.Equal(t, unit, metric.Unit(), "Metric unit does not match")
    }
}

func compareMetricIsMonotonic(isMonotonic bool) metricTypeComparator {
    return func(t *testing.T, metric pmetric.Metric) {
        assert.Equal(t, pmetric.MetricTypeSum.String(), metric.Type().String(), "IsMonotonic only exists for sums")
        assert.Equal(t, isMonotonic, metric.Sum().IsMonotonic(), "IsMonotonic does not match")
    }
}

func compareAttributes(attributes map[string]string) numberPointComparator {
    return func(t *testing.T, numberDataPoint pmetric.NumberDataPoint) {
        req := assert.Equal(t, len(attributes), numberDataPoint.Attributes().Len(), "Attributes length does not match")
        if req {
            for k, v := range attributes {
                val, ok := numberDataPoint.Attributes().Get(k)
                require.True(t, ok)
                assert.Equal(t, v, val.AsString(), "Attributes do not match")
            }
        }
    }
}

func compareSummaryAttributes(attributes map[string]string) summaryPointComparator {
    return func(t *testing.T, summaryDataPoint pmetric.SummaryDataPoint) {
        req := assert.Equal(t, len(attributes), summaryDataPoint.Attributes().Len(), "Summary attributes length does not match")
        if req {
            for k, v := range attributes {
                val, ok := summaryDataPoint.Attributes().Get(k)
                require.True(t, ok)
                assert.Equal(t, v, val.AsString(), "Summary attribute values do not match")
            }
        }
    }
}

func assertAttributesAbsent() numberPointComparator {
    return func(t *testing.T, numberDataPoint pmetric.NumberDataPoint) {
        assert.Equal(t, 0, numberDataPoint.Attributes().Len(), "Attributes length should be 0")
    }
}

func compareHistogramAttributes(attributes map[string]string) histogramPointComparator {
    return func(t *testing.T, histogramDataPoint pmetric.HistogramDataPoint) {
        req := assert.Equal(t, len(attributes), histogramDataPoint.Attributes().Len(), "Histogram attributes length does not match")
        if req {
            for k, v := range attributes {
                val, ok := histogramDataPoint.Attributes().Get(k)
                require.True(t, ok)
                assert.Equal(t, v, val.AsString(), "Histogram attribute values do not match")
            }
        }
    }
}

func assertNumberPointFlagNoRecordedValue() numberPointComparator {
    return func(t *testing.T, numberDataPoint pmetric.NumberDataPoint) {
        assert.True(t, numberDataPoint.Flags().NoRecordedValue(),
            "Datapoint flag for staleness marker not found as expected")
    }
}

func assertHistogramPointFlagNoRecordedValue() histogramPointComparator {
    return func(t *testing.T, histogramDataPoint pmetric.HistogramDataPoint) {
        assert.True(t, histogramDataPoint.Flags().NoRecordedValue(),
            "Datapoint flag for staleness marker not found as expected")
    }
}

func assertSummaryPointFlagNoRecordedValue() summaryPointComparator {
    return func(t *testing.T, summaryDataPoint pmetric.SummaryDataPoint) {
        assert.True(t, summaryDataPoint.Flags().NoRecordedValue(),
            "Datapoint flag for staleness marker not found as expected")
    }
}

func compareStartTimestamp(startTimeStamp pcommon.Timestamp) numberPointComparator {
    return func(t *testing.T, numberDataPoint pmetric.NumberDataPoint) {
        assert.Equal(t, startTimeStamp.String(), numberDataPoint.StartTimestamp().String(), "Start-Timestamp does not match")
    }
}

func compareTimestamp(timeStamp pcommon.Timestamp) numberPointComparator {
    return func(t *testing.T, numberDataPoint pmetric.NumberDataPoint) {
        assert.Equal(t, timeStamp.String(), numberDataPoint.Timestamp().String(), "Timestamp does not match")
    }
}

func compareHistogramTimestamp(timeStamp pcommon.Timestamp) histogramPointComparator {
    return func(t *testing.T, histogramDataPoint pmetric.HistogramDataPoint) {
        assert.Equal(t, timeStamp.String(), histogramDataPoint.Timestamp().String(), "Histogram Timestamp does not match")
    }
}

func compareHistogramStartTimestamp(timeStamp pcommon.Timestamp) histogramPointComparator {
    return func(t *testing.T, histogramDataPoint pmetric.HistogramDataPoint) {
        assert.Equal(t, timeStamp.String(), histogramDataPoint.StartTimestamp().String(), "Histogram Start-Timestamp does not match")
    }
}

func compareSummaryTimestamp(timeStamp pcommon.Timestamp) summaryPointComparator {
    return func(t *testing.T, summaryDataPoint pmetric.SummaryDataPoint) {
        assert.Equal(t, timeStamp.String(), summaryDataPoint.Timestamp().String(), "Summary Timestamp does not match")
    }
}

func compareSummaryStartTimestamp(timeStamp pcommon.Timestamp) summaryPointComparator {
    return func(t *testing.T, summaryDataPoint pmetric.SummaryDataPoint) {
        assert.Equal(t, timeStamp.String(), summaryDataPoint.StartTimestamp().String(), "Summary Start-Timestamp does not match")
    }
}

func compareDoubleValue(doubleVal float64) numberPointComparator {
    return func(t *testing.T, numberDataPoint pmetric.NumberDataPoint) {
        assert.Equal(t, doubleVal, numberDataPoint.DoubleValue(), "Metric double value does not match")
    }
}

func assertNormalNan() numberPointComparator {
    return func(t *testing.T, numberDataPoint pmetric.NumberDataPoint) {
        assert.True(t, math.Float64bits(numberDataPoint.DoubleValue()) == value.NormalNaN,
            "Metric double value is not normalNaN as expected")
    }
}

func compareHistogram(count uint64, sum float64, upperBounds []float64, buckets []uint64) histogramPointComparator {
    return func(t *testing.T, histogramDataPoint pmetric.HistogramDataPoint) {
        assert.Equal(t, count, histogramDataPoint.Count(), "Histogram count value does not match")
        assert.Equal(t, sum, histogramDataPoint.Sum(), "Histogram sum value does not match")
        assert.Equal(t, upperBounds, histogramDataPoint.ExplicitBounds().AsRaw(), "Histogram upper bounds values do not match")
        assert.Equal(t, buckets, histogramDataPoint.BucketCounts().AsRaw(), "Histogram bucket count values do not match")
    }
}

func compareSummary(count uint64, sum float64, quantiles [][]float64) summaryPointComparator {
    return func(t *testing.T, summaryDataPoint pmetric.SummaryDataPoint) {
        assert.Equal(t, count, summaryDataPoint.Count(), "Summary count value does not match")
        assert.Equal(t, sum, summaryDataPoint.Sum(), "Summary sum value does not match")
        req := assert.Equal(t, len(quantiles), summaryDataPoint.QuantileValues().Len())
        if req {
            for i := 0; i < summaryDataPoint.QuantileValues().Len(); i++ {
                assert.Equal(t, quantiles[i][0], summaryDataPoint.QuantileValues().At(i).Quantile(),
                    "Summary quantiles do not match")
                if math.IsNaN(quantiles[i][1]) {
                    assert.True(t, math.Float64bits(summaryDataPoint.QuantileValues().At(i).Value()) == value.NormalNaN,
                        "Summary quantile value is not normalNaN as expected")
                } else {
                    assert.Equal(t, quantiles[i][1], summaryDataPoint.QuantileValues().At(i).Value(),
                        "Summary quantile values do not match")
                }
            }
        }
    }
}

// testComponent starts the Prometheus receiver with a custom config, waits for the mock server
// to be scraped, and then validates the metrics collected in the MetricsSink.
func testComponent(t *testing.T, targets []*testData, alterConfig func(*Config), cfgMuts ...func(*promcfg.Config)) {
    ctx := context.Background()
    mp, cfg, err := setupMockPrometheus(targets...)
    for _, cfgMut := range cfgMuts {
        cfgMut(cfg)
    }
    require.Nilf(t, err, "Failed to create Prometheus config: %v", err)
    defer mp.Close()
    config := &Config{
        PrometheusConfig:     cfg,
        StartTimeMetricRegex: "",
    }
    if alterConfig != nil {
        alterConfig(config)
    }
    cms := new(consumertest.MetricsSink)
    receiver := newPrometheusReceiver(receivertest.NewNopCreateSettings(), config, cms)
    require.NoError(t, receiver.Start(ctx, componenttest.NewNopHost()))
    // verify state after shutdown is called
    t.Cleanup(func() {
        assert.Lenf(t, flattenTargets(receiver.scrapeManager.TargetsAll()), len(targets), "expected %v targets to be running", len(targets))
        require.NoError(t, receiver.Shutdown(context.Background()))
        assert.Len(t, flattenTargets(receiver.scrapeManager.TargetsAll()), 0, "expected scrape manager to have no targets")
    })
    // The waitgroup Wait() is strictly from the server's point of view: it only indicates that
    // the expected number and type of requests have been seen.
    mp.wg.Wait()
    // Note: waitForScrapeResults is an attempt to address a possible race between wg.Done() being
    // called in the mock's ServeHTTP and the receiver actually processing the HTTP responses into
    // metrics. It is an assert.Eventually with a timeout and tick that waits for a condition;
    // the condition may be suboptimal and may need to be adjusted.
    waitForScrapeResults(t, targets, cms)
    // This begins the processing of the scrapes collected by the receiver
    metrics := cms.AllMetrics()
    // split and store results by target name
    pResults := splitMetricsByTarget(metrics)
    lres, lep := len(pResults), len(mp.endpoints)
    // There may be an additional scrape entry between when the mock server provided
    // all responses and when we capture the metrics. It will be ignored later.
    assert.GreaterOrEqualf(t, lep, lres, "want at least %d targets, but got %v\n", lep, lres)
    // Loop to validate the outputs for each target.
    // Stop once we have evaluated all expected results; any others are superfluous.
    for _, target := range targets[:lep] {
        t.Run(target.name, func(t *testing.T) {
            name := target.name
            if target.relabeledJob != "" {
                name = target.relabeledJob
            }
            scrapes := pResults[name]
            if !target.validateScrapes {
                scrapes = getValidScrapes(t, pResults[name], target.normalizedName)
            }
            target.validateFunc(t, target, scrapes)
        })
    }
}

// flattenTargets takes a map of jobs to targets and flattens it to a list of targets.
func flattenTargets(targets map[string][]*scrape.Target) []*scrape.Target {
    var flatTargets []*scrape.Target
    for _, target := range targets {
        flatTargets = append(flatTargets, target...)
    }
    return flatTargets
}
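
// splitMetricsByTarget groups the collected ResourceMetrics by the "service.name" resource
// attribute, which the receiver sets from the scrape job name.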
func splitMetricsByTarget(metrics []pmetric.Metrics) map[string][]pmetric.ResourceMetrics {
    pResults := make(map[string][]pmetric.ResourceMetrics)
    for _, md := range metrics {
        rms := md.ResourceMetrics()
        for i := 0; i < rms.Len(); i++ {
            name, _ := rms.At(i).Resource().Attributes().Get("service.name")
            pResults[name.AsString()] = append(pResults[name.AsString()], rms.At(i))
        }
    }
    return pResults
}
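
// getTS returns the timestamp of the first data point of the first metric in the slice,
// regardless of the metric type, or 0 if the slice is empty.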
func getTS(ms pmetric.MetricSlice) pcommon.Timestamp {
    if ms.Len() == 0 {
        return 0
    }
    m := ms.At(0)
    switch m.Type() {
    case pmetric.MetricTypeGauge:
        return m.Gauge().DataPoints().At(0).Timestamp()
    case pmetric.MetricTypeSum:
        return m.Sum().DataPoints().At(0).Timestamp()
    case pmetric.MetricTypeHistogram:
        return m.Histogram().DataPoints().At(0).Timestamp()
    case pmetric.MetricTypeSummary:
        return m.Summary().DataPoints().At(0).Timestamp()
    case pmetric.MetricTypeExponentialHistogram:
        return m.ExponentialHistogram().DataPoints().At(0).Timestamp()
    case pmetric.MetricTypeEmpty:
    }
    return 0
}

func prometheusMetricFamilyToProtoBuf(t *testing.T, buffer *bytes.Buffer, metricFamily *dto.MetricFamily) *bytes.Buffer {
    if buffer == nil {
        buffer = &bytes.Buffer{}
    }
    data, err := proto.Marshal(metricFamily)
    require.NoError(t, err)
    varintBuf := make([]byte, binary.MaxVarintLen32)
    varintLength := binary.PutUvarint(varintBuf, uint64(len(data)))
    _, err = buffer.Write(varintBuf[:varintLength])
    require.NoError(t, err)
    _, err = buffer.Write(data)
    require.NoError(t, err)
    return buffer
}
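
// prometheusMetricFamilyToProtoBuf appends one MetricFamily in the length-delimited protobuf
// exposition format: a uvarint length prefix followed by the marshaled message. As an
// illustrative sketch (mf1 and mf2 stand for already-populated dto.MetricFamily values), a
// protobuf page for the mock server would be built by chaining calls and serving the raw bytes:
//
//    buf := prometheusMetricFamilyToProtoBuf(t, nil, &mf1)
//    prometheusMetricFamilyToProtoBuf(t, buf, &mf2)
//    page := mockPrometheusResponse{code: 200, useProtoBuf: true, buf: buf.Bytes()}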