scraper_test.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package mysqlreceiver
  4. import (
  5. "bufio"
  6. "context"
  7. "database/sql"
  8. "errors"
  9. "os"
  10. "path/filepath"
  11. "strings"
  12. "testing"
  13. "github.com/stretchr/testify/assert"
  14. "github.com/stretchr/testify/require"
  15. "go.opentelemetry.io/collector/config/confignet"
  16. "go.opentelemetry.io/collector/receiver/receivertest"
  17. "go.opentelemetry.io/collector/receiver/scrapererror"
  18. "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden"
  19. "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
  20. )
  21. func TestScrape(t *testing.T) {
  22. t.Run("successful scrape", func(t *testing.T) {
  23. cfg := createDefaultConfig().(*Config)
  24. cfg.Username = "otel"
  25. cfg.Password = "otel"
  26. cfg.NetAddr = confignet.NetAddr{Endpoint: "localhost:3306"}
  27. cfg.MetricsBuilderConfig.Metrics.MysqlStatementEventCount.Enabled = true
  28. cfg.MetricsBuilderConfig.Metrics.MysqlStatementEventWaitTime.Enabled = true
  29. cfg.MetricsBuilderConfig.Metrics.MysqlConnectionErrors.Enabled = true
  30. cfg.MetricsBuilderConfig.Metrics.MysqlMysqlxWorkerThreads.Enabled = true
  31. cfg.MetricsBuilderConfig.Metrics.MysqlJoins.Enabled = true
  32. cfg.MetricsBuilderConfig.Metrics.MysqlTableOpenCache.Enabled = true
  33. cfg.MetricsBuilderConfig.Metrics.MysqlQueryClientCount.Enabled = true
  34. cfg.MetricsBuilderConfig.Metrics.MysqlQueryCount.Enabled = true
  35. cfg.MetricsBuilderConfig.Metrics.MysqlQuerySlowCount.Enabled = true
  36. cfg.MetricsBuilderConfig.Metrics.MysqlTableLockWaitReadCount.Enabled = true
  37. cfg.MetricsBuilderConfig.Metrics.MysqlTableLockWaitReadTime.Enabled = true
  38. cfg.MetricsBuilderConfig.Metrics.MysqlTableLockWaitWriteCount.Enabled = true
  39. cfg.MetricsBuilderConfig.Metrics.MysqlTableLockWaitWriteTime.Enabled = true
  40. cfg.MetricsBuilderConfig.Metrics.MysqlClientNetworkIo.Enabled = true
  41. cfg.MetricsBuilderConfig.Metrics.MysqlPreparedStatements.Enabled = true
  42. cfg.MetricsBuilderConfig.Metrics.MysqlCommands.Enabled = true
  43. cfg.MetricsBuilderConfig.Metrics.MysqlReplicaSQLDelay.Enabled = true
  44. cfg.MetricsBuilderConfig.Metrics.MysqlReplicaTimeBehindSource.Enabled = true
  45. cfg.MetricsBuilderConfig.Metrics.MysqlConnectionCount.Enabled = true
  46. scraper := newMySQLScraper(receivertest.NewNopCreateSettings(), cfg)
  47. scraper.sqlclient = &mockClient{
  48. globalStatsFile: "global_stats",
  49. innodbStatsFile: "innodb_stats",
  50. tableIoWaitsFile: "table_io_waits_stats",
  51. indexIoWaitsFile: "index_io_waits_stats",
  52. statementEventsFile: "statement_events",
  53. tableLockWaitEventStatsFile: "table_lock_wait_event_stats",
  54. replicaStatusFile: "replica_stats",
  55. }
  56. scraper.renameCommands = true
  57. actualMetrics, err := scraper.scrape(context.Background())
  58. require.NoError(t, err)
  59. expectedFile := filepath.Join("testdata", "scraper", "expected.yaml")
  60. expectedMetrics, err := golden.ReadMetrics(expectedFile)
  61. require.NoError(t, err)
  62. require.NoError(t, pmetrictest.CompareMetrics(actualMetrics, expectedMetrics,
  63. pmetrictest.IgnoreMetricDataPointsOrder(), pmetrictest.IgnoreStartTimestamp(), pmetrictest.IgnoreTimestamp()))
  64. })
  65. t.Run("scrape has partial failure", func(t *testing.T) {
  66. cfg := createDefaultConfig().(*Config)
  67. cfg.Username = "otel"
  68. cfg.Password = "otel"
  69. cfg.NetAddr = confignet.NetAddr{Endpoint: "localhost:3306"}
  70. cfg.MetricsBuilderConfig.Metrics.MysqlReplicaSQLDelay.Enabled = true
  71. cfg.MetricsBuilderConfig.Metrics.MysqlReplicaTimeBehindSource.Enabled = true
  72. cfg.MetricsBuilderConfig.Metrics.MysqlTableLockWaitReadCount.Enabled = true
  73. cfg.MetricsBuilderConfig.Metrics.MysqlTableLockWaitReadTime.Enabled = true
  74. cfg.MetricsBuilderConfig.Metrics.MysqlTableLockWaitWriteCount.Enabled = true
  75. cfg.MetricsBuilderConfig.Metrics.MysqlTableLockWaitWriteTime.Enabled = true
  76. scraper := newMySQLScraper(receivertest.NewNopCreateSettings(), cfg)
  77. scraper.sqlclient = &mockClient{
  78. globalStatsFile: "global_stats_partial",
  79. innodbStatsFile: "innodb_stats_empty",
  80. tableIoWaitsFile: "table_io_waits_stats_empty",
  81. indexIoWaitsFile: "index_io_waits_stats_empty",
  82. statementEventsFile: "statement_events_empty",
  83. tableLockWaitEventStatsFile: "table_lock_wait_event_stats_empty",
  84. replicaStatusFile: "replica_stats_empty",
  85. }
  86. actualMetrics, scrapeErr := scraper.scrape(context.Background())
  87. require.Error(t, scrapeErr)
  88. expectedFile := filepath.Join("testdata", "scraper", "expected_partial.yaml")
  89. expectedMetrics, err := golden.ReadMetrics(expectedFile)
  90. require.NoError(t, err)
  91. assert.NoError(t, pmetrictest.CompareMetrics(actualMetrics, expectedMetrics,
  92. pmetrictest.IgnoreMetricDataPointsOrder(), pmetrictest.IgnoreStartTimestamp(),
  93. pmetrictest.IgnoreTimestamp()))
  94. var partialError scrapererror.PartialScrapeError
  95. require.True(t, errors.As(scrapeErr, &partialError), "returned error was not PartialScrapeError")
  96. // 5 comes from 4 failed "must-have" metrics that aren't present,
  97. // and the other failure comes from a row that fails to parse as a number
  98. require.Equal(t, partialError.Failed, 5, "Expected partial error count to be 5")
  99. })
  100. }
  101. var _ client = (*mockClient)(nil)
  102. type mockClient struct {
  103. globalStatsFile string
  104. innodbStatsFile string
  105. tableIoWaitsFile string
  106. indexIoWaitsFile string
  107. statementEventsFile string
  108. tableLockWaitEventStatsFile string
  109. replicaStatusFile string
  110. }
  111. func readFile(fname string) (map[string]string, error) {
  112. var stats = map[string]string{}
  113. file, err := os.Open(filepath.Join("testdata", "scraper", fname+".txt"))
  114. if err != nil {
  115. return nil, err
  116. }
  117. defer file.Close()
  118. scanner := bufio.NewScanner(file)
  119. for scanner.Scan() {
  120. text := strings.Split(scanner.Text(), "\t")
  121. stats[text[0]] = text[1]
  122. }
  123. return stats, nil
  124. }
  125. func (c *mockClient) Connect() error {
  126. return nil
  127. }
  128. func (c *mockClient) getVersion() (string, error) {
  129. return "8.0.27", nil
  130. }
  131. func (c *mockClient) getGlobalStats() (map[string]string, error) {
  132. return readFile(c.globalStatsFile)
  133. }
  134. func (c *mockClient) getInnodbStats() (map[string]string, error) {
  135. return readFile(c.innodbStatsFile)
  136. }
  137. func (c *mockClient) getTableIoWaitsStats() ([]TableIoWaitsStats, error) {
  138. var stats []TableIoWaitsStats
  139. file, err := os.Open(filepath.Join("testdata", "scraper", c.tableIoWaitsFile+".txt"))
  140. if err != nil {
  141. return nil, err
  142. }
  143. defer file.Close()
  144. scanner := bufio.NewScanner(file)
  145. for scanner.Scan() {
  146. var s TableIoWaitsStats
  147. text := strings.Split(scanner.Text(), "\t")
  148. s.schema = text[0]
  149. s.name = text[1]
  150. s.countDelete, _ = parseInt(text[2])
  151. s.countFetch, _ = parseInt(text[3])
  152. s.countInsert, _ = parseInt(text[4])
  153. s.countUpdate, _ = parseInt(text[5])
  154. s.timeDelete, _ = parseInt(text[6])
  155. s.timeFetch, _ = parseInt(text[7])
  156. s.timeInsert, _ = parseInt(text[8])
  157. s.timeUpdate, _ = parseInt(text[9])
  158. stats = append(stats, s)
  159. }
  160. return stats, nil
  161. }
  162. func (c *mockClient) getIndexIoWaitsStats() ([]IndexIoWaitsStats, error) {
  163. var stats []IndexIoWaitsStats
  164. file, err := os.Open(filepath.Join("testdata", "scraper", c.indexIoWaitsFile+".txt"))
  165. if err != nil {
  166. return nil, err
  167. }
  168. defer file.Close()
  169. scanner := bufio.NewScanner(file)
  170. for scanner.Scan() {
  171. var s IndexIoWaitsStats
  172. text := strings.Split(scanner.Text(), "\t")
  173. s.schema = text[0]
  174. s.name = text[1]
  175. s.index = text[2]
  176. s.countDelete, _ = parseInt(text[3])
  177. s.countFetch, _ = parseInt(text[4])
  178. s.countInsert, _ = parseInt(text[5])
  179. s.countUpdate, _ = parseInt(text[6])
  180. s.timeDelete, _ = parseInt(text[7])
  181. s.timeFetch, _ = parseInt(text[8])
  182. s.timeInsert, _ = parseInt(text[9])
  183. s.timeUpdate, _ = parseInt(text[10])
  184. stats = append(stats, s)
  185. }
  186. return stats, nil
  187. }
  188. func (c *mockClient) getStatementEventsStats() ([]StatementEventStats, error) {
  189. var stats []StatementEventStats
  190. file, err := os.Open(filepath.Join("testdata", "scraper", c.statementEventsFile+".txt"))
  191. if err != nil {
  192. return nil, err
  193. }
  194. defer file.Close()
  195. scanner := bufio.NewScanner(file)
  196. for scanner.Scan() {
  197. var s StatementEventStats
  198. text := strings.Split(scanner.Text(), "\t")
  199. s.schema = text[0]
  200. s.digest = text[1]
  201. s.digestText = text[2]
  202. s.sumTimerWait, _ = parseInt(text[3])
  203. s.countErrors, _ = parseInt(text[4])
  204. s.countWarnings, _ = parseInt(text[5])
  205. s.countRowsAffected, _ = parseInt(text[6])
  206. s.countRowsSent, _ = parseInt(text[7])
  207. s.countRowsExamined, _ = parseInt(text[8])
  208. s.countCreatedTmpDiskTables, _ = parseInt(text[9])
  209. s.countCreatedTmpTables, _ = parseInt(text[10])
  210. s.countSortMergePasses, _ = parseInt(text[11])
  211. s.countSortRows, _ = parseInt(text[12])
  212. s.countNoIndexUsed, _ = parseInt(text[13])
  213. stats = append(stats, s)
  214. }
  215. return stats, nil
  216. }
  217. func (c *mockClient) getTableLockWaitEventStats() ([]tableLockWaitEventStats, error) {
  218. var stats []tableLockWaitEventStats
  219. file, err := os.Open(filepath.Join("testdata", "scraper", c.tableLockWaitEventStatsFile+".txt"))
  220. if err != nil {
  221. return nil, err
  222. }
  223. defer file.Close()
  224. scanner := bufio.NewScanner(file)
  225. for scanner.Scan() {
  226. var s tableLockWaitEventStats
  227. text := strings.Split(scanner.Text(), "\t")
  228. s.schema = text[0]
  229. s.name = text[1]
  230. s.countReadNormal, _ = parseInt(text[2])
  231. s.countReadWithSharedLocks, _ = parseInt(text[3])
  232. s.countReadHighPriority, _ = parseInt(text[4])
  233. s.countReadNoInsert, _ = parseInt(text[5])
  234. s.countReadExternal, _ = parseInt(text[6])
  235. s.countWriteAllowWrite, _ = parseInt(text[7])
  236. s.countWriteConcurrentInsert, _ = parseInt(text[8])
  237. s.countWriteLowPriority, _ = parseInt(text[9])
  238. s.countWriteNormal, _ = parseInt(text[10])
  239. s.countWriteExternal, _ = parseInt(text[11])
  240. s.sumTimerReadNormal, _ = parseInt(text[12])
  241. s.sumTimerReadWithSharedLocks, _ = parseInt(text[13])
  242. s.sumTimerReadHighPriority, _ = parseInt(text[14])
  243. s.sumTimerReadNoInsert, _ = parseInt(text[15])
  244. s.sumTimerReadExternal, _ = parseInt(text[16])
  245. s.sumTimerWriteAllowWrite, _ = parseInt(text[17])
  246. s.sumTimerWriteConcurrentInsert, _ = parseInt(text[18])
  247. s.sumTimerWriteLowPriority, _ = parseInt(text[19])
  248. s.sumTimerWriteNormal, _ = parseInt(text[20])
  249. s.sumTimerWriteExternal, _ = parseInt(text[21])
  250. stats = append(stats, s)
  251. }
  252. return stats, nil
  253. }
  254. func (c *mockClient) getReplicaStatusStats() ([]ReplicaStatusStats, error) {
  255. var stats []ReplicaStatusStats
  256. file, err := os.Open(filepath.Join("testdata", "scraper", c.replicaStatusFile+".txt"))
  257. if err != nil {
  258. return nil, err
  259. }
  260. defer file.Close()
  261. scanner := bufio.NewScanner(file)
  262. for scanner.Scan() {
  263. var s ReplicaStatusStats
  264. text := strings.Split(scanner.Text(), "\t")
  265. s.replicaIOState = text[0]
  266. s.sourceHost = text[1]
  267. s.sourceUser = text[2]
  268. s.sourcePort, _ = parseInt(text[3])
  269. s.connectRetry, _ = parseInt(text[4])
  270. s.sourceLogFile = text[5]
  271. s.readSourceLogPos, _ = parseInt(text[6])
  272. s.relayLogFile = text[7]
  273. s.relayLogPos, _ = parseInt(text[8])
  274. s.relaySourceLogFile = text[9]
  275. s.replicaIORunning = text[10]
  276. s.replicaSQLRunning = text[11]
  277. s.replicateDoDB = text[12]
  278. s.replicateIgnoreDB = text[13]
  279. s.replicateDoTable = text[14]
  280. s.replicateIgnoreTable = text[15]
  281. s.replicateWildDoTable = text[16]
  282. s.replicateWildIgnoreTable = text[17]
  283. s.lastErrno, _ = parseInt(text[18])
  284. s.lastError = text[19]
  285. s.skipCounter, _ = parseInt(text[20])
  286. s.execSourceLogPos, _ = parseInt(text[21])
  287. s.relayLogSpace, _ = parseInt(text[22])
  288. s.untilCondition = text[23]
  289. s.untilLogFile = text[24]
  290. s.untilLogPos = text[25]
  291. s.sourceSSLAllowed = text[26]
  292. s.sourceSSLCAFile = text[27]
  293. s.sourceSSLCAPath = text[28]
  294. s.sourceSSLCert = text[29]
  295. s.sourceSSLCipher = text[30]
  296. s.sourceSSLKey = text[31]
  297. v, _ := parseInt(text[32])
  298. s.secondsBehindSource = sql.NullInt64{Int64: v, Valid: true}
  299. s.sourceSSLVerifyServerCert = text[33]
  300. s.lastIOErrno, _ = parseInt(text[34])
  301. s.lastIOError = text[35]
  302. s.lastSQLErrno, _ = parseInt(text[36])
  303. s.lastSQLError = text[37]
  304. s.replicateIgnoreServerIds = text[38]
  305. s.sourceServerID, _ = parseInt(text[39])
  306. s.sourceUUID = text[40]
  307. s.sourceInfoFile = text[41]
  308. s.sqlDelay, _ = parseInt(text[42])
  309. v, _ = parseInt(text[43])
  310. s.sqlRemainingDelay = sql.NullInt64{Int64: v, Valid: true}
  311. s.replicaSQLRunningState = text[44]
  312. s.sourceRetryCount, _ = parseInt(text[45])
  313. s.sourceBind = text[46]
  314. s.lastIOErrorTimestamp = text[47]
  315. s.lastSQLErrorTimestamp = text[48]
  316. s.sourceSSLCrl = text[49]
  317. s.sourceSSLCrlpath = text[50]
  318. s.retrievedGtidSet = text[51]
  319. s.executedGtidSet = text[52]
  320. s.autoPosition = text[53]
  321. s.replicateRewriteDB = text[54]
  322. s.channelName = text[55]
  323. s.sourceTLSVersion = text[56]
  324. s.sourcePublicKeyPath = text[57]
  325. s.getSourcePublicKey, _ = parseInt(text[58])
  326. s.networkNamespace = text[59]
  327. stats = append(stats, s)
  328. }
  329. return stats, nil
  330. }
  331. func (c *mockClient) Close() error {
  332. return nil
  333. }