
A Summary of Several Ways to Implement WordCount in Spark

暴走大数据 2020-07-27


Method 1: map + reduceByKey

    package com.cw.bigdata.spark.wordcount

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    object WordCount1 {
      def main(args: Array[String]): Unit = {
        val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount1")

        val sc: SparkContext = new SparkContext(config)

        // Read the files under the "in" directory, one RDD element per line.
        val lines: RDD[String] = sc.textFile("in")

        // Split lines into words, pair each word with 1, then sum the 1s per word.
        lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect().foreach(println)
      }
    }
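
If there is no `in` directory to read, the same pipeline can be smoke-tested against an in-memory collection. A minimal sketch, reusing the `sc` from the listing above; the sample lines are made up for illustration:

    // sc.parallelize builds an RDD from a local collection, no input files needed.
    val sample: RDD[String] = sc.parallelize(Seq("spark is fast", "spark is fun"))
    sample.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect().foreach(println)
    // Expected output (order not guaranteed): (spark,2), (is,2), (fast,1), (fun,1)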

Method 2: countByValue instead of map + reduceByKey

    package com.cw.bigdata.spark.wordcount

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    object WordCount2 {
      def main(args: Array[String]): Unit = {
        val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount2")

        val sc: SparkContext = new SparkContext(config)

        val lines: RDD[String] = sc.textFile("in")

        // countByValue counts each distinct word directly,
        // returning a Map[String, Long] to the driver.
        lines.flatMap(_.split(" ")).countByValue().foreach(println)
      }
    }
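
Note that, unlike reduceByKey, countByValue is an action rather than a transformation: it returns a local scala.collection.Map to the driver, so it only suits cases where the set of distinct words fits in driver memory. A sketch of the equivalence, reusing `lines` from the listing above:

    // countByValue is roughly the reduceByKey pipeline plus collectAsMap.
    val viaCountByValue: scala.collection.Map[String, Long] =
      lines.flatMap(_.split(" ")).countByValue()
    val viaReduceByKey: scala.collection.Map[String, Int] =
      lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collectAsMap()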

Method 3: aggregateByKey or foldByKey

    package com.cw.bigdata.spark.wordcount

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD

    /**
     * Third WordCount implementation: aggregateByKey or foldByKey
     *
     * def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
     * 1. zeroValue: the initial value given to each key in each partition;
     * 2. seqOp: folds each value of a key into the accumulator, starting from the
     *    initial value (intra-partition aggregation);
     * 3. combOp: merges the accumulators produced by the partitions (inter-partition aggregation).
     *
     * foldByKey is the simplified form of aggregateByKey for the case where
     * seqOp and combOp are the same function.
     */
    object WordCount3 {
      def main(args: Array[String]): Unit = {
        val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount3")

        val sc: SparkContext = new SparkContext(config)

        val lines: RDD[String] = sc.textFile("in")

        lines.flatMap(_.split(" ")).map((_, 1)).aggregateByKey(0)(_ + _, _ + _).collect().foreach(println)

        lines.flatMap(_.split(" ")).map((_, 1)).foldByKey(0)(_ + _).collect().foreach(println)
      }
    }
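
One point worth stressing: zeroValue is applied once per key per partition, so for word counting it must be 0, the identity of addition. A small sketch of what goes wrong otherwise, reusing `lines` from above (the value 100 is arbitrary, for illustration only):

    // With zeroValue = 100, a word present in n partitions is over-counted by n * 100.
    val pairs = lines.flatMap(_.split(" ")).map((_, 1))
    pairs.aggregateByKey(100)(_ + _, _ + _).collect().foreach(println) // inflated counts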

Method 4: groupByKey + map

    package com.cw.bigdata.spark.wordcount

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD

    /**
     * Fourth WordCount implementation: groupByKey + map
     */
    object WordCount4 {
      def main(args: Array[String]): Unit = {
        val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount4")

        val sc: SparkContext = new SparkContext(config)

        val lines: RDD[String] = sc.textFile("in")

        // groupByKey gathers all the 1s of each word into one Iterable.
        val groupByKeyRDD: RDD[(String, Iterable[Int])] = lines.flatMap(_.split(" ")).map((_, 1)).groupByKey()

        // Summing each Iterable yields the word count.
        groupByKeyRDD.map(tuple => {
          (tuple._1, tuple._2.sum)
        }).collect().foreach(println)
      }
    }
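
Two notes on this approach. First, groupByKey shuffles every (word, 1) pair across the network before any summing happens, whereas reduceByKey pre-aggregates within each partition, so Method 1 is usually preferred on large data. Second, mapValues is the more idiomatic way to write the final step; a sketch:

    // mapValues touches only the values and preserves the key partitioning.
    groupByKeyRDD.mapValues(_.sum).collect().foreach(println)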

Method 5: WordCount in plain Scala

    package com.cw.bigdata.spark.wordcount

    /**
     * WordCount in plain Scala, without Spark
     */
    object WordCount5 {
      def main(args: Array[String]): Unit = {

        val list = List("cw is cool", "wc is beautiful", "andy is beautiful", "mike is cool")

        /**
         * Step 1: split each element on the separator (a space here) and flatten the result.
         * map(_.split(" ")) splits every element on spaces,
         * then flatten expands the nested arrays;
         * flatMap combines both steps into one.
         */
        val res0 = list.map(_.split(" ")).flatten
        val res1 = list.flatMap(_.split(" "))

        println("Step 1 result")
        println(res0)
        println(res1)

        /**
         * Step 2: turn each word into a tuple.
         * The key is the word; the value can be anything, 1 here.
         */
        val res3 = res1.map((_, 1))
        println("Step 2 result")
        println(res3)

        /**
         * Step 3: group the tuples by key.
         */
        val res4 = res3.groupBy(_._1)
        println("Step 3 result")
        println(res4)

        /**
         * Final step: take the size of each key's group after groupBy,
         * i.e. the number of occurrences of that word.
         */
        val res5 = res4.mapValues(_.size)
        println("Final step result")
        println(res5.toBuffer)
      }
    }
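
The intermediate results above are printed for teaching purposes; the same logic collapses into a single expression, sketched below:

    // One-liner equivalent of Method 5 (.toMap forces the lazy mapValues view on Scala 2.13).
    val counts: Map[String, Int] =
      list.flatMap(_.split(" ")).groupBy(identity).mapValues(_.size).toMap
    println(counts)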

Method 6: combineByKey

    package com.cw.bigdata.spark.wordcount

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD

    /**
     * Sixth WordCount implementation: combineByKey
     */
    object WordCount6 {
      def main(args: Array[String]): Unit = {
        val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("combineByKey")

        val sc: SparkContext = new SparkContext(config)

        val lines: RDD[String] = sc.textFile("in")

        val mapRDD: RDD[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1))

        // WordCount via combineByKey: create the accumulator from the first value,
        // add values within a partition, then merge accumulators across partitions.
        mapRDD.combineByKey(
          x => x,
          (x: Int, y: Int) => x + y,
          (x: Int, y: Int) => x + y
        ).collect().foreach(println)
      }
    }
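
combineByKey takes three functions, and naming them makes their roles explicit. A sketch reusing `mapRDD` from above; the names are illustrative, not part of the Spark API:

    val createCombiner = (v: Int) => v               // first value seen for a key in a partition
    val mergeValue = (acc: Int, v: Int) => acc + v   // fold later values within the partition
    val mergeCombiners = (a: Int, b: Int) => a + b   // merge accumulators across partitions
    mapRDD.combineByKey(createCombiner, mergeValue, mergeCombiners).collect().foreach(println)

For word counting, createCombiner is just the identity, so this reduces to Method 1; reduceByKey, aggregateByKey, and foldByKey are all built on top of combineByKey internally.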
