12345678910111213141516171819202122232425262728293031323334353637 |
- import json
- import logging
- from kafka import KafkaProducer
- from oteldatareplay.resource_span_record import ResourceSpanRecord
- logger = logging.getLogger(__name__)
class RecordKafkaSender:
    """Publish ``ResourceSpanRecord`` payloads to a Kafka topic as JSON.

    Lifecycle: construct, call :meth:`start` to create the underlying
    ``KafkaProducer``, call :meth:`send_record` any number of times, then
    call :meth:`stop` to close the producer.
    """

    # Upper bound handed to KafkaProducer as ``max_request_size``. Kept at the
    # original value (1e10) but expressed as an int, which is the type
    # kafka-python documents for this setting.
    _MAX_REQUEST_SIZE = 10_000_000_000

    def __init__(self, kafka_server, topic, debug=False):
        """Store connection settings; no network activity happens here.

        Args:
            kafka_server: Value passed to ``KafkaProducer`` as
                ``bootstrap_servers``.
            topic: Name of the topic every record is sent to.
            debug: When true, :meth:`start` creates the producer with
                ``linger_ms=0`` and ``batch_size=0``.
        """
        self.producer: KafkaProducer | None = None
        self.kafka_server = kafka_server
        self.topic = topic
        # Not used by this class; kept so existing readers of the attribute
        # keep working.
        self.kafka_client = None
        self.debug = debug

    def start(self) -> None:
        """Create the Kafka producer used by :meth:`send_record`."""
        producer_options = {
            "bootstrap_servers": self.kafka_server,
            "acks": 1,
            "max_request_size": self._MAX_REQUEST_SIZE,
        }
        if self.debug:
            # Debug mode: no lingering and a zero batch size.
            producer_options["linger_ms"] = 0
            producer_options["batch_size"] = 0
        self.producer = KafkaProducer(**producer_options)

    def stop(self) -> None:
        """Close the producer if one was started.

        Safe to call before :meth:`start` or more than once; in those cases
        it does nothing.
        """
        # Detach first so the sender is marked as stopped even if close()
        # raises.
        producer, self.producer = self.producer, None
        if producer is not None:
            producer.close()

    def send_failed(self, ex) -> None:
        """Errback for a send future: log the failure for this topic.

        Args:
            ex: The exception (or failure value) reported by the producer.
        """
        # A failed delivery is an error condition, so log it at ERROR rather
        # than INFO; lazy %-args keep the rendered text unchanged.
        logger.error("send failed to %s, %s", self.topic, ex)

    def send_record(self, record: "ResourceSpanRecord") -> None:
        """Serialize ``record.record`` to UTF-8 JSON and send it to the topic.

        The send is asynchronous; failures are reported through
        :meth:`send_failed`.

        Args:
            record: Object whose ``record`` attribute is JSON-serializable.

        Raises:
            RuntimeError: If :meth:`start` has not been called (or
                :meth:`stop` has already been called).
        """
        if self.producer is None:
            raise RuntimeError(
                "RecordKafkaSender is not started; call start() before send_record()"
            )
        message = json.dumps(record.record).encode("utf-8")
        future = self.producer.send(self.topic, value=message)
        future.add_errback(self.send_failed)
|