Doris底层实现了统一的流式导入框架。而在这个框架之上,Doris提供了非常丰富的导入方式,以满足不同数据源的数据导入需求。Doris的数据导入语句不仅包括常规的INSERT INTO,还包括Stream Load、Broker Load、Routine Load、Binlog Load、DataX写入、Spark Load、Flink Connector、Spark Connector、JDBC Connector、ODBC Connector等。这里重点介绍其中一些成熟并且常用的语句。
在Doris数据库中,INSERT INTO语句的使用方式和在MySQL等数据库中INSERT INTO语句的使用方式类似。但在Doris中,所有的数据写入都是一个独立的导入作业,所以这里将INSERT INTO作为一种导入方式介绍。
INSERT INTO语句主要有3个应用场景。
1)导入几条测试数据,验证Doris的功能,此时适合使用INSERT INTO VALUES语法;
2)在执行数据ETL操作时,将Doris内部表的查询结果写入另一张新表,此时适合使用INSERT INTO SELECT语法;
3)先创建外部表,然后通过INSERT INTO SELECT语句将外部表数据导入Doris内部表存储。
01
用法详解
INSERT INTO语句模板如下:

从模板中可以看到,INSERT INTO语句中除了表名是必须有的以外,PARTITION、WITH LABEL、COLUMN、VALUES、QUERY都是可选部分。那么,可选部分有什么特殊用途呢?
PARTITION语句用于指定数据插入的分区,分区名必须是table_name中存在的分区,多个分区名称之间用逗号分隔。如果指定目标分区,只导入符合目标分区的数据;如果没有指定目标分区,默认导入表的所有分区。在大多数情况下,我们不需要指定分区。
WITH LABEL用于为导入作业指定一个标识名,标识名必须是单数据库内唯一的命名。如果不指定,系统会自动生成一个标识名。通过指定标识名,Doris可以异步检查数据导入是否成功。注意,标识名不需要用单引号,且命名要求和表命名要求一致。
COLUMN是导入数据对应的表字段清单,字段名必须是目标表中已存在的。和其他数据库的INSERT INTO语句一样,当插入的字段清单和目标表列表字段一致时,可以省略字段清单;如果不一致,需要在表名后通过括号列出所有字段。如果导入的字段和目标字段不一致,则会隐式转换,转换失败的字段会有提示。
VALUES可以是一条记录,也可以是多条记录,当存在多条记录时,需要用逗号隔开。VALUES语句仅适用于Demo环景,完全不适合在生产环境中应用。
QUERY语句可以是一个非常复杂的查询,支持任意Doris支持的查询语句,支持嵌套查询和WITH查询。INSERT INTO语句本身是一个SQL命令,执行结果分为成功和失败两种。执行成功的截图如图5-1所示,执行失败的截图如图5-2所示。

图5-1 INSERT INTO执行成功截图

