暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flink本地调试代码

原创 Abel Chan 2021-12-11
3469

Flink 架构概览

Flink 是大数据领域的分布式实时和离线计算引擎,其程序的基础构建模块是流(Streams)和转换(Transformations),每一个数据流起始于一个或多个 Source,并终止于一个或多个 Sink。数据流类似于有向无环图(DAG)。

Flink 提供了诸多高抽象层的 API 以便用户编写分布式任务:

DataSet API,对静态数据进行批处理操作,将静态数据抽象成分布式的数据集,用户可以方便地使用 Flink 提供的各种操作符对分布式数据集进行处理,支持 Java、Scala 和 Python;

DataStream API,对数据流进行流处理操作,将流式的数据抽象成分布式的数据流,用户可以方便地对分布式数据流进行各种操作,支持 Java 和 Scala;

SQL和Table API,对结构化数据进行查询操作,将结构化数据抽象成关系表,并通过类 SQL 的 DSL 对关系表进行各种查询操作,支持 Java 和 Scala。


用户通过 DataStream API、DataSet API、SQL 和 Table API 编写 Flink 任务;Flink 程序启动后,会根据用户的代码处理成 Stream Graph,然后优化成为 JobGraph它会生成一个JobGraph;JobGraph 是由 source、map()、keyBy()/window()/apply() 和 Sink 等算子组成的。当 JobGraph 提交给 Flink 集群后,能够以 Local、Standalone、Yarn 和 Kubernetes 四种模式运行。JobManager 会根据 JobGraph 生成 ExecutionGraph;ExecutionGraph 才是 Flink 真正能执行的数据结构,当很多个 ExecutionGraph 分布在集群中,就会形成一张网状的拓扑结构。

本地模式安装


请按照以下几个步骤下载最新的稳定版本开始使用。


步骤 1:下载#

为了运行Flink,只需提前安装好 Java 8 或者 Java 11。你可以通过以下命令来检查 Java 是否已经安装正确。

java -version

下载release {{ site.version }} 并解压。

$ tar -xzf flink-{{ site.version }}-bin-scala_2.11.tgz

$ cd flink-{{ site.version }}-bin-scala_2.11


步骤 2:启动集群#

Flink 附带了一个 bash 脚本,可以用于启动本地集群。

$ ./bin/start-cluster.sh

Starting cluster.

Starting standalonesession daemon on host.

Starting taskexecutor daemon on host.


步骤 3:提交作业(Job)#

Flink 的 Releases 附带了许多的示例作业。你可以任意选择一个,快速部署到已运行的集群上。

$ ./bin/flink run examples/streaming/WordCount.jar

$ tail log/flink-*-taskexecutor-*.out

(to,1)

(be,1)

(or,1)

(not,1)

(to,2)

(be,2)

另外,你可以通过 Flink 的 Web UI来监视集群的状态和正在运行的作业。


步骤 4:停止集群#

完成后,你可以快速停止集群和所有正在运行的组件。

$ ./bin/stop-cluster.sh


Flink WebUI 界面

观察Flink集群首先想到的就是 Flink WebUI 界面:打开浏览器并访问http://localhost:8081 , Flink WebUI 界面包含许多关于 Flink 集群以及运行在其上的 Jobs 的有用信息,比如:JobGraph、Metrics、Checkpointing Statistics、TaskManager Status 等等。

本地调试代码


通常来讲,任何一门大数据框架在实际生产环境中都是以集群的形式运行,而我们调试代码大多数会在本地搭建一个模板工程,Flink 也不例外。

Flink 一个以 Java 及 Scala 作为开发语言的开源大数据项目,客户端支持:Java、Scala、Python;  本篇wiiki使用 Java 来作为开发语言,Maven 作为编译和包管理工具进行项目构建和编译。对于大多数开发者而言,JDK、Maven 和 Git 这三个开发工具是必不可少的。参考:https://github.com/apache/flink

关于 JDK、Maven 和 Git 的安装建议如下表所示:

WordCount

WordCount 程序是大数据处理框架的入门程序,俗称“单词计数”。用来统计一段文字每个单词的出现次数,该程序主要分为两个部分:一部分是将文字拆分成单词;另一部分是单词进行分组计数并打印输出结果。

