一直觉得 Flink Sql 需要指定算子并行度的功能,哪怕是基于 SQL 解析出来的算子不能添加并行度,source、sink、join 的算子也应该有修改并行度的功能。
恰好看到大佬的博客,Kafka 是最常用的数据源组件了,所以决定在 sqlSubmit 中也加入相应的实现。
Streaming Api 设置并行度
基于 Flink Streaming api,要给 Kafka Source 指定并行度,只需要在 env.addSource() 后面调用 setParallelism() 方法指定并行度就可以,如下:
val kafkaSource = new FlinkKafkaConsumer[ObjectNode](topic, new JsonNodeDeserializationSchema(), Common.getProp)
val stream = env.addSource(kafkaSource)
.setParallelism(12)
Sql Api 设置并行度
先看一个读kafka 的 SQL
-- kafka source
CREATE TABLE user_log (
user_id STRING
,item_id STRING
,category_id STRING
,behavior STRING
,ts TIMESTAMP(3)
,process_time as proctime()
, WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka'
,'topic' = 'user_log'
,'properties.bootstrap.servers' = 'localhost:9092'
,'properties.group.id' = 'user_log'
,'scan.startup.mode' = 'latest-offset'
,'format' = 'json'
);
CREATE TABLE user_log_sink (
`day` string
,num bigint
,min_user_id bigint
,max_user_id bigint
) WITH (
'connector' = 'print'
);
insert into user_log_sink
select `day`
, num
, min_user_id, max_user_id
from(
select DATE_FORMAT(ts,'yyyyMMdd') `day`
,count(distinct user_id) num
,min(cast(replace(user_id,'xxxxxxxxxxxxx','') as bigint)) min_user_id
,max(cast(replace(user_id,'xxxxxxxxxxxxx','') as bigint)) max_user_id
from user_log
-- where DATE_FORMAT(ts,'yyyyMMdd') = date_format(current_timestamp, 'yyyyMMdd')
group by DATE_FORMAT(ts,'yyyyMMdd')
)t1
where num % 2 = 0
;
流图如下:
仔细看任务流图,所有的算子的并行度都是参数: table.exec.resource.default-parallelism 指定的
要修改 Source 的并行度,其他算子的并行度保持不变,从 Streaming Api 看,只需要给 sql 翻译后的 StreamSource 指定并行度,就可以做到我们想要的结果。
那就直接找到 flink sql 源码 kafka source 创建的地方: KafkaDynamicSource.getScanRuntimeProvider 方法
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
final DeserializationSchema<RowData> keyDeserialization =
createDeserialization(context, keyDecodingFormat, keyProjection, keyPrefix);
final DeserializationSchema<RowData> valueDeserialization =
createDeserialization(context, valueDecodingFormat, valueProjection, null);
final TypeInformation<RowData> producedTypeInfo =
context.createTypeInformation(producedDataType);
final KafkaSource<RowData> kafkaSource =
createKafkaSource(keyDeserialization, valueDeserialization, producedTypeInfo);
return new DataStreamScanProvider() {
@Override
public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
if (watermarkStrategy == null) {
watermarkStrategy = WatermarkStrategy.noWatermarks();
}
// 创建 DataStreamSource
return execEnv.fromSource(
kafkaSource, watermarkStrategy, "KafkaSource-" + tableIdentifier);
}
@Override
public boolean isBounded() {
return kafkaSource.getBoundedness() == Boundedness.BOUNDED;
}
};
}
从源码可以看到创建了 KafkaSource ,并且 调用了 execEnv.fromSource 方法,按照 Streaming api 的思路,直接在 execEnv.fromSource 后面添加 setParallelism 就好了,改好的代码如下:
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
final DeserializationSchema<RowData> keyDeserialization =
createDeserialization(context, keyDecodingFormat, keyProjection, keyPrefix);
final DeserializationSchema<RowData> valueDeserialization =
createDeserialization(context, valueDecodingFormat, valueProjection, null);
final TypeInformation<RowData> producedTypeInfo =
context.createTypeInformation(producedDataType);
final KafkaSource<RowData> kafkaSource =
createKafkaSource(keyDeserialization, valueDeserialization, producedTypeInfo);
return new DataStreamScanProvider() {
@Override
public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
if (watermarkStrategy == null) {
watermarkStrategy = WatermarkStrategy.noWatermarks();
}
DataStreamSource<RowData> dataDataStreamSource = execEnv.fromSource(
kafkaSource, watermarkStrategy, "KafkaSource-" + tableIdentifier);
int defaultParallelism = execEnv.getParallelism();
// add by venn for custom source parallelism
// 很多任务不需要设置并行度,所以加了个判空条件
// 如果设置的并行度等于 全局的并行度也不做处理「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




