6.1 TopN Operation
6.2 GroupTopN Operation

6.1 TopN Operation
Given a set of numbers, sort them in descending order and take the top few:
// Java
import kfk.spark.common.CommSparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

public class TopnJava {

    public static void main(String[] args) {
        JavaSparkContext sc = CommSparkContext.getsc();
        List<Integer> list = Arrays.asList(23, 12, 56, 44, 23, 99, 13, 57);

        /**
         * 23, 12, 56, 44, ...
         * map       -> turn (23, 12, 56, ...) into pairs: (23,23) (12,12) (56,56) ...
         * sortByKey -> sort by key in descending order: (99,99) (57,57) (56,56) ...
         * map       -> pull out the values: (99, 57, 56, ...)
         * take(2)   -> keep the first two: 99, 57 (as a List)
         * for()     -> print them
         */
        JavaRDD<Integer> rdd = sc.parallelize(list);

        JavaPairRDD<Integer, Integer> beginSort = rdd.mapToPair(new PairFunction<Integer, Integer, Integer>() {
            @Override
            public Tuple2<Integer, Integer> call(Integer value) throws Exception {
                return new Tuple2<>(value, value);
            }
        });

        JavaPairRDD<Integer, Integer> sortValues = beginSort.sortByKey(false);

        JavaRDD<Integer> beginTop = sortValues.map(new Function<Tuple2<Integer, Integer>, Integer>() {
            @Override
            public Integer call(Tuple2<Integer, Integer> o) throws Exception {
                return o._2;
            }
        });

        List<Integer> topn = beginTop.take(2);
        for (int value : topn) {
            System.out.println(value);
        }
    }
}
// Scala
import kfk.spark.common.CommSparkContextSca

object TopnScala {
  def main(args: Array[String]): Unit = {
    val sc = CommSparkContextSca.getsc()
    val list = Array(23, 12, 56, 44, 23, 99, 13, 57)
    val rdd = sc.parallelize(list)
    val beginSort = rdd.map(x => (x, x))       // pair each value with itself
    val sortValue = beginSort.sortByKey(false) // sort by key, descending
    val beginTopn = sortValue.map(x => x._2)   // drop the key, keep the value
    val topn = beginTopn.take(2)               // first two elements
    for (elem <- topn) {
      System.out.println(elem)
    }
  }
}
Output:

99
57
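A side note (this shortcut is my addition, not part of the original listing): Spark's RDD API also provides a built-in top(n) action that returns the n largest elements directly, so the pair/sortByKey/unpair pipeline above can be collapsed into a single call. A minimal Scala sketch against the same rdd:

// top(2) returns the two largest elements in descending order,
// without building (value, value) pairs or running a full sortByKey.
val topn = rdd.top(2) // Array(99, 57)
topn.foreach(println)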
6.2 GroupTopN Operation
Given a set of key/value records, group them by key, compare the values within each group, and take the top few:
// Java
import kfk.spark.common.CommSparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.*;

public class GroupTopnJava {

    public static void main(String[] args) {
        JavaSparkContext sc = CommSparkContext.getsc();
        List<String> list = Arrays.asList("class1 67", "class2 78", "class1 78", "class1 99",
                "class1 109", "class1 34", "class1 45", "class2 34",
                "class2 88", "class2 98", "class2 33");
        JavaRDD<String> rdd = sc.parallelize(list);

        JavaPairRDD<String, Integer> beginGroup = rdd.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String line) throws Exception {
                // split each record into <class1,67>, <class2,78>, ...
                String key = line.split(" ")[0];
                int value = Integer.parseInt(line.split(" ")[1]);
                return new Tuple2<>(key, value);
            }
        });

        // group by key: <class1,(67,78,99,109,34,45)>, <class2,(78,34,88,98,33)>
        JavaPairRDD<String, Iterable<Integer>> groupValues = beginGroup.groupByKey();

        /**
         * How the top-3 list evolves for class1 (67, 78, 99, 109, 34, 45):
         * the list starts empty; the first value 67 is added directly      -> (67)
         * 78 is compared with 67; larger, so inserted before it            -> (78, 67)
         * 99 is compared with 78; larger, so inserted before it            -> (99, 78, 67)
         * 109 is compared with 99; larger, so inserted before it, and the
         * last element 67 is removed to keep the list at three entries     -> (109, 99, 78)
         * 34 is compared with every element; no change                     -> (109, 99, 78)
         * 45 likewise                                                      -> (109, 99, 78)
         * Special case: while the list holds fewer than three elements, a
         * value smaller than all of them is appended at the end, e.g. 34
         * arriving after (78, 67) would give                               -> (78, 67, 34)
         */
        JavaPairRDD<String, List<Integer>> groupTop = groupValues.mapToPair(
                new PairFunction<Tuple2<String, Iterable<Integer>>, String, List<Integer>>() {
            @Override
            public Tuple2<String, List<Integer>> call(Tuple2<String, Iterable<Integer>> value) throws Exception {
                Iterator<Integer> iterator = value._2.iterator();
                LinkedList<Integer> linkedList = new LinkedList<>();
                while (iterator.hasNext()) {
                    int _value = iterator.next();
                    if (linkedList.size() == 0) {
                        linkedList.add(_value);
                    } else {
                        for (int i = 0; i < linkedList.size(); i++) {
                            if (_value > linkedList.get(i)) {
                                linkedList.add(i, _value);
                                if (linkedList.size() > 3) {
                                    linkedList.removeLast();
                                }
                                break;
                            } else if (i == linkedList.size() - 1 && linkedList.size() < 3) {
                                // smaller than every element so far and the
                                // list is not full yet: append at the end
                                linkedList.add(_value);
                                break;
                            }
                        }
                    }
                }
                return new Tuple2<>(value._1, linkedList);
            }
        });

        groupTop.foreach(new VoidFunction<Tuple2<String, List<Integer>>>() {
            @Override
            public void call(Tuple2<String, List<Integer>> o) throws Exception {
                System.out.println(o._1);
                for (Integer num : o._2) {
                    System.out.println(num);
                }
            }
        });
    }
}
// Scala
import kfk.spark.common.CommSparkContextSca

object GroupTopnScala {
  def main(args: Array[String]): Unit = {
    val sc = CommSparkContextSca.getsc()
    val list = Array("class1 67", "class2 78", "class1 78", "class1 99",
      "class1 109", "class1 34", "class1 45", "class2 34",
      "class2 88", "class2 98", "class2 33")
    val rdd = sc.parallelize(list)
    val beginGroup = rdd.map(x => {
      val key = x.split(" ")(0)
      val value = x.split(" ")(1).toInt
      (key, value)
    })
    val groupValues = beginGroup.groupByKey()
    // sort each group's values in descending order and keep the first three
    val topValues = groupValues.map(x => {
      val values = x._2.toList.sortWith((a, b) => a > b).take(3)
      (x._1, values)
    })
    topValues.foreach(x => {
      System.out.println(x._1)
      x._2.foreach(y => System.out.println(y))
    })
  }
}
Output (the order of the two groups may vary, since foreach runs on the workers):

class1
109
99
78
class2
98
88
78
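One caveat worth adding (this note and the sketch are my addition, not from the original article): groupByKey materializes every value of a key in memory before the top three are picked. For large groups, the same result can be computed with aggregateByKey, which only ever keeps a bounded top-3 list per key on each partition. A minimal Scala sketch, reusing beginGroup from the example above:

// Sketch: grouped top-3 via aggregateByKey instead of groupByKey.
// Each partition maintains a per-key list of at most three values;
// the combiner then merges the partial lists, again keeping three.
val topByKey = beginGroup.aggregateByKey(List.empty[Int])(
  (acc, v) => (v :: acc).sorted(Ordering[Int].reverse).take(3), // fold one value in
  (a, b)   => (a ++ b).sorted(Ordering[Int].reverse).take(3)    // merge partition results
)
topByKey.collect().foreach { case (k, vs) => println(s"$k -> ${vs.mkString(",")}") }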
Next, some common MapPartitions examples will be introduced.
See you in the next chapter: Big Data Analysis with Spark Core Programming (Part 7).






