简述
环境准备
import argparse
import os
import shlex
import sys

import pymysql
连接MySQL,获取对应库下表的字段信息。
拼接建表语句。
Hive Cli执行建表语句。
连接MySQL并拼装建表语句
def getTable_str():
    """Build the opening of the Hive CREATE TABLE statement plus its column list.

    Reads the column metadata of ``mysql_table_name`` in ``mysql_db_name``
    from information_schema and maps each MySQL type to a Hive type:
    int -> int; double/float/decimal -> decimal(15,4);
    datetime/bigint -> bigint; everything else -> STRING.

    Reads module globals: mysql_host, mysql_username, mysql_password,
    mysql_db_name, mysql_table_name, hive_db, hive_table_name.

    Returns:
        str: ``create external table if not exists db.tbl( \n`` followed by
        one ``\`col\` type COMMENT "..."`` entry per column, comma separated,
        with no trailing comma (the caller appends the closing ``)``).
    """
    # Keyword arguments: positional connect() arguments were removed in PyMySQL 1.0.
    db = pymysql.connect(host=mysql_host, user=mysql_username,
                         password=mysql_password, database=mysql_db_name,
                         charset='utf8')
    table_str = "create external table if not exists {0}.{1}( \n".format(hive_db, hive_table_name)
    try:
        with db.cursor() as cursor:
            # Parameterized query: the table/db names come from CLI arguments,
            # so interpolating them with str.format was SQL-injectable.
            cursor.execute(
                "SELECT COLUMN_NAME, DATA_TYPE, COLUMN_COMMENT "
                "FROM information_schema.COLUMNS "
                "WHERE TABLE_NAME = %s AND TABLE_SCHEMA = %s "
                "ORDER BY ORDINAL_POSITION",
                (mysql_table_name, mysql_db_name))
            rows = cursor.fetchall()
    finally:
        # Always release the connection, even if the query raises.
        db.close()

    # MySQL type -> Hive type; anything unknown falls back to STRING.
    type_map = {
        'int': 'int',
        'double': 'decimal(15,4)',
        'float': 'decimal(15,4)',
        'decimal': 'decimal(15,4)',
        'datetime': 'bigint',
        'bigint': 'bigint',
    }
    columns = []
    for column_name, column_type, column_comment in rows:
        hive_type = type_map.get(str(column_type).lower(), 'STRING')
        columns.append('`{0}` {1} COMMENT "{2}"'.format(column_name, hive_type, column_comment))
    # Joining avoids the original's fragile strip-the-last-",\n" slicing.
    table_str += ",\n".join(columns)
    return table_str
处理hive的分区字段
def dealPartitionKey(isPartition, keys_csv=None):
    """Build the Hive ``partitioned by (...)`` clause.

    Args:
        isPartition: 'Y' to generate the clause; any other value returns "".
        keys_csv: optional comma-separated partition column names. Defaults to
            the module-level ``partition_key`` so existing callers are unchanged.

    Returns:
        str: e.g. ``partitioned by (d_date string,channel_id string)``, or ""
        when the table is not partitioned.
    """
    if str(isPartition) != 'Y':
        return ""
    if keys_csv is None:
        keys_csv = partition_key  # module-level configuration default
    # Every partition column is typed as string, matching the original script.
    columns = ",".join("{0} string".format(k) for k in str(keys_csv).split(","))
    return "partitioned by ({0})".format(columns)
合并建表语句
def mergeCreateTableStr():
    """Assemble the complete Hive CREATE TABLE statement.

    Combines the column list from getTable_str(), the optional partition
    clause, the storage format and the warehouse location, prints the
    resulting DDL and returns it.

    Reads module globals: isPartition, stored_type, hive_warehouse_url,
    hive_db, hive_table_name.

    Returns:
        str: the complete, executable Hive DDL statement.
    """
    table_str = getTable_str()
    table_str += "\n)"
    # dealPartitionKey returns "" for non-partitioned tables.
    table_str += dealPartitionKey(isPartition)
    table_str += "\nstored as {0}".format(stored_type)
    # BUG FIX: the path must end with the *table* name ({2}); the original used
    # {1} twice, producing .../<db>.db/<db> and ignoring hive_table_name.
    table_str += "\nlocation \"{0}{1}.db/{2}\"".format(hive_warehouse_url, hive_db, hive_table_name)
    print(table_str)
    return table_str