图5-2 INSERT INTO执行失败截图
执行成功会返回导入记录数、被过滤记录数、导入LABEL、数据提交状态、事务ID等。执行失败会显示失败的原因和失败数据的URL链接。(不是每次错误的URL链接都呈现。
在使用INSERT INTO SELECT时,数据量过大,容易造成导入任务超时,导入任务在设定的timeout时间内未完成会被系统取消,状态变成CANCELLED。超时时间可以通过两个参数调整:一个是FE配置参数insert_load_default_timeout_second,默认是1 h;一个是Session变量query_timeout,默认是5 min。query_timeout仅对当次数据库连接生效,对其他查询语句也生效。调整FE配置参数需要修改FE的配置文件并且重启FE节点;调整Session变量,仅需在导入语句前设置SET query_timeout=xxx;语句。
此外,我们还有一个比较重要的参数来控制导入数据的质量,即enable_insert_strict。enable_insert_strict参数值默认为true,表示如果有一条数据错误,则导入失败。我们可以通过SET enable_insert_strict=false;语句来调整该参数的值。
因为Doris的每个INSERT INTO语句都是一个独立的任务,所以每次INSERT INTO语句的执行都会产生一个新的数据版本。频繁小批量导入操作会产生过多的数据版本,而过多的数据版本会影响查询性能,所以并不建议频繁使用INSERT INTO语句导入数据。如果有流式导入或者小批量导入任务需求,我们可以使用Stream Load或者Routine Load语句进行导入。
02
应用举例
INSERT INTO语句用于向Doris OLAP表插入数据,常用于测试数据的导入和集群内部数据的关联写入。INSERT INTO语句的导入目标可以是一张表,也可以是一个分区
VALUES语句一般用于少量数据的初始化和Demo场景。一次插入单条记录的示例如下:

其中,第一条、第二条语句的效果是一样的,在不指定目标列时,将表中的列顺序作为默认目标列。第三条、第四条语句的效果是一样的:导入c2列的默认值。
一次插入多条记录的示例如下:

其中,第一条、第二条语句的效果一样,向test表中一次性导入两条数据;第三条、第四条语句效果一样,导入时c2列采用字段默认值。
实际项目应用中更多是通过其他方式导入数据,然后通过“INSERT INTO+查询”语句完成数据的合并、关联、映射等处理。查询语句支持WITH子句、复杂查询,例如:

查询语句也可以指定分区:
WITH LABEL还可以为本次导入操作指定一个Label,如果不指定,系统会自动生成一个Label。
正常执行的INSERT INTO语句会返回执行状态、插入记录数、执行时间和一个JSON格式字符串。

Query OK表示执行成功。2 rows affected表示插入2条记录。label表示用户指定的Label或自动生成的Label,是该INSERT INTO导入作业的标识。每个导入作业在数据库内部都有唯一的Label。status表示导入数据是否可见:如果可见,显示visible;如果不可见,显示committed。txnId表示该次导入作业对应的事务ID。err字段会显示一些其他非预期错误。
Stream Load是Doris用户常用的数据导入方式之一,是一种同步导入方式,允许用户通过HTTP将CSV格式或JSON格式的数据批量导入,并返回数据导入结果。用户可以直接通过HTTP请求的返回结果判断数据导入是否成功,也可以通过在客户端执行查询SQL命令来查询历史任务的结果。另外,Doris还针对Stream Load任务提供了日志审计功能,以通过审计日志对历史Stream Load任务进行审计。
01
执行原理
Doris中的FE节点收到用户发送的Stream Load请求(HTTP请求)后,会通过HTTP重定向将数据导入请求转发给某一个BE节点,该BE节点将作为本次Stream Load任务的Coordinator(协调者)。在这个过程中,接收请求的FE节点仅提供转发服务,由作为Coordinator的BE节点实际负责整个导入作业,比如负责向Master FE发送事务请求,从FE节点获取导入执行计划,接收实时数据,分发数据到其他Executor BE节点,以及数据导入结束后返回结果给用户。用户也可以将Stream Load请求直接提交给某一个指定的BE节点,然后该节点作为本次Stream Load任务的Coordinator。在Stream Load任务执行过程中,Executor BE节点负责将数据写入存储层。
Stream Load执行原理示意图如图5-3所示。在Coordinator BE节点中,一个线程池负责处理所有的HTTP请求,其中包括Stream Load请求。一次Stream Load任务拥有唯一一个Label。

图5-3 Stream Load执行原理示意图
Stream Load的详细执行流程如下。
1)用户提交Stream Load请求到FE节点,也可以直接提交Stream Load请求到BE节点
2)FE节点接收到用户提交的Stream Load请求后进行Header解析(其中包括解析数据导入的库、表、Label等信息),然后进行用户鉴权。如果Header解析成功并且用户鉴权通过,FE节点会将Stream Load请求转发到一个BE节点,由该BE节点作为本次Stream Load任务的Coordinator;否则,FE节点会直接向用户返回Stream Load失败的信息
3)Coordinator BE接收到Stream Load请求后进行Header解析和数据校验,其中包括解析数据的文件格式、消息体的大小、超时时间、用户鉴权信息等。如果Header解析和数据校验失败,Coordinator BE直接向用户返回Stream Load失败的信息。
4)Header解析和数据校验通过之后,Coordinator BE会通过Thrift RPC向FE节点发送Begin Transaction请求。
5)FE节点收到Coordinator BE发送的Begin Transaction请求之后,开启一个事务,并向Coordinator BE返回事务ID。
6)Coordinator BE收到事务ID后,通过Thrift RPC向FE节点发送获取导入计划的请求
7)FE节点收到Coordinator BE发送的获取导入计划请求之后,为Stream Load任务生成导入计划,并返回给Coordinator BE。
8)Coordinator BE接收到导入计划之后,开始执行导入计划,其中包括接收传来的实时数据以及将实时数据通过BRPC分发到其他Executor BE。
9)其他Executor BE接收到Coordinator BE分发的实时数据之后,将数据写入存储层。
10)Executor BE完成数据写入之后,Coordinator BE通过Thrift RPC向FE节点发送Commit Transaction请求。
11)FE节点收到Coordinator BE发送的Commit Transaction请求之后,对事务进行提交,并向Executor BE发送Publish Version任务,同时等待Executor BE执行Publish Version任务完成。
12)Executor BE异步执行Publish Version任务,并将数据导入时生成的Rowset变为可见数据版本。
13)当Publish Version任务正常完成或执行超时时,FE向Coordinator BE返回Commit Transaction和Publish Version任务结果。
14)Coordinator BE向用户返回Stream Load,执行最终结果。
02
用法详解
Stream Load用于向指定的Table导入数据。与普通Load的区别在于,这种导入方式是同步导入(即数据导入完成前,命令不会结束),整个数据导入工作完成后返给用户导入结果。这种导入方式能够保证导入任务的原子性,也就是说要么这批数据全部导入成功,要么全部导入失败。该操作会同时更新和与此基础表相关的Rollup Table数据。
Stream Load语句模板如下:

Stream Load支持HTTP Chunked与非Chunked上传两种方式,对于非Chunked上传方式,必须有Content-Length来标识上传内容长度,这样能保证数据的完整性。
1.参数解析
用户可以通过HTTP请求的Header部分来传入导入参数。
❑label:一次导入数据的标签,相同标签的数据无法多次导入。用户可以通过指定Label的方式来避免一份数据重复导入。Doris可保留30 min内最近成功导入数据的Label
❑column_separator:用于指定导入文件中的列分隔符,默认为\t。如果是不可见字符,则需要加\x作为前缀,并使用十六进制数作为列分隔符,如Hive文件的分隔符\x01,需要增加请求参数-H"column_separator:\x01",可以使用多个字符的组合作为列分隔符
❑line_delimiter:用于指定导入文件中的换行符,默认为\n。可以使用多个字符的组合作为换行符。
❑columns:用于指定导入文件中的列和Table中的列的对应关系。如果源文件中的数据与表中的列不一一对应,需要在请求中增加column参数进行数据转换。这里列举两种形式下的column字段:一种是根据源文件的实际列调整导入字段顺序;一种是基于已有列衍生新的列,语法为column_name=expression。示例如下。
例1:表中有3列c1、c2、c3,依次对应源文件中的3列c3、c2、c1;需要指定-H"columns: c3, c2, c1"。
例2:表中有3列c1、c2、c3,与源文件中前三列依次对应,但是多出最后一列,需要指定-H"columns:c1,c2,c3,xxx",最后一列随意指定名称占位即可。
例3:表中有3列year、month、day,源文件中只有一个时间列(格式为2018-06-01 01:02:03),可以指定-H"columns:col, year=year(col),month=month(col), day=day(col)"。
❑where:用于抽取部分数据。用户如果想将不需要的数据过滤掉,可以设定该选项来达到目的。例如只导入k1列等于20180601的数据,那么可以在导入数据时指定-H"where:k1=20180601"。
❑max_filter_ratio:最大容忍可过滤的数据比例,默认为零容忍。这里的过滤数据不包含通过where条件过滤掉的记录,主要是指数据类型不匹配导致导入失败的记录。
❑partitions:用于指定数据导入所涉及的分区。如果用户能够确定导入数据对应的分区,推荐指定该项,比如导入p1、p2分区,需要指定-H "partitions: p1,p2",不在指定分区的数据将被过滤掉。
❑timeout:用于指定数据导入的超时时间,单位为秒,默认是600 s,可设置超时范围为1~259200 s。
❑strict_mode:用于指定此次导入是否开启严格模式,默认为关闭。开启语句为-H"strict_mode:true"。
❑timezone:用于指定本次导入所使用的时区,默认为东八区。该参数会影响所有导入涉及的和时区有关的函数结果。
❑exec_mem_limit:用于指定导入内存,默认为2 GB。
❑format:用于指定导入数据格式,默认是CSV,支持JSON格式。
❑merge_type:用于指定数据的合并类型,支持3种类型,APPEND、DELETE、MERGE。APPEND是默认类型,表示导入数据全部和现有数据合并;DELETE表示删除与导入数据中Key字段相同的所有行;MERGE类型需要与DELETE条件联合使用,表示满足DELETE条件的数据按照DELETE语义处理,其余的数据按照APPEND语义处理,示例-H"merge_type:MERGE"-H"delete:flag=1"。
❑delete:仅在MERGE类型中有意义,表示数据的删除条件。
❑send_batch_parallelism:整型,用于设置发送批处理数据的并行度,如果并行度超过BE配置中max_send_batch_parallelism_per_job的值,Coordinator BE将使用'max_send_batch_parallelism_per_job'的值。
❑load_to_single_tablet:布尔类型,默认值为false,值为true时表示支持一个任务只导入数据到对应分区的一个Tablet。该参数只允许导入带有Random分区的OLAP表时设置。其他导入场景也支持以上参数,只是表达方式不同,后文不再赘述。
2.返回结果
Stream Load执行完成后,Doris会以JSON格式展示导入结果,导入成功的截图如图5-4所示。


