暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Spark Streaming编程实战(开发实例)

原创 数据库新手-ray 2022-06-19
716

流数据模拟器

在实例演示中模拟实际情况,需要源源不断地接入流数据,为了在演示过程中更接近真实环境,首先需要定义流数据模拟器。该模拟器的主要功能是通过 Socket 方式监听指定的端口号,当外部程序通过该端口进行连接并请求数据时,模拟器将定时将指定的文件数据进行随机获取,并发送给外部程序。

流数据模拟器的代码如下。

  1. import java.io.{PrintWriter}
  2. import java.net.ServerSocket
  3. import scala.io.Source
  4. object StreamingSimulation {
  5. //定义随机获取整数的方法
  6. def index(length:Int) = {
  7. import java.util.Random
  8. val rdm = new Random
  9. rdm.nextInt(length)
  10. }
  11. def main(args: Array[String]) {
  12. //调用该模拟器需要 3 个参数,分别为文件路径、端口号和间隔时间(单位为毫秒)
  13. if (args.length != 3) {
  14. System.err.printIn("Usage:<filename> <port><millisecond>")
  15. System.exit(1)
  16. }
  17. //获取指定文件总的行数
  18. val filename = args(0)
  19. val lines = Source.fromFile(filename).getLines.toList
  20. val filerow = lines.length
  21. //指定监听某端口,当外部程序请求时建立连接
  22. val listener = new ServerSocket(args(1).toInt)
  23. while (true) {
  24. val socket = listener.accept()
  25. new Thread() {
  26. override def run = {
  27. printIn("Got client connected from: " + socket.getInetAddress)
  28. val out = new PrintWriter(socket.getOutputStream(), true)
  29. while (true) {
  30. Thread.sleep(args(2).toLong)
  31. //当该端口接受请求时,随机获取某行数据发送给对方
  32. val content = lines(index(filerow))
  33. printIn(content)
  34. out.write(content + '\n') out.flush()
  35. }
  36. socket.close()
  37. }
  38. }.start()
  39. }
  40. }
  41. }

在 IDEA 开发环境打包配置界面中:

  • 首先需要在 ClassPath 加入 Jar 包(/app/scala-2.10.4/lib/scala—swing.jar/app/scala—2.10.4/lib/scala—library.jar/app/scala—2.10.4/lib/scala—actors.jar)。
  • 然后单击“Build”→“Build Artifacts”,选择“Build”或者“Rebuild”动作。
  • 最后使用以下命令复制打包文件到 Spark 根目录下。

cd /home/hadoop/IdeaProjects/out/artifacts/LearnSpark_jar
cp LearnSpark.jar /app/hadoop/spark-1.1.0/

实例 1:读取文件演示

在该实例中,Spark Streaming 将监控某目录中的文件,获取在该间隔时间段内变化的数据,然后通过 Spark Streaming 计算出该时间段内的单词统计数。

程序代码如下。

  1. import org.apache.spark.SparkConf
  2. import org.apache.spark.streaming.{Seconds,StreamingContext}
  3. import org.apache.spark.streaming.StreamingContext._
  4. object FileWordCount {
  5. def main(args:Array[String]) {
  6. val sparkConf = new SparkConf().setAppName("FileWordCount").setMaster("local[2]")
  7. //创建 Streaming 的上下文,包括 Spark 的配置和时间间隔,这里时间间隔为 20 秒 val ssc = new StreamingContext(sparkConf,Seconds(20))
  8. //指定监控的目录,这里为 /home/hadoop/temp/
  9. val lines = ssc.textFileStream("/home/hadoop/temp/")
  10. //对指定文件夹中变化的数据进行单词统计并且打印
  11. val words = lines.flatMap (_.split(""))
  12. val wordCounts = words.map(x => (x, 1)).reduceByKey(_+_)
  13. wordCounts.print()
  14. // 启动 Streaming
  15. ssc.start()
  16. ssc.awaitTermination()
  17. }
  18. }

运行代码的步骤共有三步。

1)创建 Streaming 监控目录。

创建 /home/hadoop/temp 为 Spark Streaming 监控的目录,在该目录中定时添加文件,然后由 Spark Streaming 统计出新添加的文件中的单词个数。

2)使用以下命令启动 Spark 集群。

$cd /app/hadoop/spark-1.1.0
$sbin/start-all.sh

3)在 IDEA 中运行 Streaming 程序。

在 IDEA 中运行该实例,由于该实例没有输入参数故不需要配置参数,在运行日志中将定时打印时间戳。如果在监控目录中加入文件,则输出时间戳的同时将输出该时间段内新添加的文件的单词统计个数。

实例 2:网络数据演示

在该实例中将由流数据模拟器以 1 秒的频度发送模拟数据,Spark Streaming 通过 Socket 接收流数据并每 20 秒运行一次来处理接收到的数据,处理完毕后打印该时间段内数据出现的频度,即在各处理段时间内的状态之间并无关系。

程序代码如下。

  1. import org.apache.spark.{SparkContext,SparkConf}
  2. import org.apache.spark.streaming.{Milliseconds,Seconds,StreamingContext}
  3. import org.apache.spark.streaming.StreamingContext._
  4. import org.apache.spark.storage.StorageLevel
  5. object NetworkWordCount {
  6. def main(args: Array[String]) {
  7. val conf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
  8. val sc = new SparkContext(conf)
  9. val ssc = new StreamingContext(sc,Seconds(20))
  10. //通过 Socket 获取数据,需要提供 Socket 的主机名和端口号,数据保存在内存和硬盘中
  11. val lines = ssc.socketTextStream(args(0),args(1).toInt,StorageLevel.MEMORY_AND_DISK_SER)
  12. //对读入的数据进行分割、计数
  13. val words = lines.flatMap(_.split(","))
  14. val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print()
  15. ssc.start()
  16. ssc.awaitTermination()
  17. }
  18. }

运行代码的步骤共有四步。

1)启动流数据模拟器。

启动流数据模拟器,模拟器 Socket 的端口号为 9999,频度为 1 秒。在该实例中将定时发送 /home/hadoop/upload/class7 目录下的 people.txt 数据文件,其中,people.txt 数据的内容如下。

1 Michael
2 Andy
3 Justin
4

启动流数据模拟器的命令如下。

$cd /app/hadoop/spark-1.1.0
$java -cp LearnSpark.jar class7.StreamingSimulation \   
/home/hadoop/upload/class7/people.txt 9999 1000

在没有程序连接时,该程序处于阻塞状态。

2)在 IDEA 中运行 Streaming 程序。

在 IDEA 中运行该实例,需要配置连接 Socket 的主机名和端口号,在这里配置主机名为 hadoop1,端口号为 9999。

3)观察模拟器发送情况。

IDEA 中的 Spark Streaming 程序与模拟器建立连接,当模拟器检测到外部连接时开始发送测拭数据,数据是随机在指定的文件中获取的一行数据,时间间隔为 1 秒。图 1 是一个模拟器发送情况的截图。

模拟器发送情况的截图
图 1  模拟器发送情况的截图

4)观察统计结果。

在 IDEA 的运行窗口中,可以观测到统计结果。通过分析可知,Spark Streaming 每段时间内的单词数为 20,正好是 20 秒内每秒发送数量的总和。

---------------------------
Time:14369195400000ms 
(Andy,2)  
(Michael,9)   

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论