每天1次,打卡阅读
获取AI大数据技术、面经、内推信息
前言
Hello,各位大数据学习爱好者,我是 3分钟秒懂大数据 公众号的作者土哥,目前在杭州某互联网大厂担任大数据算法工程师,组内专注于Flink流式计算组件以及AB融合技术,为了让更多朋友更清晰的了解流式计算组件,现在我以面试的方式为大家全面总结了Flink所涉及的知识点,全文总共6万字,涉及各种原理,以及源码分析,图片是一张张绘制而出,欢迎大家进行解读!


提纲
正文

01、Flink 基础篇
1、什么是Flink?描述一下

Flink是一个以 流 为核心的高可用、高性能的分布式计算引擎。具备 流批一体,高吞吐、低延迟,容错能力,大规模复杂计算等特点,在数据流上提供 数据分发、通信等功能。
2、能否详细解释一下其中的 数据流、流批一体、容错能力等概念?
数据流:所有产生的 数据 都天然带有 时间概念,把 事件 按照时间顺序排列起来,就形成了一个事件流,也被称作数据流。
流批一体:
首先必须先明白什么是 有界数据 和 无界数据

有界数据,就是在一个确定的时间范围内的数据流,有开始,有结束,一旦确定就不会再改变,一般 批处理 用来处理有界数据,如上图的 bounded stream。
Flink的设计思想是以 流 为核心,批是流的特例,擅长处理 无界 和 有界 数据, Flink 提供 精确的时间控制能力 和 有状态 计算机制,可以轻松应对无界数据流,同时 提供 窗口 处理有界数据流。所以被成为流批一体。
容错能力:
在分布式系统中,硬件故障、进程异常、应用异常、网络故障等异常无处不在,Flink引擎必须保证故障发生后 不仅可以 重启 应用程序,还要 确保 其内部状态保持一致,从最后一次正确的时间点重新出发
Flink 利用检查点特性,在框架层面 提供 Exactly-once 语义,即端到端的一致性,确保数据仅处理一次,不会重复也不会丢失,即使出现故障,也能保证数据只写一次。
3、Flink 和 Spark Streaming的区别?
Flink 和 Spark Sreaming 最大的区别在于:Flink 是标准的实时处理引擎,基于事件驱动,以流为核心,而 Spark Streaming 的RDD 实际是一组小批次的RDD集合,是微批(Micro-Batch)的模型,以批为核心。
下面我们介绍两个框架的主要区别:
1. 架构模型
Spark Streaming 在运行时的主要角色包括:


2. 任务调度
Spark Streaming 连续不断的生成微小的数据批次,构建有向无环图DAG,Spark Streaming 会依次创建 DStreamGraph、JobScheduler。

Flink 根据用户提交的代码生成 StreamGraph,经过优化生成 JobGraph,然后提交给 JobManager进行处理,JobManager 会根据 JobGraph 生成 ExecutionGraph,ExecutionGraph 是 Flink 调度最核心的数据结构,JobManager 根据 ExecutionGraph 对 Job 进行调度,根据物理执行图部署到Taskmanager上形成具体的Task执行。

3. 时间机制
Spark Streaming 支持的时间机制有限,只支持处理时间。
Flink 支持了流处理程序在时间上的三个定义:事件时间 EventTime、摄入时间 IngestionTime 、处理时间 ProcessingTime。同时也支持 watermark 机制来处理滞后数据。

4. 容错机制
对于 Spark Streaming 任务,我们可以设置 checkpoint,然后假如发生故障并重启,我们可以从上次 checkpoint 之处恢复,但是这个行为只能使得数据不丢失,可能会重复处理,不能做到恰好一次处理语义。
Flink 则使用两阶段提交协议来解决这个问题。
4、Flink的架构包含哪些?
Flink 架构分为 技术架构 和 运行架构 两部分。
5、简单介绍一下技术架构
如下图为Flink技术架构:

6、详细介绍一下Flink的运行架构
如下图为Flink运行架构:

Flink 集群采取 Master - Slave 架构,Master的角色为 JobManager,负责集群和作业管理,Slave的角色是 TaskManager,负责执行计算任务,同时,Flink 提供客户端 Client 来管理集群和提交任务,JobManager 和 TaskManager 是集群的进程。
(3)TaskManager
7、Flink的并行度介绍一下?
Flink程序在执行的时候,会被映射成一个Streaming Dataflow。一个Streaming Dataflow是由一组Stream和Transformation Operator组成的。在启动时从一个或多个Source Operator开始,结束于一个或多个Sink Operator。

一个Stream可以被分成多个Stream的分区,也就是Stream Partition。一个Operator也可以被分为多个Operator Subtask。如上图中,Source被分成Source1和Source2,它们分别为Source的Operator Subtask。每一个Operator Subtask都是在不同的线程当中独立执行的。一个Operator的并行度,就等于Operator Subtask的个数。
上图Source的并行度为2。而一个Stream的并行度就等于它生成的Operator的并行度。数据在两个operator之间传递的时候有两种模式:
(1)One to One模式:两个operator用此模式传递的时候,会保持数据的分区数和数据的 排序;如上图中的Source1到Map1,它就保留的Source的分区特性,以及分区元素处 理的有序性。
(2)Redistributing (重新分配)模式:这种模式会改变数据的分区数;每个operator subtask会根据选择transformation把数据发送到不同的目标subtasks,比如keyBy()会通过hashcode重新分区,broadcast()和rebalance()方法会随机重新分区;
8、Flink的并行度的怎么设置的?
我们在实际生产环境中可以从四个不同层面设置并行度:
操作算子层面(Operator Level)
执行环境层面(Execution Environment Level)
客户端层面(Client Level)
系统层面(System Level)
需要注意的优先级:算子层面>环境层面>客户端层面>系统层面。
9、Flink编程模型了解不?
10、Flink作业中的DataStream,Transformation介绍一下

11、Flink的分区策略了解吗?

(1)GlobalPartitioner
数据会被分发到下游算子的第一个实例中进行处理。
(2)ForwardPartitioner
(3)ShufflePartitioner
随机的将元素进行分区,可以确保下游的Task能够均匀地获得数据,使用代码如下:
dataStream.shuffle();
(4)RebalancePartitioner
dataStream.rebalance();
(5)RescalePartitioner

