import logging
from datetime import datetime as dt
import os
import json

from oteldatareplay.record_reader import RecordReader

# Timestamp layout used to store ``first_record_time`` in meta.json.
dt_format = '%Y-%m-%d %H:%M:%S.%f'
logger = logging.getLogger(__name__)


class MetaParser:
    """Collect replay metadata (first record time, span ids, trace ids).

    The metadata is cached in a ``meta.json`` file located next to the
    reader's data file, so the full scan of the records only happens when
    that file does not exist yet.
    """

    def __init__(self, reader):
        """Store the reader; metadata stays unset until ``parse_meta`` runs.

        Args:
            reader: Record reader exposing ``file_path``,
                ``records_generator()`` and ``reset()``.
        """
        self.trace_id2new_id = None
        self.first_record_time = None
        self.span_id2new_id = None
        self.record_reader: RecordReader = reader
        self.meta = {}

    def parse_meta(self):
        """Populate the metadata attributes, from cache when available.

        If ``meta.json`` exists next to the data file it is loaded and the
        reader is left untouched. Otherwise every record is scanned, the
        result is written to ``meta.json`` and the reader is reset.

        After this call ``first_record_time`` is set and both
        ``span_id2new_id`` and ``trace_id2new_id`` map each known id to
        ``None``.

        Raises:
            ValueError: If there is no meta file and the reader yields no
                records at all.
        """
        data_dir = os.path.dirname(self.record_reader.file_path)
        meta_path = os.path.join(data_dir, 'meta.json')

        if os.path.exists(meta_path):
            logger.info(f'read meta from {meta_path}')
            self._load_meta(meta_path)
            return

        logger.info(f'no meta file {meta_path} exists, need parse')
        self._scan_records()

        logger.info(f'meta parse done, write meta to {meta_path}')
        self._write_meta(meta_path)
        self.record_reader.reset()

    def _load_meta(self, meta_path):
        """Fill the metadata attributes from an existing meta file."""
        with open(meta_path, 'r', encoding='utf-8') as f:
            meta = json.load(f)
        self.first_record_time = dt.strptime(meta['first_record_time'], dt_format)
        self.span_id2new_id = dict.fromkeys(meta['span_id2new_id'])
        self.trace_id2new_id = dict.fromkeys(meta['trace_id2new_id'])

    def _scan_records(self):
        """Fill the metadata attributes by reading every record once."""
        first_record = next(iter(self.record_reader.records_generator()), None)
        if first_record is None:
            # A bare StopIteration would be confusing for callers (and turns
            # into RuntimeError when raised inside a generator).
            raise ValueError(
                f'no records found in {self.record_reader.file_path}, '
                'cannot parse meta'
            )
        self.first_record_time = first_record.first_span_start_time
        # Rewind so the full scan below starts from the first record again.
        self.record_reader.reset()

        # Dicts keep first-seen order and drop duplicates as we go, so no
        # intermediate lists of ids are needed.
        span_id2new_id = {}
        trace_id2new_id = {}
        for record in self.record_reader.records_generator():
            span_id2new_id.update(dict.fromkeys(record.span_ids))
            # NOTE(review): assumes records expose ``trace_ids`` alongside
            # ``span_ids`` - confirm against the record class.
            trace_id2new_id.update(dict.fromkeys(record.trace_ids))

        self.span_id2new_id = span_id2new_id
        self.trace_id2new_id = trace_id2new_id

    def _write_meta(self, meta_path):
        """Write the current metadata to ``meta_path`` as JSON."""
        meta = {
            'first_record_time': self.first_record_time.strftime(dt_format),
            'span_id2new_id': list(self.span_id2new_id),
            'trace_id2new_id': list(self.trace_id2new_id),
        }
        with open(meta_path, 'w', encoding='utf-8') as f:
            json.dump(meta, f)