暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Mapreduce经典案例:TopN

小甜菜Pro 2023-02-04
532

TopN分析是将数据中按照一个指标进行正序或者倒序排列,获取其中的前N条数据并重点进行分析。
假设有如下数据文件file.txt
,将其中的数字由大到小排序并取出前5个数字。
1 4 2
9 8 3 9
2 6 3 4 1
7 5 5

为实现以上需求则需要考虑以下几点:
1、Reduce分区只能设置为一个,因为是全局的前5条数据,则不管中间有多少个Map和Reduce阶段,最终只能有一个用来做数据汇总。
2、在Java中有TreeMap数据结构,它是一个有序的key-value集合,默认根据key的自然顺序排序,也可以根据需求自定义创建排序规则。但是通过以上的文本数据中可以发现存在相同数字,则不可以简单的使用TreeMap数据结构进行排序,这是因为在TreeMap中key是不可以有重复值的,因此这里我重新编写了一个名为IdentityTreeMap
的数据结构,专门解决key不可以重复的问题。
IdentityTreeMap
数据结构中,它基于TreeMap实现内部自动排序,并且其中key和value都可以有重复,与TreeMap类似,它也是根据key进行排序,默认为自然顺序排序,但也可以根据需求自定义创建排序规则。
举一个例子:将(1, 2);(2, 1);(1, 1);(3, 2);(2, 2)
分别加入到IdentityTreeMap
中,它在内部会自动变成(1, 2);(1, 1);(2, 1);(2, 2);(3, 2)
,这样就实现了按照key排序并且可重复,具体实现过程请在公众号后台回复20230204
获取详细代码。
3、在此案例中会使用cleanup()方法,该方法是整个MapTask执行完成后或整个ReduceTask执行完成后才执行的一个方法。
下面进行代码实现。
在Map阶段定义一个IdentityTreeMap
数据结构,并且定义该排序为倒序排序,同时命名为map
将传来的每行数据进行切分,获取所有数字并保存在名为num
的数组中,接下来遍历该数组写入到map中,键和值都为相同的数字,此处的原因有两个:一是IdentityTreeMap
数据结构为双值集合(key-value);二是需要按照key进行排序,value进行计数。事实上IdentityTreeMap
数据结构是通过TreeMap
的key进行排序,value进行向后追加实现的,这样就不会违背TreeMap数据结构的规则(key是唯一的),在该案例中可以抽象的理解成key进行排序,value进行计数。举一个例子:(2, 2);(4, 4);(3, 3);(2, 2);(1, 1)
IdentityTreeMap
中会自动变成(1, 1);(2, 2);(2, 2);(3, 3);(4, 4)
,实际上它是这样存储的:[1, (1)];[2, (2, 2)];[3, (3)];[4, (4)]
,通过这个例子再回读上面的解释可能会更好理解。
写入到map中后同时判断元素个数,若个数超过指定的N则删除map中最后一个元素(因为这个map是逆序排序,最小的元素在最后),只保留前N个较大的数字。这里的remove()
方法可以根据指定key-value删除元素,参数可以为HashMap类型。这里的getLast()
方法用来获取map中最后一个元素,返回类型为HashMap。
当把数据文件都读取完成后进行cleanup操作,遍历map中所有value,设置VAL并写入下一阶段。
以下是Map阶段详细代码。
public class TopNMapper extends Mapper<LongWritableTextNullWritableIntWritable{
    int N = 5;
    private static final NullWritable KEY = NullWritable.get();
    private static final IntWritable VAL = new IntWritable();
    private final IdentityTreeMap<Integer, Integer> map = new IdentityTreeMap<>((o1, o2) -> o2 - o1);
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, NullWritable, IntWritable>.Context context) {
        String[] num = value.toString().split(" ");
        for (String n : num) {
            map.put(Integer.parseInt(n), Integer.parseInt(n));
            if (map.size() > N) map.remove(map.getLast());
        }
    }
    protected void cleanup(Mapper<LongWritable, Text, NullWritable, IntWritable>.Context context) throws IOException, InterruptedException {
        for (Integer value : map.values()) {
            VAL.set(value);
            context.write(KEY, VAL);
        }
    }
}

