"""PyFlink pipeline pieces: explode JSON trace maps into rows and sink them
into a ClickHouse ``sample_hit_trace`` table over JDBC."""

import json
import logging

from pyflink.common import Row, Types
from pyflink.common.typeinfo import RowTypeInfo
from pyflink.datastream import FlatMapFunction
from pyflink.datastream.connectors.jdbc import (
    JdbcConnectionOptions,
    JdbcExecutionOptions,
    JdbcSink,
)

# NOTE(review): the original file had a dangling, incomplete statement
# `from pyflink.table.sources` (a SyntaxError) — removed; nothing in this
# module references pyflink.table.sources.

logger = logging.getLogger(__name__)


class TraceIDMapToTupleFlatMap(FlatMapFunction):
    """Explode a record's JSON trace map into one Row per trace entry.

    Expects ``value.traces`` to hold a JSON object mapping
    trace_id -> timestamp. Records whose ``traces`` field is not valid
    JSON are dropped with a warning rather than failing the job.
    """

    def flat_map(self, value):
        try:
            trace_map = json.loads(value.traces)
        except json.JSONDecodeError as de:
            # Best-effort: log and skip malformed input instead of crashing
            # the stream. Lazy %-args avoid formatting unless the record
            # is actually emitted.
            logger.warning('decode trace value:%s failed:%s', value.traces, de)
            return []
        # Field names f0/f1 must match the RowTypeInfo declared in
        # ClickhouseSink.__init__.
        return [Row(f0=trace_id, f1=timestamp)
                for trace_id, timestamp in trace_map.items()]


class ClickhouseSink:
    """Builds a JDBC sink that batch-inserts (trace_id, timestamp) rows
    into the ClickHouse ``sample_hit_trace`` table."""

    def __init__(self, ch_host, ch_port, ch_user, ch_name, ch_password):
        """
        :param ch_host: ClickHouse server host.
        :param ch_port: ClickHouse JDBC port.
        :param ch_user: user name for the connection.
        :param ch_name: database name.
        :param ch_password: password for the connection.
        """
        # Flush inserts in batches of 1000 rows; retry a failed batch 3 times.
        exec_ops = (JdbcExecutionOptions.builder()
                    .with_batch_size(1000)
                    .with_max_retries(3)
                    .build())
        conn_ops = (JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .with_url(f"jdbc:clickhouse://{ch_host}:{ch_port}/{ch_name}")
                    .with_user_name(ch_user)
                    .with_password(ch_password)
                    .build())
        sql = 'insert into sample_hit_trace (trace_id, timestamp) values(?, ?)'
        # Field names f0/f1 match the Rows emitted by TraceIDMapToTupleFlatMap.
        # Timestamp is declared LONG — presumably upstream emits epoch
        # integers, not datetimes; switch to Types.SQL_TIMESTAMP() only if
        # that assumption is wrong (TODO confirm against the producer).
        type_info = RowTypeInfo([Types.STRING(), Types.LONG()], ['f0', 'f1'])
        self.__ch = JdbcSink.sink(sql, type_info, conn_ops, exec_ops)

    @property
    def ch_sink(self):
        """The configured :class:`JdbcSink`, ready to attach to a DataStream."""
        return self.__ch