Flink简略处理流程

 1)创建Flink的批处理/流处理计算环境→ 2)监听本地某个端口(可选 )→ 3)将接收的数据进行拆分、分组、窗口计算并且进行聚合输出

大纲

本wiki介绍如何搭建本地调试环境的脚手架;然后分别从DataSet(批处理)、DataStream(流处理)、Flink Table & SQL等三种方式进行单词计数开发;


1)DataSet WordCount

主要步骤/流程:

1.创建 Flink 的上下文运行环境

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

2.使用 fromElements 函数创建一个 DataSet 对象

DataSet<String> text = env.fromElements("Flink Spark Storm","Flink Flink Flink","Spark Spark Spark","Storm Storm Storm");

3. 通过Flink内置的转换函数进行计算

DataSet<Tuple2<String, Integer>> counts =text.flatMap(new LineSplitter()) .groupBy(0)  .sum(1);

4. 打印结果

 counts.printToErr();


package org.apache.flink.examples.java.wordcount;


import org.apache.flink.api.common.functions.FlatMapFunction;

import org.apache.flink.api.java.DataSet;

import org.apache.flink.api.java.ExecutionEnvironment;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.api.java.utils.MultipleParameterTool;

import org.apache.flink.examples.java.wordcount.util.WordCountData;

import org.apache.flink.util.Collector;

import org.apache.flink.util.Preconditions;

    public static void main(String[] args) throws Exception {

      // 创建Flink运行的上下文环境

      final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

      // 创建DataSet,这里我们的输入是一行一行的文本

      DataSet<String> text = env.fromElements(

            "Flink Spark Storm",

            "Flink Flink Flink",

            "Spark Spark Spark",

            "Storm Storm Storm"

      );

      // 通过Flink内置的转换函数进行计算

      DataSet<Tuple2<String, Integer>> counts =text.flatMap(new LineSplitter()) .groupBy(0).sum(1);

      //结果打印

      counts.printToErr();

   }

   public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

      @Override

      public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {

         // 将文本分割

         String[] tokens = value.toLowerCase().split("\\W+");

         for (String token : tokens) {

            if (token.length() > 0) {

               out.collect(new Tuple2<String, Integer>(token, 1));

            }

         }

      }

    }


2)DataStream WordCount

Flink 程序的基本构建是数据输入来自一个 Source,Source 代表数据的输入端,经过 Transformation 进行转换,然后在一个或者多个 Sink 接收器中结束。数据流(Stream)就是一组永远不会停止的数据记录流,而转换(Transformation)是将一个或多个流作为输入,并生成一个或多个输出流的操作。在执行时,Flink 程序映射到 Streaming Dataflows,由流(Streams)和转换操作(Transformation Operators)组成。


为了模仿一个流式计算环境,我们选择监听一个本地的 Socket 端口,并且使用 Flink 中的滚动窗口,每 5 秒打印一次计算结果。

public class StreamingJob {

    public static void main(String[] args) throws Exception {

        // 创建Flink的流式计算环境

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 监听本地9000端口

        DataStream<String> text = env.socketTextStream("127.0.0.1", 9000, "\n");

        // 将接收的数据进行拆分,分组,窗口计算并且进行聚合输出

        DataStream<WordWithCount> windowCounts = text

                .flatMap(new FlatMapFunction<String, WordWithCount>() {

                    @Override

                    public void flatMap(String value, Collector<WordWithCount> out) {

                        for (String word : value.split("\\s")) {

                            out.collect(new WordWithCount(word, 1L));

                        }

                    }

                })

                .keyBy("word")

                .timeWindow(Time.seconds(5), Time.seconds(1))

                .reduce(new ReduceFunction<WordWithCount>() {

                    @Override

                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {

                        return new WordWithCount(a.word, a.count + b.count);

                    }

                });

        // 打印结果

        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");

    }

    // Data type for words with count

    public static class WordWithCount {

        public String word;

        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {

            this.word = word;

            this.count = count;

        }

        @Override

        public String toString() {

            return word + " : " + count;

        }

    }

}

3)Flink Table & SQL WordCount

Flink Table & SQL 实现原理图