dataStream.rescale();
(6)BroadcastPartitioner
将该记录广播给所有分区,即有N个分区,就把数据复制N份,每个分区1份,其使用代码如下:
dataStream.broadcast();
(7)KeyGroupStreamPartitioner
(8)CustomPartitionerWrapper
用户自定义分区器。需要用户自己实现Partitioner接口,来定义自己的分区逻辑。
12、描述一下Flink wordcount执行包含的步骤有哪些?
import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.java.utils.ParameterTool;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.util.Collector;public class WordCount {public static void main(String[] args) throws Exception {//定义socket的端口号int port;try{ParameterTool parameterTool = ParameterTool.fromArgs(args);port = parameterTool.getInt("port");}catch (Exception e){System.err.println("没有指定port参数,使用默认值9000");port = 9000;}//获取运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//连接socket获取输入的数据DataStreamSource<String> text = env.socketTextStream("10.192.12.106", port, "\n");//计算数据DataStream<WordWithCount> windowCount = text.flatMap(new FlatMapFunction<String, WordWithCount>() {public void flatMap(String value, Collector<WordWithCount> out) throws Exception {String[] splits = value.split("\\s");for (String word:splits) {out.collect(new WordWithCount(word,1L));}}})//打平操作,把每行的单词转为<word,count>类型的数据.keyBy("word")//针对相同的word数据进行分组.timeWindow(Time.seconds(2),Time.seconds(1))//指定计算数据的窗口大小和滑动窗口大小.sum("count");//把数据打印到控制台windowCount.print().setParallelism(1);//使用一个并行度//注意:因为flink是懒加载的,所以必须调用execute方法,上面的代码才会执行env.execute("streaming word count");}/*** 主要为了存储单词以及单词出现的次数*/public static class WordWithCount{public String word;public long count;public WordWithCount(){}public WordWithCount(String word, long count) {this.word = word;this.count = count;}@Overridepublic String toString() {return "WordWithCount{" +"word='" + word + '\'' +", count=" + count +'}';}}}
13、Flink常用的算子有哪些?

02、Flink 核心篇

核心篇主要涉及以上知识点,下面让我们详细了解一下。
14、Flink的四大基石包含哪些?
15、说说Flink窗口,以及划分机制
窗口概念:将无界流的数据,按时间区间,划分成多份数据,分别进行统计(聚合)





(3)count-tumbling-window无重叠数据的数量窗口,设置方式举例:countWindow(5)---基于数量的滚动窗口


Flink中还支持一个特殊的窗口:会话窗口SessionWindows

16、看你基本概念讲的还是很清楚的,那你介绍下Flink的窗口机制以及各组件之间是如何相互工作的

Window本身只是一个ID标识符,其内部可能存储了一些元数据,如
TimeWindow中有开始和结束时间,但是并不会存储窗口中的元素。窗口中的元素实际存储在 Key/Value State 中,key为
Window,value为元素集合(或聚合值)。为了保证窗口的容错性,该实现依赖了 Flink 的 State 机制。
WindowTrigger
2、每一个Window都拥有一个属于自己的 Trigger,Trigger上会有定时器,用来决定一个窗口何时能够被计算或清除。每当有元素加入到该窗口,或者之前注册的定时器超时了,那么Trigger都会被调用。Trigger的返回结果可以是 :
当数据到来时,调用Trigger判断是否需要触发计算,如果调用结果只是Fire的话,那么会计算窗口并保留窗口原样,也就是说窗口中的数据不清理,等待下次Trigger fire的时候再次执行计算。窗口中的数据会被反复计算,直到触发结果清理。在清理之前,窗口和数据不会释放没所以窗口会一直占用内存。
Trigger 触发流程:
Evictor(如果指定了的话)。Evictor 主要用来遍历窗口中的元素列表,并决定最先进入窗口的多少个元素需要被移除。剩余的元素会交给用户指定的函数进行窗口的计算。如果没有 Evictor 的话,窗口中的所有元素会一起交给函数进行计算。
sum(),
min(),
max(),还有
ReduceFunction,
FoldFunction,还有
WindowFunction。WindowFunction 是最通用的计算函数,其他的预定义的函数基本都是基于该函数实现的。
17、讲一下Flink的Time概念

- EventTime[事件时间]
18、那在API调用时,应该怎么使用?
final StreamExecutionEnvironment env= StreamExecutionEnvironment.getExecutionEnvironrnent();// 使用处理时间env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) ;// 使用摄入时间env.setStrearnTimeCharacteristic(TimeCharacteristic.IngestionTime);// 使用事件时间env.setStrearnTimeCharacteristic(TimeCharacteri stic Eve~tTime);
19、在流数据处理中,有没有遇到过数据延迟等问题,通过什么处理呢?
案例1: 假你正在去往地下停车场的路上,并且打算用手机点一份外卖。
选好了外卖后,你就用在线支付功能付款了,这个时候是11点50分。恰好这时,你走进了地下停车库,而这里并没有手机信号。因此外卖的在线支付并没有立刻成功,而支付系统一直在Retry重试“支付”这个操作。
当你找到自己的车并且开出地下停车场的时候,已经是12点05分了。这个时候手机重新有了信号,手机上的支付数据成功发到了外卖在线支付系统,支付完成。
在上面这个场景中你可以看到,支付数据的事件时间是11点50分,而支付数据的处理时间是12点05分
A 用户在11:02 对 App 进行操作,B用户在11:03 操作了 App,
但是A 用户的网络不太稳定,回传日志延迟了,导致我们在服务端先接受到B 用户11:03 的消息,然后再接受到A 用户11:02 的消息,消息乱序了。
一般处理数据延迟、消息乱序等问题,通过WaterMark水印来处理。
水印是用来解决数据延迟、数据乱序等问题,总结如下图所示:

水印就是一个时间戳(timestamp),Flink可以给数据流添加水印
- 水印并不会影响原有Eventtime事件时间
- 当数据流添加水印后,会按照水印时间来触发窗口计算,也就是说watermark水印是用来触发窗口计算的
- 设置水印时间,会比事件时间小几秒钟,表示最大允许数据延迟达到多久
- 水印时间 = 事件时间 - 允许延迟时间 (例如:10:09:57 = 10:10:00 - 3s )
20、WaterMark原理讲解一下?



如果觉得阿周讲解的知识点还满意的话,请关注公众号:3分钟秒懂大数据,获取更多,更全面的技术博文。并加博主微信:threeknowbigdata,拉你进大数据交流群。

21、如果数据延迟非常严重呢?只使用WaterMark可以处理吗?那应该怎么解决?
API调用
l sideOutputLateData(outputTag:OutputTag[T])--保存延迟数据
22、刚才提到State,那你简单说一下什么是State。

在Flink中,状态被称作state,是用来 保存 中间的 计算结果 或者 缓存数据。
根据状态是否需要保存中间结果,分为 无状态计算 和 有状态计算。
对于流计算而言,事件持续产生,如果每次计算相互独立,不依赖上下游的事件,则相同输入,可以得到相同输出,是无状态计算。
如果计算需要依赖于之前或者后续事件,则被称为有状态计算。


23、Flink 状态包括哪些?
(1) 按照由 Flink管理 还是 用户管理,状态可以分为 原始状态(Raw State)和 托管状态(ManagedState)
原始状态(Raw State):由用户自行进行管理。
两者区别:
(2)State 按照是否有 key 划分为 KeyedState 和 OperatorState 两种。
keyedState特点:

OperatorState特点:

24、Flink广播状态了解吗?

25、Flink 状态接口包括哪些?
在Flink中使用状态,包含两种状态接口:
(1)状态操作接口:使用状态对象本身存储,写入、更新数据。
(2)状态访问接口:从StateBackend获取状态对象本身。
状态操作接口
Flink 中的 状态操作接口 面向两类用户,即 应用开发者 和 Flink 框架本身。 所有Flink设计了两套接口
1、面向开发者State接口

面向开发者的 State 接口体系
2、面向内部State接口
状态访问接口
有了状态之后,开发者自定义UDF时,应该如何访问状态?
OperatorStateStore 接口原理:

OperatorState数据以Map形式保存在内存中,并没有使用RocksDBStateBackend和HeapKeyedStateBackend。
KeyedStateStore 接口原理:

26、Flink 状态如何存储
在Flink中, 状态存储被叫做 StateBackend , 它具备两种能力:
(1)在计算过程中提供访问State能力,开发者在编写业务逻辑中能够使用StateBackend的接口读写数据。
(2)能够将State持久化到外部存储,提供容错能力。
Flink状态提供三种存储方式:
(1)内存:MemoryStateBackend,适用于验证、测试、不推荐生产使用。
(2)文件:FSStateBackend,适用于长周期大规模的数据。
(3)RocksDB : RocksDBStateBackend,适用于长周期大规模的数据。
上面提到的 StateBackend是面向用户的,在Flink内部3种 State 的关系如下图:

1、内存型 StateBackend
2、文件型 StateBackend
FSStateBackend 适用于处理大状态、长窗口、或者大键值状态的有状态处理任务。
注意点如下 :
1) State 数据首先被存在 TaskManager 的内存中。
2) State大小不能超过TM内存。
3) TM异步将State数据写入外部存储。
MemoryStateBackend 和FSStateBackend 都依赖于HeapKeyedStateBackend,HeapKeyedStateBackend 使用 State存储数据。
3、RocksDBStateBackend
RocksDBStateBackend 跟内存型和文件型都不同 。
RocksDBStateBackend 使用嵌入式的本地数据库 RocksDB 将流计算数据状态存储在本地磁盘中,不会受限于TaskManager 的内存大小,在执行检查点的时候,再将整个 RocksDB 中保存的State数据全量或者增量持久化到配置的文件系统中,
在 JobManager 内存中会存储少量的检查点元数据。RocksDB克服了State受内存限制的问题,同时又能够持久化到远端文件系统中,比较适合在生产中使用。
缺点:
RocksDBStateBackend 相比基于内存的StateBackend,访问State的成本高很多,可能导致数据流的吞吐量剧烈下降,甚至可能降低为原来的 1/10。
适用场景
1)最适合用于处理大状态、长窗口,或大键值状态的有状态处理任务。
2)RocksDBStateBackend 非常适合用于高可用方案。
3) RocksDBStateBackend 是目前唯一支持增量检查点的后端。 增量检查点非常适用于超 大状态的场景。
注意点
27、Flink 状态如何持久化?
1、全量持久化策略
每次将全量的State写入到状态存储中(HDFS)。内存型、文件型、RocksDB类型的StataBackend 都支持全量持久化策略。

快照保存策略类体系
2、增量持久化策略
28、Flink 状态过期后如何清理?
1、DataStream中状态过期
过期时间:超过多长时间未访问,视为State过期,类似于缓存。
过期时间更新策略:创建和写时更新、读取和写时更新。
State可见性:未清理可用,超时则不可用。
2、Flink SQL中状态过期
StreamQueryConfig qConfig = ...//设置过期时间为 min = 12小时 ,max = 24小时qConfig.withIdleStateRetentionTime(Time.hours(12),Time.hours(24));

29、Flink 通过什么实现可靠的容错机制。
Flink 使用 轻量级分布式快照,设计检查点(checkpoint)实现可靠容错。
30、什么是Checkpoin检查点?
Checkpoint被叫做检查点,是Flink实现容错机制最核心的功能,是Flink可靠性的基石,它能够根据配置周期性地基于Stream中各个Operator的状态来生成Snapshot快照,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些Snapshot进行恢复,从而修正因为故障带来的程序数据状态中断。
Flink的checkpoint机制原理来自“Chandy-Lamport algorithm”算法
1.State:
2.Checkpoint:
表示了一个FlinkJob在一个特定时刻的一份全局状态快照,即包含了所有Task/Operator的状态
可以理解为Checkpoint是把State数据定时持久化存储了
比如KafkaConsumer算子中维护的Offset状态,当任务重新恢复的时候可以从Checkpoint中获取。
31、什么是Savepoin保存点?
32、什么是CheckpointCoordinator检查点协调器?
33、Checkpoint中保存的是什么信息?

