暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

3秒学不会Palo Doris的数据导入你打我!

857Hub 2021-12-03
2160

点击上方蓝字关注我们


导入总览


支持的数据源

Doris 提供多种数据导入方案,可以针对不同的数据源进行选择。


数据源导入方式
百度对象存储(BOS)、HDFS、AFS使用 Broker Load 导入数据
本地文件导入本地数据
百度消息服务(Kafka)订阅 Kafka 日志
MySQL、Oracle、PostgreSQL通过外部表同步数据
通过 JDBC 导入数据使用JDBC同步数据
导入 JSON 格式数据JSON 格式数据导入说明
MySQL binlog敬请期待



数据导入总体说明

Doris 的数据导入实现有以下共性特征,这里分别介绍,以帮助大家更好的使用数据导入功能


原子性保证

Doris 的每一个导入作业,不论是使用 Broker Load 进行批量导入,还是使用 INSERT 语句进行单条导入,都是一个完整的事务操作。导入事务可以保证一批次内的数据原子生效,不会出现部分数据写入的情况。

同时,一个导入作业都会有一个 Label。这个 Label 是在一个数据库(Database)下唯一的,用于唯一标识一个导入作业。Label 可以由用户指定,部分导入功能也会由系统自动生成。

Label 是用于保证对应的导入作业,仅能成功导入一次。一个被成功导入的 Label,再次使用时,会被拒绝并报错 Label already used
。通过这个机制,可以在 Doris 测做到 At-Most-Once
 语义。如果结合上游系统的 At-Least-Once
 语义,则可以实现导入数据的 Exactly-Once
 语义。


同步和异步

导入方式分为同步和异步。对于同步导入方式,返回结果即表示导入成功还是失败。而对于异步导入方式,返回成功仅代表作业提交成功,不代表数据导入成功,需要使用对应的命令查看导入作业的运行状态。


支持的数据格式

不同的导入方式支持的数据格式略有不同。

导入方式支持的格式
Broker LoadParquet,ORC,csv,gzip
Stream Loadcsv, gzip, json
Routine Loadcsv, json


正文来了 伙计们!!!!!!!!!!!!!!!!


导入本地数据


Stream Load 用于将本地文件导入到 Doris 中。

不同于其他命令的提交方式,Stream Load 是通过 HTTP 协议与 Doris 进行连接交互的。

该方式中涉及 HOST:PORT 应为 HTTP 协议端口。

  • 公有云用户必须使用 Compute Node(BE)的 HTTP 协议端口,默认为 8040。

  • 私有化部署用户可以使用 Leader Node(FE)的 HTTP 协议端口,默认为 8030。但须保证客户端所在机器网络能够联通 Compute Node 所在机器。

本章节我们以 curl 命令为例演示如何进行数据导入。

文章最后,我给出一个使用 Java 导入数据的代码示例。


导入数据

Stream Load 的请求体如下:

PUT /api/{db}/{table}/_stream_load
  1. 创建一张表

    通过 CREATE TABLE
     命令创建一张表用于存储待导入的数据。具体的导入方式请查阅 CREATE TABLE 命令手册。示例如下:

CREATE TABLE IF NOT EXISTS load_test
(
id INT,
name VARCHAR(128)
)
DISTRIBUTED BY HASH(id) BUCKETS 8;


2. 导入数据

执行以下 curl 命令导入本地文件:

 curl -u user:passwd -H "label:example_label_1" -T /path/to/local/your_file.txt http://host:port/api/example_db/load_test/_stream_load
  • user:passwd 为在 Doris 中创建的用户。初始用户为 admin,密码为创建 Doris 集群时设置的密码。

  • host:port 为 Compute Node 的 HTTP 协议端口,默认是 8040,可以在智能云 Doris 集群详情页面查看。

  • label: 可以在 Header 中指定 Label 唯一标识这个导入任务。


3. 等待导入结果

Stream Load 命令是同步命令,返回成功即表示导入成功。如果导入数据较大,可能需要较长的等待时间。示例如下:

{
"TxnId": 1003,
"Label": "example_label_1",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 1000000,
"NumberLoadedRows": 1000000,
"NumberFilteredRows": 1,
"NumberUnselectedRows": 0,
"LoadBytes": 40888898,
"LoadTimeMs": 2144,
"BeginTxnTimeMs": 1,
"StreamLoadPutTimeMs": 2,
"ReadDataTimeMs": 325,
"WriteDataTimeMs": 1933,
"CommitAndPublishTimeMs": 106,
"ErrorURL": "http://192.168.1.1:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bdf005"
}

  • Status
     字段状态为 Success
     即表示导入成功。


使用建议

  • Stream Load 只能导入本地文件。

  • 建议一个导入请求的数据量控制在 1 GB 以内。如果有大量本地文件,可以分批并发提交。


Java 代码示例

这里通过一个简单的 JAVA 示例来执行 Stream Load:

package demo.doris;

import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.FileEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/*
这是一个 Doris Stream Load 示例,需要依赖
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
*/

public class DorisStreamLoader {
// 1. 对于公有云公户,这里填写 Compute Node 地址以及 HTTP 协议访问端口(8040)。
// 2. 对于开源用户,可以选择填写 FE 地址以及 FE 的 http_port,但须保证客户端和 BE 节点的连通性。
private final static String HOST = "your_host";
private final static int PORT = 8040;
private final static String DATABASE = "db1"; // 要导入的数据库
private final static String TABLE = "tbl1"; // 要导入的表
private final static String USER = "root"; // Doris 用户名
private final static String PASSWD = ""; // Doris 密码
private final static String LOAD_FILE_NAME = "/path/to/1.txt"; // 要导入的本地文件路径

private final static String loadUrl = String.format("http://%s:%s/api/%s/%s/_stream_load",
HOST, PORT, DATABASE, TABLE);

private final static HttpClientBuilder httpClientBuilder = HttpClients
.custom()
.setRedirectStrategy(new DefaultRedirectStrategy() {
@Override
protected boolean isRedirectable(String method) {
// 如果连接目标是 FE,则需要处理 307 redirect。
return true;
}
});

public void load(File file) throws Exception {
try (CloseableHttpClient client = httpClientBuilder.build()) {
HttpPut put = new HttpPut(loadUrl);
put.setHeader(HttpHeaders.EXPECT, "100-continue");
put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(USER, PASSWD));

// 可以在 Header 中设置 stream load 相关属性,这里我们设置 label 和 column_separator。
put.setHeader("label","label1");
put.setHeader("column_separator",",");

// 设置导入文件。
// 这里也可以使用 StringEntity 来传输任意数据。
FileEntity entity = new FileEntity(file);
put.setEntity(entity);

try (CloseableHttpResponse response = client.execute(put)) {
String loadResult = "";
if (response.getEntity() != null) {
loadResult = EntityUtils.toString(response.getEntity());
}

final int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != 200) {
throw new IOException(
String.format("Stream load failed. status: %s load result: %s", statusCode, loadResult));
}

System.out.println("Get load result: " + loadResult);
}
}
}

private String basicAuthHeader(String username, String password) {
final String tobeEncode = username + ":" + password;
byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
return "Basic " + new String(encoded);
}

public static void main(String[] args) throws Exception{
DorisStreamLoader loader = new DorisStreamLoader();
File file = new File(LOAD_FILE_NAME);
loader.load(file);
}
}



导入BOS中的数据


这一章主要介绍如何导入 BOS 中存储的数据。


准备工作

