import json import logging from kafka import KafkaProducer from oteldatareplay.resource_span_record import ResourceSpanRecord logger = logging.getLogger(__name__) class RecordKafkaSender: def __init__(self, kafka_server, topic, debug=False): self.producer: KafkaProducer | None = None self.kafka_server = kafka_server self.topic = topic self.kafka_client = None self.debug = debug def start(self): if self.debug: self.producer = KafkaProducer(bootstrap_servers=self.kafka_server, linger_ms=0, batch_size=0, acks=1, max_request_size=1e10) else: self.producer = KafkaProducer(bootstrap_servers=self.kafka_server, acks=1, max_request_size=1e10) def stop(self): self.producer.close() def send_failed(self, ex): logger.info(f"send failed to {self.topic}, {ex}") def send_record(self, record: ResourceSpanRecord): message = json.dumps(record.record).encode('utf-8') future = self.producer.send(self.topic, value=message) future.add_errback(self.send_failed)