1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495 |
- ################################################################################
- # Licensed to the Apache Software Foundation (ASF) under one
- # or more contributor license agreements. See the NOTICE file
- # distributed with this work for additional information
- # regarding copyright ownership. The ASF licenses this file
- # to you under the Apache License, Version 2.0 (the
- # "License"); you may not use this file except in compliance
- # with the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- ################################################################################
- from pyflink.common import Time, WatermarkStrategy, Duration
- from pyflink.common.typeinfo import Types
- from pyflink.common.watermark_strategy import TimestampAssigner
- from pyflink.datastream import StreamExecutionEnvironment
- from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
- from pyflink.datastream.state import ValueStateDescriptor, StateTtlConfig
class Sum(KeyedProcessFunction):
    """Keyed process function that accumulates ``value[2]`` per key.

    Every incoming element adds its third field to a per-key value state and
    registers an event-time timer; when a timer fires, the current key and the
    state's current value are emitted.
    """

    def __init__(self):
        # The state handle is only obtained in open(), where the runtime
        # context is available.
        self.state = None

    def open(self, runtime_context: RuntimeContext):
        """Create the TTL-enabled value state used to hold the running total.

        :param runtime_context: context used to obtain the state handle.
        """
        # Entries live for one second, the TTL is refreshed on both reads and
        # writes, and background cleanup is switched off.
        ttl_config = (
            StateTtlConfig.new_builder(Time.seconds(1))
            .set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite)
            .disable_cleanup_in_background()
            .build()
        )
        descriptor = ValueStateDescriptor("state", Types.FLOAT())
        descriptor.enable_time_to_live(ttl_config)
        self.state = runtime_context.get_state(descriptor)

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        """Add ``value[2]`` to the stored total and schedule a timer.

        :param value: incoming element; its third field is the amount added.
        :param ctx: context giving access to the element timestamp and the
            timer service.
        """
        # A missing state value is treated as a total of zero.
        stored = self.state.value()
        running_total = (0 if stored is None else stored) + value[2]
        self.state.update(running_total)

        # Fire 2000 timestamp units after this element's timestamp
        # (2 seconds, assuming millisecond timestamps).
        fire_at = ctx.timestamp() + 2000
        ctx.timer_service().register_event_time_timer(fire_at)

    def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
        """Emit ``(current key, current state value)`` when a timer fires.

        :param timestamp: timestamp of the firing timer (unused here).
        :param ctx: context providing the key the timer belongs to.
        """
        yield ctx.get_current_key(), self.state.value()
class MyTimestampAssigner(TimestampAssigner):
    """Timestamp assigner that takes the timestamp from a record's first field."""

    def extract_timestamp(self, value, record_timestamp: int) -> int:
        """Return ``value[0]`` converted to ``int``.

        :param value: the record; its first field carries the timestamp.
        :param record_timestamp: previously assigned timestamp (ignored).
        """
        event_time = value[0]
        return int(event_time)
def event_timer_timer_demo():
    """Build and execute the event-time timer example pipeline.

    The pipeline reads a fixed collection of ``(timestamp, name, amount)``
    tuples, assigns timestamps and watermarks, keys the stream by name, runs
    ``Sum`` on the keyed stream and prints whatever it emits.
    """
    env = StreamExecutionEnvironment.get_execution_environment()

    # Sample records; note they are not sorted by their first (timestamp) field.
    records = [
        (1000, 'Alice', 110.1),
        (4000, 'Bob', 30.2),
        (3000, 'Alice', 20.0),
        (2000, 'Bob', 53.1),
        (5000, 'Alice', 13.1),
        (3000, 'Bob', 3.1),
        (7000, 'Bob', 16.1),
        (10000, 'Alice', 20.1),
    ]
    row_type = Types.TUPLE([Types.LONG(), Types.STRING(), Types.FLOAT()])
    ds = env.from_collection(collection=records, type_info=row_type)

    # Bounded out-of-orderness of 2 seconds, with timestamps taken from the
    # records by MyTimestampAssigner.
    watermark_strategy = (
        WatermarkStrategy
        .for_bounded_out_of_orderness(Duration.of_seconds(2))
        .with_timestamp_assigner(MyTimestampAssigner())
    )
    ds = ds.assign_timestamps_and_watermarks(watermark_strategy)

    # Key by the name field, then apply the process function and print results.
    keyed_stream = ds.key_by(lambda record: record[1])
    keyed_stream.process(Sum()).print()

    # Submit the job for execution.
    env.execute()
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    event_timer_timer_demo()
|