kafka_avro_format.py

################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import logging
import sys

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaProducer, FlinkKafkaConsumer
from pyflink.datastream.formats.avro import AvroRowSerializationSchema, AvroRowDeserializationSchema


# Make sure that the Kafka cluster is started and the topic 'test_avro_topic'
# is created before executing this job; see the example command below.
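#
# For example, with a recent Kafka distribution (the script name and path vary
# by installation), the topic could be created like this:
#
#     ./bin/kafka-topics.sh --create --topic test_avro_topic \
#         --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1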
def write_to_kafka(env):
    # A small bounded source of (id, name) records to write to Kafka.
    ds = env.from_collection([
        (1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 'hello'), (6, 'hello')],
        type_info=Types.ROW([Types.INT(), Types.STRING()]))

    # The Avro writer schema; its fields must match the row type above.
    serialization_schema = AvroRowSerializationSchema(
        avro_schema_string="""
            {
                "type": "record",
                "name": "TestRecord",
                "fields": [
                    {"name": "id", "type": "int"},
                    {"name": "name", "type": "string"}
                ]
            }"""
    )

    # 'group.id' is a consumer property, so only 'bootstrap.servers' is needed
    # to configure the producer.
    kafka_producer = FlinkKafkaProducer(
        topic='test_avro_topic',
        serialization_schema=serialization_schema,
        producer_config={'bootstrap.servers': 'localhost:9092'}
    )

    # note that the output type of ds must be RowTypeInfo
    ds.add_sink(kafka_producer)
    env.execute()


def read_from_kafka(env):
    # The reader schema must match the writer schema used in write_to_kafka.
    deserialization_schema = AvroRowDeserializationSchema(
        avro_schema_string="""
            {
                "type": "record",
                "name": "TestRecord",
                "fields": [
                    {"name": "id", "type": "int"},
                    {"name": "name", "type": "string"}
                ]
            }"""
    )

    kafka_consumer = FlinkKafkaConsumer(
        topics='test_avro_topic',
        deserialization_schema=deserialization_schema,
        properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group_1'}
    )
    # Read the topic from the beginning so that the records written above are
    # picked up.
    kafka_consumer.set_start_from_earliest()

    env.add_source(kafka_consumer).print()
    env.execute()


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    env = StreamExecutionEnvironment.get_execution_environment()
    # The Avro format and Kafka connector jars must be on the classpath.
    # Replace the paths below with the actual locations of the jars, and keep
    # their versions in line with the Flink version in use (1.15.0 here).
    env.add_jars("file:///path/to/flink-sql-avro-1.15.0.jar",
                 "file:///path/to/flink-sql-connector-kafka-1.15.0.jar")

    print("start writing data to kafka")
    write_to_kafka(env)

    print("start reading data from kafka")
    read_from_kafka(env)
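
# With a local PyFlink installation this example can be run directly, e.g.:
#
#     python kafka_avro_format.py
#
# or submitted to a cluster with the Flink CLI (flags may vary by version):
#
#     ./bin/flink run --python kafka_avro_format.py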