Hive Cli执行建表语句
def executeCreateHiveSQL():
    """Run the generated CREATE TABLE statement through the beeline CLI.

    Builds the DDL with mergeCreateTableStr(), shell-quotes it, executes it
    via ``beeline -u <hive_url> -n <hive_user> -e <sql>`` and prints any
    output beeline produced.

    Reads module globals: hive_url, hive_user.
    """
    table_sql = mergeCreateTableStr()
    # shlex.quote prevents the shell from breaking on quotes embedded in the
    # DDL (the original bare '...' wrapping broke on any single quote in a
    # column comment, and was shell-injectable).
    cmd = "beeline -u {0} -n {1} -e {2};exit".format(
        hive_url, hive_user, shlex.quote(table_sql))
    res = os.popen(cmd).readlines()
    if len(res) != 0:
        print(res)
完整的脚本
#!/usr/bin/python
# -*- coding: UTF-8 -*-
"""Generate and execute a Hive CREATE EXTERNAL TABLE statement from a MySQL table's schema."""
import argparse
import os
import shlex
import sys

import pymysql

parser = argparse.ArgumentParser(description='auto_create_tableTohive')
# 'Y' -> generate a partitioned table; anything else -> unpartitioned
parser.add_argument('-p', type=str, default='N')
# hive table name
parser.add_argument('-ht', type=str, default='')
# mysql db name
parser.add_argument('-mdb', type=str, default='')
# mysql table name
parser.add_argument('-mt', type=str, default='')
# hive db name
parser.add_argument('-hdb', type=str, default='')
args = parser.parse_args()

if (len(str(args.p)) == 0 or len(str(args.ht)) == 0 or len(str(args.mdb)) == 0
        or len(str(args.mt)) == 0 or len(str(args.hdb)) == 0):
    print("请输入以下参数:-p :是否分区 -ht:hive 表名 -mdb:mysql db name -mt:mysql 表名 -hdb:hive db name")
    sys.exit(100)

mysql_db_name = args.mdb
mysql_table_name = args.mt
hive_db = args.hdb
hive_table_name = args.ht
isPartition = args.p
print("mysql dbname:{0},mysql table name :{1},hive db name:{2} ,hive table name:{3} ,is_partition:{4}".format(
    mysql_db_name, mysql_table_name, hive_db, hive_table_name, isPartition))

# ---- MySQL connection settings ----
mysql_host = "10.83.1.45"
mysql_username = "root"
mysql_password = "root"
# ---- Hive settings ----
# partition columns, comma separated
partition_key = "d_date,channel_id"
# storage format for the external table
stored_type = "parquet"
hive_url = "jdbc:hive2://bigdata01:10000"
hive_user = "admin"
hive_warehouse_url = "/user/hive/warehouse/"

# NOTE: the original script called reload(sys); sys.setdefaultencoding("utf-8")
# to dodge UnicodeEncodeError on Python 2. That API does not exist on Python 3
# (str is unicode-native there), so the hack was removed.


def dealPartitionKey(isPartition):
    """Return the ``partitioned by (...)`` clause, or "" when not partitioned."""
    if str(isPartition) != 'Y':
        return ""
    # every partition column is typed as string
    columns = ",".join("{0} string".format(k) for k in str(partition_key).split(","))
    return "partitioned by ({0})".format(columns)


