在本章中,你将学习如何使用dataframe。你将了解到dataframe在Spark应用中是如此重要,因为它通过一个Schema定义了类型化的数据,并提供了一个强大的API。正如你在前面的章节中看到的,Spark是一个了不起的分布式分析引擎。维基百科将操作系统(OS)定义为 "管理计算机硬件和软件资源的系统软件,并为计算机程序提供通用服务"。在第1章中,我甚至将Spark定性为操作系统,因为它提供了构建应用程序和管理资源所需的所有服务。要以编程的方式使用Spark,你需要了解它的一些关键API。为了执行分析和数据操作,Spark需要存储,包括逻辑存储(在应用层面)和物理存储(在硬件层面)。在逻辑层面,最喜欢的存储容器是dataframe,这是一种类似于关系型数据库世界中表的数据结构。在本章中,你将深入了解dataframe的结构,并学习如何通过其API使用dataframe。转换是你对数据进行的操作,例如从日期中提取年份,合并两个字段,对数据进行归一化等等。在本章中,您将学习如何使用dataframe的特定函数来执行转换,以及直接附加到dataframe API 的方法。你将通过使用类似 SQL union 的操作将两个dataframe合并成一个。你还会看到dataset和dataframe之间的区别,以及如何从一个dataset到另一个dataframe。最后,你会看到弹性分布式数据集(RDD),它是Spark中的第一代存储。dataframe是建立在RDD概念之上的,你可能会在讨论和项目中遇到RDD。在本章的最后,您将在两个dataframe中摄取两个文件,修改它们的模式使其匹配,并将结果联合起来。当你完成这些操作时,你将看到Spark是如何处理存储的。在不同的步骤中,您将检查dataframe。
LAB 本章的实例可以在GitHub上找到,网址是:https://github.com/jgperrin/net.jgp.books.spark.ch03
dataframe在Spark中的重要作用
在本节中,你将学习什么是dataframe以及它是如何组织的。你还将学习到不可更改性。一个dataframe既是一个数据结构,也是一个API,如图3.1所示。
Spark的dataframe API在Spark SQL、Spark Streaming、MLlib(用于机器学习)和GraphX中使用(在Spark中操作基于图的数据结构)。使用这个统一的API可以极大地简化对这些技术的访问。你将不必为每个子库学习一个API。将dataframe描述为雄伟的dataframe可能很奇怪,但这个限定词非常适合它。就像雄伟的艺术品吸引着人们的好奇心,雄伟的橡树主宰着森林,雄伟的城墙保护着城堡一样,dataframe在Spark的世界里也是雄伟的。
dataframe的组织过程
在本节中,你将学习dataframe是如何组织数据的。一个dataframe是一组记录,组织成命名的列。它相当于关系型数据库中的一个表或Java中的ResultSet。图3.2展示了一个dataframe。
dataframe可以从广泛的数据源中构建,如文件、数据库或自定义数据源。dataframe的关键概念是它的API,它在Java、Python、Scala和R中都可以使用,在Java中,dataframe由Row形式的dataset来表示。dataset根据Spark目前的策略,可以存储在内存中,也可以存储在磁盘上,但它会尽可能多地使用内存。dataframe以StructType的形式包含了Schema,它可以用于约束数据内容。dataframe还包括一个printSchema()方法,可以更快速地调试你的dataframe。有了足够的理论--让我们来实践吧。
不变不是一句脏话
Dataframe以及dataset和RDD(在3.4节中讨论),被认为是不可改变的存储。不可变性被定义为不可改变。当应用于一个对象时,它意味着它的状态在创建后不能被修改。我认为这个术语是反直觉的。当我刚开始使用Spark时,我很难接受这个概念。让我们使用这个为数据处理而设计的出色技术但数据是不可改变的。你希望我处理数据,但它不能改变?图3.3给出了一个解释:在最初的状态下,数据是不可变的;然后你开始修改它,但Spark只存储了你转换的步骤,而不是转换后数据的每一步。让我重新表述一下。Spark以不可改变的方式,存储数据的初始状态,然后保留转换的列表。中间的数据不被存储。第4章会更深入地挖掘转换的内容。
当你增加节点时,原因就变得容易理解了。图3.3展示的是一个典型的Spark流程,只有一个节点,而图3.4展示的是更多节点。当你以分布式的方式思考时,不可更改性就变得非常重要。在存储方面,你有两种选择。Spark使用的是第二种解决方案,因为在每个节点上同步一个转换列表比同步所有数据要快。第4章介绍了通过Catalyst 'kæt(ə)lɪst/ 进行优化。Catalyst是Spark处理中负责优化的酷小孩。不变性和转换列表是这个优化引擎的基石。虽然不可变性被Spark出色地用作优化数据处理的基础,但在你开发应用时,你将不必考虑太多。Spark就像任何一个好的操作系统一样,将为你处理资源。
通过实例使用dataframe
没有什么比一个小例子更适合开始了。你摄取了第1章和第2章的文件。但之后会发生什么呢?在本节中,你将执行两个简单的摄取。然后,你将研究它们的schema和存储,以便了解dataframe在应用程序中使用时的行为。第一个摄取是北卡罗来纳州维克县的餐馆列表。第二个数据集由北卡罗来纳州达勒姆县的餐馆组成。然后,您将转换数据集,以便您可以通过联合来合并它们。这些都是你作为Spark开发者将要执行的关键操作,所以了解它们背后的原理将为你提供所需的基础。图3.5说明了这个过程。
Union操作后的目标(也是最终)dataframe,在两次转换后需要有相同的schema,如图3.6所示。
一个简单的CSV摄取后的dataframe
在本节中,您将首先摄取数据,然后您将查看dataframe中的数据,以了解schema。这个过程是你理解Spark工作方式的重要一步。这个例子的目标是规范化一个数据集,使它符合特定的标准,就像你刚才在图3.6中看到的那样。我打赌你喜欢去餐馆。也许不是每天都去,也许不是每一种,但你们每个人都有偏好:食物的类型、离家的距离、公司、噪音水平等等。Yelp或OpenTable等网站有丰富的数据集,但我们还是来探讨一些开放的数据。图3.7说明了这个例子中的过程。
你的第一个数据集来自北卡罗来纳州的Wake县,网址是http://mng.bz/5AM7。它包含了该县的餐馆列表。这些数据可以直接从 http://mng.bz/Jz2P 下载。现在您将进行dataframe的摄取和转换,使其与输出相匹配(通过重命名和丢弃列);然后您将对数据分区进行范围划分。当你在摄取和转换数据时,你还会统计记录的数量。图3.8说明了映射的情况。
LAB 你可以从GitHub上下载代码:https://github.com/jgperrin/net.jgp.books.spark.ch03。这是包net.jgp.books.spark.ch03中的实验室#200.lab200_ingestion_schema_manipulation。
您要实现的可视化结果是一个餐厅列表,与图3.8中定义的映射相匹配。

