```python
import json
import logging

from pyflink.common import Types, Row
from pyflink.common.typeinfo import RowTypeInfo
from pyflink.datastream import FlatMapFunction
from pyflink.datastream.connectors.jdbc import (
    JdbcConnectionOptions,
    JdbcExecutionOptions,
    JdbcSink,
)

logger = logging.getLogger(__name__)


class TraceIDMapToTupleFlatMap(FlatMapFunction):
    """Expands a Row whose 'traces' field holds a JSON map of
    trace_id -> timestamp into one (trace_id, timestamp) Row per entry."""

    def flat_map(self, value):
        try:
            m = json.loads(value.traces)
        except json.JSONDecodeError as de:
            logger.warning(f'decode trace value:{value.traces} failed:{de}')
            return []
        return [Row(f0=trace_id, f1=timestamp) for trace_id, timestamp in m.items()]


class ClickhouseSink:
    def __init__(self, ch_host, ch_port, ch_user, ch_name, ch_password):
        # Flush every 1000 rows and retry a failed batch up to 3 times.
        exec_ops = (JdbcExecutionOptions.builder()
                    .with_batch_size(1000)
                    .with_max_retries(3)
                    .build())
        conn_ops = (JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .with_url(f'jdbc:clickhouse://{ch_host}:{ch_port}/{ch_name}')
                    .with_user_name(ch_user)
                    .with_password(ch_password)
                    .build())
        sql = 'insert into sample_hit_trace (trace_id, timestamp) values(?, ?)'
        # Field types must match the Rows emitted by the flat map:
        # f0 is the trace id (string), f1 the epoch timestamp (long).
        # type_info = Types.ROW([Types.STRING(), Types.SQL_TIMESTAMP()])
        type_info = RowTypeInfo([Types.STRING(), Types.LONG()], ['f0', 'f1'])
        self.__ch = JdbcSink.sink(sql, type_info, conn_ops, exec_ops)

    @property
    def ch_sink(self):
        return self.__ch
```
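For context, here is a minimal sketch of how these two pieces might be wired into a job. The host, port, credentials, JAR paths, and the in-memory source are illustrative assumptions, not part of the original; a real job would read the trace Rows from an upstream source such as Kafka.

```python
import json
from pyflink.common import Types, Row
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# JdbcSink needs the Flink JDBC connector and the ClickHouse JDBC driver
# JARs on the classpath, e.g. (paths are placeholders):
# env.add_jars('file:///path/to/flink-connector-jdbc.jar',
#              'file:///path/to/clickhouse-jdbc.jar')

# Illustrative input: Rows with a 'traces' field holding a JSON map of
# trace_id -> timestamp, matching what TraceIDMapToTupleFlatMap expects.
traces = env.from_collection(
    [Row(traces=json.dumps({'trace-1': 1700000000000}))],
    type_info=Types.ROW_NAMED(['traces'], [Types.STRING()]))

# Connection parameters below are made-up example values.
sink = ClickhouseSink('localhost', 8123, 'default', 'tracing', 'secret').ch_sink

(traces
 .flat_map(TraceIDMapToTupleFlatMap(),
           output_type=Types.ROW_NAMED(['f0', 'f1'],
                                       [Types.STRING(), Types.LONG()]))
 .add_sink(sink))

env.execute('sample_hit_trace_job')
```

Note that the `output_type` passed to `flat_map` mirrors the `RowTypeInfo` the sink was built with, so the Rows flow into the prepared-statement parameters in the right order.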