state_access.py 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. ################################################################################
  2. # Licensed to the Apache Software Foundation (ASF) under one
  3. # or more contributor license agreements. See the NOTICE file
  4. # distributed with this work for additional information
  5. # regarding copyright ownership. The ASF licenses this file
  6. # to you under the Apache License, Version 2.0 (the
  7. # "License"); you may not use this file except in compliance
  8. # with the License. You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing, software
  13. # distributed under the License is distributed on an "AS IS" BASIS,
  14. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. # See the License for the specific language governing permissions and
  16. # limitations under the License.
  17. ################################################################################
  18. from pyflink.common import Time
  19. from pyflink.common.typeinfo import Types
  20. from pyflink.datastream import StreamExecutionEnvironment
  21. from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
  22. from pyflink.datastream.state import ValueStateDescriptor, StateTtlConfig
  23. class Sum(KeyedProcessFunction):
  24. def __init__(self):
  25. self.state = None
  26. def open(self, runtime_context: RuntimeContext):
  27. state_descriptor = ValueStateDescriptor("state", Types.FLOAT())
  28. state_ttl_config = StateTtlConfig \
  29. .new_builder(Time.seconds(1)) \
  30. .set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite) \
  31. .disable_cleanup_in_background() \
  32. .build()
  33. state_descriptor.enable_time_to_live(state_ttl_config)
  34. self.state = runtime_context.get_state(state_descriptor)
  35. def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
  36. # retrieve the current count
  37. current = self.state.value()
  38. if current is None:
  39. current = 0
  40. # update the state's count
  41. current += value[1]
  42. self.state.update(current)
  43. yield value[0], current
  44. def state_access_demo():
  45. env = StreamExecutionEnvironment.get_execution_environment()
  46. ds = env.from_collection(
  47. collection=[
  48. ('Alice', 110.1),
  49. ('Bob', 30.2),
  50. ('Alice', 20.0),
  51. ('Bob', 53.1),
  52. ('Alice', 13.1),
  53. ('Bob', 3.1),
  54. ('Bob', 16.1),
  55. ('Alice', 20.1)
  56. ],
  57. type_info=Types.TUPLE([Types.STRING(), Types.FLOAT()]))
  58. # apply the process function onto a keyed stream
  59. ds.key_by(lambda value: value[0]) \
  60. .process(Sum()) \
  61. .print()
  62. # submit for execution
  63. env.execute()
  64. if __name__ == '__main__':
  65. state_access_demo()