暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

「Flink」工具人职业规划与Flink执行计划(一)

将咖啡转化为程序的工具人 2021-09-09
509

最近,有位刚出道的年轻人问了工具人一个很残酷的问题:工具人们的未来在哪里?生锈了之后,是去滴滴拉黄包车,还是去美团做小黄哥?

对于这个问题工具人有上中下三策:

下策:买点东方红基金,有了睡后收入,可以温饱无忧。  

风险在于,工具人工资太低,买不了多少份额

中策:找个有“家文化”的公司,发挥发挥余热。

风险在于,在这种公司里,容易遇到钻石不粘锅,还抹了油

上策:努力提升自己,做好长远的职业规划。

风险在于,也许职业规划,一不小心就成了傀儡养成计划

于是,为了避免年轻人走弯路,所以工具人分享了自己的职业规划,如有误人子弟,请各位看官不吝赐教~

      工具人很幸运,身在一个重视人才储备的公司,为了区区二三十人的开发团队,不计代价,汇聚了一批万中无一的顶级产品经理,并正在招募一堆千里挑一的王者BA,产品+BA与开发人数比近乎达到1:1,作为百无一用的工具人,非常惭愧,自觉还有很长的路要走,才能跟得上这些优秀同伴的脚步。

      嘴炮不如实干,空想不如行动!有了职业规划,也得有执行计划,今天我们就从Flink执行计划开始,一步一步向理想进发。

       假如我们有如下一段程序:

    public class WordCount {


    public static void main(String[] args) throws Exception {
    定义socket的端口号
    int port;
    try{
    ParameterTool parameterTool = ParameterTool.fromArgs(args);
    port = parameterTool.getInt("port");
    }catch (Exception e){
    System.err.println("没有指定port参数,使用默认值9090");
    port = 9090;
            }
    获取运行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    连接socket获取输入的数据
            DataStreamSource<String> text = env.socketTextStream("127.0.0.1", port, "\n");
    计算数据
    DataStream<WordWithCount> windowCount = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
    public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
    String[] splits = value.split("\\s");
    for (String word:splits) {
    out.collect(new WordWithCount(word,1L));
    }
    }
    })//打平操作,把每行的单词转为<word,count>类型的数据
    .keyBy("word")//针对相同的word数据进行分组
    .timeWindow(Time.seconds(2),Time.seconds(1))//指定计算数据的窗口大小和滑动窗口大小
    .sum("count");


    把数据打印到控制台
    windowCount.print()
    .setParallelism(1);//使用一个并行度
    注意:因为flink是懒加载的,所以必须调用execute方法,上面的代码才会执行
            
            ;
        }
    **
    * 主要为了存储单词以及单词出现的次数
    */
    public static class WordWithCount{
    public String word;
    public long count;
            ........
        }
    }

    然后我们将程序打包,上传至flink

    那么问题来了:flink程序是怎么跑起来的?

    1,谁执行了程序的main函数?

    2,分布式计算调度的最小单元是什么?

    带着这两个问题,我们先来了解下flink集群中的几个重要角色:

    官网的这张图比较老了,不过不影响我们理解这几个重要的角色的职责。图左下角的是JobClient,实际上就是main函数执行者,也是我们本次讨论的重点。他主要负责这几个任务:1,生成执行计划。2,优化执行单元。3,将JobGraph发送到JobManager。图的右下角是JobManager,他是Flink的大脑,即调度中心。图的上方TaskManager,程序逻辑的运行者,也是实际的运行资源。

    对于流计算来说,JobClient 在执行时,首先会解析我们的程序,其过程分为如下几步:

    一,算子转换

    flink程序开始都会创建一个运行环境StreamExecutionEnvironment:

      //创建运行时
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

      StreamExecutionEnvironment中有个成员变量transformations,在这里记录了我们所有的算子信息(这里看似是个列表,实际上每个元素都带有父子信息,整体可以理解为一个树)。

        protected final List<StreamTransformation<?>> transformations = new ArrayList<>();

        对于每一个算子,在加入到transformations列表的过程中,都会做一系列转化,将不同的算子转换为不同的transformation对象,并最终生成算子树。如刚才程序中的flatMap:

          text.flatMap(new FlatMapFunction<String, WordWithCount>() {
          public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
          String[] splits = value.split("\\s");
          for (String word:splits) {
          out.collect(new WordWithCount(word,1L));
          }
          }

          以Flatmap为例,首先,flink会构造Function  →  Operator的转化,同时初始化chainingStragy,chainingStrategy 类型分为 ALWAYS,HEAD,NEVER,表示该算子是否能被其他算子串连,一般默认是可以串连的(ALWAYS),只有source算子为HEAD,或者手工在程序中强制指定为不可传来(NEVER)

          然后,构造当前算子的OneInputTransformation 对象 初始化slot的共享组(slotSharingGroup),算子的并行度(parallelism),以及把当前的transformation作为新的OneInputTransformation 的输入保存(input),以此记录算子的上下游关系。

          以OneInputTransformation为例:


            private String slotSharingGroup; //表示slot的共享组
            private int parallelism; //该算子的并行度
            private final StreamTransformation<IN> input;
            private final OneInputStreamOperator<IN, OUT> operator;

            最后,将该transformation加入到transformations队列中。并返回新的stream对象(根据算子进行stream转换)。

            最终我们的程序会生成如下的的transforation树,记录了我们所有的算子的连接关系。

            二,生成StreamGraph       

                    程序最后的调用:env.execute();Jobclient会遍历env的transformations列表中的所有transformation对象,递归构建DAG(为什么是递归呢?为了保证是从后向前构建的,递归的退出条件是遇到了SourceTransformation)最后构造出StreamGraph对象。


                    其中,node的概念比较容易理解,一一对应了我们的算子方法。而边的性质代表数据如何下发,不同的算子类型以及并发度,决定了不同的分发策略,比如:当上下游并发度相同时,则为forward,如果不同则为rebalance,特殊算子还有特殊的分发方式,如broadcast。在识别并构造完这些数据结构后,将交给下一步生成JobGraph时,优化执行计划。


            三,生成JobGraph

                   JobGraph是唯一被Flink的数据流引擎所识别的表述作业的数据结构,也正是这一共同的抽象体现了流处理和批处理在运行时的统一。

                    在建立jobGraph的一开始,会设置jobGraph的ScheduledMode,ScheduledMode分为eager和lazy,流式计算的ScheduledMode被固定设置为了eager。其中,Eager表示所有计算节点将会在开始时立即启动。而Lazy表示上游计算节点计算完成后下游节点再启动。

                   其核心过程在于递归建立计算节点链,当满足特定条件node会被合并,创建为一个JobVertx,缓存vertx两头的StreamEdge,接着创建JobEdge(分发策略ALL_TO_ALL,还是POINTWISE:一对多Or一对一)与上下游的JobVertx相连,同时创建设置IntermediateDataSet,用来记录输出是否需要背压控制,输出是否需要缓冲区限制。

            如上图,为什么streamGraph的最后两个节点,能够被合并成了一个JobVertex,而其他节点不行呢?

            因为被合并需要满足以下几个重要条件

                        1.下游的输入边只有一条

                        2.下游操作operator不为空

                        3.上游操作operator不为空

                        4.上游必须有相同的solt组

                        5.下游chain策略为always

                        6.上游chain策略为head或上游chain策略为always

                        7.边的类型为forwardpartition

                        8.上下游并行度相同

                        9.用户代码设置的operator是否可以chain


                  合并之后有什么用处呢?很明显,可以节省flink的计算资源,让合并后的任务可以在同一个节点,同一个线程中执行,减少了io和线程切换等额外开销。

            四,上传JobGraph

            JobGraph生成完后,会被序列化,并通过HTTP协议发送到JobManager存储起来,JobManager会根据JobGraph生成最终的执行计划ExecutionGraph。关于ExecutionGraph的生成,我们下次再学习。


            工具人相信,在这样一群优秀的产品经理和BA的带领下,公司APP产品一定能在短时间内碾压同草顺,小聪明,东方赤贫等各大龙头。


            如果工具人们迟钝的意识

            无法体会那些产品

            精简的需求背后多样的场景;


            如果工具人们浅薄的学识

            无法领悟那些产品

            抽象的草图背后神秘的哲学;


            如果工具人们残破的躯壳,

            无法容纳那些产品,

            矛盾的设计背后隐忍的自洽;


            请大家包容,给工具人们一些时间,学习、提升,努力不成为后腿,不辜负大家的期望!


            文章转载自将咖啡转化为程序的工具人,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

            评论