
Designing Common Utility Modules for PySpark-Based Python Projects

JW的随笔 2021-09-04

In day-to-day Python project development, environment setup and basic scaffolding tend to be fairly fixed. For data practitioners, PySpark-based Python project development is one of the most common project scenarios. For this scenario, I will share a simple approach to building a common utility module and briefly explain the relevant knowledge points involved.



1. Common Utility Module Design
In PySpark-based Python projects, a large amount of Hive table reading, creation, and other table operations is usually involved. This common utility module therefore needs to cover the following three kinds of content:
  • Spark instance setup and its common helper methods;

  • table operations: reading Hive tables, inserting or updating data in partitioned Hive tables, and creating or updating views;

  • others: a singleton pattern implementation and logging setup.

Note: in practice, creating and dropping tables is generally not recommended to be done through code, so this article does not implement the corresponding methods.

The utility module (common) can therefore contain five scripts, with responsibilities as follows (a possible directory layout is sketched after the list):
  • base: basic classes, including the singleton class and the table classes;

  • constant: constants such as the project name and cluster information;

  • db_io: data-processing classes and methods, including the Spark helper class and the table operation class;

  • db_setting: the table metadata used by the project, where the table instances are created;

  • utils: common utility functions, including the logging setup.
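
Under this split, one possible on-disk layout is sketched below; the .py filenames and the __init__.py are assumptions derived from the script names above, not something prescribed by the original design:

    common/
    ├── __init__.py
    ├── base.py        # singleton class and table classes (sections 2.2 and 2.3)
    ├── constant.py    # project name, cluster information and other constants
    ├── db_io.py       # DataTableIO and SparkHelper (sections 2.4 and 2.5)
    ├── db_setting.py  # DataTable instances for the project's tables
    └── utils.py       # common helpers such as my_logger (section 2.1)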




2. Code and Key Knowledge Points

2.1 Logging Setup

Using the ready-made logger instance from the third-party loguru library makes log configuration very convenient.

By default, the logger instance writes to the console through the sys.stderr standard error stream. When console output is all you need, the default configuration can be used directly (it can of course also be customized); when log messages also need to be written to a log file, a few simple parameters are configured, as in the code below:

    import os

    from datetime import datetime
    from loguru import logger


    def my_logger(log_dir: str = '', log_file_prefix='JW_LOGGER'):
        """
        log_dir: project directory path
        log_file_prefix: filename prefix
        """
        logger_object = logger
        if log_dir:
            time_log = datetime.now().strftime('%Y%m%d')
            log_file_name = os.path.join(log_dir, f"{log_file_prefix}_{time_log}.log")
            dir_format = "{time:YYYY-MM-DD HH:mm:ss} | {level} | {file}:{function}:{module}:{name}:{line} {message}"
            logger_object.add(
                log_file_name,      # file path
                format=dir_format,  # log information format
                level="INFO",       # log level of output
                rotation='50 MB',   # a condition indicating whenever the current logged file should be closed and a new one started
                backtrace=True,     # whether the formatted exception trace should be extended upward, beyond the catching point, to show the full stacktrace which generated the error
                encoding='UTF-8'
            )
        return logger_object


    if __name__ == '__main__':
        my_logger(log_dir='')

For more parameters of the add() method, see: loguru.logger — loguru documentation
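
As a quick usage sketch (the './logs' directory and the log messages are assumed examples, not part of the original module):

    # Minimal usage sketch; './logs' is an assumed example directory.
    log = my_logger(log_dir='./logs', log_file_prefix='JW_LOGGER')
    log.info("Spark job started")        # goes to the console and to ./logs/JW_LOGGER_<yyyymmdd>.log
    log.warning("No partition day given, falling back to today")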




2.2 Singleton Design Pattern

The goal of the singleton pattern is to ensure that only one instance of a class ever exists in the system.

In Python, implementing a singleton with a metaclass is a fairly clean solution; however, under concurrent multithreading a metaclass-based singleton alone may still end up producing more than one instance, so double-checked locking is added to prevent multiple instances from being created. The implementation is as follows:

    import threading

    from loguru import logger


    class Singleton(type):
        """Base class Singleton whose subclass should only have one instance."""

        _instance_lock = threading.Lock()

        def __init__(cls, *args, **kwargs):
            cls._instance = None
            super().__init__(*args, **kwargs)

        def __call__(cls, *args, **kwargs):
            # First check: has the singleton already been created?
            # Once it exists, threads no longer need to acquire the lock, which avoids wasting lock resources.
            if cls._instance is None:
                with Singleton._instance_lock:
                    # Second check: the instance may have been created while this thread was waiting for the lock.
                    if cls._instance is None:
                        # Call the parent metaclass type's __call__ via super() to complete the instantiation.
                        cls._instance = super().__call__(*args, **kwargs)
            else:
                # If the first check finds that the singleton already exists, log a warning.
                abandoned_instance = super().__call__(*args, **kwargs)
                warning_message = (
                    f"""
                    \n{abandoned_instance.__str__()} could not be instantiated !
                    An instance of class<{cls.__name__}> already exists ! \n
                    """
                )
                logger.warning(warning_message)
            return cls._instance
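
As a quick check of the behavior, here is a minimal sketch (DemoConfig is a hypothetical class used only for illustration):

    # DemoConfig is a hypothetical subclass used only to illustrate the Singleton metaclass.
    class DemoConfig(metaclass=Singleton):
        def __init__(self, name: str = 'default'):
            self.name = name


    first = DemoConfig(name='prod')
    second = DemoConfig(name='test')   # hits the warning branch; the existing instance is returned
    print(first is second)             # True
    print(second.name)                 # 'prod' -- the second call did not create a new singleton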


Supplementary note: the class instantiation process

Instantiating a class actually goes through the following steps, as the sketch below also illustrates:

  • Step-1: the metaclass's __call__ method is triggered first; it controls the calls to the class's __new__ and __init__ methods;

  • Step-2: the class's __new__ method is the second one called; it constructs an empty object;

  • Step-3: the class's __init__ method is called last; it initializes that empty object;

  • Step-4: finally, the metaclass's __call__ method returns the fully initialized instance of the class.
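
A minimal sketch that makes this order visible (TracingMeta and Traced are hypothetical names used only for illustration):

    # Hypothetical illustration of the instantiation order described above.
    class TracingMeta(type):
        def __call__(cls, *args, **kwargs):
            print("Step-1: metaclass __call__ triggered")
            instance = super().__call__(*args, **kwargs)   # drives __new__ and then __init__
            print("Step-4: metaclass __call__ returns the initialized instance")
            return instance


    class Traced(metaclass=TracingMeta):
        def __new__(cls, *args, **kwargs):
            print("Step-2: class __new__ constructs an empty object")
            return super().__new__(cls)

        def __init__(self):
            print("Step-3: class __init__ initializes the object")


    Traced()   # prints Step-1 through Step-4 in order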




2.3 Table Classes

We want to design a class whose instances can hold all of a Hive table's column information together with some related attributes.

For modeling custom data, Python's dataclasses module (part of the standard library since Python 3.7) makes the code more readable and easier to work with: it lets us express business logic as classes, supports combining various data structures inside a class, and also supports nesting classes. With dataclasses, a single decorator (@dataclass) turns an ordinary class into a dataclass. In a dataclass, we declare the data type of every field (class attribute) explicitly, and we can also provide a default value.

In the code below, we first define a DataColumn class that stores the information of a single Hive table column; we then define a DataTable class that stores all of a table's columns and other related attributes; finally, an instantiation example for a student table (db.student_table) is shown.

    from typing import AnyStr, List, Union
    from dataclasses import dataclass, field
    from pyspark.sql.types import DataType


    @dataclass
    class DataColumn:
        """
        col_name: column name
        col_alias_name: column alias name
        col_type: column type
        comment: column comment
        partitioned: is partition column or not
        """
        col_name: AnyStr
        col_alias_name: Union[AnyStr, None] = None
        col_type: Union[DataType, None] = None
        comment: AnyStr = ''
        partitioned: bool = False

        def __str__(self):
            class_usage = "wrapping columns feature."
            return class_usage


    @dataclass
    class DataTable:
        """
        db_name: database name
        table_name: table name
        columns: columns list
        is_need_create_mode: True when the create-table statement needs to be generated
        is_explicit_mode: True when the needed columns are listed explicitly
        table_comment: table comment
        db_type: table type
        not_partition_cols: columns which are not partition columns
        partition_cols: columns which are partition columns
        """
        db_name: AnyStr
        table_name: AnyStr
        columns: List[DataColumn]
        is_need_create_mode: bool = False
        is_explicit_mode: bool = True
        table_comment: AnyStr = ""
        db_type: AnyStr = "hive"
        not_partition_cols: List[str] = field(default_factory=list)
        partition_cols: List[str] = field(default_factory=list)

        def __str__(self):
            class_usage = f"{self.db_type}: {self.db_name}.{self.table_name}"
            return class_usage

        def __post_init__(self):
            self.not_partition_cols = [x.col_name for x in self.columns if not x.partitioned]
            self.partition_cols = [x.col_name for x in self.columns if x.partitioned]
            if self.table_name.split('_')[0] == 'v':
                raise ValueError("DataTable Class object should be a table rather than a view !")
            if self.is_need_create_mode:
                if not self.is_explicit_mode:
                    raise ValueError("In creating mode, explicit mode is a must !")
                if [x for x in self.columns if x.col_type is None]:
                    raise ValueError("In creating mode, columns type should not be empty !")
            if self.is_explicit_mode:
                if not self.columns:
                    raise ValueError("In explicit mode, columns need one at least !")
                if len(self.columns) != len(set(self.not_partition_cols + self.partition_cols)):
                    raise ValueError("In explicit mode, columns should not be duplicated !")


    if __name__ == '__main__':
        # Instantiation example: StudentDB for the student table (db.student_table)
        StudentDB = DataTable(
            db_name="db",
            table_name="student_table",
            is_explicit_mode=True,
            table_comment="student score table",
            columns=[
                DataColumn(col_name="student_id", comment="student id"),
                DataColumn(col_name="student_name", comment="student name"),
                DataColumn(col_name="subject", comment="subject"),
                DataColumn(col_name="score", comment="score")
            ]
        )


Additional notes on the DataTable fields:

  • is_need_create_mode: when True, the data type of every column must be provided, so that the create_table_sql method of the table operation class in section 2.4 can generate the corresponding CREATE TABLE statement (a create-mode example is sketched below);

  • is_explicit_mode: when True, the required columns are listed out one by one; when False, a SELECT * statement is used and all columns are loaded.
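
For illustration, a minimal sketch of a create-mode table definition (TeacherDB, its columns, and their comments are hypothetical; the point is only what is_need_create_mode requires):

    from pyspark.sql.types import StringType, DoubleType

    # Hypothetical create-mode table: every column carries a col_type,
    # and the daily partition column is marked with partitioned=True.
    TeacherDB = DataTable(
        db_name="db",
        table_name="teacher_table",
        is_need_create_mode=True,
        is_explicit_mode=True,
        table_comment="teacher salary table",
        columns=[
            DataColumn(col_name="teacher_id", col_type=StringType(), comment="teacher id"),
            DataColumn(col_name="salary", col_type=DoubleType(), comment="monthly salary"),
            DataColumn(col_name="d", col_type=StringType(), comment="partition day", partitioned=True)
        ]
    )

Passing TeacherDB to the create_table_sql method in section 2.4 then yields a partitioned CREATE TABLE statement.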




2.4 Table Operation Class

We gather the table-related SQL statements into a table operation class (DataTableIO), which takes the DataTable object from section 2.3 as one of its attributes and builds five methods on top of the passed-in DataTable object: reading a Hive table (table_query_sql), inserting or updating data in a daily-partitioned Hive table (insert_or_overwrite_daily_partition_table_sql), creating or updating a view (create_or_update_view_sql), overwriting a non-partitioned table (overwrite_normal_table_sql), and creating a table (create_table_sql, which is not meant to create tables directly from code but auto-generates the CREATE TABLE statement as a convenience).

The Hive table reading method supports a SELECT column list (the columns parameter), WHERE conditions (the conditions parameter), and a LIMIT clause (the limit parameter); it also provides an is_loading_view parameter for reading data directly from the table's corresponding view. The create-table method checks whether partition columns exist and automatically generates either an ordinary (non-partitioned) CREATE TABLE statement or a partitioned one.

The implementation is as follows:

    from datetime import datetime
    from typing import List

    # DataColumn and DataTable are the classes defined in section 2.3
    # (kept in the base script of the common module).


    class DataTableIO(object):
        def __init__(self, db: DataTable):
            self.db = db

        def __str__(self):
            class_usage = "Generating sql code about db."
            return class_usage

        def table_query_sql(
                self,
                columns: List = None,
                is_loading_view: bool = False,
                conditions: List = None,
                limit: int = None
        ) -> str:
            """
            query tables sql
            columns: query columns
            is_loading_view: decide whether to load the corresponding view
            conditions: where conditions
            limit: limit conditions
            """
            if columns:
                columns = columns
            elif self.db.columns:
                columns = [x.col_name.strip() for x in self.db.columns]
            else:
                columns = ['*']
            columns = ', '.join(columns)

            if is_loading_view:
                query = f"SELECT {columns} FROM {self.db.db_name}.v_{self.db.table_name}"
            else:
                query = f"SELECT {columns} FROM {self.db.db_name}.{self.db.table_name}"

            if conditions:
                if isinstance(conditions, str):
                    conditions = [conditions]
                cond = " AND ".join(conditions)
                query = f"{query} WHERE {cond}"

            if limit:
                query = f"{query} LIMIT {limit}"

            return query

        def insert_or_overwrite_daily_partition_table_sql(self, partition_day: str = None, temp_view_name: str = '') -> str:
            """Insert or overwrite a table which is partitioned by date."""
            not_par_col_name = [i.col_name.strip() for i in self.db.columns if not i.partitioned]
            not_par_alias_col_name = [
                i.col_alias_name if i.col_alias_name else i.col_name.strip()
                for i in self.db.columns if not i.partitioned
            ]
            column_text = []
            if partition_day is None:
                partition_day = datetime.now().strftime('%Y-%m-%d')
            if not temp_view_name:
                raise ValueError("Temp view should be registered first !")

            for i in range(len(not_par_col_name)):
                column_text = column_text + [not_par_col_name[i] + " AS " + not_par_alias_col_name[i]]

            column_text = ', '.join(column_text)
            insert_sql = (
                f"""
                INSERT OVERWRITE TABLE {self.db.db_name}.{self.db.table_name} PARTITION (d='{partition_day}')
                SELECT {column_text}
                FROM {temp_view_name}
                """
            )
            return insert_sql

        def create_or_update_view_sql(self, partition_day: str = None) -> str:
            """Create or update the view."""
            if partition_day is None:
                partition_day = datetime.now().strftime('%Y-%m-%d')
            create_or_update_view_sql = (
                f"""
                CREATE OR REPLACE VIEW {self.db.db_name}.v_{self.db.table_name} AS
                SELECT * FROM {self.db.db_name}.{self.db.table_name} WHERE d='{partition_day}'
                """
            )
            return create_or_update_view_sql

        def overwrite_normal_table_sql(self, temp_view_name: str = '') -> str:
            """Overwrite a table which is not a partition table."""
            not_par_col_name = [i.col_name.strip() for i in self.db.columns if not i.partitioned]
            not_par_alias_col_name = [
                i.col_alias_name if i.col_alias_name else i.col_name.strip()
                for i in self.db.columns if not i.partitioned
            ]
            column_text = []

            for i in range(len(not_par_col_name)):
                column_text = column_text + [not_par_col_name[i] + " AS " + not_par_alias_col_name[i]]

            column_text = ', '.join(column_text)
            overwrite_sql = (
                f"""
                INSERT OVERWRITE TABLE {self.db.db_name}.{self.db.table_name}
                SELECT {column_text}
                FROM {temp_view_name}
                """
            )
            return overwrite_sql

        def create_table_sql(self, is_partition_table: bool = True) -> str:
            """Print create table sql codes."""
            if not self.db.is_need_create_mode:
                raise ValueError("Attention ! The DataTable object is not in creating mode !")

            if self.db.is_explicit_mode:
                not_par_cols = [
                    f"{x.col_name} {x.col_type.simpleString()} COMMENT '{x.comment}'"
                    for x in self.db.columns if not x.partitioned
                ]
                not_par_str = ",\n\t\t\t\t\t".join(not_par_cols)
                par_cols = [
                    f"{x.col_name} {x.col_type.simpleString()} COMMENT '{x.comment}'"
                    for x in self.db.columns if x.partitioned
                ]
                if not is_partition_table or not par_cols:
                    sql_code = (
                        f"""
                        USE {self.db.db_name};
                        CREATE TABLE {self.db.table_name} (
                            {not_par_str}
                        )
                        COMMENT '{self.db.table_comment}'
                        STORED AS textfile;
                        """
                    )
                else:
                    par_str = ",".join(par_cols)
                    sql_code = (
                        f"""
                        USE {self.db.db_name};
                        CREATE TABLE {self.db.table_name} (
                            {not_par_str}
                        )
                        COMMENT '{self.db.table_comment}'
                        PARTITIONED BY ({par_str})
                        STORED AS ORC;
                        """
                    )
                return sql_code
            else:
                raise ValueError("Attention ! The DataTable object should be in explicit mode !")




2.5 Spark Helper Class

Based on the Singleton class built in section 2.2, we can construct a Spark helper class that guarantees only one SparkHelper object exists in a given runtime environment.

The SparkHelper class mainly provides a wrapped Spark session initializer (init_spark) and, on top of that Spark session, a table query method (query_table), an insert/update method for partitioned tables (insert_daily_partition_table, which only supports tables partitioned by day), and an overwrite method for ordinary (non-partitioned) tables (insert_normal_table). Building on the Hive table reading method from section 2.4 (table_query_sql), query_table additionally provides a checking_data parameter; when it is True, the method verifies that the number of records read is not zero.

The implementation is as follows:

    import os

    from datetime import datetime
    from typing import List

    import pyspark
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as f

    # Singleton, DataTable and DataTableIO are the classes defined in sections 2.2 to 2.4
    # (kept in the base and db_io scripts of the common module).


    class SparkHelper(metaclass=Singleton):
        def __init__(self, **kwargs):
            self.spark = None
            self.init_spark(**kwargs)

        def __str__(self):
            class_usage = "Create a spark instance."
            return class_usage

        def init_spark(
                self,
                spark_home: str,
                pyspark_python: str,
                serializer_bf: int = 1000,
                serializer_bf_max: int = 2000
        ):
            """Initializing spark object."""
            os.environ['SPARK_HOME'] = spark_home
            os.environ['PYSPARK_PYTHON'] = pyspark_python  # path of the Python interpreter on the Spark cluster
            spark = (
                SparkSession.builder
                .master('yarn')
                .appName('JW_project')
                .config("spark.sql.execution.arrow.enabled", "true")
                .config("spark.debug.maxToStringFields", "9999")
                .config('spark.executor.memory', '16g')
                .config('spark.driver.memory', '32g')
                .config('spark.driver.maxResultSize', '8g')
                .config('yarn.nodemanager.vmem-check-enabled', 'false')
                .config('spark.rpc.message.maxSize', '1000')
                .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .config('spark.kryoserializer.buffer.max', f'{serializer_bf_max}m')
                .config('spark.kryoserializer.buffer', f'{serializer_bf}m')
                .config("spark.sql.session.timeZone", "Asia/Shanghai")
                .enableHiveSupport()
                .getOrCreate()
            )
            spark.conf.set("spark.sql.execution.arrow.enabled", "true")
            if not self.spark:
                self.spark = spark
            return spark

        def query_table(
                self,
                db: DataTable,
                columns: List = None,
                is_loading_view: bool = False,
                conditions: List = None,
                limit: int = None,
                checking_data: bool = False
        ) -> pyspark.sql.DataFrame:
            """
            query tables
            db: DataTable object
            columns: query columns
            is_loading_view: whether to load the corresponding view
            conditions: where conditions
            limit: limit conditions
            checking_data: check the existence of data
            """
            object_db_io = DataTableIO(db)
            query = object_db_io.table_query_sql(
                columns=columns, is_loading_view=is_loading_view, conditions=conditions, limit=limit
            )
            return_data_frame = self.spark.sql(query)

            if checking_data and not return_data_frame.count():
                raise ValueError("Attention ! No data exists !")

            return return_data_frame

        def insert_daily_partition_table(
                self,
                db: DataTable,
                object_data_frame: pyspark.sql.DataFrame,
                partition_day: str = None,
                is_create_view: bool = True
        ):
            """Insert or overwrite a table which is partitioned by date."""
            # Detect table existence
            db_name = db.db_name
            table_name = db.table_name
            self.spark.sql(f"use {db_name}")
            table_name_list = self.spark.sql("show tables")
            num = table_name_list.where(f.col("tableName") == table_name).count()

            # Insert into the partition table
            object_db_io = DataTableIO(db)
            if partition_day is None:
                partition_day = datetime.now().strftime('%Y-%m-%d')
            if num:
                print(f" Insert or Rewrite Table: d = {partition_day} ".center(50, "-"))
                temp_view_name = 'jw_object_data_frame_tmp_table666'
                object_data_frame.createOrReplaceTempView(temp_view_name)
                db_insert_sql = object_db_io.insert_or_overwrite_daily_partition_table_sql(
                    partition_day=partition_day, temp_view_name=temp_view_name
                )
                self.spark.sql(db_insert_sql)
                self.spark.catalog.dropTempView(temp_view_name)
            else:
                raise ValueError(f"{db_name}.{table_name} should be created online first !")

            # Create or update the corresponding view
            if is_create_view:
                db_create_or_update_view_sql = object_db_io.create_or_update_view_sql(partition_day=partition_day)
                self.spark.sql(db_create_or_update_view_sql)

        def insert_normal_table(
                self,
                db: DataTable,
                object_data_frame: pyspark.sql.DataFrame
        ):
            """Overwrite a normal (non-partitioned) table."""
            # Detect table existence
            db_name = db.db_name
            table_name = db.table_name
            self.spark.sql(f"use {db_name}")
            table_name_list = self.spark.sql("show tables")
            num = table_name_list.where(f.col("tableName") == table_name).count()

            object_db_io = DataTableIO(db)
            if num:
                print(f" Overwrite Normal Table: {db_name}.{table_name} ".center(50, "-"))
                temp_view_name = 'jw_object_data_frame_tmp_table666'
                object_data_frame.createOrReplaceTempView(temp_view_name)
                db_insert_sql = object_db_io.overwrite_normal_table_sql(temp_view_name=temp_view_name)
                self.spark.sql(db_insert_sql)
                self.spark.catalog.dropTempView(temp_view_name)
            else:
                raise ValueError(f"{db_name}.{table_name} should be created online first !")


    if __name__ == '__main__':
        # Load the constants stored in the const module
        from common.const import SPARK_HOME, PYSPARK_PYTHON

        # Instantiate a SparkHelper object (singleton)
        spark_helper = SparkHelper(spark_home=SPARK_HOME, pyspark_python=PYSPARK_PYTHON)
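
Finally, a hypothetical end-to-end sketch of how the helper might be used together with the StudentDB table from section 2.3 (the table contents and the filter condition are assumed):

    # Hypothetical usage sketch, reusing the StudentDB table defined in section 2.3.
    student_df = spark_helper.query_table(
        db=StudentDB,
        conditions=["subject = 'math'"],
        checking_data=True             # raise if the query returns no rows
    )

    # Overwrite today's partition of db.student_table with the filtered data
    # and refresh the corresponding view db.v_student_table.
    spark_helper.insert_daily_partition_table(db=StudentDB, object_data_frame=student_df)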







