暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

【Apache Doris】部分列更新 最佳实践指南

一臻数据 2024-11-26
554

更多趣文请关注一臻数据

部分列更新,主要是指直接更新表中某些字段值,而不是全部的字段值。可以采用 Update 语句来进行更新,这种 Update 语句一般采用先将整行数据读出,然后再更新部分字段值,再写回。这种读写事务非常耗时,并且不适合大批量数据写入。 

Doris 在主键模型的导入更新,提供了可以直接插入或者更新部分列数据的功能,不需要先读取整行数据,这样更新效率就大幅提升了。 

本文适用范围:2.0.2 及之后的版本,将一起学习Doris的Stream Load最佳实践指南。

一、使用场景

部分列更新的使用场景主要包括以下几个方面:

  1. 实时动态更新:在需要频繁更新某些字段的场景中,例如用户标签表中的行为信息更新,以支持广告或推荐系统的实时分析和决策。
  2. 大宽表拼接:将多张源表的数据合并成一张大宽表,可以通过部分列更新来实现。
  3. 数据修正:在需要修正某些数据的场景中,部分列更新可以有效减少更新的开销。
  4. 高频并发写入:部分列更新支持高频的并发写入,适用于需要实时更新大量行但仅涉及少数列的场景。
  5. 性能优化:在更新少数列时,部分列更新可以显著提高性能,尤其是在涉及大量行的情况下。

这些场景中,部分列更新通过减少不必要的数据写入和锁定,提升了系统的整体性能和响应速度。

注意事项

  1. 2.0 版本仅在 Unique Key 的 Merge-on-Write 实现中支持了部分列更新能力
  2. 从 2.0.2 版本开始,支持使用 INSERT INTO 进行部分列更新
  3. 不支持在有同步物化视图的表上进行部分列更新

二、实践指南

  • doris建表
