暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flink sql kafka source 自定义并行度

原创 guassme 2022-06-16
2944

一直觉得 Flink Sql 需要指定算子并行度的功能,哪怕是基于 SQL 解析出来的算子不能添加并行度,source、sink、join 的算子也应该有修改并行度的功能。

恰好看到大佬的博客,Kafka 是最常用的数据源组件了,所以决定在 sqlSubmit 中也加入相应的实现。

Streaming Api 设置并行度

基于 Flink Streaming api,要给 Kafka Source 指定并行度,只需要在 env.addSource() 后面调用 setParallelism() 方法指定并行度就可以,如下:


val kafkaSource = new FlinkKafkaConsumer[ObjectNode](topic, new JsonNodeDeserializationSchema(), Common.getProp)
val stream = env.addSource(kafkaSource)
        .setParallelism(12)

Sql Api 设置并行度

先看一个读kafka 的 SQL

-- kafka source
CREATE TABLE user_log (
  user_id STRING
  ,item_id STRING
  ,category_id STRING
  ,behavior STRING
  ,ts TIMESTAMP(3)
  ,process_time as proctime()
  , WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka'
  ,'topic' = 'user_log'
  ,'properties.bootstrap.servers' = 'localhost:9092'
  ,'properties.group.id' = 'user_log'
  ,'scan.startup.mode' = 'latest-offset'
  ,'format' = 'json'
);

CREATE TABLE user_log_sink (
   `day` string
   ,num bigint
   ,min_user_id bigint
   ,max_user_id bigint
) WITH (
   'connector' = 'print'
);

insert into user_log_sink
select `day`
, num
, min_user_id, max_user_id
from(
select DATE_FORMAT(ts,'yyyyMMdd') `day`
,count(distinct user_id) num
,min(cast(replace(user_id,'xxxxxxxxxxxxx','') as bigint)) min_user_id
,max(cast(replace(user_id,'xxxxxxxxxxxxx','') as bigint)) max_user_id
from user_log
-- where DATE_FORMAT(ts,'yyyyMMdd') = date_format(current_timestamp, 'yyyyMMdd')
group by DATE_FORMAT(ts,'yyyyMMdd')
)t1
 where num % 2 = 0
;

流图如下:

仔细看任务流图,所有的算子的并行度都是参数: table.exec.resource.default-parallelism 指定的

要修改 Source 的并行度,其他算子的并行度保持不变,从 Streaming Api 看,只需要给 sql 翻译后的 StreamSource 指定并行度,就可以做到我们想要的结果。

那就直接找到 flink sql 源码 kafka source 创建的地方: KafkaDynamicSource.getScanRuntimeProvider 方法


@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
    final DeserializationSchema<RowData> keyDeserialization =
            createDeserialization(context, keyDecodingFormat, keyProjection, keyPrefix);

    final DeserializationSchema<RowData> valueDeserialization =
            createDeserialization(context, valueDecodingFormat, valueProjection, null);

    final TypeInformation<RowData> producedTypeInfo =
            context.createTypeInformation(producedDataType);

    final KafkaSource<RowData> kafkaSource =
            createKafkaSource(keyDeserialization, valueDeserialization, producedTypeInfo);

    return new DataStreamScanProvider() {
        @Override
        public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
            if (watermarkStrategy == null) {
                watermarkStrategy = WatermarkStrategy.noWatermarks();
            }
            // 创建 DataStreamSource
            return execEnv.fromSource(
                    kafkaSource, watermarkStrategy, "KafkaSource-" + tableIdentifier);
        }

        @Override
        public boolean isBounded() {
            return kafkaSource.getBoundedness() == Boundedness.BOUNDED;
        }
    };
}

从源码可以看到创建了 KafkaSource ,并且 调用了 execEnv.fromSource 方法,按照 Streaming api 的思路,直接在 execEnv.fromSource 后面添加 setParallelism 就好了,改好的代码如下:


@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
    final DeserializationSchema<RowData> keyDeserialization =
            createDeserialization(context, keyDecodingFormat, keyProjection, keyPrefix);

    final DeserializationSchema<RowData> valueDeserialization =
            createDeserialization(context, valueDecodingFormat, valueProjection, null);

    final TypeInformation<RowData> producedTypeInfo =
            context.createTypeInformation(producedDataType);

    final KafkaSource<RowData> kafkaSource =
            createKafkaSource(keyDeserialization, valueDeserialization, producedTypeInfo);

    return new DataStreamScanProvider() {
        @Override
        public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
            if (watermarkStrategy == null) {
                watermarkStrategy = WatermarkStrategy.noWatermarks();
            }

            DataStreamSource<RowData> dataDataStreamSource = execEnv.fromSource(
                    kafkaSource, watermarkStrategy, "KafkaSource-" + tableIdentifier);
            int defaultParallelism = execEnv.getParallelism();

            // add by venn for custom source parallelism
            // 很多任务不需要设置并行度,所以加了个判空条件
            // 如果设置的并行度等于 全局的并行度也不做处理
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论