暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

博客 | 使用 CDSW 和运营数据库构建 ML 应用2:查询/加载数据

Cloudera中国 2021-02-26
273
点击Cloudera中国 即可订阅!

本文转载自大数据杂货铺


在本期中,我们将讨论如何执行“获取/扫描”操作以及如何使用 PySpark SQL。之后,我们将讨论批量操作,然后再讨论一些故障排除错误。在这里阅读第一个博客

Get/Scan 操作

· 使用目录

在此示例中,让我们加载在第1部分的“放置操作”中创建的表“ tblEmployee”。我使用相同的目录来加载该表。 

    from pyspark.sql import SparkSession




    spark = SparkSession \
    .builder \
    .appName("SampleApplication") \
    .getOrCreate()




    tableCatalog = ''.join("""{
    "table":{"namespace":"default", "name":"tblEmployee", "tableCoder":"PrimitiveType"},
    "rowkey":"key",
    "columns":{
    "key":{"cf":"rowkey", "col":"key", "type":"int"},
    "empId":{"cf":"personal","col":"empId","type":"string"},
    "empName":{"cf":"personal", "col":"empName", "type":"string"},
    "empState":{"cf":"personal", "col":"empState", "type":"string"}
    }
    }""".split())




    table = spark.read.format("org.apache.hadoop.hbase.spark") \
    .options(catalog=tableCatalog) \
    .option("hbase.spark.use.hbasecontext", False) \
    .load()




    table.show()

    执行 table.show()将为您提供:

    此外,您可以编辑目录,在其中可以省略一些不需要的列。例如,如果只需要“ tblEmployee”表的“ key”和“ empName”列,则可以在下面创建目录。如果您用上面的示例替换上面示例中的目录,table.show()将显示仅包含这两列的 PySpark Dataframe。

      tableCatalog = ''.join("""{
      "table":{"namespace":"default", "name":"tblEmployee", "tableCoder":"PrimitiveType"},
      "rowkey":"key",
      "columns":{
      "key":{"cf":"rowkey", "col":"key", "type":"int"},
      "empName":{"cf":"personal", "col":"empName", "type":"string"}
      }
      }""".split())

      执行 table.show()将为您提供:


      · 使用 hbase.columns.mapping

      同样,我们可以使用 hbase.columns.mapping 将 HBase 表加载到 PySpark 数据帧中。让我们尝试使用此方法加载“ tblEmployee”

      从 pyspark.sql 导入 SparkSession

        spark = SparkSession \
        .builder \
        .appName("SampleApplication") \
        .getOrCreate()




        df = spark.read.format("org.apache.hadoop.hbase.spark") \
        .option("hbase.columns.mapping",
        "key INTEGER :key, empId STRING personal:empId, empName STRING personal:empName, empWeight FLOAT personal:empWeight") \
        .option("hbase.table", "tblEmployee") \
        .option("hbase.spark.use.hbasecontext", False) \
        .load()




        df.show()

        执行 df.show()将为您提供:

        · 使用 PySpark 的 Spark SQL

        使用 PySpark SQL 是在 Python 中执行 HBase 读取操作的最简单、最佳方法。使用 PySpark SQL,可以创建一个临时表,该表将直接在 HBase 表上运行 SQL 查询。但是,要执行此操作,我们需要在从 HBase 加载的 PySpark 数据框上创建视图。让我们从上面的“ hbase.column.mappings”示例中加载的数据帧开始。此代码段显示了如何定义视图并在该视图上运行查询。 

          df.createOrReplaceTempView("personView")
          result = spark.sql("SELECT * FROM personView") # SQL Query
          result.show()

          执行 result.show()将为您提供:

          使用视图的最大优势之一是查询将反映 HBase 表中的更新数据,因此不必每次都重新定义和重新加载 df 即可获取更新值。视图本质上是针对依赖 HBase 的最新数据的用例。

          如果您执行读取操作并在不使用 View 的情况下显示结果,则结果不会自动更新,因此您应该再次 load()以获得最新结果。

          下面是一个演示此示例。首先,将2行添加到 HBase 表中,并将该表加载到 PySpark DataFrame 中并显示在工作台中。然后,我们再写2行并再次运行查询,工作台将显示所有4行。 

            from pyspark.sql import Row
            from pyspark.sql import SparkSession




            spark = SparkSession \
            .builder \
            .appName("PySparkSQLExample") \
            .getOrCreate()

            # 目录

              tableCatalog = ''.join("""{
              "table":{"namespace":"default", "name":"tblEmployee", "tableCoder":"PrimitiveType"},
              "rowkey":"key",
              "columns":{
              "key":{"cf":"rowkey", "col":"key", "type":"int"},
              "empId":{"cf":"personal","col":"empId","type":"string"},
              "empName":{"cf":"personal", "col":"empName", "type":"string"},
              "empState":{"cf":"personal", "col":"empState", "type":"string"}
              }
              }""".split())

              #添加前2行

                employee = [(10, 'jonD', 'Jon Daniels', 'CA'), (6, 'billR', 'Bill Robert', 'FL')]
                employeeRDD = spark.sparkContext.parallelize(employee)
                employeeMap = employeeRDD.map(lambda x: Row(key=int(x[0]), empId=x[1], empName=x[2], empState=x[3]))
                employeeDF = spark.createDataFrame(employeeMap)




                employeeDF.write.format("org.apache.hadoop.hbase.spark") \
                .options(catalog=tableCatalog, newTable=5) \
                .option("hbase.spark.use.hbasecontext", False) \
                .save()




                df = spark.read.format("org.apache.hadoop.hbase.spark") \
                .options(catalog=tableCatalog) \
                .option("hbase.spark.use.hbasecontext", False) \
                .load()




                df.createOrReplaceTempView("sampleView")
                result = spark.sql("SELECT * FROM sampleView")




                print("The PySpark DataFrame with only the first 2 rows")
                result.show()

                #再添加2行

                  employee = [(11, 'bobG', 'Bob Graham', 'TX'), (12, 'manasC', 'Manas Chakka', 'GA')]
                  employeeRDD = spark.sparkContext.parallelize(employee)
                  employeeMap = employeeRDD.map(lambda x: Row(key=int(x[0]), empId=x[1], empName=x[2], empState=x[3]))
                  employeeDF = spark.createDataFrame(employeeMap)




                  employeeDF.write.format("org.apache.hadoop.hbase.spark") \
                  .options(catalog=tableCatalog, newTable=5) \
                  .option("hbase.spark.use.hbasecontext", False) \
                  .save()
                  # Notice here I didn't reload "df" before doing result.show() again
                  print("The PySpark Dataframe immediately after writing 2 more rows")
                  result.show()

                  这是此代码示例的输出:


                  批量操作

                  使用 PySpark 时,您可能会遇到性能限制,可以通过并行操作来缓解这些限制。HBase 通过批量操作实现了这一点,并且使用 Scala 和 Java 编写的 Spark 程序支持 HBase。有关使用 Scala 或 Java 进行这些操作的更多信息,请查看此链接

                  https://hbase.apache.org/book.html#_basic_spark。

                  但是,PySpark 对这些操作的支持受到限制。通过访问 JVM,可以创建 HBase 配置和 Java HBase 上下文对象。下面是显示如何创建这些对象的示例。

                  当前,存在通过这些 Java 对象支持批量操作的未解决问题。

                  https://issues.apache.org/jira/browse/HBASE-24829

                  故障排除

                  —辅助节点中的 Python 版本与驱动程序不同

                  例外:worker  中的Python 版本与驱动程序3.6中的版本不同,PySpark 无法使用其他次要版本运行

                  如果未设置环境变量PYSPARK_PYTHON和PYSPARK_DRIVER_PYTHON 或不正确,则会发生此错误。请参考上面的配置步骤,并确保在群集的每个节点上都安装了 Python,并将环境变量正确设置为正确的路径。

                  — Py4J 错误

                  AttributeError:“ SparkContext”对象没有属性“ _get_object_id”

                  尝试通过 JVM 显式访问某些 Java Scala 对象时,即“ sparkContext._jvm”,可能会出现此错误。已提交 JIRA 来解决此类问题,但请参考本文中提到的受支持的方法来访问 HBase 表

                  https://issues.apache.org/jira/browse/HBASE-24828

                  —找不到数据源“ org.apache.hbase.spark”

                  java.lang.ClassNotFoundException:无法找到数据源:org.apache.hadoop.hbase.spark。请在

                  http://spark.apache.org/third-party-projects.html中找到软件包。

                  如果 Spark 驱动程序和执行程序看不到 jar,则会出现此错误。确保根据选择的部署(CDSW与spark-shell submit)为运行时提供正确的 jar。

                  结论

                  PySpark 现在可用于转换和访问 HBase 中的数据。对于那些只喜欢使用 Python 的人,这里以及使用 PySpark 和 Apache HBase,第1部分中提到的方法将使您轻松使用 PySpark 和 HBase。 

                  查看这些链接以开始使用 CDP DH 集群,并在 CDSW 中自己尝试以下示例:Cloudera Data Hub Cloudera Data Science Workbench(CDSW)作为 PySpark 更高级用法的一部分,请单击此处以了解第3部分,以了解 PySpark 模型的方式可以与 HBase 数据一起构建,评分和提供服务。


                  原文作者:Manas Chakka

                  原文链接:https://blog.cloudera.com/building-a-machine-learning-application-with-cloudera-data-science-workbench-and-operational-database-part-2-querying-loading-data/


                  Cloudera中国

                  更多资讯,点击阅读原文

                  长按扫码关注我们


                  文章转载自Cloudera中国,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                  评论