import json
import logging
import os
from datetime import datetime as dt

from oteldatareplay.record_reader import RecordReader

# Timestamp layout used when round-tripping times through meta.json.
dt_format = '%Y-%m-%d %H:%M:%S.%f'
logger = logging.getLogger(__name__)
class MetaParser:
    """Derives replay metadata for a record file and caches it on disk.

    The metadata consists of the timestamp of the first span in the file and
    two id-remapping tables (original span/trace id -> replacement id, values
    initially ``None``).  Results are cached in a ``meta.json`` file placed
    next to the record file so subsequent runs skip the full scan.
    """

    def __init__(self, reader):
        """
        :param reader: a RecordReader positioned at the start of the record
            file; only ``file_path``, ``records_generator()`` and ``reset()``
            are used here.
        """
        self.trace_id2new_id = None   # filled by parse_meta(): trace id -> None
        self.first_record_time = None  # filled by parse_meta(): datetime of first span
        self.span_id2new_id = None    # filled by parse_meta(): span id -> None
        self.record_reader: RecordReader = reader
        self.meta = {}

    def parse_meta(self):
        """Populate first_record_time, span_id2new_id and trace_id2new_id.

        Reads the cached ``meta.json`` sibling of the record file when it
        exists; otherwise scans every record to collect ids, then writes the
        cache for the next run.  The reader is reset after each full pass so
        callers can iterate the records again.
        """
        data_dir = os.path.dirname(self.record_reader.file_path)
        meta_path = os.path.join(data_dir, 'meta.json')
        if os.path.exists(meta_path):
            logger.info(f'read meta from {meta_path}')
            with open(meta_path, 'r') as f:
                meta = json.load(f)
            self.first_record_time = dt.strptime(meta['first_record_time'], dt_format)
            # The cache stores plain id lists; every id starts out unmapped.
            self.span_id2new_id = dict.fromkeys(meta['span_id2new_id'])
            self.trace_id2new_id = dict.fromkeys(meta['trace_id2new_id'])
            return
        logger.info(f'no meta file {meta_path} exists, need parse')
        # NOTE(review): raises StopIteration on an empty record file — the
        # file is assumed to contain at least one record.
        first_record = next(self.record_reader.records_generator())
        self.first_record_time = first_record.first_span_start_time
        self.record_reader.reset()
        span_ids = []
        trace_ids = []
        for record in self.record_reader.records_generator():
            span_ids += record.span_ids
            # BUG FIX: was `trace_ids += trace_ids` (a no-op on an empty
            # list), so trace ids were never collected from the records.
            trace_ids += record.trace_ids
        # BUG FIX: the two tables were built from the swapped id lists
        # (span map from trace_ids and vice versa).
        self.span_id2new_id = dict.fromkeys(span_ids)
        self.trace_id2new_id = dict.fromkeys(trace_ids)
        logger.info(f'meta parse done, write meta to {meta_path}')
        meta = {'first_record_time': self.first_record_time.strftime(dt_format),
                'span_id2new_id': list(self.span_id2new_id),
                'trace_id2new_id': list(self.trace_id2new_id)}
        with open(meta_path, 'w') as f:
            json.dump(meta, f)
        self.record_reader.reset()