__main__.py 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. import argparse
  2. import logging
  3. import signal
  4. import yaml
  5. from oteldatareplay.record_kafka_sender import RecordKafkaSender
  6. from oteldatareplay.record_reader import RecordReader
  7. from oteldatareplay.replayer import Replayer
  8. logger: logging.Logger | None = None
# Command-line entry point of the oteldatareplay package.
# main() reads a YAML config file and wires a RecordReader, a RecordKafkaSender
# and a Replayer together; SIGINT stops the reader and the replayer.
  12. def on_interrupt_func(player: RecordReader, replayer: Replayer):
  13. def on_interrupt(signum, frame):
  14. logger.info(f'signal: {signum} received: stopping')
  15. player.close()
  16. replayer.stop()
  17. logger.info('stopped')
  18. return on_interrupt
  19. def main():
  20. parser = argparse.ArgumentParser(description='Process some integers.')
  21. parser.add_argument('-c', '--config', help='配置文件位置',
  22. required=True)
  23. parser.add_argument('-d', '--debug', action="store", help='是否开启debug', default=False)
  24. args = parser.parse_args()
  25. with open(args.config, 'r') as stream:
  26. cfg = yaml.safe_load(stream)
  27. global logger
  28. log_cfg = cfg['logging'] if 'logging' in cfg else {'debug': False}
  29. log_level = logging.DEBUG if log_cfg['debug'] else logging.INFO
  30. logging.basicConfig(level=log_level,
  31. format='%(asctime)s - %(levelname)s - %(message)s',
  32. handlers=[logging.StreamHandler()])
  33. logger = logging.getLogger('oteldatareplay')
  34. logger.info('[CONFIG]:', cfg)
  35. logger.info('[DEBUG]:', args.debug)
  36. data_path = cfg['source']['path']
  37. cfg_kafka = cfg['kafka']
  38. kafka_record_sender = RecordKafkaSender(cfg_kafka['bootstrap_servers'], cfg_kafka['topic'], debug=args.debug)
  39. kafka_record_sender.start()
  40. record_reader = RecordReader(data_path)
  41. replayer = Replayer(record_reader, kafka_record_sender.send_record,
  42. record_interval=cfg['replayer']['interval_seconds'])
  43. signal.signal(signal.SIGINT, on_interrupt_func(record_reader, replayer))
  44. replayer.start()
  45. # Press the green button in the gutter to run the script.
  46. if __name__ == '__main__':
  47. main()
  48. # See PyCharm help at https://www.jetbrains.com/help/pycharm/