例:(0,1000)表示0号partition目前消费到offset为1000的数据
Flink的pv task记录了当前计算的各app的pv值,为了方便讲解,我这里有两个app:app1、app2
例:(app1,50000)(app2,10000)表示app1当前pv值为50000表示app2当前pv值为10000每来一条数据,只需要确定相应app_id,将相应的value值+1后put到map中即可;
chk-100offset:(0,1000)pv:(app1,50000)(app2,10000)该状态信息表示第100次CheckPoint的时候, partition 0 offset消费到了1000,pv统计
34、当作业失败后,检查点如何恢复作业?
Flink提供了 应用自动恢复机制 和 手动作业恢复机制。
应用自动恢复机制:
/bin/flink -s flink/checkpoints/03112312a12398740a87393/chk-50/_metadata
35、当作业失败后,从保存点如何恢复作业?
36、Flink如何实现轻量级异步分布式快照?

37、什么是Barrier对齐?



38、什么是Barrier不对齐?
39、为什么要进行barrier对齐?不对齐到底行不行?
答:Exactly Once时必须barrier对齐,如果barrier不对齐就变成了At Least Once;
40、Flink支持Exactly-Once语义,那什么是Exactly-Once?
41、要实现Exactly-Once,需具备什么条件?
流系统要实现Exactly-Once,需要保证上游 Source 层、中间计算层和下游 Sink 层三部分同时满足端到端严格一次处理,如下图:

Flink端到端严格一次处理
Source端:数据从上游进入Flink,必须保证消息严格一次消费。同时Source 端必须满足可重放(replay)。否则 Flink 计算层收到消息后未计算,却发生 failure 而重启,消息就会丢失。
Flink计算层:利用 Checkpoint 机制,把状态数据定期持久化存储下来,Flink程序一旦发生故障的时候,可以选择状态点恢复,避免数据的丢失、重复。
Sink端:Flink将处理完的数据发送到Sink端时,通过 两阶段提交协议 ,即 TwoPhaseCommitSinkFunction 函数。该 SinkFunction 提取并封装了两阶段提交协议中的公共逻辑,保证Flink 发送Sink端时实现严格一次处理语义。 同时:Sink端必须支持事务机制,能够进行数据回滚或者满足幂等性。
回滚机制:即当作业失败后,能够将部分写入的结果回滚到之前写入的状态。
幂等性:就是一个相同的操作,无论重复多少次,造成的结果和只操作一次相等。即当作业失败后,写入部分结果,但是当重新写入全部结果时,不会带来负面结果,重复写入不会带来错误结果。
42、什么是两阶段提交协议?
两阶段提交协议(Two -Phase Commit,2PC)是解决分布式事务问题最常用的方法,它可以保证在分布式事务中,要么所有参与进程都提交事务,要么都取消,即实现ACID中的 A(原子性)。
两阶段提交协议中 有两个重要角色,协调者(Coordinator)和 参与者(Participant),其中协调者只有一个,起到分布式事务的协调管理作用,参与者有多个。
两阶段提交阶段分为两个阶段:投票阶段(Voting)和 提交阶段(Commit)。
投票阶段:
(1)协调者向所有参与者发送 prepare 请求和事务内容,询问是否可以准备事务提交,等待参与者的相应。
(2)参与者执行事务中包含的操作,并记录 undo 日志(用于回滚)和 redo 日志(用于重放),但不真正提交。
(3)参与者向协调者返回事务操作的执行结果,执行成功返回yes,失败返回no。
提交阶段:
分为成功与失败两种情况。
若所有参与者都返回 yes,说明事务可以提交:
协调者向所有参与者发送 commit 请求。
参与者收到 commit 请求后,将事务真正地提交上去,并释放占用的事务资源,并向协调者返回 ack 。
协调者收到所有参与者的 ack 消息,事务成功完成,如下图:


若有参与者返回 no 或者超时未返回,说明事务中断,需要回滚:
协调者向所有参与者发送rollback请求。
参与者收到rollback请求后,根据undo日志回滚到事务执行前的状态,释放占用的事务资源,并向协调者返回ack。
协调者收到所有参与者的ack消息,事务回滚完成。


43、Flink 如何保证 Exactly-Once 语义?
Flink通过两阶段提交协议来保证Exactly-Once语义。
我们以 Kafka - Flink -Kafka 为例 说明如何保证Exactly-Once语义。

如上图所示:Flink作业包含以下算子。
Flink使用两阶段提交协议 预提交(Pre-commit)阶段和 提交(Commit)阶段保证端到端严格一次。

预处理阶段: Checkpoint 启动
对于 Source 任务而言,就会把当前的 offset 作为状态保存起来。下次从 Checkpoint 恢复时,Source 任务可以重新提交偏移量,从上次保存的位置开始重新消费数据,如下图所示:

预处理阶段:checkpoint barrier传递 及 offset 保存

预处理阶段:预提交到外部系统

提交阶段:数据精准被消费
44、数的很好,很清楚,那你对Flink 端到端 严格一次Exactly-Once 语义做个总结


45、Flink广播机制了解吗?
如下图所示:

46、Flink反压了解吗?
47、Flink反压的影响有哪些?
反压会影响到两项指标: checkpoint 时长和 state 大小
(1)前者是因为 checkpoint barrier 是不会越过普通数据的,数据处理被阻塞也会导致 checkpoint barrier 流经整个数据管道的时长变长,因而 checkpoint 总体时间(End to End Duration)变长。
(2)后者是因为为保证 EOS(Exactly-Once-Semantics,准确一次),对于有两个以上输入管道的 Operator,checkpoint barrier 需要对齐(Alignment),接受到较快的输入管道的 barrier 后,它后面数据会被缓存起来但不处理,直到较慢的输入管道的 barrier 也到达,这些被缓存的数据会被放到state 里面,导致 checkpoint 变大。
这两个影响对于生产环境的作业来说是十分危险的,因为 checkpoint 是保证数据一致性的关键,checkpoint 时间变长有可能导致 checkpoint 超时失败,而 state 大小同样可能拖慢 checkpoint 甚至导致 OOM (使用 Heap-based StateBackend)或者物理内存使用超出容器资源(使用 RocksDBStateBackend)的稳定性问题。
48、Flink反压如何解决?
(1)定位反压节点
要解决反压首先要做的是定位到造成反压的节点,这主要有两种办法:
通过 Flink Web UI 自带的反压监控面板;
通过 Flink Task Metrics。
(1)反压监控面板

(2)Task Metrics
Flink 提供的 Task Metrics 是更好的反压监控手段
如果一个 Subtask 的发送端 Buffer 占用率很高,则表明它被下游反压限速了;
如果一个 Subtask 的接受端 Buffer 占用很高,则表明它将反压传导至上游。
49、Flink支持的数据类型有哪些?
Flink支持的数据类型如下图所示:
从图中可以看到 Flink 类型可以分为基础类型(Basic)、数组(Arrays)、复合类型(Composite)、辅助类型(Auxiliary)、泛型和其它类型(Generic)。Flink 支持任意的 Java 或是 Scala 类型。
50、Flink如何进行序列和反序列化的?
在Flink中,当数据需要进行序列化时,会使用TypeInformation的生成序列化器接口调用一个 createSerialize() 方法,创建出TypeSerializer,TypeSerializer提供了序列化和反序列化能力。如下图所示:Flink 的序列化过程

对于大多数数据类型 Flink 可以自动生成对应的序列化器,能非常高效地对数据集进行序列化和反序列化 ,如下图:

比如,BasicTypeInfo、WritableTypeIno ,但针对 GenericTypeInfo 类型,Flink 会使用 Kyro 进行序列化和反序列化。其中,Tuple、Pojo 和 CaseClass 类型是复合类型,它们可能嵌套一个或者多个数据类型。在这种情况下,它们的序列化器同样是复合的。它们会将内嵌类型的序列化委托给对应类型的序列化器。
通过一个案例介绍Flink序列化和反序列化:

如上图所示,当创建一个Tuple 3 对象时,包含三个层面,一是 int 类型,一是 double 类型,还有一个是 Person。Person对象包含两个字段,一是 int 型的 ID,另一个是 String 类型的 name,
(1)在序列化操作时,会委托相应具体序列化的序列化器进行相应的序列化操作。从图中可以看到 Tuple 3 会把 int 类型通过 IntSerializer 进行序列化操作,此时 int 只需要占用四个字节。
(2)Person 类会被当成一个 Pojo 对象来进行处理,PojoSerializer 序列化器会把一些属性信息使用一个字节存储起来。同样,其字段则采取相对应的序列化器进行相应序列化,在序列化完的结果中,可以看到所有的数据都是由 MemorySegment 去支持。
MemorySegment 具有什么作用呢?
MemorySegment 在 Flink 中会将对象序列化到预分配的内存块上,它代表 1 个固定长度的内存,默认大小为 32 kb。MemorySegment 代表 Flink 中的一个最小的内存分配单元,相当于是 Java 的一个 byte 数组。每条记录都会以序列化的形式存储在一个或多个 MemorySegment 中。
51、为什么Flink使用自主内存而不用JVM内存管理?
JVM 内存管理的不足:
1)Java 对象存储密度低。Java 的对象在内存中存储包含 3 个主要部分:对象头、实例 数据、对齐填充部分。例如,一个只包含 boolean 属性的对象占 16byte:对象头占 8byte, boolean 属性占 1byte,为了对齐达到 8 的倍数额外占 7byte。而实际上只需要一个 bit(1/8 字节)就够了。
2)Full GC 会极大地影响性能。尤其是为了处理更大数据而开了很大内存空间的 JVM 来说,GC 会达到秒级甚至分钟级。
3)OOM 问题影响稳定性。OutOfMemoryError 是分布式计算框架经常会遇到的问题, 当JVM中所有对象大小超过分配给JVM的内存大小时,就会发生OutOfMemoryError错误, 导致 JVM 崩溃,分布式框架的健壮性和性能都会受到影响。
4)缓存未命中问题。CPU 进行计算的时候,是从 CPU 缓存中获取数据。现代体系的 CPU 会有多级缓存,而加载的时候是以 Cache Line 为单位加载。如果能够将对象连续存储, 这样就会大大降低 Cache Miss。使得 CPU 集中处理业务,而不是空转。
52、那Flink自主内存是如何管理对象的?
53、Flink内存模型介绍一下?

