Flink version: 1.14.3
Note: this article examines how windows are triggered in the Flink SQL API and the DataStream API.
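A few days ago a reader asked me about this. They weren't clear on how Flink fires windows, or how many more times a window can fire after it has ended.
Let's split the question into two phases:
- Before the window ends
- After the window ends
I'll set up a few window scenarios to illustrate.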
Flink SQL
Shared DDL (used by all examples)
CREATE TABLE user_log
(
user_id VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
ts TIMESTAMP(3),
WATERMARK FOR ts AS ts - INTERVAL '1' MINUTES
) WITH (
'connector' = 'kafka'
,'topic' = 'user_log'
,'properties.bootstrap.servers' = 'localhost:9092'
,'properties.group.id' = 'user_log'
,'scan.startup.mode' = 'earliest-offset'
,'format' = 'json'
);
CREATE TABLE user_log_sink
(
start_time TIMESTAMP(3),
end_time TIMESTAMP(3),
cnt BIGINT
) WITH (
'connector' = 'print'
);
All examples use event time, so you control the timing entirely through the input data.
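To make the watermark definition concrete, here is a small Python simulation (not Flink code) of `WATERMARK FOR ts AS ts - INTERVAL '1' MINUTES`. The watermark is the largest `ts` seen so far minus one minute, and it never moves backwards. The sketch assumes a single Kafka partition and a watermark that updates after every record. Real Flink emits watermarks periodically, so records sent in a quick burst may share one watermark. `to_millis`, `to_str` and `BoundedWatermark` are hypothetical names used only here.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)
FMT = "%Y-%m-%d %H:%M:%S.%f"


def to_millis(ts):
    """Hypothetical helper: '2022-04-22 00:10:00.001' -> epoch milliseconds (read as UTC)."""
    return (datetime.strptime(ts, FMT) - EPOCH) // timedelta(milliseconds=1)


def to_str(ms):
    """Hypothetical helper: epoch milliseconds -> the same text layout, to the millisecond."""
    return (EPOCH + timedelta(milliseconds=ms)).strftime(FMT)[:-3]


class BoundedWatermark:
    """Hypothetical model of `WATERMARK FOR ts AS ts - INTERVAL '1' MINUTES`:
    the largest ts seen so far minus the delay; it never moves backwards."""

    def __init__(self, delay_ms):
        self.delay_ms = delay_ms
        self.current = None  # no watermark before the first record

    def on_event(self, ts_ms):
        candidate = ts_ms - self.delay_ms
        if self.current is None or candidate > self.current:
            self.current = candidate
        return self.current


wm = BoundedWatermark(delay_ms=60_000)
for ts in ["2022-04-22 00:10:00.001", "2022-04-22 00:09:30.001", "2022-04-22 00:11:00.001"]:
    print(ts, "-> watermark", to_str(wm.on_event(to_millis(ts))))
```

The out-of-order record at 00:09:30.001 does not lower the watermark. It stays at 00:09:00.001 until the 00:11:00.001 record moves it to 00:10:00.001.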
Tumble window
First, an example.
The SQL:
insert into user_log_sink
select window_start, window_end, count(user_id)
from TABLE(
TUMBLE(TABLE user_log, DESCRIPTOR(ts), INTERVAL '10' MINUTES));
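- Before the window ends: each window fires exactly once, when it ends. The window counts as ended when the watermark reaches minute 10. The corresponding event time is minute 11, because the DDL defines the watermark as event time minus 1 minute.
- After the window ends: it does not fire again.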
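The firing rule above can be sketched in Python. The sketch rests on two assumptions, based on my understanding of Flink's event-time windows rather than a quote from its API:
- A window [start, end) counts as finished once the watermark reaches end - 1 ms, its last millisecond.
- Tumbling windows are aligned to the epoch with no offset.

`tumble_window` and `has_fired` are hypothetical helpers for illustration only.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)
FMT = "%Y-%m-%d %H:%M:%S.%f"
MINUTE = 60_000  # milliseconds


def to_millis(ts):
    """Hypothetical helper: timestamp text -> epoch milliseconds (read as UTC)."""
    return (datetime.strptime(ts, FMT) - EPOCH) // timedelta(milliseconds=1)


def to_str(ms):
    """Hypothetical helper: epoch milliseconds -> timestamp text, to the millisecond."""
    return (EPOCH + timedelta(milliseconds=ms)).strftime(FMT)[:-3]


def tumble_window(ts_ms, size_ms):
    """Hypothetical helper: the [start, end) tumbling window holding ts_ms (epoch-aligned, assumed)."""
    start = ts_ms - ts_ms % size_ms
    return start, start + size_ms


def has_fired(window_end_ms, watermark_ms):
    """Assumed rule: the window is finished once the watermark reaches end - 1 ms."""
    return watermark_ms >= window_end_ms - 1


start, end = tumble_window(to_millis("2022-04-22 00:09:30.001"), 10 * MINUTE)
print(to_str(start), to_str(end))  # 2022-04-22 00:00:00.000 2022-04-22 00:10:00.000
# The watermark is ts - 1 minute, so these two records move it to 00:09:00.001 and 00:10:00.001.
print(has_fired(end, to_millis("2022-04-22 00:10:00.001") - MINUTE))  # False
print(has_fired(end, to_millis("2022-04-22 00:11:00.001") - MINUTE))  # True
```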
Test data
{"category_id":40,"user_id":"1","item_id":"40972","behavior":"pv","ts":"2022-04-22 00:00:00.001"}
{"category_id":40,"user_id":"1","item_id":"40972","behavior":"pv","ts":"2022-04-22 00:10:00.001"}
{"category_id":40,"user_id":"1","item_id":"40972","behavior":"pv","ts":"2022-04-22 00:10:00.001"}
{"category_id":40,"user_id":"1","item_id":"40972","behavior":"pv","ts":"2022-04-22 00:09:30.001"}
{"category_id":40,"user_id":"1","item_id":"40972","behavior":"pv","ts":"2022-04-22 00:11:00.001"}
{"category_id":40,"user_id":"1","item_id":"40972","behavior":"pv","ts":"2022-04-22 00:10:30.001"}

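Here are some questions to think about. Jot down short answers; working through them helps a lot in understanding how windows fire:
Question 1: When does the window fire?
Question 2: What is the count for the 0 to 10 minute window?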
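To check your answers, you can replay the test data through a simplified Python model of an event-time TUMBLE count. This is a sketch under assumptions, not Flink's implementation:
- Records arrive one at a time from a single partition.
- The watermark advances right after each record.
- A window fires once, when the watermark reaches end - 1 ms.
- A record whose window has already fired is dropped, matching the rule above that a window does not fire again after it ends.

`replay_tumble` and the other names are hypothetical.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)
FMT = "%Y-%m-%d %H:%M:%S.%f"
MINUTE = 60_000  # milliseconds


def to_millis(ts):
    """Hypothetical helper: timestamp text -> epoch milliseconds (read as UTC)."""
    return (datetime.strptime(ts, FMT) - EPOCH) // timedelta(milliseconds=1)


def to_str(ms):
    """Hypothetical helper: epoch milliseconds -> timestamp text, to the millisecond."""
    return (EPOCH + timedelta(milliseconds=ms)).strftime(FMT)[:-3]


def replay_tumble(timestamps, size_ms, delay_ms):
    """Hypothetical, simplified event-time TUMBLE count.
    Returns (fired, dropped); fired holds (window_start, window_end, count, record_that_fired_it)."""
    watermark = None
    counts = {}  # window_start -> count, only for windows that have not fired
    fired, dropped = [], []
    for ts in timestamps:
        ts_ms = to_millis(ts)
        start = ts_ms - ts_ms % size_ms
        if watermark is not None and watermark >= start + size_ms - 1:
            dropped.append(ts)  # its window already fired: late, ignored
        else:
            counts[start] = counts.get(start, 0) + 1
        # watermark = largest ts seen - delay, never moving backwards
        candidate = ts_ms - delay_ms
        if watermark is None or candidate > watermark:
            watermark = candidate
        # fire (once) every open window whose end - 1 ms the watermark has reached
        for s in sorted(counts):
            if watermark >= s + size_ms - 1:
                fired.append((to_str(s), to_str(s + size_ms), counts.pop(s), ts))
    return fired, dropped


TUMBLE_TS = [
    "2022-04-22 00:00:00.001",
    "2022-04-22 00:10:00.001",
    "2022-04-22 00:10:00.001",
    "2022-04-22 00:09:30.001",
    "2022-04-22 00:11:00.001",
    "2022-04-22 00:10:30.001",
]
fired, dropped = replay_tumble(TUMBLE_TS, 10 * MINUTE, MINUTE)
for row in fired:
    print("fired:", row)
print("dropped:", dropped)
```

To see the "after the window ends" case, append a record such as 2022-04-22 00:05:00.001 to the end of the list. In this model it shows up in `dropped` and does not trigger the window again.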
HOP window
First, an example.
The SQL:
insert into user_log_sink
select window_start, window_end, count(user_id)
from TABLE(
HOP(TABLE user_log, DESCRIPTOR(ts), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES));
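- Before the window ends: each window fires once, when it ends, i.e. when the watermark reaches the window's end time. The only difference is that HOP is a sliding window, so every record belongs to several windows (size / slide of them, here 10 / 2 = 5).
- After the window ends: it does not fire again.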
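The windows a record lands in can be sketched in Python. HOP window starts are multiples of the slide. A record with timestamp ts belongs to every window with start <= ts < start + size. When size is a multiple of slide, that gives size / slide windows. `hop_windows` is a hypothetical helper, and epoch-aligned window starts are an assumption of this sketch.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)
FMT = "%Y-%m-%d %H:%M:%S.%f"
MINUTE = 60_000  # milliseconds


def to_millis(ts):
    """Hypothetical helper: timestamp text -> epoch milliseconds (read as UTC)."""
    return (datetime.strptime(ts, FMT) - EPOCH) // timedelta(milliseconds=1)


def to_str(ms):
    """Hypothetical helper: epoch milliseconds -> timestamp text, to the millisecond."""
    return (EPOCH + timedelta(milliseconds=ms)).strftime(FMT)[:-3]


def hop_windows(ts_ms, slide_ms, size_ms):
    """Hypothetical helper: every [start, end) HOP window containing ts_ms, earliest first.
    Starts are assumed to be multiples of the slide (epoch-aligned)."""
    start = ts_ms - ts_ms % slide_ms  # latest window start at or before ts
    windows = []
    while start > ts_ms - size_ms:    # same as ts < start + size
        windows.insert(0, (to_str(start), to_str(start + size_ms)))
        start -= slide_ms
    return windows


for w in hop_windows(to_millis("2022-04-22 00:09:30.001"), 2 * MINUTE, 10 * MINUTE):
    print(w)
```

For 00:09:30.001 this yields five windows, from 00:00 to 00:10 up to 00:08 to 00:18.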
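Note: for a HOP window, the window size must be an integral multiple of the slide, otherwise an error is thrown. For example, a 10-minute window with a 3-minute slide fails with: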
Exception in thread "main" org.apache.flink.table.api.TableException: HOP table function based aggregate requires size must be an integral multiple of slide, but got size 600000 ms and slide 180000 ms
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowAggregateBase.createSliceAssigner(StreamExecWindowAggregateBase.java:115)
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowAggregateBase.createSliceAssigner(StreamExecWindowAggregateBase.java:92)
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLocalWindowAggregate.translateToPlanInternal(StreamExecLocalWindowAggregate.java:125)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.java:75)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGlobalWindowAggregate.translateToPlanInternal(StreamExecGlobalWindowAggregate.java:139)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:96)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:114)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:71)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:70)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:185)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:752)
at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:124)
at com.rookie.submit.main.SqlSubmit$.main(SqlSubmit.scala:91)
at com.rookie.submit.main.SqlSubmit.main(SqlSubmit.scala)
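My understanding is that Flink 1.14's window TVF aggregation is slice-based; note the `createSliceAssigner` frames in the trace above. Each HOP window is assembled from consecutive slices, each one slide long, and overlapping windows share slices. A 10-minute window cannot be made of whole 3-minute slices, which is presumably why the planner rejects that combination. The hypothetical `hop_slices` below only illustrates this idea and the same divisibility check; it is not Flink code.

```python
MINUTE = 60_000  # milliseconds


def hop_slices(window_start_ms, size_ms, slide_ms):
    """Hypothetical illustration (not Flink code): split one HOP window into
    consecutive slices that are each one slide long."""
    if size_ms % slide_ms != 0:
        # the same condition Flink's planner reports in the exception above
        raise ValueError(
            f"size must be an integral multiple of slide, "
            f"but got size {size_ms} ms and slide {slide_ms} ms"
        )
    return [
        (window_start_ms + i * slide_ms, window_start_ms + (i + 1) * slide_ms)
        for i in range(size_ms // slide_ms)
    ]


print(hop_slices(0, 10 * MINUTE, 2 * MINUTE))  # five 2-minute slices
try:
    hop_slices(0, 10 * MINUTE, 3 * MINUTE)  # 600000 ms / 180000 ms, as in the trace
except ValueError as err:
    print(err)
```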
Test data
{"category_id":40,"user_id":"1","item_id":"40972","behavior":"pv","ts":"2022-04-22 00:00:00.001"}
{"category_id":40,"user_id":"1","item_id":"40972","behavior":"pv","ts":"2022-04-22 00:01:00.001"}
{"category_id":40,"user_id":"1","item_id":"40972","behavior":"pv","ts":"2022-04-22 00:02:00.001"}
{"category_id":40,"user_id":"1","item_id":"40972","behavior":"pv","ts":"2022-04-22 00:03:00.001"}
{"category_id":40,"user_id":"1","item_id":"40972","behavior":"pv","ts":"2022-04-22 00:04:00.001"}
{"category_id":40,"user_id":"1","item_id":"40972","behavior":"pv","ts":"2022-04-22 00:05:00.001"}
{"category_id":40,"user_id":"1","item_id":"40972","behavior":"pv","ts":"2022-04-22 00:06:00.001"}
{"category_id":40,"user_id":"1","item_id":"40972","behavior":"pv","ts":"2022-04-22 00:07:00.001"}
{"category_id":40,"user_id":"1","item_id":"40972","behavior":"pv","ts":"2022-04-22 00:08:00.001"}
{"category_id":40,"user_id":"1","item_id":"40972","behavior":"pv","ts":"2022-04-22 00:09:00.001"}
{"category_id":40,"user_id":"1","item_id":"40972","behavior":"pv","ts":"2022-04-22 00:10:00.001"}
{"category_id":40,"user_id":"1","item_id":"40972","behavior":"pv","ts":"2022-04-22 00:09:30.001"}
{"category_id":40,"user_id":"1","item_id":"40972","behavior":"pv","ts":"2022-04-22 00:11:00.001"}
{"category_id":40,"user_id":"1","item_id":"40972","behavior":"pv","ts":"2022-04-22 00:09:31.001"}
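The HOP test data can be replayed with a simplified Python model. It uses hypothetical names and the same assumptions as before:
- One partition.
- The watermark advances after each record.
- A window fires once, when the watermark reaches end - 1 ms.

One detail differs from the tumble case. A record misses only the windows that have already fired and still counts toward the later, still-open windows that contain it. This follows my reading of Flink's slice-based window operator and should be checked against a real run.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)
FMT = "%Y-%m-%d %H:%M:%S.%f"
MINUTE = 60_000  # milliseconds


def to_millis(ts):
    """Hypothetical helper: timestamp text -> epoch milliseconds (read as UTC)."""
    return (datetime.strptime(ts, FMT) - EPOCH) // timedelta(milliseconds=1)


def to_str(ms):
    """Hypothetical helper: epoch milliseconds -> timestamp text, to the millisecond."""
    return (EPOCH + timedelta(milliseconds=ms)).strftime(FMT)[:-3]


def replay_hop(timestamps, slide_ms, size_ms, delay_ms):
    """Hypothetical, simplified event-time HOP count.
    fired holds (window_start, window_end, count, record_that_fired_it);
    skipped holds (record, window_start) for windows already fired when the record arrived."""
    watermark = None
    counts = {}  # window_start -> count, only for windows that have not fired
    fired, skipped = [], []
    for ts in timestamps:
        ts_ms = to_millis(ts)
        start = ts_ms - ts_ms % slide_ms  # latest window containing ts
        while start > ts_ms - size_ms:    # walk back through every window containing ts
            if watermark is not None and watermark >= start + size_ms - 1:
                skipped.append((ts, to_str(start)))  # this window already fired
            else:
                counts[start] = counts.get(start, 0) + 1
            start -= slide_ms
        candidate = ts_ms - delay_ms  # watermark = largest ts - delay, never backwards
        if watermark is None or candidate > watermark:
            watermark = candidate
        for s in sorted(counts):
            if watermark >= s + size_ms - 1:
                fired.append((to_str(s), to_str(s + size_ms), counts.pop(s), ts))
    return fired, skipped


HOP_TS = ["2022-04-22 00:%s.001" % t for t in (
    "00:00", "01:00", "02:00", "03:00", "04:00", "05:00", "06:00",
    "07:00", "08:00", "09:00", "10:00", "09:30", "11:00", "09:31")]
fired, skipped = replay_hop(HOP_TS, 2 * MINUTE, 10 * MINUTE, MINUTE)
for row in fired:
    print("fired:", row)
print("skipped:", skipped)
```

In this model, windows starting on the previous day (23:52, 23:54, ...) also fire, because each early record belongs to them as well. The last record, 00:09:31.001, misses the 00:00 to 00:10 window but is still counted in the four later windows. Those later windows never fire here, because the replay ends before the watermark reaches their end.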
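[Copyright notice] This article is original content by a 墨天轮 (modb.pro) user. When reposting, you must credit the source (墨天轮), the article link, the author and other basic information; otherwise the author and 墨天轮 reserve the right to pursue liability. If you find content on 墨天轮 that you suspect is plagiarized or infringing, please email contact@modb.pro with supporting evidence; once verified, 墨天轮 will remove it immediately.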




