event_time_timer.py 3.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
  18. from pyflink.common import Time, WatermarkStrategy, Duration
  19. from pyflink.common.typeinfo import Types
  20. from pyflink.common.watermark_strategy import TimestampAssigner
  21. from pyflink.datastream import StreamExecutionEnvironment
  22. from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
  23. from pyflink.datastream.state import ValueStateDescriptor, StateTtlConfig
  24. class Sum(KeyedProcessFunction):
  25. def __init__(self):
  26. self.state = None
  27. def open(self, runtime_context: RuntimeContext):
  28. state_descriptor = ValueStateDescriptor("state", Types.FLOAT())
  29. state_ttl_config = StateTtlConfig \
  30. .new_builder(Time.seconds(1)) \
  31. .set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite) \
  32. .disable_cleanup_in_background() \
  33. .build()
  34. state_descriptor.enable_time_to_live(state_ttl_config)
  35. self.state = runtime_context.get_state(state_descriptor)
  36. def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
  37. # retrieve the current count
  38. current = self.state.value()
  39. if current is None:
  40. current = 0
  41. # update the state's count
  42. current += value[2]
  43. self.state.update(current)
  44. # register an event time timer 2 seconds later
  45. ctx.timer_service().register_event_time_timer(ctx.timestamp() + 2000)
  46. def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
  47. yield ctx.get_current_key(), self.state.value()
  48. class MyTimestampAssigner(TimestampAssigner):
  49. def extract_timestamp(self, value, record_timestamp: int) -> int:
  50. return int(value[0])
  51. def event_timer_timer_demo():
  52. env = StreamExecutionEnvironment.get_execution_environment()
  53. ds = env.from_collection(
  54. collection=[
  55. (1000, 'Alice', 110.1),
  56. (4000, 'Bob', 30.2),
  57. (3000, 'Alice', 20.0),
  58. (2000, 'Bob', 53.1),
  59. (5000, 'Alice', 13.1),
  60. (3000, 'Bob', 3.1),
  61. (7000, 'Bob', 16.1),
  62. (10000, 'Alice', 20.1)
  63. ],
  64. type_info=Types.TUPLE([Types.LONG(), Types.STRING(), Types.FLOAT()]))
  65. ds = ds.assign_timestamps_and_watermarks(
  66. WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(2))
  67. .with_timestamp_assigner(MyTimestampAssigner()))
  68. # apply the process function onto a keyed stream
  69. ds.key_by(lambda value: value[1]) \
  70. .process(Sum()) \
  71. .print()
  72. # submit for execution
  73. env.execute()
  74. if __name__ == '__main__':
  75. event_timer_timer_demo()