暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

PySpark入门(一):RDD的创建

Python都知道 2023-03-10
254

前言

哈喽大家好,我是了不起。

PySpark入门的第一步,就是了解RDD,因为你会发现编程中RDD就是数据,我们操作的也是RDD。

什么是RDD

RDD代表Resilient Distributed Dataset,学名弹性分布式数据集 。它们是在多个节点上运行和操作以在集群上进行并行处理的元素,是一种对数据集形态的抽象,基于此抽象,使用者可以在集群中执行一系列计算,而不用将中间结果落盘。而这正是之前 MR 抽象的一个重要痛点,每一个步骤都需要落盘,使得不必要的开销很高。

RDD是不可变元素,这意味着一旦创建了RDD,就无法对其进行更改。RDD也具有容错能力,因此在发生任何故障时,它们会自动恢复。您可以在这些RDD上应用多个操作来完成某项任务。

创建RDD

1.通过使用 parallelize( ) 生成RDD

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("parallelize create RDD") \
    .getOrCreate()

df = spark.sparkContext.parallelize([(1'a'),
             (2'b'),
             (3'c')]).toDF(['c1''c2'])

2.通过使用 createDataFrame( )创建RDD

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("createDataFrame create RDD") \
    .getOrCreate()

p = spark.createDataFrame([
                        ('1''a',   1),
                        ('2''b'0),
                        ('3''c',   1),
                        ('4''d',   0)],
                        ['Id''Name''sex']
                       )

3.通过使用read()和 load()方法

3.1从.csv文件读取数据

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("read create RDD") \
    .getOrCreate()

df = spark.read.format('com.databricks.spark.csv').\
                               options(header='true', \
                               inferschema='true').\
                load("/home/usr/data/a.csv",header=True)

3.2从数据库读取数据集

from pyspark.sql import SparkSession

spark = SparkSession \
            .builder \
            .appName("jdbc create RDD") \
            .getOrCreate()

user = 'root'
pw   = 'root'
table_name = 'test'
url = 'jdbc:postgresql://##.###.###.##:5432/dataset?user='+user+'&password='+pw
properties ={'driver''org.postgresql.Driver''password': pw,'user': user}

df = spark.read.jdbc(url=url, table=table_name, properties=properties)

3.3从HDFS读取数据集

from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import HiveContext

sc= SparkContext('local','example')
hc = HiveContext(sc)
tf1 = sc.textFile("hdfs://user/data/a.CSV")

hc.sql("use user")
spf = hc.sql("SELECT * FROM user LIMIT 100")

总结

今天给大家带来的是如何创建RDD,RDD是Spark数据操作的基础,创建RDD是大数据入门的第一步。

如果您喜欢我的文章,可以持续关注,后续文章更精彩,敬请期待。

PSPython都知道技术交流群(技术交流、摸鱼、白嫖课程为主)又不定时开放了,感兴趣的朋友,可以在下方公号内回复:666,即可进入。


老规矩,道友们还记得么,右下角的 “在看” 点一下如果感觉文章内容不错的话,记得分享朋友圈让更多的人知道!



文章转载自Python都知道,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论