apache Flink PMC(项目管理委员)很高兴地宣布发布 Apache Flink 1.20.0。与往常一样,这是一个充实的版本,包含了广泛的改进和新功能。总共有 142 人为此版本做出了贡献,完成了 13 个 FLIPs、解决了 300 多个问题。感谢各位贡献者的支持!
站在 Flink 2.0 的前夜
Apache Flink 1.0 发布至今已经 8 年了。因此,最近几个月以来,社区一直在积极朝着下一个大版本(Flink 2.0)迈进。最新发布的 Flink 1.20 版本将会是 Flink 2.0 (预计 2024 年底发布) 之前的最后一个小版本。
从 Flink 1.19 开始,社区决定正式开始废弃过时的公共 API。在 1.20 中,我们进一步整理了所有可能需要被替换或弃用的API,为 2.0 版本铺平道路:
为了提升易用性和可维护性,我们重新审视了所有运行时、Table、SQL 以及状态和检查点相关的配置项,对它们进行了归类,增强和废弃。
废弃过时的 SinkFunction 接口: Flink 1.12 引入了 Unified Sink V2,经过了多个版本的开发和迭代后, 它已经变得比较稳定和完善。根据社区在 FLIP-197 中提出的关于 API 演进的要求,我们把 Unified Sink V2 提升为了公共接口,并且废弃了 SinkFunction 接口。
提升数据加工链路开发体验:FLIP-435 引入了物化表功能,允许用户在动态表中通过统一的 SQL 语句来定义数据的流式/批式转换逻辑,从而加速 ETL 管道开发,并自动管理任务调度。完整内容和更多细节请参考 FLIP-435。
统一的检查点文件合并机制:Flink 1.20 中引入了统一的检查点文件合并机制,允许将零散的小的检查点文件合并到大文件中,减少文件创建和文件删除的次数,缓解大量小文件对文件系统元数据管理带来的压力。完整内容和更多细节请参考 FLIP-306。
更多信息:
SinkFunction:https://nightlies.apache.org/flink/flink-docs-release-1.20/api/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.html Unified Sink V2:https://nightlies.apache.org/flink/flink-docs-release-1.20/api/java/org/apache/flink/api/connector/sink2/Sink.html FLIP-197:https://cwiki.apache.org/confluence/x/J5eqCw
FLIP-435:https://cwiki.apache.org/confluence/x/HYySEQ
FLIP-306:https://cwiki.apache.org/confluence/x/DwsNDw
1. Flink SQL 提升
1.1 引入物化表
Flink 1.20 版本 为 Flink SQL 引入了物化表(Materialized Table)抽象。这是一种新的表类型,旨在同时简化流和批处理的数据加工链路,同时提供一致的开发体验。
通过定义查询语句和数据新鲜度,引擎会自动推导出表结构并创建对应的数据加工链路,以保证查询结果满足所要求的数据新鲜度。用户无需理解流处理和批处理之间的概念和差异,也不必直接维护 Flink 流处理或批作业,所有操作都在物化表上完成,这可以显著加快 ETL 数据加工链路的开发速度。
下面是创建一个具备自动刷新能力的物化表的示例,数据新鲜度为 3 分钟。
-- 1. 创建物化表并定义新鲜度CREATE MATERIALIZED TABLE dwd_orders(PRIMARY KEY(ds, id) NOT ENFORCED)PARTITIONED BY (ds)FRESHNESS = INTERVAL '3' MINUTEAS SELECTo.dso.id,o.order_number,o.user_id,...FROMorders as oLEFT JOIN products FOR SYSTEM_TIME AS OF proctime() AS prodON o.product_id = prod.idLEFT JOIN order_pay AS payON o.id = pay.order_id and o.ds = pay.ds;-- 2. 暂停数据刷新ALTER MATERIALIZED TABLE dwd_orders SUSPEND;-- 3. 恢复数据刷新ALTER MATERIALIZED TABLE dwd_orders RESUME-- Set table option via WITH clauseWITH('sink.parallesim' = '10');-- 手动刷写历史数据ALTER MATERIALIZED TABLE dwd_orders REFRESH PARTITION(ds='20231023');
更多信息:
物化表文档:https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/table/materialized-table/overview/
FLIP-435: Introduce a New Materialized Table for Simplifying Data Pipelines:https://cwiki.apache.org/confluence/x/HYySEQ
1.2 完善 Catalog 相关语法
随着 Flink SQL 的广泛采用,Flink Catalog 发挥着越来越重要的作用。Flink 内置了 JDBC 和 Hive Catalog 实现,而其他开源项目(如 Apache Paimon)也实现了自己的 Catalog。
在 Flink 1.20 中,您可以使用 DQL 语法从现有 Catalog 中获取详细的元数据信息,并使用 DDL语法修改指定Catalog 的属性或注释等元数据。
Flink SQL> CREATE CATALOG `cat` WITH ('type'='generic_in_memory', 'default-database'='db');[INFO] Execute statement succeeded.Flink SQL> SHOW CREATE CATALOG `cat`;+---------------------------------------------------------------------------------------------+| result |+---------------------------------------------------------------------------------------------+| CREATE CATALOG `cat` WITH ('default-database' = 'db','type' = 'generic_in_memory')|+---------------------------------------------------------------------------------------------+1 row in setFlink SQL> DESCRIBE CATALOG `cat`;+-----------+-------------------+| info name | info value |+-----------+-------------------+| name | cat || type | generic_in_memory || comment | |+-----------+-------------------+3 rows in setFlink SQL> ALTER CATALOG `cat` SET ('default-database'='new-db');[INFO] Execute statement succeeded.Flink SQL> SHOW CREATE CATALOG `cat`;+-------------------------------------------------------------------------------------------------+| result |+-------------------------------------------------------------------------------------------------+| CREATE CATALOG `cat` WITH ('default-database' = 'new-db','type' = 'generic_in_memory')|+-------------------------------------------------------------------------------------------------+1 row in set
更多信息:
FLIP-436: Introduce Catalog-related Syntax:https://cwiki.apache.org/confluence/x/xAmpEQ
1.3 DDL 支持 DISTRIBUTED BY 语句
鉴于越来越多的 SQL 引擎对外暴露了 “分区”、“分桶”或“聚类”的概念,Flink 1.20 将“分桶”的概念引入了 Flink SQL。分桶操作通过将数据拆分为不相交的子集来实现数据在外部存储系统中的负载均衡。虽然它在很大程度上取决于底层连接器的语义,但是用户可以通过指定分桶数量、算法以及用于目标分桶计算的列(如果算法允许)来影响分桶的行为。所有分桶相关的关键字在 SQL 语法中都是可选的。
Apache Paimon 的分桶表和 Apache Kafka 的 topic 分区都将对接到该语法上,简化用户的建表操作,并让 Flink SQL 感知了外部数据的物理分布,为未来支持 bucket join 等优化打好了基础。
以下面的 SQL 语句为例:
-- 指定桶的个数和数据分配逻辑(按照 uid 列的哈希值进行分配)CREATE TABLE MyTable (uid BIGINT, name STRING) DISTRIBUTED BY HASH(uid) INTO 4 BUCKETS;-- 不显示指定分桶算法,数据分配逻辑由 Connector 自己决定。CREATE TABLE MyTable (uid BIGINT, name STRING) DISTRIBUTED BY (uid) INTO 4 BUCKETS;-- 不显式指定桶的数量,桶数量和数据分配逻辑均由 Connector 自己决定。CREATE TABLE MyTable (uid BIGINT, name STRING) DISTRIBUTED BY (uid);-- 仅指定桶的数量CREATE TABLE MyTable (uid BIGINT, name STRING) DISTRIBUTED INTO 4 BUCKETS;
更多信息
FLIP-376: Add DISTRIBUTED BY clause:https://cwiki.apache.org/confluence/x/loxEE
文档:https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/table/sql/create/#distributed
2. 状态 & 检查点提升
2.1 统一的检查点文件合并机制
Flink 1.20 引入了统一的检查点文件合并机制,它将多个小的检查点文件合并为数量较少的大文件,从而减少了文件创建和文件删除操作的次数,并减轻了检查点期间文件系统元数据管理的压力。
可以通过将 execution.checkpointing.file-merging.enabled 设置为 true来启用该功能。有关更多高级选项以及此功能背后的原理,请参阅文档。
更多信息
FLIP-306: Unified File Merging Mechanism for Checkpoints:https://cwiki.apache.org/confluence/x/DwsNDw
文档:https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/datastream/fault-tolerance/checkpointing/#unify-file-merging-mechanism-for-checkpoints-experimental
2.2 压缩小的 SST 文件
在某些情况下,RocksDB 状态后端生成的文件数量会无限制地增长。除了许多小文件造成的开销之外,此行为还可能导致任务状态信息超出 RPC 消息大小限制,从而导致检查点失败。从 1.20 版开始,Flink 可以使用 RocksDB API 在后台合并此类文件。
更多信息
FLINK-2605:https://issues.apache.org/jira/browse/FLINK-26050
3. 批处理能力提升
3.1 JobMaster 发生故障时更好的错误恢复机制
在 Flink 1.20 中,我们支持了一种新的批处理作业恢复机制,使批处理作业能够在 JobMaster故障转移后尽可能多地恢复进度,避免重新运行已经完成的任务。
更多信息
FLIP-383: Support Job Recovery from JobMaster Failures for Batch Jobs:https://cwiki.apache.org/confluence/x/QwqZE
文档:https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/ops/batch/recovery_from_job_master_failure/
3.2 HiveSource 支持动态并发推断
在 Flink 1.20 中,我们为 Hive 数据源连接器增加了对动态并发推断的支持,这允许它基于动态分区修剪(DPP)的结果动态决定并行度。
此外,我们引入了一个新的配置选项 table.exec.hive.infer-source-parallelism.mode,使用户能够在数据源并行度的静态和动态推断模式之间进行切换。需要注意的是,在 Flink 1.20 中,以前的配置选项 table.exec.hive.infer-source-parallelism已被标记为弃用。
更多信息
FLIP-445: Support dynamic parallelism inference for HiveSource:https://cwiki.apache.org/confluence/x/Hgr9EQ
4. DataStream API 提升
DataSetAPI 已正式弃用,并将在 Flink 2.0 版本中被删除。我们建议 Flink 用户根据数据处理需求将作业从 DataSet API 逐步迁移到 DataStream API、TableAPI 和 SQL。
4.1 支持 DataStream API 上的全量分区数据处理
在 Flink 1.20 之前,DataStream API 不支持对非分区流上的数据做全量的数据聚合操作,这阻碍了用户从 DataSetAPI 的迁移。作为一种替代方案,用户可以将子任务的编号关联到数据上,并以此为数据键来构建分区流,但这会产生很大的额外开销。为此,Flink 1.20 引入了 FullPartitionWindow API,从而补齐了对全量分区数据处理的内置支持。
假设我们想要计算每个分区中的总记录数并输出到下游,可以按如下方式完成:
inputStream.fullWindowPartition().mapPartition(new MapPartitionFunction<Record, Long>() {@Overridepublic void mapPartition(Iterable<Record> values, Collector<Long> out)throws Exception {long counter = 0;for (Record value : values) {counter++;}out.collect(counter));}})
更多信息
FLIP-380: Support Full Partition Processing On Non-keyed DataStream:https://cwiki.apache.org/confluence/x/0gt1E
5. 重要配置项变更
随着 Apache Flink 即将来到 2.0 版本,一大批配置项在 Flink 1.20 版本被更改或弃用,以提高易用性和可维护性。
5.1 更新配置项为合适的类型
一系列与时间相关的配置项(例如
client.heartbeat.interval)的类型被更新为了Duration。完整列表可在 FLINK-35359 中找到。配置项
taskmanager.network.compression.codec和table.optimizer.agg-phase-strategy的类型被更新为了Enum。配置项
yarn.application-attempts的类型被更新为了Int。
更多信息
FLINK-35359:https://issues.apache.org/jira/browse/FLINK-35359
5.2 弃用多个配置项
在 Flink 1.20 中社区决定正式弃用多个即将在 Flink 2.0 停用的配置项:
由于我们正在逐步淘汰基于哈希的 Blocking Shuffle,以下配置项已被弃用并将在 Flink 2.0 中被删除:
taskmanager.network.sort-shuffle.min-parallelismtaskmanager.network.blocking-shuffle.type由于我们正在逐步淘汰旧的Hybrid Shuffle 模式,以下配置项已被弃用并将在 Flink 2.0 中被删除:
taskmanager.network.hybrid-shuffle.spill-index-region-group-sizetaskmanager.network.hybrid-shuffle.num-retained-in-memory-regions-maxtaskmanager.network.hybrid-shuffle.enable-new-mode为了简化网络缓冲区相关配置,以下配置选项已被弃用并将在 Flink 2.0 中被删除:
taskmanager.network.memory.buffers-per-channeltaskmanager.network.memory.floating-buffers-per-gatetaskmanager.network.memory.max-buffers-per-channeltaskmanager.network.memory.max-overdraft-buffers-per-gatetaskmanager.network.memory.exclusive-buffers-request-timeout-ms(请使用taskmanager.network.memory.buffers-request-timeout代替)由于绝大多数批作业都会开启压缩,配置项
taskmanager.network.batch-shuffle.compression.enabled已被弃用并将在 Flink 2.0 中被删除。如确有需要,请将taskmanager.network.compression.codec设置为NONE以禁用压缩。以下与 Netty 相关的配置项过于底层,已在 Flink 1.20 被弃用,我们将在 Flink 2.0 中将其移除:
taskmanager.network.netty.num-arenastaskmanager.network.netty.server.numThreadstaskmanager.network.netty.client.numThreadstaskmanager.network.netty.server.backlogtaskmanager.network.netty.sendReceiveBufferSizetaskmanager.network.netty.transport以下配置项是不必要的,已在 Flink 1.20 被弃用并且将在 Flink 2.0 中被删除:
taskmanager.network.max-num-tcp-connections(将在 Flink 2.0 中被硬编码为1)fine-grained.shuffle-mode.all-blocking以下配置项用于微调 TPC 测试但当前 Flink 已不再需要,已被弃用并且将在 Flink 2.0 中被删除:
table.exec.range-sort.enabledtable.optimizer.rows-per-local-aggtable.optimizer.join.null-filter-thresholdtable.optimizer.semi-anti-join.build-distinct.ndv-ratiotable.optimizer.shuffle-by-partial-key-enabledtable.optimizer.smj.remove-sort-enabledtable.optimizer.cnf-nodes-limit以下配置项是为现已过时的
FilterableTableSource接口引入的,已被弃用并且将在 Flink 2.0 中被删除:table.optimizer.source.aggregate-pushdown-enabledtable.optimizer.source.predicate-pushdown-enabled配置选项
sql-client.display.max-column-width已被弃用并且将在 Flink 2.0 中被删除。请改用table.display.max-column-width替代。
更多信息
Runtime 相关配置项变更:https://issues.apache.org/jira/browse/FLINK-35461
Table/SQL 相关配置项变更:https://issues.apache.org/jira/browse/FLINK-35473
6. 配置项的其他变更
6.1 重新组织配置项
在 Flink 1.20 中,所有关于状态和检查点的配置项都被重新组织并按前缀分类:
execution.checkpointing.*:所有与检查点和保存点相关的配置选项。execution.state-recovery.*:所有与状态恢复相关的配置选项。state.*:所有与状态访问相关的配置选项。state.backend.*: 各个状态后端的配置选项,例如 RocksDB 状态后端。state.changelog.*:与状态变更日志相关的配置选项。state.latency-track.*:与状态访问的延迟追踪相关的配置选项。
6.2 新的公开配置项
以下与动态哈希聚合相关配置项已从
org.apache.flink.table.planner.codegen.agg.batch.HashAggCodeGenerator移动至org.apache.flink.table.api.config并提升为@PublicEvolvingAPI:table.exec.local-hash-agg.adaptive.enabledtable.exec.local-hash-agg.adaptive.sampling-thresholdtable.exec.local-hash-agg.adaptive.distinct-value-rate-threshold以下与 LookupJoin 相关的配置项已从
org.apache.flink.table.planner.hint.LookupJoinHintOptions移动至org.apache.flink.table.api.config.LookupJoinHintOptions并提升为@PublicEvolvingAPI:tableasyncoutput-modecapacitytimeoutretry-predicateretry-strategyfixed-delaymax-attempts以下与优化器有关的配置项已从
org.apache.flink.table.planner.plan.optimize.RelNodeBlock移动至org.apache.flink.table.api.config.OptimizerConfigOptions并升级为@PublicEvolvingAPI:table.optimizer.union-all-as-breakpoint-enabledtable.optimizer.reuse-optimize-block-with-digest-enabledtable.optimizer.incremental-agg-enabled已从org.apache.flink.table.planner.plan.rules.physical.stream.IncrementalAggregateRule移动至org.apache.flink.table.api.config.OptimizerConfigOptions并升级为@PublicEvolvingAPI.
更多信息
Runtime 相关配置项变更:https://issues.apache.org/jira/browse/FLINK-35461
Table/SQL 相关配置项变更:https://issues.apache.org/jira/browse/FLINK-35473
Checkpointing Options:https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/config/#checkpointing
Recovery Options:https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/config/#recovery
State Backend Options:https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/config/#state-backends
State Changelog Options:https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/config/#state-changelog-options
Latency-track Options:https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/config/#state-latency-tracking-options
升级说明
Apache Flink 社区努力确保升级过程尽可能平稳, 但是升级到 1.20 版本可能需要用户对现有应用程序做出一些调整。请参考 Release Notes 获取更多的升级时需要的改动与可能的问题列表细节。
Release Notes:https://nightlies.apache.org/flink/flink-docs-release-1.20/release-notes/flink-1.20/




