common.py 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. from collections import namedtuple
  2. from datetime import datetime, timedelta, timezone
  3. from pyflink.table import AggregateFunction
  4. from pyflink.common import Row
  5. import pytz
  6. local_tz = pytz.timezone('Asia/Shanghai')
  7. ChOptions = namedtuple('ChOptions', ['ch_host', 'ch_port', 'ch_user', 'ch_password',
  8. 'ch_dbname'])
  9. PgOptions = namedtuple('PgOptions', [
  10. 'host', 'port', 'dbname', 'user', 'password',
  11. ])
  12. # 本地的昨天起止时间对应的UTC时间
  13. def utc_yesterday_range():
  14. today = datetime.now(tz=local_tz)
  15. today_begin = datetime(today.year, today.month, today.day, tzinfo=local_tz)
  16. yesterday_begin = today_begin - timedelta(days=1)
  17. yesterday_end = yesterday_begin + timedelta(days=1) - timedelta(milliseconds=1)
  18. return yesterday_begin.astimezone(timezone.utc), yesterday_end.astimezone(timezone.utc)
  19. def utc_yesterday_begin():
  20. today = datetime.now(tz=local_tz)
  21. today_begin = datetime(today.year, today.month, today.day, tzinfo=local_tz)
  22. yesterday_begin = today_begin - timedelta(days=1)
  23. return yesterday_begin.astimezone(timezone.utc)
  24. def utc_yesterday_end():
  25. today = datetime.now(tz=local_tz)
  26. today_end = datetime(today.year, today.month, today.day, tzinfo=local_tz) + timedelta(days=1, milliseconds=-1)
  27. yesterday_end = today_end - timedelta(days=1)
  28. return yesterday_end.astimezone(timezone.utc)
  29. sql_date_format = '%Y-%m-%d %H:%M:%S'
  30. def human_duration(du: timedelta):
  31. seconds = du.total_seconds()
  32. sign_string = '-' if seconds < 0 else ''
  33. seconds = abs(seconds)
  34. days, seconds = divmod(seconds, 86400)
  35. hours, seconds = divmod(seconds, 3600)
  36. minutes, seconds = divmod(seconds, 60)
  37. if days > 0:
  38. return '%s%dd%dh%dm%ds' % (sign_string, days, hours, minutes, seconds)
  39. elif hours > 0:
  40. return '%s%dh%dm%ds' % (sign_string, hours, minutes, seconds)
  41. elif minutes > 0:
  42. return '%s%dm%ds' % (sign_string, minutes, seconds)
  43. else:
  44. return '%s%.3fs' % (sign_string, seconds)
  45. class WeightedAvg(AggregateFunction):
  46. def create_accumulator(self):
  47. # Row(sum, count)
  48. return Row(0, 0)
  49. def get_value(self, accumulator):
  50. if accumulator[1] == 0:
  51. return None
  52. else:
  53. return accumulator[0] / accumulator[1]
  54. def accumulate(self, accumulator, value, weight):
  55. accumulator[0] += value * weight
  56. accumulator[1] += weight
  57. def retract(self, accumulator, value, weight):
  58. accumulator[0] -= value * weight
  59. accumulator[1] -= weight
  60. def get_result_type(self):
  61. return 'BIGINT'
  62. def get_accumulator_type(self):
  63. return 'ROW<f0 BIGINT, f1 BIGINT>'