import logging
import os
import pathlib
import sys

import pandas as pd
from pyflink.table import TableEnvironment, EnvironmentSettings

from ob_jobs.common import utc_yesterday_range, ChOptions, PgOptions, WeightedAvg
from ob_jobs.errors import analysis_error_count
from ob_jobs.query_log import query_log_source_table
from ob_jobs.query_stats import analysis_query_log_stats
from ob_jobs.report import daily_report

logger = logging.getLogger(__name__)

# Environment variable names used to configure the job.
ENV_KAFKA_BROKERS = 'KAFKA_BROKERS'
ENV_JAR_BASE_PATH = 'JAR_BASE_PATH'
ENV_CH_HOST = 'CH_HOST'
ENV_CH_PORT = 'CH_PORT'
ENV_CH_USER = 'CH_USER'
ENV_CH_PASSWORD = 'CH_PASSWORD'
ENV_CH_DBNAME = 'CH_DBNAME'
ENV_PG_HOST = 'PG_HOST'
ENV_PG_PORT = 'PG_PORT'
ENV_PG_USER = 'PG_USER'
ENV_PG_PASSWORD = 'PG_PASSWORD'
ENV_PG_DBNAME = 'PG_DBNAME'


def get_jar_files():
    """Return file:// URLs for every .jar under JAR_BASE_PATH (default /app/jars)."""
    jar_base = os.environ.get(ENV_JAR_BASE_PATH, '/app/jars')
    jar_files = [f for f in pathlib.Path(jar_base).iterdir()
                 if f.is_file() and f.suffix == '.jar']
    return [f"file://{jar_base}/{f.name}" for f in jar_files]


def build_tbl_env():
    """Create a batch-mode TableEnvironment with the connector JARs on the pipeline classpath."""
    env_settings = EnvironmentSettings.in_batch_mode()
    env = TableEnvironment.create(env_settings)
    jar_files = get_jar_files()
    env.get_config().set("pipeline.jars", ';'.join(jar_files))
    return env


def daily_job(ch_ops: ChOptions, pg_ops: PgOptions):
    """Analyse yesterday's ClickHouse query_log: error counts plus query/insert/client statistics."""
    env = build_tbl_env()
    # Register a temporary Flink table over the ClickHouse query_log.
    source_table_query, tmp_query_log_name = query_log_source_table(ch_ops)
    env.execute_sql(source_table_query)

    ye_begin, ye_end = utc_yesterday_range()
    err_count_df = analysis_error_count(env, tmp_query_log_name, pg_ops,
                                        begin=ye_begin, end=ye_end)
    query_df, insert_df, client_query_df = analysis_query_log_stats(
        env, tmp_query_log_name, pg_ops, begin=ye_begin, end=ye_end)

    # Uncomment to dump the results for test_report()/read_from_csv() below.
    # err_count_df.to_csv('err_count.csv')
    # query_df.to_csv('query_stats.csv')
    # insert_df.to_csv('insert_stats.csv')
    # client_query_df.to_csv('client_query.csv')


def test_report(tmp_dir, report_dir):
    """Render the daily report from previously dumped CSV files."""
    err_df, q_df, insert_df, cli_df = read_from_csv()
    daily_report(tmp_dir, report_dir, err_df, q_df, insert_df, cli_df)


def read_from_csv():
    err_count_df = pd.read_csv('err_count.csv', index_col=0)
    query_df = pd.read_csv('query_stats.csv', index_col=0)
    insert_df = pd.read_csv('insert_stats.csv', index_col=0)
    client_query_df = pd.read_csv('client_query.csv', index_col=0)
    return err_count_df, query_df, insert_df, client_query_df


def main():
    # Connection settings come from the environment, with in-cluster defaults.
    ch_host = os.getenv(ENV_CH_HOST, default='clickhouse.observe.svc.cluster.local')
    ch_port = os.getenv(ENV_CH_PORT, default=8123)
    ch_user = os.getenv(ENV_CH_USER, default='default')
    ch_password = os.getenv(ENV_CH_PASSWORD, default='cecf@cestong.com')
    ch_dbname = os.getenv(ENV_CH_DBNAME, default='system')

    pg_host = os.getenv(ENV_PG_HOST, default='m1.cestong.com.cn')
    pg_port = os.getenv(ENV_PG_PORT, default=32506)
    pg_user = os.getenv(ENV_PG_USER, default='postgres')
    pg_password = os.getenv(ENV_PG_PASSWORD, default='cecf@cestong.com')
    pg_dbname = os.getenv(ENV_PG_DBNAME, default='ob_admin')

    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    ch_opts = ChOptions(ch_host, ch_port, ch_user, ch_password, ch_dbname)
    pg_opts = PgOptions(host=pg_host, port=pg_port, user=pg_user,
                        password=pg_password, dbname=pg_dbname)
    daily_job(ch_opts, pg_opts)
    # test_report('./templates', './reports_file')


if __name__ == '__main__':
    main()