暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

数仓实战|Broker load快速导入Hive数据到Doris

数据中台研习社 2021-08-23
4304

      Doris是目前OLAP分析领域的翘楚,虽性能略逊于Clickhouse,但是易用性和并发度都好于Clickhouse。Doris架构和早期的MPP架构数据库Greenplum非常类似,但是Doris比Greenplum性能提升很多,并且并发度高很多。

      Doris和Greenplum相比,主要有以下优化和提升:

  1. Doris将Greenplum数据库Master节点的压力分散到BE节点上,大幅提高了并发查询能力;

  2. Doirs通过分桶技术,提高了数据分布的灵活性;

  3. Doirs通过多副本,提高了数据的可用性和查询并发度;

  4. Doirs 抽象出来的三种数据模型(Aggregate、UNIQUE、Duplicate),让OLAP特性更为显著,查询性能更高

  5. Doirs的数据存储相对于Greenplum有很大的提升;

  6. Doirs对实时数据支持更加友好;

  7. Doris对内存的使用率更好也更合理。Greenplum的内存策略一直饱用户受吐槽,合理的使用内存意味着更高效的查询速度。

       根据笔者的经验,在配置接近的情况下,Doris查询速度比Greenplum快10倍左右。综上,Doris是OLAP领域的“后起之秀”,是Greenplum强有力的替代者。

        我们都知道Greenplum通过GPload可以快速加载Hive数据,详细参考文章《数仓实战|两步搞定Hive数据加载到Greenplum》。Doirs也提供类似的功能,包括从文件系统加载数据的Stream load和从HDFS加载数据的Broker load。本文主要介绍Broker load。


        Broker load 创建导入语句语法:

    LOAD LABEL db_name.label_name 
    (data_desc, ...)
    WITH BROKER broker_name broker_properties
    [PROPERTIES (key1=value1, ... )]


    * data_desc:


    DATA INFILE ('file_path', ...)
    [NEGATIVE]
    INTO TABLE tbl_name
    [PARTITION (p1, p2)]
    [COLUMNS TERMINATED BY separator ]
    [(col1, ...)]
    [PRECEDING FILTER predicate]
    [SET (k1=f1(xx), k2=f2(xx))]
    [WHERE predicate]


    * broker_properties:


    (key1=value1, ...)

            Broker load导入示例如下:

      LOAD LABEL db1.label1
      (
      DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/file1")
      INTO TABLE tbl1
      COLUMNS TERMINATED BY ","
      (tmp_c1,tmp_c2)
      SET
      (
      id=tmp_c2,
      name=tmp_c1
      ),
      DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/file2")
      INTO TABLE tbl2
      COLUMNS TERMINATED BY ","
      (col1, col2)
      where col1 > 1
      )
      WITH BROKER 'broker'
      (
      "username"="user",
      "password"="pass"
      )
      PROPERTIES
      (
      "timeout" = "3600"
      );


               Broker load是异步模式,需要通过show load from dbname where label ='${label_name}'; 命令查看导入进度。

               由于项目中导入的任务会很多,所以,我们通常使用python代码来实现导入逻辑,实现不同表的导入。前提是源表和目标表字段列表保持一致。代码分享如下。

        #!/usr/bin/python
        # -*- coding: UTF-8 -*-


        import os
        import sys
        import json
        import codecs
        import subprocess
        import time
        #import MySQLdb
        import pymysql


        # 不带缓存输出数据
        def printf(text):
        sys.stdout.write(text)
        sys.stdout.write("\n")
        sys.stdout.flush()


        # 同步运行任务,如果命令异常则退出
        def runCmd(cmdText):
        printf(cmdText)
        child_process = subprocess.Popen(cmdText, shell=True)
        (stdout, stderr) = child_process.communicate()
        sys.stdout.flush()
        printf('return code: %s, cmd: %s' %(child_process.returncode, cmdText))
        if (child_process.returncode):
        sys.exit(child_process.returncode)


        # 创建目标数据库连接
        def getConn():
        #连接数据库
        conn = pymysql.connect(host='xx.xx.xx.xx', user='root', passwd='root', db='xxx', port=3306)
        return conn


        def executeSql(sql):
        #获取数据库连接
        conn = getConn()
        # 使用cursor()方法获取操作游标
        cursor = conn.cursor()
        # 使用execute方法执行SQL语句
        cursor.execute(sql)
        # 使用 fetchone() 方法获取一条数据
        data = cursor.fetchall()
        # 关闭数据库连接
        cursor.close()
        conn.close()
        return data


        def querySql(sql):
        #获取数据库连接
        conn = getConn()
        # 使用cursor()方法获取操作游标
        cursor = conn.cursor()
        # 使用execute方法执行SQL语句
        cursor.execute(sql)
        # 使用 fetchone() 方法获取所有数据
        data = cursor.fetchall()
        # 关闭数据库连接
        cursor.close()
        conn.close()
        return data


        if __name__ == '__main__':
        tabname = 'dm_gd_hw_goods_sql_day'
        collist_sql="""select group_concat(column_name) as column_list
        from (select column_name, ordinal_position
        from information_schema. columns
        where table_schema = 'hw_mbi'
        and table_name = '${tabname}'
        order by ordinal_position) t;
        """
        collist_sql = collist_sql.replace('${tabname}', tabname)
        printf("查询字段的SQL:%s"%collist_sql)
        collist_str = querySql(collist_sql)[0][0]
        printf("查询到的字段列表:%s"%collist_str)


        label_name = 'load_' + tabname + '_' + time.strftime("%Y%m%d_%H%M%S", time.localtime())
        load_sql=""" LOAD LABEL dm.${label_name}
        (
        DATA INFILE("hdfs://hadoopcluster/hive/warehouse/hw_dm.db/${tabname}/*")
        INTO TABLE ${tabname}
        FORMAT AS "orc"
        (${column_list})
        )
        WITH BROKER broker_name ("username"="xxx", "password"="xxx",
        "dfs.nameservices" = "hadoopcluster",
        "dfs.ha.namenodes.hadoopcluster" = "nn1,nn2",
        "dfs.namenode.rpc-address.hadoopcluster.nn1" = "192.168.80.31:8020",
        "dfs.namenode.rpc-address.hadoopcluster.nn2" = "192.168.80.32:8020",
        "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
        )
        PROPERTIES (
        "timeout" = "3600",
        "max_filter_ratio" = "0.1"--,"load_parallelism" = "8"
        );
        """
        load_sql = load_sql.replace('${label_name}',label_name)\
        .replace('${tabname}',tabname)\
        .replace('${column_list}', collist_str)
        printf("加载数据的SQL:%s"%load_sql)
        executeSql(load_sql)
        try:
        while True:
        status_sql = "show load from dm where label ='${label_name}';"
        status_sql = status_sql.replace('${label_name}',label_name)
        printf("加载数据状态查询的SQL:%s"%status_sql)
        status_row = querySql(status_sql)[0]
        printf("状态查询到的数据:%s"%str(status_row))
        printf("状态查询到的状态:%s"%status_row[2])
        if status_row[2] == 'Finshed':
        printf("%s load run sucess!" % label_name)
        break
        elif status_row[2] == 'LOADIND' or status_row[2] == 'PENDING':
        printf("%s load running! sleep 60s" % label_name)
        time.sleep(60)
        else:
        printf("%s load run failed!" % label_name)
        printf("ErrorMsg: %s 异常数据查看URL:%s"%(status_row[7],status_row[13]))
        break

        except Exception as e:
        traceback.print_exc() #输出程序错误位置
        os._exit(1) #异常退出,终止程序






        数据中台研习社》微信群,请添:laowang5244,备注【进群】


                                                                 🧐分享、点赞、在看,给个三连击呗!👇


        文章转载自数据中台研习社,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

        评论