process_json_data.py 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. ################################################################################
  2. # Licensed to the Apache Software Foundation (ASF) under one
  3. # or more contributor license agreements. See the NOTICE file
  4. # distributed with this work for additional information
  5. # regarding copyright ownership. The ASF licenses this file
  6. # to you under the Apache License, Version 2.0 (the
  7. # "License"); you may not use this file except in compliance
  8. # with the License. You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing, software
  13. # distributed under the License is distributed on an "AS IS" BASIS,
  14. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. # See the License for the specific language governing permissions and
  16. # limitations under the License.
  17. ################################################################################
  18. import json
  19. import logging
  20. import sys
  21. from pyflink.datastream import StreamExecutionEnvironment
  22. def process_json_data():
  23. env = StreamExecutionEnvironment.get_execution_environment()
  24. # define the source
  25. ds = env.from_collection(
  26. collection=[
  27. (1, '{"name": "Flink", "tel": 123, "addr": {"country": "Germany", "city": "Berlin"}}'),
  28. (2, '{"name": "hello", "tel": 135, "addr": {"country": "China", "city": "Shanghai"}}'),
  29. (3, '{"name": "world", "tel": 124, "addr": {"country": "USA", "city": "NewYork"}}'),
  30. (4, '{"name": "PyFlink", "tel": 32, "addr": {"country": "China", "city": "Hangzhou"}}')]
  31. )
  32. def update_tel(data):
  33. # parse the json
  34. json_data = json.loads(data[1])
  35. json_data['tel'] += 1
  36. return data[0], json_data
  37. def filter_by_country(data):
  38. # the json data could be accessed directly, there is no need to parse it again using
  39. # json.loads
  40. return "China" in data[1]['addr']['country']
  41. ds.map(update_tel).filter(filter_by_country).print()
  42. # submit for execution
  43. env.execute()
  44. if __name__ == '__main__':
  45. logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
  46. process_json_data()