暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

认识sparklyr

碧茂大数据 2021-10-08
537

更多精彩,请点击上方蓝字关注我们!

  • sparklyr不仅提供了基于Spark的分布式机器学习算法库,还有其他的一些功能。如下:

    • 使用dplry和SQL(通过DBI)交互式的操作Spark的数据。

    • 过滤和聚合Spark数据集,然后将它们通过R进行分析和可视化。

    • 使用Spark MLlib和H2O SparkingWater实现分布式的机器学习。

    • 创建extensions,可以调用完整的SparkAPI并提供Spark包的接口。

    • 支持集成连接到Spark,并通过RStudioIDE浏览Spark DataFrames。

从CRAN安装sparklyr

install.packages("sparklyr")

  • 还要安装一个本地的Spark版本

    • 如果使用RStudio IDE,还需要下载一个最新的IDE,这个新的IDE包含了集成Spark的功能提升。

library(sparklyr)
spark_install(version = "1.6.2")

连接到Spark

  • 可以选择连接本地的Spark实例或者远程的Spark集群,如下连接到本地的Spark。

    • 返回的Spark connection(sc)为Spark集群提供了一个远程的dplyr数据源。

library(sparklyr) 
sc <- spark_connect(master = "local")

数据读取

  • 可以使用dplyr的copy_to函数将R的data frames拷贝到Spark。

    • 更典型的是可以通过spark_read的一系列函数读取Spark集群中的数据。

  • 如下例子,从R拷贝一些数据集到Spark。

    • 注意可能需要安装nycflights13和Lahman包才能运行这些代码。

library(dplyr) 
iris_tbl <- copy_to(sc, iris)
flights_tbl <- copy_to(sc, nycflights13::flights, "flights")
batting_tbl <- copy_to(sc, Lahman::Batting, "batting")

dplyr使用

  • 针对集群中的表,现在可以使用所有可用的dplyr的verbs。以下是一个简单的过滤示例:

# filter by departure delay
flights_tbl %>% filter(dep_delay == 2)

  • 比如,分析航班延误的数据。

delay <- flights_tbl %>% 
group_by(tailnum) %>%
summarise(count = n(), dist = mean(distance), delay = mean(arr_delay)) %>%
filter(count > 20, dist < 2000, !is.na(delay)) %>%
collect()

# plot delays
library(ggplot2)
ggplot(delay, aes(dist, delay)) +
geom_point(aes(size = count), alpha = 1/2) +
geom_smooth() +
scale_size_area(max_size = 2)

  • 注意尽管上面显示的dplyr函数与在使用R的data frames时是一样的,但如果使用的是sparklyr,它们其实是被推到远端的Spark集群里执行的。

Window Functions

  • dplyr同时也支持window函数,比如:

library(dplyr)
iris_tbl <- copy_to(sc, iris)
flights_tbl <- copy_to(sc, nycflights13::flights, "flights")
batting_tbl <- copy_to(sc, Lahman::Batting, "batting")

batting_tbl %>%
select(playerID, yearID, teamID, G, AB:H) %>%
arrange(playerID, yearID, teamID) %>%
group_by(playerID) %>%
filter(min_rank(desc(H)) <= 2 & H > 0)

Machine Learning

  • 使用Spark MLlib或H2O SparkingWater实现分布式的机器学习。

    • 它们都提供了一系列的基于DataFrames构建的high-levelAPIs,从而帮助创建和调试机器学习工作流。

Spark MLlib

  • 例子:将使用ml_linear_regression来拟合线性回归模型。

    • 使用内置的mtcar数据集,看看是否可以根据其重量(wt)和发动机的气缸数量(cyl)来预测汽车的燃油消耗(mpg)。

    • 假设在每种情况下,mpg和features(wt和cyl)之间的关系是线性的。

# copy mtcars into spark
mtcars_tbl <- copy_to(sc, mtcars)

# transform our data set, and then partition into 'training', 'test'
partitions <- mtcars_tbl %>%
filter(hp >= 100) %>%
mutate(cyl8 = cyl == 8) %>%
sdf_partition(training = 0.5, test = 0.5, seed = 1099)

# fit a linear model to the training dataset
fit <- partitions$training %>%
ml_linear_regression(response = "mpg", features = c("wt", "cyl"))

  • 对于由Spark生成的线性回归模型,可以使用summary()来更多的了解拟合质量(quality of our fit),以及每个预测变量的统计显著性(statistical significance)。

summary(fit)

  • Spark机器学习支持众多的算法和特征变换,如上所示,会发现将这些功能与dplyr管道链接起来很容易。

H2O Sparkling Water

  • 以mtcars为例,这次使用H2O Sparkling Water来实现。

    • dplyr代码依旧是用来准备数据,当将数据分为test和training后,调用h2o.glm而不是ml_linear_regression。

# convert to h20_frame (uses the same underlying rdd)
training <- as_h2o_frame(partitions$training)
test <- as_h2o_frame(partitions$test)

# fit a linear model to the training dataset
fit <- h2o.glm(x = c("wt", "cyl"),
y = "mpg",
training_frame = training,
lamda_search = TRUE)

# inspect the model
print(fit)

  • 对于由H2O产生的线性回归模型,可以使用print() 或 summary()来更多的了解拟合质量(quality of our fit)。

    • summary()方法返回一些关于评分历史(scoringhistory)和变量重要性(variableimportance)的额外信息。

关注公众号:领取精彩视频课程&海量免费语音课程




文章转载自碧茂大数据,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论