In day-to-day Python project development, environment setup and project scaffolding tend to be fairly fixed. For data practitioners, PySpark-based Python projects are one of the most common working scenarios. For this scenario, I will share a simple approach to building a set of reusable utility modules, along with brief explanations of the relevant knowledge points. The main topics covered are:
Spark instance setup and its common helper methods;
Table operations: reading Hive tables, inserting or updating data in Hive partitioned tables, and creating or updating views;
Others: a singleton pattern implementation and logging setup.
The utility package is organized into the following modules:
base: basic classes, including the singleton metaclass and the table classes;
constant: project-level constants such as the project name and cluster information;
db_io: data-processing classes and methods, including the Spark helper class and the table-operation class;
db_setting: the table definitions used by the project, which generate the table instances;
utils: common helper functions shared across the project, including the logging setup.
2. Code and Key Points Explained
2.1 Logging Setup
Using the ready-made logger instance provided by the third-party library loguru makes log configuration very convenient.
By default the logger instance writes to sys.stderr, so logs go to the console. When console output is all you need, the default configuration can be used as-is (it can of course be customized); when logs should also be written to a log file, a few simple parameters need to be configured, as shown below:
import os
from datetime import datetime
from loguru import logger


def my_logger(log_dir: str = '', log_file_prefix='JW_LOGGER'):
    """
    log_dir: project directory path
    log_file_prefix: filename prefix
    """
    logger_object = logger
    if log_dir:
        time_log = datetime.now().strftime('%Y%m%d')
        log_file_name = os.path.join(log_dir, f"{log_file_prefix}_{time_log}.log")
        dir_format = "{time:YYYY-MM-DD HH:mm:ss} | {level} | {file}:{function}:{module}:{name}:{line} {message}"
        logger_object.add(
            log_file_name,       # file path
            format=dir_format,   # log message format
            level="INFO",        # minimum log level to write
            rotation='50 MB',    # condition indicating when the current log file should be closed and a new one started
            backtrace=True,      # extend the exception trace beyond the catching point to show the full stack trace
            encoding='UTF-8'
        )
    return logger_object


if __name__ == '__main__':
    my_logger(log_dir='')
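As a quick reference, a minimal usage sketch (the ./logs directory is only an example):

# Hypothetical usage: logs go to the console and, because log_dir is given,
# also to ./logs/JW_LOGGER_<yyyymmdd>.log
my_log = my_logger(log_dir='./logs')
my_log.info("Spark job started")
my_log.warning("No data found for the requested partition")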
2.2 Singleton Design Pattern
The goal of the singleton pattern is to ensure that the objects created from a class share a single, unique instance in the system.
In Python, implementing the singleton with a metaclass is a fairly clean solution; however, under multi-threaded concurrency a metaclass-based singleton alone may still end up producing more than one instance, so a double-checked lock is added to prevent multiple instances from being created. The implementation is as follows:
import threading
from loguru import logger


class Singleton(type):
    """Metaclass Singleton: a class using it as metaclass should only ever have one instance."""
    _instance_lock = threading.Lock()

    def __init__(cls, *args, **kwargs):
        cls._instance = None
        super().__init__(*args, **kwargs)

    def __call__(cls, *args, **kwargs):
        # First check: has the singleton already been created? If so, threads do not
        # need to acquire the lock at all, which avoids wasting lock resources.
        if cls._instance is None:
            with Singleton._instance_lock:
                # Second check: the instance may have been created while this thread was waiting for the lock.
                if cls._instance is None:
                    # Call the parent metaclass type's __call__ via super() to complete instantiation.
                    cls._instance = super().__call__(*args, **kwargs)
        else:
            # If the singleton already exists at the first check, log a warning.
            abandoned_instance = super().__call__(*args, **kwargs)
            warning_message = (
                f"\n{abandoned_instance.__str__()} could not be instantiated ! "
                f"An instance of class<{cls.__name__}> already exists ! \n"
            )
            logger.warning(warning_message)
        return cls._instance
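A minimal usage sketch (the MyConfig class below is hypothetical and exists only to demonstrate the metaclass):

class MyConfig(metaclass=Singleton):
    def __init__(self, name: str = 'default'):
        self.name = name

    def __str__(self):
        return f"MyConfig(name={self.name})"


a = MyConfig(name='first')
b = MyConfig(name='second')   # hits the warning branch; the existing instance is returned
assert a is b and a.name == 'first'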
Additional note: the class instantiation process
When a class is instantiated, the following steps actually take place:
Step 1: the __call__ method of the metaclass is triggered first; it controls the calls to the class's __new__ and __init__ methods;
Step 2: the class's __new__ method is called second and constructs an empty object;
Step 3: the class's __init__ method is called last and initializes that empty object;
Step 4: finally, the metaclass's __call__ method returns the fully initialized instance of the class.
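The order can be seen with a small sketch (the Meta and Demo class names are made up for illustration):

class Meta(type):
    def __call__(cls, *args, **kwargs):
        print("1. Meta.__call__ is triggered")
        instance = super().__call__(*args, **kwargs)   # drives __new__ and __init__
        print("4. Meta.__call__ returns the instance")
        return instance


class Demo(metaclass=Meta):
    def __new__(cls, *args, **kwargs):
        print("2. Demo.__new__ builds an empty object")
        return super().__new__(cls)

    def __init__(self):
        print("3. Demo.__init__ initializes the object")


Demo()   # prints the four steps in order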
2.3 Table Classes
We want to design a class whose instances can store all of the column information of a Hive table, together with some related attributes.
For this kind of custom data modelling, the dataclasses module from the Python standard library makes the code more readable and easier to work with: it lets us abstract business logic in class form, supports combining various data structures as fields, and also supports nested classes. A single decorator (@dataclass) turns an ordinary class into a dataclass. In a dataclass we must explicitly annotate the data type of every field (class variable), and we can also supply a default value.
In the code below we first define a DataColumn class that stores the information of a single Hive table column; then a DataTable class that stores all of the column information of a Hive table plus other related attributes; finally, an instantiation example for a student table (db.student_table).
from typing import AnyStr, List, Union
from dataclasses import dataclass, field
from pyspark.sql.types import DataType


@dataclass
class DataColumn:
    """
    col_name: column name
    col_alias_name: column alias name
    col_type: column type
    comment: column comment
    partitioned: whether this is a partition column
    """
    col_name: AnyStr
    col_alias_name: Union[AnyStr, None] = None
    col_type: Union[DataType, None] = None
    comment: AnyStr = ''
    partitioned: bool = False

    def __str__(self):
        class_usage = "wrapping column features."
        return class_usage


@dataclass
class DataTable:
    """
    db_name: database name
    table_name: table name
    columns: list of DataColumn objects
    is_need_create_mode: True if a CREATE TABLE statement needs to be generated
    is_explicit_mode: True if the required columns are listed explicitly
    table_comment: table comment
    db_type: table type
    not_partition_cols: columns which are not partition columns
    partition_cols: columns which are partition columns
    """
    db_name: AnyStr
    table_name: AnyStr
    columns: List[DataColumn]
    is_need_create_mode: bool = False
    is_explicit_mode: bool = True
    table_comment: AnyStr = ""
    db_type: AnyStr = "hive"
    not_partition_cols: List[str] = field(default_factory=list)
    partition_cols: List[str] = field(default_factory=list)

    def __str__(self):
        class_usage = f"{self.db_type}: {self.db_name}.{self.table_name}"
        return class_usage

    def __post_init__(self):
        self.not_partition_cols = [x.col_name for x in self.columns if not x.partitioned]
        self.partition_cols = [x.col_name for x in self.columns if x.partitioned]
        if self.table_name.split('_')[0] == 'v':
            raise ValueError("DataTable class object should be a table rather than a view !")
        if self.is_need_create_mode:
            if not self.is_explicit_mode:
                raise ValueError("In creating mode, explicit mode is a must !")
            if [x for x in self.columns if x.col_type is None]:
                raise ValueError("In creating mode, column types should not be empty !")
        if self.is_explicit_mode:
            if not self.columns:
                raise ValueError("In explicit mode, at least one column is needed !")
            if len(self.columns) != len(set(self.not_partition_cols + self.partition_cols)):
                raise ValueError("In explicit mode, columns should not be duplicated !")


if __name__ == '__main__':
    # Instantiation of the student table (db.student_table)
    StudentDB = DataTable(
        db_name="db",
        table_name="student_table",
        is_explicit_mode=True,
        table_comment="student score table",
        columns=[
            DataColumn(col_name="student_id", comment="student id"),
            DataColumn(col_name="student_name", comment="student name"),
            DataColumn(col_name="subject", comment="subject"),
            DataColumn(col_name="score", comment="score")
        ]
    )
Additional notes on the DataTable fields:
is_need_create_mode: when True, the data type of every column must be provided, so that the create_table_sql method of the table-operation class in Section 2.4 can generate the corresponding CREATE TABLE statement (a create-mode example follows this list).
is_explicit_mode: when True, the required columns are listed one by one; when False, a select * statement is used and all columns are loaded.
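For illustration, a create-mode variant of the student table might look like the sketch below; the column types and the d partition column are assumptions made for this example:

from pyspark.sql.types import StringType, DoubleType

StudentDBCreate = DataTable(
    db_name="db",
    table_name="student_table",
    is_need_create_mode=True,          # every column must now carry a col_type
    table_comment="student score table",
    columns=[
        DataColumn(col_name="student_id", col_type=StringType(), comment="student id"),
        DataColumn(col_name="student_name", col_type=StringType(), comment="student name"),
        DataColumn(col_name="subject", col_type=StringType(), comment="subject"),
        DataColumn(col_name="score", col_type=DoubleType(), comment="score"),
        DataColumn(col_name="d", col_type=StringType(), comment="partition day", partitioned=True),
    ]
)

Passing this object to the create_table_sql method from Section 2.4 would then yield a CREATE TABLE statement with a PARTITIONED BY(d string ...) clause.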
2.4 Table Operation Class
We consolidate the table-related SQL statements into a table-operation class, DataTableIO, which holds a DataTable object from Section 2.3 as an attribute and builds five methods on top of it: reading a Hive table (table_query_sql), inserting or updating data in a Hive partitioned table (insert_or_overwrite_daily_partition_table_sql), creating or updating a view (create_or_update_view_sql), overwriting data in an ordinary (non-partitioned) table (overwrite_normal_table_sql), and creating a table (create_table_sql, which is not used to create tables directly from code; it simply generates the CREATE TABLE statement, which is convenient in day-to-day work).
The Hive table read method supports a SELECT list (columns parameter), a WHERE clause (conditions parameter) and a LIMIT clause (limit parameter), and also provides an is_loading_view parameter for reading data directly from the view that corresponds to the table. The create-table method checks whether partition columns exist and automatically decides whether to generate a CREATE TABLE statement for an ordinary (non-partitioned) table or for a partitioned table.
The implementation is as follows:
from datetime import datetime
from typing import List


class DataTableIO(object):
    def __init__(self, db: DataTable):
        self.db = db

    def __str__(self):
        class_usage = "Generating sql code about db."
        return class_usage

    def table_query_sql(self,
                        columns: List = None,
                        is_loading_view: bool = False,
                        conditions: List = None,
                        limit: int = None) -> str:
        """
        query table sql
        columns: query columns
        is_loading_view: decide whether to load the corresponding view
        conditions: where conditions
        limit: limit condition
        """
        if not columns:
            if self.db.columns:
                columns = [x.col_name.strip() for x in self.db.columns]
            else:
                columns = ['*']
        columns = ', '.join(columns)
        if is_loading_view:
            query = f"SELECT {columns} FROM {self.db.db_name}.v_{self.db.table_name}"
        else:
            query = f"SELECT {columns} FROM {self.db.db_name}.{self.db.table_name}"
        if conditions:
            if isinstance(conditions, str):
                conditions = [conditions]
            cond = " AND ".join(conditions)
            query = f"{query} WHERE {cond}"
        if limit:
            query = f"{query} LIMIT {limit}"
        return query

    def insert_or_overwrite_daily_partition_table_sql(self, partition_day: str = None, temp_view_name: str = '') -> str:
        """Insert or overwrite a table that is partitioned by date."""
        not_par_col_name = [i.col_name.strip() for i in self.db.columns if not i.partitioned]
        not_par_alias_col_name = [i.col_alias_name if i.col_alias_name else i.col_name.strip()
                                  for i in self.db.columns if not i.partitioned]
        column_text = []
        if partition_day is None:
            partition_day = datetime.now().strftime('%Y-%m-%d')
        if not temp_view_name:
            raise ValueError("A temp view should be registered first !")
        for i in range(len(not_par_col_name)):
            column_text = column_text + [not_par_col_name[i] + " AS " + not_par_alias_col_name[i]]
        column_text = ', '.join(column_text)
        insert_sql = (
            f"""
            INSERT OVERWRITE TABLE {self.db.db_name}.{self.db.table_name} partition (d='{partition_day}')
            SELECT {column_text}
            FROM {temp_view_name}
            """
        )
        return insert_sql

    def create_or_update_view_sql(self, partition_day: str = None) -> str:
        """Create or update the view."""
        if partition_day is None:
            partition_day = datetime.now().strftime('%Y-%m-%d')
        create_or_update_view_sql = (
            f"""
            create or replace view {self.db.db_name}.v_{self.db.table_name} as
            select * from {self.db.db_name}.{self.db.table_name} where d='{partition_day}'
            """
        )
        return create_or_update_view_sql

    def overwrite_normal_table_sql(self, temp_view_name: str = '') -> str:
        """Overwrite a table that is not a partitioned table."""
        not_par_col_name = [i.col_name.strip() for i in self.db.columns if not i.partitioned]
        not_par_alias_col_name = [i.col_alias_name if i.col_alias_name else i.col_name.strip()
                                  for i in self.db.columns if not i.partitioned]
        column_text = []
        for i in range(len(not_par_col_name)):
            column_text = column_text + [not_par_col_name[i] + " AS " + not_par_alias_col_name[i]]
        column_text = ', '.join(column_text)
        overwrite_sql = (
            f"""
            INSERT OVERWRITE TABLE {self.db.db_name}.{self.db.table_name}
            SELECT {column_text}
            FROM {temp_view_name}
            """
        )
        return overwrite_sql

    def create_table_sql(self, is_partition_table: bool = True) -> str:
        """Print create table sql code."""
        if not self.db.is_need_create_mode:
            raise ValueError("Attention ! The DataTable object is not in creating mode !")
        if self.db.is_explicit_mode:
            not_par_cols = [f"{x.col_name} {x.col_type.simpleString()} COMMENT '{x.comment}'"
                            for x in self.db.columns if not x.partitioned]
            not_par_str = ",\n\t\t\t\t\t".join(not_par_cols)
            par_cols = [f"{x.col_name} {x.col_type.simpleString()} COMMENT '{x.comment}'"
                        for x in self.db.columns if x.partitioned]
            if not is_partition_table or not par_cols:
                sql_code = (
                    f"""
                    USE {self.db.db_name};
                    CREATE TABLE {self.db.table_name} (
                        {not_par_str}
                    )
                    COMMENT '{self.db.table_comment}'
                    STORED AS textfile;
                    """
                )
            else:
                par_str = ",".join(par_cols)
                sql_code = (
                    f"""
                    USE {self.db.db_name};
                    CREATE TABLE {self.db.table_name} (
                        {not_par_str}
                    )
                    COMMENT '{self.db.table_comment}'
                    PARTITIONED BY({par_str})
                    STORED AS ORC;
                    """
                )
            return sql_code
        else:
            raise ValueError("Attention ! The DataTable object should be in explicit mode !")
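As a quick sketch of how the class might be used, reusing the StudentDB object from Section 2.3 (the filter condition, partition day and temporary view name are only examples):

student_io = DataTableIO(StudentDB)

# SELECT student_id, student_name, subject, score FROM db.student_table
#   WHERE subject = 'math' LIMIT 10
print(student_io.table_query_sql(conditions=["subject = 'math'"], limit=10))

# INSERT OVERWRITE TABLE db.student_table partition (d='2024-01-01') SELECT ... FROM tmp_student_view
print(student_io.insert_or_overwrite_daily_partition_table_sql(partition_day='2024-01-01',
                                                               temp_view_name='tmp_student_view'))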
2.5 Spark Helper Class
Based on the Singleton metaclass built in Section 2.2, we can construct a Spark helper class that guarantees only one SparkHelper object exists in a given runtime environment.
The SparkHelper class mainly provides a wrapped Spark initialization method (init_spark) and, on top of the Spark instance, a table-query method (query_table), a method for inserting or updating data in a partitioned table (insert_daily_partition_table, which only supports tables partitioned by day), and a method for overwriting data in an ordinary, non-partitioned table (insert_normal_table). The query_table method builds on the table_query_sql method from Section 2.4 and adds a checking_data parameter: when it is True, the method verifies that the number of records read is not zero.
The implementation is as follows:
import os
from datetime import datetime
from typing import List

import pyspark
from pyspark.sql import SparkSession, functions as f


class SparkHelper(object, metaclass=Singleton):
    def __init__(self, **kwargs):
        self.spark = None
        self.init_spark(**kwargs)

    def __str__(self):
        class_usage = "Create a spark instance."
        return class_usage

    def init_spark(self,
                   spark_home: str,
                   pyspark_python: str,
                   serializer_bf: int = 1000,
                   serializer_bf_max: int = 2000):
        """Initializing spark object."""
        os.environ['SPARK_HOME'] = spark_home
        os.environ['PYSPARK_PYTHON'] = pyspark_python  # Python interpreter path on the Spark cluster
        spark = (SparkSession.builder
                 .master('yarn')
                 .appName('JW_project')
                 .config("spark.sql.execution.arrow.enabled", "true")
                 .config("spark.debug.maxToStringFields", "9999")
                 .config('spark.executor.memory', '16g')
                 .config('spark.driver.memory', '32g')
                 .config('spark.driver.maxResultSize', '8g')
                 .config('yarn.nodemanager.vmem-check-enabled', 'false')
                 .config('spark.rpc.message.maxSize', '1000')
                 .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                 .config('spark.kryoserializer.buffer.max', f'{serializer_bf_max}m')
                 .config('spark.kryoserializer.buffer', f'{serializer_bf}m')
                 .config("spark.sql.session.timeZone", "Asia/Shanghai")
                 .enableHiveSupport()
                 .getOrCreate())
        spark.conf.set("spark.sql.execution.arrow.enabled", "true")
        if not self.spark:
            self.spark = spark
        return spark

    def query_table(self,
                    db: DataTable,
                    columns: List = None,
                    is_loading_view: bool = False,
                    conditions: List = None,
                    limit: int = None,
                    checking_data: bool = False) -> pyspark.sql.DataFrame:
        """
        query tables
        db: DataTable object
        columns: query columns
        is_loading_view: whether to load the corresponding view
        conditions: where conditions
        limit: limit condition
        checking_data: check the existence of data
        """
        object_db_io = DataTableIO(db)
        query = object_db_io.table_query_sql(columns=columns, is_loading_view=is_loading_view,
                                             conditions=conditions, limit=limit)
        return_data_frame = self.spark.sql(query)
        if checking_data and not return_data_frame.count():
            raise ValueError("Attention ! No data exists !")
        return return_data_frame

    def insert_daily_partition_table(self,
                                     db: DataTable,
                                     object_data_frame: pyspark.sql.DataFrame,
                                     partition_day: str = None,
                                     is_create_view: bool = True):
        """Insert or overwrite a table that is partitioned by date."""
        # Detect table existence
        db_name = db.db_name
        table_name = db.table_name
        self.spark.sql(f"use {db_name}")
        table_name_list = self.spark.sql("show tables")
        num = table_name_list.where(f.col("tableName") == table_name).count()
        # Create or insert partition table
        object_db_io = DataTableIO(db)
        if partition_day is None:
            partition_day = datetime.now().strftime('%Y-%m-%d')
        if num:
            print(f" Insert or Rewrite Table: d = {partition_day} ".center(50, "-"))
            temp_view_name = 'jw_object_data_frame_tmp_table666'
            object_data_frame.createOrReplaceTempView(temp_view_name)
            db_insert_sql = object_db_io.insert_or_overwrite_daily_partition_table_sql(
                partition_day=partition_day, temp_view_name=temp_view_name)
            self.spark.sql(db_insert_sql)
            self.spark.catalog.dropTempView(temp_view_name)
        else:
            raise ValueError(f"{db_name}.{table_name} should be created online first !")
        # Create or update view
        if is_create_view:
            db_create_or_update_view_sql = object_db_io.create_or_update_view_sql(partition_day=partition_day)
            self.spark.sql(db_create_or_update_view_sql)

    def insert_normal_table(self,
                            db: DataTable,
                            object_data_frame: pyspark.sql.DataFrame):
        """Overwrite normal table."""
        # Detect table existence
        db_name = db.db_name
        table_name = db.table_name
        self.spark.sql(f"use {db_name}")
        table_name_list = self.spark.sql("show tables")
        num = table_name_list.where(f.col("tableName") == table_name).count()
        object_db_io = DataTableIO(db)
        if num:
            print(f" Overwrite Normal Table: {db_name}.{table_name} ".center(50, "-"))
            temp_view_name = 'jw_object_data_frame_tmp_table666'
            object_data_frame.createOrReplaceTempView(temp_view_name)
            db_insert_sql = object_db_io.overwrite_normal_table_sql(temp_view_name=temp_view_name)
            self.spark.sql(db_insert_sql)
            self.spark.catalog.dropTempView(temp_view_name)
        else:
            raise ValueError(f"{db_name}.{table_name} should be created online first !")


if __name__ == '__main__':
    # Load the constants stored in the const module
    from common.const import SPARK_HOME, PYSPARK_PYTHON
    # Instantiate a SparkHelper object (a singleton)
    spark_helper = SparkHelper(spark_home=SPARK_HOME, pyspark_python=PYSPARK_PYTHON)
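Finally, a minimal end-to-end sketch that combines the pieces above. It assumes db.student_table already exists in Hive and is partitioned by day (column d); the filter condition and partition day are purely illustrative:

# Read the student table into a DataFrame, failing fast if the result is empty.
student_df = spark_helper.query_table(StudentDB, conditions=["subject = 'math'"], checking_data=True)

# Overwrite the daily partition d='2024-01-01' and refresh the view db.v_student_table.
spark_helper.insert_daily_partition_table(StudentDB, student_df, partition_day='2024-01-01')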