图5-4 Stream Load导入成功截图
Stream Load返回结果包含非常丰富的信息,以键值对形式展现,如表5-1所示。
表5-1 Stream Load返回结果解析


由于Doris数据导入后台复用的是相同的流式导入框架,因此我们也可以通过导入标签查询到以上信息,后文将不再赘述。
3.查看错误数据
数据导入时可能会出现格式不匹配或者不满足非空要求的情况,导致导入失败。Stream Load失败截图如图5-5所示。

图5-5 Stream Load失败截图
我们也可以通过SHOW语句查看Stream Load失败的详细信息:

查询结果数据如图5-6所示。

图5-6 查询导入失败的结果数据
在客户端和Doris节点没有网络限制的情况下,我们也可以直接通过URL文件查看错误数据,据此找到导入失败的原因。
03
应用举例
Stream Load的用法有很多,下面一一举例。
示例1:将本地文件testData中的数据导入数据库testDb中的testTbl表,指定超时时间为100 s。

示例2:将本地文件testData中的数据导入数据库testDb中的testTbl表,只导入k1等于20180601的数据。

示例3:将本地文件testData中的数据导入数据库testDb中的testTbl表,允许20%的错误率。

示例4:将本地文件testData中的数据导入数据库testDb中testTbl的表,允许20%的错误率,并且调整列顺序。

示例5:导入含有HLL列或BITMAP列的表,其中v1和v3列基于导入的字段衍生而来,v2和v4列填充空值。

示例6:对导入数据进行严格模式过滤,并设置时区为Asia/Shanghai。

示例7:四种JSON格式数据导入。
假设testTbl表结构为:

1)单行JSON格式数据如下:

2)为了提高吞吐率,Doris支持一次性导入多行JSON格式数据,每行为一个JSON对象,默认使用\n为换行符,将read_json_by_line设置为true。多行JSON格式数据如下:

3)常规字符串格式数据如下:

针对这类数据,我们需要在指定jsonpaths的同时设置strip_outer_array为true进行精准导入,同时只导入category、author、price三个属性,命令如下:

4)根节点的JSON格式数据如下:

针对这类数据,我们需要通过指定json_root和jsonpaths进行精准导入,同样只导入category、author、price三个属性,命令如下:

示例8:删除与导入数据key相同的数据。

示例9:导入数据到含有sequence列的UNIQUE_KEYS表。

在Broker Load方式下,通过部署的Broker程序,Doris可读取对应数据源(如HDFS、S3)中的数据,利用自身的计算资源对数据进行预处理和导入。这是一种异步导入方式,用户需要通过MySQL协议创建Broker Load任务,并通过查看导入命令检查导入结果。
01
执行原理
用户在提交导入任务后,FE会生成对应的分片导入计划并根据目前BE的个数和文件的大小,将分片导入计划分给多个BE执行,每个BE执行一部分导入任务。在执行过程中,BE会通过Broker拉取数据,在对数据进行预处理之后将数据导入系统。所有BE均完成导入任务后,FE最终判断导入是否成功。图5-7展示了Broker Load执行的主要流程。

图5-7 Broker Load执行的主要流程
Broker Load详细执行流程如下。
1)用户创建Broker Load任务,提交给FE。
2)FE根据文件存储大小和文件个数,制定数据分片导入计划。
3)FE按照计划指挥多个BE节点导入指定的文件或者分片数据。
4)BE通过Broker拉取数据,写入磁盘。
5)BE完成数据导入后反馈消息给FE。
6)FE继续下发任务给BE,直到所有文件数据都导入完成。
7)FE收到所有文件数据导入完成的消息后,反馈给用户。
02
用法详解
Broker Load通过随Doris集群一同部署的Broker进程访问数据源中的数据,然后进行数据导入。不同的数据源需要部署不同的Broker进程。我们可以通过SHOW BROKER命令查看已经部署的Broker进程。
Broker Load语法模板如下:

