Ready to Use | Flink Triggers in Practice!

大数据技能圈 2023-09-06

Trigger

Flink Triggers

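Suppose we have the following requirement: use Flink to compute the daily sum of some metric, emitting an intermediate result at a fixed interval (every hour, say) or each time a certain number of records has arrived, like this: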

    2022-12-12 02:01:00  sum1
    2022-12-12 02:02:00  sum2
    2022-12-12 02:03:00  sum3
    2022-12-12 02:04:00  sum4
    ......

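A window function on its own cannot produce output like this. The window is fixed, and by default it is evaluated once, over all the data in the window, so the result cannot grow step by step through the day.

To get this effect we can use a Flink trigger. The window stays fixed, but we can make it fire a computation every so often, or whenever a given number of records has been received. The full implementation follows in three parts.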
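
Before the Flink code, the idea can be illustrated with a small plain-Java simulation that does not use Flink at all. It is only a sketch under simplifying assumptions: all records belong to one window, they arrive sorted by timestamp, and a due interval timer fires just before the next record is added. The class name `EarlyFiringSketch` and the method `runningSums` are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java simulation (no Flink) of one window that fires early by interval or by count. */
final class EarlyFiringSketch {

    /**
     * Replays the records of a single window and returns the running sum emitted at each firing.
     * timestamps[i] (milliseconds, sorted ascending) belongs to values[i].
     */
    static List<Integer> runningSums(long[] timestamps, int[] values, long intervalMs, long maxCount) {
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("intervalMs must be positive");
        }
        List<Integer> emitted = new ArrayList<>();
        int sum = 0;                     // window contents survive every early firing
        long countSinceFire = 0;         // reset only by a count-based firing
        long nextFire = Long.MAX_VALUE;  // sentinel: no interval timer armed yet

        for (int i = 0; i < timestamps.length; i++) {
            // Fire every interval timer that is due before this record is added.
            while (timestamps[i] >= nextFire) {
                emitted.add(sum);
                nextFire += intervalMs;
            }
            sum += values[i];
            countSinceFire++;
            if (nextFire == Long.MAX_VALUE) {
                nextFire = timestamps[i] + intervalMs; // the first record arms the timer
            }
            if (countSinceFire >= maxCount) {
                emitted.add(sum);                      // count threshold reached
                countSinceFire = 0;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] timestamps = {0, 10, 70, 80, 90, 130};
        int[] values = {1, 2, 3, 4, 5, 6};
        // Interval of 60 ms, count threshold of 3.
        System.out.println(runningSums(timestamps, values, 60, 3)); // prints [3, 6, 15, 21]
    }
}
```

Each emitted value is the total since the start of the window, not the total since the previous firing, which is the behaviour the requirement asks for.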

1. Trigger code

    import java.sql.Timestamp;

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.common.state.ReducingState;
    import org.apache.flink.api.common.state.ReducingStateDescriptor;
    import org.apache.flink.api.common.typeutils.base.LongSerializer;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.Window;

    // Nested in the job class. Fires early every `interval` of event time or every
    // `maxCount` elements, and fires and purges when the window ends.
    public static class MyTrigger<W extends Window> extends Trigger<Object, W> {

        private final long interval;
        private final long maxCount;

        // Timestamp of the next interval timer (Min keeps the earliest one on merge).
        private final ReducingStateDescriptor<Long> intervalDesc =
                new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);
        // Number of elements seen since the last count-based firing.
        private final ReducingStateDescriptor<Long> maxCountDesc =
                new ReducingStateDescriptor<>("sum", new Sum(), LongSerializer.INSTANCE);

        private MyTrigger(long interval, long maxCount) {
            this.interval = interval;
            this.maxCount = maxCount;
        }

        public static <W extends Window> MyTrigger<W> of(Time interval, long maxCount) {
            // Note: the stored interval is 1 ms shorter than the one requested.
            return new MyTrigger<>(interval.toMilliseconds() - 1, maxCount);
        }

        @Override
        public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
            if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
                // The watermark has already passed the window end: emit and purge at once.
                return TriggerResult.FIRE_AND_PURGE;
            }
            // End-of-window timer. It must be an event-time timer, because the end of
            // the window is handled in onEventTime and onProcessingTime does nothing.
            ctx.registerEventTimeTimer(window.maxTimestamp());

            ReducingState<Long> intervalState = ctx.getPartitionedState(intervalDesc);
            ReducingState<Long> maxCountState = ctx.getPartitionedState(maxCountDesc);

            maxCountState.add(1L);
            if (intervalState.get() == null) {
                // First element of the window: arm the first interval timer.
                registerNextTimer(timestamp, window, ctx, intervalState);
            }
            if (maxCountState.get() >= maxCount) {
                maxCountState.clear();
                System.err.println("maxCount threshold reached, firing");
                return TriggerResult.FIRE;
            }
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
            // This trigger works purely on event time.
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
            if (time == window.maxTimestamp()) {
                // End of the window: emit the final result and drop the contents.
                return TriggerResult.FIRE_AND_PURGE;
            }
            Long count = ctx.getPartitionedState(maxCountDesc).get();
            ReducingState<Long> intervalState = ctx.getPartitionedState(intervalDesc);
            Long nextFire = intervalState.get();
            if (nextFire != null && nextFire == time) {
                // The interval timer fired: schedule the next one.
                intervalState.clear();
                registerNextTimer(time, window, ctx, intervalState);
                if (count != null) {
                    System.err.println("Timer fired: " + new Timestamp(time));
                    return TriggerResult.FIRE;
                }
            }
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(W window, TriggerContext ctx) throws Exception {
            ctx.getPartitionedState(maxCountDesc).clear();

            ReducingState<Long> intervalState = ctx.getPartitionedState(intervalDesc);
            Long timer = intervalState.get();
            if (timer != null) {
                // The interval timer is an event-time timer, so delete it as one.
                ctx.deleteEventTimeTimer(timer);
                intervalState.clear();
            }
            ctx.deleteEventTimeTimer(window.maxTimestamp());
        }

        @Override
        public boolean canMerge() {
            return true;
        }

        @Override
        public void onMerge(W window, OnMergeContext ctx) throws Exception {
            // Merge the count state.
            ctx.mergePartitionedState(maxCountDesc);
            // Merge the interval-timer state and re-register the earliest timer.
            ctx.mergePartitionedState(intervalDesc);
            Long nextFire = ctx.getPartitionedState(intervalDesc).get();
            if (nextFire != null) {
                ctx.registerEventTimeTimer(nextFire);
            }
        }

        private static class Min implements ReduceFunction<Long> {
            @Override
            public Long reduce(Long a, Long b) throws Exception {
                return Math.min(a, b);
            }
        }

        private static class Sum implements ReduceFunction<Long> {
            @Override
            public Long reduce(Long a, Long b) throws Exception {
                return a + b;
            }
        }

        private void registerNextTimer(
                long time, W window, TriggerContext ctx,
                ReducingState<Long> intervalState) throws Exception {
            // Take the earlier of the next interval time and the end of the window.
            long nextTimer = Math.min(time + interval, window.maxTimestamp());
            intervalState.add(nextTimer);
            System.err.println("Timer registered: " + new Timestamp(nextTimer));
            ctx.registerEventTimeTimer(nextTimer);
        }
    }
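
The trigger returns `TriggerResult.FIRE` for the early firings and `TriggerResult.FIRE_AND_PURGE` at the end of the window. In Flink, `FIRE` evaluates the window and keeps its contents, while `FIRE_AND_PURGE` evaluates it and then clears them. The plain-Java sketch below mimics that difference without using Flink. `FireVersusPurgeSketch` and its `Result` enum are hypothetical stand-ins for a window buffer and for `TriggerResult`, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java stand-in for a window buffer, showing FIRE versus FIRE_AND_PURGE. */
final class FireVersusPurgeSketch {

    /** Hypothetical stand-in for Flink's TriggerResult (PURGE is left out for brevity). */
    enum Result { CONTINUE, FIRE, FIRE_AND_PURGE }

    private final List<Integer> buffer = new ArrayList<>();

    /** Adds one value to the window contents. */
    void add(int value) {
        buffer.add(value);
    }

    /** Applies a trigger decision and returns the emitted sum, or null if nothing is emitted. */
    Integer apply(Result result) {
        if (result == Result.CONTINUE) {
            return null;                 // no evaluation, contents untouched
        }
        int sum = 0;
        for (int value : buffer) {
            sum += value;                // evaluate over everything buffered so far
        }
        if (result == Result.FIRE_AND_PURGE) {
            buffer.clear();              // the contents are dropped after emitting
        }
        return sum;
    }

    public static void main(String[] args) {
        FireVersusPurgeSketch window = new FireVersusPurgeSketch();
        window.add(1);
        window.add(2);
        System.out.println(window.apply(Result.FIRE));           // prints 3
        window.add(4);
        System.out.println(window.apply(Result.FIRE_AND_PURGE)); // prints 7
        System.out.println(window.apply(Result.FIRE));           // prints 0
    }
}
```

Because the early firings keep the buffer, each intermediate result includes everything seen since the window opened. This is why `FIRE`, rather than `FIRE_AND_PURGE`, suits a running daily total.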

2. Business logic code

    import java.sql.Timestamp;

    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    // Emits (device name, running sum, emit time); the key is the device name (f0).
    public static class MyProcessWindowFunction
            extends ProcessWindowFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, String, TimeWindow> {

        @Override
        public void process(
                String key,
                Context context,
                Iterable<Tuple3<String, Integer, String>> elements,
                Collector<Tuple3<String, Integer, String>> out) throws Exception {
            // Called on every firing with all elements still buffered in the window,
            // so the sum is the running total since the window started.
            int sum = 0;
            String deviceName = "";
            for (Tuple3<String, Integer, String> element : elements) {
                sum += element.f1;
                deviceName = element.f0;
            }
            String emitTime = new Timestamp(context.currentProcessingTime()).toString();
            out.collect(Tuple3.of(deviceName, sum, emitTime));
        }
    }

3. Wiring up the trigger and the business logic

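    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    // Each record is stamped with the wall-clock time at which it is read.
    WatermarkStrategy<Tuple3<String, Integer, String>> tuple3WatermarkStrategy = WatermarkStrategy
            .<Tuple3<String, Integer, String>>forMonotonousTimestamps()
            .withTimestampAssigner((element, recordTimestamp) -> System.currentTimeMillis());

    // tuple3DataStream1 is the upstream stream, defined elsewhere in the job.
    DataStream<Tuple3<String, Integer, String>> tuple3DataStream2 = tuple3DataStream1
            .assignTimestampsAndWatermarks(tuple3WatermarkStrategy)
            .keyBy(event -> event.f0)
            // One window per day; the -8 h offset aligns it to midnight in UTC+8.
            // An event-time window is used because MyTrigger relies on event-time timers.
            .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
            // Fire every 30 seconds or every 100,000 elements.
            .trigger(MyTrigger.of(Time.seconds(30), 100000L))
            .process(new MyProcessWindowFunction());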
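
The `Time.hours(-8)` offset is what makes each one-day window start at local midnight in UTC+8 instead of at midnight UTC. The arithmetic can be checked with the plain-Java sketch below. It is an illustration of the alignment rule, not Flink's own source code, and `WindowOffsetSketch` and `windowStart` are hypothetical names.

```java
import java.time.Duration;
import java.time.Instant;

/** Plain-Java sketch of how a tumbling window with an offset is aligned. */
final class WindowOffsetSketch {

    /** Start (epoch ms) of the window of the given size and offset that contains the timestamp. */
    static long windowStart(long timestampMs, long sizeMs, long offsetMs) {
        // floorMod keeps the remainder non-negative even when the offset is negative.
        return timestampMs - Math.floorMod(timestampMs - offsetMs, sizeMs);
    }

    public static void main(String[] args) {
        long day = Duration.ofDays(1).toMillis();
        long offset = Duration.ofHours(-8).toMillis();
        // 2022-12-12 02:01:00 in UTC+8 is 2022-12-11T18:01:00Z.
        long ts = Instant.parse("2022-12-11T18:01:00Z").toEpochMilli();
        long start = windowStart(ts, day, offset);
        // Midnight of 2022-12-12 in UTC+8.
        System.out.println(Instant.ofEpochMilli(start)); // prints 2022-12-11T16:00:00Z
    }
}
```

With an offset of zero the same record would fall into the window starting at 2022-12-11T00:00:00Z, so the daily total would roll over at 08:00 local time.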

This article is reposted from 大数据技能圈.