replayer.py 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. import signal
  2. import time
  3. from datetime import datetime as dt
  4. import logging
  5. from oteldatareplay.record_reader import RecordReader
  6. from oteldatareplay.resource_span_record import ResourceSpanRecord
  7. logger = logging.getLogger(__name__)
  8. class Replayer(object):
  9. def __init__(self, record_reader: RecordReader, record_handler, record_interval=0.1):
  10. self.first_record_time = None
  11. self.running = True
  12. self.record_reader = record_reader
  13. self.trace_id2new_id = {}
  14. self.record_handler = record_handler
  15. self.record_interval = record_interval
  16. def signal_handler(self, sig, frame):
  17. logger.info(f"signal{sig} received. Exiting gracefully.")
  18. self.stop()
  19. def start(self):
  20. signal.signal(signal.SIGINT, self.signal_handler)
  21. self.parse_meta()
  22. self.running = True
  23. self.replay()
  24. def replay(self):
  25. gen = self.record_reader.records_generator()
  26. while self.running:
  27. try:
  28. record = next(gen)
  29. self.replay_record(record)
  30. time.sleep(self.record_interval)
  31. except StopIteration:
  32. logger.info("read complete, reset")
  33. self.record_reader.reset()
  34. self.trace_id2new_id.clear()
  35. gen = self.record_reader.records_generator()
  36. except Exception as e:
  37. logger.info(f'replay record failed:{e}')
  38. def replay_record(self, record: ResourceSpanRecord):
  39. logger.info('replay record')
  40. now = dt.now()
  41. first_record_time = self.first_record_time
  42. first_span = record.get_first_span()
  43. start_time_nano = int(first_span['startTimeUnixNano'])
  44. record_time = dt.fromtimestamp(start_time_nano / 1e9)
  45. record.update_span(now, first_record_time, self.trace_id2new_id)
  46. self.record_handler(record)
  47. return record_time
  48. def stop(self):
  49. self.running = False
  50. def parse_meta(self):
  51. first_record = next(iter(self.record_reader.records_generator()))
  52. self.first_record_time = first_record.first_span_start_time
  53. self.record_reader.reset()