暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

SeaTunnel 2.3.9同步Oracle数据至Doris出现乱码?别慌,这么做

SeaTunnel 2025-04-30
190




点击蓝字 关注我们


Oracle 字符集为 ASCII,如果想要同步数据到 Doris,就会出现中文乱码问题。那么出现这个问题该怎么办呢?实际上有办法解决。




解决思路




数据从数据库读取到Source的时候去处理,也就是从ResultSet 里面获取数据的时候,去重新编码

先下载好 2.3.9 版本的 SeaTunnel 源码,进入到seatunnel-connectors-v2
下的connector-jdbc
模块

SeaTunnel 读取数据的流程大致如下:

JdbcSourceFactory 工作流程

JdbcSourceFactory 会加载我们的 source 配置参数,构建 JdbcSourceConfigJdbcDialect,创建 JdbSource

JdbSource 任务处理

JdbSource 会创建 SourceSplitEnumerator 去拆分任务,然后创建 JdbcSourceReader 根据拆分去执行任务。

JdbcSourceReader 数据处理

JdbcSourceReader 会构建 JdbcInputFormat,走 pollNext 方法去循环处理数据。

pollNext 方法执行步骤

JdbcSourceReader 的 pollNext 方法会先调用 JdbcInputFormat 的 open 方法获取 ParpareStatement 和 ResultSet 等信息,然后调用 JdbcInputFormat 的 nextRecord 方法去处理 ResultSet,把数据格式转换成 SeaTunnel 需要 SeaTunnelRow

nextRecord 方法处理

JdbcInputFormat 的 nextRecord 会调用 JdbcRowConverter 的 toInternal 方法处理 ResultSetJdbcRowConverter 的实现类 AbstractJdbcRowConverter,是我们需要修改的(针对于 Oracle 数据库)。

问题及解决思路

进入到 AbstractJdbcRowConverter 的 toInternal 方法,发现在处理 String 类型字段时:它默认是 JdbcFieldTypeUtils.getString (rs, resultSetIndex),这一步拿到的数据就已经乱码了。我们需要修改为 JdbcFieldTypeUtils.getBytes (rs, resultSetIndex),然后根据字节数组去重新编码。




解决方案




下面给出我们的解决方案:

针对于 Oracle 字符集可以存储中文的情况,没有必要先得到字节数组,再重新编码,直接 getString 是可以的

为此我们需要一个开关,来判断是否需要转码

