  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package sqlqueryreceiver
  4. import (
  5. "context"
  6. "database/sql"
  7. "errors"
  8. "testing"
  9. "github.com/stretchr/testify/assert"
  10. "github.com/stretchr/testify/require"
  11. "go.opentelemetry.io/collector/component/componenttest"
  12. "go.opentelemetry.io/collector/pdata/pcommon"
  13. "go.opentelemetry.io/collector/pdata/pmetric"
  14. "go.opentelemetry.io/collector/receiver/scrapererror"
  15. "go.uber.org/zap"
  16. )
  17. func TestScraper_ErrorOnStart(t *testing.T) {
  18. scrpr := scraper{
  19. dbProviderFunc: func() (*sql.DB, error) {
  20. return nil, errors.New("oops")
  21. },
  22. }
  23. err := scrpr.Start(context.Background(), componenttest.NewNopHost())
  24. require.Error(t, err)
  25. }
  26. func TestScraper_ClientErrorOnScrape(t *testing.T) {
  27. client := &fakeDBClient{
  28. err: errors.New("oops"),
  29. }
  30. scrpr := scraper{
  31. client: client,
  32. }
  33. _, err := scrpr.Scrape(context.Background())
  34. require.Error(t, err)
  35. }
  36. func TestScraper_RowToMetricErrorOnScrape_Float(t *testing.T) {
  37. client := &fakeDBClient{
  38. stringMaps: [][]stringMap{
  39. {{"myfloat": "blah"}},
  40. },
  41. }
  42. scrpr := scraper{
  43. client: client,
  44. query: Query{
  45. Metrics: []MetricCfg{{
  46. MetricName: "my.float",
  47. ValueColumn: "myfloat",
  48. Monotonic: true,
  49. ValueType: MetricValueTypeDouble,
  50. DataType: MetricTypeGauge,
  51. }},
  52. },
  53. }
  54. _, err := scrpr.Scrape(context.Background())
  55. assert.Error(t, err)
  56. }
  57. func TestScraper_RowToMetricErrorOnScrape_Int(t *testing.T) {
  58. client := &fakeDBClient{
  59. stringMaps: [][]stringMap{
  60. {{"myint": "blah"}},
  61. },
  62. }
  63. scrpr := scraper{
  64. client: client,
  65. query: Query{
  66. Metrics: []MetricCfg{{
  67. MetricName: "my.int",
  68. ValueColumn: "myint",
  69. Monotonic: true,
  70. ValueType: MetricValueTypeInt,
  71. DataType: MetricTypeGauge,
  72. }},
  73. },
  74. }
  75. _, err := scrpr.Scrape(context.Background())
  76. assert.Error(t, err)
  77. }
  78. func TestScraper_RowToMetricMultiErrorsOnScrape(t *testing.T) {
  79. client := &fakeDBClient{
  80. stringMaps: [][]stringMap{{
  81. {"myint": "foo"},
  82. {"myint": "bar"},
  83. }},
  84. }
  85. scrpr := scraper{
  86. client: client,
  87. query: Query{
  88. Metrics: []MetricCfg{{
  89. MetricName: "my.col",
  90. ValueColumn: "mycol",
  91. Monotonic: true,
  92. ValueType: MetricValueTypeInt,
  93. DataType: MetricTypeGauge,
  94. }},
  95. },
  96. }
  97. _, err := scrpr.Scrape(context.Background())
  98. assert.Error(t, err)
  99. }
  100. func TestScraper_SingleRow_MultiMetrics(t *testing.T) {
  101. scrpr := scraper{
  102. client: &fakeDBClient{
  103. stringMaps: [][]stringMap{{{
  104. "count": "42",
  105. "foo_name": "baz",
  106. "bar_name": "quux",
  107. }}},
  108. },
  109. query: Query{
  110. Metrics: []MetricCfg{
  111. {
  112. MetricName: "my.metric.1",
  113. ValueColumn: "count",
  114. AttributeColumns: []string{"foo_name", "bar_name"},
  115. ValueType: MetricValueTypeInt,
  116. DataType: MetricTypeGauge,
  117. },
  118. {
  119. MetricName: "my.metric.2",
  120. ValueColumn: "count",
  121. AttributeColumns: []string{"foo_name", "bar_name"},
  122. ValueType: MetricValueTypeInt,
  123. DataType: MetricTypeSum,
  124. Aggregation: MetricAggregationCumulative,
  125. },
  126. },
  127. },
  128. }
  129. metrics, err := scrpr.Scrape(context.Background())
  130. require.NoError(t, err)
  131. rms := metrics.ResourceMetrics()
  132. assert.Equal(t, 1, rms.Len())
  133. rm := rms.At(0)
  134. sms := rm.ScopeMetrics()
  135. assert.Equal(t, 1, sms.Len())
  136. sm := sms.At(0)
  137. ms := sm.Metrics()
  138. assert.Equal(t, 2, ms.Len())
  139. {
  140. gaugeMetric := ms.At(0)
  141. assert.Equal(t, "my.metric.1", gaugeMetric.Name())
  142. gauge := gaugeMetric.Gauge()
  143. dps := gauge.DataPoints()
  144. assert.Equal(t, 1, dps.Len())
  145. dp := dps.At(0)
  146. assert.EqualValues(t, 42, dp.IntValue())
  147. attrs := dp.Attributes()
  148. assert.Equal(t, 2, attrs.Len())
  149. fooVal, _ := attrs.Get("foo_name")
  150. assert.Equal(t, "baz", fooVal.AsString())
  151. barVal, _ := attrs.Get("bar_name")
  152. assert.Equal(t, "quux", barVal.AsString())
  153. }
  154. {
  155. sumMetric := ms.At(1)
  156. assert.Equal(t, "my.metric.2", sumMetric.Name())
  157. sum := sumMetric.Sum()
  158. dps := sum.DataPoints()
  159. assert.Equal(t, 1, dps.Len())
  160. dp := dps.At(0)
  161. assert.EqualValues(t, 42, dp.IntValue())
  162. attrs := dp.Attributes()
  163. assert.Equal(t, 2, attrs.Len())
  164. fooVal, _ := attrs.Get("foo_name")
  165. assert.Equal(t, "baz", fooVal.AsString())
  166. barVal, _ := attrs.Get("bar_name")
  167. assert.Equal(t, "quux", barVal.AsString())
  168. }
  169. }
  170. func TestScraper_MultiRow(t *testing.T) {
  171. client := &fakeDBClient{
  172. stringMaps: [][]stringMap{{
  173. {
  174. "count": "42",
  175. "genre": "action",
  176. },
  177. {
  178. "count": "111",
  179. "genre": "sci-fi",
  180. },
  181. }},
  182. }
  183. scrpr := scraper{
  184. client: client,
  185. query: Query{
  186. Metrics: []MetricCfg{
  187. {
  188. MetricName: "movie.genre",
  189. ValueColumn: "count",
  190. AttributeColumns: []string{"genre"},
  191. ValueType: MetricValueTypeInt,
  192. DataType: MetricTypeGauge,
  193. },
  194. },
  195. },
  196. }
  197. metrics, err := scrpr.Scrape(context.Background())
  198. require.NoError(t, err)
  199. ms := metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics()
  200. {
  201. metric := ms.At(0)
  202. dp := metric.Gauge().DataPoints().At(0)
  203. assert.EqualValues(t, 42, dp.IntValue())
  204. val, _ := dp.Attributes().Get("genre")
  205. assert.Equal(t, "action", val.Str())
  206. }
  207. {
  208. metric := ms.At(1)
  209. dp := metric.Gauge().DataPoints().At(0)
  210. assert.EqualValues(t, 111, dp.IntValue())
  211. val, _ := dp.Attributes().Get("genre")
  212. assert.Equal(t, "sci-fi", val.Str())
  213. }
  214. }
  215. func TestScraper_MultiResults_CumulativeSum(t *testing.T) {
  216. client := &fakeDBClient{
  217. stringMaps: [][]stringMap{
  218. {{"count": "42"}},
  219. {{"count": "43"}},
  220. },
  221. }
  222. scrpr := scraper{
  223. client: client,
  224. query: Query{
  225. Metrics: []MetricCfg{{
  226. MetricName: "transaction.count",
  227. ValueColumn: "count",
  228. ValueType: MetricValueTypeInt,
  229. DataType: MetricTypeSum,
  230. Aggregation: MetricAggregationCumulative,
  231. }},
  232. },
  233. }
  234. assertTransactionCount(t, scrpr, 42, pmetric.AggregationTemporalityCumulative)
  235. assertTransactionCount(t, scrpr, 43, pmetric.AggregationTemporalityCumulative)
  236. }
  237. func TestScraper_MultiResults_DeltaSum(t *testing.T) {
  238. client := &fakeDBClient{
  239. stringMaps: [][]stringMap{
  240. {{"count": "42"}},
  241. {{"count": "43"}},
  242. },
  243. }
  244. scrpr := scraper{
  245. client: client,
  246. query: Query{
  247. Metrics: []MetricCfg{{
  248. MetricName: "transaction.count",
  249. ValueColumn: "count",
  250. ValueType: MetricValueTypeInt,
  251. DataType: MetricTypeSum,
  252. Aggregation: MetricAggregationDelta,
  253. }},
  254. },
  255. }
  256. assertTransactionCount(t, scrpr, 42, pmetric.AggregationTemporalityDelta)
  257. assertTransactionCount(t, scrpr, 43, pmetric.AggregationTemporalityDelta)
  258. }
  259. func assertTransactionCount(t *testing.T, scrpr scraper, expected int, agg pmetric.AggregationTemporality) {
  260. metrics, err := scrpr.Scrape(context.Background())
  261. require.NoError(t, err)
  262. metric := metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0)
  263. assert.Equal(t, "transaction.count", metric.Name())
  264. sum := metric.Sum()
  265. assert.Equal(
  266. t,
  267. agg,
  268. sum.AggregationTemporality(),
  269. )
  270. assert.EqualValues(t, expected, sum.DataPoints().At(0).IntValue())
  271. }
  272. func TestScraper_Float(t *testing.T) {
  273. client := &fakeDBClient{
  274. stringMaps: [][]stringMap{
  275. {{"myfloat": "123.4"}},
  276. },
  277. }
  278. scrpr := scraper{
  279. client: client,
  280. query: Query{
  281. Metrics: []MetricCfg{{
  282. MetricName: "my.float",
  283. ValueColumn: "myfloat",
  284. Monotonic: true,
  285. ValueType: MetricValueTypeDouble,
  286. DataType: MetricTypeGauge,
  287. }},
  288. },
  289. }
  290. metrics, err := scrpr.Scrape(context.Background())
  291. require.NoError(t, err)
  292. metric := metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0)
  293. assert.Equal(t, 123.4, metric.Gauge().DataPoints().At(0).DoubleValue())
  294. }
  295. func TestScraper_DescriptionAndUnit(t *testing.T) {
  296. client := &fakeDBClient{
  297. stringMaps: [][]stringMap{
  298. {{"mycol": "123"}},
  299. },
  300. }
  301. scrpr := scraper{
  302. client: client,
  303. query: Query{
  304. Metrics: []MetricCfg{{
  305. MetricName: "my.name",
  306. ValueColumn: "mycol",
  307. Description: "my description",
  308. Unit: "my-unit",
  309. }},
  310. },
  311. }
  312. metrics, err := scrpr.Scrape(context.Background())
  313. require.NoError(t, err)
  314. z := metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0)
  315. assert.Equal(t, "my-unit", z.Unit())
  316. assert.Equal(t, "my description", z.Description())
  317. }
  318. func TestScraper_FakeDB_Warnings(t *testing.T) {
  319. db := fakeDB{rowVals: [][]any{{42, nil}}}
  320. logger := zap.NewNop()
  321. scrpr := scraper{
  322. client: newDbClient(db, "", logger),
  323. logger: logger,
  324. query: Query{
  325. Metrics: []MetricCfg{{
  326. MetricName: "my.name",
  327. ValueColumn: "col_0",
  328. Description: "my description",
  329. Unit: "my-unit",
  330. }},
  331. },
  332. }
  333. _, err := scrpr.Scrape(context.Background())
  334. require.NoError(t, err)
  335. }
  336. func TestScraper_FakeDB_MultiRows_Warnings(t *testing.T) {
  337. db := fakeDB{rowVals: [][]any{{42, nil}, {43, nil}}}
  338. logger := zap.NewNop()
  339. scrpr := scraper{
  340. client: newDbClient(db, "", logger),
  341. logger: logger,
  342. query: Query{
  343. Metrics: []MetricCfg{{
  344. MetricName: "my.col.0",
  345. ValueColumn: "col_0",
  346. Description: "my description 0",
  347. Unit: "my-unit-0",
  348. }},
  349. },
  350. }
  351. _, err := scrpr.Scrape(context.Background())
  352. // No error is expected because we're not actually asking for metrics from the
  353. // NULL column. Instead the errors from the NULL reads should just log warnings.
  354. assert.NoError(t, err)
  355. }
  356. func TestScraper_FakeDB_MultiRows_Error(t *testing.T) {
  357. db := fakeDB{rowVals: [][]any{{42, nil}, {43, nil}}}
  358. logger := zap.NewNop()
  359. scrpr := scraper{
  360. client: newDbClient(db, "", logger),
  361. logger: logger,
  362. query: Query{
  363. Metrics: []MetricCfg{{
  364. MetricName: "my.col.0",
  365. ValueColumn: "col_0",
  366. Description: "my description 0",
  367. Unit: "my-unit-0",
  368. }, {
  369. MetricName: "my.col.1",
  370. ValueColumn: "col_1",
  371. Description: "my description 1",
  372. Unit: "my-unit-1",
  373. },
  374. },
  375. },
  376. }
  377. _, err := scrpr.Scrape(context.Background())
  378. // We expect an error here not directly because of the NULL values but because
  379. // the column was also requested in Query.Metrics[1] but wasn't found. It's just
  380. // a partial scrape error though so it shouldn't cause a scraper shutdown.
  381. assert.Error(t, err)
  382. assert.True(t, scrapererror.IsPartialScrapeError(err))
  383. }
  384. func TestScraper_StartAndTSColumn(t *testing.T) {
  385. client := &fakeDBClient{
  386. stringMaps: [][]stringMap{{
  387. {
  388. "mycol": "42",
  389. "StartTs": "1682417791",
  390. "Ts": "1682418264",
  391. },
  392. }},
  393. }
  394. scrpr := scraper{
  395. client: client,
  396. query: Query{
  397. Metrics: []MetricCfg{{
  398. MetricName: "my.name",
  399. ValueColumn: "mycol",
  400. TsColumn: "Ts",
  401. StartTsColumn: "StartTs",
  402. DataType: MetricTypeSum,
  403. Aggregation: MetricAggregationCumulative,
  404. }},
  405. },
  406. }
  407. metrics, err := scrpr.Scrape(context.Background())
  408. require.NoError(t, err)
  409. metric := metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0)
  410. assert.Equal(t, pcommon.Timestamp(1682417791), metric.Sum().DataPoints().At(0).StartTimestamp())
  411. assert.Equal(t, pcommon.Timestamp(1682418264), metric.Sum().DataPoints().At(0).Timestamp())
  412. }
  413. func TestScraper_StartAndTS_ErrorOnColumnNotFound(t *testing.T) {
  414. client := &fakeDBClient{
  415. stringMaps: [][]stringMap{{
  416. {
  417. "mycol": "42",
  418. "StartTs": "1682417791",
  419. },
  420. }},
  421. }
  422. scrpr := scraper{
  423. client: client,
  424. query: Query{
  425. Metrics: []MetricCfg{{
  426. MetricName: "my.name",
  427. ValueColumn: "mycol",
  428. TsColumn: "Ts",
  429. StartTsColumn: "StartTs",
  430. DataType: MetricTypeSum,
  431. Aggregation: MetricAggregationCumulative,
  432. }},
  433. },
  434. }
  435. _, err := scrpr.Scrape(context.Background())
  436. assert.Error(t, err)
  437. }
  438. func TestScraper_StartAndTS_ErrorOnParse(t *testing.T) {
  439. client := &fakeDBClient{
  440. stringMaps: [][]stringMap{{
  441. {
  442. "mycol": "42",
  443. "StartTs": "blah",
  444. },
  445. }},
  446. }
  447. scrpr := scraper{
  448. client: client,
  449. query: Query{
  450. Metrics: []MetricCfg{{
  451. MetricName: "my.name",
  452. ValueColumn: "mycol",
  453. StartTsColumn: "StartTs",
  454. DataType: MetricTypeSum,
  455. Aggregation: MetricAggregationCumulative,
  456. }},
  457. },
  458. }
  459. _, err := scrpr.Scrape(context.Background())
  460. assert.Error(t, err)
  461. }