
Method 1: map + reduceByKey
package com.cw.bigdata.spark.wordcount

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount1 {
  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount1")
    val sc: SparkContext = new SparkContext(config)

    val lines: RDD[String] = sc.textFile("in")

    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect().foreach(println)
  }
}
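A note on the design choice: reduceByKey performs a map-side combine, so each partition pre-aggregates its (word, 1) pairs before the shuffle and only one partial count per word per partition crosses the network. That is why this form is usually preferred over the groupByKey variant shown later.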
Method 2: countByValue instead of map + reduceByKey
package com.cw.bigdata.spark.wordcount

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount2 {
  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount2")
    val sc: SparkContext = new SparkContext(config)

    val lines: RDD[String] = sc.textFile("in")

    lines.flatMap(_.split(" ")).countByValue().foreach(println)
  }
}
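Keep in mind that countByValue is an action, not a transformation: it returns a local scala.collection.Map[String, Long] to the driver, so it only works when the number of distinct words fits in driver memory. Roughly (a sketch, not part of the original post), it is equivalent to:

    lines.flatMap(_.split(" ")).map((_, 1L)).reduceByKey(_ + _).collect().toMap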
Method 3: aggregateByKey or foldByKey
package com.cw.bigdata.spark.wordcount

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
 * WordCount, third approach: aggregateByKey or foldByKey
 *
 * def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
 * 1. zeroValue: the initial value given to each key in each partition;
 * 2. seqOp: folds the values into the accumulator within a partition (intra-partition aggregation);
 * 3. combOp: merges the per-partition results (inter-partition aggregation).
 *
 * foldByKey is the simplified form of aggregateByKey in which seqOp and combOp are the same function.
 */
object WordCount3 {
  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount3")
    val sc: SparkContext = new SparkContext(config)

    val lines: RDD[String] = sc.textFile("in")

    lines.flatMap(_.split(" ")).map((_, 1)).aggregateByKey(0)(_ + _, _ + _).collect().foreach(println)

    lines.flatMap(_.split(" ")).map((_, 1)).foldByKey(0)(_ + _).collect().foreach(println)
  }
}
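Word count does not really show why seqOp and combOp are separate parameters, since both are just _ + _. A minimal sketch (not from the original post) where they differ: computing the average value per key with a (sum, count) accumulator whose type is different from the value type Int.

    val pairs: RDD[(String, Int)] = sc.parallelize(Seq(("a", 3), ("a", 5), ("b", 4)))
    val sumCount: RDD[(String, (Int, Int))] = pairs.aggregateByKey((0, 0))(
      (acc, v) => (acc._1 + v, acc._2 + 1),   // seqOp: fold one value into the per-partition accumulator
      (a, b) => (a._1 + b._1, a._2 + b._2)    // combOp: merge accumulators from different partitions
    )
    val avgByKey = sumCount.mapValues { case (sum, cnt) => sum.toDouble / cnt }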
Method 4: groupByKey + map
package com.cw.bigdata.spark.wordcount

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
 * WordCount, fourth approach: groupByKey + map
 */
object WordCount4 {
  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount4")
    val sc: SparkContext = new SparkContext(config)

    val lines: RDD[String] = sc.textFile("in")

    val groupByKeyRDD: RDD[(String, Iterable[Int])] =
      lines.flatMap(_.split(" ")).map((_, 1)).groupByKey()

    groupByKeyRDD.map(tuple => (tuple._1, tuple._2.sum)).collect().foreach(println)
  }
}
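Unlike reduceByKey, groupByKey shuffles every single (word, 1) pair across the network before anything is summed, so on large or skewed data the first method is normally the better choice. A small variant of the last step (a sketch, reusing the same groupByKeyRDD): mapValues leaves the key untouched and preserves the partitioner:

    groupByKeyRDD.mapValues(_.sum).collect().foreach(println)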
Method 5: word count in plain Scala (no Spark)
package com.cw.bigdata.spark.wordcount

/**
 * Word count implemented in plain Scala collections
 */
object WordCount5 {
  def main(args: Array[String]): Unit = {
    val list = List("cw is cool", "wc is beautiful", "andy is beautiful", "mike is cool")

    /**
     * Step 1: split each element on the separator (here a space), then flatten.
     * map(_.split(" ")) splits each element on spaces,
     * flatten then collapses the nested collections;
     * flatMap combines the two steps into one.
     */
    val res0 = list.map(_.split(" ")).flatten
    val res1 = list.flatMap(_.split(" "))
    println("Step 1 result")
    println(res0)
    println(res1)

    /**
     * Step 2: turn every word into a tuple.
     * The key is the word itself; the value can be anything, here 1.
     */
    val res3 = res1.map((_, 1))
    println("Step 2 result")
    println(res3)

    /**
     * Step 3: group the tuples by key.
     */
    val res4 = res3.groupBy(_._1)
    println("Step 3 result")
    println(res4)

    /**
     * Final step: take the size of each key's group after groupBy,
     * i.e. the number of occurrences of each word.
     */
    val res5 = res4.mapValues(_.size)
    println("Final step result")
    println(res5.toBuffer)
  }
}
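On Scala 2.13 and later the same pipeline collapses into a single call; a sketch (not in the original post), assuming a 2.13+ standard library:

    val counts: Map[String, Int] =
      list.flatMap(_.split(" ")).groupMapReduce(identity)(_ => 1)(_ + _)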
Method 6: combineByKey
package com.cw.bigdata.spark.wordcount

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
 * WordCount, sixth approach: combineByKey
 */
object WordCount6 {
  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("combineByKey")
    val sc: SparkContext = new SparkContext(config)

    val lines: RDD[String] = sc.textFile("in")

    val mapRDD: RDD[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1))

    // word count via combineByKey
    mapRDD.combineByKey(
      x => x,                       // createCombiner: the first value seen for a key in a partition
      (x: Int, y: Int) => x + y,    // mergeValue: fold further values into the per-partition combiner
      (x: Int, y: Int) => x + y     // mergeCombiners: merge combiners across partitions
    ).collect().foreach(println)
  }
}
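As with aggregateByKey, word count does not exercise what makes combineByKey interesting, because createCombiner is just the identity. A sketch (not from the original post) where the combiner type (Double, Int) differs from the value type Double: the per-key average.

    val scores: RDD[(String, Double)] = sc.parallelize(Seq(("a", 90.0), ("a", 80.0), ("b", 70.0)))
    val avgScore: RDD[(String, Double)] = scores.combineByKey(
      (v: Double) => (v, 1),                                               // createCombiner: first value for a key in a partition
      (acc: (Double, Int), v: Double) => (acc._1 + v, acc._2 + 1),         // mergeValue: fold further values within the partition
      (a: (Double, Int), b: (Double, Int)) => (a._1 + b._1, a._2 + b._2)   // mergeCombiners: merge across partitions
    ).mapValues { case (sum, cnt) => sum / cnt }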









