Web2.0时代,数据爆炸式、指数级增长,大数据分布式计算需求频繁
通过单机内存扩展来增强计算能力,已经无法承载大规模数据量的计算
分布式计算开发和维护的复杂与多变,对程序员要求太高
MapReduce是一个基于集群的高性能并行计算平台。可以使用普通服务器构成一个包含数十、数百、甚至数千个节点的分布式和并行计算集群。
MapReduce是一个并行计算与运行的软件框架。它提供了一个庞大但设计精良的并行计算软件框架,能自动划分计算数据和计算任务,自动完成计算任务的并行化处理,实现在集群节点上自动分配和执行任务并收集计算结果,将数据分布存储、数据通信、容错处理等并行计算涉及到的很多系统底层的复杂实现细节交由系统负责处理,大大减少了软件开发人员的负担。
MapReduce是一个并行程序设计模型与方法。它提供了一种简便的并行程序设计方法,用Map和Reduce两个函数编程实现基本的并行计算任务,提供了抽象的操作和并行编程接口,以简单方便地完成大规模数据的编程和计算处理。
分布可靠,对数据集的操作分发给集群中的多个节点实现可靠性,每个节点周期性返回它完成的任务和最新的状态
提供跨语言编程的能力
各大运营商
中大型互联网公司,如BAT、京东、乐视、美团等
金融银行保险类公司
各大云平台的分布式计算框架
其他本地系统无法承载计算能力的应用

以WordCount为例
WordCount运行图解

