__main__.py 3.3 KB

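"""Flink job that consumes sampled trace IDs from the Kafka topic
`otel_sample_hit` and writes them to ClickHouse via ClickhouseSink."""
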
import logging
import os
import sys

from pyflink.common import WatermarkStrategy, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer
from pyflink.datastream.formats.json import JsonRowDeserializationSchema

from otel_sampled_clickhouse.operators import TraceIDMapToTupleFlatMap, ClickhouseSink

# Names of the environment variables used to configure the job.
ENV_KAFKA_BROKERS = 'KAFKA_BROKERS'
ENV_JAR_BASE_PATH = 'JAR_BASE_PATH'
ENV_CH_HOST = 'CH_HOST'
ENV_CH_PORT = 'CH_PORT'
ENV_CH_USER = 'CH_USER'
ENV_CH_PASSWORD = 'CH_PASSWORD'
ENV_CH_DBNAME = 'CH_DBNAME'


def get_jar_files():
    """Return file:// URLs of the connector JARs Flink needs at runtime."""
    jar_base = os.environ.get(ENV_JAR_BASE_PATH, '/app/jars')
    return [
        f"file://{jar_base}/flink-sql-connector-kafka-3.0.2-1.18.jar",
        f"file://{jar_base}/flink-connector-jdbc-3.1.1-1.17.jar",
        f"file://{jar_base}/clickhouse-jdbc-0.5.0-all.jar",
    ]
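
# The trailing version in each Flink connector JAR name (e.g. "-1.18", "-1.17")
# is the Flink release it was built against; if the Flink runtime is upgraded,
# these pinned JARs will likely need to move with it.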


def debug_map_func(value):
    """Identity map that prints each element; handy for ad-hoc debugging."""
    print(f"type:{type(value)}\nvalue:{value}")
    return value


def trace_id2ch(brokers, ch_ops):
    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars(*get_jar_files())
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    env.set_parallelism(1)

    # Each Kafka message is a JSON object of the form {"traces": "<string>"}.
    # values_type_info = Types.MAP(Types.STRING(), Types.BIG_INT())
    values_type_info = Types.STRING()
    row_type_info = Types.ROW_NAMED(['traces'], [values_type_info])
    # row_type_info = Types.ROW([values_type_info])
    json_de = JsonRowDeserializationSchema.builder().type_info(row_type_info).build()

    source = (KafkaSource.builder()
              .set_bootstrap_servers(brokers)
              .set_topics("otel_sample_hit")
              .set_group_id("traceid2clickhouse")
              .set_value_only_deserializer(json_de)
              .set_starting_offsets(KafkaOffsetsInitializer.earliest())
              .build())
    ds = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")

    ch_sink = ClickhouseSink(**ch_ops)
    (ds
     .flat_map(TraceIDMapToTupleFlatMap())
     # Identity map that pins the output type to ROW<STRING, LONG> for the
     # sink. Required; do not delete.
     .map(lambda x: x, output_type=Types.ROW([Types.STRING(), Types.LONG()]))
     .add_sink(ch_sink.ch_sink))

    # Submit the job for execution.
    env.execute()


def main():
    ch_host = os.getenv(ENV_CH_HOST, default='clickhouse.observe.svc.cluster.local')
    # Environment variables are strings, so keep the port default a string too.
    ch_port = os.getenv(ENV_CH_PORT, default='8123')
    ch_user = os.getenv(ENV_CH_USER, default='default')
    ch_password = os.getenv(ENV_CH_PASSWORD, default='cecf@cestong.com')
    ch_dbname = os.getenv(ENV_CH_DBNAME, default='otel_cold')
    kafka_brokers = os.getenv(ENV_KAFKA_BROKERS, default='')

    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    # Drop empty entries so a value like "a,,b" or " " yields no blank brokers.
    brokers_list = [b.strip() for b in kafka_brokers.split(',') if b.strip()]
    if not brokers_list:
        logging.error(f'provide at least one broker via env:{ENV_KAFKA_BROKERS}')
        sys.exit(1)

    ch_options = {
        'ch_host': ch_host,
        'ch_port': ch_port,
        'ch_user': ch_user,
        'ch_password': ch_password,
        'ch_name': ch_dbname,
    }
    trace_id2ch(','.join(brokers_list), ch_options)


if __name__ == '__main__':
    main()
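
# An illustrative invocation, assuming this file is the __main__ module of the
# otel_sampled_clickhouse package (the broker addresses below are placeholders,
# not values from this repo; JAR_BASE_PATH defaults to /app/jars):
#
#   KAFKA_BROKERS=kafka-0:9092,kafka-1:9092 \
#   JAR_BASE_PATH=/app/jars \
#   python -m otel_sampled_clickhouse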