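# Flink streaming job: read sampled trace IDs from the Kafka topic "otel_sample_hit"
# and write them to ClickHouse.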
import logging
import os
import sys

from pyflink.common import WatermarkStrategy, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer
from pyflink.datastream.formats.json import JsonRowDeserializationSchema

from otel_sampled_clickhouse.operators import TraceIDMapToTupleFlatMap, ClickhouseSink
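

# Connector JARs that must be on the job classpath; the directory is
# configurable through the JAR_BASE_PATH environment variable.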
def get_jar_files():
    jar_base = os.environ.get('JAR_BASE_PATH', '/app/jars')
    return [
        f"file://{jar_base}/flink-sql-connector-kafka-3.0.2-1.18.jar",
        f"file://{jar_base}/flink-connector-jdbc-3.1.1-1.17.jar",
        f"file://{jar_base}/clickhouse-jdbc-0.5.0-all.jar",
    ]
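

# Identity map that prints each record's type and value; handy for debugging
# the pipeline (e.g. ds.map(debug_map_func)).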
def debug_map_func(value):
    print(f"type:{type(value)}\nvalue:{value}")
    return value
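

# Build and run the streaming job: Kafka source -> TraceIDMapToTupleFlatMap -> ClickHouse sink.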
def trace_id2ch(brokers, ch_ops):
    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars(*get_jar_files())
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    env.set_parallelism(1)

    # Each Kafka message is a JSON object with a single string field named "traces".
    # values_type_info = Types.MAP(Types.STRING(), Types.BIG_INT())
    values_type_info = Types.STRING()
    row_type_info = Types.ROW_NAMED(['traces'], [values_type_info])
    # row_type_info = Types.ROW([values_type_info])
    json_de = JsonRowDeserializationSchema.builder().type_info(row_type_info).build()

    source = (KafkaSource.builder()
              .set_bootstrap_servers(brokers)
              .set_topics("otel_sample_hit")
              .set_group_id("traceid2clickhouse")
              .set_value_only_deserializer(json_de)
              .set_starting_offsets(KafkaOffsetsInitializer.earliest())
              .build())

    ds = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")
    ch_sink = ClickhouseSink(**ch_ops)
    (ds
     .flat_map(TraceIDMapToTupleFlatMap())
     # Identity map that pins the output row type; useful, do not delete.
     .map(lambda x: x, output_type=Types.ROW([Types.STRING(), Types.LONG()]))
     .add_sink(ch_sink.ch_sink)
     )

    # Submit the job for execution.
    env.execute()
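

# Environment variable names used for configuration.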
ENV_KAFKA_BROKERS = 'KAFKA_BROKERS'
ENV_JAR_BASE_PATH = 'JAR_BASE_PATH'
ENV_CH_HOST = 'CH_HOST'
ENV_CH_PORT = 'CH_PORT'
ENV_CH_USER = 'CH_USER'
ENV_CH_PASSWORD = 'CH_PASSWORD'
ENV_CH_DBNAME = 'CH_DBNAME'
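

# Read the ClickHouse and Kafka configuration from the environment, validate the
# broker list, and launch the job.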
def main():
    ch_host = os.getenv(ENV_CH_HOST, default='clickhouse.observe.svc.cluster.local')
    ch_port = os.getenv(ENV_CH_PORT, default=8123)
    ch_user = os.getenv(ENV_CH_USER, default='default')
    ch_password = os.getenv(ENV_CH_PASSWORD, default='cecf@cestong.com')
    ch_dbname = os.getenv(ENV_CH_DBNAME, default='otel_cold')
    kafka_brokers = os.getenv(ENV_KAFKA_BROKERS, default='')

    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    brokers_env = kafka_brokers
    if brokers_env.strip() == "":
        logging.error(f'provide at least one broker via env:{ENV_KAFKA_BROKERS}')
        sys.exit(1)
    # Drop empty entries so a value like "a,,b" or a trailing comma still works.
    brokers_list = [b.strip() for b in brokers_env.split(',') if b.strip()]
    if len(brokers_list) == 0:
        logging.error(f'provide at least one broker via env:{ENV_KAFKA_BROKERS}')
        sys.exit(1)

    ch_options = {
        'ch_host': ch_host,
        'ch_port': ch_port,
        'ch_user': ch_user,
        'ch_password': ch_password,
        'ch_name': ch_dbname,
    }
    bks = ','.join(brokers_list)
    trace_id2ch(bks, ch_options)
if __name__ == '__main__':
    main()