通过HELP BROKER LOAD语句,我们可以查看Broker Load作业的详细语法。这里主要介绍命令中的参数和注意事项。
数据描述类参数:主要指的是语句中data_desc部分的参数。每组data_desc表述了本次导入涉及的数据源地址、ETL函数、目标表及分区等信息。Broker Load支持一个导入任务导入多张表,每张表需要一个data_desc来指定属于该表的数据源文件地址,可以用多个file_path来指定导入同一个表的多个文件。Broker Load保证了单次导入的多张表的事务性,即要么都成功,要么都失败。
下面是对数据描述类部分参数的说明。
❑file_path:文件路径,可以匹配到一个文件,也可以用*通配符匹配到某个目录下的所有文件。可以使用的通配符有?、*、[]、{}、^。例如hdfs://hdfs_host:hdfs_port/user/data/tablename/*/*可以匹配tablename下所有分区内的所有文件,hdfs://hdfs_host:hdfs_port/user/data/tablename/dt=202104*/*可以匹配tablename下2021年4月创建的分区内的所有文件。
❑negative:在data_desc中还可以设置数据取反导入。这个功能适用的场景是当数据表中聚合列的类型都为SUM时,如果希望撤销某一批导入数据,可以通过negative参数导入一批数据,Doris会自动为这批数据在聚合列取反,以达到删除该批数据的目的。
❑partition:在data_desc中可以指定待导入表的Partition信息。如果待导入数据不属于指定的Partition,则不会被导入。同时,不在指定Partition中的数据会被认为是“错误数据”。对于不想要导入、也不想要记录为“错误数据”的数据,可以使用where predicate参数来过滤。
❑column separator:用于指定导入文件中的列分隔符,默认为\t。如果是不可见字符,需要加\x作为前缀,使用十六进制来表示分隔符。
❑file type:用于指定导入文件的类型,例如Parquet、ORC、CSV,默认为CSV。Parquet类型文件可以通过文件后缀名.parquet或者.parq判断。
❑set column mapping:data_desc中的SET语句负责设置列函数变换。这里的列函数变换支持所有查询的等值表达式变换。
❑where predicate:data_desc中的WHERE语句负责过滤已经完成转换的数据。被过滤的数据不会进入错误容忍率统计。如果多个data_desc中声明了关于同一张表的多个条件,我们可用AND语义合并这些条件。
导入作业参数:指Broker Load任务创建语句中属于PROPERTIES部分的参数,作用于整个导入作业。下面对部分导入作业参数进行详细说明。
❑Timeout:导入作业的超时时间(单位为s)。用户可以在PROPERTIES中自行设置每个导入任务的超时时间。导入任务若在设定的时间内未完成,那么会被系统取消,状态变成CANCELLED。Broker Load任务的默认超时时间为4 h。
注意:通常情况下,用户不需要手动设置导入任务的超时时间。若在默认超时时间内无法完成导入,用户可以手动设置超时时间。
推荐超时时间的计算方式为:
超时时间>[(总文件大小(MB)×待导入的表及相关Rollup表的个数]/(10×导入并发数)
其中,公式中的10为目前BE节点导入任务的默认限速:10MB/s。
例如:对于1GB待导入数据文件,待导入表包含2个Rollup表,当前的导入并发数为3,则超时时间的最小值为(1×1024×3)/(10×3)≈102 s。
由于每个Doris集群环境不同且集群并发查询任务不同,因此Doris集群的最慢导入速度需要用户根据历史导入任务速度进行推测。
❑max_filter_ratio:导入任务的最大错误容忍率,默认为零容忍,取值范围是0~1。当导入错误率超过该值时,导入失败。如果用户希望忽略导入错误的行,我们可以通过设置该参数大于0来实现。
该参数计算公式为:
max_filter_ratio=dpp.abnorm.ALL/(dpp.abnorm.ALL+dpp.norm.ALL)
其中,dpp.abnorm.ALL表示数据质量不合格的行数,质量不合格是指类型不匹配、列数不匹配、长度不匹配等。dpp.norm.ALL表示导入正确数据的条数,可以通过SHOW LOAD命令查询。
原始文件数据的行数=dpp.abnorm.ALL+dpp.norm.ALL
❑exec_mem_limit:导入内存限制,默认是2GB。
❑strict_mode:Broker Load支持开启strict模式,开启方式为在PROPERTIES部分增加strict_mode=true,默认strict模式为关闭。strict模式的意思是对导入过程中的列类型进行严格检查,不符合目标数据类型的记录将被过滤。
严格过滤的策略如下。
对于列类型转换来说,如果strict_mode为true,错误数据将被过滤。这里的错误数据是指:原始数据并不为空值,在参与列类型转换后结果为空值。但一些场景除外,具体如下。
1)导入的某列由函数生成时,strict_mode对其不产生影响,即产生空值也不会被过滤。
2)导入的某列类型包含范围限制时,如果原始数据能正常通过类型转换,但无法通过范围限制,strict_mode对其也不产生影响。例如:如果类型是decimal(1, 0),原始数据为10,则该数据可以通过类型转换,但不在列声明的范围内。strict_mode对这种数据不产生影响。
03
应用举例
下面列举3个Stream Load任务案例。
1. Apache HDFS数据导入
简单的HDFS数据导入如下:


2.利用Python代码实现标准化导入
Broker Load是异步模式,用户可通过SHOW LOAD FROM dbname WHERE label =‘${label_name}’;命令查看导入进度。在具体项目中,由于导入任务很多,我们可使用Python代码来实现不同表的标准化导入。使用Python代码实现标准化导入的前提是源表和目标表字段顺序保持完全一致,并且类型匹配正确。(如果字段顺序不一致,则需要获取Hive的表字段顺序,这里为了简化逻辑,要求字段顺序完全一致。)
利用Python代码实现标准化导入的关键代码如下:


3.对象存储数据导入
除了HDFS,目前最常用的数据存储是对象存储。对象存储也被称为“面向对象的存储”,英文是Object-based Storage,被很多云厂商称为“云存储”。不同的云厂商对它有不同的英文命名,例如阿里云的OSS、华为云的OBS、腾讯云的COS、七牛的Kodo、百度的BOS、网易的NOS等。
百度BOS的导入语句如下:

导入成功的截图如图5-8所示。

