
[Big Data Analytics] Spark Core Programming (6)

数据信息化 2020-01-02

Sorting and TopN Operations in Spark

6.1 TopN
6.2 GroupTopN


6.1 TopN

    Given a set of numbers, sort them in descending order and take the top few:

//Java
import kfk.spark.common.CommSparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

public class TopnJava {
    public static void main(String[] args) {
        JavaSparkContext sc = CommSparkContext.getsc();
        List<Integer> list = Arrays.asList(23, 12, 56, 44, 23, 99, 13, 57);
        /**
         * 23,12,56,44,...
         * map        -> turn (23,12,56,...) into pairs (23,23)(12,12)(56,56)...
         * sortByKey  -> sort by key in descending order: (99,99)(57,57)(56,56)...
         * map        -> extract the values: (99,57,56,...)
         * take(2)    -> take the top two: 99,57 (as a List)
         * for()      -> print them
         */
        JavaRDD<Integer> rdd = sc.parallelize(list);
        JavaPairRDD<Integer, Integer> beginSort = rdd.mapToPair(new PairFunction<Integer, Integer, Integer>() {
            @Override
            public Tuple2<Integer, Integer> call(Integer value) throws Exception {
                return new Tuple2<>(value, value);
            }
        });

        // false = descending order
        JavaPairRDD<Integer, Integer> sortValues = beginSort.sortByKey(false);
        JavaRDD<Integer> beginTop = sortValues.map(new Function<Tuple2<Integer, Integer>, Integer>() {
            @Override
            public Integer call(Tuple2<Integer, Integer> o) throws Exception {
                return o._2;
            }
        });
        List<Integer> topn = beginTop.take(2);
        for (int value : topn) {
            System.out.println(value);
        }
    }
}
//Scala
import kfk.spark.common.CommSparkContextSca

object TopnScala {

  def main(args: Array[String]): Unit = {
    val sc = CommSparkContextSca.getsc()
    val list = Array(23, 12, 56, 44, 23, 99, 13, 57)
    val rdd = sc.parallelize(list)
    // pair each value with itself so it can be sorted as a key
    val beginSort = rdd.map(x => (x, x))
    // false = descending order
    val sortValue = beginSort.sortByKey(false)
    // drop the key again, keeping only the value
    val beginTopn = sortValue.map(x => x._2)
    val topn = beginTopn.take(2)
    for (elem <- topn) {
      println(elem)
    }
  }
}


Output:

99
57
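The same result is also available without the map/sortByKey/map round trip: Spark's RDD API ships top(n) and takeOrdered(n), which keep only a bounded number of candidates per partition instead of shuffling the whole dataset. A minimal Scala sketch, reusing the CommSparkContextSca helper from above (TopnBuiltinScala is just an illustrative name):

//Scala
import kfk.spark.common.CommSparkContextSca

object TopnBuiltinScala {
  def main(args: Array[String]): Unit = {
    val sc = CommSparkContextSca.getsc()
    val rdd = sc.parallelize(Array(23, 12, 56, 44, 23, 99, 13, 57))
    // top(n) returns the n largest elements, already in descending order
    rdd.top(2).foreach(println) // 99, 57
    // takeOrdered(n) returns the n smallest; pass a reversed ordering for the largest
    rdd.takeOrdered(2)(Ordering[Int].reverse).foreach(println) // 99, 57
  }
}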

6.2 GroupTopN

    Given a set of key/value records, group them by key, compare the values within each group, and take the top few per group:

//Java
import kfk.spark.common.CommSparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.*;

public class GroupTopnJava {
    public static void main(String[] args) {
        JavaSparkContext sc = CommSparkContext.getsc();
        List<String> list = Arrays.asList("class1 67",
                "class2 78",
                "class1 78",
                "class1 99",
                "class1 109",
                "class1 34",
                "class1 45",
                "class2 34",
                "class2 88",
                "class2 98",
                "class2 33");

        JavaRDD<String> rdd = sc.parallelize(list);
        JavaPairRDD<String, Integer> beginGroup = rdd.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String line) throws Exception {
                String key = line.split(" ")[0];
                int value = Integer.parseInt(line.split(" ")[1]);
                return new Tuple2<>(key, value); // split into <class1,67>, <class2,78>, ... pairs
            }
        });
        // group by key: <class1,(67,78,99,109,34,45)>, <class2,(78,34,88,98,33)>
        JavaPairRDD<String, Iterable<Integer>> groupValues = beginGroup.groupByKey();
        /**
         * How the top-3 list evolves for class1: <class1,(67,78,99,109,34,45)>
         * goal: take(3) -> <class1,(109,99,78)>
         * the list starts empty; the first value 67 is simply added    -> (67)
         * 78 is compared with 67; it is larger, so it goes before 67   -> (78,67)
         * 99 is compared with 78; it is larger, so it goes before 78   -> (99,78,67)
         * 109 goes before 99, and the last value 67 is removed so the
         * list never holds more than three values                      -> (109,99,78)
         * 34 is compared with every value and changes nothing          -> (109,99,78)
         * 45 likewise changes nothing                                  -> (109,99,78)
         *
         * special case: while the list holds fewer than three values,
         * a value smaller than all current entries is appended at the end
         */
        JavaPairRDD<String, List<Integer>> groupTop = groupValues.mapToPair(
                new PairFunction<Tuple2<String, Iterable<Integer>>, String, List<Integer>>() {
            @Override
            public Tuple2<String, List<Integer>> call(Tuple2<String, Iterable<Integer>> value) throws Exception {
                LinkedList<Integer> linkedList = new LinkedList<>();
                for (int _value : value._2) {
                    // insert before the first entry that is smaller
                    boolean inserted = false;
                    for (int i = 0; i < linkedList.size(); i++) {
                        if (_value > linkedList.get(i)) {
                            linkedList.add(i, _value);
                            inserted = true;
                            break;
                        }
                    }
                    // smaller than everything so far: append while there is room
                    if (!inserted && linkedList.size() < 3) {
                        linkedList.add(_value);
                    }
                    // keep at most three values
                    if (linkedList.size() > 3) {
                        linkedList.removeLast();
                    }
                }
                return new Tuple2<>(value._1, linkedList);
            }
        });

        groupTop.foreach(new VoidFunction<Tuple2<String, List<Integer>>>() {
            @Override
            public void call(Tuple2<String, List<Integer>> o) throws Exception {
                System.out.println(o._1);
                for (int num : o._2) {
                    System.out.println(num);
                }
            }
        });
    }
}
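A note on the design: the LinkedList here acts as a fixed-size buffer kept in descending order, so each incoming value costs at most three comparisons and every group is processed in a single pass, no matter how large it is. The Scala version below takes the simpler route of sorting each group outright and taking the first three.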
//Scala
import kfk.spark.common.CommSparkContextSca

object GroupTopnScala {

  def main(args: Array[String]): Unit = {
    val sc = CommSparkContextSca.getsc()
    val list = Array("class1 67",
      "class2 78",
      "class1 78",
      "class1 99",
      "class1 109",
      "class1 34",
      "class1 45",
      "class2 34",
      "class2 88",
      "class2 98",
      "class2 33")

    val rdd = sc.parallelize(list)

    // split "class1 67" into the pair (class1, 67)
    val beginGroup = rdd.map(x => {
      val key = x.split(" ")(0)
      val value = x.split(" ")(1).toInt
      (key, value)
    })

    // group by key, then sort each group's values descending and keep the top three
    val groupValues = beginGroup.groupByKey()
    val topValues = groupValues.map(x => {
      val values = x._2.toList.sortWith((a, b) => a > b).take(3)
      (x._1, values)
    })

    topValues.foreach(x => {
      println(x._1)
      x._2.foreach(y => println(y))
    })
  }
}

Output (the order of the groups may vary):

class1
109
99
78
class2
98
88
78
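One caveat worth knowing: groupByKey ships every value for a key to a single task before the top three are picked, which gets expensive for large or skewed groups. The bounded-buffer idea from the Java version can instead be applied during aggregation with aggregateByKey, so only the per-partition survivors cross the network. A minimal Scala sketch, reusing the beginGroup RDD built above:

//Scala
// Each partition folds its values into a per-key buffer of at most three
// elements (seqOp); the per-partition buffers are then merged (combOp).
val top3 = beginGroup.aggregateByKey(List.empty[Int])(
  (acc, v) => (v :: acc).sortWith(_ > _).take(3), // fold one value into the buffer
  (a, b) => (a ++ b).sortWith(_ > _).take(3)      // merge two partial buffers
)
top3.collect().foreach { case (k, vs) =>
  println(s"$k -> ${vs.mkString(",")}")
}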

The next chapter will introduce some common mapPartitions examples.


    See you in the next chapter, [Big Data Analytics] Spark Core Programming (7).


