process_json_data.py 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. ################################################################################
  2. # Licensed to the Apache Software Foundation (ASF) under one
  3. # or more contributor license agreements. See the NOTICE file
  4. # distributed with this work for additional information
  5. # regarding copyright ownership. The ASF licenses this file
  6. # to you under the Apache License, Version 2.0 (the
  7. # "License"); you may not use this file except in compliance
  8. # with the License. You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing, software
  13. # distributed under the License is distributed on an "AS IS" BASIS,
  14. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. # See the License for the specific language governing permissions and
  16. # limitations under the License.
  17. ################################################################################
  18. import logging
  19. import sys
  20. from pyflink.table import (EnvironmentSettings, TableEnvironment, DataTypes, TableDescriptor,
  21. Schema)
  22. from pyflink.table.expressions import col
  23. def process_json_data():
  24. t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
  25. # define the source
  26. table = t_env.from_elements(
  27. elements=[
  28. (1, '{"name": "Flink", "tel": 123, "addr": {"country": "Germany", "city": "Berlin"}}'),
  29. (2, '{"name": "hello", "tel": 135, "addr": {"country": "China", "city": "Shanghai"}}'),
  30. (3, '{"name": "world", "tel": 124, "addr": {"country": "USA", "city": "NewYork"}}'),
  31. (4, '{"name": "PyFlink", "tel": 32, "addr": {"country": "China", "city": "Hangzhou"}}')
  32. ],
  33. schema=['id', 'data'])
  34. # define the sink
  35. t_env.create_temporary_table(
  36. 'sink',
  37. TableDescriptor.for_connector('print')
  38. .schema(Schema.new_builder()
  39. .column('id', DataTypes.BIGINT())
  40. .column('data', DataTypes.STRING())
  41. .build())
  42. .build())
  43. table = table.select(col('id'), col('data').json_value('$.addr.country', DataTypes.STRING()))
  44. # execute
  45. table.execute_insert('sink') \
  46. .wait()
  47. # remove .wait if submitting to a remote cluster, refer to
  48. # https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/faq/#wait-for-jobs-to-finish-when-executing-jobs-in-mini-cluster
  49. # for more details
  50. if __name__ == '__main__':
  51. logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
  52. process_json_data()