Routine Load是一种例行导入方式。Doris通过这种方式支持从Kafka持续不断地导入数据,并且支持通过SQL控制导入任务的暂停、重启、停止。
Routine Load在数据仓库中主要有两种应用场景。
1)接口数据导入。由于批处理存在大量重复抽取数据的情况,越来越多的交易系统采用Binlog或者直接提供接口更新数据到Kafka的方式来完成数据的对接。针对Binlog或者Kafka消息队列数据,批处理程序是无法抽取的,需要采用流式数据写入方式。
2)实时数据导入。根据Lambda架构,实时数据通过Kafka对接以后,继续经由Flink加工,加工完的数据返回Kafka,然后由Routine Load加载到Doris数据库,即可直接供数据分析应用。
01
执行原理
在介绍Routine Load执行原理之前,我们先解释几个名词。
❑RoutineLoadJob:用户提交的一个例行导入任务。
❑JobScheduler:例行导入任务调度器,用于调度和拆分一个RoutineLoadJob为多个Task。
❑Task:RoutineLoadJob被JobScheduler根据规则拆分的子任务。
❑TaskScheduler:任务调度器,用于调度Task的执行。
Routine Load执行流程如图5-9所示。

图5-9 Routine Load执行流程
1)用户通过支持MySQL协议的客户端向FE提交Routine Load任务。
2)FE通过JobScheduler将导入任务拆分成若干个Task,每个Task负责导入指定的一部分数据。
3)每个Task被TaskScheduler分配到指定的BE上执行。在BE上,一个Task被视为一个普通的导入任务,基于Stream Load机制执行任务。
4)BE导入任务完成后,向FE汇报。
5)FE中的JobScheduler根据汇报结果,继续生成新的Task,或者对失败的Task进行重试。
6)FE不断产生新的Task,以完成数据不间断导入任务。
Routine Load是持续运行的,不可避免会与在其他数据库中的操作发生冲突,主要有以下几种情况。
(1)例行导入作业和删除表操作的关系
例行导入作业不会阻塞Schema变更和Rollup操作。但是,如果Schema变更完成后,列映射关系无法匹配,会导致错误数据激增,最终导致作业暂停。建议通过在例行导入作业中显式指定列映射关系,以及增加Nullable列或带Default值的列来减少这类问题发生。删除表分区可能会导致导入数据无法找到对应分区,进而引发作业暂停。
(2)例行导入作业和其他导入作业的关系
例行导入作业和其他导入作业没有冲突。当执行DELETE操作时,对应表分区不能有任何正在执行的导入任务,所以在执行DELETE操作前,需要先暂停例行导入作业,并等待已下发的Task全部完成后,再执行DELETE操作。
(3)例行导入作业和删除Database操作的关系
当例行导入作业对应的Database被删除后,作业会自动取消。
(4)例行导入作业和Kafka_Topic的关系
当例行导入声明的Kafka_Topic在Kafka集群中不存在时,如果用户方的Kafka集群中的broker进程设置了auto.create.topics.enable=true,Kafka_Topic会被自动创建。自动创建的Partition个数由用户方的Kafka集群中的broker进程配置的num.partitions决定的。例行导入作业会不断读取该Kafka_Topic的数据。如果Kafka集群中的broker进程设置了auto.create.topics.enable=false,Kafka_Topic不会被自动创建,例行导入作业会在没有读取任何数据之前就被暂停,状态为PAUSED。
(5)例行导入作业和Kafka集群
创建Routine Load任务时指定的Broker列表必须能被Doris服务访问。Kafka集群中如果配置了advertised.listeners, advertised.listeners必须能被Doris服务访问。
02
用法详解
Routine Load语句模板如下:

1)[db.]job_name:表示导入作业的名称,在同一个数据库内,相同名称只能有一个Job在运行。
2)tbl_name:用于指定需要导入的表的名称。
3)merge_type:用于指定数据合并类型,默认为APPEND,表示导入操作是追加写入数据。MERGE和DELETE类型仅适用于Unique模型表。其中,MERGE类型需要配合DELETE ON语句使用,以标注Delete Flag列;而DELETE类型表示根据导入数据的Key值删除目标表数据。
4)load_properties:用于描述导入数据,包含以下参数。
❑column_separator:用于指定列分隔符,默认为\t。
❑columns_mapping:用于指定文件中的列和表中的列的映射关系,以及各种列转换等。
❑columns_mappin:主要有两种情况:映射列,如目标表有三列col1、col2、col3,源数据有4列,其中第1、2、4列分别对应col2、col1、col3,可写为COLUMNS(col2, col1, temp, col3),其中temp列不存在,用于跳过源数据中的第三列;衍生列,除了直接读取源数据的列内容之外,Doris还提供对列数据的加工服务。假设目标表后加入第四列col4,可写为COLUMNS(col2, col1, temp,col3, col4=col1+col2)。
❑preceding_filter:用于过滤原始数据。
❑where_predicates:用于根据条件对导入的数据进行过滤,主要用于经过映射和转换后的列,可通过表达式进行过滤,例如WHERE k1>100 and k2=1000。
❑partitions:用于指定导入目标表的哪些分区,如果不指定,系统会自动导入数据到对应目标表的分区中,例如PARTITION(p1, p2, p3)。
❑DELETE ON:配合MEREGE类型一起使用,仅针对Unique模型表,用于指定导入数据中表示Delete Flag的列和计算关系。
❑ORDER BY:仅针对Unique模型表,用于指定导入数据中表示Sequence Col的列,主要用于保证导入数据的顺序。
5)job_properties:用于指定例行导入作业的通用参数,主要包含以下参数。
❑desired_concurrent_number:用于设置期望的并发度。一个例行导入作业会被分成多个子任务。这个参数可指定一个作业最多有多少任务可以同时执行,值必须大于0,默认为3。该并发度并不是实际的并发度,实际的并发度由集群的节点数、负载情况,以及数据源决定。
❑子任务导入参数。子任务导入参数有max_batch_interval、max_batch_rows、max_batch_size三个,分别表示每个子任务最大执行时间(单位是s)、最多读取行数、最多读取字节数(单位是B)。这三个参数用于控制一个子任务的执行时间和数量处理量。当达到其中任意一个参数的阈值,子任务结束等待并进行数据写入。
❑max_error_number:用于指定采样窗口内允许的最大错误行数,必须大于或等于0,默认是0,即不允许有错误行。
❑strict_mode:用于指定是否开启严格模式,默认为关闭。如果开启,当非空原始数据的列类型变换结果为NULL,该数据会被过滤。
❑timezone:用于指定导入作业所使用的时区,默认使用Session变量的timezone参数。该参数会影响所有导入作业涉及的和时区有关的函数的结果。
❑format:用于指定导入数据格式,默认是CSV,支持JSON格式。
❑Jsonpaths:当导入数据格式为JSON时,用户可以通过jsonpaths指定抽取JSON格式数据中的字段,例如-H“jsonpaths:[\”$.k2\”,\”$.k1\”]”。
❑strip_outer_array:默认值为false,当导入数据格式为JSON时,strip_outer_array为true表示JSON格式数据以数组的形式展现,数组中的每一个元素将被视为一行数据。
❑json_root:当导入数据格式为JSON时,用户可以通过json_root指定JSON格式数据的根节点。Doris可通过json_root抽取根节点的元素进行解析。
6)FROM data_source [data_source_properties]:用于指定数据源的类型,当前Doris只支持Kafka数据源。data_source_properties支持如下数据源属性。
❑kafka_broker_list:用于指定Kafka的Broker连接,格式为ip:host。多个Broker之间以逗号分隔,例如“kafka_broker_list”=“broker1:9092,broker2:9092”。
❑kafka_topic:用于指定要订阅的Kafka Topic,例如“kafka_topic”=“topic1”。
❑kafka_partitions和kafka_offsets:用于指定需要订阅的Kafka Partition,以及每个Partition对应的起始Offset。Offset可以是大于0的整数(必须是确实存在的Offset值)或者具体时间(例如“2022-06-22 11:00:00”),也可以是OFFSET_BEGINNING(从有数据的位置开始订阅)或者OFFSET_END(从末尾开始订阅)。如果没有指定,Doris默认OFFSET_END开始订阅Topic下的所有Partition。
data_source_properties还支持用户自定义Kafka参数,功能等同于Kafka Shell中的--property参数。当参数的Value为文件时,Value前需要加上关键词FILE。
这里要用到一个新功能——CREATE FILE创建文件命令,举例如下:

