
[Big Data Analysis] Spark Core Programming (8)

数据信息化 2020-01-05



Cartesian Product and Repartition Operations in Spark

8.1 The cartesian operation
8.2 The coalesce operation
8.3 The repartition operation

8.1 The cartesian operation

cartesian: Cartesian product

    Given two RDDs, the cartesian operator pairs every element of one RDD with every element of the other, producing their Cartesian product.

    For example, matching the shirts and pants below yields 9 possible combinations:

//Java
import kfk.spark.common.CommSparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.List;

public class CartesianJava {
    public static void main(String[] args) {
        JavaSparkContext sc = CommSparkContext.getsc();
        List<String> list1 = Arrays.asList("shirt-1", "shirt-2", "shirt-3");
        List<String> list2 = Arrays.asList("pants-1", "pants-2", "pants-3");

        JavaRDD<String> rdd1 = sc.parallelize(list1);
        JavaRDD<String> rdd2 = sc.parallelize(list2);

        // cartesian pairs every element of rdd1 with every element of rdd2
        JavaPairRDD<String, String> carteValues = rdd1.cartesian(rdd2);
        for (Object o : carteValues.collect()) {
            System.out.println(o);
        }
    }
}
//Scala
import kfk.spark.common.CommSparkContextSca

object CartesianScala {
  def main(args: Array[String]): Unit = {
    val sc = CommSparkContextSca.getsc()
    val list1 = Array("shirt-1", "shirt-2", "shirt-3")
    val list2 = Array("pants-1", "pants-2", "pants-3")

    val rdd1 = sc.parallelize(list1)
    val rdd2 = sc.parallelize(list2)

    // cartesian pairs every element of rdd1 with every element of rdd2
    for (elem <- rdd1.cartesian(rdd2).collect()) {
      println(elem)
    }
  }
}

Output: the nine (shirt, pants) pairs, e.g. (shirt-1,pants-1), (shirt-1,pants-2), ..., (shirt-3,pants-3).

8.2 The coalesce operation

    The coalesce operator reduces the number of partitions of an RDD, packing the data into fewer partitions.

    It is typically used together with the filter operator: after filter discards a large amount of data, many partitions are left unevenly filled, often with very little data. We can then use coalesce to shrink the number of partitions so that the remaining data is spread evenly and memory is not wasted on nearly empty partitions.
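As a quick illustration of that filter-then-coalesce pattern, here is a minimal Scala sketch that reuses the article's CommSparkContextSca helper; the data set, filter condition, and partition counts are made up for illustration:

//Scala (illustrative sketch)
import kfk.spark.common.CommSparkContextSca

object FilterCoalesceScala {
  def main(args: Array[String]): Unit = {
    val sc = CommSparkContextSca.getsc()

    // one million numbers spread over 100 partitions
    val rdd = sc.parallelize(1 to 1000000, 100)

    // filter keeps roughly 1% of the data, leaving 100 sparse partitions
    val filtered = rdd.filter(_ % 100 == 0)

    // coalesce packs the surviving data into 10 partitions (no shuffle by default)
    val compacted = filtered.coalesce(10)

    println(s"partitions after filter:   ${filtered.getNumPartitions}")
    println(s"partitions after coalesce: ${compacted.getNumPartitions}")
  }
}

The full examples below use mapPartitionsWithIndex to print which partition each element lands in, before and after coalesce.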

//Java
import kfk.spark.common.CommSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class CoalesceJava {

    public static void main(String[] args) {
        JavaSparkContext sc = CommSparkContext.getsc();

        List<String> list = Arrays.asList("henry", "chery", "ben", "leo", "lili");
        JavaRDD<String> rdd = sc.parallelize(list, 4);  // start with 4 partitions
        JavaRDD<String> mapIndexValues = rdd.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
            @Override
            public Iterator<String> call(Integer index, Iterator<String> iterator) throws Exception {
                List<String> _list = new ArrayList<>();
                while (iterator.hasNext()) {
                    String userName = iterator.next();
                    _list.add((index + 1) + ":" + userName);
                }
                return _list.iterator();  // each element tagged with its partition number
            }
        }, false);

        JavaRDD<String> coalesceValues = mapIndexValues.coalesce(2);  // shrink to 2 partitions
        JavaRDD<String> mapIndexValues2 = coalesceValues.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
            @Override
            public Iterator<String> call(Integer index, Iterator<String> iterator) throws Exception {
                List<String> _list = new ArrayList<>();
                while (iterator.hasNext()) {
                    String userName = iterator.next();
                    _list.add((index + 1) + ":" + userName);
                }
                return _list.iterator();  // each element tagged with its new partition number
            }
        }, false);

        for (Object o : mapIndexValues2.collect()) {
            System.out.println(o);
        }
    }
}
//Scala
import kfk.spark.common.CommSparkContextSca

object CoalesceScala {

  def main(args: Array[String]): Unit = {
    val sc = CommSparkContextSca.getsc()
    val list = Array("henry", "chery", "ben", "leo", "lili")

    val rdd = sc.parallelize(list, 4)  // start with 4 partitions

    // tag each element with its partition number
    val mapIndexValues = rdd.mapPartitionsWithIndex((index, x) => {
      var list = List[String]()
      while (x.hasNext) {
        val userName = x.next()
        list ::= (index + 1) + ":" + userName
      }
      list.iterator
    })

    val coalesceValues = mapIndexValues.coalesce(2)  // shrink to 2 partitions

    // tag each element with its new partition number
    val mapIndexValues2 = coalesceValues.mapPartitionsWithIndex((index, x) => {
      var list = List[String]()
      while (x.hasNext) {
        val userName = x.next()
        list ::= (index + 1) + ":" + userName
      }
      list.iterator
    })

    for (elem <- mapIndexValues2.collect()) {
      println(elem)
    }
  }
}

Output:

As the output shows, the values that were originally in partitions 3 and 4 all end up in partition 2, leaving 2 partitions in total.

8.3 The repartition operation

    The repartition operator changes the number of partitions of an RDD, either increasing or decreasing it.

    The difference from coalesce is that coalesce (without a shuffle) can only reduce the number of partitions, whereas repartition can freely increase or decrease it; internally, repartition is just coalesce with shuffle = true. A quick comparison is sketched below.
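A minimal Scala sketch of the difference, again reusing the article's CommSparkContextSca helper (the data and partition counts are arbitrary):

//Scala (illustrative sketch)
import kfk.spark.common.CommSparkContextSca

object CoalesceVsRepartitionScala {
  def main(args: Array[String]): Unit = {
    val sc = CommSparkContextSca.getsc()
    val rdd = sc.parallelize(1 to 100, 4)

    // coalesce without a shuffle cannot grow the partition count: still 4
    println(rdd.coalesce(8).getNumPartitions)
    // repartition always shuffles, so it can grow the count: 8
    println(rdd.repartition(8).getNumPartitions)
    // coalesce can still shrink without a shuffle: 2
    println(rdd.coalesce(2).getNumPartitions)
  }
}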

Recommended scenario:

    When Spark SQL loads Hive data, the number of partitions is assigned automatically (based on the number of HDFS blocks backing the Hive table's files) and may be quite small, which limits parallelism and slows down later operators. In that case, after loading the Hive data with Spark SQL, we can manually raise the partition count to speed up subsequent operators, as sketched below.
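A minimal sketch of that scenario using the Spark 2.x SparkSession API; the session setup, the Hive table name logs.access_log, and the target partition count of 200 are hypothetical and not taken from the article's examples:

//Scala (illustrative sketch)
import org.apache.spark.sql.SparkSession

object RepartitionHiveScala {
  def main(args: Array[String]): Unit = {
    // Hive support is required to read Hive tables
    val spark = SparkSession.builder()
      .appName("RepartitionHiveScala")
      .enableHiveSupport()
      .getOrCreate()

    // the partition count after loading follows the HDFS blocks of the table's files
    val df = spark.sql("select * from logs.access_log")
    println(s"partitions after load:        ${df.rdd.getNumPartitions}")

    // manually raise the partition count to get more parallelism downstream
    val repartitioned = df.repartition(200)
    println(s"partitions after repartition: ${repartitioned.rdd.getNumPartitions}")

    spark.stop()
  }
}

The RDD-level examples below then show how repartition redistributes elements across partitions.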

//Java
import kfk.spark.common.CommSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class RePartitionJava {
    public static void main(String[] args) {
        JavaSparkContext sc = CommSparkContext.getsc();

        List<String> list = Arrays.asList("henry", "chery", "ben", "leo", "lili");
        JavaRDD<String> rdd = sc.parallelize(list, 2);  // start with 2 partitions
        JavaRDD<String> mapIndexValues = rdd.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
            @Override
            public Iterator<String> call(Integer index, Iterator<String> iterator) throws Exception {
                List<String> _list = new ArrayList<>();
                while (iterator.hasNext()) {
                    String userName = iterator.next();
                    _list.add((index + 1) + ":" + userName);
                }
                return _list.iterator();  // each element tagged with its partition number
            }
        }, false);

        JavaRDD<String> repartitionValues = mapIndexValues.repartition(3);  // raise to 3 partitions
        JavaRDD<String> mapIndexValues2 = repartitionValues.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
            @Override
            public Iterator<String> call(Integer index, Iterator<String> iterator) throws Exception {
                List<String> _list = new ArrayList<>();
                while (iterator.hasNext()) {
                    String userName = iterator.next();
                    _list.add((index + 1) + ":" + userName);
                }
                return _list.iterator();  // each element tagged with its new partition number
            }
        }, false);

        for (Object o : mapIndexValues2.collect()) {
            System.out.println(o);
        }
    }
}
//Scala
import kfk.spark.common.CommSparkContextSca

object RePartitionScala {
  def main(args: Array[String]): Unit = {
    val sc = CommSparkContextSca.getsc()
    val list = Array("henry", "chery", "ben", "leo", "lili")

    val rdd = sc.parallelize(list, 2)  // start with 2 partitions

    // tag each element with its partition number
    val mapIndexValues = rdd.mapPartitionsWithIndex((index, x) => {
      var list = List[String]()
      while (x.hasNext) {
        val userName = x.next()
        list ::= (index + 1) + ":" + userName
      }
      list.iterator
    })

    val repartitionValues = mapIndexValues.repartition(3)  // raise to 3 partitions

    // tag each element with its new partition number
    val mapIndexValues2 = repartitionValues.mapPartitionsWithIndex((index, x) => {
      var list = List[String]()
      while (x.hasNext) {
        val userName = x.next()
        list ::= (index + 1) + ":" + userName
      }
      list.iterator
    })

    for (elem <- mapIndexValues2.collect()) {
      println(elem)
    }
  }
}

Output:

As the output shows, some of the values that were originally in partitions 1 and 2 end up in partition 3, and there are now 3 partitions in total.

The next installment will introduce some commonly used sample examples.


    See you in the next chapter: Big Data Analysis - Spark Core Programming (9).


