
Flink SQL Group Windows: Source Code Analysis

Original article by yukits, 2022-06-16

Windows are one of Flink's many strengths.

A window splits a data stream into finite-size "buckets", and each bucket is then processed on its own.
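To make the "bucket" idea concrete, here is a minimal sketch of tumbling-window assignment in plain Java (not Flink code). Each timestamp, in epoch milliseconds, is mapped to the window [start, start + size) that contains it. The class name TumblingBuckets is hypothetical, and the start formula assumes a zero window offset; it follows the same idea as Flink's TimeWindow.getWindowStartWithOffset.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration: group timestamps into tumbling-window "buckets".
// Assumes window offset = 0; not taken from the Flink code base.
final class TumblingBuckets {

    /** Start (inclusive) of the tumbling window of length {@code size} containing {@code timestamp}. */
    static long windowStart(long timestamp, long size) {
        // floorMod keeps negative timestamps in the correct bucket as well
        return timestamp - Math.floorMod(timestamp, size);
    }

    /** Groups timestamps by window start; each bucket covers [start, start + size). */
    static Map<Long, List<Long>> assign(List<Long> timestamps, long size) {
        Map<Long, List<Long>> buckets = new TreeMap<>(); // sorted by window start
        for (long ts : timestamps) {
            buckets.computeIfAbsent(windowStart(ts, size), k -> new ArrayList<>()).add(ts);
        }
        return buckets;
    }
}
```

With a size of 60 000 ms (one minute, as in the sample SQL), the timestamps 1 000 and 59 999 fall into the bucket starting at 0, while 60 000 opens the next bucket.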

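As real-time data warehouses and Flink SQL have matured, Flink SQL has become the most commonly used Flink API.

With that in mind, let's look at the source code behind Flink SQL windows.

(This is partly a writing exercise, so expect a few digressions.)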

Window example

First, here is how a Flink SQL Group Window query is written:

select date_format(now(), 'yyyy-MM-dd HH:mm:ss')
     ,date_format(TUMBLE_START(proc_time, INTERVAL '1' minute), 'yyyy-MM-dd HH:mm:ss') AS wStart
     ,date_format(TUMBLE_END(proc_time, INTERVAL '1' minute), 'yyyy-MM-dd HH:mm:ss') AS wEnd
     ,count(user_id) pv
     ,count(distinct user_id) uv
     ,max(user_id)
from user_log
group by TUMBLE(proc_time, INTERVAL '1' minute)
;

For the complete SQL, see kafka_window_agg.sql in the sqlSubmit project on GitHub.

Group Windows source code

Let's first look at the execution flow graph:

Group Windows

Let's start with the relatively simple Group Windows. A Group Window works much like a window in the DataStream API: after the SQL is parsed, the planner calls StreamExecGroupWindowAggregate.translateToPlanInternal, which creates the window operator.

StreamExecGroupWindowAggregate.translateToPlanInternal


    @SuppressWarnings("unchecked")
    @Override
    protected Transformation<RowData> translateToPlanInternal(
            PlannerBase planner, ExecNodeConfig config) {
        final boolean isCountWindow;

        ......

        // aggregate function info (see the figure below)
        final AggregateInfoList aggInfoList =
                transformToStreamAggregateInfoList(
                        inputRowType,
                        JavaScalaConversionUtil.toScala(Arrays.asList(aggCalls)),
                        aggCallNeedRetractions,
                        needRetraction,
                        true, // isStateBackendDataViews
                        true); // needDistinctInfo
        // code-generated aggregate handler
        final GeneratedClass<?> aggCodeGenerator =
                createAggsHandler(
                        aggInfoList,
                        config,
                        planner.getRelBuilder(),
                        inputRowType.getChildren(),
                        shiftTimeZone);

        .......

        // create the window operator
        final WindowOperator<?, ?> operator =
                createWindowOperator(
                        config,
                        aggCodeGenerator,
                        equaliser,
                        accTypes,
                        windowPropertyTypes,
                        aggValueTypes,
                        inputRowType.getChildren().toArray(new LogicalType[0]),
                        inputTimeFieldIndex,
                        shiftTimeZone,
                        inputCountIndex);
        .........
    }

  • Note: the code is long, so only excerpts are shown.

aggInfoList:

aggCodeGenerator:
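The generated aggsHandler is Java source compiled at runtime, so it is hard to read directly. Conceptually, it keeps accumulators per window, updates them for every input row, and produces the final values when the window fires. The sketch below hand-writes that behavior for the sample SQL's count(user_id), count(distinct user_id) and max(user_id). It is a simplified illustration, not the generated code: WindowAggSketch is a hypothetical name, user_id is assumed to be a nullable Long, and a plain HashSet stands in for the state-backed distinct view that the planner asks for via isStateBackendDataViews / needDistinctInfo. The method names only loosely mirror those of Flink's NamespaceAggsHandleFunction.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical hand-written stand-in for the generated aggregate handler of the sample SQL:
// count(user_id) AS pv, count(distinct user_id) AS uv, max(user_id).
// Assumption: user_id is a nullable Long; all state lives in memory here.
final class WindowAggSketch {

    /** Accumulators for one window (the "namespace" in Flink's terms). */
    static final class Acc {
        long pv;                                    // count(user_id): non-null values only
        final Set<Long> distinct = new HashSet<>(); // backs count(distinct user_id)
        Long max;                                   // max(user_id); null until a value arrives
    }

    private final Map<Long, Acc> accByWindow = new HashMap<>();

    /** Accumulates one row into the window identified by its start timestamp. */
    void accumulate(long windowStart, Long userId) {
        Acc acc = accByWindow.computeIfAbsent(windowStart, k -> new Acc());
        if (userId == null) {
            return; // SQL aggregates ignore NULL inputs
        }
        acc.pv++;
        acc.distinct.add(userId);
        acc.max = (acc.max == null) ? userId : Math.max(acc.max, userId);
    }

    /** Returns {pv, uv, max} for a window, i.e. what is emitted when the window fires. */
    Object[] getValue(long windowStart) {
        Acc acc = accByWindow.get(windowStart);
        if (acc == null) {
            return new Object[] {0L, 0L, null};
        }
        return new Object[] {acc.pv, (long) acc.distinct.size(), acc.max};
    }

    /** Drops a window's accumulators once it has fired. */
    void cleanup(long windowStart) {
        accByWindow.remove(windowStart);
    }
}
```

Feeding user_ids 7, 7, 3 and NULL into one window yields pv = 3, uv = 2 and max = 7, which is what the sample query would report for that minute.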

StreamExecGroupWindowAggregate.createWindowOperator

    private WindowOperator<?, ?> createWindowOperator(
            ReadableConfig config,
            GeneratedClass<?> aggsHandler,
            GeneratedRecordEqualiser recordEqualiser,
            LogicalType[] accTypes,
            LogicalType[] windowPropertyTypes,
            LogicalType[] aggValueTypes,
            LogicalType[] inputFields,
            int timeFieldIndex,
            ZoneId shiftTimeZone,
            int inputCountIndex) {
        WindowOperatorBuilder builder =
                WindowOperatorBuilder.builder()
                        .withInputFields(inputFields)
                        .withShiftTimezone(shiftTimeZone)
                        .withInputCountIndex(inputCountIndex);
        // configure the window builder according to the window type
        if (window instanceof TumblingGroupWindow) {
            // tumbling window
            TumblingGroupWindow tumblingWindow = (TumblingGroupWindow) window;
            FieldReferenceExpression timeField = tumblingWindow.timeField();
            ValueLiteralExpression size = tumblingWindow.size();
            if (isProctimeAttribute(timeField) && hasTimeIntervalType(size)) {
                // processing-time window
                builder = builder.tumble(toDuration(size)).withProcessingTime();
            } else if (isRowtimeAttribute(timeField) && hasTimeIntervalType(size)) {
                // event-time window
                builder = builder.tumble(toDuration(size)).withEventTime(timeFieldIndex);
            } else if (isProctimeAttribute(timeField) && hasRowIntervalType(size)) {
                // count window
                builder = builder.countWindow(toLong(size));
            } else {
                // TODO: EventTimeTumblingGroupWindow should sort the stream on event time
                // before applying the  windowing logic. Otherwise, this would be the same as a
                // ProcessingTimeTumblingGroupWindow
                throw new UnsupportedOperationException(
                        "Event-time grouping windows on row intervals are currently not supported.");
            }
        } else if (window instanceof SlidingGroupWindow) {
            // sliding window
            .....
        } else if (window instanceof SessionGroupWindow) {
            // session window
            .....
        } else {
            throw new TableException("Unsupported window: " + window.toString());
        }

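        // window emit strategy; e.g. for the sample SQL (a processing-time window),
        // the result is emitted only once, when the window ends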
        WindowEmitStrategy emitStrategy = WindowEmitStrategy.apply(config, window);
        if (emitStrategy.produceUpdates()) {
            // mark this operator will send retraction and set new trigger
            builder.produceUpdates()
                    .triggering(emitStrategy.getTrigger())
                    .withAllowedLateness(Duration.ofMillis(emitStrategy.getAllowLateness()));
        }

        if (aggsHandler instanceof GeneratedNamespaceAggsHandleFunction) {
            // regular aggregate functions
            return builder.aggregate(
                            (GeneratedNamespaceAggsHandleFunction<?>) aggsHandler,
                            recordEqualiser,
                            accTypes,
                            aggValueTypes,
                            windowPropertyTypes)
                    // builds an AggregateWindowOperator
                    .build();
        } else if (aggsHandler instanceof GeneratedNamespaceTableAggsHandleFunction) {
            // table aggregate functions
            return builder.aggregate(
                            (GeneratedNamespaceTableAggsHandleFunction<?>) aggsHandler,
                            accTypes,
                            aggValueTypes,
                            windowPropertyTypes)
                    .build();
        } else {
            throw new TableException(
                    "Unsupported agg handler class: " + aggsHandler.getClass().getSimpleName());
        }
    }
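The tumbling-window branch above boils down to a small decision table over two facts: whether the time field is a processing-time or event-time (rowtime) attribute, and whether the size is a time interval or a row count. The following sketch restates that table in plain Java. The enums TimeAttr and SizeType and the returned strings are hypothetical stand-ins for isProctimeAttribute/isRowtimeAttribute, hasTimeIntervalType/hasRowIntervalType and the builder calls; only the exception message is copied from the excerpt.

```java
// Hypothetical restatement of the TumblingGroupWindow branch of createWindowOperator.
// The strings describe which WindowOperatorBuilder call the real code makes.
final class TumbleDispatchSketch {

    enum TimeAttr { PROCTIME, ROWTIME }

    enum SizeType { TIME_INTERVAL, ROW_INTERVAL }

    static String chooseTumble(TimeAttr timeAttr, SizeType sizeType) {
        if (timeAttr == TimeAttr.PROCTIME && sizeType == SizeType.TIME_INTERVAL) {
            return "tumble + processing time"; // builder.tumble(size).withProcessingTime()
        } else if (timeAttr == TimeAttr.ROWTIME && sizeType == SizeType.TIME_INTERVAL) {
            return "tumble + event time";      // builder.tumble(size).withEventTime(timeFieldIndex)
        } else if (timeAttr == TimeAttr.PROCTIME && sizeType == SizeType.ROW_INTERVAL) {
            return "count window";             // builder.countWindow(size)
        }
        // The only remaining case is ROWTIME + ROW_INTERVAL, which is rejected
        throw new UnsupportedOperationException(
                "Event-time grouping windows on row intervals are currently not supported.");
    }
}
```

In the sample SQL, TUMBLE(proc_time, INTERVAL '1' minute) uses a processing-time attribute with a time-interval size, so it lands in the first case.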

Window emit strategy:

----------- Addendum -----------
If the early-fire options are configured:

set table.exec.emit.early-fire.enabled = true;
set table.exec.emit.early-fire.delay = 5000;
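To give a feel for what these two settings do to a window like the one in the sample SQL, here is a simplified model of the resulting emit schedule. It is an assumption for illustration, not Flink's trigger code: it treats the delay as milliseconds, starts the periodic early firings from the first element that reaches the window, and emits the final result at the window's max timestamp (end - 1, as in Flink's TimeWindow.maxTimestamp()). EarlyFireSketch and fireTimes are hypothetical names, and a zero delay is not modeled.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of early firing for a single window [start, end):
// partial results every delayMs after the first element, then one final result.
final class EarlyFireSketch {

    static List<Long> fireTimes(long windowEnd, long firstElementTime, long delayMs) {
        if (delayMs <= 0) {
            throw new IllegalArgumentException("this sketch only models a positive delay");
        }
        long maxTimestamp = windowEnd - 1; // last millisecond that belongs to the window
        List<Long> fires = new ArrayList<>();
        for (long t = firstElementTime + delayMs; t < maxTimestamp; t += delayMs) {
            fires.add(t); // early (partial, later updated) result
        }
        fires.add(maxTimestamp); // final result when the window closes
        return fires;
    }
}
```

For the window [0, 60000) with a first element at 1000 and delay = 5000, this model emits early results at 6000, 11000, ..., 56000 and the final result at 59999. Because earlier results get updated, this is also the situation in which emitStrategy.produceUpdates() makes the operator send retractions.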
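[Copyright notice] This article is original content by a 墨天轮 (modb.pro) user. When reposting, you must credit the source (墨天轮), the article link, and the author; otherwise the author and 墨天轮 reserve the right to pursue liability. To report suspected plagiarism or infringement on 墨天轮, email contact@modb.pro with supporting evidence; verified content will be removed immediately.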