然后在data_source_properties中使用该文件:

更多Doris支持的Kafka自定义参数,请参阅librdkafka官方CONFIGURATION文档中Client端的配置项。
03
国际载人航天日
案例一:为example_db的example_tbl创建一个名为test1的Kafka例行导入任务,流数据为逗号分隔的文本字符串,并且自动默认消费所有分区,且从有数据的位置(OFFSET_BEGINNING)开始订阅。


案例二:通过SSL认证方式,从Kafka集群导入数据,同时设置client.id名称,导入任务为非严格模式,时区为Asia/Shanghai。

案例三:导入JSON格式数据,并通过jsonpaths抽取字段,并指定JSON格式数据的根节点。


案例四:加载Kafka集群中的Binlog数据,记录数据操作类型,不进行数据删减。

案例五:加载Kafka集群中的Binlog数据,根据数据操作类型进行新增和删除操作(更新操作可拆分成为删除和新增两个操作)。
首先目标表必须是Unique模型表,并且设置目标表支持批量删除:

然后创建导入任务:


Binlog Load提供了实时同步MySQL数据库操作的功能,拓展了数据实时同步应用场景。Binlog Load可以绕过Kafka,直接读取MySQL的CDC(Change Data Capture,数据变更捕获)日志,实时同步MySQL数据库的增、删、改操作。Doris 0.15及以上版本具有该功能。目前,Binlog Load任务只支持对接Canal,从Canal Server上获取解析后的Binlog数据并导入Doris。
01
基本原理
Binlog Load以Canal为中间媒介,让Canal伪装成一个从节点去获取MySQL主节点上的Binlog数据并解析,再由Doris获取Canal上解析后的数据,总体执行流程如图5-10所示。
Binlog Load具体执行流程如下。
1)用户向FE提交数据同步作业。
2)FE为每个数据同步作业启动一个Canal Client,以向Canal Server端订阅并获取数据。
3)Canal Client中的Receiver负责通过Get命令接收数据,每获取到一个数据Batch,都会由Consumer根据对应表分发到不同的Channel,每个Channel都会为此数据Batch产生一个发送数据的子任务Task。在FE上,一个Task包含分发到当前Channel的同一个Batch的数据。
4)Channel控制着单个表事务的开始、提交、终止。一个事务周期内,一般会从Consumer获取到多个Batch的数据,因此会产生多个向BE发送数据的子任务Task。在提交事务成功前,这些Ta s k不会实际生效。
5)满足一定条件时(比如超过一定时间、达到提交最大数据大小),Consumer将会阻塞并通知各个Channel提交事务。
6)当且仅当所有Channel都提交成功,Canal Client才会通过Ack命令通知Canal Server继续获取并消费数据。7)如果有任意Channel提交失败,Doris将会重新从上一次消费成功的位置获取数据并再次提交。(已提交成功的Channel不会再次提交,以保证幂等性。)
7)如果有任意Channel提交失败,Doris将会重新从上一次消费成功的位置获取数据并再次提交。(已提交成功的Channel不会再次提交,以保证幂等性。)
8)整个数据同步作业中,FE通过以上流程不断从Canal获取数据并提交到BE,来完成数据同步。
9)数据由Coordinator BE接收并分发给对应的BE存储。

图5-10 Binlog Load执行流程
02
用法详解
创建Binlog Load任务使用的是CREATE SYNC命令,表明Doris不仅仅只是想实现BinLog数据同步,后续还会支持其他数据库的数据同步。
创建同步任务之前,首先在fe.conf里配置enable_create_sync_job=true(默认值是false,表示不启用),否则不能创建同步任务。
CREATE SYNC语法如下:

其中,job_name表示同步作业名称,是作业在当前数据库内的唯一标识。相同job_name的作业只能有一个在运行。channel_desc表示作业下的数据通道,用来描述MySQL源表到Doris目标表的映射关系。binlog_desc用来描述远端数据源,目前仅支持Canal数据源,应用语法如下:

针对Canal数据源,binlog_desc有以下参数。
1)canal.server.ip:Canal Server的地址。
2)canal.server.port:Canal Server的端口。
3)canal.destination:Instance的标识。
4)canal.batchSize:获取的Batch容量的最大值,默认为8192。
5)canal.username:Instance的用户名。
6)canal.password:Instance的密码。
7)canal.debug:可选,设置为true时,将Batch和每一行数据的详细信息都打印出来。
Binlog Load和其他导入任务不同的是,Binlog Load既可支持导入一个目标表,也可支持导入多个目标表。也就是说,在channel_desc中配置多条记录可以实现一个Binlog Load任务同步多张MySQL表的数据到多个Doris内部表。
例如,为test_db的多张表创建一个名为job1的数据同步作业,一一对应多张MySQL源表,并显式地指定列映射。


待Binlog Load任务创建成功以后,我们可以通过SHOW SYNC JOB命令查看数据同步作业状态。
03
应用举例
首先安装和配置MySQL数据库,使其记录CDC日志,其次安装配置、启动Canal,最后在Doris中对接Binlog数据。接下来我们就详细展开介绍这个过程。
1)安装和配置MySQL数据库。安装MySQL有两种:一种是Docker模式安装,另一种是在Linux系统上安装。网上相关资料比较多,这里不再赘述。
安装好MySQL数据库以后,需要开启Binlog配置。进入Docker容器或者物理机上修改/etc/my.cnf文件,在[mysqld]下添加以下内容:

通过show variables like '%log_bin%'命令检查配置是否起作用,以及Binlog配置是否开启,如图5-11所示。

图5-11 查看MySQL数据库的Binlog配置
重启MySQL服务:

创建MySQL表:

2)安装和配置Canal。
首先下载Canal软件并解压到指定目录:

其次在conf文件夹下新建目录并重命名,作为Instance的根目录一般使用系统简称或者数据库名作为conf文件夹名,例如这里使用的是我的数据库名:demodb。
具体的Canal参数配置可以参考Canal官方文档(地址为https://github.com/alibaba/canal/wiki/QuickStart)。下面给出一个精简的Canal配置模板(去掉了大量无效配置行):


3)启动Canal。
接下来是启动Canal,启动目录为sh bin/startup.sh。
需要注意的是,canal.instance.user/passwd在Canal 1.1.5版本的canal.properties里加上以下两个配置:

登录默认密码为canal/canal,canal.passwd的密码值可以通过select password("xxx")来获取。
启动后,验证Canal是否启动成功,启动成功的截图如图5-12所示。


图5-12 Canal启动成功截图
4)创建Doris目标表。
用户需要先在Doris端创建好与MySQL端对应的目标表。Binlog Load只支持Unique模型的目标表,且必须激活目标表的Batch Delete功能。


5)开始同步MySQL表中数据到Doris。
创建Binlog Load任务,从MySQL的Binlog日志中读取数据并写入Doris表。

6)查看同步任务状态和目标表中的数据。
用命令SHOW SYNC JOB查看Binlog Load同步任务状态,截图如图5-13所示。

图5-13 查看Binlog Load同步任务状态
查看目标表中的数据,截图如图5-14所示。

图5-14 查看目标表中的数据
7)删除数据并查看目标表中的变化。
在MySQL表中删除数据,然后查看目标表中的变化。

8)修改数据并查看目标表中的变化。
在MySQL表里修改数据,然后查看目标表中的变化。

多表同步的Binlog Load任务创建语句如下:

虽然以上这些导入方式在某些方面都有广泛应用,但是在实际的数据仓库和数据中台项目中,最常用的方式还是DataX。DataX支持广泛的数据源,非常适合离线数据同步。
为了更好地扩展Doris生态,为Doris用户提供更方便的数据导入方式,Apache社区开发扩展了DataX DorisWriter,以便使用DataX工具进行数据接入。
此外,Doris针对Apache SeaTunnel(原Waterdrop)提供了连接器。Apache SeaTunnel是开源应用中仅次于DataX和Kettle的数据同步工具。
01
DataX执行原理
DataX是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间数据稳定高效的同步。DataX作为离线数据同步框架起到中转作用,将不同来源数据的读取功能抽象为Reader插件,将向不同目标写入数据的功能抽象成Writer插件。理论上,DataX框架可以支持任意类型数据源的数据同步。同时,DataX插件体系作为一套生态系统,新数据源一旦接入即可实现和现有数据源互通。数据同步旧模式和新模式对比如图5-15所示。

图5-15 数据同步旧模式和新模式对比
DataX作为离线数据同步框架,采用“FrameWork+Plugin”架构构建,纳入数据源读取和写入插件,工作流程示意图如图5-16所示。

图5-16 DataX工作流程示意图
1)Reader插件:数据采集模块,负责采集数据源中的数据,将数据发送给FrameWork。
2)Writer插件:数据写入模块,负责不断向FrameWork获取数据,并将数据写入目的端。
3)FrameWork:用于连接Reader和Writer插件,作为两者的数据传输通道,并处理缓冲、流控、并发、数据转换等核心技术问题。目前,DataX已经有比较全面的插件体系,已接入主流的RDBMS数据库、NoSQL数据库、大数据计算系统等,如表5-2所示。
表5-2 DataX支持读写的数据库列表

02
DataX DorisWriter插件
前文已经说到,Doris FE兼容MySQL协议,这就意味着大部分场景,例如数据查询、数据导出、数据对象创建以及少量数据插入,只需要复用MySQL的接口即可。但是需要一次性同步万级别数据时,INSERT INTO方式就不适用了。为了弥补这个缺陷,Doris社区开发了DataX DorisWriter插件,以批量导入数据时直接和BE节点进行通信,并行写入数据。
根据官方文档的介绍,DataX DorisWriter复用了Stream Load方式,将Reader插件读取的数据进行缓存,拼接成JSON文本,然后批量导入Doris。
DataX DorisWriter于2021年9月底发布,在GitHub上可获取源码,地址为https://github.com/apache/incubator-doris/tree/master/extension/DataX。这里直接下载张家锋老师编译的DataX Doris Writer安装包,解压后文件如图5-17所示。

