client.go 17 KB


  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package mysqlreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/mysqlreceiver"
  4. import (
  5. "database/sql"
  6. "fmt"
  7. "strings"
  8. "time"
  9. // registers the mysql driver
  10. "github.com/go-sql-driver/mysql"
  11. )
  12. type client interface {
  13. Connect() error
  14. getVersion() (string, error)
  15. getGlobalStats() (map[string]string, error)
  16. getInnodbStats() (map[string]string, error)
  17. getTableIoWaitsStats() ([]TableIoWaitsStats, error)
  18. getIndexIoWaitsStats() ([]IndexIoWaitsStats, error)
  19. getStatementEventsStats() ([]StatementEventStats, error)
  20. getTableLockWaitEventStats() ([]tableLockWaitEventStats, error)
  21. getReplicaStatusStats() ([]ReplicaStatusStats, error)
  22. Close() error
  23. }
  24. type mySQLClient struct {
  25. connStr string
  26. client *sql.DB
  27. statementEventsDigestTextLimit int
  28. statementEventsLimit int
  29. statementEventsTimeLimit time.Duration
  30. }
  31. type IoWaitsStats struct {
  32. schema string
  33. name string
  34. countDelete int64
  35. countFetch int64
  36. countInsert int64
  37. countUpdate int64
  38. timeDelete int64
  39. timeFetch int64
  40. timeInsert int64
  41. timeUpdate int64
  42. }
  43. type TableIoWaitsStats struct {
  44. IoWaitsStats
  45. }
  46. type IndexIoWaitsStats struct {
  47. IoWaitsStats
  48. index string
  49. }
  50. type StatementEventStats struct {
  51. schema string
  52. digest string
  53. digestText string
  54. sumTimerWait int64
  55. countErrors int64
  56. countWarnings int64
  57. countRowsAffected int64
  58. countRowsSent int64
  59. countRowsExamined int64
  60. countCreatedTmpDiskTables int64
  61. countCreatedTmpTables int64
  62. countSortMergePasses int64
  63. countSortRows int64
  64. countNoIndexUsed int64
  65. }
  66. type tableLockWaitEventStats struct {
  67. schema string
  68. name string
  69. countReadNormal int64
  70. countReadWithSharedLocks int64
  71. countReadHighPriority int64
  72. countReadNoInsert int64
  73. countReadExternal int64
  74. countWriteAllowWrite int64
  75. countWriteConcurrentInsert int64
  76. countWriteLowPriority int64
  77. countWriteNormal int64
  78. countWriteExternal int64
  79. sumTimerReadNormal int64
  80. sumTimerReadWithSharedLocks int64
  81. sumTimerReadHighPriority int64
  82. sumTimerReadNoInsert int64
  83. sumTimerReadExternal int64
  84. sumTimerWriteAllowWrite int64
  85. sumTimerWriteConcurrentInsert int64
  86. sumTimerWriteLowPriority int64
  87. sumTimerWriteNormal int64
  88. sumTimerWriteExternal int64
  89. }
  90. type ReplicaStatusStats struct {
  91. replicaIOState string
  92. sourceHost string
  93. sourceUser string
  94. sourcePort int64
  95. connectRetry int64
  96. sourceLogFile string
  97. readSourceLogPos int64
  98. relayLogFile string
  99. relayLogPos int64
  100. relaySourceLogFile string
  101. replicaIORunning string
  102. replicaSQLRunning string
  103. replicateDoDB string
  104. replicateIgnoreDB string
  105. replicateDoTable string
  106. replicateIgnoreTable string
  107. replicateWildDoTable string
  108. replicateWildIgnoreTable string
  109. lastErrno int64
  110. lastError string
  111. skipCounter int64
  112. execSourceLogPos int64
  113. relayLogSpace int64
  114. untilCondition string
  115. untilLogFile string
  116. untilLogPos string
  117. sourceSSLAllowed string
  118. sourceSSLCAFile string
  119. sourceSSLCAPath string
  120. sourceSSLCert string
  121. sourceSSLCipher string
  122. sourceSSLKey string
  123. secondsBehindSource sql.NullInt64
  124. sourceSSLVerifyServerCert string
  125. lastIOErrno int64
  126. lastIOError string
  127. lastSQLErrno int64
  128. lastSQLError string
  129. replicateIgnoreServerIds string
  130. sourceServerID int64
  131. sourceUUID string
  132. sourceInfoFile string
  133. sqlDelay int64
  134. sqlRemainingDelay sql.NullInt64
  135. replicaSQLRunningState string
  136. sourceRetryCount int64
  137. sourceBind string
  138. lastIOErrorTimestamp string
  139. lastSQLErrorTimestamp string
  140. sourceSSLCrl string
  141. sourceSSLCrlpath string
  142. retrievedGtidSet string
  143. executedGtidSet string
  144. autoPosition string
  145. replicateRewriteDB string
  146. channelName string
  147. sourceTLSVersion string
  148. sourcePublicKeyPath string
  149. getSourcePublicKey int64
  150. networkNamespace string
  151. }
  152. var _ client = (*mySQLClient)(nil)
  153. func newMySQLClient(conf *Config) (client, error) {
  154. tls, err := conf.TLS.LoadTLSConfig()
  155. if err != nil {
  156. return nil, err
  157. }
  158. tlsConfig := ""
  159. if tls != nil {
  160. err := mysql.RegisterTLSConfig("custom", tls)
  161. if err != nil {
  162. return nil, err
  163. }
  164. tlsConfig = "custom"
  165. }
  166. driverConf := mysql.Config{
  167. User: conf.Username,
  168. Passwd: string(conf.Password),
  169. Net: conf.Transport,
  170. Addr: conf.Endpoint,
  171. DBName: conf.Database,
  172. AllowNativePasswords: conf.AllowNativePasswords,
  173. TLS: tls,
  174. TLSConfig: tlsConfig,
  175. }
  176. connStr := driverConf.FormatDSN()
  177. return &mySQLClient{
  178. connStr: connStr,
  179. statementEventsDigestTextLimit: conf.StatementEvents.DigestTextLimit,
  180. statementEventsLimit: conf.StatementEvents.Limit,
  181. statementEventsTimeLimit: conf.StatementEvents.TimeLimit,
  182. }, nil
  183. }
  184. func (c *mySQLClient) Connect() error {
  185. clientDB, err := sql.Open("mysql", c.connStr)
  186. if err != nil {
  187. return fmt.Errorf("unable to connect to database: %w", err)
  188. }
  189. c.client = clientDB
  190. return nil
  191. }
  192. // getVersion queries the db for the version.
  193. func (c *mySQLClient) getVersion() (string, error) {
  194. query := "SELECT VERSION();"
  195. var version string
  196. err := c.client.QueryRow(query).Scan(&version)
  197. if err != nil {
  198. return "", err
  199. }
  200. return version, nil
  201. }
  202. // getGlobalStats queries the db for global status metrics.
  203. func (c *mySQLClient) getGlobalStats() (map[string]string, error) {
  204. q := "SHOW GLOBAL STATUS;"
  205. return query(*c, q)
  206. }
  207. // getInnodbStats queries the db for innodb metrics.
  208. func (c *mySQLClient) getInnodbStats() (map[string]string, error) {
  209. q := "SELECT name, count FROM information_schema.innodb_metrics WHERE name LIKE '%buffer_pool_size%';"
  210. return query(*c, q)
  211. }
  212. // getTableIoWaitsStats queries the db for table_io_waits metrics.
  213. func (c *mySQLClient) getTableIoWaitsStats() ([]TableIoWaitsStats, error) {
  214. query := "SELECT OBJECT_SCHEMA, OBJECT_NAME, " +
  215. "COUNT_DELETE, COUNT_FETCH, COUNT_INSERT, COUNT_UPDATE," +
  216. "SUM_TIMER_DELETE, SUM_TIMER_FETCH, SUM_TIMER_INSERT, SUM_TIMER_UPDATE " +
  217. "FROM performance_schema.table_io_waits_summary_by_table " +
  218. "WHERE OBJECT_SCHEMA NOT IN ('mysql', 'performance_schema');"
  219. rows, err := c.client.Query(query)
  220. if err != nil {
  221. return nil, err
  222. }
  223. defer rows.Close()
  224. var stats []TableIoWaitsStats
  225. for rows.Next() {
  226. var s TableIoWaitsStats
  227. err := rows.Scan(&s.schema, &s.name,
  228. &s.countDelete, &s.countFetch, &s.countInsert, &s.countUpdate,
  229. &s.timeDelete, &s.timeFetch, &s.timeInsert, &s.timeUpdate)
  230. if err != nil {
  231. return nil, err
  232. }
  233. stats = append(stats, s)
  234. }
  235. return stats, nil
  236. }
  237. // getIndexIoWaitsStats queries the db for index_io_waits metrics.
  238. func (c *mySQLClient) getIndexIoWaitsStats() ([]IndexIoWaitsStats, error) {
  239. query := "SELECT OBJECT_SCHEMA, OBJECT_NAME, ifnull(INDEX_NAME, 'NONE') as INDEX_NAME," +
  240. "COUNT_FETCH, COUNT_INSERT, COUNT_UPDATE, COUNT_DELETE," +
  241. "SUM_TIMER_FETCH, SUM_TIMER_INSERT, SUM_TIMER_UPDATE, SUM_TIMER_DELETE " +
  242. "FROM performance_schema.table_io_waits_summary_by_index_usage " +
  243. "WHERE OBJECT_SCHEMA NOT IN ('mysql', 'performance_schema');"
  244. rows, err := c.client.Query(query)
  245. if err != nil {
  246. return nil, err
  247. }
  248. defer rows.Close()
  249. var stats []IndexIoWaitsStats
  250. for rows.Next() {
  251. var s IndexIoWaitsStats
  252. err := rows.Scan(&s.schema, &s.name, &s.index,
  253. &s.countDelete, &s.countFetch, &s.countInsert, &s.countUpdate,
  254. &s.timeDelete, &s.timeFetch, &s.timeInsert, &s.timeUpdate)
  255. if err != nil {
  256. return nil, err
  257. }
  258. stats = append(stats, s)
  259. }
  260. return stats, nil
  261. }
  262. func (c *mySQLClient) getStatementEventsStats() ([]StatementEventStats, error) {
  263. query := fmt.Sprintf("SELECT ifnull(SCHEMA_NAME, 'NONE') as SCHEMA_NAME, DIGEST,"+
  264. "LEFT(DIGEST_TEXT, %d) as DIGEST_TEXT, SUM_TIMER_WAIT, SUM_ERRORS,"+
  265. "SUM_WARNINGS, SUM_ROWS_AFFECTED, SUM_ROWS_SENT, SUM_ROWS_EXAMINED,"+
  266. "SUM_CREATED_TMP_DISK_TABLES, SUM_CREATED_TMP_TABLES, SUM_SORT_MERGE_PASSES,"+
  267. "SUM_SORT_ROWS, SUM_NO_INDEX_USED "+
  268. "FROM performance_schema.events_statements_summary_by_digest "+
  269. "WHERE SCHEMA_NAME NOT IN ('mysql', 'performance_schema', 'information_schema') "+
  270. "AND last_seen > DATE_SUB(NOW(), INTERVAL %d SECOND) "+
  271. "ORDER BY SUM_TIMER_WAIT DESC "+
  272. "LIMIT %d",
  273. c.statementEventsDigestTextLimit,
  274. int64(c.statementEventsTimeLimit.Seconds()),
  275. c.statementEventsLimit)
  276. rows, err := c.client.Query(query)
  277. if err != nil {
  278. return nil, err
  279. }
  280. defer rows.Close()
  281. var stats []StatementEventStats
  282. for rows.Next() {
  283. var s StatementEventStats
  284. err := rows.Scan(&s.schema, &s.digest, &s.digestText,
  285. &s.sumTimerWait, &s.countErrors, &s.countWarnings,
  286. &s.countRowsAffected, &s.countRowsSent, &s.countRowsExamined, &s.countCreatedTmpDiskTables,
  287. &s.countCreatedTmpTables, &s.countSortMergePasses, &s.countSortRows, &s.countNoIndexUsed)
  288. if err != nil {
  289. return nil, err
  290. }
  291. stats = append(stats, s)
  292. }
  293. return stats, nil
  294. }
  295. func (c *mySQLClient) getTableLockWaitEventStats() ([]tableLockWaitEventStats, error) {
  296. query := "SELECT OBJECT_SCHEMA, OBJECT_NAME, COUNT_READ_NORMAL, COUNT_READ_WITH_SHARED_LOCKS," +
  297. "COUNT_READ_HIGH_PRIORITY, COUNT_READ_NO_INSERT, COUNT_READ_EXTERNAL, COUNT_WRITE_ALLOW_WRITE," +
  298. "COUNT_WRITE_CONCURRENT_INSERT, COUNT_WRITE_LOW_PRIORITY, COUNT_WRITE_NORMAL," +
  299. "COUNT_WRITE_EXTERNAL, SUM_TIMER_READ_NORMAL, SUM_TIMER_READ_WITH_SHARED_LOCKS," +
  300. "SUM_TIMER_READ_HIGH_PRIORITY, SUM_TIMER_READ_NO_INSERT, SUM_TIMER_READ_EXTERNAL," +
  301. "SUM_TIMER_WRITE_ALLOW_WRITE, SUM_TIMER_WRITE_CONCURRENT_INSERT, SUM_TIMER_WRITE_LOW_PRIORITY," +
  302. "SUM_TIMER_WRITE_NORMAL, SUM_TIMER_WRITE_EXTERNAL " +
  303. "FROM performance_schema.table_lock_waits_summary_by_table " +
  304. "WHERE OBJECT_SCHEMA NOT IN ('mysql', 'performance_schema', 'information_schema')"
  305. rows, err := c.client.Query(query)
  306. if err != nil {
  307. return nil, err
  308. }
  309. defer rows.Close()
  310. var stats []tableLockWaitEventStats
  311. for rows.Next() {
  312. var s tableLockWaitEventStats
  313. err := rows.Scan(&s.schema, &s.name,
  314. &s.countReadNormal, &s.countReadWithSharedLocks, &s.countReadHighPriority, &s.countReadNoInsert, &s.countReadExternal,
  315. &s.countWriteAllowWrite, &s.countWriteConcurrentInsert, &s.countWriteLowPriority, &s.countWriteNormal, &s.countWriteExternal,
  316. &s.sumTimerReadNormal, &s.sumTimerReadWithSharedLocks, &s.sumTimerReadHighPriority, &s.sumTimerReadNoInsert, &s.sumTimerReadExternal,
  317. &s.sumTimerWriteAllowWrite, &s.sumTimerWriteConcurrentInsert, &s.sumTimerWriteLowPriority, &s.sumTimerWriteNormal, &s.sumTimerWriteExternal)
  318. if err != nil {
  319. return nil, err
  320. }
  321. stats = append(stats, s)
  322. }
  323. return stats, nil
  324. }
  325. func (c *mySQLClient) getReplicaStatusStats() ([]ReplicaStatusStats, error) {
  326. version, err := c.getVersion()
  327. if err != nil {
  328. return nil, err
  329. }
  330. if version < "8.0.22" {
  331. return nil, nil
  332. }
  333. query := "SHOW REPLICA STATUS"
  334. rows, err := c.client.Query(query)
  335. if err != nil {
  336. return nil, err
  337. }
  338. defer rows.Close()
  339. cols, err := rows.Columns()
  340. if err != nil {
  341. return nil, err
  342. }
  343. var stats []ReplicaStatusStats
  344. for rows.Next() {
  345. var s ReplicaStatusStats
  346. dest := []any{}
  347. for _, col := range cols {
  348. switch strings.ToLower(col) {
  349. case "replica_io_state":
  350. dest = append(dest, &s.replicaIOState)
  351. case "source_host":
  352. dest = append(dest, &s.sourceHost)
  353. case "source_user":
  354. dest = append(dest, &s.sourceUser)
  355. case "source_port":
  356. dest = append(dest, &s.sourcePort)
  357. case "connect_retry":
  358. dest = append(dest, &s.connectRetry)
  359. case "source_log_file":
  360. dest = append(dest, &s.sourceLogFile)
  361. case "read_source_log_pos":
  362. dest = append(dest, &s.readSourceLogPos)
  363. case "relay_log_file":
  364. dest = append(dest, &s.relayLogFile)
  365. case "relay_log_pos":
  366. dest = append(dest, &s.relayLogPos)
  367. case "relay_source_log_file":
  368. dest = append(dest, &s.relaySourceLogFile)
  369. case "replica_io_running":
  370. dest = append(dest, &s.replicaIORunning)
  371. case "replica_sql_running":
  372. dest = append(dest, &s.replicaSQLRunning)
  373. case "replicate_do_db":
  374. dest = append(dest, &s.replicateDoDB)
  375. case "replicate_ignore_db":
  376. dest = append(dest, &s.replicateIgnoreDB)
  377. case "replicate_do_table":
  378. dest = append(dest, &s.replicateDoTable)
  379. case "replicate_ignore_table":
  380. dest = append(dest, &s.replicateIgnoreTable)
  381. case "replicate_wild_do_table":
  382. dest = append(dest, &s.replicateWildDoTable)
  383. case "replicate_wild_ignore_table":
  384. dest = append(dest, &s.replicateWildIgnoreTable)
  385. case "last_errno":
  386. dest = append(dest, &s.lastErrno)
  387. case "last_error":
  388. dest = append(dest, &s.lastError)
  389. case "skip_counter":
  390. dest = append(dest, &s.skipCounter)
  391. case "exec_source_log_pos":
  392. dest = append(dest, &s.execSourceLogPos)
  393. case "relay_log_space":
  394. dest = append(dest, &s.relayLogSpace)
  395. case "until_condition":
  396. dest = append(dest, &s.untilCondition)
  397. case "until_log_file":
  398. dest = append(dest, &s.untilLogFile)
  399. case "until_log_pos":
  400. dest = append(dest, &s.untilLogPos)
  401. case "source_ssl_allowed":
  402. dest = append(dest, &s.sourceSSLAllowed)
  403. case "source_ssl_ca_file":
  404. dest = append(dest, &s.sourceSSLCAFile)
  405. case "source_ssl_ca_path":
  406. dest = append(dest, &s.sourceSSLCAPath)
  407. case "source_ssl_cert":
  408. dest = append(dest, &s.sourceSSLCert)
  409. case "source_ssl_cipher":
  410. dest = append(dest, &s.sourceSSLCipher)
  411. case "source_ssl_key":
  412. dest = append(dest, &s.sourceSSLKey)
  413. case "seconds_behind_source":
  414. dest = append(dest, &s.secondsBehindSource)
  415. case "source_ssl_verify_server_cert":
  416. dest = append(dest, &s.sourceSSLVerifyServerCert)
  417. case "last_io_errno":
  418. dest = append(dest, &s.lastIOErrno)
  419. case "last_io_error":
  420. dest = append(dest, &s.lastIOError)
  421. case "last_sql_errno":
  422. dest = append(dest, &s.lastSQLErrno)
  423. case "last_sql_error":
  424. dest = append(dest, &s.lastSQLError)
  425. case "replicate_ignore_server_ids":
  426. dest = append(dest, &s.replicateIgnoreServerIds)
  427. case "source_server_id":
  428. dest = append(dest, &s.sourceServerID)
  429. case "source_uuid":
  430. dest = append(dest, &s.sourceUUID)
  431. case "source_info_file":
  432. dest = append(dest, &s.sourceInfoFile)
  433. case "sql_delay":
  434. dest = append(dest, &s.sqlDelay)
  435. case "sql_remaining_delay":
  436. dest = append(dest, &s.sqlRemainingDelay)
  437. case "replica_sql_running_state":
  438. dest = append(dest, &s.replicaSQLRunningState)
  439. case "source_retry_count":
  440. dest = append(dest, &s.sourceRetryCount)
  441. case "source_bind":
  442. dest = append(dest, &s.sourceBind)
  443. case "last_io_error_timestamp":
  444. dest = append(dest, &s.lastIOErrorTimestamp)
  445. case "last_sql_error_timestamp":
  446. dest = append(dest, &s.lastSQLErrorTimestamp)
  447. case "source_ssl_crl":
  448. dest = append(dest, &s.sourceSSLCrl)
  449. case "source_ssl_crlpath":
  450. dest = append(dest, &s.sourceSSLCrlpath)
  451. case "retrieved_gtid_set":
  452. dest = append(dest, &s.retrievedGtidSet)
  453. case "executed_gtid_set":
  454. dest = append(dest, &s.executedGtidSet)
  455. case "auto_position":
  456. dest = append(dest, &s.autoPosition)
  457. case "replicate_rewrite_db":
  458. dest = append(dest, &s.replicateRewriteDB)
  459. case "channel_name":
  460. dest = append(dest, &s.channelName)
  461. case "source_tls_version":
  462. dest = append(dest, &s.sourceTLSVersion)
  463. case "source_public_key_path":
  464. dest = append(dest, &s.sourcePublicKeyPath)
  465. case "get_source_public_key":
  466. dest = append(dest, &s.getSourcePublicKey)
  467. case "network_namespace":
  468. dest = append(dest, &s.networkNamespace)
  469. default:
  470. return nil, fmt.Errorf("unknown column name %s for replica status", col)
  471. }
  472. }
  473. err := rows.Scan(dest...)
  474. if err != nil {
  475. return nil, err
  476. }
  477. stats = append(stats, s)
  478. }
  479. return stats, nil
  480. }
  481. func query(c mySQLClient, query string) (map[string]string, error) {
  482. rows, err := c.client.Query(query)
  483. if err != nil {
  484. return nil, err
  485. }
  486. defer rows.Close()
  487. stats := map[string]string{}
  488. for rows.Next() {
  489. var key, val string
  490. if err := rows.Scan(&key, &val); err != nil {
  491. return nil, err
  492. }
  493. stats[key] = val
  494. }
  495. return stats, nil
  496. }
  497. func (c *mySQLClient) Close() error {
  498. if c.client != nil {
  499. return c.client.Close()
  500. }
  501. return nil
  502. }