在Reduce阶段也是和Map阶段同样的操作,到这里可能会想:如果在map阶段只是把数据进行切分,然后直接传入到reduce阶段,最后在reduce阶段集中进行汇总是不是可以?为什么要进行这样重复的操作?
以上问题中第一问的答案是肯定的,在map阶段可以直接简单的将数据切分并最后在reduce阶段进行汇总排序,但是这样是有弊端的,不能充分发挥出分布式的原理。MapReduce核心思想是分而治之,将一件任务量大的任务交给多台服务器处理,而map阶段恰好是将一件任务量大的任务分交给各个服务器,而reduce阶段一般只是单台服务器进行运算,因此要尽量将繁重复杂的任务放在map阶段处理,这样就可以使资源利用最大化。
此案例所使用的数据量极小,只是做演示使用,但是当数据量高达几个TB的时候,map阶段只是做简单的数据切分这样会浪费各个服务器大量计算资源,而繁重的排序工作只是移交给最后做汇总的单台服务器运行,那么这样用MapReduce程序是为什么呢?与单台服务器直接处理数据又有何区别呢?
因此为了避免这个问题,则需要在map阶段进行排序筛选,然后将各个服务器处理后的数据再进行单台服务器reduce处理,这样就可以将任务量大的任务分散到各个服务器,相比之下最后做汇总的单台服务器会比其他只做map操作的服务器任务量重一些,但多出来的任务量也是微乎其微,这样就做到了资源利用最大化,从而充分实现了分布式计算。
以下是Reduce阶段详细代码。
public class TopNReducer extends Reducer<NullWritableIntWritableNullWritableIntWritable{
    int N = 5;
    private static final NullWritable KEY = NullWritable.get();
    private static final IntWritable VAL = new IntWritable();
    private final IdentityTreeMap<Integer, Integer> map = new IdentityTreeMap<>((o1, o2) -> o2 - o1);
    protected void reduce(NullWritable key, Iterable<IntWritable> values, Reducer<NullWritable, IntWritable, NullWritable, IntWritable>.Context context) {
        for (IntWritable value : values) {
            map.put(value.get(), value.get());
            if (map.size() > N) map.remove(map.getLast());
        }
    }
    protected void cleanup(Reducer<NullWritable, IntWritable, NullWritable, IntWritable>.Context context) throws IOException, InterruptedException {
        for (Integer value : map.values()) {
            VAL.set(value);
            context.write(KEY, VAL);
        }
    }
}

最后是启动类,主要用来设置Mapper类以及Map的输出类型、设置Reducer类以及Reduce的输出类型和文件的输入输出路径,这里不再详细说明,具体实现过程请在公众号后台回复20230204
获取详细代码。
将代码进行打包(具体如何打包请参考本公众号中如何使用IDEA将Maven项目进行打包?
文章,或在本公众号后台回复关键词20230117
获取该图文),并上传至服务器中/usr/local/hadoop/jar
路径下,同时开启HDFS和Yarn集群,将本文开头的file.txt
文件上传至服务器中/root
路径下。
在服务器中使用$HADOOP_HOME/bin/hadoop fs -mkdir -p Mapreduce/TopN/input
命令在HDFS中创建数据上传路径,使用$HADOOP_HOME/bin/hadoop fs -put root/file.txt Mapreduce/TopN/input
命令将file.txt
文件上传至HDFS的/Mapreduce/TopN/input
路径下。
使用$HADOOP_HOME/bin/hadoop jar usr/local/hadoop/jar/topn.jar Mapreduce/TopN/input Mapreduce/TopN/output
命令启动MapReduce程序并提交到Yarn集群。

等待运行结束后使用$HADOOP_HOME/bin/hadoop fs -cat Mapreduce/TopN/output/part-r-00000
命令查看运行结果,此案例运行结果如下所示。

