暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

【用户投稿】使用 SeaTunnel 进行 HTTP 同步到 Doris 实战经验分享

SeaTunnel 2024-07-18
720

需求背景

由于我司的项目中需要接入不同的数据源的数据到数仓中,在选择了众多的产品中最后选择了Apache SeaTunnel,对比参考

目前我这边使用的接口,暂时没有接口认证,如果需要接口认证的方式接入数据,再做讨论及测试

实际使用

Apache SeaTunnel版本:2.3.4

话不多说,先贴最终的运行文件,由于我使用的json
rest-api
提交方式,所以结果如下图所示:

使用rest
conf
的区别就在于job执行的环境不同,conf使用的是ClientJobExecutionEnvironment
(经测试也支持json格式),而rest方式则使用的是RestJobExecutionEnvironment

接口返回的数据格式

{
  "code""0000",
  "msg""成功",
  "data": {
    "records": [
      {
        "id""1798895733824393218",
        "taskContent""许可证02",
        "taskType""许可证"
      }
    ]
  }
}
// 实际数据分页的很多,以上是示例

接入配置

{
  "env": {
    "job.mode""BATCH",
    "job.name""SeaTunnel_Job"
  },
  "source": [
    {
      "result_table_name""Table13367210156032",
      "plugin_name""Http",
      "url""http://*.*.*.*:*/day_plan_repair/page",
      "method""GET",  // Http请求方式 只支持GET和POST两种方式
      "format""json"// 默认值是text 只支持json和text两种方式
      "json_field": {   // 可以看看作是从上述接口返回的数据中取数据的路径和key的映射关系,value则是取值的JsonPath
        "id""$.data.records[*].id",
        "taskContent""$.data.records[*].taskContent",
        "taskType""$.data.records[*].taskType"
      },
      // "pageing": {
      //   "page_field": "current", // 当前页的key,就是分页接口的请求参数中的当前页的key,
      //   "batch_size": 10         // 每页取多少数据
      // },
      "schema": {
        "fields": {
          "id""BIGINT"// 主键列问题,详见下面的问题
          "taskContent""STRING",
          "taskType""STRING"
        }
      }
    }
  ],
  "transform": [
    {
      "field_mapper": { // key是source中的schema.field中的值,value是sink中使用的值,例如下面的save_mode_create_template里的${rowtype_fields}使用的就是value,可以更改value作为sink的新命名列
        "id""id"
        "taskContent""task_content",
        "taskType""task_type"
      },
      "result_table_name""Table13367210156033",
      "source_table_name""Table13367210156032",
      "plugin_name""FieldMapper"
    }
  ],
  "sink": [
    {
      "source_table_name""Table13367210156033",
      "plugin_name""Doris",
      "fenodes ""*.*.*.*:*",
      "database""test",
      "password""****",
      "username""****",
      "table""ods_day_plan",
      "sink.label-prefix""test-ods_day_plan"// Stream Load 导入使用的标签前缀。在 2pc 场景下,需要全局唯一性来保证 SeaTunnel 的 EOS 语义
      "sink.enable-2pc"false// 是否开启两阶段提交(2pc),默认为true,保证Exactly-Once语义。Doris的二阶段提交详见https://doris.apache.org/zh-CN/docs/data-operate/import/stream-load-manual
      "data_save_mode""APPEND_DATA"// 数据保存模式 DROP_DATA、APPEND_DATA、CUSTOM_PROCESSING、ERROR_WHEN_DATA_EXISTS官方提供了四种,我使用保留数据库结构,追加数据,可以详见源码中的DataSaveMode枚举
      "schema_save_mode""CREATE_SCHEMA_WHEN_NOT_EXIST"// Scheme保存模式 RECREATE_SCHEMA、CREATE_SCHEMA_WHEN_NOT_EXIST、ERROR_WHEN_SCHEMA_NOT_EXIST 我使用的是当Schema不存在时创建;具体释义详见SchemaSaveMode枚举
      "save_mode_create_template""CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (\n ${rowtype_fields}\n ) ENGINE=OLAP\n UNIQUE KEY (id)\n DISTRIBUTED BY HASH (id)\n PROPERTIES (\n \"replication_allocation\" = \"tag.location.default: 1\",\n \"in_memory\" = \"false\",\n  \"storage_format\" = \"V2\",\n \"disable_auto_compaction\" = \"false\"\n )",
      "sink.enable-delete"true//是否启用删除,此配置只有Doris的表模型是Unique模型,同时需要Doris表开启批量删除功能(默认开启 0.15+ 版本)
      "doris.config": {
        "format""json",
        "read_json_by_line""true"
      }
    }
  ]
}


实际使用中遇到的问题

Handle save mode failed

具体的报错日志中包含

