# errors.py — ClickHouse query-log error aggregation job (PyFlink -> PostgreSQL)
import re
from datetime import datetime

from pyflink.common import Row
from pyflink.table import DataTypes, TableEnvironment
from pyflink.table.udf import udf

from ob_jobs.common import PgOptions, sql_date_format
  7. err_name_exp = re.compile(r'\(([A-Z_]+)\)')
  8. @udf(result_type=DataTypes.ROW(row_fields=[
  9. DataTypes.FIELD("code", DataTypes.INT()),
  10. DataTypes.FIELD("name", DataTypes.STRING()),
  11. DataTypes.FIELD("sample_message", DataTypes.STRING()),
  12. DataTypes.FIELD("count", DataTypes.INT()),
  13. ]))
  14. def parse_err_name_from_msg(r: Row) -> Row:
  15. err_name_groups = err_name_exp.search(r.err_msg).groups()
  16. err_name = err_name_groups[0] if err_name_groups else f'code-{r.err_code}'
  17. return Row(code=r.err_code, name=err_name, sample_message=r.err_msg, count=r.cnt)
  18. def error_count_table_ddl(pg_opts: PgOptions):
  19. tmp_table_name = 't_error_count'
  20. sql = f"""
  21. create table {tmp_table_name} (
  22. `start` TIMESTAMP,
  23. `stop` TIMESTAMP,
  24. `created_at` TIMESTAMP,
  25. `name` STRING,
  26. `code` INTEGER,
  27. `sourct_type` STRING,
  28. `source_id` STRING,
  29. `count` INTEGER,
  30. `sample_message` STRING,
  31. `stack_trace` STRING,
  32. `params` STRING
  33. ) with (
  34. 'connector' = 'jdbc',
  35. 'url' = 'jdbc:postgresql://{pg_opts.host}:{pg_opts.port}/{pg_opts.dbname}',
  36. 'table-name' = 'error_count',
  37. 'username' = '{pg_opts.user}',
  38. 'password' = '{pg_opts.password}'
  39. )
  40. """
  41. return sql, tmp_table_name
  42. def add_consts_to_error_count(begin: datetime, end: datetime, source_type, source_id):
  43. now = datetime.utcnow()
  44. def f(r: Row) -> Row:
  45. return Row(
  46. begin,
  47. end,
  48. now,
  49. r.name,
  50. r.code,
  51. source_type,
  52. source_id,
  53. r.count,
  54. r.sample_message,
  55. 'stacktrace',
  56. '',
  57. )
  58. return f
  59. def analysis_error_count(table_env: TableEnvironment, query_log_table_name, pg_ops: PgOptions, begin: datetime,
  60. end: datetime):
  61. err_sink_table_types = DataTypes.ROW(row_fields=[
  62. DataTypes.FIELD("start", DataTypes.TIMESTAMP()),
  63. DataTypes.FIELD("stop", DataTypes.TIMESTAMP()),
  64. DataTypes.FIELD("created_at", DataTypes.TIMESTAMP()),
  65. DataTypes.FIELD("name", DataTypes.STRING()),
  66. DataTypes.FIELD("code", DataTypes.INT()),
  67. DataTypes.FIELD("source_type", DataTypes.STRING()),
  68. DataTypes.FIELD("source_id", DataTypes.STRING()),
  69. DataTypes.FIELD("count", DataTypes.INT()),
  70. DataTypes.FIELD("sample_message", DataTypes.STRING()),
  71. DataTypes.FIELD("stack_trace", DataTypes.STRING()),
  72. DataTypes.FIELD("params", DataTypes.STRING()),
  73. ])
  74. add_const_map_func = udf(add_consts_to_error_count(begin, end, 'clickhouse', 'id'),
  75. result_type=err_sink_table_types)
  76. error_count_sink_table_ddl, error_count_sink_table_name = error_count_table_ddl(pg_ops)
  77. table_env.execute_sql(error_count_sink_table_ddl)
  78. result = (
  79. table_env.sql_query(f"""
  80. select exception_code as err_code, count(*) as cnt, first_value(exception) as err_msg
  81. from {query_log_table_name}
  82. where event_time < '{end.strftime(sql_date_format)}' and
  83. event_time > '{begin.strftime(sql_date_format)}' and
  84. type in ('ExceptionBeforeStart', 'ExceptionWhileProcessing')
  85. group by exception_code
  86. """)
  87. .map(parse_err_name_from_msg)
  88. .map(add_const_map_func)
  89. )
  90. (
  91. result
  92. .execute_insert(error_count_sink_table_name)
  93. .wait()
  94. )
  95. return result.to_pandas()