因为分布在多行的记录有点难读,我把记录作为截图添加到图3.9中。

要显示这些数据集(也就是dataframe),你的代码会像下面这样:

到目前为止,这种摄取与第1章摄取简单的书单,以及第2章摄取作者名单的方法类似。摄取的方式总是一样的,第7、8、9章提供了进一步的细节。让我们更深入地了解一下dataframe。你可以通过使用printSchema()将模式打印到标准输出(stdout)。结果如下。

附录H提供了更多关于类型的细节。您的简单调用如下。
df.printSchema();
有一个简单的方法来计算你的dataframe中的记录数。说你想显示这个。We have 3440 records.
你只需使用以下方法:System.out.println("We have "+ df.count() + " records.");
本节的目标是让你合并两个dataframe,就像你可以执行两个表的SQL联合一样。为了使联合有效,你需要在两个dataframe中使用类似命名的列。为了达到这个目的,你可以很容易地想象,你的第一个数据集的schema也被修改了。这就是它的样子。
让我们来走一遍转换的过程。注意方法链的使用。正如第2章所定义的那样,Java API可以使用方法链,如SparkSession.builder().appName(...).master(...).getOrCreate(),而不是在每一步都创建一个对象并将其传递给下一个操作。你将使用dataframe的四个方法和两个静态函数。你可能很熟悉静态函数:它们是那些被 "分组 "在一个类中的函数,但不需要实例化该类。方法很容易理解:它们被附加到对象本身。当你直接对列中的值进行操作时,静态函数是很有用的。在阅读本书的过程中,你会看到越来越多地使用这些静态函数,在第13章和附录G中会有更详细的描述。如果你没有找到一个能完成你想要的功能的函数(例如,一个特定的转换形成或对你可能拥有的现有库的调用),你可以编写自己的函数。这些函数被称为用户定义函数(UDF),你将在第16章学习。现在让我们看看你需要的方法和函数。- withColumn() 方法-从一个表达式或一个列中创建一个新的列。
- withColumnRenamed() 方法-重命名一个列。
- withColumnRenamed()方法--从列名中获取一个列。有些方法会使用列名作为参数,有些方法需要一个Column对象。
- drop()方法--从数据框架中删除一列。这个方法接受一个列对象的实例或列名。
- lit()函数-创建一个带有值的列;字面意思是一个文本值。

