在处理海量数据的实时分析场景中,我们团队最近完成了一个从Oracle生产环境到Apache Doris的数据同步项目。这套方案解决了我们长期以来面临的数据延迟问题,这里分享下具体实践经验。
为什么选择OGG和Doris组合?
传统ETL流程(之前使用的是kettle)在处理实时业务数据时显得力不从心,数据不能达到实时同步。我们生产环境中的Oracle数据库(多个系统的数据库)每分钟产生上万笔交易记录,而分析团队需要近实时的数据来支持决策。
经过评估,Oracle GoldenGate(OGG)成为首选工具:它能以最小影响捕获Oracle的变更数据,而Apache Doris作为一款高性能OLAP引擎,可以支持我们复杂的分析需求。
环境准备工作
我们的实施环境:
- 源端:Oracle 19c (生产环境)
- 目标端:Apache Doris 2.1.19 (4节点集群)
- 中间件:Oracle GoldenGate 21.3.0
前期准备时踩了不少坑。最痛苦的是Oracle源端需要开启ARCHIVELOG和补充日志,这在繁忙的生产系统上需要精心规划维护窗口。
配置OGG捕获进程
配置Extract进程是整个方案的基础。我们采用了两阶段配置:
GGSCI> ADD EXTRACT ext1, INTEGRATED TRANLOG, BEGIN NOW
GGSCI> ADD EXTTRAIL ./dirdat/et, EXTRACT ext1
EXTRACT ext1
SETENV (NLS_LANG=AMERICAN_AMERICA.AL32UTF8)
useridalias oggforncc
EXTTRAIL ./dirdat/et
TABLE ncc20.table1;
TABLE ncc20.table2;
值得一提的是,我们最初尝试使用经典捕获模式,但在高负载时出现了延迟问题,切换到集成捕获后情况有了明显改善。
数据投递到Doris
这步是最有挑战性的部分。由于OGG不直接支持Doris,我们采用了两种方案:
- OGG导出到Kafka,再由Doris消费
EXTRACT pump1
PASSTHRU
RMTHOST 10.0.53.**, PORT 9092
RMTTRAIL ./dirdat/rt
TABLE project.*;
-- 配置Doris Kafka消费
CREATE ROUTINE LOAD project.kafka_load ON project
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200"
)
FROM KAFKA
(
"kafka_broker_list" = "10.0.53.**:9092",
"kafka_topic" = "ogg_doris_sync",
"property.group.id" = "doris_kafka_project",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
- 自定义适配器直接写入Doris HTTP接口
我们为特别重要的几张表实现了自定义Handler,直接将变更通过HTTP推送到Doris,这种方式延迟更低,但开发和维护成本较高。
遇到的问题及解决方案
实施过程中碰到了几个典型问题:
-
字段类型不匹配:Oracle的NUMBER映射到Doris时需要明确指定精度和范围,否则会出现精度丢失。我们为关键字段增加了额外检验逻辑。
-
大事务处理:某些批处理操作会产生数百万行变更的大事务,导致OGG处理缓慢。解决方法是配置OGG的BATCHSQL参数来批量提交。
-
主键冲突:增量同步时偶尔出现主键冲突问题,我们配置了合并策略来优化处理:
-- Doris表创建时指定合并策略
CREATE TABLE ods_t_rj_****
(
id INT,
name VARCHAR(50),
update_time DATETIME
)
DUPLICATE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 8
PROPERTIES("replication_num" = "3");
4、创建Kafka连接配置文件(kafka.props)
gg.handlerlist=kafkahandler gg.handler.kafkahandler.type=kafka gg.handler.kafkahandler.KafkaProducerConfigFile=kafkaproducer.properties gg.handler.kafkahandler.topicMappingTemplate=ogg_doris_${tableName} gg.handler.kafkahandler.keyMappingTemplate=${primaryKeys} gg.handler.kafkahandler.formatType=json gg.handler.kafkahandler.insertOpKey=I gg.handler.kafkahandler.updateOpKey=U gg.handler.kafkahandler.deleteOpKey=D gg.handler.kafkahandler.treatMissingTableAsError=false gg.handler.kafkahandler.includeTokens=true gg.handler.kafkahandler.includeTableName=true gg.handler.kafkahandler.includeOpType=true goldengate.userexit.timestamp=utc goldengate.userexit.writers=javawriter javawriter.stats.display=TRUE javawriter.stats.full=TRUE
5、创建Kafka生产者配置(kafkaproject.properties)
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092 acks=1 compression.type=snappy batch.size=131072 linger.ms=50 max.in.flight.requests.per.connection=1
性能优化
生产环境中,配置了OGG的BATCHSQL和GROUPTRANSOPS参数,将小事务合并处理,大幅提升了吞吐量。Doris端则通过调整分桶策略和预聚合提升了查询性能。
经过优化后,我们实现了从Oracle数据变更到Doris可查询的端到端延迟控制在10秒以内,满足了实时分析需求。
总结
通过这个项目,我对OGG和Doris的理解更深入了。虽然整合过程有些复杂,但这个组合确实能满足高性能实时数据集成的需求。如果你也面临类似挑战,希望这些经验能帮到你。
下一步,我们计划进一步优化数据质量监控环节,确保同步过程中出现的异常能被及时发现和处理。