Caused by: java.sql.SQLException: errCode = 2, detailMessage = Syntax error in line 21:
 UNIQUE KEY ()
             ^
Encountered: )
Expected: IDENTIFIER

解决方案:

详见链接[issue](https://github.com/apache/seatunnel/issues/6
646)

使用了上述配置文件中的

save_mode_create_template
字段解决,目标中值的
可以自行根据业务配置。

NoSuchMethodError

java.lang.NoSuchMethodError: retrofit2.Retrofit$Builder.client(Lshaded/okhttp3/OkHttpClient;)Lretrofit2/Retrofit$Builder;
 at org.influxdb.impl.InfluxDBImpl.<init>(InfluxDBImpl.java:179) ~[connector-influxdb-2.3.4.jar:2.3.4]
 at org.influxdb.impl.InfluxDBImpl.<init>(InfluxDBImpl.java:120) ~[connector-influxdb-2.3.4.jar:2.3.4]
 at org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient.getInfluxDB(InfluxDBClient.java:72) ~[connector-influxdb-2.3.4.jar:2.3.4]

在使用influxdb的连接时,遇到了jar包冲突的问题,最终发现在创建http链接
的时候,
retrofit
2
的依赖与
datahub
连接器中的存在版本冲突,我这里没有
使用到
datahub
,所以删除datahub的连接器即可解决问题!

Apache Doris BIGINT类型精度丢失问题

详见帖子

配置主键

Doris配置save_mode_create_template
包含主键时,主键类型必须是数字或日期类型。

上面的source配置的schema中的id,接口返回的实际类型是字符串类型,但是是雪花算法的全数字类型,所以使用BIGINT
类型自动转换

原因是Sink配置中的save_mode_create_template
UNIQUE KEY
使用的id作为主键,Doris要求主键列类型必须是数字或者日期类型!!

