暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Hadoop相关案例之——去重、排序、求平均值

码农然 2019-12-26
630

一、数据去重:

实现思路:数据去重的最终目标是让原始数据中出现此时超过一次的数据在输出文件中只出现一次。(利用Key不重复实现)
导入依赖文件:
    <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.8.0</version>
    </dependency>
    <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
    <version>2.8.0</version>
    </dependency>
    <dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
    </dependency>


    1、Map阶段:
      //map将输入中的value复制到输出数据的key上 并直接输出
      public static class Map extends Mapper<Object,Text,Text,Text>{
      实现map函数
      @Override
      protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
      context.write(value,new Text(""));
      }
      }

      2、Reduce阶段:
        public static class Reduce extends Reducer<Text,Text,Text,Text>{
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        context.write(key,new Text(""));
        }
        }


        3、Main方法:
          public static void main(String[] args) throws Exception{
              Configuration conf = new Configuration();
          Job job = new Job(conf, "Data Deduplication");
          job.setJarByClass(Dedup.class);
          设置Map、Combine和Reduce处理类
          job.setMapperClass(Map.class);
          job.setCombinerClass(Reduce.class);


          job.setReducerClass(Reduce.class);


          设置输出类型
          job.setOutputKeyClass(Text.class);
          job.setOutputValueClass(Text.class);
          设置输入和输出目录
          设置输入输出路径 注意input为输入目录 output为输出目录
          输入目录必须存在 输出目录一定不能存在
          FileInputFormat.addInputPath(job, new Path("hdfs://192.168.117.10:9000/input"));
          FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.117.10:9000/output"));
          System.exit(job.waitForCompletion(true) ? 0 : 1);
          }





          二、数据排序:

          实现思路:这个实例仅仅要求对输入数据进行排序,大家应该知道在MapReduce过程中就有排序,它是按照key值进行排序的,如果key为封装int的IntWritable类型,那么MapReduce按照数字大小对key排序,如果key为封装为String的Text类型,那么MapReduce按照字典顺序对字符串排序。

          1、Map阶段:
            public static class Map extends Mapper<Object, Text, IntWritable, IntWritable> {
            private static IntWritable data = new IntWritable();
            //实现map函数
            public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
            String line = value.toString();
            data.set(Integer.parseInt(line));
            context.write(data, new IntWritable(1));
            }
            }


            2、Reduce阶段:
              //reduce将输入中的key复制到输出数据的key上,
              //然后根据输入的value-list中元素的个数决定key的输出次数
              //用全局linenum来代表key的位次
              public static class Reduce extends
              Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
              private static IntWritable linenum = new IntWritable(1);
              //实现reduce函数
              public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
              throws IOException, InterruptedException {
              for (IntWritable val : values) {
              context.write(linenum, key);
              linenum = new IntWritable(linenum.get() + 1);
              }
              }
              }

              3、Main方法:
                public static void main(String[] args) throws Exception{
                Configuration conf = new Configuration();
                Job job = new Job(conf, "Data Sort");
                job.setJarByClass(Sort.class);
                //设置Map、Combine和Reduce处理类
                job.setMapperClass(Sort.Map.class);
                job.setReducerClass(Sort.Reduce.class);
                //设置输出类型
                job.setOutputKeyClass(IntWritable.class);
                    job.setOutputValueClass(IntWritable.class);
                //设置输入和输出目录
                //设置输入输出路径 注意input为输入目录 output为输出目录
                // 输入目录必须存在 输出目录一定不能存在
                FileInputFormat.addInputPath(job, new Path("hdfs://192.168.117.10:9000/input"));
                FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.117.10:9000/output"));
                System.exit(job.waitForCompletion(true) ? 0 : 1);
                }





                三、求数据平均值:

                实现思路:Map处理的是一个纯文本文件,Mapper处理的数据是由InputFormat分解过的数据集,其中InputFormat的作用是将数据集切割成小数据集InputSplit,每一个InputSlit将由一个Mapper负责处理。此外,InputFormat中还提供了一个RecordReader的实现,并将一个InputSplit解析成<key,value>对提供给了map函数。InputFormat的默认值是TextInputFormat,它针对文本文件,按行将文本切割成InputSlit,并用LineRecordReader将InputSplit解析成<key,value> 对,key是行在文本中的位置,value是文件中的一行。
                Map的结果会通过partion分发到Reducer,Reducer做完Reduce操作后,将通过以格式OutputFormat输出。
                Mapper最终处理的结果对<key,value>,会送到Reducer中进行合并,合并的时候,有相同key的键/值对则送到同一个Reducer上。Reducer是所有用户定制Reducer类地基础,它的输入是key和这个key对应的所有value的一个迭代器,同时还有Reducer的上下文。Reduce的结果由Reducer.Context的write方法输出到文件中。


                1、Map阶段:
                  public static class Map extends Mapper<LongWritable,Text,Text,IntWritable>{
                  @Override
                  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                  //将姓名和成绩转换为String
                  String info=value.toString();
                  //将输入的数据首先按照空格进行分割
                  StringTokenizer stringTokenizer=new StringTokenizer(info);
                  while(stringTokenizer.hasMoreTokens()){
                  String stuName=stringTokenizer.nextToken();//学生姓名
                  String stuScore=stringTokenizer.nextToken();//学生成绩
                  //以学生姓名作为key,成绩作为value输出
                  context.write(new Text(stuName),new IntWritable(Integer.parseInt(stuScore)));
                  }
                  }
                  }


                  2、Reduce阶段:
                    public static class Reduce extends Reducer<Text,IntWritable,Text,FloatWritable>{
                    @Override
                    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                    int sumScore=0;//记录总分
                    int scoreCount=0;//记录科目数
                    Iterator<IntWritable> iterator = values.iterator();
                    while(iterator.hasNext()){
                    sumScore+=iterator.next().get();//累计总分
                    scoreCount++;//累计科目数
                    }
                    float avg=sumScore/scoreCount;
                    context.write(key,new FloatWritable(avg));
                    }
                    }


                    3、Main方法:
                      public static void main(String[] args) throws Exception{
                      Configuration conf = new Configuration();
                      Job job = new Job(conf, "Data AVG");
                          job.setJarByClass(AvgTest.class);
                          //设置Map和Reduce处理类
                          job.setMapperClass(AvgTest.Map.class);
                          job.setReducerClass(AvgTest.Reduce.class);
                          //设置输出类型
                          job.setOutputKeyClass(Text.class);
                          job.setOutputValueClass(IntWritable.class);
                      //设置输入和输出目录
                      //设置输入输出路径 注意input为输入目录 output为输出目录
                      // 输入目录必须存在 输出目录一定不能存在
                      FileInputFormat.addInputPath(job, new Path("hdfs://192.168.117.10:9000/input"));
                      FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.117.10:9000/output"));
                      System.exit(job.waitForCompletion(true) ? 0 : 1);
                      }





                      文章转载自码农然,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                      评论