你可能需要为每条记录提供一个唯一的标识符。你可以调用这个列id,并通过连接以下内容来建立它:
国家
一个下划线(_)
县
一个下划线(_)
数据集中的标识符
代码是这样的:
df = df.withColumn("id", concat( df.col("state"), lit("_"), df.col("county"), lit("_"), df.col("datasetId")));
最后,你可以显示五条记录并打印模式:
System.out.println("*** Dataframe transformed");
df.show(5);
df.printSchema();
数据是存储在分区中的
现在你已经加载了数据,你可以看到数据的存储位置。这将向你展示Spark内部是如何存储数据的。数据不是物理地存储在dataframe中,而是存储在分区中,如图3.1和简化的图3.10所示。分区不能直接从dataframe中访问,您需要通过RDD查看分区。你将在稍后的 3.4 节中了解更多关于 RDD的信息。
分区会被创建,数据会根据你的基础设施(节点数量和数据集的大小)自动分配到每个分区。由于数据集的大小和我使用的笔记本电脑,在这个方案中只使用一个分区。你可以通过下面的代码找出你有多少个分区。
你可以通过使用repartition()方法对dataframe进行重新分区,以使用四个分区。重新分区可以提高性能。
df = df.repartition(4);
System.out.println("Partition count after repartition: "+
df.rdd().partitions().length);
探究Schema
在上一节中,你学习了通过使用printSchema()来查看Schema。了解数据的结构,特别是Spark如何看待数据是很重要的。你可以通过调用schema()方法了解更多关于schema的细节。查看net.jgp.books.spark.ch03.lab210_schema _introspection包中的SchemaIntrospectionApp,可以了解schema()使用的细节。为了简化阅读,我把下一个lab的输出-放限制在每种情况下的前三个字段。比如说你想输出以下内容。*** Schemaas a tree:
root
|-- OBJECTID: string(nullable = true)
|-- datasetId: string(nullable = true)
|-- name: string(nullable = true)
...
你可以像之前那样使用数据框架的printSchema()方法,或者使用StructType的printTreeString()方法。

你也可以将模式显示为一个简单的字符串。
***** Schema as string: StructField(OBJECTID,StringType,true)StructField(datasetId,StringType,true)StructField(name,StringType,true)...
要做到这一点,你使用以下代码。
而且你甚至可以将模式显示为JSON结构。

你可以使用以下代码。
更多高级的schema操作,你将在第17章看到。
JSON数据摄取到dataframe后的结构
JSON文件可能比CSV更复杂一些,因为它们的嵌套结构。你要做的实验室与之前类似,但这次餐厅数据的来源是一个JSON文件。本节主要介绍与之前实验室的不同之处,并假设你已经阅读过。使用Spark,你将读取一个JSON文件,其中包含的餐厅数据与3.2.1节中的数据集结构相似。你将对摄入的数据进行转换,以匹配之前数据集的转换结构。你这样做是为了让你可以通过union合并它们。图3.11说明了这个过程的这一部分。
您的第二个数据集来自北卡罗来纳州的另一个县,达勒姆。达勒姆县是威克的邻居,其数据集可以在https://live-durhamnc.opendata.arcgis.com/ 找到。
LAB 你可以从GitHub上下载代码:https://github.com/jgperrin/net.jgp.books.spark.ch03。这是包net.jgp.books.spark.ch03中的实验室#220.lab220jsoningestionschemamanipulation。
因为JSON比CSV更难可视化,所以接下来的列表显示了一个只有两家餐厅的数据集节选。JSON绝对更啰嗦,不是吗?我删除了第二条记录的一些字段。
和CSV数据集一样,让我们来走一遍JSON转换。第一部分是JSON摄取,将产生以下内容(以及图3.12)。

