1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162 |
- import signal
- import time
- from datetime import datetime as dt
- import logging
- from oteldatareplay.record_reader import RecordReader
- from oteldatareplay.resource_span_record import ResourceSpanRecord
logger = logging.getLogger(__name__)


class Replayer:
    """Feed records from a reader to a handler in an endless, paced loop.

    Records are pulled one at a time from ``record_reader.records_generator()``,
    passed through ``record.update_span(...)`` and then handed to
    ``record_handler``. When the reader is exhausted it is reset and the replay
    starts over, until :meth:`stop` is called (for example by SIGINT once
    :meth:`start` has installed the signal handler).
    """

    def __init__(self, record_reader: "RecordReader", record_handler, record_interval=0.1):
        """Store the collaborators; nothing is read until :meth:`start`.

        Args:
            record_reader: Source of records. Must provide
                ``records_generator()`` and ``reset()``.
            record_handler: Callable invoked with each updated record.
            record_interval: Seconds to sleep after each replayed record.
        """
        # Start time of the very first record; filled in by parse_meta().
        self.first_record_time = None
        self.running = True
        self.record_reader = record_reader
        # Handed to record.update_span() for every record and cleared on each
        # pass over the data. Presumably maps original trace ids to the ids
        # generated for the current pass -- confirm against update_span().
        self.trace_id2new_id = {}
        self.record_handler = record_handler
        self.record_interval = record_interval

    def signal_handler(self, sig, frame):
        """Signal callback: log the signal number and request a stop."""
        logger.info("signal%s received. Exiting gracefully.", sig)
        self.stop()

    def start(self):
        """Install the SIGINT handler, read metadata, and replay until stopped.

        Blocks the calling thread for as long as the replay loop runs.
        """
        signal.signal(signal.SIGINT, self.signal_handler)
        self.parse_meta()
        self.running = True
        self.replay()

    def replay(self):
        """Replay records in a loop until :meth:`stop` clears ``running``.

        A failure while replaying one record is logged with its traceback and
        the loop moves on to the next record. When the reader runs dry it is
        reset, the trace-id map is cleared, and a new pass begins.
        """
        records = self.record_reader.records_generator()
        pass_had_records = False
        while self.running:
            try:
                record = next(records)
                pass_had_records = True
                self.replay_record(record)
            except StopIteration:
                logger.info("read complete, reset")
                self.record_reader.reset()
                self.trace_id2new_id.clear()
                records = self.record_reader.records_generator()
                if not pass_had_records:
                    # The reader yielded nothing at all: pause before trying
                    # again so an empty source does not turn into a busy loop.
                    time.sleep(self.record_interval)
                pass_had_records = False
                continue
            except Exception as e:
                # Keep the loop alive, but record the traceback so the failure
                # can actually be diagnosed.
                logger.exception("replay record failed:%s", e)
            # Pace the loop after every attempted record, successful or not,
            # so a persistently failing handler is not hammered at full speed.
            time.sleep(self.record_interval)

    def replay_record(self, record: "ResourceSpanRecord"):
        """Update one record and pass it to the handler.

        Args:
            record: The record to replay.

        Returns:
            The start time of the record's first span as a naive local
            ``datetime``, taken from its ``startTimeUnixNano`` field before
            the record is updated.
        """
        logger.info('replay record')
        now = dt.now()
        # Read the start time before update_span() runs; the update may
        # change the span in place (not visible from here).
        first_span = record.get_first_span()
        start_time_nano = int(first_span['startTimeUnixNano'])
        record_time = dt.fromtimestamp(start_time_nano / 1e9)
        record.update_span(now, self.first_record_time, self.trace_id2new_id)
        self.record_handler(record)
        return record_time

    def stop(self):
        """Ask the replay loop to exit once the current iteration finishes."""
        self.running = False

    def parse_meta(self):
        """Remember the first record's start time, then reset the reader.

        Raises:
            StopIteration: If the reader yields no records.
        """
        first_record = next(iter(self.record_reader.records_generator()))
        self.first_record_time = first_record.first_span_start_time
        self.record_reader.reset()
|