JobManager内存模型

在 1.10 中,Flink 统一了 TM 端的内存管理和配置,相应的在 1.11 中,Flink 进一步 对 JM 端的内存配置进行了修改,使它的选项和配置方式与 TM 端的配置方式保持一致。

TaskManager内存模型


JVM Heap:JVM 堆上内存
1、Framework Heap Memory:Flink 框架本身使用的内存,即 TaskManager 本身所 占用的堆上内存,不计入 Slot 的资源中。
配置参数:taskmanager.memory.framework.heap.size=128MB,默认 128MB
2、Task Heap Memory:Task 执行用户代码时所使用的堆上内存。
配置参数:taskmanager.memory.task.heap.size
Off-Heap Mempry:JVM 堆外内存
1、DirectMemory:JVM 直接内存
1)Framework Off-Heap Memory:Flink框架本身所使用的内存,即TaskManager 本身所占用的对外内存,不计入 Slot 资源。
配置参数:taskmanager.memory.framework.off-heap.size=128MB,默认 128MB
2)Task Off-Heap Memory:Task 执行用户代码所使用的对外内存。
配置参数:taskmanager.memory.task.off-heap.size=0,默认 0
3)Network Memory:网络数据交换所使用的堆外内存大小,如网络数据交换 缓冲区
2、Managed Memory:Flink 管理的堆外内存,
用于排序、哈希表、缓存中间结果及 RocksDB State Backend 的本地内存。
JVM specific memory:JVM 本身使用的内存
1、JVM metaspace:JVM 元空间
2、JVM over-head 执行开销:JVM 执行时自身所需要的内容,包括线程堆栈、IO、 编译缓存等所使用的内存。
配置参数:taskmanager.memory.jvm-overhead.min=192mb
taskmanager.memory.jvm-overhead.max=1gb
taskmanager.memory.jvm-overhead.fraction=0.1
总体内存
1、总进程内存:Flink Java 应用程序(包括用户代码)和 JVM 运行整个进程所消 耗的总内存。
总进程内存 = Flink 使用内存 + JVM 元空间 + JVM 执行开销
配置项:taskmanager.memory.process.size: 1728m
2、Flink 总内存:仅 Flink Java 应用程序消耗的内存,包括用户代码,但不包括 JVM 为其运行而分配的内存。
Flink 使用内存:框架堆内外 + task 堆内外 + network + manage
54、Flink如何进行资源管理的?
1 集群架构剖析
Flink的运行主要由 客户端、一个JobManager(后文简称JM)和 一个以上的TaskManager(简称TM或Worker)组成。

客户端
客户端主要用于提交任务到集群,在Session或Per Job模式中,客户端程序还要负责解析用户代码,生成JobGraph;在Application模式中,直接提交用户jar和执行参数即可。客户端一般支持两种模式:detached模式,客户端提交后自动退出。attached模式,客户端提交后阻塞等待任务执行完毕再退出。
JobManager
JM负责决定应用何时调度task,在task执行结束或失败时如何处理,协调检查点、故障恢复。该进程主要由下面几个部分组成:
1 ResourceManager,负责资源的申请和释放、管理slot(Flink集群中最细粒度的资源管理单元)。Flink实现了多种RM的实现方案以适配多种资源管理框架,如yarn、mesos、k8s或standalone。在standalone模式下,RM只能分配slot,而不能启动新的TM。注意:这里所说的RM跟Yarn的RM不是一个东西,这里的RM是JM中的一个独立的服务。
2 Dispatcher,提供Flink提交任务的rest接口,为每个提交的任务启动新的JobMaster,为所有的任务提供web ui,查询任务执行状态。
3 JobMaster,负责管理执行单个JobGraph,多个任务可以同时在一个集群中启动,每个都有自己的JobMaster。注意这里的JobMaster和JobManager的区别。
TaskManager
TM也叫做worker,用于执行数据流图中的任务,缓存并交换数据。集群至少有一个TM,TM中最小的资源管理单元是Slot,每个Slot可以执行一个Task,因此TM中slot的数量就代表同时可以执行任务的数量。
2 Slot与资源管理
每个TM是一个独立的JVM进程,内部基于独立的线程执行一个或多个任务。TM为了控制每个任务的执行资源,使用task slot来进行管理。每个task slot代表TM中的一部分固定的资源,比如一个TM有3个slot,每个slot将会得到TM的1/3内存资源。不同任务之间不会进行资源的抢占,注意GPU目前没有进行隔离,目前slot只能划分内存资源。
比如下面的数据流图,在扩展成并行流图后,同一的task可能分拆成多个任务并行在集群中执行。操作链可以把多个不同的任务进行合并,从而支持在一个线程中先后执行多个任务,无需频繁释放申请线程。同时操作链还可以统一缓存数据,增加数据处理吞吐量,降低处理延迟。
在Flink中,想要不同子任务合并需要满足几个条件:下游节点的入边是1(保证不存在数据的shuffle);子任务的上下游不为空;连接策略总是ALWAYS;分区类型为ForwardPartitioner;并行度一致;当前Flink开启Chain特性。

在集群中的执行图可能如下:

Flink也支持slot的共享,即把不同任务根据任务的依赖关系分配到同一个Slot中。这样带来几个好处:方便统计当前任务所需的最大资源配置(某个子任务的最大并行度);避免Slot的过多申请与释放,提升Slot的使用效率。

通过Slot共享,就有可能某个Slot中包含完整的任务执行链路。
3 应用执行
一个Flink应用就是用户编写的main函数,其中可能包含一个或多个Flink的任务。这些任务可以在本地执行,也可以在远程集群启动,集群既可以长期运行,也支持独立启动。下面是目前支持的任务提交方案:
Session集群
生命周期:集群事先创建并长期运行,客户端提交任务时与该集群连接。即使所有任务都执行完毕,集群仍会保持运行,除非手动停止。因此集群的生命周期与任务无关。
资源隔离:TM的slot由RM申请,当上面的任务执行完毕会自动进行释放。由于多个任务会共享相同的集群,因此任务间会存在竞争,比如网络带宽等。如果某个TM挂掉,上面的所有任务都会失败。
其他方面:拥有提前创建的集群,可以避免每次使用的时候过多考虑集群问题。比较适合那些执行时间很短,对启动时间有比较高的要求的场景,比如交互式查询分析。
Per Job集群
生命周期:为每个提交的任务单独创建一个集群,客户端在提交任务时,直接与ClusterManager沟通申请创建JM并在内部运行提交的任务。TM则根据任务运行需要的资源延迟申请。一旦任务执行完毕,集群将会被回收。
资源隔离:任务如果出现致命问题,仅会影响自己的任务。
其他方面:由于RM需要申请和等待资源,因此启动时间会稍长,适合单个比较大、长时间运行、需要保证长期的稳定性、不在乎启动时间的任务。
Application集群
生命周期:与Per Job类似,只是main()方法运行在集群中。任务的提交程序很简单,不需要启动或连接集群,而是直接把应用程序打包到资源管理系统中并启动对应的EntryPoint,在EntryPoint中调用用户程序的main()方法,解析生成JobGraph,然后启动运行。集群的生命周期与应用相同。
资源隔离:RM和Dispatcher是应用级别。

03、Flink 源码篇

55、FLink作业提交流程应该了解吧?
Flink的提交流程:
1.在Flink Client中,通过反射启动jar中的main函数,生成Flink StreamGraph和JobGraph,将JobGraph提交给Flink集群
2.Flink集群收到JobGraph(JobManager收到)后,将JobGraph翻译成ExecutionGraph,然后开始调度,启动成功之后开始消费数据。
总结来说:Flink核心执行流程,对用户API的调用可以转为 StreamGraph -->JobGraph -- > ExecutionGraph。
56、FLink作业提交分为几种方式?
Flink的作业提交分为两种方式
1.Local 方式:即本地提交模式,直接在IDEA运行代码。
2.远程提交方式:分为Standalone方式、yarn方式、K8s方式
Yarn 方式分为三种提交模式:Yarn-perJob模式、Yarn-Sessionmo模式、Yarn-Application模式
57、FLink JobGraph是在什么时候生成的?
StreamGraph、JobGraph全部是在Flink Client 客户端生成的,即提交集群之前生成,原理图如下:

58、那在jobGraph提交集群之前都经历哪些过程?
(1)用户通过启动Flink集群,使用命令行提交作业,运行 flink run -c WordCount xxx.jar
(2)运行命令行后,会通过run脚本调用CliFrontend入口,CliFrontend会触发用户提交的jar文件中的main方法,然后交给PipelineExecuteor # execute方法,最终根据提交的模式选择触发一个具体的PipelineExecutor执行。
具体流程图如下:

59、看你提到PipeExecutor,它有哪些实现类?
所以PipeExecutor 的实现类如下图所示:(在代码中按CTRL+H就会出来)

除了上述框的两种模式外,在IDEA环境中运行Flink MiniCluster 进行调试时,使用LocalExecutor。
60、Local提交模式有啥特点,怎么实现的?
(1)Local是在本地IDEA环境中运行的提交方式。不上集群。主要用于调试,原理图如下:

1. Flink程序由JobClient进行提交
2. JobClient将作业提交给JobManager
3. JobManager负责协调资源分配和作业执行。资源分配完成后,任务将提交给相应的TaskManager
4. TaskManager启动一个线程开始执行,TaskManager会向JobManager报告状态更改,如开始执 行,正在进行或者已完成。
5. 作业执行完成后,结果将发送回客户端。
源码分析:通过Flink1.12.2源码进行分析的
(1)创建获取对应的StreamExecutionEnvironment对象:LocalStreamEnvironment
调用StreamExecutionEnvironment对象的execute方法





(2)获取streamGraph

(3)执行具体的PipeLineExecutor - >得到localExecutorFactory

(4) 获取JobGraph
根据localExecutorFactory的实现类LocalExecutor生成JobGraph

(5)实例化MiniCluster集群

(6)返回JobClient 客户端
在上面执行miniCluster.submitJob 将JobGraph提交到本地集群后,会返回一个JobClient客户端,该JobClient包含了应用的一些详细信息,包括JobID,应用的状态等等。最后返回到代码执行的上一层,对应类为StreamExecutionEnvironment。

以上就是Local模式的源码执行过程。
61、远程提交模式都有哪些?
远程提交方式:分为Standalone方式、yarn方式、K8s方式
Standalone:包含session模式
Yarn 方式分为三种提交模式:Yarn-perJob模式、Yarn-Sessionmo模式、Yarn-Application模式。
K8s方式:包含 session模式
62、Standalone模式简单介绍一下?
Standalone 模式为Flink集群的单机版提交方式,只使用一个节点进行提交,常用Session模式。
作业提交原理图如下:

提交命令如下:
bin/flink run org.apache.flink.WordCount xxx.jar
client客户端提交任务给JobManager
JobManager负责申请任务运行所需要的资源并管理任务和资源,
JobManager分发任务给TaskManager执行
TaskManager定期向JobManager汇报状态
63、yarn集群提交方式介绍一下?
通过yarn集群提交分为3种提交方式:分别为session模式、perjob模式、application模式
64、yarn - session模式特点?
提交命令如下:
./bin/flink run -t yarn-session \-Dyarn.application.id=application_XXXX_YY xxx.jar
Yarn-Session模式:所有作业共享集群资源,隔离性差,JM负载瓶颈,main方法在客户端执行。适合执行时间短,频繁执行的短任务,集群中的所有作业 只有一个JobManager,另外,Job被随机分配给TaskManager
特点:
Session-Cluster模式需要先启动集群,然后再提交作业,接着会向yarn申请一块空间后,资源永远保持不变。如果资源满了,下一个作业就无法提交,只能等到 yarn中的其中一个作业执行完成后,释放了资源,下个作业才会正常提交。所有作业共享Dispatcher和ResourceManager;共享资源;适合规模小执行时间短的 作业。

65、yarn - perJob模式特点?
提交命令:
./bin/flink run -t yarn-per-job --detached xxx.jar
Yarn-Per-Job模式:每个作业单独启动集群,隔离性好,JM负载均衡,main方法在客户端执行。在per-job模式下,每个Job都有一个JobManager,每个TaskManager只有单个Job。
特点:
一个任务会对应一个Job,每提交一个作业会根据自身的情况,都会单独向yarn申请资源,直到作业执行完成,一个作业的失败与否并不会影响下一个作业的正常提交和运行。独享Dispatcher 和 ResourceManager,按需接受资源申请;适合规模大长时间运行的作业。

66、yarn - application模式特点?
提交命令如下:
./bin/flink run-application -t yarn-application xxx.jar
Yarn-Application模式:每个作业单独启动集群,隔离性好,JM负载均衡,main方法在JobManager上执行。
特点:
在yarn-per-job 和 yarn-session模式下,客户端都需要执行以下三步,即:
1、获取作业所需的依赖项;
2、通过执行环境分析并取得逻辑计划,即StreamGraph→JobGraph;

只有在这些都完成之后,才会通过env.execute()方法 触发 Flink运行时真正地开始执行作业。如果所有用户都在同一个客户端上提交作业,较大的依赖会消耗更多的带宽,而较复杂的作业逻辑翻译成JobGraph也需要吃掉更多的CPU和内存,客户端的资源反而会成为瓶颈。
为了解决它,社区在传统部署模式的基础上实现了 Application模式。原本需要客户端做的三件事被转移到了JobManager里,也就是说main()方法在集群中执行(入口点位于 ApplicationClusterEntryPoint ),客 户端只需要负责发起部署请求了
原理图如下:

综上所述,Flink社区比较推荐使用 yarn-perjob 或者 yarn-application模式进行提交应用。
67、yarn - session 提交流程详细介绍一下?
提交流程图如下:

1、启动集群
2、作业提交
(3)Flink Client 通过Rest 向Dispatcher 提交编译好的JobGraph。Dispatcher 是 Rest 接口,不负责实际的调度、指定工作。
(4)Dispatcher 收到 JobGraph 后,为作业创建一个JobMaster,将工作交给JobMaster,JobMaster负责作业调度,管理作业和Task的生命周期,构建ExecutionGraph(JobGraph的并行化版本,调度层最核心的数据结构)
以上两步执行完后,作业进入调度执行阶段。
(5)JobMaster向ResourceManager申请资源,开始调度ExecutionGraph。
(6)ResourceManager将资源请求加入等待队列,通过心跳向YarnResourceManager申请新的Container来启动TaskManager进程。
(7)YarnResourceManager启动,然后从HDFS加载Jar文件等所需相关资源,在容器中启动TaskManager,TaskManager启动TaskExecutor
(8)TaskManager启动后,向ResourceManager 注册,并把自己的Slot资源情况汇报给ResourceManager。
(9)ResourceManager从等待队列取出Slot请求,向TaskManager确认资源可用情况,并告知TaskManager将Slot分配给哪个JobMaster。
(10)TaskManager向JobMaster回复自己的一个Slot属于你这个任务,JobMaser会将Slot缓存到SlotPool。
(11)JobMaster调度Task到TaskMnager的Slot上执行。
68、yarn - perjob 提交流程详细介绍一下?
提交命令如下:
./bin/flink run -t yarn-per-job --detached xxx.jar
提交流程图如下所示:

1、启动集群
2、作业提交
(3)ApplicationMaster启动Dispatcher,Dispatcher启动ResourceManager和JobMaster(该步和Session不同,Jabmaster是由Dispatcher拉起,而不是Client传过来的)。JobMaster负责作业调度,管理作业和Task的生命周期,构建ExecutionGraph(JobGraph的并行化版本,调度层最核心的数据结构)
以上两步执行完后,作业进入调度执行阶段。
(4)JobMaster向ResourceManager申请Slot资源,开始调度ExecutionGraph。
(5)ResourceManager将资源请求加入等待队列,通过心跳向YarnResourceManager申请新的Container来启动TaskManager进程。
(6)YarnResourceManager启动,然后从HDFS加载Jar文件等所需相关资源,在容器中启动TaskManager。
(7)TaskManager在内部启动TaskExecutor。
(8)TaskManager启动后,向ResourceManager 注册,并把自己的Slot资源情况汇报给ResourceManager。
(9)ResourceManager从等待队列取出Slot请求,向TaskManager确认资源可用情况,并告知TaskManager将Slot分配给哪个JobMaster。
(10)TaskManager向JobMaster回复自己的一个Slot属于你这个任务,JobMaser会将Slot缓存到SlotPool。
(11)JobMaster调度Task到TaskMnager的Slot上执行。
69、流图、作业图、执行图三者区别?
Flink内部Graph总览图,由于现在Flink 实行流批一体代码,Batch API基本废弃,就不过多介绍
在Flink DataStramAPI 中,Graph内部转换图如下:

以WordCount为例,流图、作业图、执行图、物理执行图之间的Task调度如下:



对于Flink 流计算应用,运行用户代码时,首先调用DataStream API ,将用户代码转换为 Transformation,然后经过:StreamGraph->JobGraph->ExecutionGraph 3层转换(这些都是Flink内置的数据结构),最后经过Flink调度执行,在Flink 集群中启动计算任务,形成一个物理执行图。
70、流图介绍一下?
(1)流图 StreamGraph

1)StreamNode 点
StreamNode 点 ,从 Transformation转换而来,可以简单理解为 StreamNode 表示一个算子,存在实体和虚拟,可以有多个输入和输出,实体StreamNode 最终变成物理算子,虚拟的附着在StreamEdge 边 上。
2)StreamEdge 边
StreamEdge 是 StreamGraph 的边,用来连接两个StreamNode 点,一个StreamEdge可以有多个出边、入边等信息。
71、作业图介绍一下?
(2)作业图 JobGraph
JobGraph是由StreamGraph优化而来,是通过OperationChain 机制将算子合并起来,在执行时,调度在同一个Task线程上,避免数据的跨线程,跨网络传递。

