Flink 架构概览
Flink 是大数据领域的分布式实时和离线计算引擎,其程序的基础构建模块是流(Streams)和转换(Transformations),每一个数据流起始于一个或多个 Source,并终止于一个或多个 Sink。数据流类似于有向无环图(DAG)。
Flink 提供了诸多高抽象层的 API 以便用户编写分布式任务:
DataSet API,对静态数据进行批处理操作,将静态数据抽象成分布式的数据集,用户可以方便地使用 Flink 提供的各种操作符对分布式数据集进行处理,支持 Java、Scala 和 Python;
DataStream API,对数据流进行流处理操作,将流式的数据抽象成分布式的数据流,用户可以方便地对分布式数据流进行各种操作,支持 Java 和 Scala;
SQL和Table API,对结构化数据进行查询操作,将结构化数据抽象成关系表,并通过类 SQL 的 DSL 对关系表进行各种查询操作,支持 Java 和 Scala。
用户通过 DataStream API、DataSet API、SQL 和 Table API 编写 Flink 任务;Flink 程序启动后,会根据用户的代码处理成 Stream Graph,然后优化成为 JobGraph它会生成一个JobGraph;JobGraph 是由 source、map()、keyBy()/window()/apply() 和 Sink 等算子组成的。当 JobGraph 提交给 Flink 集群后,能够以 Local、Standalone、Yarn 和 Kubernetes 四种模式运行。JobManager 会根据 JobGraph 生成 ExecutionGraph;ExecutionGraph 才是 Flink 真正能执行的数据结构,当很多个 ExecutionGraph 分布在集群中,就会形成一张网状的拓扑结构。
本地模式安装
请按照以下几个步骤下载最新的稳定版本开始使用。
步骤 1:下载#
为了运行Flink,只需提前安装好 Java 8 或者 Java 11。你可以通过以下命令来检查 Java 是否已经安装正确。
java -version
下载release {{ site.version }} 并解压。
$ tar -xzf flink-{{ site.version }}-bin-scala_2.11.tgz
$ cd flink-{{ site.version }}-bin-scala_2.11
步骤 2:启动集群#
Flink 附带了一个 bash 脚本,可以用于启动本地集群。
$ ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host.
Starting taskexecutor daemon on host.
步骤 3:提交作业(Job)#
Flink 的 Releases 附带了许多的示例作业。你可以任意选择一个,快速部署到已运行的集群上。
$ ./bin/flink run examples/streaming/WordCount.jar
$ tail log/flink-*-taskexecutor-*.out
(to,1)
(be,1)
(or,1)
(not,1)
(to,2)
(be,2)
另外,你可以通过 Flink 的 Web UI来监视集群的状态和正在运行的作业。
步骤 4:停止集群#
完成后,你可以快速停止集群和所有正在运行的组件。
$ ./bin/stop-cluster.sh
Flink WebUI 界面
观察Flink集群首先想到的就是 Flink WebUI 界面:打开浏览器并访问http://localhost:8081 , Flink WebUI 界面包含许多关于 Flink 集群以及运行在其上的 Jobs 的有用信息,比如:JobGraph、Metrics、Checkpointing Statistics、TaskManager Status 等等。
本地调试代码
通常来讲,任何一门大数据框架在实际生产环境中都是以集群的形式运行,而我们调试代码大多数会在本地搭建一个模板工程,Flink 也不例外。
Flink 一个以 Java 及 Scala 作为开发语言的开源大数据项目,客户端支持:Java、Scala、Python; 本篇wiiki使用 Java 来作为开发语言,Maven 作为编译和包管理工具进行项目构建和编译。对于大多数开发者而言,JDK、Maven 和 Git 这三个开发工具是必不可少的。参考:https://github.com/apache/flink
关于 JDK、Maven 和 Git 的安装建议如下表所示:
WordCount
WordCount 程序是大数据处理框架的入门程序,俗称“单词计数”。用来统计一段文字每个单词的出现次数,该程序主要分为两个部分:一部分是将文字拆分成单词;另一部分是单词进行分组计数并打印输出结果。
Flink简略处理流程
1)创建Flink的批处理/流处理计算环境→ 2)监听本地某个端口(可选 )→ 3)将接收的数据进行拆分、分组、窗口计算并且进行聚合输出
大纲
本wiki介绍如何搭建本地调试环境的脚手架;然后分别从DataSet(批处理)、DataStream(流处理)、Flink Table & SQL等三种方式进行单词计数开发;
1)DataSet WordCount
主要步骤/流程:
1.创建 Flink 的上下文运行环境
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
2.使用 fromElements 函数创建一个 DataSet 对象
DataSet<String> text = env.fromElements("Flink Spark Storm","Flink Flink Flink","Spark Spark Spark","Storm Storm Storm");
3. 通过Flink内置的转换函数进行计算
DataSet<Tuple2<String, Integer>> counts =text.flatMap(new LineSplitter()) .groupBy(0) .sum(1);
4. 打印结果
counts.printToErr();
package org.apache.flink.examples.java.wordcount;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
public static void main(String[] args) throws Exception {
// 创建Flink运行的上下文环境
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 创建DataSet,这里我们的输入是一行一行的文本
DataSet<String> text = env.fromElements(
"Flink Spark Storm",
"Flink Flink Flink",
"Spark Spark Spark",
"Storm Storm Storm"
);
// 通过Flink内置的转换函数进行计算
DataSet<Tuple2<String, Integer>> counts =text.flatMap(new LineSplitter()) .groupBy(0).sum(1);
//结果打印
counts.printToErr();
}
public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// 将文本分割
String[] tokens = value.toLowerCase().split("\\W+");
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<String, Integer>(token, 1));
}
}
}
}
2)DataStream WordCount
Flink 程序的基本构建是数据输入来自一个 Source,Source 代表数据的输入端,经过 Transformation 进行转换,然后在一个或者多个 Sink 接收器中结束。数据流(Stream)就是一组永远不会停止的数据记录流,而转换(Transformation)是将一个或多个流作为输入,并生成一个或多个输出流的操作。在执行时,Flink 程序映射到 Streaming Dataflows,由流(Streams)和转换操作(Transformation Operators)组成。
为了模仿一个流式计算环境,我们选择监听一个本地的 Socket 端口,并且使用 Flink 中的滚动窗口,每 5 秒打印一次计算结果。
public class StreamingJob {
public static void main(String[] args) throws Exception {
// 创建Flink的流式计算环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 监听本地9000端口
DataStream<String> text = env.socketTextStream("127.0.0.1", 9000, "\n");
// 将接收的数据进行拆分,分组,窗口计算并且进行聚合输出
DataStream<WordWithCount> windowCounts = text
.flatMap(new FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(String value, Collector<WordWithCount> out) {
for (String word : value.split("\\s")) {
out.collect(new WordWithCount(word, 1L));
}
}
})
.keyBy("word")
.timeWindow(Time.seconds(5), Time.seconds(1))
.reduce(new ReduceFunction<WordWithCount>() {
@Override
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
return new WordWithCount(a.word, a.count + b.count);
}
});
// 打印结果
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");
}
// Data type for words with count
public static class WordWithCount {
public String word;
public long count;
public WordWithCount() {}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return word + " : " + count;
}
}
}
3)Flink Table & SQL WordCount
Flink Table & SQL 实现原理图
备注:从图中可以看到无论是批查询 SQL 还是流式查询 SQL,都会经过对应的转换器 Parser 转换成为节点树 SQLNode tree,然后生成逻辑执行计划 Logical Plan,逻辑执行计划在经过优化后生成真正可以执行的物理执行计划,交给 DataSet 或者 DataStream 的 API 去执行。
Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。
一个完整的 Flink SQL 编写的程序包括如下三部分。
1. Source Operator:是对外部数据源的抽象, 目前 Apache Flink 内置了很多常用的数据源实现,比如 MySQL、Kafka 等。
2. Transformation Operators:算子操作主要完成比如查询、聚合操作等,目前 Flink SQL 支持了 Union、Join、Projection、Difference、Intersection 及 window 等大多数传统数据库支持的操作。
3. Sink Operator:是对外结果表的抽象,目前 Apache Flink 也内置了很多常用的结果表的抽象,比如 Kafka Sink 等。
我们也是通过用一个最经典的 WordCount 程序作为入门,上面已经通过 DataSet/DataStream API 开发,那么实现同样的 WordCount 功能, Flink Table & SQL 核心只需要一行代码:
//省略掉初始化环境等公共代码
SELECT word, COUNT(word) FROM table GROUP BY word;
首先,整个工程中我们 pom 中的依赖如下图所示:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.10.0</version>
</dependency>
核心逻辑分为3步:1: 创建上下文环境 2: 读取一行模拟数据作为输入 3: 注册成表,执行 SQL,然后输出
整体代码结构如下:
public class WordCountSQL {
public static void main(String[] args) throws Exception{
//获取运行环境
ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
//创建一个tableEnvironment
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);
String words = "hello flink hello lagou";
String[] split = words.split("\\W+");
ArrayList<WC> list = new ArrayList<>();
for(String word : split){
WC wc = new WC(word,1);
list.add(wc);
}
DataSet<WC> input = fbEnv.fromCollection(list);
//DataSet 转sql, 指定字段名
Table table = fbTableEnv.fromDataSet(input, "word,frequency");
table.printSchema();
//注册为一个表
fbTableEnv.createTemporaryView("WordCount", table);
Table table02 = fbTableEnv.sqlQuery("select word as word, sum(frequency) as frequency from WordCount GROUP BY word");
//将表转换DataSet
DataSet<WC> ds3 = fbTableEnv.toDataSet(table02, WC.class);
ds3.printToErr();
}
public static class WC {
public String word;
public long frequency;
public WC() {}
public WC(String word, long frequency) {
this.word = word;
this.frequency = frequency;
}
@Override
public String toString() {
return word + ", " + frequency;
}
}
}
############################################################################
python client参考:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/python/table_api_tutorial/
官方例子:
https://github.com/apache/flink-playgrounds
flink learning
https://flink-learning.org.cn/corporate-practice
https://flink-learning.org.cn/developers/flink-training-course-basics/
基于 Flink 实现的商品实时推荐系统(附源码)
https://github.com/CheckChe0803/flink-recommandSystem-demo
https://mp.weixin.qq.com/s/9EZP2zxvRHk2X61Natj60w




