Three ways to implement data aggregation in Doris

Suppose we want to compute:
① pv by (ds, hour)
② pv by ds

1. The three aggregation approaches

(1) Aggregate model + multiple writes from the data source

Flink SQL writing to Kafka
insert into log_exp_pv
select ds, hour, device_id from dwd_kafka_log;

insert into log_exp_pv
select ds, 'ALL' hour, device_id from dwd_kafka_log;
Doris table-creation statement
create table log_exp_pv
(
    ds   date,
    hour varchar(1000),
    pv   bigint sum DEFAULT '0'
)
AGGREGATE KEY(ds, hour)
partition by range(ds)
(
    start ('2022-05-14') end ('2022-05-20') every (INTERVAL 1 day)
)
-- distribution must use key columns; the original hash(platform) referenced a column that does not exist in this table
distributed by hash(ds, hour) buckets 32
PROPERTIES
(
    "dynamic_partition.enable" = "true",
    "dynamic_partition.time_unit" = "DAY",
    "dynamic_partition.start" = "-1",
    "dynamic_partition.end" = "7",
    "dynamic_partition.prefix" = "p",
    "dynamic_partition.buckets" = "32",
    "replication_num" = "1"
);
Routine Load: importing data from Kafka into Doris
CREATE ROUTINE LOAD routine_load_log_exp_pv ON log_exp_pv
COLUMNS TERMINATED BY ",",
COLUMNS (ds, hour, pv = 1)
PROPERTIES
(
    "desired_concurrent_number" = "3",
    "max_error_number" = "0",
    "strict_mode" = "false",
    "format" = "json"
)
FROM KAFKA
(
    "kafka_broker_list" = "",
    "kafka_topic" = "log_exp_pv",
    "property.group.id" = "routine_load_log_exp_pv"
);
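Under this double-write scheme, ds-level pv is carried by the rows whose hour column holds the 'ALL' sentinel written by the second insert, so queries must filter on that sentinel. A query sketch, assuming the table and sentinel value defined above:

```sql
-- pv per (ds, hour): skip the 'ALL' sentinel rows
SELECT ds, hour, sum(pv) AS pv
FROM log_exp_pv
WHERE hour != 'ALL'
GROUP BY ds, hour;

-- pv per ds: read only the pre-aggregated sentinel rows
SELECT ds, sum(pv) AS pv
FROM log_exp_pv
WHERE hour = 'ALL'
GROUP BY ds;
```

Forgetting the sentinel filter would double-count every event, which is one of the development pitfalls of this approach.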
(2) Duplicate (detail) model + materialized views

Flink SQL writing to Kafka
insert into log_exp_detail
select ds, hour, device_id from dwd_kafka_log;
Doris table-creation statement
create table log_exp_detail
(
    ds      date,
    hour    varchar(1000),
    user_id bigint
)
duplicate key(ds, hour)
partition by range(ds)
(
    start ('2022-05-12') end ('2022-05-20') every (INTERVAL 1 day)
)
-- the original hash(channel) referenced a column that does not exist in this table
distributed by hash(user_id) buckets 32
PROPERTIES
(
    "dynamic_partition.enable" = "true",
    "dynamic_partition.time_unit" = "DAY",
    "dynamic_partition.start" = "-1",
    "dynamic_partition.end" = "7",
    "dynamic_partition.prefix" = "p",
    "dynamic_partition.buckets" = "32",
    "replication_num" = "1"
);
Routine Load: importing data from Kafka into Doris
CREATE ROUTINE LOAD routine_load_log_exp_detail ON log_exp_detail
COLUMNS TERMINATED BY ",",
COLUMNS (ds, hour, user_id)
PROPERTIES
(
    "desired_concurrent_number" = "3",
    "max_error_number" = "0",
    "strict_mode" = "false",
    "format" = "json"
)
FROM KAFKA
(
    "kafka_broker_list" = "",
    "kafka_topic" = "log_exp_detail",
    "property.group.id" = "routine_load_log_exp_detail"
);
Creating the materialized views
CREATE MATERIALIZED VIEW mv_ds AS
SELECT ds, count(user_id) pv
FROM log_exp_detail
GROUP BY ds;

CREATE MATERIALIZED VIEW mv_ds_hour AS
SELECT ds, hour, count(user_id) pv
FROM log_exp_detail
GROUP BY ds, hour;
Query plan (Doris rewrites the query to hit the matching materialized view)
explain
SELECT ds, count(user_id) pv
FROM log_exp_detail
GROUP BY ds;

(3) Aggregate model + rollup

Doris table-creation statement
create table log_exp_pv
(
    ds   date,
    hour varchar(1000),
    pv   bigint sum DEFAULT '0'
)
AGGREGATE KEY(ds, hour)
partition by range(ds)
(
    start ('2022-05-14') end ('2022-05-20') every (INTERVAL 1 day)
)
-- distribution must use key columns; the original hash(platform) referenced a column that does not exist in this table
distributed by hash(ds, hour) buckets 32
rollup
(
    r1 (ds),
    r2 (ds, hour)
)
PROPERTIES
(
    "dynamic_partition.enable" = "true",
    "dynamic_partition.time_unit" = "DAY",
    "dynamic_partition.start" = "-1",
    "dynamic_partition.end" = "7",
    "dynamic_partition.prefix" = "p",
    "dynamic_partition.buckets" = "32",
    "replication_num" = "1"
);
Routine Load: importing data from Kafka into Doris
CREATE ROUTINE LOAD routine_load_log_exp_pv ON log_exp_pv
COLUMNS TERMINATED BY ",",
-- the target table has no user_id column; map each event to pv = 1 as in approach (1)
COLUMNS (ds, hour, pv = 1)
PROPERTIES
(
    "desired_concurrent_number" = "3",
    "max_error_number" = "0",
    "strict_mode" = "false",
    "format" = "json"
)
FROM KAFKA
(
    "kafka_broker_list" = "",
    "kafka_topic" = "log_exp_pv",
    "property.group.id" = "routine_load_log_exp_pv"
);
Query plan (Doris routes the query to the matching rollup)
explain
SELECT ds, sum(pv) pv
FROM log_exp_pv
GROUP BY ds;

2. Comparing the three approaches

(1) Aggregate model + multiple writes from the data source
Pros:
① Only the aggregate model needs to be managed
② Data is stored pre-aggregated, reducing data volume
Cons:
① With n dimensions, the written data can expand by up to 2^n times, increasing import pressure
② With many dimensions, the detail-layer write jobs become complex to develop
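The 2^n figure counts every dimension combination: each dimension is either kept or collapsed to a sentinel. With the two dimensions here (ds, hour), a full set would need 2² = 4 insert streams. A hypothetical sketch extending the Flink SQL above; the article itself writes only the first two streams, and '9999-12-31' is an assumed sentinel, since ds is a date column and cannot hold 'ALL':

```sql
-- (ds, hour) both kept
insert into log_exp_pv select ds, hour, device_id from dwd_kafka_log;
-- hour collapsed: feeds ds-level pv
insert into log_exp_pv select ds, 'ALL', device_id from dwd_kafka_log;
-- ds collapsed (hypothetical sentinel date): would feed hour-only pv
insert into log_exp_pv select '9999-12-31', hour, device_id from dwd_kafka_log;
-- both collapsed: would feed a single global pv stream
insert into log_exp_pv select '9999-12-31', 'ALL', device_id from dwd_kafka_log;
```

Every event is read and written once per stream, which is where the import pressure comes from.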
(2) Duplicate (detail) model + materialized views
Pros:
① Only one copy of the detail data needs to be written
② The duplicate model preserves the raw detail data
Cons:
① Detail data is stored in full, so the storage footprint is large
② With many dimensions, a separate materialized view must be created per combination
③ Materialized view syntax is strictly limited
(3) Aggregate model + rollup
Pros:
① Data is stored pre-aggregated, reducing data volume
② Rollups can be declared at table-creation time, reducing development complexity
Cons:
① Detail data cannot be preserved
[Copyright notice] This article is original content by a Modb (墨天轮, modb.pro) community user; reposts must credit the source (墨天轮) and include the article link and author.