1)JobVertex 点
经过算子融合优化后符合条件的多个StreamNode 可能会融合在一起生成一个 JobVertex,即一个JobVertex 包含一个或多个算子, JobVertex 的输入是 JobEdge. 输出是 IntermediateDataSet
2)JobEdge 边
JobEdge 表示 JobGraph 中的一 个数据流转通道, 其上游数据源是 IntermediateDataSet ,下游消费者是 JobVertex 。
JobEdge 中的数据分发模式会直接影响执行时 Task 之间的数据连接关系是点对点连接还是全连接。
3)IntermediateDataSet 中间数据集
中间数据集 IntermediateDataSet 是一种逻辑结构.用来表示 JobVertex 的输出,即该 JobVertex 中包含的算子会产生的数据集。不同的执行模式下,其对应的结果分区类型不同,决 定了在执行时刻数据交换的模式。
72、执行图介绍一下?
(3)执行图 ExecutionGraph
ExecutionGraph是调度Flink 作业执行的核心数据结构,包含了作业中所有并行执行的Task信息、Task之间的关联关系、数据流转关系。
StreamGraph 和JobGraph都在Flink Client生成,然后交给Flink集群。JobGraph到ExecutionGraph在JobMaster中 完成,转换过程中重要变化如下:
2)生成了6个核心对象。

执行图ExecutionGraph 核心对象包括6个:
ExecutionJobVertex、ExecutionVertex、IntermediateResult、IntermediateResultPartition、ExecutionEdge、Execution。
1)ExecutionJobVertex
该对象和 JobGraph 中的 JobVertex 一 一对应。该对象还包含一组 ExecutionVertex, 数量 与该 JobVertex 中所包含的StreamNode 的并行度一致,假设 StreamNode 的并行度为5 ,那么ExecutionJobVertex中也会包含 5个ExecutionVertex。
ExecutionJobVertex用来将一个JobVertex 封装成 ExecutionJobVertex,并依次创建 ExecutionVertex、Execution、IntermediateResult 和 IntermediateResultPartition,用于丰富ExecutionGraph。
2)ExecutionVertex
ExecutionJobVertex会对作业进行并行化处理,构造可以并行执行的实例,每一个并行执行的实例就是 ExecutionVertex。
3)IntermediateResult
IntermediateResult 又叫作中间结果集,该对象是个逻辑概念 表示 ExecutionJobVertex输出,和 JobGrap 中的IntermediateDalaSet 一 一对应,同样 一个ExecutionJobVertex 可以有多个中间结果,取决于当前 JobVertex 有几个出边(JobEdge)。
4)IntermediateResultPartition
IntermediateResultPartition 又叫作中间结果分区。表示1个 ExecutionVertex输出结果,与 Execution Edge 相关联。
表示ExecutionVertex 的输入,连按到上游产生的IntermediateResultPartition 。1个Execution对应唯一的1个IntermediateResultPartition 和1个ExecutionVertex。1个ExecutionVertex 可以有多个ExecutionEdge。
6)Execution
ExecutionVertex 相当于每个 Task 的模板,在真正执行的时候,会将ExecutionVertex中的信息包装为1个Execution,执行一个ExecutionVertex的一次尝试。
JobManager 和 TaskManager 之间关于Task 的部署和Task执行状态的更新都是通过ExecutionAttemptID来识别标识的。

接下来问问作业调度的问题
73、Flink调度器的概念介绍一下?
调度器是Flink作业执行的核心组件,管理作业执行的所有相关过程,包括JobGraph到ExecutionGraph的转换、作业生命周期管理(作业的发布、取消、停止)、作业的Task生命周期管理(Task的发布、取消、停止)、资源申请与释放、作业和Task的Faillover等。
Flink 目前默认的调度器。是Flink新的调度设计,使用SchedulerStrategy来实现调度。
(2)LegacySchedular
过去的调度器,实现了原来的Execution调度逻辑。
74、Flink调度行为包含几种?
调度行为包含四种:
SchedulerStrategy接口定义了调度行为,其中包含4种行为:

75、Flink调度模式包含几种?
调度模式包含3种:Eager模式、分阶段模式(Lazy_From_Source)、分阶段Slot重用模式(Lazy_From_Sources_With_Batch_Slot_Request)。
适用于流计算。一次性申请需要的所有资源,如果资源不足,则作业启动失败。
2)分阶段调度
LAZY_FROM_SOURCES 适用于批处理。从 SourceTask 开始分阶段调度,申请资源的时候,一次性申请本阶段所需要的所有资源。上游 Task 执行完毕后开始调度执行下游的 Task,
读取上游的数据,执行本阶段的计算任务,执行完毕之后,调度后一个阶段的 Task,依次进行调度,直到作业完成。
3)分阶段 Slot 重用调度
LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST 适用于批处理。与分阶段调度基本一样,区别在于该模式下使用批处理资源申请模式,可以在资源不足的情况下执行作
业,但是需要确保在本阶段的作业执行中没有 Shuffle 行为。
目前视线中的 Eager 模式和 LAZY_FROM_SOURCES 模式的资源申请逻辑一样,LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST 是单独的资源申请逻辑。
76、Flink调度策略包含几种?
调度策略包含3种:


调度策略全部实现于调度器SchedulingStrategy,有三种实现:
1) EagerSchedulingStrategy:适用于流计算,同时调度所有的 task
2) LazyFromSourcesSchedulingStrategy:适用于批处理,当输入数据准备好时(上游处理完)进行 vertices 调度。
3) PipelinedRegionSchedulingStrategy:以流水线的局部为粒度进行调度
PipelinedRegionSchedulingStrategy 是 1.11 加入的,从 1.12 开始,将以 pipelined region为单位进行调度。
pipelined region 是一组流水线连接的任务。这意味着,对于包含多个 region的流作业,在开始部署任务之前,它不再等待所有任务获取 slot。取而代之的是,一旦任何region 获得了足够的任务 slot 就可以部署它。对于批处理作业,将不会为任务分配 slot,也不会单独部署任务。取而代之的是,一旦某个 region 获得了足够的 slot,则该任务将与所有其他任务一起部署在同一区域中。
77、Flink作业生命周期包含哪些状态?
在Flink集群中,JobMaster 负责作业的生命周期管理,具体的管理行为在调度器和ExecutionGraph中实现。
作业的完整生命周期状态变换如下图所示:

(1)作业首先处于创建状态(created),然后切换到运行状态(running),并且在完成所有工作后,它将切换到完成状态(finished)。
(2)在失败的情况下,作业首先切换到失败状态(failing),取消所有正在运行任务。
如果所有节点都已达到最终状态,并且作业不可重新启动,则状态将转换为失败(failed)。(3)如果作业可以重新启动,那么它将进入重新启动状态(restarting)。一旦完成重新启动,它将变成创建状态(created)。
(4)在用户取消作业的情况下,将进入取消状态(cancelling),会取消所有当前正在运行的任务。一旦所有运行的任务已经达到最终状态,该作业将转换到已取消状态(canceled)。
完成状态(finished),取消状态(canceled)和失败状态(failed)表示一个全局的终结状态,并且触发清理工作,而暂停状态(suspended)仅处于本地终止状态。意味着作业的执行在相应的 JobManager 上终止,但集群的另一个 JobManager 可以从持久的HA存储中恢复这个作业并重新启动。因此,处于暂停状态的作业将不会被完全清理。
78、Task的作业生命周期包含哪些状态?
TaskManager 负责Task 的生命周期管理,并将状态的变化通知到JobMaster,在ExecutionGraph中跟踪Execution的状态变化,一个Execution对于一个Task。
Task的生命周期如下:共8种状态。

在执行 ExecutionGraph 期间,每个并行任务经过多个阶段,从创建(created)到完成(finished)或失败(failed) ,下图说明了它们之间的状态和可能的转换。任务可以执行多次(例如故障恢复)。每个 Execution 跟踪一个 ExecutionVertex 的执行,每个 ExecutionVertex 都有一个当前 Execution(current execution)和一个前驱 Execution(prior execution)。
79、Flink的任务调度流程讲解一下?
任务调度流程图如下:

1. 当Flink执行executor会自动根据程序代码生成DAG数据流图 ,即 Jobgraph;
2. ActorSystem创建Actor将数据流图发送给JobManager中的Actor;
3. JobManager会不断接收TaskManager的心跳消息,从而可以获取到有效的TaskManager;
4. JobManager通过调度器在TaskManager中调度执行Task(在Flink中,最小的调度单元就是task,对应就是一个线程) ;
• Job Client
– 主要职责是提交任务, 提交后可以结束进程, 也可以等待结果返回 ;
– Job Client 不是 Flink 程序执行的内部部分,但它是任务执行的起点;
• JobManager
– 主要职责是调度工作并协调任务做检查点;
– 集群中至少要有一个 master,master 负责调度 task,协调checkpoints 和 容错;
– 高可用设置的话可以有多个 master,但要保证一个是 leader, 其他是stand by;
– Job Manager 包含 Actor System、Scheduler、CheckPoint三个重要的组件 ;
• TaskManager
– 主要职责是从JobManager处接收任务, 并部署和启动任务, 接收上游的数 据并处理
– Task Manager 是在 JVM 中的一个或多个线程中执行任务的工作节点。
– TaskManager在创建之初就设置好了Slot, 每个Slot可以执行一个任务。
80、Flink的任务槽是什么意思?

