暂无图片
暂无图片
6
暂无图片
暂无图片
暂无图片

【数据同步-seatunnel工具】将mysql同步到oracle实战

原创 virvle 2025-08-30
526

要说mysql同步到Oracle的工具,除了传统的OGG,应该考虑的就是 ST(seatunnel) 了吧,简直太好用啦 ~
当前生产已稳定运行4个月,源端3个库,完美支持!推荐给大家试试~

image.png

1. seatunnel怎么用?来看看帮助信息

./bin/seatunnel.sh -h Usage: seatunnel.sh [options] Options: --async Run the job asynchronously, when the job is submitted, the client will exit (default: false) -can, --cancel-job Cancel job by JobId --check Whether check config (default: false) -cj, --close-job Close client the task will also be closed (default: true) -cn, --cluster The name of cluster -c, --config Config file --decrypt Decrypt config file, When both --decrypt and --encrypt are specified, only --encrypt will take effect (default: false) -m, --master, -e, --deploy-mode SeaTunnel job submit master, support [local, cluster] (default: cluster) --encrypt Encrypt config file, when both --decrypt and --encrypt are specified, only --encrypt will take effect (default: false) --get_running_job_metrics Gets metrics for running jobs (default: false) -h, --help Show the usage message -j, --job-id Get job status by JobId -l, --list list job status (default: false) --metrics Get job metrics by JobId -n, --name SeaTunnel job name (default: SeaTunnel) -r, --restore restore with savepoint by jobId -s, --savepoint savepoint job by jobId --set-job-id Set custom job id for job -i, --variable Variable substitution, such as -i city=beijing, or -i date=20190318.We use ',' as separator, when inside "", ',' are treated as normal characters instead of delimiters. For example, -i city="beijing,shanghai". If you want to use dynamic parameters, you can use the following format: -i date=$(date +"%Y%m%d"). (default: [])

参考说明:

1)作业提交相关参数

参数 缩写 说明
–config -c 必选参数,指定配置文件路径
–name -n 设置作业名称(默认"SeaTunnel")
–master -m 或 -e 指定部署模式:local(本地)或cluster(集群,默认值)
–async 异步模式运行作业,提交后客户端立即退出(默认false)
–variable -i 变量替换,格式:-i key=value,多个变量用逗号分隔

2)作业管理相关参数

参数 缩写 说明
–job-id -j 通过JobId获取作业状态
–list -l 列出所有作业状态
–cancel-job -can 通过JobId取消作业
–close-job -cj 关闭客户端时同时关闭任务(默认true)
–savepoint -s 为指定JobId的作业创建保存点
–restore -r 从保存点恢复作业
–metrics 获取指定作业的监控信息
–get_running_job_metrics 获取正在运行的作业监控信息

2. 怎么安装,可以参考之前文章

【数据同步】Seatunnel初体验,磕磕绊绊-终于同步上Oracle-CDC

3. 日常运维之任务管理

3.1 查看任务:running 表示正在运行的,当然也会看到其他的状态

./bin/seatunnel.sh -l

image.png

3.2 暂停任务

./bin/seatunnel.sh -s 967714059992432641

image.png

3.3 再次启动已暂停的任务

./bin/seatunnel.sh -r 967714059992432641 -c $SEATUNNEL_HOME/config/mysql_virdb_config

由于恢复的时候未加入后台运行,导致一直前端,直接ctrl+c 退出后,状态为

image.png

PS: 故恢复时候,一定要加入相关的参数,job命名,是否后台运行等

./bin/seatunnel.sh -r 967714059992432641 -c $SEATUNNEL_HOME/config/mysql_virdb_config --async -n job_mysql_virdb

再次查看作业状态,这就是我们期待的样子

image.png

3.4 取消任务

该命令会取消指定作业,取消作业后,作业会被停止,作业的状态会变为CANCELED。

支持批量取消作业,可以一次取消多个作业。

被cancel的作业的所有断点信息都将被删除,无法通过seatunnel.sh -r 恢复。

./bin/seatunnel.sh -can 967714059992432641

image.png

PS: 取消后的状态,与直接ctrl+c 退出后竟然的相识,若再启动应该会丢了一部分数据吧

