Trigger
Flink 触发器

有这样一个需求:要求使用Flink统计每天的某个指标的和,需要每个小时出一次结果或者每到一定数量的数据就计算一次,像这样:
2022-12-12 02:01:00 sum1
2022-12-12 02:02:00 sum2
2022-12-12 02:03:00 sum3
2022-12-12 02:04:00 sum4
......
如果单纯地使用窗口函数(即默认触发器),没办法按这样的效果出数据:窗口是固定的,默认只在窗口结束时对整个窗口内的数据求和并输出一次,而没法按小时递增地输出累计结果。
想要实现这样的效果,可以使用Flink的触发器(Trigger):窗口虽然是固定的,但我们可以让它每隔一段时间或者每接收到一定数量的数据就触发一次计算。代码如下:
public static class MyTrigger<W extends Window> extends Trigger<Object , W>{private final long interval;private final long maxCount;private ReducingStateDescriptor<Long> intervalDesc = new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);private ReducingStateDescriptor<Long> maxCountDesc = new ReducingStateDescriptor<>("sum", new Sum(), LongSerializer.INSTANCE);private MyTrigger(long interval, long maxCount){this.interval = interval;this.maxCount = maxCount;}public static <W extends Window> MyTrigger<W> of(Time interval, long maxCount){return new MyTrigger<>(interval.toMilliseconds() -1, maxCount);}@Overridepublic TriggerResult onElement(Object o, long l, W w, TriggerContext triggerContext) throws Exception {if (w.maxTimestamp() <= triggerContext.getCurrentWatermark()){return TriggerResult.FIRE_AND_PURGE;} else {triggerContext.registerProcessingTimeTimer(w.maxTimestamp());}ReducingState<Long> intervalState = triggerContext.getPartitionedState(intervalDesc);ReducingState<Long> maxCountState = triggerContext.getPartitionedState(maxCountDesc);maxCountState.add(1L);if (intervalState.get() == null){registerNextTimer(l, w, triggerContext,intervalState);}if (maxCountState.get() >= maxCount){maxCountState.clear();System.err.println("maxCount到达阈值,触发计算");return TriggerResult.FIRE;}return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onProcessingTime(long l, W w, TriggerContext triggerContext) throws Exception {return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onEventTime(long l, W w, TriggerContext triggerContext) throws Exception {if (l == w.maxTimestamp()){return TriggerResult.FIRE_AND_PURGE;}ReducingState<Long> maxCountState = triggerContext.getPartitionedState(maxCountDesc);Long count = maxCountState .get();ReducingState<Long> intervalState = triggerContext.getPartitionedState(intervalDesc);Long interval = intervalState.get();if (interval != null && interval == l){intervalState.clear();registerNextTimer(l, w, triggerContext, 
intervalState);if (count != null){System.err.println("触发定时器:"+ new Timestamp(l) + "");return TriggerResult.FIRE;}}return TriggerResult.CONTINUE;}@Overridepublic void clear(W w, TriggerContext triggerContext) throws Exception {triggerContext.getPartitionedState(maxCountDesc).clear();ReducingState<Long> intervalState = triggerContext.getPartitionedState(intervalDesc);Long timer = intervalState.get();if (timer != null){triggerContext.deleteProcessingTimeTimer(timer);intervalState.clear();}}public boolean canMerge(){return true;}public void onMerge(W window, OnMergeContext ctx) throws Exception {// 合并数量间隔的状态ctx.mergePartitionedState(maxCountDesc);// 合并时间间隔的状态ctx.mergePartitionedState(intervalDesc);Long interval = ctx.getPartitionedState(intervalDesc).get();if (interval != null) {ctx.registerEventTimeTimer(interval);}}private static class Min implements ReduceFunction<Long> {@Overridepublic Long reduce(Long aLong, Long t1) throws Exception {return Math.min(aLong, t1);}}private static class Sum implements ReduceFunction<Long>{@Overridepublic Long reduce(Long aLong, Long t1) throws Exception {return aLong + t1;}}private void registerNextTimer(long time, W window, TriggerContext ctx,ReducingState<Long> intervalState) throws Exception {// 在下一个timer时间和窗口结束时间取最小值long nextTimer = Math.min(time + interval, window.maxTimestamp());intervalState.add(nextTimer);System.err.println("注册定时器:"+ new Timestamp(nextTimer));ctx.registerEventTimeTimer(nextTimer);}}
public static class MyProcessWindowFunction extends ProcessWindowFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, String , TimeWindow>{@Overridepublic void process(String s, ProcessWindowFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, String, TimeWindow>.Context context, Iterable<Tuple3<String, Integer, String>> iterable, Collector<Tuple3<String, Integer, String>> collector) throws Exception {Tuple3<String, Integer, String> tuple3 = new Tuple3<>();Integer sum = 0;String device_name = "";for (Tuple3<String, Integer, String> tuple : iterable){sum += tuple.f1;device_name = tuple.f0;}tuple3.f0 = device_name;tuple3.f1 = sum;tuple3.f2 = new Timestamp(context.currentProcessingTime()).toString();collector.collect(tuple3);}}
WatermarkStrategy<Tuple3<String, Integer, String>> tuple3WatermarkStrategy = WatermarkStrategy.<Tuple3<String, Integer, String>>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> stringIntegerStringTuple3, long l) {return System.currentTimeMillis();}});DataStream<Tuple3<String, Integer, String>> tuple3DataStream2 = tuple3DataStream1.assignTimestampsAndWatermarks(tuple3WatermarkStrategy).keyBy(event -> event.f0)//.process(new MyKeyedFunction());.window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8))).trigger(MyTrigger.of(Time.seconds(30) ,100000L)).process(new MyProcessWindowFunction());
动动手指

关注我们
文章转载自大数据技能圈,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




