暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flink数据倾斜实战案例剖析

大数据从业者 2021-07-17
1501

前言

大数据领域实际处理场景中,数据倾斜问题是所有大数据工程师都绕不开的。那么使用Flink实时计算引擎时,如何解决数据倾斜问题呢?本文以一个实战案例演示,如何采用分而治之的思想、处理keyBy数据倾斜问题。

数据倾斜
通常情况下,我们用Flink处理数据时期望的拓扑如图:

而实际的拓扑可能如图:

但是呢,我们的业务需求就是要根据key进行聚合统计。

那么怎么才能将相同key放在不同subtask计算呢?

解决方案:

    1.打散热点key(增加前缀或后缀随机数标签,确保数据散列)
    2.进行第一次聚合统计
    3.还原散列key为原来的key
    4.进行第二次聚合统计

    实战案例剖析

    通过DataStream API中socketTextStream读取nc数据(port:4444),进行wordcount计算。

    说明:数据倾斜是指大数量下数据不均匀的问题,小数据量可以不考虑。本案例仅以小数据为例,演示具体处理方法而已。

    我们首先看不考虑数据倾斜问题的实现代码:

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      DataStream<String> socketData = env.socketTextStream("10.121.198.220"4444);
      DataStream<Tuple2<String, Integer>> tupleData = socketData.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
      @Override
      public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
      Tuple2<String, Integer> data = new Tuple2<>();
      data.setFields(value.split(",")[0], Integer.parseInt(value.split(",")[1]));
      out.collect(data);
      }
      }).setParallelism(4);


      tupleData.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
      @Override
      public String getKey(Tuple2<String, Integer> value) throws Exception {
      return value.f0;
      }
      }).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
      @Override
      public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
      Tuple2<String, Integer> data = new Tuple2<>();
      data.setFields(value1.f0, value1.f1 + value2.f1);
      return data;
      }
      }).print();


      env.execute("SocketApplication");

      测试结果:

      如上图所示,我们可以看到keyBy之后,数据出现不均衡,1:10。

      下面,我们使用两阶段keyBy处理这种倾斜问题。

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


        DataStream<String> socketData = env.socketTextStream("10.121.198.220", 4444);


        Random random = new Random(1);
        DataStream<Tuple2<String, Integer>> tupleData = socketData.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
        Tuple2<String, Integer> data = new Tuple2<>();
        data.setFields(value.split(",")[0] + "_" + random.nextInt(50), Integer.parseInt(value.split(",")[1]));
        out.collect(data);
        }
        }).setParallelism(4);


        KeyedStream<Tuple2<String, Integer>, String> keyedStream = tupleData.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
        @Override
        public String getKey(Tuple2<String, Integer> value) throws Exception {
        return value.f0;
        }
        });


        keyedStream.print("增加后缀标签打散key");


        SingleOutputStreamOperator<HashMap<String, Integer>> dataStream1 = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5))).aggregate(new MyCountAggregate()).setParallelism(4);
        dataStream1.print("第一次keyBy输出");


        DataStream<Tuple2<String, Integer>> dataStream2 = dataStream1.flatMap(new FlatMapFunction<HashMap<String, Integer>, Tuple2<String, Integer>>() {
        @Override
        public void flatMap(HashMap<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
        value.forEach((k, v) -> {
        Tuple2<String, Integer> data = new Tuple2<>();
        data.setFields(k.split("_")[0], v);
        out.collect(data);
        });
        }
        }).setParallelism(4);


        dataStream2.keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
        @Override
        public Object getKey(Tuple2<String, Integer> value) throws Exception {
        return value.f0;
        }
        }).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
        Tuple2<String, Integer> data = new Tuple2<>();
        data.setFields(value1.f0, value1.f1 + value2.f1);
        return data;
        }
        }).setParallelism(4).print("第二次keyBy输出");


        env.execute("SocketApplicationV2");

        其中,用到的MyCountAggregate类:

          public class MyCountAggregate implements AggregateFunction<Tuple2<String, Integer>, HashMap<String, Integer>, HashMap<String, Integer>> {
          @Override
          public HashMap<String, Integer> createAccumulator() {
          return new HashMap<>();
          }


          @Override
          public HashMap<String, Integer> add(Tuple2<String, Integer> value, HashMap<String, Integer> accumulator) {
                  accumulator.merge(value.f0, value.f1, Integer::sum);
          return accumulator;
          }


          @Override
          public HashMap<String, Integer> getResult(HashMap<String, Integer> accumulator) {
          return accumulator;
          }


          @Override
          public HashMap<String, Integer> merge(HashMap<String, Integer> a, HashMap<String, Integer> b) {
          for (Map.Entry<String, Integer> entry : a.entrySet()) {
                      b.merge(entry.getKey(), entry.getValue(), Integer::sum);
          }
          return b;
          }
          }

          测试结果:


          从上述结果可以看到,我们已经将数据倾斜从1:10改为第一阶段4:7,第二阶段1:3。再次强调说明:因为数据量比较小,所以优化效果显示并不那么好,仅以此案例演示原理而已。

          至此我们既保证了数据定时聚合统计wordcount,也保证了数据不倾斜问题。

          文章转载自大数据从业者,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

          评论