每个TaskManager是一个JVM的进程, 可以在不同的线程中执行一个或多个子任务。为了控制一个worker能接收多少个task。worker通过task slot来进行控制(一个worker 至少有一个task slot)。
1、任务槽
每个task slot表示TaskManager拥有资源的一个固定大小的子集。
一般来说:我们分配槽的个数都是和CPU的核数相等,比如8核,那么就分配8个槽。
Flink将进程的内存划分到多个slot中。
图中有2个TaskManager,每个TaskManager有3个slot,每个slot占有1/3的内存。
总结:task slot的个数代表TaskManager可以并行执行的task数。
81、Flink 槽共享又是什么意思?
2、槽共享
默认情况下,Flink允许子任务共享插槽,即使它们是不同任务的子任务,只要它们来自同一个作业。结果是一个槽可以保存作业的整个管道。允许插槽共享有两个主要好处:
• 有了任务槽共享,可以将基本并行度(base parallelism)从2提升到6。提高了分槽资源的利用率。同时它还可以保障TaskManager给subtask的分配的slot方案更加公平。


04、Flink SQL篇
Flink SQL也是面试的重点考察点,不仅需要你掌握扎实的SQL编程,同时还需要理解SQL提交的核心原理,以及Flink SQL中涉及的一些重点知识,例如CEP、CDC、SQL GateWay、SQL-Hive等,思维导图如下:

82、Flink SQL有没有使用过?
用过,在Flink中,一共有四种级别的抽象,而Flink SQL作为最上层,是Flink API的一等公民

在标准SQL中,SQL语句包含四种类型
DML(Data Manipulation Language):数据操作语言,用来定义数据库记录(数据)。
DCL (Data Control Language):数据控制语言,用来定义访问权限和安全级别。
DQL (Data Query Language):数据查询语言,用来查询记录(数据)。
DDL(Data Definition Language):数据定义语言,用来定义数据库对象(库,表,列等)。
Flink SQL包含 DML 数据操作语言、 DDL
数据语言, DQL 数据查询语言,不包含DCL语言。
83、Flink被称作流批一体,那从哪个版本开始,真正实现流批一体的?
从1.9.0版本开始,引入了阿里巴巴的 Blink ,对 FIink TabIe & SQL 模块做了重大的重构,保留了 Flink Planner 的同时,引入了 Blink PIanner,没引入以前,Flink 没考虑流批作业统一,针对流批作业,底层实现两套代码,引入后,基于流批一体理念,重新设计算子,以流为核心,流作业和批作业最终都会被转为transformation。
84、Flink SQL 使用哪种解析器?
Flink SQL使用 Apache Calcite作为解析器和优化器。
Calcite 一种动态数据管理框架,它具备很多典型数据库管理系统的功能 如SQL 解析、 SQL 校验、 SQL 查询优化、 SQL 生成以及数据连接查询等,但是又省略了一些关键的功能,如 Calcite并不存储相关的元数据和基本数据,不完全包含相关处理数据的算法等。
85、Calcite主要功能包含哪些?
Calcite 主要包含以下五个部分:
(1) SQL 解析 (Parser)
(2) SQL 校验 (Validato)
校验分两部分
1)无状态的校验 即验证 SQL 语句是否符合规范。
2)有状态的校验 即通过与元数据结合验证 SQL 中的 Schema、Field、 Function 是否存 在,输入输出类型是否匹配等。
(3) SQL 查询优化
对上个步骤的输出( RelNode ,逻辑计划树)进行优化,得到优化后的物理执行计划 优化有两种:基于规则的优化 和 基于代价的优化,后面会详细介绍。
(4) SQL 生成
将物理执行计划生成为在特定平台/引擎的可执行程序,如生成符合 MySQL 或 Oracle 等不同平台规则的 SQL 查询语句等。
(5) 数据连接与执行
通过各个执行平台执行查询,得到输出结果。
在Flink 或者其他使用 Calcite 的大数据引擎中,一般到 SQL 查询优化即结束,由各个平台结合 Calcite SQL 代码生成 和 平台实现的代码生成,将优化后的物理执行计划组合成可执行的代码,然后在内存中编译执行。
86、Flink SQL 处理流程说一下?
下面举个例子,详细描述一下Flink Sql的处理流程,如下图所示:

我们写一张source表,来源为kafka,当执行create table log_kafka之后 Flink SQL将做如下操作:

87、Flink SQL包含哪些优化规则?
如下图为执行流程图

优化规则包含如下:
88、Flink SQL中涉及到哪些operation?
在Flink SQL中吗,涉及的DDL,DML,DQL操作都是Operation,在 Flink内部表示,Operation可以和SqlNode对应起来。
Operation执行在优化前,执行的函数为executeQperation,如下图所示,为执行的所有Operation。

89、Flink Hive有没有使用过?

90、Flink与Hive集成时都做了哪些操作?

(1)Flink1.1新引入了Hive方言,所以在Flink SQL中可以编写HIve语法,即Hive Dialect。
(2)编写HIve SQL后,FlinkSQL Planner 会将SQL进行解析,验证,转换成逻辑计划,物理计划,最终变成Jobgraph。
(3)HiveCatalog作为Flink和Hive的表元素持久化介质,会将不同会话的Flink元数据存储到Hive Metastore中。用户利用HiveCatalog可以将hive表或者 Kafka表存储到Hive Metastore中。
BlinkPlanner 是在Flink1.9版本新引入的机制,Blink 的查询处理器则实现流批作业接口的统一,底层的 API 都是Transformation。真正实现 流 &批 的统一处理,替代原FlinkPlanner将流&批区分处理的方式。在1.11版本后 已经默认为Blink Planner。
91、HiveCatalog类包含哪些方法?

HiveCatalog主要是持久化元数据,所以 一般的创建类型都包含,如 database,Table,View,Function,Partition,还有is_Generic字段判断等。
92、Flink SQL1.11新增了实时数仓功能,介绍一下?
针对这个特点,Flink1.11 版本将 FlieSystemStreaming Sink 重新修改,增加了分区提交和滚动策略机制,让HiveStreaming sink 重新使用文件系统流接收器。
Flink 1.11 的 Table/SQL API 中,FileSystemConnector 是靠增强版 StreamingFileSink组件实现,在源码中名为 StreamingFileWriter。* 只有在Checkpoint 成功时,StreamingFileSink写入的文件才会由 Pending状态变成 Finished状态,从而能够安全地被下游读取。所以,我们一定要打开 Checkpointing,并设定合理的间隔。
93、Flink -Hive实时写数据介绍下?
StreamingWrite,从kafka 中实时拿到数据,使用分区提交将数据从Kafka写入Hive表中,并运行批处理查询以读取该数据。
Flink -SQL 写法
Source源

Sink目的地

Insert 插入

Flink-table写法:
Source源


Insert 插入

94、Flink -Hive实时读数据介绍下?
如下图所示:

1、Flink都是基于calcite先解析sql,确定表来源于hive,如果是Hive表,将会在HiveCatalog中创建HiveTableFactory
2、HiveTableFactory 会基于配置文件创建 HiveTableSource,然后HiveTableSource在真正执行时,会调用getDataStream方法,通过getDataStream方法来确定查询匹配的分区信息,然后创建表对应的InputFormat,然后确定并行度,根据并行度确定slot 分发HiveMapredSplitReader任务。
3、在TaskManager端的slot中,Split会确定读取的内容,基于Hive中定义的序列化工具,InputFormat执行读取反序列化,得到value值。
4、最后循环执行reader.next 获取value,将其解析成Row。
95、Flink -Hive实时写数据时,如何保证已经写入分区的数据何时才能对下游可见呢?
如下图所示:

首先可以看一下,在实时的将数据存储到Hive数仓中,FileSystemConnector 为了与 Flink-Hive集成的大环境适配,最大的改变就是分区提交,可以看一下左下图,官方文档给出的,分区可以采取日期+ 小时的策略,或者时分秒的策略。
那如何保证已经写入分区的数据何时才能对下游可见呢? 这就和 触发机制 有关, 触发机制包含process-time和 partition-time以及时延。
partition-time 指的是根据事件时间中提取的分区触发。当'watermark' > 'partition-time' + 'delay' ,选择partition-time的数据才能提交成功,
process-time 指根据系统处理时间触发,当加上时延后,要想让分区进行提交,当'currentprocessing time' > 'partition creation time' + 'delay' 选择 process-time的数据可以提交成功。
但选择process-time触发机制会有缺陷,就是当数据迟到或者程序失败重启时,数据不能按照事件时间被归入正确分区。所以 一般会选择 partition-time。
96、源码中分区提交的PartitionCommitTrigger介绍一下?