JdbcInputFormat
类里面,我们加上一个 params 的参数:

    private final Map<StringString> params;

    JdbcInputFormat
    构造方法里,为这个参数附上值:

      public JdbcInputFormat(JdbcSourceConfig config, Map<TablePath, CatalogTable> tables) {
          this.jdbcDialect = JdbcDialectLoader.load(
                  config.getJdbcConnectionConfig().getUrl(), 
                  config.getCompatibleMode()
          );
          this.chunkSplitter = ChunkSplitter.create(config);
          this.jdbcRowConverter = jdbcDialect.getRowConverter();
          this.tables = tables;
          // 在此处把去得到 properties 的参数信息
          this.params = config.getJdbcConnectionConfig().getProperties();
      }


      然后在JdbcInputFormat
      nextRecord
      方法里的 jdbcRowConverter
      toInternal
      方法,增加一个 params 参数: 

        // 这里比原来多传递了一个 params 参数,用于做是否转码的处理
        SeaTunnelRow seaTunnelRow = jdbcRowConverter.toInternal(resultSet, splitTableSchema, params);


        接着到实现类AbstractJdbcRowConverter
        添加一个编码的方法:

          public static String convertCharset(byte[] value, String charSet) {
              if (value == null || value.length == 0) {
                  return null;
              }
              log.info("value code is ::::{}"Arrays.toString(value));
              try {
                  return new String(value, charSet);
              } catch (UnsupportedEncodingException e) {
                  throw new RuntimeException(e);
              }
          }
          在该实现类的 toInternal 方法,针对 String 类型的处理方式,修改如下:
            case STRING:
                // params 的 sourceCharset 存在且值为 GBK,我才去转码,否则就走默认的 getString()
                if (params == null || params.isEmpty()) {
                    fields[fieldIndex] = JdbcFieldTypeUtils.getString(rs, resultSetIndex);
                } else {
                    String sourceCharset = params.get("sourceCharset");
                    if ("GBK".equalsIgnoreCase(sourceCharset)) {
                        fields[fieldIndex] = convertCharset(JdbcFieldTypeUtils.getBytes(rs, resultSetIndex), sourceCharset);
                    } else {
                        fields[fieldIndex] = JdbcFieldTypeUtils.getString(rs, resultSetIndex);
                    }
                }
                break;

            修改完之后需要把connector-jdbc
            重新打包,替换掉 seatunnel 的connectors 目录下的connector-jdbc-2.3.9.jar
            ,然后重启集群即可。

            我们的配置参数脚本:

            如果 Oracle 不存在乱码的情况,这个properties 就不需要传递

            ps:如果您在connector-jdbc
            里面打印信息,请到 seatunnel 的 logs 目录下,去查看你的 worker节点日志信息。

            转载自花载酒779
            原文链接:https://blog.csdn.net/m0_66532138/article/details/146957750

            Apache SeaTunnel

            Apache SeaTunnel是一个云原生的高性能海量数据集成工具。北京时间 2023 年 6 月1 日,全球最大的开源软件基金会ApacheSoftware Foundation正式宣布Apache SeaTunnel毕业成为Apache顶级项目。目前,SeaTunnel在GitHub上Star数量已达8k+,社区达到6000+人规模。SeaTunnel支持在云数据库、本地数据源、SaaS、大模型等170多种数据源之间进行数据实时和批量同步,支持CDC、DDL变更、整库同步等功能,更是可以和大模型打通,让大模型链接企业内部的数据。




            同步Demo

            MySQL→Doris | MySQLCDC | MySQL→Hive | HTTP → Doris  | HTTP → MySQL | MySQL→StarRocks|MySQL→Elasticsearch |Kafka→ClickHouse

            新手入门

            SeaTunnel 让数据集成变得 So easy!3 分钟入门指南
             0 到 1 快速入门 /初探/深入理解 
              分布式集群部署 | CDC数据同步管道 | Oracle-CDC

            最佳实践

            OPPO|清风|天翼云|马蜂窝|孩子王|哔哩哔哩|唯品会|众安保险|兆原数通 | 亚信科技|映客|翼康济世|信也科技|华润置地|Shopee|京东科技|58同城|互联网银行|JPMorgan

            测试报告

            SeaTunnel VS GLUE |  VS Airbyte |  VS DataX|SeaTunnel 与 DataX 、Sqoop、Flume、Flink CDC 对比

            源码解析

            Zeta引擎源码解析(一) |(二) |(三)| API 源码解析 |2.1.1源码解析|封装 Flink 连接数据库解析



            Apache SeaTunnel





            Apache SeaTunnel 是一个分布式、高性能、易扩展、用于海量数据(离线&实时)同步和转化的数据集成平台

            仓库地址: 
            https://github.com/apache/seatunnel
            网址:
            https://seatunnel.apache.org/
            Apache SeaTunnel 下载地址:
            https://seatunnel.apache.org/download
            衷心欢迎更多人加入!
            我们相信,在Community Over Code(社区大于代码)、「Open and Cooperation」(开放协作)、「Meritocracy」(精英管理)、以及「多样性与共识决策」等 The Apache Way 的指引下,我们将迎来更加多元化和包容的社区生态,共建开源精神带来的技术进步!
            我们诚邀各位有志于让本土开源立足全球的伙伴加入 SeaTunnel 贡献者大家庭,一起共建开源!
            提交问题和建议:
            https://github.com/apache/seatunnel/issues
            贡献代码:
            https://github.com/apache/seatunnel/pulls
            订阅社区开发邮件列表 : 
            dev-subscribe@seatunnel.apache.org
            开发邮件列表:
            dev@seatunnel.apache.org
            加入 Slack:
            https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1kcxzyrxz-lKcF3BAyzHEmpcc4OSaCjQ
            关注 Twitter: 
            https://twitter.com/ASFSeaTunne

            文章转载自SeaTunnel,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

            评论