4. SeaTunnel 日志配置

配置文件:$SEATUNNEL_HOME/config/log4j2.properties

4.1 为每个作业单独配置日志文件(重启其中一个job后生效)

如下配置

rootLogger.appenderRef.file.ref = fileAppender
appender.file.layout.pattern = [%X{ST-JID}] %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%-30.30c{1.}] [%t] - %m%n

更改为如下

rootLogger.appenderRef.file.ref = routingAppender
appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%-30.30c{1.}] [%t] - %m%n

image.png

4.2 SeaTunnel 支持定时删除旧日志文件,以避免磁盘空间不足

您可以在 $SEATUNNEL_HOME/config/seatunnel.yaml 配置文件中添加以下配置:
默认配置项如下(时间按分钟计算,比如1440为1440分钟)

seatunnel: engine: history-job-expire-minutes: 1440 telemetry: logs: scheduled-deletion-enable: true

说明:
history-job-expire-minutes: 设置历史作业和日志的保留时间(单位:分钟)。系统将在指定的时间后自动清除过期的作业信息和日志文件。
scheduled-deletion-enable: 启用定时清理功能,默认为 true。系统将在作业达到 history-job-expire-minutes 设置的过期时间后自动删除相关日志文件。关闭该功能后,日志将永久保留在磁盘上,需要用户自行管理,否则可能影响磁盘占用。建议根据需求合理配置。

5. Web UI 查看任务情况

ps:终于等到了,居然有 Web UI界面监控作业情况了,太棒了吧!!!来来来,看下如何配置及访问

5.1 配置Web UI

配置文件:$SEATUNNEL_HOME/config/seatunnel.yaml,默认配置如下

seatunnel: engine: http: enable-http: true port: 8080

5.2 访问Web UI 页面

打开浏览器,访问 http://ip:8080 即可

5.3 同步数据比对

通最新创建时间及更新时间,以及行数进行比对

image.png

6 常见错误

6.1 必须配置数据库名及表名,不然会报错

2025-04-24 17:59:42,640 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Fatal Error, 2025-04-24 17:59:42,641 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Please submit bug report in https://github.com/apache/seatunnel/issues 2025-04-24 17:59:42,641 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Reason:SeaTunnel job executed failed 2025-04-24 17:59:42,642 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:228) at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40) at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34) Caused by: org.apache.seatunnel.api.table.factory.FactoryException: ErrorCode:[API-06], ErrorDescription:[Factory initialize failed] - Unable to create a sink for identifier 'jdbc'. at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSink(FactoryUtil.java:250) at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.createSinkAction(MultipleTableJobConfigParser.java:669) at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parseSink(MultipleTableJobConfigParser.java:592) at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parse(MultipleTableJobConfigParser.java:240) at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.getLogicalDag(ClientJobExecutionEnvironment.java:123) at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.execute(ClientJobExecutionEnvironment.java:191) at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:165) ... 2 more Caused by: org.apache.seatunnel.api.configuration.util.OptionValidationException: ErrorCode:[API-02], ErrorDescription:[Option item validate failed] - There are unconfigured options, the options('database') are required because ['generate_sink_sql' == true] is true. at org.apache.seatunnel.api.configuration.util.ConfigValidator.validate(ConfigValidator.java:200) at org.apache.seatunnel.api.configuration.util.ConfigValidator.validate(ConfigValidator.java:107) at org.apache.seatunnel.api.configuration.util.ConfigValidator.validate(ConfigValidator.java:47) at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSink(FactoryUtil.java:239) ... 8 more 2025-04-24 17:59:42,642 ERROR [o.a.s.c.s.SeaTunnel ] [main] - =============================================================================== Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:228) at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40) at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34) Caused by: org.apache.seatunnel.api.table.factory.FactoryException: ErrorCode:[API-06], ErrorDescription:[Factory initialize failed] - Unable to create a sink for identifier 'jdbc'. at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSink(FactoryUtil.java:250) at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.createSinkAction(MultipleTableJobConfigParser.java:669) at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parseSink(MultipleTableJobConfigParser.java:592) at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parse(MultipleTableJobConfigParser.java:240) at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.getLogicalDag(ClientJobExecutionEnvironment.java:123) at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.execute(ClientJobExecutionEnvironment.java:191) at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:165) ... 2 more Caused by: org.apache.seatunnel.api.configuration.util.OptionValidationException: ErrorCode:[API-02], ErrorDescription:[Option item validate failed] - There are unconfigured options, the options('database') are required because ['generate_sink_sql' == true] is true. at org.apache.seatunnel.api.configuration.util.ConfigValidator.validate(ConfigValidator.java:200) at org.apache.seatunnel.api.configuration.util.ConfigValidator.validate(ConfigValidator.java:107) at org.apache.seatunnel.api.configuration.util.ConfigValidator.validate(ConfigValidator.java:47) at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSink(FactoryUtil.java:239) ... 8 more

