

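Windows are one of Flink's many strengths.
A window splits a data stream into finite-size "buckets" and then processes each bucket.
As real-time data warehousing and Flink SQL have matured, Flink SQL has become the most commonly used API in Flink.
With that in mind, let's look at the source code behind Flink SQL windows.
(This is partly a writing exercise, so expect a few digressions.)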
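To make the "bucket" idea concrete, here is a minimal sketch of how a timestamp can be mapped to the tumbling window that contains it. It is not Flink code. It assumes epoch-millisecond timestamps and windows aligned to epoch 0 with no offset, and the class and method names (`TumblingBuckets`, `windowStart`, `bucketize`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Minimal sketch, not Flink code: assigns epoch-millisecond timestamps to
// tumbling-window "buckets", assuming windows aligned to epoch 0 (no offset).
public class TumblingBuckets {

    // Start of the tumbling window of length sizeMs that contains ts.
    static long windowStart(long ts, long sizeMs) {
        // floorMod keeps the remainder non-negative, so negative timestamps work too
        return ts - Math.floorMod(ts, sizeMs);
    }

    // Groups timestamps by window start; each map entry is one bucket [start, start + sizeMs).
    static Map<Long, List<Long>> bucketize(long[] timestamps, long sizeMs) {
        Map<Long, List<Long>> buckets = new TreeMap<>();
        for (long ts : timestamps) {
            buckets.computeIfAbsent(windowStart(ts, sizeMs), k -> new ArrayList<>()).add(ts);
        }
        return buckets;
    }

    public static void main(String[] args) {
        long oneMinute = 60_000L;
        long[] events = {5_000L, 59_999L, 60_000L, 125_000L};
        bucketize(events, oneMinute).forEach((start, ts) ->
                System.out.println("[" + start + ", " + (start + oneMinute) + ") -> " + ts));
    }
}
```

Running `main` prints `[0, 60000) -> [5000, 59999]`, `[60000, 120000) -> [60000]` and `[120000, 180000) -> [125000]`. Window end is exclusive, so an event at exactly 60000 starts the second bucket.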
Window example
First, here is how a Group Window is written in Flink SQL:
select date_format(now(), 'yyyy-MM-dd HH:mm:ss')
,date_format(TUMBLE_START(proc_time, INTERVAL '1' minute), 'yyyy-MM-dd HH:mm:ss') AS wStart
,date_format(TUMBLE_END(proc_time, INTERVAL '1' minute), 'yyyy-MM-dd HH:mm:ss') AS wEnd
,count(user_id) pv
,count(distinct user_id) uv
,max(user_id)
from user_log
group by TUMBLE(proc_time, INTERVAL '1' minute)
;
For the complete SQL, see kafka_window_agg.sql in the sqlSubmit project on GitHub.
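To pin down what the query above produces, here is a batch-style sketch that computes the same per-window columns (wStart, wEnd, pv, uv, max(user_id)) over an in-memory list. This is only an illustration of the SQL semantics, not of how Flink executes it: the real operator emits each row when the one-minute processing-time window closes. `Event` and `WindowResult` are hypothetical types, and `user_id` is assumed to be a string (if it is numeric in the real table, MAX compares numbers instead).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Batch-style sketch of what the example query computes per 1-minute tumbling window.
// Event and WindowResult are hypothetical types; user_id is assumed to be a string.
public class TumbleAggSketch {

    static final class Event {
        final long procTime; // processing time, epoch millis
        final String userId; // may be null
        Event(long procTime, String userId) {
            this.procTime = procTime;
            this.userId = userId;
        }
    }

    // One output row: wStart, wEnd, pv, uv, max(user_id)
    static final class WindowResult {
        final long wStart, wEnd, pv, uv;
        final String maxUserId;
        WindowResult(long wStart, long wEnd, long pv, long uv, String maxUserId) {
            this.wStart = wStart;
            this.wEnd = wEnd;
            this.pv = pv;
            this.uv = uv;
            this.maxUserId = maxUserId;
        }
        @Override
        public String toString() {
            return "[" + wStart + ", " + wEnd + ") pv=" + pv + " uv=" + uv + " max=" + maxUserId;
        }
    }

    static List<WindowResult> aggregate(List<Event> events, long sizeMs) {
        // window start -> user ids that fell into that window
        Map<Long, List<String>> windows = new TreeMap<>();
        for (Event e : events) {
            long start = e.procTime - Math.floorMod(e.procTime, sizeMs);
            windows.computeIfAbsent(start, k -> new ArrayList<>()).add(e.userId);
        }
        List<WindowResult> out = new ArrayList<>();
        for (Map.Entry<Long, List<String>> w : windows.entrySet()) {
            long pv = 0;
            Set<String> distinct = new HashSet<>();
            String max = null;
            for (String id : w.getValue()) {
                if (id == null) {
                    continue; // COUNT(col), COUNT(DISTINCT col) and MAX skip NULLs
                }
                pv++;
                distinct.add(id);
                if (max == null || id.compareTo(max) > 0) {
                    max = id;
                }
            }
            out.add(new WindowResult(w.getKey(), w.getKey() + sizeMs, pv, distinct.size(), max));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event(1_000L, "u1"), new Event(2_000L, "u2"), new Event(3_000L, "u1"),
                new Event(61_000L, "u3"), new Event(62_000L, null));
        aggregate(events, 60_000L).forEach(System.out::println);
    }
}
```

With these sample events, `main` prints `[0, 60000) pv=3 uv=2 max=u2` and `[60000, 120000) pv=1 uv=1 max=u3`. The NULL user in the second window is ignored by all three aggregates.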
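Group Windows source code
First, take a look at the execution graph:

Let's start with the relatively simple Group Windows. A Group Window works much like a window in the DataStream API: after the SQL is parsed, the planner calls StreamExecGroupWindowAggregate.translateToPlanInternal to create the window operator.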
StreamExecGroupWindowAggregate.translateToPlanInternal
@SuppressWarnings("unchecked")
@Override
protected Transformation<RowData> translateToPlanInternal(
        PlannerBase planner, ExecNodeConfig config) {
    final boolean isCountWindow;
    // ...
    // aggregate function info, see the figure below
    final AggregateInfoList aggInfoList =
            transformToStreamAggregateInfoList(
                    inputRowType,
                    JavaScalaConversionUtil.toScala(Arrays.asList(aggCalls)),
                    aggCallNeedRetractions,
                    needRetraction,
                    true, // isStateBackendDataViews
                    true); // needDistinctInfo
    // generated code for the aggregate functions
    final GeneratedClass<?> aggCodeGenerator =
            createAggsHandler(
                    aggInfoList,
                    config,
                    planner.getRelBuilder(),
                    inputRowType.getChildren(),
                    shiftTimeZone);
    // ...
    // create the window operator
    final WindowOperator<?, ?> operator =
            createWindowOperator(
                    config,
                    aggCodeGenerator,
                    equaliser,
                    accTypes,
                    windowPropertyTypes,
                    aggValueTypes,
                    inputRowType.getChildren().toArray(new LogicalType[0]),
                    inputTimeFieldIndex,
                    shiftTimeZone,
                    inputCountIndex);
    // ...
}
- Note: the method is long, so only an excerpt is shown.
aggInfoList : 
aggCodeGenerator: 
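aggCodeGenerator holds Java source that the planner generates for the query's aggregates. To make it less abstract, below is a hand-written, simplified stand-in for that generated handler, covering count(user_id), count(distinct user_id) and max(user_id). The method names mirror those we believe Flink's NamespaceAggsHandleFunction exposes (createAccumulators, setAccumulators, accumulate, getAccumulators, getValue), but the `WindowAggsHandler` interface itself is hypothetical. It uses `Object[]` where Flink uses RowData and an in-memory set for the distinct values; with isStateBackendDataViews set to true, our understanding is that Flink keeps those values in a state-backed MapView instead. The `runWindow` loop only imitates how a window operator restores, updates and stores accumulators per window (the namespace).

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-in for the generated NamespaceAggsHandleFunction.
// Object[] plays the role of RowData; N is the namespace (here: the window start).
interface WindowAggsHandler<N> {
    Object[] createAccumulators();
    void setAccumulators(N namespace, Object[] acc);
    void accumulate(String userId);
    Object[] getAccumulators();
    Object[] getValue(N namespace);
}

// Hand-written handler for count(user_id), count(distinct user_id), max(user_id).
class PvUvMaxHandler implements WindowAggsHandler<Long> {
    private Object[] acc;

    @Override
    public Object[] createAccumulators() {
        // {pv count, distinct user ids, current max}
        return new Object[] {0L, new HashSet<String>(), null};
    }

    @Override
    public void setAccumulators(Long window, Object[] acc) {
        this.acc = acc;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void accumulate(String userId) {
        if (userId == null) {
            return; // all three aggregates ignore NULL
        }
        acc[0] = (Long) acc[0] + 1;
        ((Set<String>) acc[1]).add(userId);
        String max = (String) acc[2];
        if (max == null || userId.compareTo(max) > 0) {
            acc[2] = userId;
        }
    }

    @Override
    public Object[] getAccumulators() {
        return acc;
    }

    @Override
    @SuppressWarnings("unchecked")
    public Object[] getValue(Long window) {
        return new Object[] {acc[0], (long) ((Set<String>) acc[1]).size(), acc[2]};
    }
}

public class AggsHandlerSketch {
    // Mimics the per-record loop of a window operator for a single window.
    static Object[] runWindow(String... userIds) {
        WindowAggsHandler<Long> handler = new PvUvMaxHandler();
        Map<Long, Object[]> windowState = new HashMap<>(); // stand-in for keyed window state
        Long window = 0L;
        for (String userId : userIds) {
            Object[] acc = windowState.get(window);
            if (acc == null) {
                acc = handler.createAccumulators();
            }
            handler.setAccumulators(window, acc);
            handler.accumulate(userId);
            windowState.put(window, handler.getAccumulators());
        }
        // when the window fires, restore its accumulators and read the result
        handler.setAccumulators(window, windowState.getOrDefault(window, handler.createAccumulators()));
        return handler.getValue(window);
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(runWindow("u1", "u2", "u1"))); // [3, 2, u2]
    }
}
```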
StreamExecGroupWindowAggregate.createWindowOperator
private WindowOperator<?, ?> createWindowOperator(
        ReadableConfig config,
        GeneratedClass<?> aggsHandler,
        GeneratedRecordEqualiser recordEqualiser,
        LogicalType[] accTypes,
        LogicalType[] windowPropertyTypes,
        LogicalType[] aggValueTypes,
        LogicalType[] inputFields,
        int timeFieldIndex,
        ZoneId shiftTimeZone,
        int inputCountIndex) {
    WindowOperatorBuilder builder =
            WindowOperatorBuilder.builder()
                    .withInputFields(inputFields)
                    .withShiftTimezone(shiftTimeZone)
                    .withInputCountIndex(inputCountIndex);
    // configure the window builder according to the window type
    if (window instanceof TumblingGroupWindow) {
        // tumbling window
        TumblingGroupWindow tumblingWindow = (TumblingGroupWindow) window;
        FieldReferenceExpression timeField = tumblingWindow.timeField();
        ValueLiteralExpression size = tumblingWindow.size();
        if (isProctimeAttribute(timeField) && hasTimeIntervalType(size)) {
            // processing-time window
            builder = builder.tumble(toDuration(size)).withProcessingTime();
        } else if (isRowtimeAttribute(timeField) && hasTimeIntervalType(size)) {
            // event-time window
            builder = builder.tumble(toDuration(size)).withEventTime(timeFieldIndex);
        } else if (isProctimeAttribute(timeField) && hasRowIntervalType(size)) {
            // count window
            builder = builder.countWindow(toLong(size));
        } else {
            // TODO: EventTimeTumblingGroupWindow should sort the stream on event time
            // before applying the windowing logic. Otherwise, this would be the same as a
            // ProcessingTimeTumblingGroupWindow
            throw new UnsupportedOperationException(
                    "Event-time grouping windows on row intervals are currently not supported.");
        }
    } else if (window instanceof SlidingGroupWindow) {
        // sliding window
        // ...
    } else if (window instanceof SessionGroupWindow) {
        // session window
        // ...
    } else {
        throw new TableException("Unsupported window: " + window.toString());
    }
    // window emit strategy: with no early/late firing configured, as in the example SQL,
    // the window fires only once, when it ends
    WindowEmitStrategy emitStrategy = WindowEmitStrategy.apply(config, window);
    if (emitStrategy.produceUpdates()) {
        // mark this operator will send retraction and set new trigger
        builder.produceUpdates()
                .triggering(emitStrategy.getTrigger())
                .withAllowedLateness(Duration.ofMillis(emitStrategy.getAllowLateness()));
    }
    if (aggsHandler instanceof GeneratedNamespaceAggsHandleFunction) {
        // agg
        return builder.aggregate(
                        (GeneratedNamespaceAggsHandleFunction<?>) aggsHandler,
                        recordEqualiser,
                        accTypes,
                        aggValueTypes,
                        windowPropertyTypes)
                // builds an AggregateWindowOperator
                .build();
    } else if (aggsHandler instanceof GeneratedNamespaceTableAggsHandleFunction) {
        // table agg
        return builder.aggregate(
                        (GeneratedNamespaceTableAggsHandleFunction<?>) aggsHandler,
                        accTypes,
                        aggValueTypes,
                        windowPropertyTypes)
                .build();
    } else {
        throw new TableException(
                "Unsupported agg handler class: " + aggsHandler.getClass().getSimpleName());
    }
}
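The tumbling-window branch above is essentially a four-way decision on the time attribute and the type of the size literal. The sketch below restates it as a small, stand-alone decision table. It is a simplified model, not Flink code: `TimeAttr`, `SizeType` and `chooseTumble` are hypothetical names, and the returned strings only describe which builder call the real method makes. The example SQL (proc_time with INTERVAL '1' minute) corresponds to PROCTIME + TIME_INTERVAL. As far as we know, row-count sizes come from the Table API (for example `Tumble.over(rowInterval(...))`) rather than the SQL TUMBLE function.

```java
// Simplified model (not Flink code) of the tumbling-window branch in
// createWindowOperator: which builder call is chosen for each combination.
public class TumbleDispatchSketch {
    enum TimeAttr { PROCTIME, ROWTIME }
    enum SizeType { TIME_INTERVAL, ROW_INTERVAL }

    static String chooseTumble(TimeAttr time, SizeType size) {
        if (time == TimeAttr.PROCTIME && size == SizeType.TIME_INTERVAL) {
            return "tumble(size).withProcessingTime()";
        } else if (time == TimeAttr.ROWTIME && size == SizeType.TIME_INTERVAL) {
            return "tumble(size).withEventTime(timeFieldIndex)";
        } else if (time == TimeAttr.PROCTIME && size == SizeType.ROW_INTERVAL) {
            return "countWindow(size)";
        }
        // ROWTIME + ROW_INTERVAL is rejected, as in the original method
        throw new UnsupportedOperationException(
                "Event-time grouping windows on row intervals are currently not supported.");
    }

    public static void main(String[] args) {
        for (TimeAttr t : TimeAttr.values()) {
            for (SizeType s : SizeType.values()) {
                String result;
                try {
                    result = chooseTumble(t, s);
                } catch (UnsupportedOperationException e) {
                    result = "error: " + e.getMessage();
                }
                System.out.println(t + " + " + s + " -> " + result);
            }
        }
    }
}
```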
Window emit strategy:
----------- Addendum -----------
If the early-fire options are configured:
set table.exec.emit.early-fire.enabled = true;
set table.exec.emit.early-fire.delay = 5000;
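With these settings, `emitStrategy.produceUpdates()` in createWindowOperator should return true. The builder then switches to `produceUpdates().triggering(emitStrategy.getTrigger())`, so the window emits intermediate results before the final one and sends retractions. According to the option's description, a delay greater than 0 is the firing interval, so 5000 means roughly one early result every 5 seconds; the downstream sink therefore has to accept updates. The sketch below is a simplified model of the produceUpdates decision, based on our reading of WindowEmitStrategy. The parameter names are hypothetical, so please verify the logic against the Flink version you use.

```java
// Simplified model of WindowEmitStrategy.produceUpdates(), based on our reading of
// the Flink source; parameter names are hypothetical, verify against your Flink version.
public class EmitStrategySketch {

    static boolean produceUpdates(boolean isEventTime, long allowLatenessMs,
                                  boolean earlyFireEnabled, boolean lateFireEnabled) {
        if (isEventTime) {
            // event time: allowed lateness, early firing or late firing all lead to updates
            return allowLatenessMs > 0 || earlyFireEnabled || lateFireEnabled;
        }
        // processing time: only early firing can emit a result before the final one
        return earlyFireEnabled;
    }

    public static void main(String[] args) {
        // the example SQL: processing-time TUMBLE, early fire off -> one final result per window
        System.out.println(produceUpdates(false, 0L, false, false)); // false
        // after enabling table.exec.emit.early-fire with a 5000 ms delay
        System.out.println(produceUpdates(false, 0L, true, false)); // true
    }
}
```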
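[Copyright notice] This article is original content by a 墨天轮 (modb.pro) user. When republishing, you must credit the source (墨天轮), the article link, the author, and other basic information; otherwise the author and 墨天轮 reserve the right to pursue liability. If you find content on 墨天轮 that you suspect of plagiarism or infringement, please report it by email to contact@modb.pro with supporting evidence; once verified, 墨天轮 will remove the content immediately.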