以上为TopN经典案例,下面介绍一个TopN的实际应用场景:统计电影评分排行Top100(2018年及其之后年份)
以下数据来源于Kaggle官网https://www.kaggle.com/datasets/chaitanyahivlekar/large-movie-dataset
movies_dataset.csv
文件是每个用户对电影的评分数据,每一行为一条数据,每行数据有5个字段,分别是:行号、用户ID、电影名称、评分值和电影类型。示例数据如下:0,1,Pulp Fiction (1994),5.0,Comedy|Crime|Drama|Thriller
,本案例已经将该文件中的第一行删除(该数据第一行为字段名称并不是有效数据)。
本案例只需将数据的每行中电影名称和评分值进行提取,然后进行分析即可。可以看到该案例主要思想是TopN,但是在之前需要将不同电影的评分统计出来,之后才可进行TopN,因此整体思想流程如下。

Count
TopN
两个Mapreduce程序串联才可实现需求。
1、首先是Count。
在Map阶段将数据进行切分并获得电影名称和评分值,在使用分隔符,
进行数据切分时,数组中倒数第二个元素一定为评分值,但是由于在电影名称字段中也可能有,
,因此切分后的data
数组长度可能不为5,这里需要进行一个判断:若数组长度为5,则第二个元素一定为电影名称;若数组长度大于5,则需要将数组中第二个元素至倒数第二个元素之间的所有元素拼接起来,使用了StringBuilder
数据结构,得到电影名称;若数组长度小于5,则直接返回,数据为无效数据。
通过观察数据可以发现在电影名称字段中存在年份信息,将该信息提取出来获得电影的年份,若在提取过程中出现异常,则说明电影无年份信息,直接返回即可。最后通过匹配的年份将数据进行写入下一阶段。
以下是Map阶段详细代码。
public class CountMapper extends Mapper<LongWritableTextTextFloatWritable{
    private static final Text KEY = new Text();
    private static final FloatWritable VAL = new FloatWritable();
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FloatWritable>.Context context) throws IOException, InterruptedException {
        String[] data = value.toString().split(",");
        int length = data.length;
        float score = Float.parseFloat(data[length - 2]);
        String name;
        if (length == 5) name = data[2];
        else if (length > 5) {
            StringBuilder nameStringBuilder = new StringBuilder();
            for (int i = 2; i < length - 2; i++) nameStringBuilder.append(data[i]);
            String nameStringBuilderToString = nameStringBuilder.toString();
            name = nameStringBuilderToString.substring(1, nameStringBuilderToString.length() - 1);
        } else return;
        int year;
        try {
            String[] splitLeft = name.split("\\(");
            year = Integer.parseInt(splitLeft[splitLeft.length - 1].split("\\)")[0]);
        } catch (NumberFormatException ignored) return;
        if (year >= 2018) {
            KEY.set(name);
            VAL.set(score);
            context.write(KEY, VAL);
        }
    }
}

在Reduce阶段将上一阶段产生的values(各个电影的所有评分集合)进行遍历,将其进行累加得到各个电影的评分总和,同时使用count
记录各个电影的评分用户数量,然后通过这两个指标就可以得到各个电影的平均评分并将结果保留1位小数,最后设置KEY和VAL并写入下一阶段。
以下是Reduce阶段详细代码。
public class CountReducer extends Reducer<TextFloatWritableTextText{
    private static final Text KEY = new Text();
    private static final Text VAL = new Text();
    protected void reduce(Text key, Iterable<FloatWritable> values, Reducer<Text, FloatWritable, Text, Text>.Context context) throws IOException, InterruptedException {
        int count = 0;
        float sum = 0;
        for (FloatWritable score : values) {
            sum += score.get();
            count++;
        }
        String average = String.format("%.1f", sum / count);
        KEY.set(key);
        VAL.set(average);
        context.write(KEY, VAL);
    }
}

