123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384 |
- from collections import namedtuple
- from datetime import datetime, timedelta, timezone
- from pyflink.table import AggregateFunction
- from pyflink.common import Row
- import pytz
# Time zone in which the "local" calendar day is evaluated by the helpers below.
local_tz = pytz.timezone('Asia/Shanghai')
# Connection options whose fields carry a ``ch_`` prefix
# (presumably ClickHouse -- confirm against the callers).
ChOptions = namedtuple(
    'ChOptions',
    [
        'ch_host',
        'ch_port',
        'ch_user',
        'ch_password',
        'ch_dbname',
    ],
)

# Connection options for a second database
# (presumably PostgreSQL -- confirm against the callers).
PgOptions = namedtuple(
    'PgOptions',
    [
        'host',
        'port',
        'dbname',
        'user',
        'password',
    ],
)
# UTC instants corresponding to the start and end of the local "yesterday".
def utc_yesterday_range():
    """Return ``(begin, end)`` of the local previous day, converted to UTC.

    "Local" means ``local_tz``. ``begin`` is yesterday 00:00:00.000 local
    time and ``end`` is yesterday 23:59:59.999 local time (one millisecond
    before today's local midnight). Both values are timezone-aware
    datetimes expressed in UTC.
    """
    today = datetime.now(tz=local_tz)
    naive_today_begin = datetime(today.year, today.month, today.day)
    naive_yesterday_begin = naive_today_begin - timedelta(days=1)
    naive_yesterday_end = naive_today_begin - timedelta(milliseconds=1)
    # A pytz zone must be attached with localize(): passing it through
    # ``tzinfo=`` silently uses the zone's LMT offset (+08:06 for
    # Asia/Shanghai), which shifts the resulting UTC bounds by minutes.
    yesterday_begin = local_tz.localize(naive_yesterday_begin)
    yesterday_end = local_tz.localize(naive_yesterday_end)
    return yesterday_begin.astimezone(timezone.utc), yesterday_end.astimezone(timezone.utc)
def utc_yesterday_begin():
    """Return the start of the local previous day as a UTC datetime.

    "Local" means ``local_tz``. The result is yesterday 00:00:00 local
    time, expressed as a timezone-aware datetime in UTC.
    """
    today = datetime.now(tz=local_tz)
    naive_yesterday_begin = datetime(today.year, today.month, today.day) - timedelta(days=1)
    # A pytz zone must be attached with localize(): passing it through
    # ``tzinfo=`` silently uses the zone's LMT offset (+08:06 for
    # Asia/Shanghai), which shifts the resulting UTC instant by minutes.
    yesterday_begin = local_tz.localize(naive_yesterday_begin)
    return yesterday_begin.astimezone(timezone.utc)
def utc_yesterday_end():
    """Return the end of the local previous day as a UTC datetime.

    "Local" means ``local_tz``. The result is yesterday 23:59:59.999 local
    time (one millisecond before today's local midnight), expressed as a
    timezone-aware datetime in UTC.
    """
    today = datetime.now(tz=local_tz)
    naive_today_begin = datetime(today.year, today.month, today.day)
    naive_yesterday_end = naive_today_begin - timedelta(milliseconds=1)
    # A pytz zone must be attached with localize(): passing it through
    # ``tzinfo=`` silently uses the zone's LMT offset (+08:06 for
    # Asia/Shanghai), which shifts the resulting UTC instant by minutes.
    yesterday_end = local_tz.localize(naive_yesterday_end)
    return yesterday_end.astimezone(timezone.utc)
# strftime/strptime pattern producing e.g. "2024-01-31 23:59:59".
sql_date_format = '%Y-%m-%d %H:%M:%S'
def human_duration(du: timedelta):
    """Format a ``timedelta`` as a compact string such as ``1d2h3m4s``.

    The largest non-zero unit among days, hours and minutes leads the
    output, and every smaller unit follows it. In those forms the seconds
    are truncated to a whole number. Durations shorter than one minute are
    rendered as seconds with three decimals (``0.250s``). Negative
    durations are formatted from their absolute value and prefixed
    with ``-``.
    """
    total = du.total_seconds()
    prefix = '-' if total < 0 else ''

    # Peel off whole days, hours and minutes; what is left is seconds.
    day_count, remainder = divmod(abs(total), 86400)
    hour_count, remainder = divmod(remainder, 3600)
    minute_count, remainder = divmod(remainder, 60)

    if day_count > 0:
        return '%s%dd%dh%dm%ds' % (prefix, day_count, hour_count, minute_count, remainder)
    if hour_count > 0:
        return '%s%dh%dm%ds' % (prefix, hour_count, minute_count, remainder)
    if minute_count > 0:
        return '%s%dm%ds' % (prefix, minute_count, remainder)
    # Sub-minute durations keep millisecond precision.
    return '%s%.3fs' % (prefix, remainder)
class WeightedAvg(AggregateFunction):
    """Aggregate that divides the sum of ``value * weight`` by the sum of weights.

    The accumulator is a two-field ``Row``: field 0 holds the running
    weighted sum and field 1 holds the running total weight.
    """

    def get_result_type(self):
        """Return the declared SQL type of the aggregate result."""
        return 'BIGINT'

    def get_accumulator_type(self):
        """Return the declared SQL type of the accumulator row."""
        return 'ROW<f0 BIGINT, f1 BIGINT>'

    def create_accumulator(self):
        """Return a fresh accumulator with both fields set to zero."""
        # Row(sum, count)
        return Row(0, 0)

    def accumulate(self, accumulator, value, weight):
        """Add one ``(value, weight)`` observation to the accumulator."""
        contribution = value * weight
        accumulator[0] += contribution
        accumulator[1] += weight

    def retract(self, accumulator, value, weight):
        """Remove one previously accumulated ``(value, weight)`` observation."""
        contribution = value * weight
        accumulator[0] -= contribution
        accumulator[1] -= weight

    def get_value(self, accumulator):
        """Return the weighted average, or ``None`` when the total weight is zero."""
        total_weight = accumulator[1]
        if total_weight == 0:
            return None
        # NOTE(review): ``/`` is true division, so this yields a Python float
        # although the declared result type is BIGINT -- confirm the intended
        # coercion.
        return accumulator[0] / total_weight
|