1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071 |
- ################################################################################
- # Licensed to the Apache Software Foundation (ASF) under one
- # or more contributor license agreements. See the NOTICE file
- # distributed with this work for additional information
- # regarding copyright ownership. The ASF licenses this file
- # to you under the Apache License, Version 2.0 (the
- # "License"); you may not use this file except in compliance
- # with the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- ################################################################################
- import json
- import logging
- import sys
- from pyflink.table import (EnvironmentSettings, TableEnvironment, DataTypes, TableDescriptor,
- Schema)
- from pyflink.table.expressions import col
- from pyflink.table.udf import udf
def _increment_tel(data):
    """Return ``data`` re-serialized with its top-level ``tel`` field incremented by one.

    ``data`` is expected to be a JSON object string such as
    ``'{"name": "Flink", "tel": 123, ...}'``. A ``None`` input (a SQL NULL in
    the ``data`` column) is passed through as ``None``.

    Raises:
        json.JSONDecodeError: if ``data`` is not valid JSON.
        KeyError: if the decoded object has no ``tel`` key.
        TypeError: if the decoded JSON is not an object, or ``tel`` is not numeric.
    """
    if data is None:
        # json.loads(None) raises TypeError, which would fail the whole job on a
        # single NULL row; NULL in -> NULL out matches SQL scalar-function semantics.
        return None
    json_data = json.loads(data)
    json_data['tel'] += 1
    return json.dumps(json_data)


def process_json_data_with_udf(*, wait=True):
    """Run a streaming job that rewrites a JSON string column with a Python UDF.

    Builds an in-memory source of ``(id, data)`` rows whose ``data`` column holds
    JSON strings, increments each record's ``tel`` field through a scalar UDF,
    and inserts the result into a ``print`` connector sink.

    Args:
        wait: block until the insert job finishes (the default, as before).
            Pass ``False`` when submitting to a remote cluster; see
            https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/faq/#wait-for-jobs-to-finish-when-executing-jobs-in-mini-cluster
            for more details.
    """
    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

    # define the source
    table = t_env.from_elements(
        elements=[
            (1, '{"name": "Flink", "tel": 123, "addr": {"country": "Germany", "city": "Berlin"}}'),
            (2, '{"name": "hello", "tel": 135, "addr": {"country": "China", "city": "Shanghai"}}'),
            (3, '{"name": "world", "tel": 124, "addr": {"country": "USA", "city": "NewYork"}}'),
            (4, '{"name": "PyFlink", "tel": 32, "addr": {"country": "China", "city": "Hangzhou"}}')
        ],
        schema=['id', 'data'])

    # define the sink
    t_env.create_temporary_table(
        'sink',
        TableDescriptor.for_connector('print')
                       .schema(Schema.new_builder()
                               .column('id', DataTypes.BIGINT())
                               .column('data', DataTypes.STRING())
                               .build())
                       .build())

    # update json columns: wrap the plain-Python transform as a scalar UDF
    update_tel = udf(_increment_tel, result_type=DataTypes.STRING())
    table = table.select(col('id'), update_tel(col('data')))

    # execute
    result = table.execute_insert('sink')
    if wait:
        # When running in a local mini-cluster, wait so the script does not exit
        # before the job finishes (see the FAQ link in the docstring).
        result.wait()
if __name__ == '__main__':
    # Script entry point: log INFO and above to stdout as bare message text,
    # then launch the example job.
    log_options = dict(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    logging.basicConfig(**log_options)
    process_json_data_with_udf()
|