s3连接器,转为指定 AWS S3 source 连接器为
s3_v2等。此外,本版本还新增了许多实用的 SQL 命令和函数,例如创建订阅功能等。一起来了解本次更新的主要亮点吧!
1Recover 命令
RECOVER命令。该命令可以触发临时性的恢复,当存在较高延迟时可能需要使用。需要注意的是,只有超级用户才能使用
RECOVER命令,以限制可以触发恢复的用户范围。此外,RisingWave 仅能从已提交的时间点进行恢复,且该命令不会等待恢复完成。
DROP或
CANCEL命令都将立即生效。
RECOVER
命令[1]
2配置全局参数
ALTER SYSTEM命令在整个服务器上设置运行时参数或系统参数。使用此命令后,新参数值也将成为每个新会话的默认值。如果您喜欢使用不同的参数运行每个 RisingWave 会话,用此命令可以更轻松地设置每个会话。
rw_enable_join_ordering设置为
true。
ALTER SYSTEM SET rw_enable_join_ordering TO true;
SHOW ALL命令查看所有运行时参数,如果想查看当前运行时某个特定运行时参数的值,可使用
SHOW parameter_name命令。如果想查看所有系统参数及其当前值,则可使用
SHOW PARAMETERS命令。
ALTER SYSTEM 命令[2] 查看和配置运行时参数[3] 查看和配置系统参数[4]
3增强时间连接
stream_source是一个非仅追加的源,而
prod是一个表,则以下 SQL 命令使用了时间连接来连接两个表。
SELECT *
FROM stream_source
JOIN prod FOR SYSTEM_TIME AS OF PROC_TIME()
ON source_id = prod_id;
非仅追加模式的时间连接[5]
4支持订阅和订阅游标
CREATE SUBSCRIPTION命令创建订阅。您还可以删除、修改和显示现有的订阅。创建订阅时可以选择增量数据的保留时间以及订阅可以访问的时间。以下是 SQL 查询在物化视图上创建订阅,并将数据保留一天的示例。
CREATE SUBSCRIPTION sub1 FROM tbl1 WITH (retention = '1D' );
DECLARE cursor1 SUBSCRIPTION CURSOR FOR sub1;
FETCH NEXT FROM cursor1;
----结果
col1 | col2 | col3 | op | rw_timestamp
----+----+----+----+-------------------
1 | 2 | 3 | 1 |
(1 行)
col1、
col2和
col3是订阅的表中的列,
op则是生成的特殊列。
op的值可以是
1、
2、
3或
4,分别对应于
INSERT、
DELETE、
UPDATE_DELETE和
UPDATE_INSERT。
UPDATE语句在这里被转换为
UPDATE_DELETE和
UPDATE_INSERT。请注意,每次从游标中提取时,只会返回表中的一行数据。要查看其他行的数据,必须再次调用
FETCH命令。
订阅[6]
5使用 Iceberg source 进行 Time travel
AS OF语法从特定时间段或版本中选择数据。RisingWave 目前仅支持从 Iceberg source 进行批量查询,拥有此功能后,随着时间推移跟踪数据集的变化会更容易。同时,从特定版本选择数据则使得数据可重现性增强,这对于调试非常重要:如果在数据中检测到错误,您可以轻松选择之前版本的数据。
SELECT * FROM iceberg_source FOR system_version AS OF 1567123456789;
SELECT * FROM iceberg_source FOR system_time AS OF '2006-09-21 00:00:00+00:00';
从 Apache Iceberg 中摄取数据[7]
6PostgreSQL 和 MySQL 连接器的新配置
配置 SSL/TLS
ssl.mode参数设置 SSL/TLS 加密级别。配置 SSL/TLS 可确保传输的数据的完整性和机密性,保护敏感信息免受威胁和攻击。许多监管标准和最佳实践也使用 SSL/TLS,这使得它成为许多用户关心的重要功能。
ssl.mode参数接受
disabled、
preferred和
required这三个值,默认值为
disabled。
CREATE SOURCE pg_source WITH (
connector = 'postgres-cdc',
hostname = '127.0.0.1',
port = '5432',
username = 'user',
password = 'password',
database.name = 'mydb',
slot.name = 'mydb_slot',
ssl.mode = 'required'
);
配置快照
除了为 CDC 表配置快照之外,您现在还可以配置屏障间隔和批次大小。在与 MySQL 或 PostgreSQL 数据库建立连接时,您可以创建 CDC source 或 CDC 表。CDC source 连接到整个数据库,而 CDC 表连接到单个 MySQL 或 PostgreSQL 表。快照相关的参数选项仅适用于创建表时。
snapshot.interval可以设置开始快照读取时的屏障间隔;
snapshot.batch_size可以配置快照读取的批次大小。
配置超时
cdc_source_wait_streaming_start_timeout来增加超时阈值。
SET cdc_source_wait_streaming_start_timeout to 90;
从 MySQL CDC 中摄取数据[8] 从 PostgreSQL CDC 中摄取数据[9]
7支持更多 Sink 连接器
支持 Snowflake sink 连接器
CREATE SINKSQL 语句即可完成。
CREATE SINK snowflake_sink FROM mv WITH (
connector = 'snowflake',
type = 'append-only',
snowflake.database = 'db',
snowflake.schema = 'schema',
snowflake.pipe = 'pipe',
snowflake.account_identifier = '<ORG_NAME>-<ACCOUNT_NAME>',
snowflake.user = 'user',
snowflake.rsa_public_key_fp = 'fp',
snowflake.private_key = 'pk',
snowflake.s3_bucket = 's3_bucket',
snowflake.aws_access_key_id = 'aws_id',
snowflake.aws_secret_access_key = 'secret_key',
snowflake.aws_region = 'region',
snowflake.max_batch_row_num = '1030',
snowflake.s3_path = 's3_path',
);
支持 BigQuery sink 的 upsert 类型
append-only类型的 BigQuery sink,这意味着只有
INSERT操作会传递到 BigQuery 表中。现在,1.9 版本新增支持
upsert类型的 Sink,允许传递
UPDATE和
DELETE操作。创建
upsertSink 时,需要在 BigQuery 中进行额外的配置,并且必须设置相应的权限和主键。
CREATE SINK big_query_sink
FROM mv
WITH (
connector = 'bigquery',
type = 'upsert',
bigquery.s3.path= '${s3_service_account_json_path}',
bigquery.project= '${project_id}',
bigquery.dataset= '${dataset_id}',
bigquery.table= '${table_id}',
access_key = '${aws_access_key}',
secret_access = '${aws_secret_access}',
region = '${aws_region}',
);
Delta Lake sink 支持 GCS
CREATE SINK命令。
CREATE SINK s1_sink FROM s1_source
WITH (
connector = 'deltalake',
type = 'append-only',
location = 'gs://bucket-name/path/to/file',
gcs.service.account = '{type: service_acct,
project_id: id,
private_key:key,
client_email: email}'
);
将数据从 RisingWave 导出到 Snowflake[11] 将数据从 RisingWave 导出到 BigQuery[12] 将数据从 RisingWave 导出到 Delta Lake[13]
8总结
RECOVER 命令: https://docs.risingwave.com/docs/current/sql-recover/
[2]ALTER SYSTEM 命令: https://docs.risingwave.com/docs/current/sql-alter-system/
[3]查看和配置运行时参数: https://docs.risingwave.com/docs/current/view-configure-runtime-parameters/
[4]查看和配置系统参数: https://docs.risingwave.com/docs/current/view-configure-system-parameters/
[5]非仅追加模式的时间连接: https://docs.risingwave.com/docs/current/query-syntax-join-clause/#process-time-temporal-joins
[6]订阅: https://docs.risingwave.com/docs/current/subscription/
[7]从 Apache Iceberg 中摄取数据: https://docs.risingwave.com/docs/current/ingest-from-iceberg/#time-travel
[8]从 MySQL CDC 中摄取数据: https://docs.risingwave.com/docs/current/ingest-from-mysql-cdc/
[9]从 PostgreSQL CDC 中摄取数据: https://docs.risingwave.com/docs/current/ingest-from-postgres-cdc/
[10]集成页面: https://docs.risingwave.com/docs/current/rw-integration-summary/
[11]将数据从 RisingWave 导出到 Snowflake: https://docs.risingwave.com/docs/current/sink-to-snowflake/
[12]将数据从 RisingWave 导出到 BigQuery: https://docs.risingwave.com/docs/current/sink-to-bigquery/
[13]将数据从 RisingWave 导出到 Delta Lake: https://docs.risingwave.com/docs/current/sink-to-delta-lake/
[14]发布说明: https://docs.risingwave.com/release-notes/
关于 RisingWave


往期推荐
技术内幕