Dataframe包含嵌套的字段和数组。使用show()方法是有用的,但结果不是很好读。对应的schema会给你带来更多的信息。
当然,这个schema tree的结构与清单3.1中的JSON文档的结构类似。而且这个结构现在看起来肯定比CSV文件更像一棵树。制作这个的代码也与摄取和转换CSV数据集的代码类似。
SpaSparkSession spark = SparkSession.builder().appName("Restaurants in Durham County, NC") .master("local").getOrCreate();
Dataset<Row> df = spark.read().format("json").load("data/Restaurants_in_Durham_County_NC.json");
System.out.println("*** Right after ingestion");
df.show(5);df.printSchema();
一旦数据在dataframe中,操作数据的API是一样的。你可以开始转换dataframe了。你的目标结构是扁平的,所以映射(如图 3.13 所示)必须包含嵌套字段。
下面是你将产生的内容(图3.14以截图形式显示了dataframe的内容)。

其嵌套结构的schema如下:

要访问结构中的字段,可以在路径中使用点(.)符号。要访问一个数组中的元素,可以使用getItem()方法。下面是实际操作的代码。

就像你创建了所有的字段和列一样,创建id字段的操作和你对CSV文件的操作是一样的:
df = df.withColumn("id", concat(df.col("state"), lit("_"),
df.col("county"), lit("_"),
df.col("datasetId")));
System.out.println("*** Dataframe transformed");
df.show(5);
最后,看分区也是同样,就像你的CSV文件一样。

这里是代码:
System.out.println("*** Looking at partitions");
Partition[] partitions = df.rdd().partitions();
int partitionCount = partitions.length;
System.out.println("Partition count before repartition: " + partitionCount);
df = df.repartition(4);
System.out.println("Partition count after repartition: " + df.rdd().partitions().length);
现在你有了两个dataframe,具有相同的列数据集。下一步是将它们组合起来。
合并两个dataframe
在本节中,您将学习如何在类似于SQL的union来结合两个数据集,以建立一个更大的数据集。这将允许你对更多的数据点进行分析。在上一节中,你摄取了两个数据集,对它们进行了转换,并对它们进行了分析。与关系数据库中的表一样,你可以在它们之间进行很多操作:关联它们、组合它们等等。现在,你要把两个数据集合并起来,这样以后就可以对合并后的数据集进行分析。图3.15说明了这个过程的细节。
LAB 你可以从GitHub上下载代码:https://github.com/jgperrin/net.jgp.books.spark.ch03。这是包net.jgp.books.spark.ch03.lab230dataframeunion中的实验室#230。
你当然可以想象,你可以重用你为摄取和转换所写的大部分代码。然而,要执行union,你必须确保schema是严格相同的。否则,Spark将无法执行union。图 3.16 展示了你要做的映射。
您的应用程序的最终输出如下(图3.17为完整的截图)。

与图3.17对应的模式如下。
我们先来看看代码。导入是一样的。为了更简单一些,SparkSession的实例是一个私有成员,在start()方法中初始化。代码的其余部分被隔离在三个方法中。buildWakeRestaurantsDataframe() 构建包含Wake县餐馆的dataframe
buildDurhamRestaurantsDataframe()建立包含Durham县餐馆的dataframe。
combineDataframes()通过使用类似SQL的union来合并两个dataframe。现在,不用担心生成的dataframe的内存使用问题。在第4章,你会看到dataframe是自优化的。

这是最简单的部分,对吧?让我们来分析一下这些方法,首先是buildWakeRestaurantsDataframe(),它从CSV文件中读取数据集。这个你应该很熟悉,因为你之前在3.2.1节看到过这个。

现在你已经准备好处理第二个数据集了。