def getTable_str():
    """Read MySQL column metadata and build the Hive column definition list."""
    # keyword arguments: positional connect() arguments were removed in PyMySQL 1.0
    db = pymysql.connect(host=mysql_host, user=mysql_username,
                         password=mysql_password, database=mysql_db_name,
                         charset='utf8')
    table_str = "create external table if not exists {0}.{1}( \n".format(hive_db, hive_table_name)
    try:
        with db.cursor() as cursor:
            # parameterized: the names come from CLI args, so str.format was injectable
            cursor.execute(
                "SELECT COLUMN_NAME, DATA_TYPE, COLUMN_COMMENT "
                "FROM information_schema.COLUMNS "
                "WHERE TABLE_NAME = %s AND TABLE_SCHEMA = %s "
                "ORDER BY ORDINAL_POSITION",
                (mysql_table_name, mysql_db_name))
            rows = cursor.fetchall()
    finally:
        db.close()

    # MySQL type -> Hive type; anything unknown falls back to STRING
    type_map = {
        'int': 'int',
        'double': 'decimal(15,4)',
        'float': 'decimal(15,4)',
        'decimal': 'decimal(15,4)',
        'datetime': 'bigint',
        'bigint': 'bigint',
    }
    columns = []
    for column_name, column_type, column_comment in rows:
        hive_type = type_map.get(str(column_type).lower(), 'STRING')
        columns.append('`{0}` {1} COMMENT "{2}"'.format(column_name, hive_type, column_comment))
    # joining avoids the fragile strip-the-last-",\n" slicing of the original
    table_str += ",\n".join(columns)
    return table_str


def mergeCreateTableStr():
    """Assemble, print and return the complete Hive CREATE TABLE statement."""
    table_str = getTable_str()
    table_str += "\n)"
    # "" for non-partitioned tables
    table_str += dealPartitionKey(isPartition)
    table_str += "\nstored as {0}".format(stored_type)
    table_str += "\nlocation \"{0}{1}.db/{2}\"".format(hive_warehouse_url, hive_db, hive_table_name)
    print(table_str)
    return table_str


def executeCreateHiveSQL():
    """Execute the generated DDL through the beeline CLI and print its output."""
    table_sql = mergeCreateTableStr()
    # shlex.quote keeps quotes inside column comments from breaking the shell
    # command (the original bare '...' wrapping was shell-injectable)
    cmd = "beeline -u {0} -n {1} -e {2};exit".format(
        hive_url, hive_user, shlex.quote(table_sql))
    res = os.popen(cmd).readlines()
    if len(res) != 0:
        print(res)


# build the SQL and run it
executeCreateHiveSQL()
流程测试
在MySQL中创建student表
-- Sample MySQL source table used to exercise the script.
create table student (
    id   int primary key comment '主键id',
    name varchar(100)    comment 'name',
    age  int             comment '年龄'
)
执行Python脚本
python create_table.py -mdb test -mt student -hdb stage_test -ht student -p Y
输出信息如下
mysql dbname:test,mysql table name :student,hive db name:stage_test ,hive table name:student ,is_partition:Ycreate external table if not exists stage_test.student(`id` int COMMENT "主键id",`name` STRING COMMENT "name",`age` int COMMENT "年龄")partitioned by (d_date string,channel_id string)stored as parquetlocation "/user/hive/warehouse/stage_test.db/student"
查看Hive中的表
+--------------------------+-----------------------+-----------------------+| col_name | data_type | comment |+--------------------------+-----------------------+-----------------------+| id | int | 主键id || name | string | name || age | int | 年龄 || d_date | string | || channel_id | string | || | NULL | NULL || # Partition Information | NULL | NULL || # col_name | data_type | comment || | NULL | NULL || d_date | string | || channel_id | string | |+--------------------------+-----------------------+-----------------------+
小技巧
CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name    -- (Note: TEMPORARY available in Hive 0.14.0 and later)
  [(col_name data_type [column_constraint_specification] [COMMENT col_comment], ... [constraint_specification])]
  [COMMENT table_comment]
  [PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)]
  [CLUSTERED BY (col_name, col_name, ...) [SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS]
  [SKEWED BY (col_name, col_name, ...)                                      -- (Note: Available in Hive 0.10.0 and later)
     ON ((col_value, col_value, ...), (col_value, col_value, ...), ...)
     [STORED AS DIRECTORIES]]
  [
   [ROW FORMAT row_format]
   [STORED AS file_format]
     | STORED BY 'storage.handler.class.name' [WITH SERDEPROPERTIES (...)]  -- (Note: Available in Hive 0.6.0 and later)
  ]
  [LOCATION hdfs_path]
  [TBLPROPERTIES (property_name=property_value, ...)]                       -- (Note: Available in Hive 0.6.0 and later)
  [AS select_statement];                                                    -- (Note: Available in Hive 0.5.0 and later; not supported for external tables)

CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
  LIKE existing_table_or_view_name
  [LOCATION hdfs_path];
文章转载自趣说大数据,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




