import signal import time from datetime import datetime as dt import logging from oteldatareplay.record_reader import RecordReader from oteldatareplay.resource_span_record import ResourceSpanRecord logger = logging.getLogger(__name__) class Replayer(object): def __init__(self, record_reader: RecordReader, record_handler, record_interval=0.1): self.first_record_time = None self.running = True self.record_reader = record_reader self.trace_id2new_id = {} self.record_handler = record_handler self.record_interval = record_interval def signal_handler(self, sig, frame): logger.info(f"signal{sig} received. Exiting gracefully.") self.stop() def start(self): signal.signal(signal.SIGINT, self.signal_handler) self.parse_meta() self.running = True self.replay() def replay(self): gen = self.record_reader.records_generator() while self.running: try: record = next(gen) self.replay_record(record) time.sleep(self.record_interval) except StopIteration: logger.info("read complete, reset") self.record_reader.reset() self.trace_id2new_id.clear() gen = self.record_reader.records_generator() except Exception as e: logger.info(f'replay record failed:{e}') def replay_record(self, record: ResourceSpanRecord): logger.info('replay record') now = dt.now() first_record_time = self.first_record_time first_span = record.get_first_span() start_time_nano = int(first_span['startTimeUnixNano']) record_time = dt.fromtimestamp(start_time_nano / 1e9) record.update_span(now, first_record_time, self.trace_id2new_id) self.record_handler(record) return record_time def stop(self): self.running = False def parse_meta(self): first_record = next(iter(self.record_reader.records_generator())) self.first_record_time = first_record.first_span_start_time self.record_reader.reset()