备注:从图中可以看到无论是批查询 SQL 还是流式查询 SQL,都会经过对应的转换器 Parser 转换成为节点树 SQLNode tree,然后生成逻辑执行计划 Logical Plan,逻辑执行计划在经过优化后生成真正可以执行的物理执行计划,交给 DataSet 或者 DataStream 的 API 去执行。

代码参考:https://github.com/wangzhiwubigdata/quickstart/blob/master/src/main/java/org/myorg/quickstart/WordCountSQL.java

Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。

一个完整的 Flink SQL 编写的程序包括如下三部分。

1.  Source Operator:是对外部数据源的抽象, 目前 Apache Flink 内置了很多常用的数据源实现,比如 MySQL、Kafka 等。

2. Transformation Operators:算子操作主要完成比如查询、聚合操作等,目前 Flink SQL 支持了 Union、Join、Projection、Difference、Intersection 及 window 等大多数传统数据库支持的操作。

3.  Sink Operator:是对外结果表的抽象,目前 Apache Flink 也内置了很多常用的结果表的抽象,比如 Kafka Sink 等。

我们也是通过用一个最经典的 WordCount 程序作为入门,上面已经通过 DataSet/DataStream API 开发,那么实现同样的 WordCount 功能, Flink Table & SQL 核心只需要一行代码:

//省略掉初始化环境等公共代码

SELECT word, COUNT(word) FROM table GROUP BY word;

首先,整个工程中我们 pom 中的依赖如下图所示:

<dependency>

         <groupId>org.apache.flink</groupId>

         <artifactId>flink-java</artifactId>

         <version>1.10.0</version>

     </dependency>

<dependency>

         <groupId>org.apache.flink</groupId>

         <artifactId>flink-streaming-java_2.11

         <version>1.10.0</version>

</dependency>

<dependency>

         <groupId>org.apache.flink</groupId>

         <artifactId>flink-table-api-java-bridge_2.11</artifactId>

         <version>1.10.0</version>

</dependency>

<dependency>

         <groupId>org.apache.flink</groupId>

         <artifactId>flink-table-planner-blink_2.11</artifactId>

         <version>1.10.0</version>

</dependency>

<dependency>

         <groupId>org.apache.flink</groupId>

         <artifactId>flink-table-planner_2.11</artifactId>

         <version>1.10.0</version>

</dependency>

     <dependency>

         <groupId>org.apache.flink</groupId>

         <artifactId>flink-table-api-scala-bridge_2.11</artifactId>

         <version>1.10.0</version>

</dependency>

核心逻辑分为3步:1: 创建上下文环境 2: 读取一行模拟数据作为输入  3: 注册成表,执行 SQL,然后输出

整体代码结构如下:

public class WordCountSQL {

    public static void main(String[] args) throws Exception{

        //获取运行环境

        ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();

        //创建一个tableEnvironment

        BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);

        String words = "hello flink hello lagou";

        String[] split = words.split("\\W+");

        ArrayList<WC> list = new ArrayList<>();

        for(String word : split){

            WC wc = new WC(word,1);

            list.add(wc);

        }

        DataSet<WC> input = fbEnv.fromCollection(list);

        //DataSet 转sql, 指定字段名

        Table table = fbTableEnv.fromDataSet(input, "word,frequency");

        table.printSchema();

        //注册为一个表

        fbTableEnv.createTemporaryView("WordCount", table);

        Table table02 = fbTableEnv.sqlQuery("select word as word, sum(frequency) as frequency from WordCount GROUP BY word");

        //将表转换DataSet

        DataSet<WC> ds3  = fbTableEnv.toDataSet(table02, WC.class);

        ds3.printToErr();

    }

    public static class WC {

        public String word;

        public long frequency;

        public WC() {}

        public WC(String word, long frequency) {

            this.word = word;

            this.frequency = frequency;

        }

        @Override

        public String toString() {

            return  word + ", " + frequency;

        }

    }

}


############################################################################

python client参考:

https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/python/table_api_tutorial/

官方例子:

https://github.com/apache/flink-playgrounds

flink learning

https://flink-learning.org.cn/corporate-practice

https://flink-learning.org.cn/developers/flink-training-course-basics/

基于 Flink 实现的商品实时推荐系统(附源码)

https://github.com/CheckChe0803/flink-recommandSystem-demo 


https://mp.weixin.qq.com/s/9EZP2zxvRHk2X61Natj60w

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论