暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

流式计算来袭,使用Python和PySpark处理流式数据

447

学习如何用Python和Pyspark创建本地、低延迟的流式数据。

长按关注《Python学研大本营》,加入读者群,分享更多精彩

Apache Spark是一个开源的分析工具,适用于数据工程、数据科学、机器学习和其他与数据有关的实践的从小型到大型的数据处理。使用Apache Spark,我们可以并行处理大量数据,将其连接到数百个来源,我们还可以利用微批流。此外,根据用户的需要,Spark流式数据具有高度的容错性。

PySpark使我们能够使用Apache Spark的力量,同时又能利用Python编程语言的简单性。

在这篇文章中,我们将尝试创建一个简单的Python脚本来创建流式数据管道。我们将解释如何使用具有非常低延迟的连续流。我们还将讨论微批流和比较这两种类型的流。

什么是PySpark流式数据?

当我们说到流式数据时,大多数人都会想到视频或音乐的流媒体。然而,流式数据被我们周围的各种设备所使用 -- 健康配件、智能手表、智能手机、智能家居设备等。

在PySpark中,我们区分了两种主要的流式数据类型 -- 连续流和微批流。

连续流

在连续流中,输入数据使用面向流的数据处理引擎进行处理,该引擎可以实时执行聚合、连接和窗口化等操作。

随着新数据的到来,这种处理管道的输出被持续更新(聚合/合并/连接)。在PySpark中,我们把这种流称为结构化流。虽然它是延迟最低的"最快"解决方案,但它不像微批流那样具有容错性,而且需要强大的集群来运行。

微批流

在微批流中,输入数据是以小而离散的数据批处理的。使用这种方法,我们可以使用通常用于成批数据的数据转换,同时保持低(但大于连续流)延迟、高容错性和对窗口功能的支持。

使用案例

在PySpark中,我们可以使用readStream
writeStream
方法来读/写数据流。

readStream
方法允许我们从Spark支持的任何来源开始读取。这个方法返回流式数据帧。

writeStream
方法允许我们将readStream
创建的流媒体DataFrame同时写到一个或多个位置。

对于连续流,我们可以使用这个简单的代码片段。它的作用是什么?当这个脚本运行时,我们将能够向MacOs终端输入文字,我们将把它作为一个源。同时,该脚本将在VS Code中的jupyter笔记本中打印处理后的流,延迟非常低。

from pyspark.sql import SparkSession

# 创建一个Spark会话
spark = SparkSession.builder.appName("Continuous Stream").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("OFF")

# 从流式数据源读取数据
streamingDF = spark.readStream.format("socket").option("host""localhost").option("port"9999).load()

# 定义处理逻辑
processedDF = streamingDF.selectExpr("value as input""length(value) as length")

# 将输出写入一个流式处理器
query = processedDF.writeStream.format("console").start()

# 开启数据流
query.awaitTermination(20# Start the stream for 20 seconds to allow input to be typed in the terminal

# 20秒后,关闭数据流
query.stop()

注意,我们使用readStream
方法从源“socket”中读取数据流。我们的输入被加载为常规的PySpark DataFrame
。加载后,我们使用selectExpr
方法,通过表达式从DataFrame中进行选择。在我们的例子中,我们将创建一个新的列,叫做input
,由sql命令值生成的输入,基本上就是我们在终端输入的值。然后,我们将有一个由sql命令length(value)
生成的长度列,它是我们输入的长度(字符数)。

在PySpark中,在我们用.start()
启动流之前,操作不会被运行。你可以同时应用许多转换,但除非我们执行动作--在我们的例子中是.start()
,否则它不会开始执行过程。

在代码片段中,我们使用localhost:9999
的套接字作为我们的源。为了将数据传递给流,我们必须首先在localhost
上启动9999端口的监听服务器。要做到这一点,我们必须。

在MasOS终端输入:nc -l 9999
。这将使用netcat工具启动9999
端口的监听服务器。在Windows PowerShell中输入:New-Object System.Net.Sockets.TcpListener(9999).Start()
。这个命令使用.NET框架中的TcpListener
类在9999
端口创建了一个新的TCP监听器。在我们开始监听9999
端口后,我们可以运行带有流代码的jupyer笔记本单元,并开始在终端/PowerShell中打字。

Python2

Python2使Python23PythonPythonPython使GUIPygame使线WebFlaskeAIPython243PPT

https://item.jd.com/13284890.html

长按关注《Python学研大本营》
长按访问【IT今日热榜】,发现每日技术热点

文章转载自Python学研大本营,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论