Doris is currently one of the leaders in the OLAP analytics space. Although its raw performance trails ClickHouse slightly, it beats ClickHouse on both ease of use and concurrency. Doris's architecture closely resembles that of Greenplum, an early MPP database, but Doris delivers much better performance and far higher concurrency.
Compared with Greenplum, Doris offers the following optimizations and improvements:
Doris spreads the load that Greenplum concentrates on its Master node across the BE nodes, greatly increasing concurrent query capacity;
Doris's bucketing makes data distribution more flexible;
Doris uses multiple replicas to improve both data availability and query concurrency;
Doris's three data models (Aggregate, Unique, Duplicate) make its OLAP characteristics more pronounced and its queries faster;
Doris's data storage is a big step up from Greenplum's;
Doris has much friendlier support for real-time data;
Doris uses memory more efficiently and more sensibly. Greenplum's memory strategy has long been criticized by users, and sensible memory use translates directly into faster queries.
In my experience, on comparable hardware Doris runs queries roughly 10x faster than Greenplum. In short, Doris is the rising star of the OLAP field and a strong replacement for Greenplum.

As we know, Greenplum can quickly load Hive data via GPload; see the article 《数仓实战|两步搞定Hive数据加载到Greenplum》 for details. Doris provides similar functionality: Stream Load for loading data from a file system and Broker Load for loading data from HDFS. This article focuses on Broker Load.
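For context, Stream Load pushes a local file to Doris over HTTP (a PUT to the FE's `/api/{db}/{table}/_stream_load` endpoint). Below is a minimal Python sketch; the FE host, port, credentials, file path, and table names are placeholders of mine, not values from this article:

```python
def stream_load_url(fe_host, db, table, port=8030):
    # Doris Stream Load endpoint: PUT /api/{db}/{table}/_stream_load on the FE HTTP port
    return "http://%s:%d/api/%s/%s/_stream_load" % (fe_host, port, db, table)

def stream_load(fe_host, db, table, file_path, user, password, label):
    import requests  # third-party; pip install requests
    headers = {
        "label": label,              # unique label; lets you query or safely retry the load
        "column_separator": ",",     # field delimiter of the source file
        "Expect": "100-continue",    # FE redirects the body to a BE
    }
    with open(file_path, "rb") as f:
        resp = requests.put(
            stream_load_url(fe_host, db, table),
            data=f,
            headers=headers,
            auth=(user, password),
        )
    # The JSON body reports "Status": "Success" on a successful load
    return resp.json()
```

Unlike Broker Load, Stream Load is synchronous: the HTTP response itself tells you whether the load succeeded.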
The syntax for creating a Broker Load job is:
LOAD LABEL db_name.label_name
(data_desc, ...)
WITH BROKER broker_name broker_properties
[PROPERTIES (key1=value1, ...)]

* data_desc:
    DATA INFILE ('file_path', ...)
    [NEGATIVE]
    INTO TABLE tbl_name
    [PARTITION (p1, p2)]
    [COLUMNS TERMINATED BY separator]
    [(col1, ...)]
    [PRECEDING FILTER predicate]
    [SET (k1=f1(xx), k2=f2(xx))]
    [WHERE predicate]

* broker_properties:
    (key1=value1, ...)
A Broker Load example:
LOAD LABEL db1.label1
(
    DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/file1")
    INTO TABLE tbl1
    COLUMNS TERMINATED BY ","
    (tmp_c1, tmp_c2)
    SET
    (
        id=tmp_c2,
        name=tmp_c1
    ),
    DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/file2")
    INTO TABLE tbl2
    COLUMNS TERMINATED BY ","
    (col1, col2)
    where col1 > 1
)
WITH BROKER 'broker'
(
    "username"="user",
    "password"="pass"
)
PROPERTIES
(
    "timeout" = "3600"
);
Broker Load is asynchronous: after submitting the job, check its progress with show load from dbname where label ='${label_name}';
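The State column of the show load result drives the polling logic: keep waiting while the job is in flight, stop on a terminal state. A tiny helper (the function name and return values are my own convention) makes the state machine explicit:

```python
# In-flight states of a Doris load job, as reported by SHOW LOAD
RUNNING_STATES = ("PENDING", "ETL", "LOADING")

def load_state_action(state):
    """Map a SHOW LOAD State value to 'done', 'wait', or 'fail'."""
    if state == "FINISHED":
        return "done"
    if state in RUNNING_STATES:
        return "wait"
    return "fail"  # e.g. CANCELLED
```

The full loading script below applies exactly this check inside its polling loop.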
Since a real project has many load jobs, we usually drive the loading logic from Python so the same code handles different tables; the prerequisite is that the source and target tables have identical column lists. The code is shared below.
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import os
import sys
import json
import codecs
import subprocess
import time
import traceback
#import MySQLdb
import pymysql

# Print without output buffering
def printf(text):
    sys.stdout.write(text)
    sys.stdout.write("\n")
    sys.stdout.flush()

# Run a command synchronously; exit if it fails
def runCmd(cmdText):
    printf(cmdText)
    child_process = subprocess.Popen(cmdText, shell=True)
    (stdout, stderr) = child_process.communicate()
    sys.stdout.flush()
    printf('return code: %s, cmd: %s' % (child_process.returncode, cmdText))
    if child_process.returncode:
        sys.exit(child_process.returncode)

# Create a connection to the target database
def getConn():
    conn = pymysql.connect(host='xx.xx.xx.xx', user='root', passwd='root', db='xxx', port=3306)
    return conn

# Execute a statement (used for the LOAD statement)
def executeSql(sql):
    conn = getConn()
    cursor = conn.cursor()
    cursor.execute(sql)
    data = cursor.fetchall()
    conn.commit()  # in case autocommit is off
    cursor.close()
    conn.close()
    return data

# Run a query and return all rows
def querySql(sql):
    conn = getConn()
    cursor = conn.cursor()
    cursor.execute(sql)
    data = cursor.fetchall()
    cursor.close()
    conn.close()
    return data

if __name__ == '__main__':
    tabname = 'dm_gd_hw_goods_sql_day'
    collist_sql = """select group_concat(column_name) as column_list
from (select column_name, ordinal_position
      from information_schema.columns
      where table_schema = 'hw_mbi'
        and table_name = '${tabname}'
      order by ordinal_position) t;"""
    collist_sql = collist_sql.replace('${tabname}', tabname)
    printf("Column-list query SQL: %s" % collist_sql)
    collist_str = querySql(collist_sql)[0][0]
    printf("Column list found: %s" % collist_str)
    label_name = 'load_' + tabname + '_' + time.strftime("%Y%m%d_%H%M%S", time.localtime())
    load_sql = """LOAD LABEL dm.${label_name}
(
    DATA INFILE("hdfs://hadoopcluster/hive/warehouse/hw_dm.db/${tabname}/*")
    INTO TABLE ${tabname}
    FORMAT AS "orc"
    (${column_list})
)
WITH BROKER broker_name
(
    "username"="xxx",
    "password"="xxx",
    "dfs.nameservices" = "hadoopcluster",
    "dfs.ha.namenodes.hadoopcluster" = "nn1,nn2",
    "dfs.namenode.rpc-address.hadoopcluster.nn1" = "192.168.80.31:8020",
    "dfs.namenode.rpc-address.hadoopcluster.nn2" = "192.168.80.32:8020",
    "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
)
PROPERTIES
(
    "timeout" = "3600",
    "max_filter_ratio" = "0.1"
    --,"load_parallelism" = "8"
);"""
    load_sql = load_sql.replace('${label_name}', label_name)\
                       .replace('${tabname}', tabname)\
                       .replace('${column_list}', collist_str)
    printf("Load SQL: %s" % load_sql)
    executeSql(load_sql)
    try:
        while True:
            status_sql = "show load from dm where label ='${label_name}';"
            status_sql = status_sql.replace('${label_name}', label_name)
            printf("Load status query SQL: %s" % status_sql)
            status_row = querySql(status_sql)[0]
            printf("Status row: %s" % str(status_row))
            printf("Load state: %s" % status_row[2])
            if status_row[2] == 'FINISHED':
                printf("%s load run success!" % label_name)
                break
            elif status_row[2] in ('PENDING', 'ETL', 'LOADING'):
                printf("%s load running! sleep 60s" % label_name)
                time.sleep(60)
            else:
                printf("%s load run failed!" % label_name)
                printf("ErrorMsg: %s, error-data URL: %s" % (status_row[7], status_row[13]))
                break
    except Exception as e:
        traceback.print_exc()  # print where the error occurred
        os._exit(1)  # abnormal exit, terminate the program