1.pendingPartitions/pendingPartitionsState:等待提交的分区以及对应的状态;2.watermarks/watermarksState:watermarks(用 TreeMap 存储以保证有序)以及对应的状态。
97、PartitionTimeCommitTigger 是如何知道该提交哪些分区的呢?(源码分析)
1、检查checkpoint ID 是否合法;
2、取出当前checkpoint ID 对应的水印,并调用 TreeMap的headMap() 和 clear() 方法删掉早于当前 checkpoint ID的水印数据(没用了);
3、遍历等待提交的分区,调用之前定义的PartitionTimeExtractor。
(比如${year}-${month}-${day} ${hour}:00:00)抽取分区时间。
如果watermark>partition-time+delay,说明可以提交,并返回它们
98、如何保证已经写入分区的数据对下游可见的标志问题(源码分析)
在源码中,主要涉及PartitionCommitPolicy类,如下图所示:


99、Flink SQL CEP有没有接触过?
CEP的概念:

CEP的使用场景:


像用户异常检测:我们指定异常操作事件为要输出的结果流;策略营销:指定符合要求的事件为结果流;运维监控:指定一定范围的指标为结果流;银行卡盗刷:指定同一时刻在两个地方被刷两次为异常结果流。
Flink CEP SQL 语法 是通过SQL方式进行复杂事件处理,但是与 Flink SQL语法也不太相同,其中包含许多规则。
100、Flink SQL CEP了解的参数介绍一下?
CEP包含的参数如下:

p输出模式(每个找到的匹配项应该输出多少行)
Øone row per match
每次检测到完整的匹配后进行汇总输出
Øall rows per match (flink暂不支持)
检测到完整的匹配后会把匹配过程中每条具体记录进行输出
Ø在计算中使用那些匹配的事件
running匹配中和final匹配结束
Ø define语句中只可以使用running,measure两者都可以
Ø输出结果区别
对于one row per match,输出没区别
对于all rows per match ,输出不同




3、匹配后跳转模式介绍
after match(匹配后,从哪里开始重新匹配)
Ø skip to next row
从匹配成功的事件序列中的第一个事件的下一个事件开始进行下一次匹配
Ø skip past last row
从匹配成功的事件序列中的最后一个事件的下一个事件开始进行下一次匹配
Ø skip to first pattern Item
从匹配成功的事件序列中第一个对应于patternItem的事件开始进行下一次匹配
Øskip to last pattern Item
从匹配成功的事件序列中最后一个对应于patternItem的事件开始进行下一次匹配
注意:
在使用skip to first/last patternItem容易出现循环匹配问题,需要慎重
针对上面的匹配后跳转模式分别介绍:

(2)after match skip to next row 如下图

(3)after match skip to last patternItem 如下图

(4)after match skip to first patternItem 如下图

101、编写一个CEP SQL案例,如银行卡盗刷
通过Flink CEP SQL 写的关于金融场景 银行卡盗刷案例。
案例介绍:在金融场景中,有时会出现银行卡盗刷现象,犯罪分子利用互联网等技术,在间隔10分钟或者更短时间内,使一张银行卡在不同的两个地方出现多次刷卡记录,这从常规操作来说,在间隔时间很多的情况下,用户是无法同时在两个城市进行刷卡交易的,所以出现这种问题,就需要后台做出触发报警机制。
要求:当相同的cardId在十分钟内,从两个不同的Location发生刷卡现象,触发报警机制,以便检测信用卡盗刷现象。

(1)编写cep sql时,包含许多技巧,首先我们编写最基础的查询语句,从一张表中查询需要的字段。
select starttime,endtime,cardId,event from dataStream
(2)match_recognize();
该字段是CEP SQL 的前提条件,用于生成一个追加表,所有的 CEP SQL都是书写在这里面。
(3)分区,排序
由于是对同一ID,所以需要使用 partition by,还要根据时间进行排序 order by
(4)理解CEP SQL核心的编写顺序
如上图标的顺序
1、CEP SQL 的类为Pattern,检测在10分钟内两个地方出现刷卡现象,所以定义两个事件:
Pattern (e1 e2+) within interval ‘10’minute
2、定义在Pattern中要求的判断语句,规定使用define
define
e1 as a1.action = ''
e2 as e2.action = '' and e2.location <> e1.location
3、根据上述的输入条件构建输出条件,规定使用 measures
measures
e2.action as event
e1.timestamp as starttime
last(e2.timestamp) as endtime
4、输出条件匹配成功,输出一条,规定写法(这块根据不同的规则写不同的语句)
one row per match
5、匹配后跳转跳转到下一行(根据不同规则写不同语句)
after match skip to next row

根据核心编写顺序进行理解,然后在按照书写正确的顺序进行编写。
102、Flink CDC了解吗?什么是 Flink SQL CDC Connectors?
在 Flink 1.11 引入了 CDC 机制,CDC 的全称是 Change Data Capture,用于捕捉数据库表的增删改查操作,是目前非常成熟的同步数据库变更方案。
Flink CDC Connectors 是 Apache Flink 的一组源连接器,是可以从 MySQL、PostgreSQL 数据直接读取全量数据和增量数据的 Source Connectors,开源地址:https://github.com/ververica/flink-cdc-connectors。
目前(1.13版本)支持的 Connectors 如下:

另外支持解析 Kafka 中 debezium-json 和 canal-json 格式的 Change Log,通过Flink 进行计算或者直接写入到其他外部数据存储系统(比如 Elasticsearch),或者将 Changelog Json 格式的 Flink 数据写入到 Kafka:

Flink CDC Connectors 和 Flink 之间的版本映射:

103、Flink CDC原理介绍一下
在最新CDC 调研报告中,Debezium 和 Canal 是目前最流行使用的 CDC 工具,这些 CDC 工具的核心原理是抽取数据库日志获取变更。在经过一系列调研后,目前Debezium (支持全量、增量同步,同时支持 MySQL、PostgreSQL、Oracle 等数据库),使用较为广泛。
Flink SQL CDC 内置了 Debezium 引擎,利用其抽取日志获取变更的能力,将 changelog 转换为 Flink SQL 认识的 RowData 数据。(以下右侧是 Debezium 的数据格式,左侧是 Flink 的 RowData 数据格式)。

104、通过CDC设计一种Flink SQL 采集+计算+传输(ETL)一体化的实时数仓
设计图如下:

通过 Flink CDC connectors 替换 Debezium+Kafka 的数据采集模块,实现 Flink SQL 采集+计算+传输(ETL)一体化,以Mysql为Source源,Flink CDC中间件为插件,ES或者Kafka,或者其他为Sink,这样设计的优点如下:
开箱即用,简单易上手
减少维护的组件,简化实时链路,减轻部署成本
减小端到端延迟
Flink 自身支持 Exactly Once 的读取和计算
数据不落地,减少存储成本
支持全量和增量流式读取
binlog 采集位点可回溯
105、Flink SQL CDC如何实现一致性保障(源码分析)
/*** The {@link DebeziumSourceFunction} is a streaming data source that pulls captured change data* from databases into Flink.* 通过Checkpoint机制来保证发生failure时不会丢数,实现exactly once语义* <p>The source function participates in checkpointing and guarantees that no data is lost* during a failure, and that the computation processes elements "exactly once".* 注意:这个Source Function不能同时运行多个实例* <p>Note: currently, the source function can't run in multiple parallel instances.** <p>Please refer to Debezium's documentation for the available configuration properties:* https://debezium.io/documentation/reference/1.2/development/engine.html#engine-properties</p>*/@PublicEvolvingpublic class DebeziumSourceFunction<T> extends RichSourceFunction<T> implementsCheckpointedFunction,ResultTypeQueryable<T> {}
为实现 CheckpointedFunction,需要实现以下两个方法:
public interface CheckpointedFunction {//做快照,把内存中的数据保存在checkpoint状态中void snapshotState(FunctionSnapshotContext var1) throws Exception;//程序异常恢复后从checkpoint状态中恢复数据void initializeState(FunctionInitializationContext var1) throws Exception;}
接下来我们看看 DebeziumSourceFunction 中都记录了哪些状态。
/** Accessor for state in the operator state backend.offsetState中记录了读取的binlog文件和位移信息等,对应Debezium中的*/private transient ListState<byte[]> offsetState;/*** State to store the history records, i.e. schema changes.* historyRecordsState记录了schema的变化等信息* @see FlinkDatabaseHistory*/private transient ListState<String> historyRecordsState;
我们发现在 Flink SQL CDC 是一个相对简易的场景,没有中间算子,是通过 Checkpoint 持久化 binglog 消费位移和 schema 变化信息的快照,来实现 Exactly Once。
106、Flink SQL GateWay了解吗?
Flink SQL GateWay的概念:
FlinkSql Gateway是Flink集群的“任务网关”,支持以restapi 的形式提交查询、插入、删除等任务,如下图所示:

总体架构如下图所示:

107、Flink SQL GateWay创建会话讲解一下?
创建会话流程图如下:

108、Flink SQL GateWay如何处理并发请求?多个提交怎么处理?
sql gateway内部维护SessionManager,里面通过Map维护了各个Session,每个Session的任务执行是独立的。同一个Session通过ExecuteContext内部的tEnv按顺序提交。
109、如何维护多个SQL之间的关联性?
110、sql字符串如何提交到集群成为代码?
以上就是Flink 的全部内容,160张图全部亲自绘制出来!不想被白嫖呀,觉得好的,点赞,在看,分享三连击,谢谢!!!
最后
第一时间获取最新大数据技术,尽在本公众号:3分钟秒懂大数据
--END--

获取本文PDF版,请扫下方二维码 加我微信,备注:Flink







