import re
from datetime import datetime

from pyflink.common import Row
from pyflink.table import TableEnvironment, DataTypes
from pyflink.table.udf import udf

from ob_jobs.common import sql_date_format, PgOptions

# ClickHouse exception messages typically carry the error name in parentheses,
# e.g. "Code: 60. DB::Exception: ... (UNKNOWN_TABLE)"; capture that token.
err_name_exp = re.compile(r'\(([A-Z_]+)\)')


@udf(result_type=DataTypes.ROW(row_fields=[
    DataTypes.FIELD("code", DataTypes.INT()),
    DataTypes.FIELD("name", DataTypes.STRING()),
    DataTypes.FIELD("sample_message", DataTypes.STRING()),
    DataTypes.FIELD("count", DataTypes.INT()),
]))
def parse_err_name_from_msg(r: Row) -> Row:
    # re.search returns None when the message has no "(ERROR_NAME)" token;
    # guard the match instead of calling .groups() on None, and fall back
    # to a synthetic name built from the numeric error code.
    match = err_name_exp.search(r.err_msg)
    err_name = match.group(1) if match else f'code-{r.err_code}'
    return Row(code=r.err_code, name=err_name, sample_message=r.err_msg, count=r.cnt)
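
# A quick sanity check of the extraction, as a commented sketch (the message
# text below is illustrative, not captured from a real cluster):
#
#     msg = "Code: 60. DB::Exception: Table db.foo doesn't exist. (UNKNOWN_TABLE)"
#     m = err_name_exp.search(msg)
#     assert m is not None and m.group(1) == 'UNKNOWN_TABLE'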


def error_count_table_ddl(pg_opts: PgOptions):
    tmp_table_name = 't_error_count'
    sql = f"""
    create table {tmp_table_name} (
        `start` TIMESTAMP,
        `stop` TIMESTAMP,
        `created_at` TIMESTAMP,
        `name` STRING,
        `code` INTEGER,
        `source_type` STRING,
        `source_id` STRING,
        `count` INTEGER,
        `sample_message` STRING,
        `stack_trace` STRING,
        `params` STRING
    ) with (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://{pg_opts.host}:{pg_opts.port}/{pg_opts.dbname}',
        'table-name' = 'error_count',
        'username' = '{pg_opts.user}',
        'password' = '{pg_opts.password}'
    )
    """
    return sql, tmp_table_name
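
# Note: `t_error_count` is only the Flink catalog name for the sink; rows land
# in the physical Postgres table `error_count` named by 'table-name' above.
# Registering the sink is then a one-liner (PgOptions values come from the caller):
#
#     ddl, sink_name = error_count_table_ddl(pg_opts)
#     table_env.execute_sql(ddl)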


def add_consts_to_error_count(begin: datetime, end: datetime, source_type, source_id):
    """Build a row mapper that attaches the constant window and source columns."""
    now = datetime.utcnow()

    def f(r: Row) -> Row:
        # Positional fields must match the sink row type declared in
        # analysis_error_count: start, stop, created_at, name, code,
        # source_type, source_id, count, sample_message, stack_trace, params.
        return Row(
            begin,
            end,
            now,
            r.name,
            r.code,
            source_type,
            source_id,
            r.count,
            r.sample_message,
            'stacktrace',  # placeholder: no stack trace is extracted yet
            '',  # placeholder: no query params are recorded
        )

    return f
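
# Illustrative call, as a commented sketch (the wrapping into a PyFlink UDF
# happens in analysis_error_count below; the input field names follow the
# rows produced by parse_err_name_from_msg):
#
#     mapper = add_consts_to_error_count(begin, end, 'clickhouse', 'id')
#     out = mapper(Row(code=60, name='UNKNOWN_TABLE',
#                      sample_message='...', count=3))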


def analysis_error_count(table_env: TableEnvironment, query_log_table_name, pg_opts: PgOptions,
                         begin: datetime, end: datetime):
    # Row type of the Postgres sink; field order matches error_count_table_ddl.
    err_sink_table_types = DataTypes.ROW(row_fields=[
        DataTypes.FIELD("start", DataTypes.TIMESTAMP()),
        DataTypes.FIELD("stop", DataTypes.TIMESTAMP()),
        DataTypes.FIELD("created_at", DataTypes.TIMESTAMP()),
        DataTypes.FIELD("name", DataTypes.STRING()),
        DataTypes.FIELD("code", DataTypes.INT()),
        DataTypes.FIELD("source_type", DataTypes.STRING()),
        DataTypes.FIELD("source_id", DataTypes.STRING()),
        DataTypes.FIELD("count", DataTypes.INT()),
        DataTypes.FIELD("sample_message", DataTypes.STRING()),
        DataTypes.FIELD("stack_trace", DataTypes.STRING()),
        DataTypes.FIELD("params", DataTypes.STRING()),
    ])
    add_const_map_func = udf(add_consts_to_error_count(begin, end, 'clickhouse', 'id'),
                             result_type=err_sink_table_types)
    error_count_sink_table_ddl, error_count_sink_table_name = error_count_table_ddl(pg_opts)
    table_env.execute_sql(error_count_sink_table_ddl)

    # Count errors per exception code in the open window (begin, end), keeping
    # one sample message per code, then parse the error name and attach the
    # constant columns expected by the sink.
    result = (
        table_env.sql_query(f"""
            select exception_code as err_code, count(*) as cnt, first_value(exception) as err_msg
            from {query_log_table_name}
            where event_time < '{end.strftime(sql_date_format)}' and
                event_time > '{begin.strftime(sql_date_format)}' and
                type in ('ExceptionBeforeStart', 'ExceptionWhileProcessing')
            group by exception_code
        """)
        .map(parse_err_name_from_msg)
        .map(add_const_map_func)
    )
    # Write to Postgres and wait; to_pandas() below triggers a second run of
    # the same pipeline to return the result for inspection.
    result.execute_insert(error_count_sink_table_name).wait()
    return result.to_pandas()
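
# Usage sketch, not part of the job itself; assumptions are flagged inline.
# The ClickHouse system.query_log source must already be registered on the
# TableEnvironment under the name passed in, and the PgOptions constructor
# arguments shown here are hypothetical.
if __name__ == '__main__':
    from datetime import timedelta
    from pyflink.table import EnvironmentSettings

    t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
    # ... register the ClickHouse query_log source table on t_env here ...
    end = datetime.utcnow()
    begin = end - timedelta(hours=1)
    pg = PgOptions(host='localhost', port=5432, dbname='ob',
                   user='ob', password='secret')  # hypothetical values
    print(analysis_error_count(t_env, 'query_log', pg, begin, end))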