1. Data Deduplication:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.8.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
    <version>2.8.0</version>
</dependency>
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>
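All three examples below use the new (org.apache.hadoop.mapreduce) API. The original article omits the import statements; a minimal import block that matches the classes used here would look like the following sketch. Depending on your build, you may also need to declare hadoop-common (or simply hadoop-client) explicitly.

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;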
// The map copies each input value to the output key and emits it directly.
public static class Map extends Mapper<Object, Text, Text, Text> {
    // Implement the map function
    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        context.write(value, new Text(""));
    }
}
public static class Reduce extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Identical keys arrive grouped, so writing each key exactly once removes duplicates.
        context.write(key, new Text(""));
    }
}
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "Data Deduplication");
    job.setJarByClass(Dedup.class);
    // Set the Map, Combine and Reduce classes
    job.setMapperClass(Map.class);
    job.setCombinerClass(Reduce.class);
    job.setReducerClass(Reduce.class);
    // Set the output types
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    // Set the input and output paths: the input directory must exist,
    // and the output directory must not exist before the job runs.
    FileInputFormat.addInputPath(job, new Path("hdfs://192.168.117.10:9000/input"));
    FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.117.10:9000/output"));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
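Because the output directory must not exist when the job starts, it can be convenient to remove a stale output directory programmatically before submitting the job. A minimal sketch using the Hadoop FileSystem API (the path mirrors the hard-coded one above; adjust it to your cluster, and add import org.apache.hadoop.fs.FileSystem):

// Delete the output directory if it is left over from a previous run,
// so that FileOutputFormat does not reject the job.
Path outputPath = new Path("hdfs://192.168.117.10:9000/output");
FileSystem fs = outputPath.getFileSystem(conf);
if (fs.exists(outputPath)) {
    fs.delete(outputPath, true);  // true = delete recursively
}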
2. Data Sorting:
public static class Map extends Mapper<Object, Text, IntWritable, IntWritable> {
    private static IntWritable data = new IntWritable();

    // Implement the map function
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        data.set(Integer.parseInt(line));
        context.write(data, new IntWritable(1));
    }
}
// The reduce emits each input key once for every element in its value list,
// writing the global counter linenum as the output key so it records the rank.
public static class Reduce extends
        Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
    private static IntWritable linenum = new IntWritable(1);

    // Implement the reduce function
    public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        for (IntWritable val : values) {
            context.write(linenum, key);
            linenum = new IntWritable(linenum.get() + 1);
        }
    }
}
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "Data Sort");
    job.setJarByClass(Sort.class);
    // Set the Map and Reduce classes
    job.setMapperClass(Sort.Map.class);
    job.setReducerClass(Sort.Reduce.class);
    // Set the output types
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(IntWritable.class);
    // Set the input and output paths: the input directory must exist,
    // and the output directory must not exist before the job runs.
    FileInputFormat.addInputPath(job, new Path("hdfs://192.168.117.10:9000/input"));
    FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.117.10:9000/output"));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
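The ranking logic relies on the shuffle delivering keys in ascending order, which only yields one globally sorted file when the job uses a single reduce task (the default here). If you want several reduce tasks, a range partitioner can keep the concatenated outputs sorted, although the static linenum counter would then restart at 1 in each reducer, so the ranks are only correct within each partition. A rough sketch, assuming a hypothetical upper bound of 65536 on the input numbers (not from the original article; requires import org.apache.hadoop.mapreduce.Partitioner):

// Hypothetical range partitioner: sends smaller numbers to lower-numbered
// reducers so the concatenated part-r-* files stay globally sorted.
public static class RangePartitioner extends Partitioner<IntWritable, IntWritable> {
    private static final int MAX_VALUE = 65536;  // assumed upper bound on the input numbers

    @Override
    public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
        int bucket = key.get() / (MAX_VALUE / numPartitions + 1);
        return Math.min(bucket, numPartitions - 1);
    }
}

It would be enabled in the driver with job.setPartitionerClass(RangePartitioner.class) together with job.setNumReduceTasks(n).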
3. Computing the Average:
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Convert the name-and-score line to a String
        String info = value.toString();
        // Split the input on whitespace
        StringTokenizer stringTokenizer = new StringTokenizer(info);
        while (stringTokenizer.hasMoreTokens()) {
            String stuName = stringTokenizer.nextToken();   // student name
            String stuScore = stringTokenizer.nextToken();  // student score
            // Emit the student name as the key and the score as the value
            context.write(new Text(stuName), new IntWritable(Integer.parseInt(stuScore)));
        }
    }
}
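This mapper assumes every line alternates name and score tokens; a trailing name without a score would make the second nextToken() throw NoSuchElementException. If the input may contain such malformed lines, a slightly more defensive loop body is one option (a sketch, not part of the original code):

// Defensive variant of the parsing loop: skip a trailing name that has no score.
while (stringTokenizer.hasMoreTokens()) {
    String stuName = stringTokenizer.nextToken();
    if (!stringTokenizer.hasMoreTokens()) {
        break;  // malformed record: name without a score
    }
    String stuScore = stringTokenizer.nextToken();
    context.write(new Text(stuName), new IntWritable(Integer.parseInt(stuScore)));
}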
public static class Reduce extends Reducer<Text, IntWritable, Text, FloatWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sumScore = 0;    // total score
        int scoreCount = 0;  // number of subjects
        Iterator<IntWritable> iterator = values.iterator();
        while (iterator.hasNext()) {
            sumScore += iterator.next().get();  // accumulate the total
            scoreCount++;                       // count the subjects
        }
        // Cast before dividing, otherwise integer division truncates the average
        float avg = (float) sumScore / scoreCount;
        context.write(key, new FloatWritable(avg));
    }
}
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "Data AVG");
    job.setJarByClass(AvgTest.class);
    // Set the Map and Reduce classes
    job.setMapperClass(AvgTest.Map.class);
    job.setReducerClass(AvgTest.Reduce.class);
    // The map output value type (IntWritable) differs from the final output
    // value type (FloatWritable), so both must be declared explicitly.
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(FloatWritable.class);
    // Set the input and output paths: the input directory must exist,
    // and the output directory must not exist before the job runs.
    FileInputFormat.addInputPath(job, new Path("hdfs://192.168.117.10:9000/input"));
    FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.117.10:9000/output"));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
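All three drivers hard-code the HDFS input and output URIs. A common refinement is to read the paths from the command line so the same jar can be pointed at any directories; a sketch of that pattern using Hadoop's GenericOptionsParser (add import org.apache.hadoop.util.GenericOptionsParser):

// Consume generic Hadoop options first, then treat the remaining
// arguments as <input path> <output path>.
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
    System.err.println("Usage: AvgTest <input path> <output path>");
    System.exit(2);
}
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

The job can then be submitted with, for example, hadoop jar avg.jar AvgTest /input /output.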




