暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

87. doris如何导入本地数据?

大数据技能圈 2023-05-12
43

导入本地数据

本文档主要介绍如何从客户端导入本地的数据。

目前Doris支持两种从本地导入数据的模式:

1) Stream Load

2) MySql Load

1. Stream Load

Stream Load 用于将本地文件导入到 Doris 中。

不同于其他命令的提交方式,Stream Load 是通过 HTTP 协议与 Doris 进行连接交互的。

该方式中涉及 HOST:PORT 应为 HTTP 协议端口。

• BE 的 HTTP 协议端口,默认为 8040。

• FE 的 HTTP 协议端口,默认为 8030。但须保证客户端所在机器网络能够联通 BE 所在机器。

本文文档我们以curl 命令为例演示如何进行数据导入。

文档最后,我们给出一个使用 Java 导入数据的代码示例

1.1 导入数据

Stream Load 的请求体如下:

    PUT /api/{db}/{table}/_stream_load

    创建一张表

    通过 CREATE TABLE 命令在demo创建一张表用于存储待导入的数据。具体的导入方式请查阅CREATE TABLE 命令手册。示例如下:

       CREATE TABLE IF NOT EXISTS load_local_file_test
      (
      id INT,
      age TINYINT,
      name VARCHAR(50)
      )
      unique key(id)
      DISTRIBUTED BY HASH(id) BUCKETS 3;

      导入数据

      执行以下 curl 命令导入本地文件:

        curl -u user:passwd -H "label:load_local_file_test" -T path/to/local/demo.txt http://host:port/api/demo/load_local_file_test/_stream_load

        – user:passwd 为在 Doris 中创建的用户。初始用户为 admin root,密码初始状态下为空。

        – host:port 为 BE 的 HTTP 协议端口,默认是 8040,可以在 Doris 集群 WEB UI页面查看。

        – label: 可以在 Header 中指定 Label 唯一标识这个导入任务。

        等待导入结果

        Stream Load 命令是同步命令,返回成功即表示导入成功。如果导入数据较大,可能需要较长的等待时间。示例如下:

         

          {
          "TxnId": 1003,
          "Label": "load_local_file_test",
          "Status": "Success",
          "Message": "OK",
          "NumberTotalRows": 1000000,
          "NumberLoadedRows": 1000000,
          "NumberFilteredRows": 1,
          "NumberUnselectedRows": 0,
          "LoadBytes": 40888898,
          "LoadTimeMs": 2144,
          "BeginTxnTimeMs": 1,
          "StreamLoadPutTimeMs": 2,
          "ReadDataTimeMs": 325,
          "WriteDataTimeMs": 1933,
          "CommitAndPublishTimeMs": 106,
          "ErrorURL": "http://192.168.1.1:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bdf005"
          }

          – Status 字段状态为 Success 即表示导入成功。

          导入建议

          • Stream Load 只能导入本地文件。

          • 建议一个导入请求的数据量控制在 1 - 2 GB 以内。如果有大量本地文件,可以分批并发提交。

          1.2 Java 代码示例

          这里通过一个简单的 JAVA 示例来执行 Stream Load:

            package demo.doris;


            import org.apache.commons.codec.binary.Base64;
            import org.apache.http.HttpHeaders;
            import org.apache.http.client.methods.CloseableHttpResponse;
            import org.apache.http.client.methods.HttpPut;
            import org.apache.http.entity.FileEntity;
            import org.apache.http.impl.client.CloseableHttpClient;
            import org.apache.http.impl.client.DefaultRedirectStrategy;
            import org.apache.http.impl.client.HttpClientBuilder;
            import org.apache.http.impl.client.HttpClients;
            import org.apache.http.util.EntityUtils;


            import java.io.File;
            import java.io.IOException;
            import java.nio.charset.StandardCharsets;


            /*
            这是一个 Doris Stream Load 示例,需要依赖
            <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.13</version>
            </dependency>
            */
            public class DorisStreamLoader {
            //可以选择填写 FE 地址以及 FE 的 http_port,但须保证客户端和 BE 节点的连通性。
            private final static String HOST = "your_host";
            private final static int PORT = 8040;
            private final static String DATABASE = "db1"; // 要导入的数据库
            private final static String TABLE = "tbl1"; // 要导入的表
            private final static String USER = "root"; // Doris 用户名
            private final static String PASSWD = ""; // Doris 密码
            private final static String LOAD_FILE_NAME = "/path/to/1.txt"; // 要导入的本地文件路径


            private final static String loadUrl = String.format("http://%s:%s/api/%s/%s/_stream_load",
            HOST, PORT, DATABASE, TABLE);


            private final static HttpClientBuilder httpClientBuilder = HttpClients
            .custom()
            .setRedirectStrategy(new DefaultRedirectStrategy() {
            @Override
            protected boolean isRedirectable(String method) {
            // 如果连接目标是 FE,则需要处理 307 redirect。
            return true;
            }
            });


            public void load(File file) throws Exception {
            try (CloseableHttpClient client = httpClientBuilder.build()) {
            HttpPut put = new HttpPut(loadUrl);
            put.setHeader(HttpHeaders.EXPECT, "100-continue");
            put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(USER, PASSWD));


            // 可以在 Header 中设置 stream load 相关属性,这里我们设置 label 和 column_separator。
            put.setHeader("label","label1");
            put.setHeader("column_separator",",");


            // 设置导入文件。
            // 这里也可以使用 StringEntity 来传输任意数据。
            FileEntity entity = new FileEntity(file);
            put.setEntity(entity);


            try (CloseableHttpResponse response = client.execute(put)) {
            String loadResult = "";
            if (response.getEntity() != null) {
            loadResult = EntityUtils.toString(response.getEntity());
            }


            final int statusCode = response.getStatusLine().getStatusCode();
            if (statusCode != 200) {
            throw new IOException(
            String.format("Stream load failed. status: %s load result: %s", statusCode, loadResult));
            }


            System.out.println("Get load result: " + loadResult);
            }
            }
            }


            private String basicAuthHeader(String username, String password) {
            final String tobeEncode = username + ":" + password;
            byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
            return "Basic " + new String(encoded);
            }


            public static void main(String[] args) throws Exception{
            DorisStreamLoader loader = new DorisStreamLoader();
            File file = new File(LOAD_FILE_NAME);
            loader.load(file);
            }
            }

            注意:这里 http client 的版本要是4.5.13

              <dependency>
              <groupId>org.apache.httpcomponents</groupId>
              <artifactId>httpclient</artifactId>
              <version>4.5.13</version>
              </dependency>

              2. MySql LOAD

               MySql LOAD样例

              2.1 导入数据

              创建一张表

              通过 CREATE TABLE 命令在demo创建一张表用于存储待导入的数据 

                CREATE TABLE IF NOT EXISTS load_local_file_test
                (
                id INT,
                age TINYINT,
                name VARCHAR(50)
                )
                unique key(id)
                DISTRIBUTED BY HASH(id) BUCKETS 3;

                导入数据
                在MySql客户端下执行以下 SQL 命令导入本地文件:

                  LOAD DATA
                  LOCAL
                  INFILE '/path/to/local/demo.txt'
                  INTO TABLE demo.load_local_file_test

                  等待导入结果

                  MySql Load 命令是同步命令,返回成功即表示导入成功。如果导入数据较大,可能需要较长的等待时间。示例如下:

                    Query OK, 1 row affected (0.17 sec)
                    Records: 1 Deleted: 0 Skipped: 0 Warnings: 0

                    – 如果出现上述结果, 则表示导入成功。导入失败, 会抛出错误,并在客户端显示错误原因

                    导入建议

                    • MySql Load 只能导入本地文件(可以是客户端本地或者连接的FE节点本地), 而且支持CSV格式。

                    • 建议一个导入请求的数据量控制在 1 - 2 GB 以内。如果有大量本地文件,可以分批并发提交。


                    更多大数据相关内容请关注大数据技能圈公众号:

                    文章转载自大数据技能圈,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                    评论