record_kafka_sender.py 1.2 KB

12345678910111213141516171819202122232425262728293031323334353637
  1. import json
  2. import logging
  3. from kafka import KafkaProducer
  4. from oteldatareplay.resource_span_record import ResourceSpanRecord
  5. logger = logging.getLogger(__name__)
  class RecordKafkaSender:
      """Publish span records to a single Kafka topic as UTF-8 JSON.

      Lifecycle: construct, call ``start()`` to create the underlying
      ``KafkaProducer``, call ``send_record()`` any number of times, then
      ``stop()`` to close the producer.
      """

      # Client-side cap on one request, in bytes. Same value as the former
      # ``1e10`` literal, written as an int because it is a byte count.
      _MAX_REQUEST_SIZE = 10_000_000_000

      def __init__(self, kafka_server, topic, debug=False):
          """Store the connection settings; no producer is created here.

          Args:
              kafka_server: Passed to ``KafkaProducer`` as ``bootstrap_servers``.
              topic: Topic that every record is sent to.
              debug: When true, ``start()`` creates the producer with
                  ``linger_ms=0`` and ``batch_size=0``.
          """
          self.producer: KafkaProducer | None = None
          self.kafka_server = kafka_server
          self.topic = topic
          # Not used by any method of this class; kept for external readers.
          self.kafka_client = None
          self.debug = debug

      def start(self):
          """Create the ``KafkaProducer`` and store it on ``self.producer``."""
          config = {
              "bootstrap_servers": self.kafka_server,
              "acks": 1,
              "max_request_size": self._MAX_REQUEST_SIZE,
          }
          if self.debug:
              # Debug mode only adds these two overrides to the shared config.
              config.update(linger_ms=0, batch_size=0)
          self.producer = KafkaProducer(**config)

      def stop(self):
          """Close the producer if one exists; safe to call more than once.

          Does nothing when ``start()`` was never called. ``self.producer`` is
          reset to ``None`` so a closed producer is never reused.
          """
          producer, self.producer = self.producer, None
          if producer is not None:
              producer.close()

      def send_failed(self, ex):
          """Errback for failed sends: log the error together with the topic.

          Args:
              ex: The failure passed to the errback by the send future.
          """
          logger.error("send failed to %s, %s", self.topic, ex)

      def send_record(self, record: "ResourceSpanRecord"):
          """Serialize ``record.record`` to JSON and send it to ``self.topic``.

          ``start()`` must have been called first. The send is not awaited
          here; ``send_failed`` is attached as the errback of the future
          returned by the producer.

          Args:
              record: Object whose ``record`` attribute is JSON-serializable.
          """
          payload = json.dumps(record.record).encode("utf-8")
          future = self.producer.send(self.topic, value=payload)
          future.add_errback(self.send_failed)