个人经验

  1. 当sink、source、transform只有一个时,可以省略result_table_name、source_table_name
    配置项
  2. 下载源码,修改源码,在源码中增加log日志,并打包替换SeaTunnel运行时的jar,以方便根据日志得到自己想知道的结果或者方便理解代码
  3. 根据1的运用,熟知代码后可以进行二次开发,例如需要token认证的接口该怎么处理,值得深思。
  4. 另外source配置中的json_field中的value的JsonPath值,不支持 列表中复杂类型取值的问题Array或Map<String, Object>。也可以考虑二开解决
    // 举例:
    {
      "code""0000",
      "msg""成功",
      "data": {
        "records": [
          {
            "id""1798895733824393218",
            "taskContent""许可证02",
            "taskType""许可证",
            "region_list": [ // 此格式中的region_list无法解析和同步 $.data.records[*].region_list[*].id 会报数据和总数不匹配的错误
              {
                "id":"1",
                "name""11"
              },
              {
                "id":"1",
                "name""11"
              }
            ]
          }
        ]
      }
    }

    附上 测试代码 (使用的是JDK17)

        private static final Option[] DEFAULT_OPTIONS = {
                Option.SUPPRESS_EXCEPTIONS, Option.ALWAYS_RETURN_LIST, Option.DEFAULT_PATH_LEAF_TO_NULL
        };
        private JsonPath[] jsonPaths;
        private final Configuration jsonConfiguration = Configuration.defaultConfiguration().addOptions(DEFAULT_OPTIONS);

        @Test
        public void test5() {
            String data = """
                    {
                        "
    code": "0000",
                        "
    msg": "成功",
                        "
    data": {
                            "
    records": [
                                {
                                    "
    id": "1798895733824393218",
                                    "
    taskContent": "12312312313"
                                }
                            ]
                        }
                    }
                    "
    "";
            Map<String, String> map = new HashMap<>();
            map.put("id""$.data.records[*].id");
            map.put("taskContent""$.data.records[*].taskContent");
            JsonField jsonField = JsonField.builder().fields(map).build();
            initJsonPath(jsonField);
            data = JsonUtils.toJsonNode(parseToMap(decodeJSON(data), jsonField)).toString();
            log.error(data);
        }
        // 以下代码都是HttpSourceReader中的代码
        private void initJsonPath(JsonField jsonField) {
            jsonPaths = new JsonPath[jsonField.getFields().size()];
            for (int index = 0; index < jsonField.getFields().keySet().size(); index++) {
                jsonPaths[index] =
                        JsonPath.compile(
                                jsonField.getFields().values().toArray(new String[] {})[index]);
            }
        }

        private List<Map<String, String>> parseToMap(List<List<String>> datas, JsonField jsonField) {
            List<Map<String, String>> decodeDatas = new ArrayList<>(datas.size());
            String[] keys = jsonField.getFields().keySet().toArray(new String[] {});

            for (List<String> data : datas) {
                Map<String, String> decodeData = new HashMap<>(jsonField.getFields().size());
                final int[] index = {0};
                data.forEach(
                        field -> {
                            decodeData.put(keys[index[0]], field);
                            index[0]++;
                        });
                decodeDatas.add(decodeData);
            }

            return decodeDatas;
        }

        private List<List<String>> decodeJSON(String data) {
            ReadContext jsonReadContext = JsonPath.using(jsonConfiguration).parse(data);
            List<List<String>> results = new ArrayList<>(jsonPaths.length);
            for (JsonPath path : jsonPaths) {
                List<String> result = jsonReadContext.read(path);
                results.add(result);
            }
            for (int i = 1; i < results.size(); i++) {
                List<?> result0 = results.get(0);
                List<?> result = results.get(i);
                if (result0.size() != result.size()) {
                    throw new HttpConnectorException(
                            HttpConnectorErrorCode.FIELD_DATA_IS_INCONSISTENT,
                            String.format(
                                    "[%s](%d) and [%s](%d) the number of parsing records is inconsistent.",
                                    jsonPaths[0].getPath(),
                                    result0.size(),
                                    jsonPaths[i].getPath(),
                                    result.size()));
                }
            }

            return dataFlip(results);
        }

        private List<List<String>> dataFlip(List<List<String>> results) {

            List<List<String>> datas = new ArrayList<>();
            for (int i = 0; i < results.size(); i++) {
                List<String> result = results.get(i);
                if (i == 0) {
                    for (Object o : result) {
                        String val = o == null ? null : o.toString();
                        List<String> row = new ArrayList<>(jsonPaths.length);
                        row.add(val);
                        datas.add(row);
                    }
                } else {
                    for (int j = 0; j < result.size(); j++) {
                        Object o = result.get(j);
                        String val = o == null ? null : o.toString();
                        List<String> row = datas.get(j);
                        row.add(val);
                    }
                }
            }
            return datas;
        }

    以上是我的一些经验分享,希望对大家有帮助!

同步Demo

 MySQL→Doris
MySQLCDC
MySQL→Hive

新手入门

 SeaTunnel 让数据集成变得 So easy!  3 分钟入门指南
从 0 到 1 快速入门 Apache SeaTunnel 
初探 Apache SeaTunnel / 深入理解 Apache SeaTunnel

 MySQL 同步到 Hive / 从MySQL同步到StarRocks
通过 SeaTunnel 将数据写入 OSS-HDFS 
MySQL 到 Elasticsearch 实时同步解决方案

启动 SeaTunnel / 3 分钟部署 SeaTunnel Zeta 
 部署 Apache SeaTunnel 分布式集群
Apache SeaTunnel Web部署指南
基于Apache SeaTunnel构建CDC数据同步管道
【用户投稿】Apache SeaTunnel 2.3.3+Web 1.0.0版本安装部署
【安装部署】Apache SeaTunnel 和 Web快速安装详解
【保姆级教程】使用SeaTunnel同步Kafka的数据到ClickHouse

最佳实践

 OPPO 清风 天翼云 马蜂窝
孩子王 哔哩哔哩 唯品会
众安保险 兆原数通 亚信科技

测试报告


 性能测试报告:SeaTunnel 批量同步数据比 GLUE 快 420%!
最新性能对比报告:SeaTunnel 是 Airbyte 30 倍!
比DataX快20%!SeaTunnel同步计算引擎性能测试全新发布
SeaTunnel 与 DataX 、Sqoop、Flume、Flink CDC 对比


Apache SeaTunnel





Apache SeaTunnel 是一个分布式、高性能、易扩展、用于海量数据(离线&实时)同步和转化的数据集成平台


仓库地址: 
https://github.com/apache/seatunnel

网址:
https://seatunnel.apache.org/

Apache SeaTunnel 下载地址:
https://seatunnel.apache.org/download
衷心欢迎更多人加入!

我们相信,在「Community Over Code」(社区大于代码)、「Open and Cooperation」(开放协作)、「Meritocracy」(精英管理)、以及「多样性与共识决策」等 The Apache Way 的指引下,我们将迎来更加多元化和包容的社区生态,共建开源精神带来的技术进步!

我们诚邀各位有志于让本土开源立足全球的伙伴加入 SeaTunnel 贡献者大家庭,一起共建开源!

提交问题和建议:
https://github.com/apache/seatunnel/issues

贡献代码:
https://github.com/apache/seatunnel/pulls

订阅社区开发邮件列表 : 
dev-subscribe@seatunnel.apache.org

开发邮件列表:
dev@seatunnel.apache.org

加入 Slack:
https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1kcxzyrxz-lKcF3BAyzHEmpcc4OSaCjQ

关注 Twitter: 
https://twitter.com/ASFSeaTunnel

文章转载自SeaTunnel,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论