暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flink CookBook | Hive Table并发设置

data之道 2021-06-20
3288
摘要:

本文介绍如何在Flink中配置程序的并发执行。Flink应由多个transform任务(Source、Sink、算子/Operators)组成,一个任务又可以被分成几个并行的实例来执行,每个并行实例处理任务输入数据的一部分;同时也详细探讨下Hive Table Source的并发原理。

并发设置

有多种方法控制Flink并发:

1.Flink配置文件里的parallelism.default参数,配置全局默认

2.DataStream API里,可以更详细的在不同粒度设置

    • StreamExecutionEnvironment.setParallelism()配置全局默认并发度,会覆盖配置文件里的parallelism.default参数

    • source、sink、operator上都可以通过setParallelism()设置每个Task的并行度,Task上设置的优先级比Environment上高

3.Table API,可以通过TableConfig对象的table.exec.resource.default-parallelism配置项,为所有算子设置默认并行度,此配置比StreamExecutionEnvironment的并行性具有更高的优先级,实际上会用该配置项覆盖StreamExecutionEnvironment的并行度。使用方式:

tEnv.getConfig()
.getConfiguration()
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);

Hive Table Source

在用Hive Table Source时,有一个table.exec.hive.infer-source-parallelism参数,如果为false,则Source的并行度由config设置,如果为true,Flink将根据文件数和每个文件中的block数推断读取hive的最佳并行度,即根据hive表对应的hdfs文件splits number推断。默认是true,我们可以更改默认值:

tEnv.getConfig()
.getConfiguration()
.setBoolean(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);
  自动推断的过程:

  在获取查询Hive表对应的InputSplit的步骤:
  1. 从Hive MetaStore获取表的分区,如果不是分区表,当成一个分区
  2. 遍历每个分区下面的文件,获取每个文件的splits
  3. 汇总所有splits,得到splits number,作为source table的并发度依据之一

为了避免在自动推断时,并发过度,特别的如果一个表里的文件很多,得到的splits number可能很大,可以用table.exec.hive.infer-source-parallelism.max控制并发上限,默认是1000:

tEnv.getConfig()
.getConfiguration()
.setInteger(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX, 50);

小结下就是在用Flink SQL时,有两种方法设置Hive Table Source并发:table.exec.hive.infer-source-parallelism参数或者让程序自动推断,我们不能明确指定每一个table source的并发;当然,可以改下源码,比如在DDL里增加一个parallelism参数,当框架拿到这个参数时,调用sourceStream.setParallelism()方法设置,强制为我们指定的数值。

   Hive Table Sink的并发度是和inputstream的并发数保持一致的,现在还不能实现自定义。

文章转载自data之道,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论