图5-17 DataX DorisWriter安装包解压后文件
官方提供了一份Doris DorisWriter配置文件,内容如下:


DataX DorisWriter配置参数及其解释如表5-3所示。
表5-3 DataX DorisWriter配置信息


03
应用举例
首先,创建MySQL表pos_order,并插入1万条记录。

然后,对照pos_order创建Doris表,命名为ods_pos_order,建表语句如下:

接着,上传DataX安装包并解压:

接着,创建同步配置文件,内容如下:


最后,执行DataX命令python data/datax/bin/datax.py mysql2doris.json,执行结果如图5-18所示。

图5-18 DataX执行结果截图
Spark Load通过外部的Spark资源实现对导入数据的预处理,提高Doris数据导入性能并且节省Doris集群计算资源,主要用于初次迁移、大量数据导入Doris(数据量可到TB级别)。Spark Load的核心原理是复用Broker Load的功能,增加了Spark预处理数据功能。
Spark Load是一种异步导入方式,需要用户通过MySQL协议创建Spark类型导入任务,支持通过Show Load命令查看导入结果。
Spark Load主要涉及的专业名词如下。
❑Spark ETL:在导入流程中主要负责数据ETL工作,包括全局字典构建(BITMAP类型)、分区、排序、聚合等。
❑Broker:独立的无状态进程,封装了文件系统接口,提供读取远端存储系统中文件的能力。
❑全局字典:保存了从原始值到编码值映射的数据结构,原始值可以是任意数据类型,编码后的值为整型,主要应用于精确去重预计算场景。
01
执行原理
用户通过FE节点提交Spark Load任务,Spark集群读取数据并写入HDFS,FE通知BE通过Broker读取HDFS数据,完成数据导入,具体的执行流程如图5-19所示。

图5-19 Spark Load执行流程
Spark Load详细执行流程如下。
1)用户创建Spark Load任务。
2)FE调度并提交ETL任务到Spark集群执行。
3)Spark集群执行ETL任务,完成对导入数据的预处理,包括全局字典构建(BITMAP类型)、分区、排序、聚合等。
4)ETL任务完成后,FE获取预处理后的每个分片的数据路径,并调度相关的BE推送任务。
5)BE通过Broker读取数据,并将数据格式转化为Doris存储格式。
6)FE调度将最新导入的数据设置为有效版本,完成导入任务。
02
用法详解
在Doris中,Spark作为一种外部资源被用来完成ETL任务。未来,Doris可能还会引入其他外部资源,如Spark、GPU用于查询,HDFS、S3用于外部存储,MapReduce用于ETL等,因此Doris引入Resource Management来管理使用的这些外部资源。
提交Spark Load任务之前,用户需要配置执行ETL任务的Spark外部资源,语法如下:

创建Spark外部资源的常用PROPERTIES如下:
❑type:资源类型,必填,目前仅支持Spark。
❑spark.master:必填,目前支持Yarn和standa lone两种模式。
❑spark.submit.deployMode:Spark部署模式,必填,支持Cluster、Client两种。
❑spark.hadoop.yarn.resourcemanager.address:Master为Yarn时必填。
❑spark.hadoop.fs.defaultFS:Master为Yarn时必填。
❑working_dir:执行ETL时使用的临时数据存放目录。
❑hadoop.security.authentication:指定认证方式为Kerberos。
❑kerberos_principal:指定Kerberos的用户主体。
❑kerberos_keytab:指定kerberos的keytab文件路径。该文件必须为Broker进程所在服务器上的文件的绝对路径,并且可以被Broker进程访问。
❑kerberos_keytab_content:指定kerberos中keytab文件内容经过Base64编码之后的内容,和kerberos_keytab配置二选一即可。
❑broker:Broker名称,Spark作为ETL资源使用时必填,需要通过ALTER SYSTEM ADD BROKER命令提前完成配置。
❑broker.property_key:Broker读取ETL操作生成的中间文件时需要指定的认证信息等。
创建Spark Load任务,语法如下:

Spark Load任务创建语法中主要包含以下参数。
❑Label:导入任务的标识。每个导入任务都有在单数据库内部唯一的标识,具体使用规则与Broker Load方式中的Label使用规则一样。
❑数据描述类参数:目前支持的数据源有CSV和Hive表,其他使用规则与Broker Load方式中的一致。
❑导入作业参数:主要指Spark Load任务创建语句中属于opt_properties部分的参数,作用于整个导入作业。规则与Broker Load方式中的一致。
❑Spark资源参数:Spark资源需要提前配置到Doris系统并且赋予用户USAGE_PRIV权限后才能使用。
03
应用举例
根据前面的用法介绍,我们知道了Spark Load执行流程,即先创建Spark资源对象,然后分配Spark资源使用权限,最后创建Spark Load任务。
第一步:创建Spark资源对象。Spark部署有Yarn集群模式、Standalone部署模式。在两种模式下,Spark资源对象配置参数略有不同。


第二步,分配Spark资源使用权限。普通账户只能看到USAGE_PRIV权限的Spark资源,root和admin账户可以看到所有的Spark资源。资源权限通过GRANT REVOKE来管理,目前Doris仅支持USAGE_PRIV使用权限。我们可以将USAGE_PRIV权限赋予某个用户或者某个角色。

第三步,通过Spark资源将上游数据源HDFS中的数据导入Doris。

通常情况下,我们需要通过Spark读取Hive中的数据并写入Doris。为此,我们还需要创建Hive资源。

Spark Load的配置过程比较复杂,因此较少使用。一般情况下,使用Broker Load或者创建Doris外部表也可以达到相同的目的,只有在导入数据量特别大的情况下才使用Spark Load。




