暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

RisingWave 2.1 发布!优化订阅、支持 ASOF joins、共享 Kafka Source

我们非常高兴地宣布,RisingWave 2.1 版本已正式发布!此版本包含了许多重要更新,例如:完善了订阅功能、支持 Kafka source 的无模式数据摄取,支持 ASOF
joins 等。此外,从 2.1 版本开始,RisingWave 不再支持 etcd 元存储,建议使用 SQL 元存储,详细迁移步骤请参考迁移指南[1]

本文是对 RisingWave 2.1 版本重点更新内容的介绍,如果您对完整更新信息感兴趣,请查看更新日志[2]

1优化订阅和游标

自 1.9 版本推出订阅功能以来,我们持续优化着订阅及订阅游标,以提升其易用性。在 2.1 版本中,我们做了以下更新。

增强可观测性

更新后的 SHOW CURSORS
SHOW SUBSCRIPTION CURSOR
命令,可以从同一前端节点中所有活跃的 RisingWave 会话里输出所有游标信息。此外,命令返回的结果包括了每个游标和订阅游标的详细描述。此前仅列出了对象名称,现在还会输出游标的用户、主机和数据库信息。

为游标设置 timeout

自 2.1 版本起,支持通过指定 timeout
参数来配置游标为阻塞 (Blocking data fetch)或非阻塞模式 (Non-blocking data fetch)。

此前所有游标为非阻塞模式:即使游标没有返回新内容,也仍然可以从游标中获取数据,此时游标会返回空行。

此次更新后,对于阻塞游标 (Blocking cursor),游标会等待直到接收更新或达到 timeout
 数值。这种设计无需重复从游标中获取数据,从而节省资源消耗,对于事件驱动架构来说更加高效。如果从游标中获取数据花费的时间过长,比如订阅包含过多更新,游标将返回当前已获取的所有值。

以下 SQL 查询从阻塞游标中获取数据。超时值应以间隔格式的字符串指定。在此例中,timeout 为 20 秒。

FETCH 50 FROM cursor_name WITH (timeout = '20 second');


更多细节,请查看:

  • Blocking data fetch[3]
  • SHOW CURSORS
    [4]
  • SHOW SUBSCRIPTION CURSORS
    [5]

2ASOF
 joins

在实际数据中,时间戳值很少完美匹配,因此合并数据集可能会很困难。RisingWave 现已支持 ASOF INNER JOIN
 ASOF LEFT OUTER JOIN
,允许在时间戳值不完全匹配的情况下,基于时间戳的值进行连接。

在 RisingWave 中使用 ASOF
Join 时,需要满足两个条件。一个条件必须是非等式条件(通常是时间戳列),另一个条件必须是等式条件。此外,ASOF
Join 目前仅适用于流处理,支持 Ad-hoc 查询的功能敬请期待。

例如,以下查询基于两个条件对 t1
t2
执行 ASOF LEFT JOIN

  • t1.col_id = t2.col_id
    :一个等式条件,确保来自两个表的行具有相同的 col_id
  • t1.timestamp <= t2.timestamp
    :一个非等式条件,匹配 t2.timestamp
    接近且大于等于 t1.timestamp
    的行。
SELECT
    t1.col_id,
    t1.timestamp,
    t2.col2
FROM
    t1
ASOF LEFT JOIN
    t2
ON
     t1.col_id = t2.col_id AND
     t1.timestamp <= t2.timestamp;


更多细节,请查看:

  • ASOF joins[6]

3共享 Kafka source

此次更新引入了“Shared source” (共享 Source)功能,当前仅适用于 Kafka source,并且默认启用。此功能仅影响版本更新后创建的 Kafka source,不会影响已有的 Kafka source。若要禁用此功能,可以将会话变量 streaming_use_shared_source
设置为 false

RisingWave 中的 Sources 不像物化视图和 Sink 那样被视为流式作业,仅当物化视图引用 Source 时,才会创建 SourceExecutor
,并启动数据摄取过程。

此前每当另一个流作业引用 Source 时,都会创建一个 SourceExecutor
。但使用共享 Source 时,创建 Source 时也会创建一个 SourceExecutor
。任何引用该 Source的物化视图都可以重用这个 SourceExecutor
。现在,每个 Source 只有一个 SourceExecutor
,提高了资源利用率和多个流式作业之间的一致性。

