operators.py

import json
import logging

from pyflink.common import Types, Row
from pyflink.common.typeinfo import RowTypeInfo
from pyflink.datastream import FlatMapFunction
from pyflink.datastream.connectors.jdbc import (
    JdbcSink, JdbcConnectionOptions, JdbcExecutionOptions)

logger = logging.getLogger(__name__)
class TraceIDMapToTupleFlatMap(FlatMapFunction):
    def flat_map(self, value):
        try:
            m = json.loads(value.traces)
        except json.JSONDecodeError as de:
            logger.warning(f'decode trace value:{value.traces} failed:{de}')
            return []
        return [Row(f0=trace_id, f1=timestamp) for trace_id, timestamp in m.items()]
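
# Example (illustrative, not from the original file): given an input row whose
# `traces` field holds '{"trace-1": 1700000000000, "trace-2": 1700000001000}',
# the flat map above emits Row(f0='trace-1', f1=1700000000000) and
# Row(f0='trace-2', f1=1700000001000), i.e. one output row per trace ID.
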
class ClickhouseSink:
    def __init__(self, ch_host, ch_port, ch_user, ch_name, ch_password):
        # Batch up to 1000 rows per write and retry failed batches 3 times.
        exec_ops = (JdbcExecutionOptions.builder()
                    .with_batch_size(1000)
                    .with_max_retries(3)
                    .build())
        conn_ops = (JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .with_url(f"jdbc:clickhouse://{ch_host}:{ch_port}/{ch_name}")
                    .with_user_name(ch_user)
                    .with_password(ch_password)
                    .build())
        sql = 'insert into sample_hit_trace (trace_id, timestamp) values(?, ?)'
        # type_info = Types.ROW([Types.STRING(), Types.SQL_TIMESTAMP()])
        type_info = RowTypeInfo([Types.STRING(), Types.LONG()], ['f0', 'f1'])
        self.__ch = JdbcSink.sink(sql, type_info, conn_ops, exec_ops)

    @property
    def ch_sink(self):
        return self.__ch
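
# Usage sketch (illustrative, not part of the original file): a minimal,
# hypothetical job wiring TraceIDMapToTupleFlatMap into ClickhouseSink.
# The in-memory source, connection parameters, and job name below are
# assumptions; a real job would read from an external source instead.
# Running this requires the flink-connector-jdbc jar and a ClickHouse JDBC
# driver on the Flink classpath.
def _example_job():
    from pyflink.datastream import StreamExecutionEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    # Input rows carry a `traces` field with the JSON-encoded trace mapping.
    input_type = RowTypeInfo([Types.STRING()], ['traces'])
    stream = env.from_collection(
        [Row(traces='{"trace-1": 1700000000000}')], type_info=input_type)

    # Assumed host/port/credentials for illustration only.
    sink = ClickhouseSink('localhost', 8123, 'default', 'default', '')
    (stream
     .flat_map(TraceIDMapToTupleFlatMap(),
               output_type=RowTypeInfo([Types.STRING(), Types.LONG()],
                                       ['f0', 'f1']))
     .add_sink(sink.ch_sink))
    env.execute('sample-hit-trace-example')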