1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162 |
- import argparse
- import logging
- import signal
- import yaml
- from oteldatareplay.record_kafka_sender import RecordKafkaSender
- from oteldatareplay.record_reader import RecordReader
- from oteldatareplay.replayer import Replayer
- logger: logging.Logger | None = None
- # This is a sample Python script.
- # Press Shift+F10 to execute it or replace it with your code.
- # Press Double Shift to search everywhere for classes, files, tool windows, actions, and settings.
def on_interrupt_func(player: RecordReader, replayer: Replayer):
    """Build a signal handler that shuts the replay down.

    Args:
        player: Record reader to close when the signal arrives.
        replayer: Replayer to stop when the signal arrives.

    Returns:
        A ``(signum, frame)`` callable suitable for ``signal.signal``.
    """

    def _handle_signal(signum, frame):
        # Uses the module-level ``logger``, which ``main`` initializes
        # before this handler is registered.
        logger.info(f'signal: {signum} received: stopping')
        # Shutdown order is kept as-is: close the reader, then stop the replayer.
        player.close()
        replayer.stop()
        logger.info('stopped')

    return _handle_signal
def _parse_debug_flag(value):
    """Interpret a ``--debug`` command-line value as a boolean.

    Every value counts as "enabled" except the usual "off" spellings
    (``false``, ``0``, ``no``, ``off`` and the empty string), which are
    compared case-insensitively. This keeps previously working invocations
    such as ``-d true`` or ``-d 1`` enabled while making ``-d false``
    actually disable debugging.
    """
    return str(value).strip().lower() not in ('', '0', 'false', 'no', 'off')


def main():
    """Parse the command line, load the YAML config, and run the replay.

    Command-line options:
        -c/--config: path to the YAML configuration file (required).
        -d/--debug: optional debug switch forwarded to ``RecordKafkaSender``;
            may be given bare (``-d``) or with a value (``-d true``).

    Config keys read: ``logging.debug`` (optional), ``source.path``,
    ``kafka.bootstrap_servers``, ``kafka.topic`` and
    ``replayer.interval_seconds``.

    Raises:
        OSError: if the config file cannot be opened.
        KeyError: if a required config key is missing.
    """
    global logger

    parser = argparse.ArgumentParser(
        description='Replay recorded data from a source path to a Kafka topic.')
    parser.add_argument('-c', '--config', help='配置文件位置', required=True)
    # nargs='?' keeps the old "-d VALUE" form working and also allows a bare
    # "-d"; the type converter turns the value into a real bool so that
    # strings like "false" are no longer treated as truthy.
    parser.add_argument('-d', '--debug', nargs='?', const=True, default=False,
                        type=_parse_debug_flag, help='是否开启debug')
    args = parser.parse_args()

    # Read as UTF-8 explicitly instead of relying on the platform default.
    with open(args.config, 'r', encoding='utf-8') as stream:
        cfg = yaml.safe_load(stream)

    # Tolerate a missing/empty "logging" section or a missing "debug" key.
    log_cfg = cfg.get('logging') or {}
    log_level = logging.DEBUG if log_cfg.get('debug') else logging.INFO
    logging.basicConfig(level=log_level,
                        format='%(asctime)s - %(levelname)s - %(message)s',
                        handlers=[logging.StreamHandler()])
    logger = logging.getLogger('oteldatareplay')
    # Logging uses %-style lazy formatting: extra args need a placeholder,
    # otherwise the record fails to format and the value is never printed.
    logger.info('[CONFIG]: %s', cfg)
    logger.info('[DEBUG]: %s', args.debug)

    data_path = cfg['source']['path']
    cfg_kafka = cfg['kafka']

    kafka_record_sender = RecordKafkaSender(cfg_kafka['bootstrap_servers'],
                                            cfg_kafka['topic'],
                                            debug=args.debug)
    kafka_record_sender.start()

    record_reader = RecordReader(data_path)
    replayer = Replayer(record_reader, kafka_record_sender.send_record,
                        record_interval=cfg['replayer']['interval_seconds'])

    # Install the Ctrl-C handler before starting the replay.
    signal.signal(signal.SIGINT, on_interrupt_func(record_reader, replayer))
    replayer.start()
# Run the replay only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
- # See PyCharm help at https://www.jetbrains.com/help/pycharm/
|