import re
from datetime import datetime

from pyflink.common import Row
from pyflink.table import TableEnvironment, DataTypes
from pyflink.table.udf import udf

from ob_jobs.common import sql_date_format, PgOptions

# Extracts an upper-case error name such as (SYNTAX_ERROR) from an exception message.
err_name_exp = re.compile(r'\(([A-Z_]+)\)')


@udf(result_type=DataTypes.ROW(row_fields=[
    DataTypes.FIELD("code", DataTypes.INT()),
    DataTypes.FIELD("name", DataTypes.STRING()),
    DataTypes.FIELD("sample_message", DataTypes.STRING()),
    DataTypes.FIELD("count", DataTypes.INT()),
]))
def parse_err_name_from_msg(r: Row) -> Row:
    # re.search returns None when the message carries no (NAME) tag, so guard
    # before reading the group and fall back to a synthetic name.
    match = err_name_exp.search(r.err_msg or '')
    err_name = match.group(1) if match else f'code-{r.err_code}'
    return Row(code=r.err_code, name=err_name,
               sample_message=r.err_msg, count=r.cnt)


def error_count_table_ddl(pg_opts: PgOptions):
    """Return the DDL for the Postgres-backed sink table and its Flink name."""
    tmp_table_name = 't_error_count'
    sql = f"""
    create table {tmp_table_name} (
        `start` TIMESTAMP,
        `stop` TIMESTAMP,
        `created_at` TIMESTAMP,
        `name` STRING,
        `code` INTEGER,
        `source_type` STRING,
        `source_id` STRING,
        `count` INTEGER,
        `sample_message` STRING,
        `stack_trace` STRING,
        `params` STRING
    ) with (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://{pg_opts.host}:{pg_opts.port}/{pg_opts.dbname}',
        'table-name' = 'error_count',
        'username' = '{pg_opts.user}',
        'password' = '{pg_opts.password}'
    )
    """
    return sql, tmp_table_name


def add_consts_to_error_count(begin: datetime, end: datetime, source_type, source_id):
    """Build a mapper that widens an aggregated error row with constant columns."""
    now = datetime.utcnow()

    def f(r: Row) -> Row:
        return Row(
            begin,
            end,
            now,
            r.name,
            r.code,
            source_type,
            source_id,
            r.count,
            r.sample_message,
            'stacktrace',  # placeholder; no stack trace is collected yet
            '',            # placeholder for params
        )

    return f


def analysis_error_count(table_env: TableEnvironment, query_log_table_name,
                         pg_opts: PgOptions, begin: datetime, end: datetime):
    # Row type of the sink table; field order must match the DDL above.
    err_sink_table_types = DataTypes.ROW(row_fields=[
        DataTypes.FIELD("start", DataTypes.TIMESTAMP()),
        DataTypes.FIELD("stop", DataTypes.TIMESTAMP()),
        DataTypes.FIELD("created_at", DataTypes.TIMESTAMP()),
        DataTypes.FIELD("name", DataTypes.STRING()),
        DataTypes.FIELD("code", DataTypes.INT()),
        DataTypes.FIELD("source_type", DataTypes.STRING()),
        DataTypes.FIELD("source_id", DataTypes.STRING()),
        DataTypes.FIELD("count", DataTypes.INT()),
        DataTypes.FIELD("sample_message", DataTypes.STRING()),
        DataTypes.FIELD("stack_trace", DataTypes.STRING()),
        DataTypes.FIELD("params", DataTypes.STRING()),
    ])
    add_const_map_func = udf(add_consts_to_error_count(begin, end, 'clickhouse', 'id'),
                             result_type=err_sink_table_types)

    error_count_sink_table_ddl, error_count_sink_table_name = error_count_table_ddl(pg_opts)
    table_env.execute_sql(error_count_sink_table_ddl)

    # Aggregate exceptions per error code in the [begin, end) window, keeping
    # one sample message per code, then widen each row for the sink table.
    result = (
        table_env.sql_query(f"""
            select exception_code as err_code,
                   count(*) as cnt,
                   first_value(exception) as err_msg
            from {query_log_table_name}
            where event_time < '{end.strftime(sql_date_format)}'
              and event_time > '{begin.strftime(sql_date_format)}'
              and type in ('ExceptionBeforeStart', 'ExceptionWhileProcessing')
            group by exception_code
        """)
        .map(parse_err_name_from_msg)
        .map(add_const_map_func)
    )

    (
        result
        .execute_insert(error_count_sink_table_name)
        .wait()
    )
    return result.to_pandas()
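
# --- Usage sketch (illustrative only) ---
# A minimal sketch of running this job, assuming a batch TableEnvironment and a
# source table named 'query_log' already registered with columns event_time,
# type, exception_code and exception (e.g. mirroring ClickHouse's
# system.query_log). The PgOptions constructor arguments shown here are
# hypothetical; use whatever ob_jobs.common.PgOptions actually accepts.
#
#   from datetime import timedelta
#   from pyflink.table import EnvironmentSettings, TableEnvironment
#
#   t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
#   # ... execute a CREATE TABLE statement that registers 'query_log' ...
#   pg_opts = PgOptions(host='localhost', port=5432, dbname='metrics',
#                       user='flink', password='secret')  # hypothetical values
#   end = datetime.utcnow()
#   begin = end - timedelta(hours=1)
#   df = analysis_error_count(t_env, 'query_log', pg_opts, begin, end)
#   print(df)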