请先通过以下步骤,在百度对象存储(Baidu Object Storage,BOS)上存放需导入到Doris中的数据。

  1. 开通 BOS 服务

    请参阅 https://cloud.baidu.com/doc/BOS/s/Jk4xttg03(使用BOS

  2. 创建 Bucket

    请参阅 https://cloud.baidu.com/doc/BOS/s/Fk4xtwbze(创建Bucket

    注意:Bucket 所属地域必须和 Doris 集群所属地域相同。Doris 地域通常可以在 Doris 控制台页面左上角查看

  3. 上传文件到 Bucket

    有两种方式可以上传文件到 Bucket。

    通过控制台直接上传,请参阅文档 https://cloud.baidu.com/doc/BOS/s/Gk4xty0f2(上传Object)。

    通过命令行工具上传:

    1. 请先https://cloud.baidu.com/doc/BOS/s/Ejwvyqobd#bos-cli%E4%B8%8B%E8%BD%BD%E5%9C%B0%E5%9D%80(下载 BOS CLI 命令行工具 )。这里以Linux操作系统的 bce-cli-0.10.10.zip 为例。

    2. 解压后,执行以下命令配置 BOS CLI:

./bce -c
BOS Access Key ID []: 353b8dexxxxxxxxxxb156d3
BOS Secret Access Key []: ea15a18xxxxxx29f78e8d77
BCE Security Token [None]:
Default region name [bj]:
Default domain [bj.bcebos.com]:
Default use auto switch domain [yes]:
Default breakpoint_file_expiration [7] days:
Default use https protocol [no]:
Default multi upload thread num [10]

  • BOS Access Key ID 和 BOS Secret Access Key 可在公有云页面右上角点击 账户头像 -> 安全认证
     获取。

  • Default region name 和 Default domain 请填写Bucket所在地域的缩写,可查阅 https://cloud.baidu.com/doc/BOS/s/Ck1rk80hn#%E8%AE%BF%E9%97%AE%E5%9F%9F%E5%90%8D%EF%BC%88endpoint%EF%BC%89(访问域名)获取。

  • 其他配置使用默认即可。

3. 使用以下命令上传文件:

./bce bos cp /path/to/local/your_file.txt bos:/your_bucket_name



开始导入

Doris 支持通过以下两种方式导入 BOS 中的数据。


通过 Broker Load 命令提交导入作业

Broker 是一个无状态的进程服务,已经内置在 Doris 集群中,主要用于对外部数据源的文件进行读写操作。Broker Load 则是利用 Broker 服务访问源数据,进行数据导入的一种方式。


1. 创建一张表

通过 CREATE TABLE
 命令创建一张表用于存储待导入的数据。

示例如下:

CREATE TABLE IF NOT EXISTS load_test
(
id INT,
name VARCHAR(128)
)
DISTRIBUTED BY HASH(id) BUCKETS 8;


2. 提交 Broker Load 导入作业

    示例如下:

LOAD LABEL example_db.exmpale_label_1
(
DATA INFILE("bos://your_bucket_name/your_file.txt")
INTO TABLE load_test
COLUMNS TERMINATED BY ","
)
WITH BROKER "bos"
(
"bos_endpoint" = "http://bj.bcebos.com",
"bos_accesskey" = "353b8dexxxxxxxxxxb156d3",
"bos_secret_accesskey" = "ea15a18xxxxxx29f78e8d77"
)
PROPERTIES
(
"timeout" = "3600"
);

  • LABEL:每个导入作业都需要指定一个唯一的 Label,后续可以通过这个 Label 查看导入作业的运行状态。

  • WITH BROKER "bos":"bos" 仅仅是 Broker 服务进程的名称,并不代表需要访问的数据源。Broker的名称可以使用 admin 用户连接 Doris 后,通过 SHOW BROKER
     命令查看。

  • "bos_accesskey" 和 "bos_secret_accesskey" 可在公有云页面右上角点击 账户头像 -> 安全认证
     获取。

  • "bos_endpoint" 和 BOS Bucket 所在地域有关。


3. 查看导入作业状态

Broker Load 是一个异步命令,第二步中的命令执行成功,仅代表作业提交成功。具体执行情况,须通过以下命令查看。

mysql> SHOW LOAD FROM example_db WHERE LABEL="exmpale_label_1"
*************************** 1. row ***************************
JobId: 10041
Label: exmpale_label_1
State: FINISHED
Progress: ETL:100%; LOAD:100%
Type: BROKER
EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=100000000
TaskInfo: cluster:N/A; timeout(s):14400; max_filter_ratio:0.0
ErrorMsg: NULL
CreateTime: 2020-11-17 09:38:04
EtlStartTime: 2020-11-17 09:38:09
EtlFinishTime: 2020-11-17 09:38:09
LoadStartTime: 2020-11-17 09:38:09
LoadFinishTime: 2020-11-17 09:42:07
URL: N/A
JobDetails: {"Unfinished backends":{},"ScannedRows":0,"TaskNumber":0,"All backends":{},"FileNumber":0,"FileSize":0}
1 row in set (0.01 sec)

其中 State
 字段状态为 FINISHED
 则代表导入成功,数据可查询。关于 SHOW LOAD
 返回结果的具体说明,可参阅 SHOW LOAD
 命令文档。


4. 取消导入作业

正在运行中的 Broker Load 导入作业可以使用以下命令取消:

CANCEL LOAD WHERE LABEL="exmpale_label_1";

取消成功后,所有已导入的数据也会被回滚。Doris 会自动保证一个导入作业中的数据原子生效。


通过外部表进行导入

Doris 也支持通过创建一张 Broker 外部表的方式引用BOS上存储的数据,然后通过 INSERT INTO SELECT
 的方式导入数据。


  1. 创建一张表

    创建一张用于存储数据的表。同上,不再赘述。

2. 创建 Broker 外部表

CREATE EXTERNAL TABLE IF NOT EXISTS example_db.example_ext_table
(
id INT,
name VARCHAR(128)
)
ENGINE=BROKER
PROPERTIES
(
"broker_name" = "bos",
"path" = "bos://your_bucket_name/your_file.txt",
)
BROKER PROPERTIES
(
"bos_endpoint" = "http://bj.bcebos.com",
"bos_accesskey" = "353b8dexxxxxxxxxxb156d3",
"bos_secret_accesskey" = "ea15a18xxxxxx29f78e8d77"
);


  • ENGINE:ENGINE 的类型为 BROKER,表示这是一张借助 Broker 服务访问数据的外部表。

  • "broker_name" 为 "bos","bos" 仅仅是 Broker 服务进程的名称,并不代表需要访问的数据源。Broker的名称可以使用 admin 用户连接 Doris 后,通过 SHOW BROKER
     命令查看。

  • "bos_accesskey" 和 "bos_secret_accesskey" 可在公有云页面右上角点击 账户头像 -> 安全认证
     获取。

  • "bos_endpoint" 和 BOS Bucket 所在地域有关。


注:外部表中的数据也可以通过 SELECT
 直接查询,但效率较低,推荐导入到 Doris 中后在执行查询。


3. 导入数据

使用以下命令从外部表导入数据到内部表。

INSERT INTO load_test SELECT * FROM example_ext_table;

该命令为同步命令(异步提交 INSERT 作业的操作正在开发中),命令返回成功即表示数据导入完成。当导入数据量较大时,可能会因查询超时而任务取消。



订阅Kafka日志


用户可以通过提交例行导入作业,直接订阅Kafka中的消息数据,以近实时的方式进行数据同步。

Doris 自身能够保证不丢不重的订阅 Kafka 中的消息,即 Exactly-Once
 消费语义。