更多细节,请查看:

  • 共享 Source[7]

4模式数据摄取

在 RisingWave 中创建 Source 或 Table 时,除非从 Web 位置或 Registry 读取模式,都必须指定数据模式。由于数据模式可能很大且复杂,这往往会浪费时间进行检查。现在,RisingWave 支持 JSON 格式的无模式数据摄取。

目前支持从以下 Sources 摄取 JSON 格式的数据:

  • Apache Kafka
  • Apache Pulsar
  • AWS Kinesis
  • AWS S3
  • Azure Blob

在创建 Source 或 Table 时,添加 INCLUDE payload 子句后就无需指定数据模式。例如,kafka_table
的列将包括来自 Kafka topic schemaless_ingestion
的所有数据。

CREATE TABLE kafka_table (c1 int)
INCLUDE payload
WITH (
    connector = 'kafka',
    topic = 'schemaless_ingestion',
    properties.bootstrap.server = 'kafka:9092'
FORMAT PLAIN ENCODE JSON;


该语句简化了从 Source 摄取数据的过程。如有需要,可以在之后清理表以去除不必要的列。

详情请参见:

  • 从 Kafka 摄取数据[8]
  • 从 Pulsar 摄取数据[9]
  • 从 AWS Kinesis 摄取数据[10]
  • 从 S3 buckets 摄取数据[11]
  • 从 Azure Blob 摄取数据[12]

5PostgreSQL 表值函数

此次更新引入了表值函数(TVF)postgres_query
,允许从 PostgreSQL table 中检索数据。

与通过 CDC source 更新相比,这是一种更轻量级的替代方案。对于仅包含静态数据且更新不频繁的 PostgreSQL 数据库,运行此函数比创建 CDC source 和 table 更节省计算资源。

例如,假设在 Postgres 中有 users
表:

first_name | last_name | age | city
-----------+-----------+-----+------
Aaron      | Jones     | 24  | NYC
Shelly     | Doe       | 36  | SF
Taylor     | Smith     | 52  | NYC


使用 postgres_query
函数可以从表中查询数据,还可以使用 CREATE TABLE
函数将查询结果创建为新表。

SELECT * FROM postgres_query(
 'localhost',
 '5432',
 'postgres',
 'postgres',
 'mydb',
 'SELECT * FROM users WHERE age >= 25;'
);
---
first_name | last_name | age | city
-----------+-----------+-----+------
Shelly     | Doe       | 36  | SF
Taylor     | Smith     | 52  | NYC


更多细节,请查看:

  • 从 PostgreSQL 表摄取数据[13]

6PostgreSQL source 连接器更新

分区表

RisingWave 支持从分区的 PostgreSQL 表中摄取数据,但需要一些额外的设置。表分区是根据一个或多个列将大表拆分成更小的部分。例如,表可以根据日期列进行分区,每个分区包含某个时间范围的数据。

首先,确保在 PostgreSQL 数据库中创建发布时设置 publish_via_partition_root
true
,以便数据更改将作为来自根表的数据发布,而非来自单独的分区。

CREATE PUBLICATION publication_name
FOR table_name WITH
 (publish_via_partition_root = true);


接着在 RisingWave 中必须为每个分区创建具有专用发布名称的 Sources,以摄取来自各分区的数据。

Schema 自动更改

RisingWave 现支持自动更改 Postgres CDC source 和 table 的 Schema ,让管理数据更加便捷。如果 PostgreSQL table 的 Schema 发生变化(例如添加列或删除列),RisingWave 中的 CDC table 也会自动更新以反映这种变化。

该功能为 Premium 版功能。了解更多关于 RisingWave Premium 的信息,请参阅 RisingWave Premium|常见问题解答

要启用此功能,在创建 PostgreSQL source 时将参数 auto.schema.change
设置为 true

更多细节,请查看:

  • 从 PostgreSQL CDC 摄取数据[14]

7WebHDFS sink

WebHDFS 是一种 RESTful API,能够与 Hadoop 分布式文件系统(HDFS)进行交互。通过 WebHDFS 可以访问 HDFS 并进行文件的读取、写入和删除,而无需依赖 Hadoop 集群。

使用 CREATE SINK
命令可以将数据直接从 RisingWave 接收并写入 WebHDFS 或间接写入 HDFS。

CREATE SINK webhdfs_sink AS
SELECT * FROM mv
WITH (
    connector='webhdfs',
    webhdfs.path = 'test/path',
    webhdfs.endpoint = 'hdfs_endpoint',
    type = 'append-only',
);


更多细节,请查看:

  • 将数据导出至 WebHDFS[15]

8总结

以上只是 RisingWave 2.1 版本新增的部分功能,如果您想了解本次更新的完整列表,请查看更详细的发布说明[16]如果您想提前了解下个月的版本及其新功能,请访问 RisingWave GitHub repository[17]如果您想了解 RisingWave 的所有动态,请在官网[18]订阅我们的邮件月刊。

参考资料
[1]

迁移指南: https://docs.risingwave.com/deploy/migrate-to-sql-backend

[2]

更新日志: https://docs.risingwave.com/changelog/release-notes

[3]

Blocking data fetch: https://docs.risingwave.com/delivery/subscription#blocking-data-fetch

[4]

SHOW CURSORS
: https://docs.risingwave.com/sql/commands/sql-show-cursors#show-cursors

[5]

SHOW SUBSCRIPTION CURSORS
: https://docs.risingwave.com/sql/commands/sql-show-subscription-cursors#show-subscription-cursors

[6]

ASOF joins: https://docs.risingwave.com/processing/sql/joins#asof-joins

[7]

共享 Source: https://docs.risingwave.com/sql/commands/sql-create-source#shared-source

[8]

从 Kafka 摄取数据: https://docs.risingwave.com/integrations/sources/kafka#examples

[9]

从 Pulsar 摄取数据: https://docs.risingwave.com/integrations/sources/pulsar#example

[10]

从 AWS Kinesis 摄取数据: https://docs.risingwave.com/integrations/sources/kinesis#example

[11]

从 S3 buckets 摄取数据: https://docs.risingwave.com/integrations/sources/s3#examples

[12]

从 Azure Blob 摄取数据: https://docs.risingwave.com/integrations/sources/azure-blob#examples

[13]

从 PostgreSQL 表摄取数据: https://docs.risingwave.com/integrations/sources/postgresql-table

[14]

从 PostgreSQL CDC 摄取数据: https://docs.risingwave.com/integrations/sources/postgresql-cdc

[15]

将数据导出至 WebHDFS: https://docs.risingwave.com/integrations/destinations/webhdfs

[16]

发布说明: https://docs.risingwave.com/release-notes/

[17]

RisingWave GitHub repository: https://github.com/risingwavelabs/risingwave

[18]

官网: https://www.risingwave.com/

关于 RisingWave 

RisingWave 是一款基于 Apache 2.0 协议开源的分布式流数据库,致力于为用户提供极致简单、高效的流数据处理与管理能力。RisingWave 采用存算分离架构,实现了高效的复杂查询、瞬时动态扩缩容以及快速故障恢复,并助力用户极大地简化流计算架构,轻松搭建稳定且高效的流计算应用。
RisingWave 始终聆听来自社区的声音,并积极回应用户的反馈。目前,RisingWave 已汇聚了 150+ 名开源贡献者和 3000+ 名社区成员。全球范围内,已有上百个 RisingWave 集群在生产环境中部署。

往期推荐

技术内幕

如何上手 RisingWave 👉 新手入门教程

RisingWave 中文用户文档上线,阅读更高效!

深入探索 RisingWave 中的高可用性与容错机制

深入理解 RisingWave 流处理引擎(三):触发机制

深入理解 RisingWave 流处理引擎(二):计算模型

深入理解 RisingWave 流处理引擎(一):总览

用户案例
视源股份(CVTE)IT 流计算应用历程
尘锋 SCRM 如何使用 RisingWave 实时打宽
RisingWave 在超百亿管理规模对冲基金公司中的应用
金融科技公司 Kaito 使用 RisingWave 实现实时智能化
龙腾出行如何通过 RisingWave 实现实时数据分析
RisingWave 助力乾象投资打造实时监控平台

文章转载自RisingWave中文开源社区,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论