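# Daily PyFlink batch job: builds a TableEnvironment, registers the ClickHouse
# query log as a source table, and computes yesterday's (UTC) error, query,
# insert, and per-client statistics via the ob_jobs analysis helpers.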
import logging
import os
import pathlib
import sys

import pandas as pd
from pyflink.table import TableEnvironment, EnvironmentSettings

from ob_jobs.common import utc_yesterday_range, ChOptions, PgOptions, WeightedAvg
from ob_jobs.errors import analysis_error_count
from ob_jobs.query_log import query_log_source_table
from ob_jobs.query_stats import analysis_query_log_stats
from ob_jobs.report import daily_report

logger = logging.getLogger(__name__)
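
# Collect every .jar file under JAR_BASE_PATH (default /app/jars) and return
# them as file:// URLs so they can be passed to Flink's pipeline.jars setting.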
def get_jar_files():
    jar_base = os.environ.get('JAR_BASE_PATH', '/app/jars')
    jar_files = [f for f in pathlib.Path(jar_base).iterdir() if f.is_file() and f.suffix == '.jar']
    return [f"file://{jar_base}/{f.name}" for f in jar_files]
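
# Create a batch-mode TableEnvironment and register the local jars via the
# pipeline.jars configuration option.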
def build_tbl_env():
    env_settings = EnvironmentSettings.in_batch_mode()
    env = TableEnvironment.create(env_settings)
    jar_files = get_jar_files()
    env.get_config().set("pipeline.jars", ';'.join(jar_files))
    return env
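
# Run the daily analysis: register the ClickHouse query_log source table, then
# compute yesterday's error counts and query/insert/client statistics. The
# commented-out to_csv calls dump the intermediate DataFrames so they can be
# replayed locally through test_report().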
def daily_job(ch_ops: ChOptions, pg_ops: PgOptions):
    env = build_tbl_env()
    source_table_query, tmp_query_log_name = query_log_source_table(ch_ops)
    env.execute_sql(source_table_query)
    ye_begin, ye_end = utc_yesterday_range()
    err_count_df = analysis_error_count(env, tmp_query_log_name, pg_ops, begin=ye_begin, end=ye_end)
    query_df, insert_df, client_query_df = analysis_query_log_stats(
        env, tmp_query_log_name, pg_ops, begin=ye_begin, end=ye_end)
    # err_count_df.to_csv('err_count.csv')
    # query_df.to_csv('query_stats.csv')
    # insert_df.to_csv('insert_stats.csv')
    # client_query_df.to_csv('client_query.csv')
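
# Names of the environment variables used to configure the job.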
ENV_KAFKA_BROKERS = 'KAFKA_BROKERS'
ENV_JAR_BASE_PATH = 'JAR_BASE_PATH'
ENV_CH_HOST = 'CH_HOST'
ENV_CH_PORT = 'CH_PORT'
ENV_CH_USER = 'CH_USER'
ENV_CH_PASSWORD = 'CH_PASSWORD'
ENV_CH_DBNAME = 'CH_DBNAME'
ENV_PG_HOST = 'PG_HOST'
ENV_PG_PORT = 'PG_PORT'
ENV_PG_USER = 'PG_USER'
ENV_PG_PASSWORD = 'PG_PASSWORD'
ENV_PG_DBNAME = 'PG_DBNAME'
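
# Local helpers: rebuild the daily report from the CSV dumps produced by the
# commented-out to_csv calls in daily_job().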
def test_report(tmp_dir, report_dir):
    err_df, q_df, insert_df, cli_df = read_from_csv()
    daily_report(tmp_dir, report_dir, err_df, q_df, insert_df, cli_df)


def read_from_csv():
    err_count_df = pd.read_csv('err_count.csv', index_col=0)
    query_df = pd.read_csv('query_stats.csv', index_col=0)
    insert_df = pd.read_csv('insert_stats.csv', index_col=0)
    client_query_df = pd.read_csv('client_query.csv', index_col=0)
    return err_count_df, query_df, insert_df, client_query_df
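
# Read connection settings from the environment (falling back to the hard-coded
# defaults), configure logging, and run the daily job.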
def main():
    ch_host = os.getenv(ENV_CH_HOST, default='clickhouse.observe.svc.cluster.local')
    # Ports come back from the environment as strings; normalize both to int.
    ch_port = int(os.getenv(ENV_CH_PORT, default=8123))
    ch_user = os.getenv(ENV_CH_USER, default='default')
    ch_password = os.getenv(ENV_CH_PASSWORD, default='cecf@cestong.com')
    ch_dbname = os.getenv(ENV_CH_DBNAME, default='system')
    pg_host = os.getenv(ENV_PG_HOST, default='m1.cestong.com.cn')
    pg_port = int(os.getenv(ENV_PG_PORT, default=32506))
    pg_user = os.getenv(ENV_PG_USER, default='postgres')
    pg_password = os.getenv(ENV_PG_PASSWORD, default='cecf@cestong.com')
    pg_dbname = os.getenv(ENV_PG_DBNAME, default='ob_admin')
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    ch_opts = ChOptions(ch_host, ch_port, ch_user, ch_password, ch_dbname)
    pg_opts = PgOptions(host=pg_host, port=pg_port, user=pg_user, password=pg_password, dbname=pg_dbname)
    daily_job(ch_opts, pg_opts)
    # test_report('./templates', './reports_file')


if __name__ == '__main__':
    main()