输入数据格式解析:InputFormat
输入数据处理:Mapper
数据分组:Partitioner
数据按照key排序
本地规约:Combiner(相当于local reducer,可选)
将任务输出保存在本地
数据远程拷贝
数据按照key排序
数据处理:Reducer
数据输出格式:OutputFormat
搭建开发环境,参考HDFS环境搭建,基本一致
基于MapReduce框架编写代码
编译打包,将源代码和依赖jar包打成一个包
上传至运行环境
运行hadoop jar命令,现已由yarn jar替代,建议使用新命令提交执行
通过yarn web ui查看执行过程
查看执行结果
Mapper:是MapReduce计算框架中Map过程的封装
Text:Hadoop对Java String类的封装,适用于Hadoop对文本字符串的处理
IntWritable:Hadoop对Java Integer类的封装,适用于Hadoop整型的处理
Context:Hadoop环境基于上下文的操作对象,如Map中key/value的输出、分布式缓存数据、分布式参数传递等
StringTokenizer:对String对象字符串的操作类,做基于空白字符的切分操作工具类
源码编写实现:
package com.jack.mapper;import java.io.IOException;import java.util.StringTokenizer;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;public class MyTokenizerMapper extendsMapper<Object, Text, Text, IntWritable> {// 暂存每个传过来的词频计数,均为1,省掉重复申请空间 IntWritable:对Java Integer类的封装private final static IntWritable one = new IntWritable(1);// 暂存每个传过来的词的值,省掉重复申请空间private Text word = new Text();// 核心map方法的具体实现,逐个<key,value>对去处理public void map(Object key, Text value, Context context)throws IOException, InterruptedException {// 用每行的字符串值初始化StringTokenizerStringTokenizer itr = new StringTokenizer(value.toString());// 循环取得每个空白符分隔出来的每个元素while (itr.hasMoreTokens()) {// 将取得出的每个元素放到word Text对象中word.set(itr.nextToken());// 通过context对象,将map的输出逐个输出context.write(word, one);}}}
Reducer:是MapReduce计算框架中Reduce过程的封装
源码编写实现:
package com.jack.reducer;import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;//reduce类,实现reduce函数public class IntSumReducer extendsReducer<Text, IntWritable, Text, IntWritable> {private IntWritable result = new IntWritable();//核心reduce方法的具体实现,逐个<key,List(v1,v2)>去处理public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {//暂存每个key组中计算总和int sum = 0;//加强型for,依次获取迭代器中的每个元素值,即为一个一个的词频数值for (IntWritable val : values) {//将key组中的每个词频数值sum到一起sum += val.get();}//将该key组sum完成的值放到result IntWritable中,使可以序列化输出result.set(sum);//将计算结果逐条输出context.write(key, result);}}
Configuration:与HDFS中的Configuration一致,负责参数的加载和传递
Job:作业,是对一轮MapReduce任务的抽象,即一个MapReduce的执行全过程的管理类
FileInputFormat:指定输入数据的工具类,用于指定任务的输入数据路径
FileOutputFormat:指定输出数据的工具类,用于指定任务的输出数据路径
源码编写实现:
package com.jack.driver;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import com.jack.mapper.MyTokenizerMapper;import com.jack.reducer.IntSumReducer;public class WordCount {// 启动mr的driver方法public static void main(String[] args) throws Exception {// 得到集群配置参数Configuration conf = new Configuration();// 设置到本次的job实例中Job job = Job.getInstance(conf, "jack_WordCount");// 指定本次执行的主类是WordCountjob.setJarByClass(WordCount.class);// 指定map类job.setMapperClass(MyTokenizerMapper.class);// 指定combiner类,要么不指定,如果指定,一般与reducer类相同job.setCombinerClass(IntSumReducer.class);// 指定reducer类job.setReducerClass(IntSumReducer.class);// 指定job输出的key和value的类型,如果map和reduce输出类型不完全相同,需要重新设置map的output的key和value的class类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);// 指定输入数据的路径FileInputFormat.addInputPath(job, new Path(args[0]));// 指定输出路径,并要求该输出路径一定是不存在的FileOutputFormat.setOutputPath(job, new Path(args[1]));// 指定job执行模式,等待任务执行完成后,提交任务的客户端才会退出!System.exit(job.waitForCompletion(true) ? 0 : 1);}}


import java.io.IOException;import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;//启动mr的driver类public class WordCount {//map类,实现map函数public static class MyTokenizerMapper extendsMapper<Object, Text, Text, IntWritable> {//暂存每个传过来的词频计数,均为1,省掉重复申请空间private final static IntWritable one = new IntWritable(1);//暂存每个传过来的词的值,省掉重复申请空间private Text word = new Text();//核心map方法的具体实现,逐个<key,value>对去处理public void map(Object key, Text value, Context context)throws IOException, InterruptedException {//用每行的字符串值初始化StringTokenizerStringTokenizer itr = new StringTokenizer(value.toString());//循环取得每个空白符分隔出来的每个元素while (itr.hasMoreTokens()) {//将取得出的每个元素放到word Text对象中word.set(itr.nextToken());//通过context对象,将map的输出逐个输出context.write(word, one);}}}//reduce类,实现reduce函数public static class IntSumReducer extendsReducer<Text, IntWritable, Text, IntWritable> {private IntWritable result = new IntWritable();//核心reduce方法的具体实现,逐个<key,List(v1,v2)>去处理public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {//暂存每个key组中计算总和int sum = 0;//加强型for,依次获取迭代器中的每个元素值,即为一个一个的词频数值for (IntWritable val : values) {//将key组中的每个词频数值sum到一起sum += val.get();}//将该key组sum完成的值放到result IntWritable中,使可以序列化输出result.set(sum);//将计算结果逐条输出context.write(key, result);}}//启动mr的driver方法public static void main(String[] args) throws Exception {//得到集群配置参数Configuration conf = new Configuration();//设置到本次的job实例中Job job = Job.getInstance(conf, "jack_WordCount");//指定本次执行的主类是WordCountjob.setJarByClass(WordCount.class);//指定map类job.setMapperClass(MyTokenizerMapper.class);//指定combiner类,要么不指定,如果指定,一般与reducer类相同job.setCombinerClass(IntSumReducer.class);//指定reducer类job.setReducerClass(IntSumReducer.class);//指定job输出的key和value的类型,如果map和reduce输出类型不完全相同,需要重新设置map的output的key和value的class类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);//指定输入数据的路径FileInputFormat.addInputPath(job, new Path(args[0]));//指定输出路径,并要求该输出路径一定是不存在的FileOutputFormat.setOutputPath(job, new Path(args[1]));//指定job执行模式,等待任务执行完成后,提交任务的客户端才会退出!System.exit(job.waitForCompletion(true) ? 0 : 1);}}


查看当前正在执行的job任务

终止(kill)一个任务的执行




查看一个job的日志

集群的构建完全选用价格便宜、易于扩展的低端商用服务器,而非价格昂贵不易扩展的商用服务
大规模数据处理和大规模数据存储的需要,讲求集群综合能力,而非单台机器处理能力,横向增加机器节点数据量
使用大量普通服务器,节点硬件和软件出错是常态
具备多种有效的错误检测和恢复机制,在某个计算节点失效后会自动转移到别的计算节点。某个任务节点失败后其他节点能够无缝接管失效节点的计算任务
当失效节点恢复后自动无缝加入集群,不需要管理员人工进行系统配置
采用代码/数据互定位的功能,计算和数据在同一个机器节点或者是同一个机架中,发挥数据本地化特点
可避免跨机器节点或是机架传输数据,提高运行效率
磁盘的顺序访问远比随机访问快得多,因此MapReduce设计为面向顺序式大规模数据的磁盘访问处理
利用集群中的大量数据存储节点同时访问数据,实现面向大数据集批处理的高吞吐量的并行处理
一个作业由若干个Map任务和Reduce任务构成,整个作业完成的时间取决于最慢的任务的完成时间。由于节点硬件、软件问题,某些任务可能运行很慢
采用推测执行机制,发现某个任务的运行速度远低于任务平均速度,会为慢的任务启动一个备份任务,同时运行。哪个先运行完,采用哪个结果。
可弹性的增加或减少集群计算节点来调节计算能力
计算的性能随着节点数的增加保持接近线性程度的增长
并行编程有很多困难,需要考虑多线程中复杂繁琐的细节,诸如分布式存储管理、数据分发、数据通信和同步、计算结果收集等细节问题。
MapReduce提供了一种抽象机制将程序员与系统层细节隔离开,程序员只需关注业务,其他具体执行交由框架处理即可。




