暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Python+MySQL自动创建Hive表

趣说大数据 2021-03-20
1817
01

简述


在数仓建设过程中,针对结构化数据,需要在Hive中建立相对应的表结构(原始数据层stage),再经过ETL清理脱敏,形成标准化的数据流入数仓基础数据层(ODS)。
实际工作中由于业务系统表数量较多,一张一张地建表有点力不从心,本文运用Python+MySQL+Hive Cli的知识,通过脚本代码的方式自动创建Hive表,有效提高建表的工作效率。
环境:Python 2.7  Hive 2.1.1-cdh6.3.0

02

环境准备

import pymysql
import os
import sys
import argparse
其中pymysql模块需要单独安装,其它的模块均为系统自带模块。
实现具体步骤如下:
  • 连接MySQL,获取对应库下表的字段信息。

  • 拼接建表语句。

  • Hive Cli执行建表语句。


03

连接MySQL并拼装建表语句


将MySQL字段类型转换成Hive中的字段类型,并参照Hive官网语法拼接建表语句。
def getTable_str():
# 创建mysql连接
db = pymysql.connect(mysql_host, mysql_username, mysql_password, mysql_db_name, charset='utf8')
table_str = "create external table if not exists {0}.{1}( \n".format(hive_db, hive_table_name)
# 获取游标
cursor = db.cursor()
cursor.execute(
"SELECT COLUMN_NAME,DATA_TYPE,COLUMN_COMMENT FROM information_schema.`COLUMNS` where table_name='{0}' and TABLE_SChEMA='{1}' order by Ordinal_position".format(
mysql_table_name, mysql_db_name))
data = cursor.fetchall()
## 处理sql语句
for line in data:
#字段名称
column_name = line[0]
#字段类型 需对字段类型进行转换
column_type = line[1]
## comment 建表时的备注信息
column_comment = line[2]
hive_type = "STRING"
if (str(column_type).lower() == 'int'):
hive_type = "int"
elif (str(column_type).lower() == 'double' or str(column_type).lower() == 'float' or str(
column_type).lower() == 'decimal'):
hive_type = "decimal(15,4)"
elif (str(column_type).lower() == 'datetime' or str(column_type).lower() == 'bigint'):
hive_type = "bigint"
else:
hive_type = "STRING"
table_str += "`{0}` {1} COMMENT \"{2}\",\n".format(column_name, hive_type, str(column_comment),charset = 'utf8')
## 关闭连接
db.close()
##处理最后一个,\n
if (table_str.endswith(",\n")):
table_str = table_str[0:len(str(str(table_str).strip('\n'))) - 1]
return table_str


04

处理hive的分区字段


在工作中有些表需要做数据快照,在这里根据参数值isPartition(Y/N)进行控制,拼接分区信息,分区字段partition_key可配置多个,以逗号分隔。
def dealPartitionKey(isPartition):
if (str(isPartition) == 'Y'):
partition_keys = str(partition_key).split(",")
partitions_str = "partitioned by ("
for item in partition_keys:
partitions_str += "{0} string,".format(item)
if (str(partitions_str).endswith(",")):
partitions_str = str(partitions_str)[0:len(partitions_str) - 1]
partitions_str += ")"
else:
return ""
return partitions_str.lstrip()

05

合并建表语句


这里创建的是Hive的外部表,指定了location地址。
##拼装建表语句
def mergeCreateTableStr():
table_str=getTable_str()
table_str+="\n)"
##拼接上分区字段
table_str += dealPartitionKey(isPartition)
table_str += "\nstored as {0}".format(stored_type)
table_str += "\nlocation \"{0}{1}.db/{1}\"".format(hive_warehouse_url,hive_db, hive_table_name)
print(table_str)
return table_str

06

Hive Cli执行建表语句


通过Hive beeline 客户端指定用户和url地址执行建表语句。
def executeCreateHiveSQL():
table_sql=mergeCreateTableStr()
##调用系统模块 执行建表语句
res = os.popen("beeline -u {0} -n {1} -e '{2}';exit".format(hive_url,hive_user,table_sql)).readlines()
if len(res) != 0:
print(res)
这里使用Python的os模块执行Hive建表语句。

07

完整的脚本

#!/usr/bin/python
# -*- coding: UTF-8 -*-
import pymysql
import os
import sys
import argparse


parser = argparse.ArgumentParser(description='auto_create_tableTohive')
#是否分区 isPartition
parser.add_argument('-p', type=str, default = 'N')
#分区字段
#hive_table_name
parser.add_argument('-ht', type=str, default='')
#mysql_db_name
parser.add_argument('-mdb', type=str, default='')
#mysql_table_name
parser.add_argument('-mt', type=str, default='')
#hive_db
parser.add_argument('-hdb', type=str, default='')
args = parser.parse_args()


if (len(str(args.p))==0 or len(str(args.ht))==0 or len(str(args.mdb))==0 or len(str(args.mt))==0 or len(str(args.hdb))==0 ):
print("请输入以下参数:-p :是否分区 -ht:hive 表名 -mdb:mysql db name -mt:mysql 表名 -hdb:hive db name")
sys.exit(100)


mysql_db_name=args.mdb
mysql_table_name=args.mt
hive_db=args.hdb
hive_table_name=args.ht
isPartition=args.p
print("mysql dbname:{0},mysql table name :{1},hive db name:{2} ,hive table name:{3} ,is_partition:{4}".format(mysql_db_name,mysql_table_name,hive_db,hive_table_name,isPartition))


