8.1 Cartesian Operation
8.2 Coalesce Operation
8.3 Repartition Operation

8.1 Cartesian Operation
Cartesian computes a Cartesian product: given two RDDs, the cartesian operator pairs every element of one RDD with every element of the other, and the result is the Cartesian product of the two.
For example, matching the shirts and pants below yields 3 × 3 = 9 combinations:

//Java
import kfk.spark.common.CommSparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.List;

public class CartesionJava {
    public static void main(String[] args) {
        JavaSparkContext sc = CommSparkContext.getsc();
        List<String> list1 = Arrays.asList("shirt-1", "shirt-2", "shirt-3");
        List<String> list2 = Arrays.asList("pants-1", "pants-2", "pants-3");
        JavaRDD<String> rdd1 = sc.parallelize(list1);
        JavaRDD<String> rdd2 = sc.parallelize(list2);
        // cartesian pairs every element of rdd1 with every element of rdd2
        JavaPairRDD<String, String> carteValues = rdd1.cartesian(rdd2);
        for (Object o : carteValues.collect()) {
            System.out.println(o);
        }
    }
}
//Scala
import kfk.spark.common.CommSparkContextSca

object CartesianScala {
  def main(args: Array[String]): Unit = {
    val sc = CommSparkContextSca.getsc()
    val list1 = Array("shirt-1", "shirt-2", "shirt-3")
    val list2 = Array("pants-1", "pants-2", "pants-3")
    val rdd1 = sc.parallelize(list1)
    val rdd2 = sc.parallelize(list2)
    // cartesian pairs every element of rdd1 with every element of rdd2
    val cartesianValues = rdd1.cartesian(rdd2)
    for (elem <- cartesianValues.collect()) {
      println(elem)
    }
  }
}
Output:


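As a quick sanity check, the size of the Cartesian product equals the product of the two input sizes. A minimal sketch, reusing rdd1 and rdd2 from the Scala example above:

//Scala: the result of cartesian should contain 3 × 3 = 9 pairs
println(rdd1.cartesian(rdd2).count())  // 9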
8.2 Coalesce Operation
The coalesce operator reduces the number of partitions in an RDD, compacting the data into fewer partitions.
It is typically used together with the filter operator: after filter discards a large amount of data, many partitions end up unevenly filled. In that case coalesce can shrink the RDD's partition count so that the remaining data is distributed evenly across fewer partitions and memory is not wasted.
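Before the full examples, here is a minimal sketch of the filter-then-coalesce pattern just described (the data, the partition counts, and the sc context are illustrative assumptions in the style of this chapter's examples):

//Scala: filter leaves many half-empty partitions; coalesce packs them together
val numbers = sc.parallelize(1 to 10000, 100)  // start with 100 partitions
val evens = numbers.filter(_ % 2 == 0)         // still 100 partitions, each now about half empty
val packed = evens.coalesce(50)                // merge into 50 fuller partitions, without a shuffle
println(packed.partitions.length)              // 50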
//Java
import kfk.spark.common.CommSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class CoalesceJava {
    public static void main(String[] args) {
        JavaSparkContext sc = CommSparkContext.getsc();
        List<String> list = Arrays.asList("henry", "chery", "ben", "leo", "lili");
        JavaRDD<String> rdd = sc.parallelize(list, 4); // create the RDD with 4 partitions

        JavaRDD<String> mapIndexValues = rdd.mapPartitionsWithIndex(
                new Function2<Integer, Iterator<String>, Iterator<String>>() {
                    @Override
                    public Iterator<String> call(Integer index, Iterator<String> iterator) throws Exception {
                        List<String> _list = new ArrayList<>();
                        while (iterator.hasNext()) {
                            String userName = String.valueOf(iterator.next());
                            _list.add((index + 1) + ":" + userName);
                        }
                        return _list.iterator(); // each element is tagged with its partition number
                    }
                }, false);

        JavaRDD<String> coalesceValues = mapIndexValues.coalesce(2); // shrink to 2 partitions

        JavaRDD<String> mapIndexValues2 = coalesceValues.mapPartitionsWithIndex(
                new Function2<Integer, Iterator<String>, Iterator<String>>() {
                    @Override
                    public Iterator<String> call(Integer index, Iterator<String> iterator) throws Exception {
                        List<String> _list = new ArrayList<>();
                        while (iterator.hasNext()) {
                            String userName = String.valueOf(iterator.next());
                            _list.add((index + 1) + ":" + userName);
                        }
                        return _list.iterator(); // each element is tagged with its new partition number
                    }
                }, false);

        for (Object o : mapIndexValues2.collect()) {
            System.out.println(o);
        }
    }
}
//Scala
import kfk.spark.common.CommSparkContextSca

object CoalesceScala {
  def main(args: Array[String]): Unit = {
    val sc = CommSparkContextSca.getsc()
    val list = Array("henry", "chery", "ben", "leo", "lili")
    val rdd = sc.parallelize(list, 4) // create the RDD with 4 partitions

    // tag each element with its partition number
    val mapIndexValues = rdd.mapPartitionsWithIndex((index, x) => {
      var list = List[String]()
      while (x.hasNext) {
        val userName = x.next()
        list ::= (index + 1) + ":" + userName
      }
      list.iterator
    })

    val coalesceValues = mapIndexValues.coalesce(2) // shrink to 2 partitions

    // tag each element with its new partition number
    val mapIndexValues2 = coalesceValues.mapPartitionsWithIndex((index, x) => {
      var list = List[String]()
      while (x.hasNext) {
        val userName = x.next()
        list ::= (index + 1) + ":" + userName
      }
      list.iterator
    })

    for (elem <- mapIndexValues2.collect()) {
      println(elem)
    }
  }
}
Output:

As the output shows, the values that were originally in partitions 3 and 4 have been merged into partition 2, leaving 2 partitions in total.

8.3 Repartition Operation
The repartition operator can either increase or decrease the number of partitions of an RDD.
The difference from coalesce is that coalesce (without a shuffle) can only reduce the partition count, whereas repartition, which is coalesce with the shuffle enabled, can change the partition count freely in either direction.
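A minimal sketch of that relationship (rdd stands for any RDD, such as those in the examples below):

//Scala: these two calls produce the same result
val viaRepartition = rdd.repartition(6)
val viaCoalesce = rdd.coalesce(6, shuffle = true)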
A recommended use case:
After Spark SQL loads data from Hive, the automatically assigned number of partitions (which follows the number of HDFS blocks backing the Hive data) can be quite small, slowing down subsequent operators. In that case, once Spark SQL has loaded the Hive data, we can manually raise the partition count to speed up the downstream operators, as sketched below.
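A minimal sketch of that scenario (the SparkSession variable spark, the table name user_logs, and the target partition count of 100 are illustrative assumptions, not part of this chapter's examples):

//Scala: manually raising the partition count after loading a Hive table
val hiveData = spark.sql("SELECT * FROM user_logs").rdd  // partition count follows the HDFS block count
println(hiveData.partitions.length)                      // often a small number
val widened = hiveData.repartition(100)                  // raise the parallelism for downstream operators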
//Java
import kfk.spark.common.CommSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class RePartitionJava {
    public static void main(String[] args) {
        JavaSparkContext sc = CommSparkContext.getsc();
        List<String> list = Arrays.asList("henry", "chery", "ben", "leo", "lili");
        JavaRDD<String> rdd = sc.parallelize(list, 2); // create the RDD with 2 partitions

        JavaRDD<String> mapIndexValues = rdd.mapPartitionsWithIndex(
                new Function2<Integer, Iterator<String>, Iterator<String>>() {
                    @Override
                    public Iterator<String> call(Integer index, Iterator<String> iterator) throws Exception {
                        List<String> _list = new ArrayList<>();
                        while (iterator.hasNext()) {
                            String userName = String.valueOf(iterator.next());
                            _list.add((index + 1) + ":" + userName); // tag each element with its partition number
                        }
                        return _list.iterator();
                    }
                }, false);

        JavaRDD<String> coalesceValues = mapIndexValues.repartition(3); // raise the partition count to 3

        JavaRDD<String> mapIndexValues2 = coalesceValues.mapPartitionsWithIndex(
                new Function2<Integer, Iterator<String>, Iterator<String>>() {
                    @Override
                    public Iterator<String> call(Integer index, Iterator<String> iterator) throws Exception {
                        List<String> _list = new ArrayList<>();
                        while (iterator.hasNext()) {
                            String userName = String.valueOf(iterator.next());
                            _list.add((index + 1) + ":" + userName); // tag each element with its new partition number
                        }
                        return _list.iterator();
                    }
                }, false);

        for (Object o : mapIndexValues2.collect()) {
            System.out.println(o);
        }
    }
}
//Scala
import kfk.spark.common.CommSparkContextSca

object RePartitionScala {
  def main(args: Array[String]): Unit = {
    val sc = CommSparkContextSca.getsc()
    val list = Array("henry", "chery", "ben", "leo", "lili")
    val rdd = sc.parallelize(list, 2) // create the RDD with 2 partitions

    // tag each element with its partition number
    val mapIndexValues = rdd.mapPartitionsWithIndex((index, x) => {
      var list = List[String]()
      while (x.hasNext) {
        val userName = x.next()
        list ::= (index + 1) + ":" + userName
      }
      list.iterator
    })

    val coalesceValues = mapIndexValues.repartition(3) // raise the partition count to 3

    // tag each element with its new partition number
    val mapIndexValues2 = coalesceValues.mapPartitionsWithIndex((index, x) => {
      var list = List[String]()
      while (x.hasNext) {
        val userName = x.next()
        list ::= (index + 1) + ":" + userName
      }
      list.iterator
    })

    for (elem <- mapIndexValues2.collect()) {
      println(elem)
    }
  }
}
Output:

As the output shows, some of the values that were originally in partitions 1 and 2 have been moved into partition 3, giving 3 partitions in total.

The next chapter introduces some commonly used Sample examples.
See you in the next chapter: Big Data Analysis with Spark Core Programming (Part 9).






