暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flink 窗口能触发多少次

原创 James 2022-06-16
551

flink 版本: 1.14.3

注: 本文探讨 flink sql api 和 stream api 下窗口的触发机制

前几天有个同学问我这个问题,不清楚 Flink 窗口触发的机制,不知道窗口结束后还能触发几次。

先把这个问题分解成两个阶段:

  1. 窗口结束之前
  2. 窗口结束之后

预设几个窗口的场景,来说明这个问题。

Flink sql

通用的建表语句

CREATE TABLE user_log
(
    user_id     VARCHAR,
    item_id     VARCHAR,
    category_id VARCHAR,
    behavior    VARCHAR,
    ts          TIMESTAMP(3),
    WATERMARK FOR ts AS ts - INTERVAL '1' MINUTES
) WITH (
      'connector' = 'kafka'
      ,'topic' = 'user_log'
      ,'properties.bootstrap.servers' = 'localhost:9092'
      ,'properties.group.id' = 'user_log'
      ,'scan.startup.mode' = 'earliest-offset'
      ,'format' = 'json'
      );


CREATE TABLE user_log_sink
(
    start_time   timestamp(3),
    end_time     timestamp(3),
    coun         int
) WITH (
      'connector' = 'print'
);

都以事件时间为例,可以自己控制输入数据

Tumble 窗口

先看样例
sql 如下


insert into user_log_sink
select window_start, window_end, count(user_id)
from TABLE(
   TUMBLE(TABLE user_log, DESCRIPTOR(ts), INTERVAL '10' MINUTES));

  1. 窗口结束之前: 每个窗口结束时触发一次,窗口结束时间为 水印 达到 10 分时(实际事件时间为 11 分,因为水印时间为 事件时间 - 1 分钟,建表时设置)
  2. 窗口结束之后: 不会触发

测试数据

{"category_id":40,"user_id":"1","item_id":"40972","behavior":"pv","ts":"2022-04-22 00:00:00.001"}
{"category_id":40,"user_id":"1","item_id":"40972","behavior":"pv","ts":"2022-04-22 00:10:00.001"}
{"category_id":40,"user_id":"1","item_id":"40972","behavior":"pv","ts":"2022-04-22 00:10:00.001"}
{"category_id":40,"user_id":"1","item_id":"40972","behavior":"pv","ts":"2022-04-22 00:09:30.001"}
{"category_id":40,"user_id":"1","item_id":"40972","behavior":"pv","ts":"2022-04-22 00:11:00.001"}
{"category_id":40,"user_id":"1","item_id":"40972","behavior":"pv","ts":"2022-04-22 00:10:30.001"}

留些问题大家思考一下,问题比较多,可以简单写一下,对理解窗口触发很有帮助:
问题1:窗口什么时候触发 ?
问题2:0 - 10 分这个窗口的 count 值是多少 ?

HOP 窗口

先看样例
sql 如下


insert into user_log_sink
select window_start, window_end, count(user_id)
from TABLE(
        HOP(TABLE user_log, DESCRIPTOR(ts), INTERVAL '2' MINUTES ,INTERVAL '10' MINUTES ));

  1. 窗口结束之前: 每个窗口结束时触发一次,窗口结束时间为水印到达窗口结束时间时,只是滑动窗口,每条数据都会属于多个窗口(窗口)
  2. 窗口结束之后: 不会触发

注: HOP 窗口,窗口步长和窗口长度必须成整数倍,不然会报错

Exception in thread "main" org.apache.flink.table.api.TableException: HOP table function based aggregate requires size must be an integral multiple of slide, but got size 600000 ms and slide 180000 ms
	at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowAggregateBase.createSliceAssigner(StreamExecWindowAggregateBase.java:115)
	at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowAggregateBase.createSliceAssigner(StreamExecWindowAggregateBase.java:92)
	at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLocalWindowAggregate.translateToPlanInternal(StreamExecLocalWindowAggregate.java:125)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
	at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.java:75)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
	at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGlobalWindowAggregate.translateToPlanInternal(StreamExecGlobalWindowAggregate.java:139)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
	at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:96)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
	at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:114)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
	at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:71)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
	at scala.collection.Iterator.foreach(Iterator.scala:937)
	at scala.collection.Iterator.foreach$(Iterator.scala:937)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
	at scala.collection.IterableLike.foreach(IterableLike.scala:70)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike.map(TraversableLike.scala:233)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:70)
	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:185)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:752)
	at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:124)
	at com.rookie.submit.main.SqlSubmit$.main(SqlSubmit.scala:91)
	at com.rookie.submit.main.SqlSubmit.main(SqlSubmit.scala)


测试数据


{"category_id":40,"user_id":"1","item_id":"40972","behavior":"pv","ts":"2022-04-22 00:00:00.001"}
{"category_id":40,"user_id":"1","item_id":"40972","behavior":"pv","ts":"2022-04-22 00:01:00.001"}
{"category_id":40,"user_id":"1","item_id":"40972","behavior":"pv","ts":"2022-04-22 00:02:00.001"}
{"category_id":40,"user_id":"1","item_id":"40972","behavior":"pv","ts":"2022-04-22 00:03:00.001"}
{"category_id":40,"user_id":"1","item_id":"40972","behavior":"pv","ts":"2022-04-22 00:04:00.001"}
{"category_id":40,"user_id":"1","item_id":"40972","behavior":"pv","ts":"2022-04-22 00:05:00.001"}
{"category_id":40,"user_id":"1","item_id":"40972","behavior":"pv","ts":"2022-04-22 00:06:00.001"}
{"category_id":40,"user_id":"1","item_id":"40972","behavior":"pv","ts":"2022-04-22 00:07:00.001"}
{"category_id":40,"user_id":"1","item_id":"40972","behavior":"pv","ts":"2022-04-22 00:08:00.001"}
{"category_id":40,"user_id":"1","item_id":"40972","behavior":"pv","ts":"2022-04-22 00:09:00.001"}
{"category_id":40,"user_id":"1","item_id":"40972","behavior":"pv","ts":"2022-04-22 00:10:00.001"}
{"category_id":40,"user_id":"1","item_id":"40972","behavior":"pv","ts":"2022-04-22 00:09:30.001"}
{"category_id":40,"user_id":"1","item_id":"40972","behavior":"pv","ts":"2022-04-22 00:11:00.001"}
{"category_id":40,"user_id":"1","item_id":"40972","behavior":"pv","ts":"2022-04-22 00:09:31.001"}


SH 复制 全屏

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论