准备工作

开通百度消息服务

百度消息服务(BMS)基于 Kafka 在百度智能云提供托管服务,请先按照以下流程开通服务。

  1. 请根据 (BMS快速入门) https://cloud.baidu.com/doc/Kafka/s/9jwvygf3k 文档开通消息服务

  2. 下载证书压缩包 kafka-key.zip 并解压,解压后将得到以下文件

3.上传证书文件到 HTTP 服务器。

因为后续 Doris 需要从某个 HTTP 服务器上下载这些整数以供访问 Kafka。因此我们需要先将这些证书上传到 HTTP 服务器。这个 HTTP 服务器必须要能够被 Doris 的 Leader Node 节点所访问。

如果您没有合适的 HTTP 服务器,可以参照以下方式借助百度对象存储(BOS)来完成:

    1. 根据 https://cloud.baidu.com/doc/BOS/s/Jk4xttg03(开始使用),https://cloud.baidu.com/doc/BOS/s/Fk4xtwbze(创建Bucket) 文档开通BOS服务并创建一个 Bucket。注意,Bucket所在地域必须和 Doris 集群所在地域相同

            2. 将以下三个文件上传到 Bucket

    • ca.pem

    • client.key

    • client.pem

      3. 在 BOS Bucket 文件列表页面,点击文件右侧的 文件信息
    ,可以获取 HTTP 访问连接。请将 连接有效时间
     设为 -1
    ,即永久。

    注:请不要使用带有 cdn 加速的 http 下载地址。这个地址某些情况无法被 Doris 访问。


    自建 Kafka 服务

    如果使用自建 Kafka 服务,请确保 Kafka 服务和 Doris 集群在同一个 VPC 内,并且相互之间的网络能够互通。


    订阅 Kafka 消息

    订阅 Kafka 消息使用了 Doris 中的例行导入(Routine Load)功能。

    用户首先需要创建一个例行导入作业。作业会通过例行调度,不断地发送一系列的任务,每个任务会消费一定数量 Kafka 中的消息。

    请注意以下使用限制:

    1. 支持无认证的 Kafka 访问,以及通过 SSL 方式认证的 Kafka 集群。

    2. 支持的消息格式如下:

    • csv 文本格式。每一个 message 为一行,且行尾不包含换行符。

    • Json 格式,详见 http://palo.baidu.com/docs/%E6%93%8D%E4%BD%9C%E6%89%8B%E5%86%8C/%E6%95%B0%E6%8D%AE%E5%AF%BC%E5%85%A5/JSON%E6%A0%BC%E5%BC%8F%E6%95%B0%E6%8D%AE%E5%AF%BC%E5%85%A5%E8%AF%B4%E6%98%8E/    (导入JSON数据

  1. 仅支持 Kafka 0.10.0.0(含) 以上版本。

  2. 访问 SSL 认证的 Kafka 集群

    例行导入功能支持无认证的 Kafka 集群,以及通过 SSL 认证的 Kafka 集群。

    访问 SSL 认证的 Kafka 集群需要用户提供用于认证 Kafka Broker 公钥的证书文件(ca.pem)。如果 Kafka 集群同时开启了客户端认证,则还需提供客户端的公钥(client.pem)、密钥文件(client.key),以及密钥密码。这里所需的文件需要先通过 CREAE FILE
     命令上传到 Plao 中,并且 catalog 名称为 kafka

    这里给出示例:

    上传文件

    CREATE FILE "ca.pem" PROPERTIES("url" = "https://example_url/kafka-key/ca.pem", "catalog" = "kafka");

    CREATE FILE "client.key" PROPERTIES("url" = "https://example_urlkafka-key/client.key", "catalog" = "kafka");

    CREATE FILE "client.pem" PROPERTIES("url" = "https://example_url/kafka-key/client.pem", "catalog" = "kafka");

    上传完成后,可以通过 http://palo.baidu.com/docs/SQL%E6%89%8B%E5%86%8C/%E8%AF%AD%E6%B3%95%E5%B8%AE%E5%8A%A9/%E4%BF%A1%E6%81%AF%E6%9F%A5%E7%9C%8B%E8%AF%AD%E5%8F%A5/SHOW-FILE/     (SHOW FLIES) 命令查看已上传的文件。


    创建例行导入作业

    访问无认证的 Kafka 集群

    CREATE ROUTINE LOAD example_db.my_first_job ON example_tbl

    COLUMNS TERMINATED BY ","

    PROPERTIES
    (
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    )
    FROM KAFKA
    (
    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic" = "my_topic",
    "property.group.id" = "xxx",
    "property.client.id" = "xxx",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
    );

      • max_batch_interval/max_batch_rows/max_batch_size
         用于控制一个子任务的运行周期。一个子任务的运行周期由最长运行时间、最多消费行数和最大消费数据量共同决定。


      2. 访问 SSL 认证的 Kafka 集群

      CREATE ROUTINE LOAD example_db.my_first_job ON example_tbl
      COLUMNS TERMINATED BY ",",
      PROPERTIES
      (
      "max_batch_interval" = "20",
      "max_batch_rows" = "300000",
      "max_batch_size" = "209715200",
      )
      FROM KAFKA
      (
      "kafka_broker_list"= "broker1:9091,broker2:9091",
      "kafka_topic" = "my_topic",
      "property.security.protocol" = "ssl",
      "property.ssl.ca.location" = "FILE:ca.pem",
      "property.ssl.certificate.location" = "FILE:client.pem",
      "property.ssl.key.location" = "FILE:client.key",
      "property.ssl.key.password" = "abcdefg"
      );

      • 对于百度消息服务,property.ssl.key.password
         属性可以在 client.properties
         文件中获取。

      查看导入作业状态

      查看作业状态的具体命令和示例请参阅 http://palo.baidu.com/docs/SQL%E6%89%8B%E5%86%8C/%E8%AF%AD%E6%B3%95%E5%B8%AE%E5%8A%A9/%E4%BF%A1%E6%81%AF%E6%9F%A5%E7%9C%8B%E8%AF%AD%E5%8F%A5/SHOW-ROUTINE-LOAD/   (SHOW ROUTINE LOAD) 命令文档。

      查看某个作业的任务运行状态的具体命令和示例请参阅 http://palo.baidu.com/docs/SQL%E6%89%8B%E5%86%8C/%E8%AF%AD%E6%B3%95%E5%B8%AE%E5%8A%A9/%E4%BF%A1%E6%81%AF%E6%9F%A5%E7%9C%8B%E8%AF%AD%E5%8F%A5/SHOW-ROUTINE-LOAD-TASK/    (SHOW ROUTINE LOAD TASK) 命令文档。

      只能查看当前正在运行中的任务,已结束和未开始的任务无法查看。

      修改作业属性

      用户可以修改已经创建的作业的部分属性。具体说明请参阅 http://palo.baidu.com/docs/SQL%E6%89%8B%E5%86%8C/%E8%AF%AD%E6%B3%95%E5%B8%AE%E5%8A%A9/%E8%BE%85%E5%8A%A9%E5%91%BD%E4%BB%A4/ALTER-ROUTINE-LOAD/   (ALTER ROUTINE LOAD) 命令手册。


      作业控制

      用户可以通过 STOP/PAUSE/RESUME
       三个命令来控制作业的停止,暂停和重启。



      使用JDBC同步数据


      用户可以通过 JDBC 协议,使用 INSERT 语句进行数据导入。

      INSERT 语句的使用方式和 MySQL 等数据库中 INSERT 语句的使用方式类似。INSERT 语句支持以下两种语法:

      * INSERT INTO table SELECT ...
      * INSERT INTO table VALUES(...)

      单次写入

      单次写入是指用户直接执行一个 INSERT 命令。示例如下:

      INSERT INTO example_tbl (col1, col2, col3) VALUES (1000, "baidu", 3.25);

      对于 Doris 来说,一个 INSERT 命令就是一个完整的导入事务。

      因此不论是导入一条数据,还是多条数据,我们都不建议在生产环境使用这种方式进行数据导入。高频词的 INSERT 操作会导致在存储层产生大量的小文件,会严重影响系统性能。

      该方式仅用于线下简单测试或低频少量的操作。

      或者可以使用以下方式进行批量的插入操作:

      INSERT INTO example_tbl VALUES
      (1000, "baidu1", 3.25)
      (2000, "baidu2", 4.25)
      (3000, "baidu3", 5.25);

      我们建议一批次插入条数在尽量大,比如几千甚至一万条一次。或者可以通过下面的程序的方式,使用 PreparedStatement 来进行批量插入。

      JDBC 示例

      这里我们给出一个简单的 JDBC 批量 INSERT 代码示例:

      package demo.doris;

      import java.sql.Connection;
      import java.sql.DriverManager;
      import java.sql.PreparedStatement;
      import java.sql.SQLException;

      public class DorisJDBCDemo {

      private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
      private static final String DB_URL_PATTERN = "jdbc:mysql://%s:%d/%s?rewriteBatchedStatements=true";
      private static final String HOST = "127.0.0.1"; // Leader Node host
      private static final int PORT = 8030; // http port of Leader Node
      private static final String DB = "example_db";
      private static final String TBL = "example_tbl";
      private static final String USER = "admin";
      private static final String PASSWD = "my_pass";

      private static final int INSERT_BATCH_SIZE = 10000;

      public static void main(String[] args) {
      insert();
      }

      private static void insert() {
      // 注意末尾不要加 分号 ";"
      String query = "insert into " + TBL + " values(?, ?)";
      // 设置 Label 以做到幂等。
      // String query = "insert into " + TBL + " WITH LABEL my_label values(?, ?)";

      Connection conn = null;
      PreparedStatement stmt = null;
      String dbUrl = String.format(DB_URL_PATTERN, HOST, PORT, DB);
      try {
      Class.forName(JDBC_DRIVER);
      conn = DriverManager.getConnection(dbUrl, USER, PASSWD);
      stmt = conn.prepareStatement(query);

      for (int i =0; i < INSERT_BATCH_SIZE; i++) {
      stmt.setInt(1, i);
      stmt.setInt(2, i * 100);
      stmt.addBatch();
      }

      int[] res = stmt.executeBatch();
      System.out.println(res);
      } catch (Exception e) {
      e.printStackTrace();
      } finally {
      try {
      if (stmt != null) {
      stmt.close();
      }
      } catch (SQLException se2) {
      se2.printStackTrace();
      }
      try {
      if (conn != null) conn.close();
      } catch (SQLException se) {
      se.printStackTrace();
      }
      }
      }
      }

      请注意以下几点:

      1. JDBC 连接串需添加 rewriteBatchedStatements=true
       参数,并使用 PreparedSta tement
       方式。

      目前 Doris 暂不支持服务器端的 PrepareStatemnt,所以 JDBC Driver 会在客户端进行批量 Prepare。

      rewriteBatchedStatements=true
       会确保 Driver 执行批处理。并最终形成如下形式的 INSERT 语句发往 Doris:

      INSERT INTO example_tbl VALUES
      (1000, "baidu1", 3.25)
      (2000, "baidu2", 4.25)
      (3000, "baidu3", 5.25);

      2. 批次大小

          因为是在客户端进行批量处理,因此一批次如果过大的话,话占用客户端的内存资源,需关注。

      Doris 后续会支持服务端的 PrepareStatemnt,敬请期待。

      3. 导入原子性

      和其他到导入方式一样,INSERT 操作本身也支持原子性。每一个 INSERT 操作都是一个导入事务,能够保证一个 INSERT 中的所有数据原子性的写入。

      前面提到,我们建议在使用 INSERT 导入数据时,采用 ”批“ 的方式进行导入,而不是单条插入。

      同时,我们可以为每次 INSERT 操作设置一个 Label。通过 Label 机制 可以保证操作的幂等性和原子性,最终做到数据的不丢不重。



      通过外部表同步数据


      Doris 可以创建通过 ODBC 协议访问的外部表。创建完成后,可以通过 SELECT 语句直接查询外部表的数据,也可以通过 INSERT INTO SELECT
       的方式导入外部表的数据。

      本文档主要介绍如何创建通过 ODBC 协议访问的外部表,以及如何导入这些外部表的数据。目前支持的数据源包括:

      • MySQL

      • Oracle

      • PostgreSQL

      创建外部表

      这里仅通过示例说明使用方式。

      1. 创建 ODBC Resource

        ODBC Resource 的目的是用于统一管理外部表的连接信息。

      CREATE EXTERNAL RESOURCE `oracle_odbc`
      PROPERTIES (
      "type" = "odbc_catalog",
      "host" = "192.168.0.1",
      "port" = "8086",
      "user" = "test",
      "password" = "test",
      "database" = "test",
      "odbc_type" = "oracle",
      "driver" = "Oracle"
      );

      这里我们创建了一个名为 oracle_odbc
       的 Resource,其类型为 odbc_catalog
      ,表示这是一个用于存储 ODBC 信息的 Resource。odbc_type
       为 oracle
      ,表示这个 OBDC Resource 是用于连接 Oracle 数据库的。


      2. 创建外部表

      CREATE EXTERNAL TABLE `ext_oracle_tbl` (
      `k1` decimal(9, 3) NOT NULL COMMENT "",
      `k2` char(10) NOT NULL COMMENT "",
      `k3` datetime NOT NULL COMMENT "",
      `k5` varchar(20) NOT NULL COMMENT "",
      `k6` double NOT NULL COMMENT ""
      ) ENGINE=ODBC
      COMMENT "ODBC"
      PROPERTIES (
      "odbc_catalog_resource" = "oracle_odbc",
      "database" = "test",
      "table" = "baseall"
      );

      这里我们创建一个 ext_oracle_tbl
       外部表,并引用了之前创建的 oracle_odbc
       Resource。


      连接百度云数据库 RDS

      1. 创建 RDS

      注意:创建 RDS 实例时,网络类型 -> 选择网络
       处,需要选择和 Doris 集群相同的网络(VPC)。可用区可以不同。


      2. 创建资源

      CREATE EXTERNAL RESOURCE `rds_odbc`
      PROPERTIES (
      "type" = "odbc_catalog",
      "host" = "mysql56.rdsxxxxx.rds.gz.baidubce.com",
      "port" = "3306",
      "user" = "rdsroot",
      "password" = "12345",
      "odbc_type" = "mysql",
      "driver" = "MySQL"
      );

      需修改其中 host
      port
      user
      password
       对应的参数。host port 可以在 RDS 实例信息也查看。user 和 password 需要在 RDS 控制台创建账户后获取。


      3. 创建外部表

      CREATE EXTERNAL TABLE `mysql_table` (
      k1 int,
      k2 int
      ) ENGINE=ODBC
      PROPERTIES (
      "odbc_catalog_resource" = "rds_odbc",
      "database" = "mysql_db",
      "table" = "mysql_tbl"
      );

      创建之后,就可以进行查询等操作了。


      导入数据


      1. 创建 Doris 表

      这里我们创建一张 Doris 的表,列信息和上一步创建的外部表 ext_oracle_tbl
       一样:

      CREATE EXTERNAL TABLE `doris_tbl` (
      `k1` decimal(9, 3) NOT NULL COMMENT "",
      `k2` char(10) NOT NULL COMMENT "",
      `k3` datetime NOT NULL COMMENT "",
      `k5` varchar(20) NOT NULL COMMENT "",
      `k6` double NOT NULL COMMENT ""
      )
      COMMENT "Doris Table"
      DISTRIBUTED BY HASH(k1) BUCKETS 2;
      PROPERTIES (
      "replication_num" = "1"
      );

      2. 导入数据 (从 ext_oracle_tbl
      表 导入到 doris_tbl
       表)

      INSERT INTO doris_tbl SELECT k1,k2,k3 FROM ext_oracle_tbl limit 100;

      INSERT 命令是同步命令,返回成功,即表示导入成功。


      注意事项

      • 必须保证外部数据源与 Doris 集群在同一个VPC内,并且 Compute Node 可以和外部数据源的网络是互通的。

      • ODBC 外部表本质上是通过单一 ODBC 客户端访问数据源,因此并不合适一次性导入大量的数据,建议分批多次导入。




      JSON格式数据导入说明


      Doris 支持导入 JSON 格式的数据。本文档主要说明在进行JSON格式数据导入时的注意事项。

      支持的导入方式

      目前只有以下导入方式支持 Json 格式的数据导入:

      • 将本地 JSON 格式的文件通过 STREAM LOAD 方式导入。

      • 通过 ROUNTINE LOAD 订阅并消费 Kafka 中的 JSON 格式消息。

      暂不支持其他方式的 JSON 格式数据导入。


      支持的 Json 格式

      当前前仅支持以下两种 Json 格式:

      1. 以 Array 表示的多行数据

        以 Array 为根节点的 Json 格式。Array 中的每个元素表示要导入的一行数据,通常是一个 Object。示例如下:

      [
      { "id": 123, "city" : "beijing"},
      { "id": 456, "city" : "shanghai"},
      ...
      ]

      [
      { "id": 123, "city" : { "name" : "beijing", "region" : "haidian"}},
      { "id": 456, "city" : { "name" : "beijing", "region" : "chaoyang"}},
      ...
      ]

      这种方式通常用于 Stream Load 导入方式,以便在一批导入数据中表示多行数据。

      这种方式必须配合设置 strip_outer_array=true
       使用。Doris 在解析时会将数组展开,然后依次解析其中的每一个 Object 作为一行数据。

      2.以 Object 表示的单行数据

      以 Object 为根节点的 Json 格式。整个 Object 即表示要导入的一行数据。示例如下:

      { "id": 123, "city" : "beijing"}
      { "id": 123, "city" : { "name" : "beijing", "region" : "haidian" }}

      这种方式通常用于 Routine Load 导入方式,如表示 Kafka 中的一条消息,即一行数据。

      fuzzy_parse 参数

      在 STREAM LOAD 中,可以添加 fuzzy_parse
       参数来加速 JSON 数据的导入效率。

      这个参数通常用于导入 以 Array 表示的多行数据 这种格式,所以一般要配合 strip_outer_array=true
       使用。

      这个功能要求 Array 中的每行数据的字段顺序完全一致。Doris 仅会根据第一行的字段顺序做解析,然后以下标的形式访问之后的数据。该方式可以提升 3-5X 的导入效率。


      Json Path

      Doris 支持通过 Json Path 抽取 Json 中指定的数据。

      注:因为对于 Array 类型的数据,Doris 会先进行数组展开,最终按照 Object 格式进行单行处理。所以本文档之后的示例都以单个 Object 格式的 Json 数据进行说明。

      1. 不指定 Json Path

      如果没有指定 Json Path,则 Doris 会默认使用表中的列名查找 Object 中的元素。示例如下:

      表中包含两列: id
      city

      Json 数据如下:

      { "id": 123, "city" : "beijing"}

      则 Doris 会使用 id
      city
       进行匹配,得到最终数据 123
       和 beijing

      如果 Json 数据如下:

      { "id": 123, "name" : "beijing"}

      则使用 id
      city
       进行匹配,得到最终数据 123
       和 null

      2. 指定 Json Path

      通过一个 Json 数据的形式指定一组 Json Path。数组中的每个元素表示一个要抽取的列。示例如下:

      ["$.id", "$.name"]
      ["$.id.sub_id", "$.name[0]", "$.city[0]"]

      Doris 会使用指定的 Json Path 进行数据匹配和抽取。


      3. 匹配非基本类型

      前面的示例最终匹配到的数值都是基本类型,如整型、字符串等。Doris 当前暂不支持复合类型,如 Array、Map 等。所以当匹配到一个非基本类型时,Doris 会将该类型转换为 Json 格式的字符串,并以字符串类型进行导入。示例如下:

      Json 数据为:

      { "id": 123, "city" : { "name" : "beijing", "region" : "haidian" }}

      Json Path 为 ["$.city"]
      。则匹配到的元素为:

      { "name" : "beijing", "region" : "haidian" }

      该元素会被转换为字符串进行后续导入操作:

      "{'name':'beijing','region':'haidian'}"


      4. 匹配失败

      当匹配失败时,将会返回 null
      。示例如下:

      Json 数据为:

      { "id": 123, "name" : "beijing"}

      Json Path 为 ["$.id", "$.info"]
      。则匹配到的元素为 123
       和 null

      Doris 当前不区分 Json 数据中表示的 null 值,和匹配失败时产生的 null 值。假设 Json 数据为:

      { "id": 123, "name" : null }

      则使用以下两种 Json Path 会获得相同的结果:123
       和 null

      ["$.id", "$.name"]
      ["$.id", "$.info"]


      5. 完全匹配失败

      为防止一些参数设置错误导致的误操作。Doris 在尝试匹配一行数据时,如果所有列都匹配失败,则会认为这个是一个错误行。假设 Json 数据为:

      { "id": 123, "city" : "beijing" }

      如果 Json Path 错误的写为(或者不指定 Json Path 时,表中的列不包含 id
       和 city
      ):

      ["$.ad", "$.infa"]

      则会导致完全匹配失败,则该行会标记为错误行,而不是产出 null, null


      Json Path 和 Columns

      Json Path 用于指定如何对 JSON 格式中的数据进行抽取,而 Columns 指定列的映射和转换关系。两者可以配合使用。

      换句话说,相当于通过 Json Path,将一个 Json 格式的数据,按照 Json Path 中指定的列顺序进行了列的重排。之后,可以通过 Columns,将这个重排后的源数据和表的列进行映射。举例如下:

      数据内容:

      {"k1" : 1, "k2": 2}

      表结构:

      k2 int, k1 int

      导入语句1(以 Stream Load 为例):

      curl -v --location-trusted -u root: -H "format: json" -H "jsonpaths: [\"$.k2\", \"$.k1\"]" -T example.json http://127.0.0.1:8030/api/db1/tbl1/_stream_load

      导入语句1中,仅指定了 Json Path,没有指定 Columns。其中 Json Path 的作用是将 Json 数据按照 Json Path 中字段的顺序进行抽取,之后会按照表结构的顺序进行写入。最终导入的数据结果如下:

      +------+------+
      | k1 | k2 |
      +------+------+
      | 2 | 1 |
      +------+------+

      会看到,实际的 k1 列导入了 Json 数据中的 "k2" 列的值。这是因为,Json 中字段名称并不等同于表结构中字段的名称。我们需要显式的指定这两者之间的映射关系。

      导入语句2:

      curl -v --location-trusted -u root: -H "format: json" -H "jsonpaths: [\"$.k2\", \"$.k1\"]" -H "columns: k2, k1" -T example.json http://127.0.0.1:8030/api/db1/tbl1/_stream_load

      相比如导入语句1,这里增加了 Columns 字段,用于描述列的映射关系,按 k2, k1
       的顺序。即按Json Path 中字段的顺序抽取后,指定第一列为表中 k2 列的值,而第二列为表中 k1 列的值。最终导入的数据结果如下:

      +------+------+
      | k1 | k2 |
      +------+------+
      | 1 | 2 |
      +------+------+

      当然,如其他导入一样,可以在 Columns 中进行列的转换操作。示例如下:

      curl -v --location-trusted -u root: -H "format: json" -H "jsonpaths: [\"$.k2\", \"$.k1\"]" -H "columns: k2, tmp_k1, k1 = tmp_k1 * 100" -T example.json http://127.0.0.1:8030/api/db1/tbl1/_stream_load

      上述示例会将 k1 的值乘以 100 后导入。最终导入的数据结果如下:

      +------+------+
      | k1 | k2 |
      +------+------+
      | 100 | 2 |
      +------+------+

      NULL 和 Default 值

      示例数据如下:

      [
      {"k1": 1, "k2": "a"},
      {"k1": 2},
      {"k1": 3, "k2": "c"},
      ]

      表结构为:k1 int null, k2 varchar(32) null default "x"

      导入语句如下:

      curl -v --location-trusted -u root: -H "format: json" -H "strip_outer_array: true" -T example.json http://127.0.0.1:8030/api/db1/tbl1/_stream_load

      用户可能期望的导入结果如下,即对于缺失的列,填写默认值。

      +------+------+
      | k1 | k2 |
      +------+------+
      | 1 | a |
      +------+------+
      | 2 | x |
      +------+------+
      | 3 | c |
      +------+------+

      但实际的导入结果如下,即对于缺失的列,补上了 NULL。

      +------+------+
      | k1 | k2 |
      +------+------+
      | 1 | a |
      +------+------+
      | 2 | NULL |
      +------+------+
      | 3 | c |
      +------+------+

      这是因为通过导入语句中的信息,Doris 并不知道 “缺失的列是表中的 k2 列”。如果要对以上数据按照期望结果导入,则导入语句如下:

      curl -v --location-trusted -u root: -H "format: json" -H "strip_outer_array: true" -H "jsonpaths: [\"$.k1\", \"$.k2\"]" -H "columns: k1, tmp_k2, k2 = ifnull(tmp_k2, 'x')" -T example.json http://127.0.0.1:8030/api/db1/tbl1/_stream_load

      应用示例

      Stream Load

      因为 Json 格式的不可拆分特性,所以在使用 Stream Load 导入 Json 格式的文件时,文件内容会被全部加载到内存后,才开始处理。因此,如果文件过大的话,可能会占用较多的内存。

      假设表结构为:

      id      INT     NOT NULL,
      city VARHCAR NULL,
      code INT NULL

      1. 导入单行数据1

      {"id": 100, "city": "beijing", "code" : 1}

      不指定 Json Path

      curl --location-trusted -u user:passwd -H "format: json" -T data.json http://localhost:8030/api/db1/tbl1/_stream_load

      导入结果:

      100     beijing     1

      指定 Json Path

      curl --location-trusted -u user:passwd -H "format: json" -H "jsonpaths: [\"$.id\",\"$.city\",\"$.code\"]" -T data.json http://localhost:8030/api/db1/tbl1/_stream_load

      导入结果:

      100     beijing     1

      2. 导入单行数据2

      {"id": 100, "content": {"city": "beijing", "code" : 1}}

      指定 Json Path

      curl --location-trusted -u user:passwd -H "format: json" -H "jsonpaths: [\"$.id\",\"$.content.city\",\"$.content.code\"]" -T data.json http://localhost:8030/api/db1/tbl1/_stream_load

      导入结果:

      100     beijing     1

      3. 导入多行数据

      [
      {"id": 100, "city": "beijing", "code" : 1},
      {"id": 101, "city": "shanghai"},
      {"id": 102, "city": "tianjin", "code" : 3},
      {"id": 103, "city": "chongqing", "code" : 4},
      {"id": 104, "city": ["zhejiang", "guangzhou"], "code" : 5},
      {
      "id": 105,
      "city": {
      "order1": ["guangzhou"]
      },
      "code" : 6
      }
      ]

      指定 Json Path

      curl --location-trusted -u user:passwd -H "format: json" -H "jsonpaths: [\"$.id\",\"$.city\",\"$.code\"]" -H "strip_outer_array: true" -T data.json http://localhost:8030/api/db1/tbl1/_stream_load

      导入结果:

      100     beijing                     1
      101 shanghai NULL
      102 tianjin 3
      103 chongqing 4
      104 ["zhejiang","guangzhou"] 5
      105 {"order1":["guangzhou"]} 6

      4. 对导入数据进行转换

      数据依然是示例3中的多行数据,现需要对导入数据中的 code
       列加1后导入。

      curl --location-trusted -u user:passwd -H "format: json" -H "jsonpaths: [\"$.id\",\"$.city\",\"$.code\"]" -H "strip_outer_array: true" -H "columns: id, city, tmpc, code=tmpc+1" -T data.json http://localhost:8030/api/db1/tbl1/_stream_load

      导入结果:

      100     beijing                     2
      101 shanghai NULL
      102 tianjin 4
      103 chongqing 5
      104 ["zhejiang","guangzhou"] 6
      105 {"order1":["guangzhou"]} 7

      Routine Load

      Routine Load 对 Json 数据的处理原理和 Stream Load 相同。在此不再赘述。

      对于 Kafka 数据源,每个 Massage 中的内容被视作一个完整的 Json 数据。如果一个 Massage 中是以 Array 格式的表示的多行数据,则会导入多行,而 Kafka 的 offset 只会增加 1。而如果一个 Array 格式的 Json 表示多行数据,但是因为 Json 格式错误导致解析 Json 失败,则错误行只会增加 1(因为解析失败,实际上 Doris 无法判断其中包含多少行数据,只能按一行错误数据记录)。



      导入事务和原子性


      导入原子性

      Doris 中的所有导入操作都有原子性保证,即一个导入作业中的数据要么全部成功,要么全部失败。不会出现仅部分数据导入成功的情况。

      在 BROKER LOAD 中我们也可以实现多多表的原子性导入。

      对于表所附属的 [物化视图](TODO),也同时保证和基表的原子性和一致性。


      Label 机制

      Doris 的导入作业都可以设置一个 Label。这个 Label 通常是用户自定义的、具有一定业务逻辑属性的字符串。

      Label 的主要作用是唯一标识一个导入任务,并且能够保证相同的 Label 仅会被成功导入一次。

      Label 机制可以保证导入数据的不丢不重。如果上游数据源能够保证 At-Least-Once 语义,则配合 Doris 的 Label 机制,能够保证 Exactly-Once 语义。

      Label 在一个数据库下具有唯一性。Label 的保留期限默认是 3 天。即 3 天后,已完成的 Label 会被自动清理,之后 Label 可以被重复使用。


      最佳实践

      Label 通常被设置为 业务逻辑+时间
       的格式。如 my_business1_20201010_125000

      这个 Label 通常用于表示:业务 my_business1
       这个业务在 2020-10-10 12:50:00
       产生的一批数据。通过这种 Label 设定,业务上可以通过 Label 查询导入任务状态,来明确的获知该时间点批次的数据是否已经导入成功。如果没有成功,则可以使用这个 Label 继续重试导入。



      列的映射、转换与过滤

      Doris 支持丰富的列映射、转换和过滤操作。可以非常灵活的处理需要导入的原始数据。

      本章节主要介绍如何在导入中使用这些功能。

      总体介绍

      Doris 在导入过程中对数据处理步骤分为以下几步:

      1. 数据按原始文件中的列的顺序读入到 Doris

      2. 通过前置过滤条件(PRECEDING FILTER)对原始数据进行一次过滤。

      3. 通过列映射和转换,将原始数据映射到目标列顺序。

      4. 通过后置过滤条件(WHERE)对转换后的数据在进行一次过滤。

      5. 写入最终数据。

      列的映射、转换和过滤参数在导入作业中皆为可选操作。在默认空缺的情况下,Doris 会将源文件中的行按默认的列分割符 \t
       分割后,按顺序对应到表中的列。如果源文件中的列数量和表中的列数量不匹配,则会产生数据质量问题,导致数据无法导入。此时则需要显式的描述列的映射、转换和过滤信息。


      支持的导入方式

      BROKER LOAD

      LOAD LABEL example_db.label1(

      DATA INFILE("bos://bucket/input/file")
      INTO TABLE `my_table`
      (k1, k2, tmpk3)
      PRECEDING FILTER k1 = 1
      SET (
      k3 = tmpk3 + 1
      )
      WHERE k1 > k2
      )
      WITH BROKER bos
      (
      ...
      );

      STREAM LOAD

      curl--location-trusted

      -u user:passwd
      -H "columns: k1, k2, tmpk3, k3 = tmpk3 + 1"
      -H "where: k1 > k2"
      -T file.txt
      http://host:port/api/testDb/testTbl/_stream_load

      ROUTINE LOAD

      CREATE ROUTINE LOAD example_db.label1 ON my_tableCOLUMNS(k1, k2, tmpk3, k3 = tmpk3 +1),

      PRECEDING FILTER k1 = 1,
      WHERE k1 > k2
      ...

      以上导入方式都支持对源数据进行列映射、转换和过滤操作:

      • 前置过滤:对读取到的原始数据进行一次过滤。

      PRECEDING FILTER k1 = 1

      映射:定义源数据中的列。如果定义的列名和表中的列相同,则直接映射为表中的列。如果不同,则这个被定义的列可以用于之后的转换操作。如上面示例中的:

      (k1, k2, tmpk3)

      转换:将第一步中经过映射的列进行转换,可以使用内置表达式、函数、自定义函数进行转化,并重新映射到表中对应的列上。如上面示例中的:

      k3 = tmpk3 + 1

      后置过滤:对经过映射和转换后的列,通过表达式进行过滤。被过滤的数据行不会导入到系统中。如上面示例中的:

      WHERE k1 > k2


      列映射

      列映射的目的主要是描述导入文件中各个列的信息,相当于为源数据中的列定义名称。通过描述列映射关系,我们可以将于表中列顺序不同、列数量不同的源文件导入到 Doris 中。下面我们通过示例说明:

      假设源文件有4列,内容如下(表头列名仅为方便表述,实际并无表头):

      假设源文件有4列,内容如下(表头列名仅为方便表述,实际并无表头):

      列1列2列3列4
      1100beijing1.1
      2200shanghai1.2
      3300guangzhou1.3
      4\Nchongqing1.4

      注:\N
       在源文件中表示 null。


      1. 调整映射顺序

      假设表中有 k1,k2,k3,k4
       4列。我们希望的导入映射关系如下:

      列1 -> k1
      列2 -> k3
      列3 -> k2
      列4 -> k4

      则列映射的书写顺序应如下:

      (k1, k3, k2, k4)

      2. 源文件中的列数量多于表中的列

      假设表中有 k1,k2,k3
       3列。我们希望的导入映射关系如下:

      列1 -> k1
      列2 -> k3
      列3 -> k2

      则列映射的书写顺序应如下:

      (k1, k3, k2, tmpk4)

      其中 tmpk4
       为一个自定义的、表中不存在的列名。Doris 会忽略这个不存在的列名。

      3. 源文件中的列数量少于表中的列,使用默认值填充

      假设表中有 k1,k2,k3,k4,k5
       5列。我们希望的导入映射关系如下:

      列1 -> k1
      列2 -> k3
      列3 -> k2

      这里我们仅使用源文件中的前3列。k4,k5
       两列希望使用默认值填充。

      则列映射的书写顺序应如下:

      (k1, k3, k2)

      如果 k4,k5
       列有默认值,则会填充默认值。否则如果是 nullable
       的列,则会填充 null
       值。否则,导入作业会报错。



      列前置过滤

      前置过滤是对读取到的原始数据进行一次过滤。目前仅支持 BROKER LOAD 和 ROUTINE LOAD。

      前置过滤有以下应用场景:

      1. 转换前做过滤

      希望在列映射和转换前做过滤的场景。能够先行过滤掉部分不需要的数据。

      2.过滤列不存在于表中,仅作为过滤标识

      比如源数据中存储了多张表的数据(或者多张表的数据写入了同一个 Kafka 消息队列)。数据中每行有一列表名来标识该行数据属于哪个表。用户可以通过前置过滤条件来筛选对应的表数据进行导入。


      列转换

      列转换功能允许用户对源文件中列值进行变换。目前 Doris 支持使用绝大部分内置函数、用户自定义函数进行转换。

      注:自定义函数隶属于某一数据库下,在使用自定义函数进行转换时,需要用户对这个数据库有读权限。

      转换操作通常是和列映射一起定义的。即先对列进行映射,再进行转换。下面我们通过示例说明:

      假设源文件有4列,内容如下(表头列名仅为方便表述,实际并无表头):

      列1列2列3列4
      1100beijing1.1
      2200shanghai1.2
      3300guangzhou1.3
      4400chongqing1.4

      1. 将源文件中的列值经转换后导入表中

      假设表中有 k1,k2,k3,k4
       4列。我们希望的导入映射和转换关系如下:

      列1       -> k1
      列2 * 100 -> k3
      列3 -> k2
      列4 -> k4

      则列映射的书写顺序应如下:

      (k1, tmpk3, k2, k4, k3 = tmpk3 * 100)

      这里相当于我们将源文件中的第2列命名为 tmpk3
      ,同时指定表中 k3
       列的值为 tmpk3 * 100
      。最终表中的数据如下:

      k1k2k3k4
      1beijing100001.1
      2shanghai200001.2
      3guangzhou300001.3
      nullchongqing400001.4

      2. 通过 case when 函数,有条件的进行列转换。

      假设表中有 k1,k2,k3,k4
       4列。我们希望对于源数据中的 beijing, shanghai, guangzhou, chongqing
       分别转换为对应的地区id后导入:

      列1                  -> k1
      列2 -> k2
      列3 进行地区id转换后 -> k3
      列4 -> k4

      则列映射的书写顺序应如下:

      (k1, k2, tmpk3, k4, k3 = case tmpk3 when "beijing" then 1 when "shanghai" then 2 when "guangzhou" then 3 when "chongqing" then 4 else null end)

      最终表中的数据如下:

      k1k2k3k4
      110011.1
      220021.2
      330031.3
      null40041.4

      3. 将源文件中的 null 值转换成 0 导入。同时也进行示例2中的地区id转换。

      假设表中有 k1,k2,k3,k4
       4列。在对地区id转换的同时,我们也希望对于源数据中 k1 列的 null 值转换成 0 导入:

      列1 如果为null 则转换成0   -> k1
      列2 -> k2
      列3 -> k3
      列4 -> k4

      则列映射的书写顺序应如下:

      (tmpk1, k2, tmpk3, k4, k1 = ifnull(tmpk1, 0), k3 = case tmpk3 when "beijing" then 1 when "shanghai" then 2 when "guangzhou" then 3 when "chongqing" then 4 else null end)

      最终表中的数据如下:

      k1k2k3k4
      110011.1
      220021.2
      330031.3
      040041.4

      列过滤

      经过列映射和转换后,我们可以通过过滤条件将不希望导入到Doris中的数据进行过滤。下面我们通过示例说明:

      假设源文件有4列,内容如下(表头列名仅为方便表述,实际并无表头):

      列1列2列3列4
      1100beijing1.1
      2200shanghai1.2
      3300guangzhou1.3
      4400chongqing1.4

      1. 在列映射和转换缺省的情况下,直接过滤

      假设表中有 k1,k2,k3,k4
       4列。我们可以在缺省列映射和转换的情况下,直接定义过滤条件。如我们希望只导入源文件中第4列为大于 1.2 的数据行,则过滤条件如下:

      where k4 > 1.2

      最终表中的数据如下:

      k1k2k3k4
      3300guangzhou1.3
      null400chongqing1.4

      缺省情况下,Doris 会按照顺序进行列映射,因此源文件中的第4列自动被映射到表中的 k4
       列。

      2. 对经过列转换的数据进行过滤

      假设表中有 k1,k2,k3,k4
       4列。在 列转换 示例中,我们将省份名称转换成了id。这里我们想过滤掉 id 为 3 的数据。则转换、过滤条件如下:

      (k1, k2, tmpk3, k4, k3 = case tmpk3 when "beijing" then 1 when "shanghai" then 2 when "guangzhou" then 3 when "chongqing" then 4 else null end)
      where k3 != 3

      最终表中的数据如下:

      k1k2k3k4
      110011.1
      220021.2
      null40041.4

      这里我们看到,执行过滤时的列值,为经过映射和转换后的最终列值,而不是原始数据。

      3. 多条件过滤

      假设表中有 k1,k2,k3,k4
       4列。我们想过滤掉 k1
       列为 null
       的数据,同时过滤掉 k4
       列小于 1.2 的数据,则过滤条件如下:

      where k1 is null and k4 < 1.2

      最终表中的数据如下:

      k1k2k3k4
      220021.2
      330031.3

      数据质量问题和过滤阈值

      导入作业中被处理的数据行可以分为如下三种:

      1. Filtered Rows

      因数据质量不合格而被过滤掉的数据。数据质量不合格包括类型错误、精度错误、字符串长度超长、文件列数不匹配等数据格式问题,以及因没有对应的分区而被过滤掉的数据行。

      2. Unselected Rows

      这部分为因 preceding filter
       或 where
       列过滤条件而被过滤掉的数据行。

      3. Loaded Rows

      被正确导入的数据行。

      Doris 的导入任务允许用户设置最大错误率(max_filter_ratio
      )。如果导入的数据的错误率低于阈值,则这些错误行将被忽略,其他正确的数据将被导入。

      错误率的计算方式为:

      #Filtered Rows / (#Filtered Rows + #Loaded Rows)

      也就是说 Unselected Rows
       不会参与错误率的计算。



      严格模式

      严格模式(strict_mode)为导入操作中的一个参数配置。该参数会影响某些数值的导入行为和最终导入的数据。

      本文档主要说明如何设置严格模式,以及严格模式产生的影响。


      如何设置

      严格模式默认情况下都为 False,即关闭状态。

      不同的导入方式设置严格模式的方式不尽相同。

      1. BROKER LOAD

      LOAD LABEL example_db.label1(

      DATA INFILE("bos://my_bucket/input/file.txt")
      INTO TABLE `my_table`
      COLUMNS TERMINATED BY ","
      )
      WITH BROKER bos
      (
      "bos_endpoint" = "http://bj.bcebos.com",
      "bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx",
      "bos_secret_accesskey"="yyyyyyyyyyyyyyyyyyyyyyyyyy"
      )
      PROPERTIES
      (
      "strict_mode" = "true"
      )

      2. STREAM LOAD

      curl --location-trusted -u user:passwd \-H "strict_mode: true" \

      -T 1.txt \
      http://host:port/api/example_db/my_table/_stream_load

      3. ROUTINE LOAD

      CREATE ROUTINE LOAD example_db.test_job ON my_tablePROPERTIES

      (
      "strict_mode" = "true"
      )
      FROM KAFKA
      (
      "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
      "kafka_topic" = "my_topic"
      );

      4. INSERT

      通过会话变量设置:

      SET enable_insert_strict = true;
      INSERT
      INTO my_table ...;

      严格模式的作用

      严格模式的意思是,对于导入过程中的列类型转换进行严格过滤。

      严格过滤的策略如下:

      对于列类型转换来说,如果开启严格模式,则错误的数据将被过滤。这里的错误数据是指:原始数据并不为 null
      ,而在进行列类型转换后结果为 null
       的这一类数据。

      这里说指的 列类型转换
      ,并不包括用函数计算得出的 null
       值。

      对于导入的某列类型包含范围限制的,如果原始数据能正常通过类型转换,但无法通过范围限制的,严格模式对其也不产生影响。例如:如果类型是 decimal(1,0)
      , 原始数据为 10,则属于可以通过类型转换但不在列声明的范围内。这种数据 strict 对其不产生影响。

      1. 以列类型为 TinyInt 来举例:

      原始数据类型原始数据举例转换为 TinyInt 后的值严格模式结果
      空值\NNULL开启或关闭NULL
      非空值"abc" or 2000NULL开启非法值(被过滤)
      非空值"abc"NULL关闭NULL
      非空值11开启或关闭正确导入

      说明:

      1. 表中的列允许导入空值

      2. abc
       及 2000
       在转换为 TinyInt 后,会因类型或精度问题变为 NULL。在严格模式开启的情况下,这类数据将会被过滤。而如果是关闭状态,则会导入 null

        以列类型为 Decimal(1,0) 举例

        原始数据类型原始数据举例转换为 Decimal 后的值严格模式结果
        空值\Nnull开启或关闭NULL
        非空值aaaNULL开启非法值(被过滤)
        非空值aaaNULL关闭NULL
        非空值1 or 101 or 10开启或关闭正确导入

        说明:
        1. 表中的列允许导入空值
        2.abc
         在转换为 Decimal 后,会因类型问题变为 NULL。在严格模式开启的情况下,这类数据将会被过滤。而如果是关闭状态,则会导入 null

        3.10
         虽然是一个超过范围的值,但是因为其类型符合 decimal 的要求,所以严格模式对其不产生影响。10
         最后会在其他导入处理流程中被过滤。但不会被严格模式过滤。

      word很大,各位忍一下,万字更新!!!!!!!!


      世间所谓

      美丽的遗憾

      大概便是如此



      END


      往期推荐

      Kafka详解日志结构

      Spark基础

      五分钟了解Palo Doris的索引原理及应用场景!

      Apache Doris 数据备份及恢复

      Palo Doris关系模型与数据划分你还不知道?




      文章转载自857Hub,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

      评论