################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import json
import logging
import sys

from pyflink.table import (EnvironmentSettings, TableEnvironment, DataTypes, TableDescriptor,
                           Schema)
from pyflink.table.expressions import col
from pyflink.table.udf import udf


def process_json_data_with_udf():
    """Rewrite a JSON string column with a Python UDF and print the result.

    An in-memory table of ``(id, data)`` rows is created, where ``data`` is a
    JSON document serialized as a string. Every row is passed through a scalar
    UDF that parses the document, adds 1 to its ``tel`` field and serializes
    it again. The resulting rows are inserted into a temporary table backed by
    the ``print`` connector.
    """
    table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

    # define the source
    rows = [
        (1, '{"name": "Flink", "tel": 123, "addr": {"country": "Germany", "city": "Berlin"}}'),
        (2, '{"name": "hello", "tel": 135, "addr": {"country": "China", "city": "Shanghai"}}'),
        (3, '{"name": "world", "tel": 124, "addr": {"country": "USA", "city": "NewYork"}}'),
        (4, '{"name": "PyFlink", "tel": 32, "addr": {"country": "China", "city": "Hangzhou"}}'),
    ]
    source = table_env.from_elements(elements=rows, schema=['id', 'data'])

    # define the sink
    sink_schema = (
        Schema.new_builder()
        .column('id', DataTypes.BIGINT())
        .column('data', DataTypes.STRING())
        .build()
    )
    sink_descriptor = (
        TableDescriptor.for_connector('print')
        .schema(sink_schema)
        .build()
    )
    table_env.create_temporary_table('sink', sink_descriptor)

    # update json columns
    @udf(result_type=DataTypes.STRING())
    def update_tel(data):
        """Return the JSON string ``data`` with its ``tel`` value increased by 1."""
        payload = json.loads(data)
        payload['tel'] += 1
        return json.dumps(payload)

    updated = source.select(col('id'), update_tel(col('data')))

    # execute
    job = updated.execute_insert('sink')
    # remove .wait if submitting to a remote cluster, refer to
    # https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/faq/#wait-for-jobs-to-finish-when-executing-jobs-in-mini-cluster
    # for more details
    job.wait()


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    process_json_data_with_udf()