#mysql 配置信息
mysql_host="10.83.1.45"
##mysql 用户名
mysql_username="root"
## mysql 密码
mysql_password="root"
## mysql对应的数据库名
#mysql_db_name="test"
##mysql 表名
#mysql_table_name="stduent"
## hive 数据库名
#hive_db="test"
## hive 表名
#hive_table_name="stduent"
#是否分区
#isPartition="Y"
## 分区字段
partition_key="d_date,channel_id"
## 存储类型
stored_type="parquet"
hive_url="jdbc:hive2://bigdata01:10000"
hive_user="admin"
hive_warehouse_url="/user/hive/warehouse/"
##UnicodeEncodeError: 'ascii' codec can't encode characters in position 61-71: ordinal not in range(128)
reload(sys)
sys.setdefaultencoding( "utf-8" )
#处理分区字段
def dealPartitionKey(isPartition):
if (str(isPartition) == 'Y'):
partition_keys = str(partition_key).split(",")
partitions_str = "partitioned by ("
for item in partition_keys:
partitions_str += "{0} string,".format(item)
if (str(partitions_str).endswith(",")):
partitions_str = str(partitions_str)[0:len(partitions_str) - 1]
partitions_str += ")"
else:
return ""
return partitions_str.lstrip()




##获取连接 获取mysql的所有字段
def getTable_str():
# 创建mysql连接
db = pymysql.connect(mysql_host, mysql_username, mysql_password, mysql_db_name, charset='utf8')
table_str = "create external table if not exists {0}.{1}( \n".format(hive_db, hive_table_name)
# 获取游标
cursor = db.cursor()
cursor.execute(
"SELECT COLUMN_NAME,DATA_TYPE,COLUMN_COMMENT FROM information_schema.`COLUMNS` where table_name='{0}' and TABLE_SChEMA='{1}' order by Ordinal_position".format(
mysql_table_name, mysql_db_name))
data = cursor.fetchall()
## 处理sql语句
for line in data:
#字段名称
column_name = line[0]
#字段类型 需对字段类型进行转换
column_type = line[1]
## comment 建表时的备注信息
column_comment = line[2]
hive_type = "STRING"
if (str(column_type).lower() == 'int'):
hive_type = "int"
elif (str(column_type).lower() == 'double' or str(column_type).lower() == 'float' or str(
column_type).lower() == 'decimal'):
hive_type = "decimal(15,4)"
elif (str(column_type).lower() == 'datetime' or str(column_type).lower() == 'bigint'):
hive_type = "bigint"
else:
hive_type = "STRING"
table_str += "`{0}` {1} COMMENT \"{2}\",\n".format(column_name, hive_type, str(column_comment),charset = 'utf8')
## 关闭连接
db.close()
##处理最后一个,\n
if (table_str.endswith(",\n")):
table_str = table_str[0:len(str(str(table_str).strip('\n'))) - 1]
return table_str


##拼装建表语句
def mergeCreateTableStr():
table_str=getTable_str()
table_str+="\n)"
##拼接上分区字段
table_str += dealPartitionKey(isPartition)
table_str += "\nstored as {0}".format(stored_type)
table_str += "\nlocation \"{0}{1}.db/{2}\"".format(hive_warehouse_url,hive_db, hive_table_name)
print(table_str)
return table_str


def executeCreateHiveSQL():
table_sql=mergeCreateTableStr()
##调用系统模块 执行建表语句
res = os.popen("beeline -u {0} -n {1} -e '{2}';exit".format(hive_url,hive_user,table_sql)).readlines()
if len(res) != 0:
print(res)


#拼装SQL执行SQL语句
executeCreateHiveSQL()

07

流程测试


  • 在MySQL中创建student表

create table student (id int primary key comment '主键id',
name varchar(100) comment 'name',
age int comment '年龄')
  • 执行Pyhon脚本

python create_table.py -mdb test -mt student -hdb stage_test -ht student -p Y

  • 输出信息如下

mysql dbname:test,mysql table name :student,hive db name:stage_test ,hive table name:student ,is_partition:Y


create external table if not exists stage_test.student(
`id` int COMMENT "主键id",
`name` STRING COMMENT "name",
`age` int COMMENT "年龄"
)partitioned by (d_date string,channel_id string)
stored as parquet
location "/user/hive/warehouse/stage_test.db/student"
  • 查看Hive中的表

+--------------------------+-----------------------+-----------------------+
| col_name | data_type | comment |
+--------------------------+-----------------------+-----------------------+
| id | int | 主键id |
| name | string | name |
| age | int | 年龄 |
| d_date | string | |
| channel_id | string | |
| | NULL | NULL |
| # Partition Information | NULL | NULL |
| # col_name | data_type | comment |
| | NULL | NULL |
| d_date | string | |
| channel_id | string | |
+--------------------------+-----------------------+-----------------------+


08

小技巧


可参照Hive官网建表语句格式,针对脚本中的建表语句进行扩展。
CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name    -- (Note: TEMPORARY available in Hive 0.14.0 and later)
[(col_name data_type [column_constraint_specification] [COMMENT col_comment], ... [constraint_specification])]
[COMMENT table_comment]
[PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)]
[CLUSTERED BY (col_name, col_name, ...) [SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS]
[SKEWED BY (col_name, col_name, ...) -- (Note: Available in Hive 0.10.0 and later)]
ON ((col_value, col_value, ...), (col_value, col_value, ...), ...)
[STORED AS DIRECTORIES]
[
[ROW FORMAT row_format]
[STORED AS file_format]
| STORED BY 'storage.handler.class.name' [WITH SERDEPROPERTIES (...)] -- (Note: Available in Hive 0.6.0 and later)
]
[LOCATION hdfs_path]
[TBLPROPERTIES (property_name=property_value, ...)] -- (Note: Available in Hive 0.6.0 and later)
[AS select_statement]; -- (Note: Available in Hive 0.5.0 and later; not supported for external tables)

CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
LIKE existing_table_or_view_name
[LOCATION hdfs_path];
文章转载自趣说大数据,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论