exporter_test.go 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package prometheusremotewriteexporter
  4. import (
  5. "context"
  6. "io"
  7. "net/http"
  8. "net/http/httptest"
  9. "net/url"
  10. "sync"
  11. "testing"
  12. "time"
  13. "github.com/gogo/protobuf/proto"
  14. "github.com/golang/snappy"
  15. "github.com/prometheus/prometheus/model/value"
  16. "github.com/prometheus/prometheus/prompb"
  17. "github.com/stretchr/testify/assert"
  18. "github.com/stretchr/testify/require"
  19. "go.opentelemetry.io/collector/component"
  20. "go.opentelemetry.io/collector/component/componenttest"
  21. "go.opentelemetry.io/collector/config/confighttp"
  22. "go.opentelemetry.io/collector/config/configtls"
  23. "go.opentelemetry.io/collector/consumer/consumererror"
  24. "go.opentelemetry.io/collector/exporter"
  25. "go.opentelemetry.io/collector/exporter/exporterhelper"
  26. "go.opentelemetry.io/collector/exporter/exportertest"
  27. "go.opentelemetry.io/collector/pdata/pmetric"
  28. "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata"
  29. )
  30. // Test_NewPRWExporter checks that a new exporter instance with non-nil fields is initialized
  31. func Test_NewPRWExporter(t *testing.T) {
  32. cfg := &Config{
  33. TimeoutSettings: exporterhelper.TimeoutSettings{},
  34. RetrySettings: exporterhelper.RetrySettings{},
  35. Namespace: "",
  36. ExternalLabels: map[string]string{},
  37. HTTPClientSettings: confighttp.HTTPClientSettings{Endpoint: ""},
  38. TargetInfo: &TargetInfo{
  39. Enabled: true,
  40. },
  41. CreatedMetric: &CreatedMetric{
  42. Enabled: false,
  43. },
  44. }
  45. buildInfo := component.BuildInfo{
  46. Description: "OpenTelemetry Collector",
  47. Version: "1.0",
  48. }
  49. set := exportertest.NewNopCreateSettings()
  50. set.BuildInfo = buildInfo
  51. tests := []struct {
  52. name string
  53. config *Config
  54. namespace string
  55. endpoint string
  56. concurrency int
  57. externalLabels map[string]string
  58. returnErrorOnCreate bool
  59. set exporter.CreateSettings
  60. }{
  61. {
  62. name: "invalid_URL",
  63. config: cfg,
  64. namespace: "test",
  65. endpoint: "invalid URL",
  66. concurrency: 5,
  67. externalLabels: map[string]string{"Key1": "Val1"},
  68. returnErrorOnCreate: true,
  69. set: set,
  70. },
  71. {
  72. name: "invalid_labels_case",
  73. config: cfg,
  74. namespace: "test",
  75. endpoint: "http://some.url:9411/api/prom/push",
  76. concurrency: 5,
  77. externalLabels: map[string]string{"Key1": ""},
  78. returnErrorOnCreate: true,
  79. set: set,
  80. },
  81. {
  82. name: "success_case",
  83. config: cfg,
  84. namespace: "test",
  85. endpoint: "http://some.url:9411/api/prom/push",
  86. concurrency: 5,
  87. externalLabels: map[string]string{"Key1": "Val1"},
  88. set: set,
  89. },
  90. {
  91. name: "success_case_no_labels",
  92. config: cfg,
  93. namespace: "test",
  94. endpoint: "http://some.url:9411/api/prom/push",
  95. concurrency: 5,
  96. externalLabels: map[string]string{},
  97. set: set,
  98. },
  99. }
  100. for _, tt := range tests {
  101. t.Run(tt.name, func(t *testing.T) {
  102. cfg.HTTPClientSettings.Endpoint = tt.endpoint
  103. cfg.ExternalLabels = tt.externalLabels
  104. cfg.Namespace = tt.namespace
  105. cfg.RemoteWriteQueue.NumConsumers = 1
  106. prwe, err := newPRWExporter(cfg, tt.set)
  107. if tt.returnErrorOnCreate {
  108. assert.Error(t, err)
  109. return
  110. }
  111. assert.NoError(t, err)
  112. require.NotNil(t, prwe)
  113. assert.NotNil(t, prwe.exporterSettings)
  114. assert.NotNil(t, prwe.endpointURL)
  115. assert.NotNil(t, prwe.closeChan)
  116. assert.NotNil(t, prwe.wg)
  117. assert.NotNil(t, prwe.userAgentHeader)
  118. assert.NotNil(t, prwe.clientSettings)
  119. })
  120. }
  121. }
  122. // Test_Start checks if the client is properly created as expected.
  123. func Test_Start(t *testing.T) {
  124. cfg := &Config{
  125. TimeoutSettings: exporterhelper.TimeoutSettings{},
  126. RetrySettings: exporterhelper.RetrySettings{},
  127. MaxBatchSizeBytes: 3000000,
  128. Namespace: "",
  129. ExternalLabels: map[string]string{},
  130. TargetInfo: &TargetInfo{
  131. Enabled: true,
  132. },
  133. CreatedMetric: &CreatedMetric{
  134. Enabled: false,
  135. },
  136. }
  137. buildInfo := component.BuildInfo{
  138. Description: "OpenTelemetry Collector",
  139. Version: "1.0",
  140. }
  141. set := exportertest.NewNopCreateSettings()
  142. set.BuildInfo = buildInfo
  143. tests := []struct {
  144. name string
  145. config *Config
  146. namespace string
  147. concurrency int
  148. externalLabels map[string]string
  149. returnErrorOnStartUp bool
  150. set exporter.CreateSettings
  151. endpoint string
  152. clientSettings confighttp.HTTPClientSettings
  153. }{
  154. {
  155. name: "success_case",
  156. config: cfg,
  157. namespace: "test",
  158. concurrency: 5,
  159. externalLabels: map[string]string{"Key1": "Val1"},
  160. set: set,
  161. clientSettings: confighttp.HTTPClientSettings{Endpoint: "https://some.url:9411/api/prom/push"},
  162. },
  163. {
  164. name: "invalid_tls",
  165. config: cfg,
  166. namespace: "test",
  167. concurrency: 5,
  168. externalLabels: map[string]string{"Key1": "Val1"},
  169. set: set,
  170. returnErrorOnStartUp: true,
  171. clientSettings: confighttp.HTTPClientSettings{
  172. Endpoint: "https://some.url:9411/api/prom/push",
  173. TLSSetting: configtls.TLSClientSetting{
  174. TLSSetting: configtls.TLSSetting{
  175. CAFile: "non-existent file",
  176. CertFile: "",
  177. KeyFile: "",
  178. },
  179. Insecure: false,
  180. ServerName: "",
  181. },
  182. },
  183. },
  184. }
  185. for _, tt := range tests {
  186. t.Run(tt.name, func(t *testing.T) {
  187. cfg.ExternalLabels = tt.externalLabels
  188. cfg.Namespace = tt.namespace
  189. cfg.RemoteWriteQueue.NumConsumers = 1
  190. cfg.HTTPClientSettings = tt.clientSettings
  191. prwe, err := newPRWExporter(cfg, tt.set)
  192. assert.NoError(t, err)
  193. assert.NotNil(t, prwe)
  194. err = prwe.Start(context.Background(), componenttest.NewNopHost())
  195. if tt.returnErrorOnStartUp {
  196. assert.Error(t, err)
  197. return
  198. }
  199. assert.NotNil(t, prwe.client)
  200. })
  201. }
  202. }
  203. // Test_Shutdown checks after Shutdown is called, incoming calls to PushMetrics return error.
  204. func Test_Shutdown(t *testing.T) {
  205. prwe := &prwExporter{
  206. wg: new(sync.WaitGroup),
  207. closeChan: make(chan struct{}),
  208. }
  209. wg := new(sync.WaitGroup)
  210. err := prwe.Shutdown(context.Background())
  211. require.NoError(t, err)
  212. errChan := make(chan error, 5)
  213. for i := 0; i < 5; i++ {
  214. wg.Add(1)
  215. go func() {
  216. defer wg.Done()
  217. errChan <- prwe.PushMetrics(context.Background(), pmetric.NewMetrics())
  218. }()
  219. }
  220. wg.Wait()
  221. close(errChan)
  222. for ok := range errChan {
  223. assert.Error(t, ok)
  224. }
  225. }
  226. // Test whether or not the Server receives the correct TimeSeries.
  227. // Currently considering making this test an iterative for loop of multiple TimeSeries much akin to Test_PushMetrics
  228. func Test_export(t *testing.T) {
  229. // First we will instantiate a dummy TimeSeries instance to pass into both the export call and compare the http request
  230. labels := getPromLabels(label11, value11, label12, value12, label21, value21, label22, value22)
  231. sample1 := getSample(floatVal1, msTime1)
  232. sample2 := getSample(floatVal2, msTime2)
  233. ts1 := getTimeSeries(labels, sample1, sample2)
  234. handleFunc := func(w http.ResponseWriter, r *http.Request, code int) {
  235. // The following is a handler function that reads the sent httpRequest, unmarshal, and checks if the WriteRequest
  236. // preserves the TimeSeries data correctly
  237. body, err := io.ReadAll(r.Body)
  238. if err != nil {
  239. t.Fatal(err)
  240. }
  241. require.NotNil(t, body)
  242. // Receives the http requests and unzip, unmarshalls, and extracts TimeSeries
  243. assert.Equal(t, "0.1.0", r.Header.Get("X-Prometheus-Remote-Write-Version"))
  244. assert.Equal(t, "snappy", r.Header.Get("Content-Encoding"))
  245. assert.Equal(t, "opentelemetry-collector/1.0", r.Header.Get("User-Agent"))
  246. writeReq := &prompb.WriteRequest{}
  247. var unzipped []byte
  248. dest, err := snappy.Decode(unzipped, body)
  249. require.NoError(t, err)
  250. ok := proto.Unmarshal(dest, writeReq)
  251. require.NoError(t, ok)
  252. assert.EqualValues(t, 1, len(writeReq.Timeseries))
  253. require.NotNil(t, writeReq.GetTimeseries())
  254. assert.Equal(t, *ts1, writeReq.GetTimeseries()[0])
  255. w.WriteHeader(code)
  256. }
  257. // Create in test table format to check if different HTTP response codes or server errors
  258. // are properly identified
  259. tests := []struct {
  260. name string
  261. ts prompb.TimeSeries
  262. serverUp bool
  263. httpResponseCode int
  264. returnErrorOnCreate bool
  265. }{
  266. {"success_case",
  267. *ts1,
  268. true,
  269. http.StatusAccepted,
  270. false,
  271. },
  272. {
  273. "server_no_response_case",
  274. *ts1,
  275. false,
  276. http.StatusAccepted,
  277. true,
  278. }, {
  279. "error_status_code_case",
  280. *ts1,
  281. true,
  282. http.StatusForbidden,
  283. true,
  284. },
  285. }
  286. for _, tt := range tests {
  287. t.Run(tt.name, func(t *testing.T) {
  288. server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  289. if handleFunc != nil {
  290. handleFunc(w, r, tt.httpResponseCode)
  291. }
  292. }))
  293. defer server.Close()
  294. serverURL, uErr := url.Parse(server.URL)
  295. assert.NoError(t, uErr)
  296. if !tt.serverUp {
  297. server.Close()
  298. }
  299. err := runExportPipeline(ts1, serverURL)
  300. if tt.returnErrorOnCreate {
  301. assert.Error(t, err)
  302. return
  303. }
  304. assert.NoError(t, err)
  305. })
  306. }
  307. }
  308. func TestNoMetricsNoError(t *testing.T) {
  309. server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  310. w.WriteHeader(http.StatusAccepted)
  311. }))
  312. defer server.Close()
  313. serverURL, uErr := url.Parse(server.URL)
  314. assert.NoError(t, uErr)
  315. assert.NoError(t, runExportPipeline(nil, serverURL))
  316. }
  317. func runExportPipeline(ts *prompb.TimeSeries, endpoint *url.URL) error {
  318. // First we will construct a TimeSeries array from the testutils package
  319. testmap := make(map[string]*prompb.TimeSeries)
  320. if ts != nil {
  321. testmap["test"] = ts
  322. }
  323. cfg := createDefaultConfig().(*Config)
  324. cfg.HTTPClientSettings.Endpoint = endpoint.String()
  325. cfg.RemoteWriteQueue.NumConsumers = 1
  326. cfg.RetrySettings = exporterhelper.RetrySettings{
  327. Enabled: true,
  328. InitialInterval: 100 * time.Millisecond, // Shorter initial interval
  329. MaxInterval: 1 * time.Second, // Shorter max interval
  330. MaxElapsedTime: 2 * time.Second, // Shorter max elapsed time
  331. }
  332. buildInfo := component.BuildInfo{
  333. Description: "OpenTelemetry Collector",
  334. Version: "1.0",
  335. }
  336. set := exportertest.NewNopCreateSettings()
  337. set.BuildInfo = buildInfo
  338. // after this, instantiate a CortexExporter with the current HTTP client and endpoint set to passed in endpoint
  339. prwe, err := newPRWExporter(cfg, set)
  340. if err != nil {
  341. return err
  342. }
  343. if err = prwe.Start(context.Background(), componenttest.NewNopHost()); err != nil {
  344. return err
  345. }
  346. return prwe.handleExport(context.Background(), testmap, nil)
  347. }
  348. // Test_PushMetrics checks the number of TimeSeries received by server and the number of metrics dropped is the same as
  349. // expected
  350. func Test_PushMetrics(t *testing.T) {
  351. invalidTypeBatch := testdata.GenerateMetricsMetricTypeInvalid()
  352. // success cases
  353. intSumBatch := testdata.GenerateMetricsManyMetricsSameResource(10)
  354. sumBatch := getMetricsFromMetricList(validMetrics1[validSum], validMetrics2[validSum])
  355. intGaugeBatch := getMetricsFromMetricList(validMetrics1[validIntGauge], validMetrics2[validIntGauge])
  356. doubleGaugeBatch := getMetricsFromMetricList(validMetrics1[validDoubleGauge], validMetrics2[validDoubleGauge])
  357. expHistogramBatch := getMetricsFromMetricList(
  358. getExpHistogramMetric("exponential_hist", lbs1, time1, &floatVal1, uint64(2), 2, []uint64{1, 1}),
  359. getExpHistogramMetric("exponential_hist", lbs2, time2, &floatVal2, uint64(2), 0, []uint64{2, 2}),
  360. )
  361. emptyExponentialHistogramBatch := getMetricsFromMetricList(
  362. getExpHistogramMetric("empty_exponential_hist", lbs1, time1, &floatValZero, uint64(0), 0, []uint64{}),
  363. getExpHistogramMetric("empty_exponential_hist", lbs1, time1, &floatValZero, uint64(0), 1, []uint64{}),
  364. getExpHistogramMetric("empty_exponential_hist", lbs2, time2, &floatValZero, uint64(0), 0, []uint64{}),
  365. getExpHistogramMetric("empty_exponential_hist_two", lbs2, time2, &floatValZero, uint64(0), 0, []uint64{}),
  366. )
  367. exponentialNoSumHistogramBatch := getMetricsFromMetricList(
  368. getExpHistogramMetric("no_sum_exponential_hist", lbs1, time1, nil, uint64(2), 0, []uint64{1, 1}),
  369. getExpHistogramMetric("no_sum_exponential_hist", lbs1, time2, nil, uint64(2), 0, []uint64{2, 2}),
  370. )
  371. histogramBatch := getMetricsFromMetricList(validMetrics1[validHistogram], validMetrics2[validHistogram])
  372. emptyDataPointHistogramBatch := getMetricsFromMetricList(validMetrics1[validEmptyHistogram], validMetrics2[validEmptyHistogram])
  373. histogramNoSumBatch := getMetricsFromMetricList(validMetrics1[validHistogramNoSum], validMetrics2[validHistogramNoSum])
  374. summaryBatch := getMetricsFromMetricList(validMetrics1[validSummary], validMetrics2[validSummary])
  375. // len(BucketCount) > len(ExplicitBounds)
  376. unmatchedBoundBucketHistBatch := getMetricsFromMetricList(validMetrics2[unmatchedBoundBucketHist])
  377. // fail cases
  378. emptyDoubleGaugeBatch := getMetricsFromMetricList(invalidMetrics[emptyGauge])
  379. emptyCumulativeSumBatch := getMetricsFromMetricList(invalidMetrics[emptyCumulativeSum])
  380. emptyCumulativeHistogramBatch := getMetricsFromMetricList(invalidMetrics[emptyCumulativeHistogram])
  381. emptySummaryBatch := getMetricsFromMetricList(invalidMetrics[emptySummary])
  382. // staleNaN cases
  383. staleNaNHistogramBatch := getMetricsFromMetricList(staleNaNMetrics[staleNaNHistogram])
  384. staleNaNEmptyHistogramBatch := getMetricsFromMetricList(staleNaNMetrics[staleNaNEmptyHistogram])
  385. staleNaNSummaryBatch := getMetricsFromMetricList(staleNaNMetrics[staleNaNSummary])
  386. staleNaNIntGaugeBatch := getMetricsFromMetricList(staleNaNMetrics[staleNaNIntGauge])
  387. staleNaNDoubleGaugeBatch := getMetricsFromMetricList(staleNaNMetrics[staleNaNDoubleGauge])
  388. staleNaNIntSumBatch := getMetricsFromMetricList(staleNaNMetrics[staleNaNIntSum])
  389. staleNaNSumBatch := getMetricsFromMetricList(staleNaNMetrics[staleNaNSum])
  390. checkFunc := func(t *testing.T, r *http.Request, expected int, isStaleMarker bool) {
  391. body, err := io.ReadAll(r.Body)
  392. if err != nil {
  393. t.Fatal(err)
  394. }
  395. buf := make([]byte, len(body))
  396. dest, err := snappy.Decode(buf, body)
  397. assert.Equal(t, "0.1.0", r.Header.Get("x-prometheus-remote-write-version"))
  398. assert.Equal(t, "snappy", r.Header.Get("content-encoding"))
  399. assert.Equal(t, "opentelemetry-collector/1.0", r.Header.Get("User-Agent"))
  400. assert.NotNil(t, r.Header.Get("tenant-id"))
  401. require.NoError(t, err)
  402. wr := &prompb.WriteRequest{}
  403. ok := proto.Unmarshal(dest, wr)
  404. require.Nil(t, ok)
  405. assert.EqualValues(t, expected, len(wr.Timeseries))
  406. if isStaleMarker {
  407. assert.True(t, value.IsStaleNaN(wr.Timeseries[0].Samples[0].Value))
  408. }
  409. }
  410. tests := []struct {
  411. name string
  412. metrics pmetric.Metrics
  413. reqTestFunc func(t *testing.T, r *http.Request, expected int, isStaleMarker bool)
  414. expectedTimeSeries int
  415. httpResponseCode int
  416. returnErr bool
  417. isStaleMarker bool
  418. skipForWAL bool
  419. }{
  420. {
  421. name: "invalid_type_case",
  422. metrics: invalidTypeBatch,
  423. httpResponseCode: http.StatusAccepted,
  424. returnErr: true,
  425. },
  426. {
  427. name: "intSum_case",
  428. metrics: intSumBatch,
  429. reqTestFunc: checkFunc,
  430. expectedTimeSeries: 5,
  431. httpResponseCode: http.StatusAccepted,
  432. },
  433. {
  434. name: "doubleSum_case",
  435. metrics: sumBatch,
  436. reqTestFunc: checkFunc,
  437. expectedTimeSeries: 2,
  438. httpResponseCode: http.StatusAccepted,
  439. },
  440. {
  441. name: "doubleGauge_case",
  442. metrics: doubleGaugeBatch,
  443. reqTestFunc: checkFunc,
  444. expectedTimeSeries: 2,
  445. httpResponseCode: http.StatusAccepted,
  446. },
  447. {
  448. name: "intGauge_case",
  449. metrics: intGaugeBatch,
  450. reqTestFunc: checkFunc,
  451. expectedTimeSeries: 2,
  452. httpResponseCode: http.StatusAccepted,
  453. },
  454. {
  455. name: "exponential_histogram_case",
  456. metrics: expHistogramBatch,
  457. reqTestFunc: checkFunc,
  458. expectedTimeSeries: 2,
  459. httpResponseCode: http.StatusAccepted,
  460. },
  461. {
  462. name: "valid_empty_exponential_histogram_case",
  463. metrics: emptyExponentialHistogramBatch,
  464. reqTestFunc: checkFunc,
  465. expectedTimeSeries: 3,
  466. httpResponseCode: http.StatusAccepted,
  467. },
  468. {
  469. name: "exponential_histogram_no_sum_case",
  470. metrics: exponentialNoSumHistogramBatch,
  471. reqTestFunc: checkFunc,
  472. expectedTimeSeries: 1,
  473. httpResponseCode: http.StatusAccepted,
  474. },
  475. {
  476. name: "histogram_case",
  477. metrics: histogramBatch,
  478. reqTestFunc: checkFunc,
  479. expectedTimeSeries: 12,
  480. httpResponseCode: http.StatusAccepted,
  481. },
  482. {
  483. name: "valid_empty_histogram_case",
  484. metrics: emptyDataPointHistogramBatch,
  485. reqTestFunc: checkFunc,
  486. expectedTimeSeries: 4,
  487. httpResponseCode: http.StatusAccepted,
  488. },
  489. {
  490. name: "histogram_no_sum_case",
  491. metrics: histogramNoSumBatch,
  492. reqTestFunc: checkFunc,
  493. expectedTimeSeries: 10,
  494. httpResponseCode: http.StatusAccepted,
  495. },
  496. {
  497. name: "summary_case",
  498. metrics: summaryBatch,
  499. reqTestFunc: checkFunc,
  500. expectedTimeSeries: 10,
  501. httpResponseCode: http.StatusAccepted,
  502. },
  503. {
  504. name: "unmatchedBoundBucketHist_case",
  505. metrics: unmatchedBoundBucketHistBatch,
  506. reqTestFunc: checkFunc,
  507. expectedTimeSeries: 5,
  508. httpResponseCode: http.StatusAccepted,
  509. },
  510. {
  511. name: "5xx_case",
  512. metrics: unmatchedBoundBucketHistBatch,
  513. reqTestFunc: checkFunc,
  514. expectedTimeSeries: 5,
  515. httpResponseCode: http.StatusServiceUnavailable,
  516. returnErr: true,
  517. // When using the WAL, it returns success once the data is persisted to the WAL
  518. skipForWAL: true,
  519. },
  520. {
  521. name: "emptyGauge_case",
  522. metrics: emptyDoubleGaugeBatch,
  523. reqTestFunc: checkFunc,
  524. httpResponseCode: http.StatusAccepted,
  525. returnErr: true,
  526. },
  527. {
  528. name: "emptyCumulativeSum_case",
  529. metrics: emptyCumulativeSumBatch,
  530. reqTestFunc: checkFunc,
  531. httpResponseCode: http.StatusAccepted,
  532. returnErr: true,
  533. },
  534. {
  535. name: "emptyCumulativeHistogram_case",
  536. metrics: emptyCumulativeHistogramBatch,
  537. reqTestFunc: checkFunc,
  538. httpResponseCode: http.StatusAccepted,
  539. returnErr: true,
  540. },
  541. {
  542. name: "emptySummary_case",
  543. metrics: emptySummaryBatch,
  544. reqTestFunc: checkFunc,
  545. httpResponseCode: http.StatusAccepted,
  546. returnErr: true,
  547. },
  548. {
  549. name: "staleNaNIntGauge_case",
  550. metrics: staleNaNIntGaugeBatch,
  551. reqTestFunc: checkFunc,
  552. expectedTimeSeries: 1,
  553. httpResponseCode: http.StatusAccepted,
  554. isStaleMarker: true,
  555. },
  556. {
  557. name: "staleNaNDoubleGauge_case",
  558. metrics: staleNaNDoubleGaugeBatch,
  559. reqTestFunc: checkFunc,
  560. expectedTimeSeries: 1,
  561. httpResponseCode: http.StatusAccepted,
  562. isStaleMarker: true,
  563. },
  564. {
  565. name: "staleNaNIntSum_case",
  566. metrics: staleNaNIntSumBatch,
  567. reqTestFunc: checkFunc,
  568. expectedTimeSeries: 1,
  569. httpResponseCode: http.StatusAccepted,
  570. isStaleMarker: true,
  571. },
  572. {
  573. name: "staleNaNSum_case",
  574. metrics: staleNaNSumBatch,
  575. reqTestFunc: checkFunc,
  576. expectedTimeSeries: 1,
  577. httpResponseCode: http.StatusAccepted,
  578. isStaleMarker: true,
  579. },
  580. {
  581. name: "staleNaNHistogram_case",
  582. metrics: staleNaNHistogramBatch,
  583. reqTestFunc: checkFunc,
  584. expectedTimeSeries: 6,
  585. httpResponseCode: http.StatusAccepted,
  586. isStaleMarker: true,
  587. },
  588. {
  589. name: "staleNaNEmptyHistogram_case",
  590. metrics: staleNaNEmptyHistogramBatch,
  591. reqTestFunc: checkFunc,
  592. expectedTimeSeries: 3,
  593. httpResponseCode: http.StatusAccepted,
  594. isStaleMarker: true,
  595. },
  596. {
  597. name: "staleNaNSummary_case",
  598. metrics: staleNaNSummaryBatch,
  599. reqTestFunc: checkFunc,
  600. expectedTimeSeries: 5,
  601. httpResponseCode: http.StatusAccepted,
  602. isStaleMarker: true,
  603. },
  604. }
  605. for _, useWAL := range []bool{true, false} {
  606. name := "NoWAL"
  607. if useWAL {
  608. name = "WAL"
  609. }
  610. t.Run(name, func(t *testing.T) {
  611. if useWAL {
  612. t.Skip("Flaky test, see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/9124")
  613. }
  614. for _, ttt := range tests {
  615. tt := ttt
  616. if useWAL && tt.skipForWAL {
  617. t.Skip("test not supported when using WAL")
  618. }
  619. t.Run(tt.name, func(t *testing.T) {
  620. t.Parallel()
  621. server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  622. if tt.reqTestFunc != nil {
  623. tt.reqTestFunc(t, r, tt.expectedTimeSeries, tt.isStaleMarker)
  624. }
  625. w.WriteHeader(tt.httpResponseCode)
  626. }))
  627. defer server.Close()
  628. // Adjusted retry settings for faster testing
  629. retrySettings := exporterhelper.RetrySettings{
  630. Enabled: true,
  631. InitialInterval: 100 * time.Millisecond, // Shorter initial interval
  632. MaxInterval: 1 * time.Second, // Shorter max interval
  633. MaxElapsedTime: 2 * time.Second, // Shorter max elapsed time
  634. }
  635. cfg := &Config{
  636. Namespace: "",
  637. HTTPClientSettings: confighttp.HTTPClientSettings{
  638. Endpoint: server.URL,
  639. // We almost read 0 bytes, so no need to tune ReadBufferSize.
  640. ReadBufferSize: 0,
  641. WriteBufferSize: 512 * 1024,
  642. },
  643. MaxBatchSizeBytes: 3000000,
  644. RemoteWriteQueue: RemoteWriteQueue{NumConsumers: 1},
  645. TargetInfo: &TargetInfo{
  646. Enabled: true,
  647. },
  648. CreatedMetric: &CreatedMetric{
  649. Enabled: true,
  650. },
  651. RetrySettings: retrySettings,
  652. }
  653. if useWAL {
  654. cfg.WAL = &WALConfig{
  655. Directory: t.TempDir(),
  656. }
  657. }
  658. assert.NotNil(t, cfg)
  659. buildInfo := component.BuildInfo{
  660. Description: "OpenTelemetry Collector",
  661. Version: "1.0",
  662. }
  663. set := exportertest.NewNopCreateSettings()
  664. set.BuildInfo = buildInfo
  665. prwe, nErr := newPRWExporter(cfg, set)
  666. require.NoError(t, nErr)
  667. ctx, cancel := context.WithCancel(context.Background())
  668. defer cancel()
  669. require.NoError(t, prwe.Start(ctx, componenttest.NewNopHost()))
  670. defer func() {
  671. require.NoError(t, prwe.Shutdown(ctx))
  672. }()
  673. err := prwe.PushMetrics(ctx, tt.metrics)
  674. if tt.returnErr {
  675. assert.Error(t, err)
  676. return
  677. }
  678. assert.NoError(t, err)
  679. })
  680. }
  681. })
  682. }
  683. }
  684. func Test_validateAndSanitizeExternalLabels(t *testing.T) {
  685. tests := []struct {
  686. name string
  687. inputLabels map[string]string
  688. expectedLabels map[string]string
  689. returnErrorOnCreate bool
  690. }{
  691. {"success_case_no_labels",
  692. map[string]string{},
  693. map[string]string{},
  694. false,
  695. },
  696. {"success_case_with_labels",
  697. map[string]string{"key1": "val1"},
  698. map[string]string{"key1": "val1"},
  699. false,
  700. },
  701. {"success_case_2_with_labels",
  702. map[string]string{"__key1__": "val1"},
  703. map[string]string{"__key1__": "val1"},
  704. false,
  705. },
  706. {"success_case_with_sanitized_labels",
  707. map[string]string{"__key1.key__": "val1"},
  708. map[string]string{"__key1_key__": "val1"},
  709. false,
  710. },
  711. {"labels_that_start_with_digit",
  712. map[string]string{"6key_": "val1"},
  713. map[string]string{"key_6key_": "val1"},
  714. false,
  715. },
  716. {"fail_case_empty_label",
  717. map[string]string{"": "val1"},
  718. map[string]string{},
  719. true,
  720. },
  721. }
  722. testsWithoutSanitizelabel := []struct {
  723. name string
  724. inputLabels map[string]string
  725. expectedLabels map[string]string
  726. returnErrorOnCreate bool
  727. }{
  728. {"success_case_no_labels",
  729. map[string]string{},
  730. map[string]string{},
  731. false,
  732. },
  733. {"success_case_with_labels",
  734. map[string]string{"key1": "val1"},
  735. map[string]string{"key1": "val1"},
  736. false,
  737. },
  738. {"success_case_2_with_labels",
  739. map[string]string{"__key1__": "val1"},
  740. map[string]string{"__key1__": "val1"},
  741. false,
  742. },
  743. {"success_case_with_sanitized_labels",
  744. map[string]string{"__key1.key__": "val1"},
  745. map[string]string{"__key1_key__": "val1"},
  746. false,
  747. },
  748. {"labels_that_start_with_digit",
  749. map[string]string{"6key_": "val1"},
  750. map[string]string{"key_6key_": "val1"},
  751. false,
  752. },
  753. {"fail_case_empty_label",
  754. map[string]string{"": "val1"},
  755. map[string]string{},
  756. true,
  757. },
  758. }
  759. // run tests
  760. for _, tt := range tests {
  761. cfg := createDefaultConfig().(*Config)
  762. cfg.ExternalLabels = tt.inputLabels
  763. t.Run(tt.name, func(t *testing.T) {
  764. newLabels, err := validateAndSanitizeExternalLabels(cfg)
  765. if tt.returnErrorOnCreate {
  766. assert.Error(t, err)
  767. return
  768. }
  769. assert.EqualValues(t, tt.expectedLabels, newLabels)
  770. assert.NoError(t, err)
  771. })
  772. }
  773. for _, tt := range testsWithoutSanitizelabel {
  774. cfg := createDefaultConfig().(*Config)
  775. // disable sanitizeLabel flag
  776. cfg.ExternalLabels = tt.inputLabels
  777. t.Run(tt.name, func(t *testing.T) {
  778. newLabels, err := validateAndSanitizeExternalLabels(cfg)
  779. if tt.returnErrorOnCreate {
  780. assert.Error(t, err)
  781. return
  782. }
  783. assert.EqualValues(t, tt.expectedLabels, newLabels)
  784. assert.NoError(t, err)
  785. })
  786. }
  787. }
  788. // Ensures that when we attach the Write-Ahead-Log(WAL) to the exporter,
  789. // that it successfully writes the serialized prompb.WriteRequests to the WAL,
  790. // and that we can retrieve those exact requests back from the WAL, when the
  791. // exporter starts up once again, that it picks up where it left off.
  792. func TestWALOnExporterRoundTrip(t *testing.T) {
  793. t.Skip("skipping test, see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/10142")
  794. if testing.Short() {
  795. t.Skip("This test could run for long")
  796. }
  797. // 1. Create a mock Prometheus Remote Write Exporter that'll just
  798. // receive the bytes uploaded to it by our exporter.
  799. uploadedBytesCh := make(chan []byte, 1)
  800. exiting := make(chan bool)
  801. prweServer := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
  802. uploaded, err2 := io.ReadAll(req.Body)
  803. assert.NoError(t, err2, "Error while reading from HTTP upload")
  804. select {
  805. case uploadedBytesCh <- uploaded:
  806. case <-exiting:
  807. return
  808. }
  809. }))
  810. defer prweServer.Close()
  811. // 2. Create the WAL configuration, create the
  812. // exporter and export some time series!
  813. tempDir := t.TempDir()
  814. cfg := &Config{
  815. Namespace: "test_ns",
  816. HTTPClientSettings: confighttp.HTTPClientSettings{
  817. Endpoint: prweServer.URL,
  818. },
  819. RemoteWriteQueue: RemoteWriteQueue{NumConsumers: 1},
  820. WAL: &WALConfig{
  821. Directory: tempDir,
  822. BufferSize: 1,
  823. },
  824. TargetInfo: &TargetInfo{
  825. Enabled: true,
  826. },
  827. CreatedMetric: &CreatedMetric{
  828. Enabled: false,
  829. },
  830. }
  831. set := exportertest.NewNopCreateSettings()
  832. set.BuildInfo = component.BuildInfo{
  833. Description: "OpenTelemetry Collector",
  834. Version: "1.0",
  835. }
  836. prwe, perr := newPRWExporter(cfg, set)
  837. assert.NoError(t, perr)
  838. nopHost := componenttest.NewNopHost()
  839. ctx := context.Background()
  840. require.NoError(t, prwe.Start(ctx, nopHost))
  841. t.Cleanup(func() {
  842. // This should have been shut down during the test
  843. // If it does not error then something went wrong.
  844. assert.Error(t, prwe.Shutdown(ctx))
  845. close(exiting)
  846. })
  847. require.NotNil(t, prwe.wal)
  848. ts1 := &prompb.TimeSeries{
  849. Labels: []prompb.Label{{Name: "ts1l1", Value: "ts1k1"}},
  850. Samples: []prompb.Sample{{Value: 1, Timestamp: 100}},
  851. }
  852. ts2 := &prompb.TimeSeries{
  853. Labels: []prompb.Label{{Name: "ts2l1", Value: "ts2k1"}},
  854. Samples: []prompb.Sample{{Value: 2, Timestamp: 200}},
  855. }
  856. tsMap := map[string]*prompb.TimeSeries{
  857. "timeseries1": ts1,
  858. "timeseries2": ts2,
  859. }
  860. errs := prwe.handleExport(ctx, tsMap, nil)
  861. assert.NoError(t, errs)
  862. // Shutdown after we've written to the WAL. This ensures that our
  863. // exported data in-flight will flushed flushed to the WAL before exiting.
  864. require.NoError(t, prwe.Shutdown(ctx))
  865. // 3. Let's now read back all of the WAL records and ensure
  866. // that all the prompb.WriteRequest values exist as we sent them.
  867. wal, _, werr := cfg.WAL.createWAL()
  868. assert.NoError(t, werr)
  869. assert.NotNil(t, wal)
  870. t.Cleanup(func() {
  871. assert.NoError(t, wal.Close())
  872. })
  873. // Read all the indices.
  874. firstIndex, ierr := wal.FirstIndex()
  875. assert.NoError(t, ierr)
  876. lastIndex, ierr := wal.LastIndex()
  877. assert.NoError(t, ierr)
  878. var reqs []*prompb.WriteRequest
  879. for i := firstIndex; i <= lastIndex; i++ {
  880. protoBlob, err := wal.Read(i)
  881. assert.NoError(t, err)
  882. assert.NotNil(t, protoBlob)
  883. req := new(prompb.WriteRequest)
  884. err = proto.Unmarshal(protoBlob, req)
  885. assert.NoError(t, err)
  886. reqs = append(reqs, req)
  887. }
  888. assert.Equal(t, 1, len(reqs))
  889. // We MUST have 2 time series as were passed into tsMap.
  890. gotFromWAL := reqs[0]
  891. assert.Equal(t, 2, len(gotFromWAL.Timeseries))
  892. want := &prompb.WriteRequest{
  893. Timeseries: orderBySampleTimestamp([]prompb.TimeSeries{
  894. *ts1, *ts2,
  895. }),
  896. }
  897. // Even after sorting timeseries, we need to sort them
  898. // also by Label to ensure deterministic ordering.
  899. orderByLabelValue(gotFromWAL)
  900. gotFromWAL.Timeseries = orderBySampleTimestamp(gotFromWAL.Timeseries)
  901. orderByLabelValue(want)
  902. assert.Equal(t, want, gotFromWAL)
  903. // 4. Finally, ensure that the bytes that were uploaded to the
  904. // Prometheus Remote Write endpoint are exactly as were saved in the WAL.
  905. // Read from that same WAL, export to the RWExporter server.
  906. prwe2, err := newPRWExporter(cfg, set)
  907. assert.NoError(t, err)
  908. require.NoError(t, prwe2.Start(ctx, nopHost))
  909. t.Cleanup(func() {
  910. assert.NoError(t, prwe2.Shutdown(ctx))
  911. })
  912. require.NotNil(t, prwe2.wal)
  913. snappyEncodedBytes := <-uploadedBytesCh
  914. decodeBuffer := make([]byte, len(snappyEncodedBytes))
  915. uploadedBytes, derr := snappy.Decode(decodeBuffer, snappyEncodedBytes)
  916. require.NoError(t, derr)
  917. gotFromUpload := new(prompb.WriteRequest)
  918. uerr := proto.Unmarshal(uploadedBytes, gotFromUpload)
  919. assert.NoError(t, uerr)
  920. gotFromUpload.Timeseries = orderBySampleTimestamp(gotFromUpload.Timeseries)
  921. // Even after sorting timeseries, we need to sort them
  922. // also by Label to ensure deterministic ordering.
  923. orderByLabelValue(gotFromUpload)
  924. // 4.1. Ensure that all the various combinations match up.
  925. // To ensure a deterministic ordering, sort the TimeSeries by Label Name.
  926. assert.Equal(t, want, gotFromUpload)
  927. assert.Equal(t, gotFromWAL, gotFromUpload)
  928. }
  929. func TestRetryOn5xx(t *testing.T) {
  930. // Create a mock HTTP server with a counter to simulate a 5xx error on the first attempt and a 2xx success on the second attempt
  931. attempts := 0
  932. mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  933. if attempts < 4 {
  934. attempts++
  935. http.Error(w, "Internal Server Error", http.StatusInternalServerError)
  936. } else {
  937. w.WriteHeader(http.StatusOK)
  938. }
  939. }))
  940. defer mockServer.Close()
  941. endpointURL, err := url.Parse(mockServer.URL)
  942. require.NoError(t, err)
  943. // Create the prwExporter
  944. exporter := &prwExporter{
  945. endpointURL: endpointURL,
  946. client: http.DefaultClient,
  947. retrySettings: exporterhelper.RetrySettings{
  948. Enabled: true,
  949. },
  950. }
  951. ctx := context.Background()
  952. // Execute the write request and verify that the exporter returns a non-permanent error on the first attempt.
  953. err = exporter.execute(ctx, &prompb.WriteRequest{})
  954. assert.NoError(t, err)
  955. assert.Equal(t, 4, attempts)
  956. }
  957. func TestNoRetryOn4xx(t *testing.T) {
  958. // Create a mock HTTP server with a counter to simulate a 4xx error
  959. attempts := 0
  960. mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  961. if attempts < 1 {
  962. attempts++
  963. http.Error(w, "Bad Request", http.StatusBadRequest)
  964. } else {
  965. w.WriteHeader(http.StatusOK)
  966. }
  967. }))
  968. defer mockServer.Close()
  969. endpointURL, err := url.Parse(mockServer.URL)
  970. require.NoError(t, err)
  971. // Create the prwExporter
  972. exporter := &prwExporter{
  973. endpointURL: endpointURL,
  974. client: http.DefaultClient,
  975. retrySettings: exporterhelper.RetrySettings{
  976. Enabled: true,
  977. },
  978. }
  979. ctx := context.Background()
  980. // Execute the write request and verify that the exporter returns an error due to the 4xx response.
  981. err = exporter.execute(ctx, &prompb.WriteRequest{})
  982. assert.Error(t, err)
  983. assert.True(t, consumererror.IsPermanent(err))
  984. assert.Equal(t, 1, attempts)
  985. }