__main__.py

import logging
import os
import pathlib
import sys

import pandas as pd
from pyflink.table import TableEnvironment, EnvironmentSettings

from ob_jobs.common import utc_yesterday_range, ChOptions, PgOptions, WeightedAvg
from ob_jobs.errors import analysis_error_count
from ob_jobs.query_log import query_log_source_table
from ob_jobs.query_stats import analysis_query_log_stats
from ob_jobs.report import daily_report

logger = logging.getLogger(__name__)

# Names of the environment variables that configure the job.
ENV_KAFKA_BROKERS = 'KAFKA_BROKERS'
ENV_JAR_BASE_PATH = 'JAR_BASE_PATH'
ENV_CH_HOST = 'CH_HOST'
ENV_CH_PORT = 'CH_PORT'
ENV_CH_USER = 'CH_USER'
ENV_CH_PASSWORD = 'CH_PASSWORD'
ENV_CH_DBNAME = 'CH_DBNAME'
ENV_PG_HOST = 'PG_HOST'
ENV_PG_PORT = 'PG_PORT'
ENV_PG_USER = 'PG_USER'
ENV_PG_PASSWORD = 'PG_PASSWORD'
ENV_PG_DBNAME = 'PG_DBNAME'


def get_jar_files():
    """Collect the .jar files under the jar base path as file:// URLs."""
    jar_base = os.environ.get(ENV_JAR_BASE_PATH, '/app/jars')
    jar_files = [f for f in pathlib.Path(jar_base).iterdir()
                 if f.is_file() and f.suffix == '.jar']
    return [f"file://{jar_base}/{f.name}" for f in jar_files]
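
# The result is a list of URLs such as the following (jar names here are
# illustrative, not the actual contents of /app/jars):
#   ['file:///app/jars/flink-connector-jdbc.jar',
#    'file:///app/jars/clickhouse-jdbc.jar']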


def build_tbl_env():
    """Create a batch-mode TableEnvironment with the connector jars attached."""
    env_settings = EnvironmentSettings.in_batch_mode()
    env = TableEnvironment.create(env_settings)
    jar_files = get_jar_files()
    env.get_config().set("pipeline.jars", ';'.join(jar_files))
    return env
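
# Flink's 'pipeline.jars' option expects a semicolon-separated list of jar
# URLs, so the string passed above looks like (paths illustrative):
#   file:///app/jars/a.jar;file:///app/jars/b.jar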


def daily_job(ch_ops: ChOptions, pg_ops: PgOptions):
    """Analyze yesterday's ClickHouse query log and store the results."""
    env = build_tbl_env()
    source_table_query, tmp_query_log_name = query_log_source_table(ch_ops)
    env.execute_sql(source_table_query)
    ye_begin, ye_end = utc_yesterday_range()
    err_count_df = analysis_error_count(env, tmp_query_log_name, pg_ops,
                                        begin=ye_begin, end=ye_end)
    query_df, insert_df, client_query_df = analysis_query_log_stats(env, tmp_query_log_name,
                                                                    pg_ops, begin=ye_begin, end=ye_end)
    # Uncomment to snapshot the results as CSV for test_report()/read_from_csv().
    # err_count_df.to_csv('err_count.csv')
    # query_df.to_csv('query_stats.csv')
    # insert_df.to_csv('insert_stats.csv')
    # client_query_df.to_csv('client_query.csv')


def test_report(tmp_dir, report_dir):
    """Render the daily report from CSV snapshots instead of a live run."""
    err_df, q_df, insert_df, cli_df = read_from_csv()
    daily_report(tmp_dir, report_dir, err_df, q_df, insert_df, cli_df)


def read_from_csv():
    """Load the DataFrames written by the commented-out to_csv() calls in daily_job()."""
    err_count_df = pd.read_csv('err_count.csv', index_col=0)
    query_df = pd.read_csv('query_stats.csv', index_col=0)
    insert_df = pd.read_csv('insert_stats.csv', index_col=0)
    client_query_df = pd.read_csv('client_query.csv', index_col=0)
    return err_count_df, query_df, insert_df, client_query_df


def main():
    ch_host = os.getenv(ENV_CH_HOST, default='clickhouse.observe.svc.cluster.local')
    # Cast ports to int so the type is the same whether the value comes from
    # the environment (always a str) or from the default.
    ch_port = int(os.getenv(ENV_CH_PORT, default='8123'))
    ch_user = os.getenv(ENV_CH_USER, default='default')
    ch_password = os.getenv(ENV_CH_PASSWORD, default='cecf@cestong.com')
    ch_dbname = os.getenv(ENV_CH_DBNAME, default='system')
    pg_host = os.getenv(ENV_PG_HOST, default='m1.cestong.com.cn')
    pg_port = int(os.getenv(ENV_PG_PORT, default='32506'))
    pg_user = os.getenv(ENV_PG_USER, default='postgres')
    pg_password = os.getenv(ENV_PG_PASSWORD, default='cecf@cestong.com')
    pg_dbname = os.getenv(ENV_PG_DBNAME, default='ob_admin')
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    ch_opts = ChOptions(ch_host, ch_port, ch_user, ch_password, ch_dbname)
    pg_opts = PgOptions(host=pg_host, port=pg_port, user=pg_user,
                        password=pg_password, dbname=pg_dbname)
    daily_job(ch_opts, pg_opts)
    # test_report('./templates', './reports_file')


if __name__ == '__main__':
    main()
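
# Example invocation (assuming this file is ob_jobs/__main__.py; hosts, ports
# and paths below are illustrative):
#   CH_HOST=localhost CH_PORT=8123 PG_HOST=localhost PG_PORT=5432 \
#     JAR_BASE_PATH=./jars python -m ob_jobs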