6.2 报错2,ID问题

[967787050499571713] 2025-04-24 22:00:16,190 ERROR [.s.e.s.c.CheckpointCoordinator] [hz.main.generic-operation.thread-14] - report error from task
org.apache.seatunnel.common.utils.SeaTunnelException: java.lang.IllegalArgumentException: can't find field [ID]
        at org.apache.seatunnel.api.table.type.SeaTunnelRowType.indexOf(SeaTunnelRowType.java:87)
        at org.apache.seatunnel.api.table.type.SeaTunnelRowType.indexOf(SeaTunnelRowType.java:77)
        at org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSink.createWriter(JdbcSink.java:133)
        at org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSink.createWriter(JdbcSink.java:66)
        at org.apache.seatunnel.api.sink.multitablesink.MultiTableSink.createWriter(MultiTableSink.java:82)
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.restoreState(SinkFlowLifeCycle.java:342)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$restoreState$16(SeaTunnelTask.java:426)
        at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
        at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
        at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
        at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.restoreState(SeaTunnelTask.java:423)
        at org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskRestoreOperation.lambda$runInternal$0(NotifyTaskRestoreOperation.java:107)
        at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
        at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)

        at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.reportCheckpointErrorFromTask(CheckpointCoordinator.java:391) ~[seatunnel-starter.jar:2.3.10]
        at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.reportCheckpointErrorFromTask(CheckpointManager.java:182) ~[seatunnel-starter.jar:2.3.10]
        at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointErrorReportOperation.runInternal(CheckpointErrorReportOperation.java:48) ~[seatunnel-starter.jar:2.3.10]
        at org.apache.seatunnel.engine.server.task.operation.TracingOperation.run(TracingOperation.java:42) ~[seatunnel-starter.jar:2.3.10]
        at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) ~[seatunnel-starter.jar:2.3.10]
        at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273) ~[seatunnel-starter.jar:2.3.10]
        at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248) ~[seatunnel-starter.jar:2.3.10]
        at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213) ~[seatunnel-starter.jar:2.3.10]
        at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175) ~[seatunnel-starter.jar:2.3.10]
        at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139) ~[seatunnel-starter.jar:2.3.10]
        at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123) ~[seatunnel-starter.jar:2.3.10]
        at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102) ~[seatunnel-starter.jar:2.3.10]

image.png

6.3 已存在会提示

Error Msg = ORA-00955: name is already used by an existing object

        at oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:530)
        ... 43 more

        at org.apache.seatunnel.engine.server.CoordinatorService.lambda$submitJob$6(CoordinatorService.java:656)
        at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)

Web UI的任务展示与服务器查询有不一致的地方

jobid:967961857958608897 之前暂停过,再启动后,在web ui存在2条记录

image.png

image.png

6.4 可能是数据延迟导致,过几天再看的,已经没有

image.png

7. 参考文档

  • 下载地址
    https://seatunnel.apache.org/download/

  • source 源端 mysql-cdc配置
    https://seatunnel.apache.org/zh-CN/docs/2.3.10/connector-v2/source/MySQL-CDC

  • sink 目标端 oracle 配置
    https://seatunnel.apache.org/zh-CN/docs/2.3.10/connector-v2/sink/Oracle

  • 为每个作业配置单独的配置项
    https://seatunnel.apache.org/zh-CN/docs/2.3.10/seatunnel-engine/logging

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论