
Spark 2.2.0 in Action: Two Ways to Convert an RDD to a DataFrame (Part 1)

大数据开发运维架构 2020-03-11

Spark SQL supports two different methods for converting existing RDDs into Datasets.

The first method uses reflection to infer the schema of an RDD containing objects of a specific type. This reflection-based approach yields more concise code, and it works well when you already know the schema while writing your Spark application.

Java implementation of the first method:

Prepare the data file studentData.txt:

    1001,20,zhangsan
    1002,17,lisi
    1003,24,wangwu
    1004,16,zhaogang

Local-mode implementation:

package com.unicom.ljs.spark220.study;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

/**
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-01-20 08:58
 * @version: v1.0
 * @description: com.unicom.ljs.spark220.study
 */
public class RDD2DataFrameReflect {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD2DataFrameReflect");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        SQLContext sqlContext = new SQLContext(sc);

        JavaRDD<String> lines = sc.textFile("C:\\Users\\Administrator\\Desktop\\studentData.txt");
        // Parse each CSV line into a Student2 bean.
        JavaRDD<Student2> studentRDD = lines.map(new Function<String, Student2>() {
            @Override
            public Student2 call(String line) throws Exception {
                String[] split = line.split(",");
                Student2 student = new Student2();
                student.setId(Integer.valueOf(split[0]));
                student.setAge(Integer.valueOf(split[1]));
                student.setName(split[2]);
                return student;
            }
        });

        // Convert the RDD to a DataFrame via reflection: passing Student2.class
        // lets Spark infer the schema from the bean's getters.
        Dataset<Row> dataFrame = sqlContext.createDataFrame(studentRDD, Student2.class);
        // Register the DataFrame as a temporary table so SQL can be run against it
        // (deprecated since Spark 2.0 in favor of createOrReplaceTempView).
        dataFrame.registerTempTable("studentTable");

        // Query the temporary table for students younger than 18.
        Dataset<Row> dataset = sqlContext.sql("select * from studentTable where age < 18");
        JavaRDD<Row> rowJavaRDD = dataset.toJavaRDD();
        JavaRDD<Student2> ageRDD = rowJavaRDD.map(new Function<Row, Student2>() {
            @Override
            public Student2 call(Row row) throws Exception {
                // Columns inferred from a JavaBean are ordered alphabetically
                // (age, id, name), so read fields by name rather than by position.
                Student2 student = new Student2();
                student.setId(row.<Integer>getAs("id"));
                student.setAge(row.<Integer>getAs("age"));
                student.setName(row.<String>getAs("name"));
                return student;
            }
        });
        ageRDD.foreach(new VoidFunction<Student2>() {
            @Override
            public void call(Student2 student) throws Exception {
                System.out.println(student.toString());
            }
        });
    }
}
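One pitfall worth calling out: when Spark infers a schema from a JavaBean, the resulting columns are ordered alphabetically by property name (age, id, name), not in declaration order, which is why the map above reads fields by name via getAs instead of by position. A quick way to verify this is to print the inferred schema (a hedged sketch; the exact printSchema output may vary slightly across Spark versions):

// Inspect the schema inferred from Student2 via reflection.
dataFrame.printSchema();
// Expected output, with bean-derived columns in alphabetical order:
// root
//  |-- age: integer (nullable = false)
//  |-- id: integer (nullable = false)
//  |-- name: string (nullable = true)
// Positional access like row.getInt(0) would therefore return age, not id.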

The Student2 class:

package com.unicom.ljs.spark220.study;

import java.io.Serializable;

/**
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-01-20 08:57
 * @version: v1.0
 * @description: com.unicom.ljs.spark220.study
 */
public class Student2 implements Serializable {
    int id;
    int age;
    String name;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "Student2{" +
                "id=" + id +
                ", age=" + age +
                ", name='" + name + '\'' +
                '}';
    }
}
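Note that SQLContext and registerTempTable still work on Spark 2.2.0 but have been deprecated since Spark 2.0. As a minimal sketch (not the original author's code), the same flow can be written against the SparkSession entry point, reusing the studentRDD built above:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Build the unified Spark 2.x entry point (replaces SQLContext).
SparkSession spark = SparkSession.builder()
        .master("local[*]")
        .appName("RDD2DataFrameReflect")
        .getOrCreate();

// Reflection-based conversion is unchanged: pass the bean class.
Dataset<Row> df = spark.createDataFrame(studentRDD, Student2.class);

// createOrReplaceTempView supersedes the deprecated registerTempTable.
df.createOrReplaceTempView("studentTable");
spark.sql("select * from studentTable where age < 18").show();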

Key pom.xml dependencies:

<properties>
    <spark.version>2.2.0</spark.version>
    <scala.version>2.11.8</scala.version>
</properties>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>

