By大数据研习社
概要:1.随着Flink1.5的发布,FlinkSQL 流批一体更加成熟与完善。
2.Flink SQL可以替代Flink DataStream实现窗口计算。
1 Flink 翻滚窗口适用场景
1.1 定义
将数据依据固定的窗口度对无界数据流进行切片。
1.2 特点
1.3 适用场景

2 Flink SQL窗口编程模型
.window([GroupWindow w].as("w")) 定义一个group window并指定别名w .groupBy($("w")) / 按照窗口w对table进行分组 .select($("b").sum()); // select子句指定返回的列和聚合运算(非键控(key)的window) .window([GroupWindow w].as("w")) // 定义一个group window并指定别名w .groupBy($("w"), $("a")) // 按照属性a和窗口w对table进行分组(键控的window) .select($("a"), $("b").sum()); // select子句指定返回的列和聚合运算在select子句中,我们还可以返回Window的属性:start,end,rowtime
注意:基于时间的窗口是左闭右开的,例如从9点开始创建一个1个时的窗口,则start为09:00:00.000,end为10:00:00.000,rowtime为09:59:59.999。时间戳正好等于end的event是不会被分组到这个窗口的。 .window([GroupWindow w].as("w")) // 定义一个group window并指定别名w .groupBy($("w"), $("a")) // 按照属性a和窗口w对table进行分组(键控的window) .select($("a"), $("w").start(), $("w").end(), $("w").rowtime(), $("b").count()); //select⼦句返回字段a、窗口的开始时间戳、窗口的结束时间戳、窗口的时间戳,b字段的count注意:我们到底取哪个时间戳是由业务决定的,一般是start。3 Flink SQL滚动窗口实现
3.1 滚动窗口参数

3.2 基于EventTime的滚动窗口实现
package com.bigdata.chap05;import com.bigdata.entity.TempSensorData;import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;import org.apache.flink.api.common.eventtime.WatermarkStrategy;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.Tumble;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.time.Duration;import static org.apache.flink.table.api.Expressions.*;public class FlinkTableTumbleWinBaseEventTime { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); System.out.println(tEnv.getConfig().getLocalTimeZone()); WatermarkStrategy<TempSensorData> watermarkStrategy = WatermarkStrategy .<TempSensorData>forBoundedOutOfOrderness(Duration.ofSeconds(2)) .withTimestampAssigner(new SerializableTimestampAssigner<TempSensorData>() { public long extractTimestamp(TempSensorData element, long recordTimestamp) { return element.getTp()*1000; DataStream<TempSensorData> tempSensorData = env.socketTextStream("hadoop1", 8888) String[] arr = event.split(","); .tp(Long.parseLong(arr[1])) .temp(Integer.parseInt(arr[2])) }).assignTimestampsAndWatermarks(watermarkStrategy); tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai")); Table table = tEnv.fromDataStream(tempSensorData, $("evTime").rowtime(),//新增evTime字段为rowtime //table.execute().print(); Table result = table.window(Tumble .groupBy($("sensorID"), $("w")) .select($("sensorID"), $("temp").avg().as("avgTemp")); result.execute().print();3.3 测试数据集
在hadoop1节点上面打开nc服务:nc -lk 8888 ,输入以下数据集测试运行如果能基于EventTime按照时间窗口统计出每个传感器的平均气温,则说明Flink SQL翻滚窗口实现成功。长按识别左侧二维码
关注领福利
领10本经典大数据书