CREATE TABLE `test_partial_update` (
  `id` int(11NULL,
  `value` varchar(20NULL,
  `date_time` datetime(6not NULL DEFAULT CURRENT_TIMESTAMP,
  `dt` datetime(6default current_timestamp(6on update current_timestamp(6)
ENGINE=OLAP
unique KEY(`id`)
COMMENT '部分列更新'
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);

on update current_timestamp

是否在该行有列更新时将该列的值更新为当前时间 (current_timestamp
)。该特性只能在开启了 Merge-on-Write 的 Unique 表上使用,开启了这个特性的列必须声明默认值,且默认值必须为 current_timestamp
。如果此处声明了时间戳的精度,则该列默认值中的时间戳精度必须与该处的时间戳精度相同。

  • 首先插入一条数据:
mysql> insert into test_partial_update(idvaluevalues(1"doris");
Query OK, 1 row affected (0.04 sec)
{'label':'label_3ed18eb2142c42a6_b02373286719ab46', 'status':'VISIBLE', 'txnId':'60072'}

mysql> select * from test_partial_update;
+------+-------+----------------------------+----------------------------+
| id   | value | date_time                  | dt                         |
+------+-------+----------------------------+----------------------------+
|    1 | doris | 2024-10-13 10:45:22.000000 | 2024-10-13 10:45:22.257000 |
+------+-------+----------------------------+----------------------------+
1 row in set (0.01 sec)

2.1 insert

如果只想更改value值,并且保留数据的插入时间,可以开启insert的部分列更新功能并指定插入的列名:

set enable_unique_key_partial_update=true;

mysql> insert into test_partial_update(idvaluevalues(1"SelectDB");
Query OK, 1 row affected (0.03 sec)
{'label':'label_6a1e2c7306e84c81_a3ba841f7b302bb3', 'status':'VISIBLE', 'txnId':'60074'}

mysql> select * from test_partial_update;
+------+----------+----------------------------+----------------------------+
| id   | value    | date_time                  | dt                         |
+------+----------+----------------------------+----------------------------+
|    1 | SelectDB | 2024-10-13 10:45:22.000000 | 2024-10-13 10:48:12.619000 |
+------+----------+----------------------------+----------------------------+
1 row in set (0.01 sec)

可以看出此时date_time字段记录了数据的插入时间,dt字段记录了数据的更新时间。

2.2 stream load

  • 新建一份数据

vim test_partial_update.csv

1, "SelectDB"

  • 在columns处指定key列(必须包含所有 key 列,不然无法更新)以及要更新的value列,设置header:"partial_columns:true":
curl --location-trusted -u root:123456 -T test_partial_update.csv -H "format:csv" -H "column_separator:," -H "partial_columns:true"  -H "columns:id, value" http://10.16.10.6:8030/api/demo/test_partial_update/_stream_load

  • 查询数据可知date_time字段记录了数据的插入时间,dt字段记录了数据的更新时间。
mysql> select * from test_partial_update;
+------+-------------+----------------------------+----------------------------+
| id   | value       | date_time                  | dt                         |
+------+-------------+----------------------------+----------------------------+
|    1 |  "SelectDB" | 2024-10-13 10:45:22.000000 | 2024-10-13 11:11:51.795000 |
+------+-------------+----------------------------+----------------------------+
1 row in set (0.01 sec)

2.3 Flink Doris Connector

  • mysql建表,插入数据:
CREATE TABLE `test_partial_update` (
  `id` INT(11NULL,
  `value` VARCHAR(20NULL,
  `date_time` DATETIME(6NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
  `dt` DATETIME(6DEFAULT CURRENT_TIMESTAMP(6ON UPDATE CURRENT_TIMESTAMP(6)
);

mysql> insert into test_partial_update(idvaluevalues(1"SelectDB");
Query OK, 1 row affected (0.00 sec)

mysql> select * from test_partial_update;
+------+----------+----------------------------+----------------------------+
| id   | value    | date_time                  | dt                         |
+------+----------+----------------------------+----------------------------+
|    1 | SelectDB | 2024-10-14 16:08:14.058728 | 2024-10-14 16:08:14.058728 |
+------+----------+----------------------------+----------------------------+
1 row in set (0.00 sec)

  • 进入flink
./bin/sql-client.sh embedded

  • 开启Checkpoint

Flink作业周期性执行checkpoint,记录Binlog位点,当作业发生Failover时,便会从之前记录的Binlog位点继续处理。

Flink SQL> SET execution.checkpointing.interval = 3s;

  • 创建MySQL CDC表和Dois sink表,并设置'sink.properties.partial_columns' = 'true'
CREATE TABLE cdc_mysql_source (
    id INT,
    `value` STRING,
    date_time TIMESTAMP,
    dt TIMESTAMP
WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '10.16.10.6',
    'port' = '3326'
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'test',
    'scan.incremental.snapshot.chunk.key-column'='id',
    'table-name' = 'test_partial_update'
);

CREATE TABLE doris_sink (
    id INT,
    `value` STRING,
    date_time TIMESTAMP,
    dt TIMESTAMP

WITH (
  'connector' = 'doris',
  'fenodes' = '127.0.0.1:8030',
  'table.identifier' = 'demo.test_partial_update',
  'username' = 'root',
  'password' = '123456',
  'sink.properties.format' = 'json',
  'sink.properties.read_json_by_line' = 'true',
  'sink.properties.columns' = 'id,name,bank,age',
  'sink.properties.partial_columns' = 'true' -- 开启部分列更新
);


insert into doris_sink(id`value`select id,`value` from cdc_mysql_source;

  • 去 doris 查询数据可知date_time字段记录了数据的插入时间,dt字段记录了数据的更新时间。
mysql> select * from test_partial_update;
+------+-------------+----------------------------+----------------------------+
| id   | value       | date_time                  | dt                         |
+------+-------------+----------------------------+----------------------------+
|    1 |  "SelectDB" | 2024-10-13 10:45:22.000000 | 2024-10-14 16:11:23.546000 |
+------+-------------+----------------------------+----------------------------+
1 row in set (0.01 sec)

常见问题FAQ

Q1. 如果开启了部分列更新之后插入报错"errCode = 2, detailMessage = Insert has filtered data in strict mode"

  • 原因:控制 insert 语句是否开启严格模式的会话变量enable_insert_strict的默认值为 true,即 insert 语句默认开启严格模式,而在严格模式下进行部分列更新不允许更新不存在的 key。所以,在使用 insert 语句进行部分列更新的时候如果希望能插入不存在的 key,需要在enable_unique_key_partial_update设置为 true 的基础上同时将enable_insert_strict设置为 false。

解决方法:set enable_insert_strict=false;

Q2. MySQL CDC表数据同步失败。

  • 原因1:mysql未开启binlog

解决办法:

可以通过以下命令检查 binlog 是否已启用:

SHOW VARIABLES LIKE 'log_bin';

如果 log_bin
的值是 OFF
,需要在 MySQL 配置文件中启用 binlog:

[mysqld]
log_bin = mysql-bin
binlog_format = ROW
server_id = 1

重启 MySQL 服务后生效。

  • 原因2:检查 MySQL 表 test_partial_update
    的字段类型与 Flink 表 DDL 是否匹配。字段类型不匹配可能会导致数据读取问题。

  • 原因3:flink 任务报错 MySQL server has a timezone offset (0 seconds ahead of UTC) which does not match the configured timezone Asia/Shanghai. Specify the right server-time-zone to avoid inconsistencies for time-related fields.

解决方法:

  set time_zone='+8:00';
  #或者
  set persist time_zone='+8:00';

下期,我们将一起探讨Doris其它更有趣有用有价值的内容,敬请期待!



 一臻数据致力于大数据AI时代的前沿内容分享,会持续分享更多有趣有用有态度的知识。同时也欢迎大家投稿,共建共进,帮助圈友们冲破认知壁垒,实现自我提升!

另外,一臻整理了一份《Apache Doris知识库》,其中包含 Apache Doris 学习资料、方案中心、企业实践  问题指南 等内容,会持续更新,欢迎关注公众号,免费领取

资料获取 🔗 欢迎扫描下方二维码图片 加入【Apache Doris社区】免费领取❗️



往期推荐

走进开源,拥抱开源

大数据平台开发规范示例

大数据仓库开发规范示例

大数据质量管制规范示例

Flink CDC 1.0至3.0回忆录

【Apache Doris】Manager 极致丝滑地运维管理

【Apache Doris】如何一键实现MySQL万表整库同步?

【Apache Doris】如何实现高并发点查?(原理+实践全析)

为什么Apache Doris适合做大数据的复杂计算,MySQL不适合?

如何正确地使用ChatGPT(角色扮演+提示工程)

超强满血不收费的AI绘图教程来了(在线Stable Diffusion一键即用)

 

点击下方蓝字关注一臻数据

文章转载自一臻数据,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论