2、其次是TopN。
在Map阶段定义一个IdentityTreeMap
数据结构,并且定义该排序为倒序排序,同时命名为map
将传来的数据进行切分,分隔符为\t
,得到的数组第一个元素为评分值,第二个元素为电影名称,然后写入map中,键为评分,值为电影名称,始终保留map中前N条数据,若超出N条则删除map中最后一条(因为map中key是按降序排列的,key值大的在map前部)。
当把数据文件都读取完成后进行cleanup操作,遍历map中每个元素,并通过切分获得key和value,同时设置VAL并写入下一阶段。
以下是Map阶段详细代码。
public class TopNMapper extends Mapper<LongWritableTextNullWritableText{
    int N = 100;
    private static final NullWritable KEY = NullWritable.get();
    private static final Text VAL = new Text();
    private final IdentityTreeMap<Float, String> map = new IdentityTreeMap<>((o1, o2) -> {
        float flag = o2 - o1;
        if (flag > 0return 1;
        if (flag < 0return -1;
        return 0;
    });
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, NullWritable, Text>.Context context) {
        String[] data = value.toString().split("\t");
        String name = data[0];
        float score = Float.parseFloat(data[1]);
        map.put(score, name);
        if (map.size() > N) map.remove(map.getLast());
    }
    protected void cleanup(Mapper<LongWritable, Text, NullWritable, Text>.Context context) throws IOException, InterruptedException {
        for (HashMap<Float, String> element : map.getAll()) {
            String[] elementStringSplit = element.toString().split("=");
            VAL.set(elementStringSplit[0].split("\\{")[1] + "\t" + elementStringSplit[1].split("\\}")[0]);
            context.write(KEY, VAL);
        }
    }
}

在Reduce阶段也是和Map阶段同样的操作,若有疑问请参考本文中上半篇关于经典案例TopN中Reduce阶段的说明,这里不再赘述。
以下是Reduce阶段详细代码。
public class TopNReducer extends Reducer<NullWritableTextTextText{
    int N = 100;
    private static final Text KEY = new Text();
    private static final Text VAL = new Text();
    private final IdentityTreeMap<Float, String> map = new IdentityTreeMap<>((o1, o2) -> {
        float flag = o2 - o1;
        if (flag > 0return 1;
        if (flag < 0return -1;
        return 0;
    });
    protected void reduce(NullWritable key, Iterable<Text> values, Reducer<NullWritable, Text, Text, Text>.Context context) {
        for (Text value : values) {
            String[] valueSplit = value.toString().split("\t");
            map.put(Float.parseFloat(valueSplit[0]), valueSplit[1]);
            if (map.size() > N) map.remove(map.getLast());
        }
    }
    protected void cleanup(Reducer<NullWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
        for (HashMap<Float, String> element : map.getAll()) {
            String[] elementStringSplit = element.toString().split("=");
            KEY.set(elementStringSplit[0].split("\\{")[1]);
            VAL.set(elementStringSplit[1].split("\\}")[0]);
            context.write(KEY, VAL);
        }
    }
}

本案例核心代码为如上四个文件,其余代码不再详细说明,若想查看具体实现过程请在公众号后台回复20230204
获取详细代码。
最后将代码进行打包(具体如何打包请参考本公众号中如何使用IDEA将Maven项目进行打包?
文章,或在本公众号后台回复关键词20230117
获取该图文),并上传至服务器中的/usr/local/hadoop/jar
路径下,同时开启HDFS和Yarn集群,将movies_dataset.csv
电影数据上传至服务器中的/root
路径下。
在服务器中使用$HADOOP_HOME/bin/hadoop fs -mkdir -p /Mapreduce/TopNMovies/input
命令在HDFS中创建数据上传路径,使用$HADOOP_HOME/bin/hadoop fs -put /root/movies_dataset.csv /Mapreduce/TopNMovies/input
命令将movies_dataset.csv
文件上传至HDFS的/Mapreduce/TopNMovies/input
路径下。
使用$HADOOP_HOME/bin/hadoop jar /usr/local/hadoop/jar/topn-movies.jar /Mapreduce/TopNMovies/input /Mapreduce/TopNMovies/output /Mapreduce/TopNMovies/temp
命令(该命令需要传递三个参数,第一个为Count阶段输入路径,第二个为TopN阶段输出路径,第三个为Count阶段输出路径也是TopN阶段输入路径)启动MapReduce程序并提交到Yarn集群。

等待运行结束后使用$HADOOP_HOME/bin/hadoop fs -cat /Mapreduce/TopNMovies/output/part-r-00000
命令查看运行结果,此案例运行结果如下所示。

本文介绍了TopN经典案例,并在其基础上介绍了它的实际应用场景。以上就是本期分享的全部内容,想要获取本文中的全部代码及数据,请在公众号后台回复20230204
,若各位小伙伴有什么不懂的问题也可以直接在公众号后台留言,感谢观看!

文章转载自小甜菜Pro,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论