暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flink1.15 SQL实现翻滚窗口实时计算

大数据研习社 2022-05-07
681

长按二维码关注

大数据领域必关注的公众号

By大数据研习社

概要:1.随着Flink1.5的发布,FlinkSQL 流批一体更加成熟与完善。

2.Flink SQL可以替代Flink DataStream实现窗口计算。



1 Flink 翻滚窗口适用场景


1.1 定义

将数据依据固定的窗口度对无界数据流进行切片。


1.2 特点

时间对、窗口长度固定、event无重叠

1.3 适用场景

BI统计(计算各个时间段的指标)
 


2 Flink SQL窗口编程模型


Table table = input
 .window([GroupWindow w].as("w")) 定义一个group window并指定别名w
 .groupBy($("w")) / 按照窗口wtable进行分组
 .select($("b").sum()); // select子句指定返回的列和聚合运算(非键控(key)window)
 
Table table = input
 .window([GroupWindow w].as("w")) // 定义一个group window并指定别名w
 .groupBy($("w"), $("a")) // 按照属性a和窗口wtable进行分组(键控的window
 .select($("a"), $("b").sum()); // select子句指定返回的列和聚合运算
 
select子句中,我们还可以返回Window的属性:startendrowtime
 


注意:基于时间的窗口是左闭右开的,例如从9点开始创建一个1个时的窗口,则start09:00:00.000end10:00:00.000rowtime09:59:59.999。时间戳正好等于endevent是不会被分组到这个窗口的。
 
Table table = input
 .window([GroupWindow w].as("w")) // 定义一个group window并指定别名w
 .groupBy($("w"), $("a")) // 按照属性a和窗口wtable进行分组(键控的window
 .select($("a"), $("w").start(), $("w").end(), $("w").rowtime(), $("b").count());
 //select⼦句返回字段a、窗口的开始时间戳、窗口的结束时间戳、窗口的时间戳,b字段的count
 
注意:我们到底取哪个时间戳是由业务决定的,一般是start
 

3 Flink SQL滚动窗口实现


3.1 滚动窗口参数

滚动窗口通过Tumble类来定义,三个方法:
 


3.2 基于EventTime的滚动窗口实现

package com.bigdata.chap05;
import com.bigdata.entity.TempSensorData;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.time.Duration;
import java.time.ZoneId;
import static org.apache.flink.table.api.Expressions.*;
/**
 * 基于事件时间的滚动窗口
 */
public class FlinkTableTumbleWinBaseEventTime {
    public static void main(String[] args) throws Exception {
        //1、获取Stream执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2、创建表执行环境
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        System.out.println(tEnv.getConfig().getLocalTimeZone());
 
        env.setParallelism(1);
 
        //3、读取数据并提取时间戳指定水印生成策略
        WatermarkStrategy<TempSensorData> watermarkStrategy = WatermarkStrategy
                .<TempSensorData>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner(new SerializableTimestampAssigner<TempSensorData>() {
                    @Override
                    public long extractTimestamp(TempSensorData element, long recordTimestamp) {
                        return element.getTp()*1000;
                    }
                });
        DataStream<TempSensorData> tempSensorData = env.socketTextStream("hadoop1", 8888)
                .map(event -> {
                    String[] arr = event.split(",");
                    return TempSensorData
                            .builder()
                            .sensorID(arr[0])
                            .tp(Long.parseLong(arr[1]))
                            .temp(Integer.parseInt(arr[2]))
                            .build();
                }).assignTimestampsAndWatermarks(watermarkStrategy);
        tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
        //4、流转换为动态表
        Table table = tEnv.fromDataStream(tempSensorData,
                $("sensorID"),
                $("tp"),
                $("temp"),
                $("evTime").rowtime(),//新增evTime字段为rowtime
                $("pt").proctime()
        );
        //table.execute().print();
        //5、自定义窗口并计算
        Table result = table.window(Tumble
//窗口大小为2s
                .over(lit(2).second())
//按照eventTime排序
                .on($("evTime"))
                .as("w"))
//按照sensorID和窗口分组
                .groupBy($("sensorID"), $("w"))
//统计每个窗口的平均气温
                .select($("sensorID"), $("temp").avg().as("avgTemp"));
        //6、打印
        result.execute().print();
    }
}
 

3.3 测试数据集

hadoop1节点上面打开nc服务:nc -lk 8888  ,输入以下数据集测试运行
s-5,1645085900,14
s-5,1645085901,17
s-5,1645085902,22
s-5,1645085903,7
s-5,1645085904,21
s-5,1645085905,23
s-5,1645085906,8
s-5,1645085907,32
s-5,1645085908,15
s-5,1645085909,9
 
如果能基于EventTime按照时间窗口统计出每个传感器的平均气温,则说明Flink SQL翻滚窗口实现成功。
欢迎点赞 + 收藏 + 在看  素质三连 


往期精彩回顾
程序员,如何避免内卷
Apache 架构师总结的 30 条架构原则
【全网首发】Hadoop 3.0分布式集群安装
大数据运维工程师经典面试题汇总(附带答案)
大数据面试130题
某集团大数据平台整体架构及实施方案完整目录
大数据凉凉了?Apache将一众大数据开源项目束之高阁!
实战企业数据湖,抢先数仓新玩法
Superset制作智慧数据大屏,看它就够了
Apache Flink 在快手的过去、现在和未来
华为云-基于Ambari构建大数据平台(上)
华为云-基于Ambari构建大数据平台(下)
【HBase调优】Hbase万亿级存储性能优化总结
【Python精华】100个Python练手小程序
【HBase企业应用开发】工作中自己总结的Hbase笔记,非常全面!
【剑指Offer】近50个常见算法面试题的Java实现代码

长按识别左侧二维码

     关注领福利    

  领10本经典大数据书

文章转载自大数据研习社,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论