
Spark - WordCount Example

顶哥说 2021-07-03


1 Create the Project

Create a new Maven project in IDEA, add a child module, and declare the Spark dependency in the module's pom.xml file.

pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>dintalk-classes</artifactId>
            <groupId>cn.dintalk.bigdata</groupId>
            <version>1.0.0</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>

        <artifactId>spark-core</artifactId>

        <dependencies>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>2.1.1</version>
            </dependency>
        </dependencies>
    </project>

2 Prepare the Data Files
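
The original post does not show the data files themselves. As a hypothetical example, the datas directory in the project root could contain a plain text file such as word.txt with space-separated words; sc.textFile("datas") reads every file in that directory line by line:

    hello spark
    hello scala
    hello spark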


3 Writing the Code

3.1 First Approach

    package cn.dintalk.bigdata.spark.core.wc

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    object Spark01_WordCount {
      def main(args: Array[String]): Unit = {

        // application -> Spark framework

        // 1. Establish the connection to the Spark framework
        // JDBC has Connection; Spark has SparkContext
        val sparkConf = new SparkConf()
          .setMaster("local")
          .setAppName("wordCount")

        val sc = new SparkContext(sparkConf)

        // 2. Perform the business logic

        // 2.1 Read the files and get the data line by line
        val lines: RDD[String] = sc.textFile("datas")
        // 2.2 Split each line into individual words
        val words: RDD[String] = lines.flatMap(_.split(" "))
        // 2.3 Group the data by word so it can be counted
        val wordGroup: RDD[(String, Iterable[String])] = words.groupBy(word => word)
        // 2.4 Transform the grouped data
        // (hello,hello,hello), (word,word) -> (hello,3),(word,2)
        val wordCount: RDD[(String, Int)] = wordGroup.map {
          case (word, list) => (word, list.size)
        }
        // 2.5 Collect the results and print them to the console
        val tuples: Array[(String, Int)] = wordCount.collect()
        tuples.foreach(println)
        // 3. Close the connection
        sc.stop()
      }
    }
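
Note that groupBy in this first approach shuffles every raw word across the network and only counts afterwards with list.size; the next two approaches refine exactly this step.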

3.2 Second Approach

    package cn.dintalk.bigdata.spark.core.wc

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    object Spark02_WordCount {
      def main(args: Array[String]): Unit = {

        // application -> Spark framework

        // 1. Establish the connection to the Spark framework
        // JDBC has Connection; Spark has SparkContext
        val sparkConf = new SparkConf()
          .setMaster("local")
          .setAppName("wordCount")

        val sc = new SparkContext(sparkConf)

        // 2. Perform the business logic

        // 2.1 Read the files and get the data line by line
        val lines: RDD[String] = sc.textFile("datas")
        // 2.2 Split each line into individual words
        val words: RDD[String] = lines.flatMap(_.split(" "))

        // 2.3 Map each word to a (word, 1) tuple
        val wordToOne: RDD[(String, Int)] = words.map(word => (word, 1))

        // 2.4 Group the tuples by word, then reduce each group by summing the counts
        val wordGroup: RDD[(String, Iterable[(String, Int)])] = wordToOne
          .groupBy(t => t._1)

        val wordCount: RDD[(String, Int)] = wordGroup.map {
          case (word, list) =>
            list.reduce((t1, t2) => (t1._1, t1._2 + t2._2))
        }
        // 2.5 Collect the results and print them to the console
        val tuples: Array[(String, Int)] = wordCount.collect()
        tuples.foreach(println)
        // 3. Close the connection
        sc.stop()
      }
    }
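
To make the reduce step concrete, here is a minimal sketch in plain Scala (independent of Spark) of how the (word, 1) tuples for a single key collapse into one tuple; the sample data is an assumption:

    // Hypothetical tuples produced by wordToOne for the key "hello"
    val list = List(("hello", 1), ("hello", 1), ("hello", 1))
    // Keep the word from the first tuple and add up the counts
    val total = list.reduce((t1, t2) => (t1._1, t1._2 + t2._2))
    println(total) // prints (hello,3)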

3.3 Third Approach

    package cn.dintalk.bigdata.spark.core.wc

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    object Spark03_WordCount {
      def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf()
          .setMaster("local")
          .setAppName("wordCount")
        val sc = new SparkContext(sparkConf)

        // Read the files, split into words, map to (word, 1) and sum the counts by key
        val lines: RDD[String] = sc.textFile("datas")
        val words: RDD[String] = lines.flatMap(_.split(" "))
        val wordToOne: RDD[(String, Int)] = words.map((_, 1))
        val wordCount: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)

        val tuples: Array[(String, Int)] = wordCount.collect()
        tuples.foreach(println)

        sc.stop()
      }
    }
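
reduceByKey combines values that share a key within each partition before the shuffle, so it moves far less data than the groupBy-based versions above and is the idiomatic way to write word count in Spark.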

3.4 Verifying the Results
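
Run any of the three programs; assuming the hypothetical sample file from section 2, the console output would look like the following (tuple order may vary):

    (scala,1)
    (hello,3)
    (spark,2)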


4 Controlling Log Output with log4j

4.1 Create and configure log4j.properties under the resources directory

    log4j.rootLogger=ERROR, stdout
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%n

    log4j.appender.R=org.apache.log4j.RollingFileAppender
    log4j.appender.R.File=../log/agent.log
    log4j.appender.R.MaxFileSize=10240KB
    log4j.appender.R.MaxBackupIndex=1

    log4j.appender.R.layout=org.apache.log4j.PatternLayout
    log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%6L) : %m%n
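
Note that only the stdout appender is attached to the root logger here; the RollingFileAppender R is defined but will not write ../log/agent.log unless it is also referenced, e.g. in log4j.rootLogger.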

4.2 Verify the Log Output

After rerunning the program, no extraneous log output appears in the console.