请注意,当你删除一个父列时,所有的嵌套列也会被删除。
现在,你有两个具有相同列数的dataframe。因此,你可以在 combineDataframes() 方法中把它们组合起来。在类似于SQL的组合中,有两种方法可以将两个dataframe组合起来:你可以使用union()或unionByName()方法。union()方法不关心列的名称,只关心它们的顺序。这个方法总是会把第一个dataframe中的第1列和第二个dataframe中的第1列联合起来,然后移动到第2列,再移动到第3列,而不管它们的名字是什么。
在经过几次转换操作(在这些操作中,你创建了新的列,重命名它们,转储它们,或者合并它们)之后,可能很难记住列的顺序是否正确。如果字段不匹配,在最坏的情况下,你可能会有不一致的数据,最好的情况是程序停止。另一方面,unionByName()通过名称来匹配列,这样更安全。下面的代码显示了union操作,并查看了结果的分区。你可以组合更多的数据集,但不能同时组合。当你在一个dataframe中加载一个小的(通常是128MB以下)数据集时,Spark会将只创建一个分区。然而,在这种情况下,Spark为基于CSV的数据集创建一个分区,为基于JSON的数据集创建一个分区。在两个不同的dataframe中的两个数据集会导致至少两个分区(每个数据集至少一个)。连接它们将创建一个独特的dataframe,但它将依赖于两个原始分区(或更多)。你可以尝试通过repartition()来修改这个例子,看看Spark将如何创建数据集和分区。repartition不会带来很大的好处。在第17章中,你会看到分区可以提高分割在几个节点上的较大数据集的性能,特别是(但不只是)在join操作中。Dataframe是一个Dataset<Row>
在本节中,你将了解更多关于dataframe的实现。你几乎可以拥有任何Plain Old Java对象(POJO)的数据集,但只有行的数据集(Dataset)才被称为dataframe。让我们来探讨一下dataframe的好处,并仔细看看如何操作这些特定的数据集。重要的是要明白,你可以拥有其他POJO的数据集,因为你可以重用你的库中可能已经有的或对你的应用更特殊的POJO。第9章甚至说明了如何基于现有的POJO来摄取数据。然而,作为Row的数据集(Dataset)实现的dataframe具有更丰富的API。你将看到如何在需要时来回转换,从dataframe到dataset。
LAB 你可以从GitHub下载代码,网址是https://github.com/jgperrin/net.jgp.books.spark.ch03。你将从包net.jgp .books.spark.ch03.lab300_dataset中的实验室#300开始。
重复使用您的POJO
让我们来探讨一下在你的dataset API中直接重用POJO的好处,并多了解一些关于Spark存储的知识。使用dataset而不是dataframe的主要好处是,你可以直接在Spark中重用你的POJO。使用dataset与你的POJO可以让你使用你熟悉的对象,而没有Row可能带来的任何限制,比如从中提取数据。当你看dataset API(http://mng.bz/qXYE)时,你会看到很多对dataset的引用,其中T代表一个通用类型,而不是具体的Row。然而,要注意的是,有些操作会失去POJO的强类型,而返回一个Row:一个例子是加入两个数据集或在一个数据集上执行聚合。这完全不是一个问题,但应该是一个预期的特征。例如,考虑一个基于书籍的数据集。如果你通过分组来统计按年份出版的书籍数量,你的书籍POJO中不会有一个计数字段,所以Spark会自动创建一个dataframe来存储结果。最后,Row使用了名为Tungsten的高效存储。这不是你的POJO的情况。Tungsten:疯狂快速的Java存储
性能优化是一个永无止境的故事。Project Tungsten是Apache Spark的一个集成部分,它专注于增强三个关键领域:内存管理和二进制处理,缓存感知计算,以及代码生成。让我们快速看看第一个领域和Java存储对象的方式。在Java中,我喜欢的第一件事(来自C++)是你不必跟踪内存使用和对象生命周期:所有这些都由垃圾收集器(GC)完成。虽然GC在大多数情况下都表现得很好,但当你玩数据集时,它可能很快就会因为创建数百万个对象而不堪重负。在Java(8及以下)中存储一个四字符的字符串,如Java,将需要48个字节;在使用UTF-8/ASCII编码时,存储这个字符串应该只需要4个字节。Java虚拟机(JVM)原生String实现的存储方式不同,它用UTF-16编码对每个字符使用2个字节进行编码,每个String对象还包含一个12字节的头和8字节的哈希码。当你调用Java(或其他任何基于JVM的语言).length()操作时,JVM仍然会返回4,因为那是以字符为单位的字符串长度,而不是它在内存中的物理表示。查看Java对象布局(JOL)工具,http://openjdk.java.net/projects/code-tools/jol/,了解更多关于物理存储的信息。GC和对象存储本身都不差。然而,在高性能和可预测的工作负载中,本可以取得进步。因此,一个更高效的存储系统诞生了。Tungsten直接管理内存块,压缩数据,并有新的数据容器,使用与操作系统的低级交互,提供16倍到100倍的性能提升。你可以在http://mng.bz/7zyg 阅读更多关于Project Tungsten的信息。
创建一个字符串的dataset
为了理解如何使用dataset而不是dataframe,让我们看看如何创建一个简单的String的dataset。这将通过使用一个我们都熟悉的简单对象--字符串来说明dataset的用法。然后你将能够创建更复杂对象的dataset。你的应用程序将从一个简单的包含字符串的Java数组中创建一个String的dataset,然后显示结果--没有任何花哨的东西。下面是预期的输出。


要使用dataframe的扩展方法来代替dataset,你可以通过调用toDF()方法轻松地将一个dataset转换为一个dataframe。请看实验室#310 (net.jgp.books.spark.ch03.lab310datasetto_dataframe.ArrayToDatasetToDataframeApp)。它在start()方法的结尾处添加了以下片段。
Dataset<Row> df = ds.toDF();
df.show();
df.printSchema();
输出与本节中的前一个实验室(实验室#300)完全相同,但现在你有了一个dataframe!你可以在这里找到一个dataframe。
来回转换
在本节中,你将学习如何将一个dataframe转换为一个dataset并返回。如果你想操作你现有的POJO和只适用于dataframe的扩展API,这种转换是有用的。你将读取一个CSV文件,其中包含一个dataframe中的书籍。你将把dataframe转换为书籍的dataset,然后再回到dataframe。虽然这听起来是一个令人讨厌的流程,但作为Spark工程师,你可能会参与其中的部分或全部操作。想象一下下面的用例。你在你的库中有一个现有的bookProcessor()方法。这个方法接收一个图书POJO,并通过API将其发布到一个商家网站上,如Amazon、Fnac或Flipkart。你绝对不希望重写这个方法仅仅在Spark上工作。你要继续发送图书POJO。你可以加载成千上万的书籍,将它们存储在书籍的数据集中,当你要对它们进行迭代时,你可以使用分布式处理来调用你现有的bookProcessor()方法,无需修改。让我们专注于第一部分:摄取文件,并将dataframe变成书籍的dataset。输出结果如下。
在你将你的dataframe转换为dataset后,字段会被排序。这不是你要求应用程序做的事情;这是一个额外的奖励(或者说 "malus",取决于日期)。然而,如果你打算在这之后合并数据集,记得使用 unionByName()(而不是 union()),因为字段可能在起飞过程中发生了移动(类似于飞行时东西在头顶的舱室里被移动,这意味着你不知道当你打开盖子时你会得到什么)。操作见以下列表。
map()方法是一种有趣的动物,一开始看起来有点吓人,但却像小狗一样可爱。map()方法之所以让人望而生畏,是因为它需要更多的编码,而且它并不总是一个容易理解的概念。这个方法将
- 在MapFunction类的call()方法中做一些事情。
让我们深入了解一下map()方法的签名。在Java中,泛型并不总是简单的。 Dataset<U> map(MapFunction<T, U>, Encoder<U>)
调用时,map()方法将
当你实现你的方法时,确保你有正确的签名和实现,因为这可能很棘手。包括签名和所需方法在内的骨架如下:

下面的列表将此骨架应用到您正在构建的BookMapper mapper类中。附录I列出了这些类型的转换的参考资料,包括类签名。


你还需要一个简单的POJO来代表一本书(Book POJO),这是下一个列表。我去掉了大部分的getters和setters,以简化可读性;我很确定你可以在心理上添加缺少的方法。我将所有常见的工件存储在一个x子包中,以增加项目的可读性;在Eclipse中,x代表额外的。

现在你已经有了dataset,你可以把它转换回dataframe,这样你就可以,例如,执行连接或聚合操作。所以,让我们把dataset转换回dataframe来研究这部分机制。你将研究一个有趣的日期案例,因为日期被分割成一个嵌套结构。下面的列表显示了输出结果。


现在,你已经准备好将dataset转换为dataframe,然后执行一些转换,比如将日期从这个可恶的结构改为dataframe中的日期列。
D Dataset<Row> df2 = bookDs.toDF();
好吧,这并不难,对吧?要将一个dataset转换为dataframe,你只需使用toDF()方法。然而,你仍然有这种奇怪的日期格式,所以我们来纠正一下。第一步是将日期转换为一个具有日期表示的字符串。在这种情况下,你将使用ANSI/ISO格式。YYYY-MM-DD, 如1971-10-05。请记住,Java中的年份是从1900年开始的,所以1971年是71,而2004年是104。同样,月份从0开始,所以10月,也就是一年中的第10个月,是第9个月。使用Java方法来构建日期需要使用一个映射函数,就像你在清单3.3中做的那样。这是通过对数据的迭代来构建数据集或数据框架的方法。你也可以使用UDF,定义在第16章。
expr()静态函数将计算一个类似于SQL的表达式并返回一个列。它可以使用字段名。表达式releaseDate.year + 1900将在这个转换过程中被Spark评估,并变成一个包含该值的列。releaseDate.year中的点符号表示数据的路径,你可以在清单3.5的模式中看到。通过实例你将看到更多的静态函数,并将在第13章以及附录G中研究转换。一旦你有了一个作为字符串的日期,你可以使用to_date()静态函数将其转换为作为日期的日期。

你也可以drop() releaseDate列,它的结构很奇怪,它不是很有用。现在你应该可以建立一个包含任何POJO的dataset,并将其转换为一个dataframe。
dataframe的祖先:RDD
在前面的章节中,你概要地学习了dataset和dataframe。然而,Spark并不是天生就有这些组件的。让我们来了解一下为什么要记住弹性分布式数据集的作用。在dataframe之前,Spark专门使用RDD。不幸的是,你仍然会发现一些老卫士只信奉RDD,而无视或忽略dataframe。为了避免疯狂的讨论,你应该知道什么是RDD,以及为什么dataframe在大多数应用中肯定更容易使用,但如果没有RDD,它们将无法工作。Spark最著名的创始人之一Matei Zaharia将RDD定义为一种分布式内存抽象,让程序员以容错的方式在大型集群上进行内存内计算。RDD的第一个实现是在Spark中。这个想法是通过一组可靠(弹性)的节点来实现内存内计算:如果一个节点发生故障,没有什么大不了的;另一个节点会接力,就像RAID5磁盘架构一样。RDD的诞生,就是考虑到了不可变的概念(定义在3.1.2节)。尽管围绕着dataframe做出了重大努力,但RDD并没有消失。而且没有人希望它们消失,它们仍然是Spark使用的低级存储层。看看我的朋友Jules Damji的文章,比较Spark的各种存储结构,网址是http://mng.bz/omdD 但要小心,他是偏向Scala的。你可以看到dataframe和RDD的一种方式是,dataframe是RDDs的扩展。如果说dataframe是雄伟的,那么RDD绝对不是丑陋和懦弱的。RDD把所有的存在感都带到了存储层。你应该考虑RDDs,当RDD是dataframe的基础。正如你所看到的,在许多用例中,dataframe比RDD更容易使用,性能也更优化,但不要因为dataframe的雄伟品质而戏弄RDD的粉丝,好吗?概要
dataframe是一个不可改变的分布式数据集合,组织成命名的列。基本上,一个dataframe就是一个带有schema的RDD。
dataframe是Dataset及Row的泛型的一个特例,代码形式为:Dataset<Row>|
一个dataset除了包含Row形式的数据集,还可以是任何的类型,如Dataset<String>、Dataset<Book>或Dataset<SomePojo>。
Dataframe可以存储列式信息,就像CSV文件一样,也可以存储嵌套字段和数组,就像JSON文件一样。无论你是在使用CSV文件、JSON文件还是其他格式,dataframe API都是一样的。
在JSON文件中,可以使用点号(.)访问嵌套字段。
dataframe的 API 可以在 http://mng.bz/qXYE 找到;关于如何使用dataframe的细节,请参见参考章节。
静态方法的API可以在http://mng.bz/5AQD(和附录G)中找到;关于如何使用静态方法的细节,请参见参考章节。
如果你在联合两个dataframe时不关心列名,请使用 union()。
如果你在联合两个dataframe时不关心列名,请使用 unionByName()。
在Spark中,你可以直接在一个数据集中重用你的POJO。
如果你想让一个对象作为数据集的一部分,它必须是可序列化的。
数据集的drop()方法可以删除dataframe中的一列。
数据集的col()方法根据名称返回数据集的列。
to_date() 静态函数将字符串中的日期转换为日期。
expr() 静态函数使用字段名来计算表达式的结果。
lit() 静态函数返回具有文字值的列。
弹性分布式数据集(RDD)是数据元素的不可改变的分布式集合。
当性能至关重要时,你应该在RDD上使用dataframe。
Tungsten存储依赖于dataframe。
Catalyst 是转换优化器(见第 4 章)。它依靠dataframe来优化操作和转换。
跨Spark库的API(graph, SQL, machine learning, 和streaming)正在变得统一在dataframe API下。