暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Hadoop企业级优化与实战案例(下)

fuzzy maker 2019-06-08
103

7.3 辅助排序案例

  • 需求

    有如下订单数据,取出每个订单号中价格最大的商品,按订单号排序

    订单id商品id成交金额
    0000001Pdt_01222.8
    0000001Pdt_0625.8
    0000002Pdt_03522.8
    0000002Pdt_04122.4
    0000002Pdt_05722.4
    0000003Pdt_01222.8
    0000003Pdt_0233.8
  • orderPO

    @Data
    @Accessors(chain = true)
    public class OrderPO implements WritableComparable<OrderPO> {

       private int orderId;
       private double price;

       /**
        * orderId asc price desc
        * @param obj
        * @return
        */
       @Override
       public int compareTo(OrderPO obj) {
           if (this.orderId > obj.orderId) {
               return 1;
          } else if (this.orderId == obj.orderId) {
               return this.price > obj.price ? -1 : 1 ;
          } else {
               return -1;
          }
      }

       @Override
       public void write(DataOutput dataOutputthrows IOException {

           dataOutput.writeInt(orderId);
           dataOutput.writeDouble(price);
      }

       @Override
       public void readFields(DataInput dataInputthrows IOException {

           this.orderId = dataInput.readInt();
           this.price = dataInput.readDouble();
      }

       @Override
       public String toString() {
           return orderId + "\t" + price;
      }
    }
  • mapper

    public class OrderMapper extends Mapper<LongWritableTextOrderPONullWritable> {

       OrderPO order = new OrderPO();

       @Override
       protected void map(LongWritable keyText valueContext contextthrows IOExceptionInterruptedException {
           String[] words = value.toString().split("\t");
           Integer orderId = Integer.valueOf(words[0]);
           Double price = Double.valueOf(words[2]);
           order.setOrderId(orderId).setPrice(price);
           context.write(orderNullWritable.get());
      }
    }
  • reducer

    public class OrderReducer extends Reducer<OrderPONullWritableOrderPONullWritable> {

       @Override
       protected void reduce(OrderPO keyIterable<NullWritable> valuesContext contextthrows IOExceptionInterruptedException {
           context.write(keyNullWritable.get());
      }
    }
  • groupComparator

    public class GroupComparatorByOrderId extends WritableComparator {

       public GroupComparatorByOrderId() {
           super(OrderPO.classtrue);
      }

       @Override
       public int compare(WritableComparable aWritableComparable b) {
           OrderPO aPo = (OrderPOa;
           OrderPO bPo = (OrderPOb;
           return Integer.compare(aPo.getOrderId(), bPo.getOrderId());
      }
    }
  • driver

    public class OrderDriver {

       public static void main(String[] argsthrows IOExceptionClassNotFoundExceptionInterruptedException {

           args = new String[]{"/Users/zhukaishengy/IdeaProjects/hadoop-parent/mapreduce/input/order.txt",
                   "/Users/zhukaishengy/IdeaProjects/hadoop-parent/mapreduce/output"};

           Configuration conf = new Configuration();
           Job job = Job.getInstance(conf);

           job.setJarByClass(OrderDriver.class);
           job.setMapperClass(OrderMapper.class);
           job.setReducerClass(OrderReducer.class);

           job.setMapOutputKeyClass(OrderPO.class);
           job.setMapOutputValueClass(NullWritable.class);
           job.setOutputKeyClass(OrderPO.class);
           job.setOutputValueClass(NullWritable.class);

           // set groupComparator
           job.setGroupingComparatorClass(GroupComparatorByOrderId.class);
           
           FileInputFormat.setInputPaths(jobnew Path(args[0]));
           FileOutputFormat.setOutputPath(jobnew Path(args[1]));
           
           boolean b = job.waitForCompletion(true);
           System.exit(b ? 0 : 1);
      }
    }

7.4 自定义OutputFormat案例

  • 需求:过滤日志及自定义日志输出路径

    过滤输入的log日志中是否包含google

    (1)包含google的网站输出到:google.log

    (2)不包含google的网站输出到:other.log

  • 自定义FileOutputFormat

    public class FilterLogOutputFormat extends FileOutputFormat<TextNullWritable> {
       @Override
       public RecordWriter<TextNullWritable> getRecordWriter(TaskAttemptContext jobthrows IOExceptionInterruptedException {
           return new FilterLogRecordwriter(job);
      }
    }
  • 自定义RecordWriter

    public class FilterLogRecordwriter extends RecordWriter<TextNullWritable> {

       private static final Path GOOGLEOUTPUTPATH = new Path("/Users/zhukaishengy/IdeaProjects/hadoop-parent/mapreduce/output/google");
       private static final Path OTHERSOUTPUTPATH = new Path("/Users/zhukaishengy/IdeaProjects/hadoop-parent/mapreduce/output/others");

       FSDataOutputStream googleFos;
       FSDataOutputStream othersFos;

       public FilterLogRecordwriter(TaskAttemptContext jobthrows IOException {
           FileSystem fs = FileSystem.get(job.getConfiguration());
           googleFos = fs.create(GOOGLEOUTPUTPATH);
           othersFos = fs.create(OTHERSOUTPUTPATH);
      }

       @Override
       public void write(Text keyNullWritable valuethrows IOExceptionInterruptedException {
           String keyword = key.toString();
           if (keyword.contains("google")) {
               googleFos.write(keyword.getBytes());
               googleFos.write("\n".getBytes());
          } else {
               othersFos.write(keyword.getBytes());
               othersFos.write("\n".getBytes());
          }
      }
       @Override
       public void close(TaskAttemptContext contextthrows IOExceptionInterruptedException {
           IOUtils.closeStream(googleFos);
           IOUtils.closeStream(othersFos);
      }
    }
  • mapper

    public class FilterLogMapper extends Mapper<LongWritableTextTextNullWritable> {

       @Override
       protected void map(LongWritable keyText valueContext contextthrows IOExceptionInterruptedException {
           context.write(valueNullWritable.get());
      }
    }
  • driver

    public class FilterLogDriver {

       public static void main(String[] argsthrows IOExceptionClassNotFoundExceptionInterruptedException {

           args = new String[]{"/Users/zhukaishengy/IdeaProjects/hadoop-parent/mapreduce/input/logFilter.txt",
                   "/Users/zhukaishengy/IdeaProjects/hadoop-parent/mapreduce/out"};
           Configuration conf = new Configuration();
           Job job = Job.getInstance(conf);

           job.setJarByClass(FilterLogDriver.class);
           job.setMapperClass(FilterLogMapper.class);

           job.setMapOutputKeyClass(Text.class);
           job.setMapOutputValueClass(NullWritable.class);
           job.setOutputKeyClass(Text.class);
           job.setOutputValueClass(NullWritable.class);

           FileInputFormat.setInputPaths(jobnew Path(args[0]));
           // 虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat
           // 而fileoutputformat要输出一个_SUCCESS文件,所以,在这还得指定一个输出目录
           FileOutputFormat.setOutputPath(jobnew Path(args[1]));

           job.setOutputFormatClass(FilterLogOutputFormat.class);
        // reducer并行度为0 
           job.setNumReduceTasks(0);

           boolean b = job.waitForCompletion(true);
           System.exit(b ? 0 : 1);
      }
    }

7.5 MapReduce中多表合并案例

需求一:reduce join

  • 需求:将商品信息表中数据根据商品pid合并到订单数据表中。

  • po

    @Data
    @Accessors(chain = true)
    public class OrderProductPO implements WritableComparable {

       /**
        * 给初始化默认值,避免序列化出错
        */
       private String orderId ;
       private String productId ;
       private Integer count ;
       private String productName ;
       /**
        * 标记位:
        * 0 order表
        * 1 product表
        */
       private Integer mark;


       @Override
       public int compareTo(Object obj) {
           OrderProductPO po = (OrderProductPOobj;
           return Integer.compare(Integer.parseInt(orderId), Integer.parseInt(po.getOrderId()));
      }

       @Override
       public void write(DataOutput dataOutputthrows IOException {
           dataOutput.writeUTF(orderId);
           dataOutput.writeUTF(productId);
           dataOutput.writeInt(count);
           dataOutput.writeUTF(productName);
           dataOutput.writeInt(mark);
      }

       @Override
       public void readFields(DataInput dataInputthrows IOException {
           this.orderId = dataInput.readUTF();
           this.productId = dataInput.readUTF();
           this.count = dataInput.readInt();
           this.productName = dataInput.readUTF();
           this.mark = dataInput.readInt();
      }

       @Override
       public String toString() {
           return orderId + "\t" + productName + "\t" + count;
      }
    }
  • mapper

    public class OrderProductMapper extends Mapper<LongWritableTextTextOrderProductPO> {

       private Integer mark;
       /**
        * 用来初始化,获取数据来源,打标记
        * @param context
        * @throws IOException
        * @throws InterruptedException
        */
       @Override
       protected void setup(Context contextthrows IOExceptionInterruptedException {
           FileSplit split = (FileSplitcontext.getInputSplit();
           Path path = split.getPath();
           String fileName = path.getName();
           mark = fileName.contains("t_order"? 0 : 1;
      }

       OrderProductPO orderProductPO = new OrderProductPO();
       Text k = new Text();

       @Override
       protected void map(LongWritable keyText valueContext contextthrows IOExceptionInterruptedException {

           String[] fields = value.toString().split("\t");
           if (Objects.equals(mark0)) {
               // order表
               String orderId = fields[0];
               String productId = fields[1];
               Integer count = Integer.valueOf(fields[2]);
               k.set(productId);
               orderProductPO.setOrderId(orderId).setProductId(productId)
                      .setCount(count).setMark(mark)
                      .setProductName("");
          } else {
               // product 表
               String productId = fields[0];
               String productName = fields[1];
               k.set(productId);
               orderProductPO.setProductId(productId).setProductName(productName).setMark(mark)
                      .setOrderId("").setCount(0);
          }

           context.write(korderProductPO);
      }
    }
  • reducer

    public class OrderProductReducer extends Reducer<TextOrderProductPOOrderProductPONullWritable> {

       private String productName;

       @Override
       protected void reduce(Text keyIterable<OrderProductPO> valuesContext contextthrows IOExceptionInterruptedException {

           // 注意:reducer的机制,key和value每次不会重新创建,因此,不可重复遍历,不能用引用!!!
           List<OrderProductPO> orderList = new ArrayList<>();

           // get productname
           for (OrderProductPO orderProductPO : values) {
               Integer mark = orderProductPO.getMark();
               if (Objects.equals(mark1)) {
                   // from product table
                   productName = orderProductPO.getProductName();
              } else {
                   OrderProductPO tmp = new OrderProductPO();
                   BeanUtils.copyProperties(orderProductPOtmp);
                   orderList.add(tmp);
              }
          }

           for (OrderProductPO orderProductPO : orderList) {
               orderProductPO.setProductName(productName);
               context.write(orderProductPONullWritable.get());
          }

           productName = "";

      }
    }


  • Driver

    public class OrderProductDriver {
       public static void main(String[] argsthrows IOExceptionClassNotFoundExceptionInterruptedException {
           args = new String[]{"/Users/zhukaishengy/IdeaProjects/hadoop-parent/mapreduce/reducejoin",
                   "/Users/zhukaishengy/IdeaProjects/hadoop-parent/mapreduce/output"};
           Configuration conf = new Configuration();
           Job job = Job.getInstance(conf);

           job.setJarByClass(OrderProductDriver.class);
           job.setMapperClass(OrderProductMapper.class);
           job.setReducerClass(OrderProductReducer.class);

           job.setMapOutputKeyClass(Text.class);
           job.setMapOutputValueClass(OrderProductPO.class);
           job.setOutputKeyClass(OrderProductPO.class);
           job.setOutputValueClass(NullWritable.class);

           FileInputFormat.setInputPaths(jobnew Path(args[0]));
           FileOutputFormat.setOutputPath(jobnew Path(args[1]));

           boolean b = job.waitForCompletion(true);
           System.exit(b ? 0 : 1);
      }
    }

需求二:map join

  • po

    @Data
    @Accessors(chain = true)
    public class OrderProductPO implements WritableComparable {

       /**
        * 给初始化默认值,避免序列化出错
        */
       private String orderId ;
       private String productId ;
       private Integer count ;
       private String productName ;


       @Override
       public int compareTo(Object obj) {
           OrderProductPO po = (OrderProductPOobj;
           return Integer.compare(Integer.parseInt(orderId), Integer.parseInt(po.getOrderId()));
      }

       @Override
       public void write(DataOutput dataOutputthrows IOException {
           dataOutput.writeUTF(orderId);
           dataOutput.writeUTF(productId);
           dataOutput.writeInt(count);
           dataOutput.writeUTF(productName);
      }

       @Override
       public void readFields(DataInput dataInputthrows IOException {
           this.orderId = dataInput.readUTF();
           this.productId = dataInput.readUTF();
           this.count = dataInput.readInt();
           this.productName = dataInput.readUTF();
      }

       @Override
       public String toString() {
           return orderId + "\t" + productName + "\t" + count;
      }
    }
  • mapper

    public class OrderProductMapper extends Mapper<LongWritableTextOrderProductPONullWritable> {

       Map<StringString> pdMap = new HashMap<>();
       /**
        * 用来初始化,获取数据来源,打标记
        * @param context
        * @throws IOException
        * @throws InterruptedException
        */
       @Override
       protected void setup(Context contextthrows IOException {

           // 获取cachefile 的路径
           URI[] cacheFiles = context.getCacheFiles();
           String cacheFilePath = cacheFiles[0].getPath();

           InputStreamReader isr = new InputStreamReader(new FileInputStream(cacheFilePath));
           BufferedReader reader = new BufferedReader(isr);
           String line;
           while(StringUtils.isNotEmpty(line = reader.readLine())){
               String[] fields = line.split("\t");
               pdMap.put(fields[0], fields[1]);
          }
           reader.close();
      }

       OrderProductPO orderProductPO = new OrderProductPO();

       @Override
       protected void map(LongWritable keyText valueContext contextthrows IOExceptionInterruptedException {

           String[] fields = value.toString().split("\t");
           String orderId = fields[0];
           String productId = fields[1];
           Integer count = Integer.valueOf(fields[2]);
           String productName = pdMap.get(productId);

           orderProductPO.setOrderId(orderId).setProductId(productId)
                  .setCount(count).setProductName(productName);

           context.write(orderProductPONullWritable.get());
      }
    }
  • driver

    public class OrderProductDriver {
       public static void main(String[] argsthrows IOExceptionClassNotFoundExceptionInterruptedExceptionURISyntaxException {
           args = new String[]{"/Users/zhukaishengy/IdeaProjects/hadoop-parent/mapreduce/mapjoin/t_order.txt",
                   "/Users/zhukaishengy/IdeaProjects/hadoop-parent/mapreduce/output"};
           Configuration conf = new Configuration();
           Job job = Job.getInstance(conf);

           job.setJarByClass(OrderProductDriver.class);
           job.setMapperClass(OrderProductMapper.class);

           job.setMapOutputKeyClass(OrderProductPO.class);
           job.setMapOutputValueClass(NullWritable.class);
           job.setOutputKeyClass(OrderProductPO.class);
           job.setOutputValueClass(NullWritable.class);

           FileInputFormat.setInputPaths(jobnew Path(args[0]));
           FileOutputFormat.setOutputPath(jobnew Path(args[1]));

           job.setNumReduceTasks(0);
           job.setCacheFiles(new URI[]{new URI("mapreduce/reducejoin/t_product.txt")});

           boolean b = job.waitForCompletion(true);
           System.exit(b ? 0 : 1);
      }
    }

7.6 日志清洗(ETL)

  • 需求:去除日志中字段长度小于等于11的日志,使用计数器打印去除的个数,保留的个数

  • mapper

    public class WebMapper extends Mapper<LongWritableTextTextNullWritable> {

       private static final Long MINFIELDSIZE = 11L;

       @Override
       protected void map(LongWritable keyText valueContext contextthrows IOExceptionInterruptedException {
           String[] fields = value.toString().split(" ");
           if (fields.length > MINFIELDSIZE) {
               context.getCounter("map""success").increment(1L);
               context.write(valueNullWritable.get());
          } else {
               context.getCounter("map""fail").increment(1L);
          }
      }
    }
  • driver

    public class WebDriver {

       public static void main(String[] argsthrows IOExceptionClassNotFoundExceptionInterruptedException {
           args = new String[] {"/Users/zhukaishengy/IdeaProjects/hadoop-parent/mapreduce/input/web.log",
                   "/Users/zhukaishengy/IdeaProjects/hadoop-parent/mapreduce/output"};
           Configuration conf = new Configuration();
           Job job = Job.getInstance(conf);

           job.setJarByClass(WebDriver.class);
           job.setMapperClass(WebMapper.class);

           job.setMapOutputKeyClass(Text.class);
           job.setMapOutputValueClass(NullWritable.class);
           job.setOutputKeyClass(Text.class);
           job.setOutputValueClass(NullWritable.class);

           job.setNumReduceTasks(0);

           FileInputFormat.setInputPaths(jobnew Path(args[0]));
           FileOutputFormat.setOutputPath(jobnew Path(args[1]));

           boolean b = job.waitForCompletion(true);
           System.exit(b ? 0 : 1);
      }
    }

7.7 压缩/解压缩案例

需求一:数据流的压缩和解压缩

用流的方式压缩解压缩一个文件,gzip和bzip2格式

  • 测试代码:压缩

    @Test
    public void testCompressWithStream() throws Exception {
       String source = "/Users/zhukaishengy/IdeaProjects/hadoop-parent/mapreduce/input/input.txt";
       String dest = "/Users/zhukaishengy/IdeaProjects/hadoop-parent/mapreduce/output/";
       String method1 = "org.apache.hadoop.io.compress.GzipCodec";
       String method2 = "org.apache.hadoop.io.compress.BZip2Codec";
       compress(sourcedestmethod1);
       compress(sourcedestmethod2);
    }

    private void compress(String sourceString destString methodthrows IOExceptionClassNotFoundException {

       // create file inputstream
       FileInputStream fis = new FileInputStream(source);
       // create compressioncodec with reflectionUtils
       Class<?> methodClazz = Class.forName(method);
       CompressionCodec compressionCodec = (CompressionCodecReflectionUtils.newInstance(methodClazznew Configuration());
       // get filename and outputstream
       String fileName = source.substring(source.lastIndexOf("/"));
       FileOutputStream fos = new FileOutputStream(dest + fileName + compressionCodec.getDefaultExtension());
       // wrapper output stream
       CompressionOutputStream outputStream = compressionCodec.createOutputStream(fos);
       // stream flush
       IOUtils.copyBytes(fisoutputStream1024 * 1024 * 32);
       // close stream
       IOUtils.closeStream(outputStream);
       IOUtils.closeStream(fos);
       IOUtils.closeStream(fis);
    }
  • 测试代码:解压缩

    @Test
    public void testDeCompressWithStream() throws Exception{
       String source1 = "/Users/zhukaishengy/IdeaProjects/hadoop-parent/mapreduce/input/input.txt.gz";
       String source2 = "/Users/zhukaishengy/IdeaProjects/hadoop-parent/mapreduce/input/input.txt.bz2";
       String dest1 = "/Users/zhukaishengy/IdeaProjects/hadoop-parent/mapreduce/output1/";
       String dest2 = "/Users/zhukaishengy/IdeaProjects/hadoop-parent/mapreduce/output2/";
       String method1 = "org.apache.hadoop.io.compress.GzipCodec";
       String method2 = "org.apache.hadoop.io.compress.BZip2Codec";
       deCompress(source1dest1method1);
       deCompress(source2dest2method2);
    }

    private void deCompress(String sourceString destString methodthrows Exception{
       // create file inputstream
       FileInputStream fis = new FileInputStream(source);
       // get filename and outputstream
       String fileName = source.substring(source.lastIndexOf("/"), source.lastIndexOf("."));
       FileOutputStream fos = new FileOutputStream(dest + fileName);
       // create compressioncodec with reflectionUtils
       Class<?> methodClazz = Class.forName(method);
       CompressionCodec compressionCodec = (CompressionCodecReflectionUtils.newInstance(methodClazznew Configuration());
       // wrapper input stream
       CompressionInputStream inputStream = compressionCodec.createInputStream(fis);
       // stream flush
       IOUtils.copyBytes(inputStreamfos1024 * 1024 * 32);
       // close stream
       IOUtils.closeStream(fos);
       IOUtils.closeStream(inputStream);
       IOUtils.closeStream(fis);
    }


需求二:Map输出端采用压缩

  • driver部分代码

    // 开启map端输出压缩
    configuration.setBoolean("mapreduce.map.output.compress"true);
    // 设置map端输出压缩方式
    configuration.setClass("mapreduce.map.output.compress.codec"BZip2Codec.classCompressionCodec.class);


需求三:Reduce输出端采用压缩

  • driver部分代码

    // 设置reduce端输出压缩开启
    FileOutputFormat.setCompressOutput(jobtrue);

    // 设置压缩的方式
    FileOutputFormat.setOutputCompressorClass(jobBZip2Codec.class); 
    //   FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); 
    //   FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);


7.8 倒排索引(多job串联)

  • 需求:多个文本建立倒排索引

  • 预计输出结果

    hahaa.txt-->2b.txt-->2c.txt-->2
    huhua.txt-->5b.txt-->5c.txt-->5
    lilia.txt-->1b.txt-->1c.txt-->1
    rlc.txt-->1b.txt-->1a.txt-->1
    xihaa.txt-->1b.txt-->1c.txt-->1
    xixia.txt-->1b.txt-->1c.txt-->1
    zksb.txt-->1c.txt-->1a.txt-->1
  • 第一个mapper

    public class FirstIndexMapper extends Mapper<LongWritableTextTextLongWritable> {

       private String fileName;
       private Text k = new Text();
       private LongWritable v = new LongWritable(1);

       @Override
       protected void setup(Context context) {
           FileSplit split = (FileSplitcontext.getInputSplit();
           fileName = split.getPath().getName();
      }

       @Override
       protected void map(LongWritable keyText valueContext context) {

           String[] words = value.toString().split(" ");
           Arrays.stream(words).forEach(word -> {
               String sk = word + "-->" + fileName;
               k.set(sk);
               try {
                   context.write(kv);
              } catch (Exception e) {
                   e.printStackTrace();
              }
          });

      }
    }
  • 第一个reducer

    public class FirstIndexReducer extends Reducer<TextLongWritableTextLongWritable> {

       private LongWritable v = new LongWritable();

       @Override
       protected void reduce(Text keyIterable<LongWritable> valuesContext contextthrows IOExceptionInterruptedException {
           Long count = 0L;
           for (LongWritable ignore : values) {
               count ++ ;
          }
           v.set(count);
           context.write(keyv);
      }
    }
  • 第二个mapper

    public class SecondIndexMapper extends Mapper<LongWritableTextTextText> {

       private Text k = new Text();
       private Text v = new Text();

       @Override
       protected void map(LongWritable keyText valueContext contextthrows IOExceptionInterruptedException {
           
           String[] words = value.toString().split("-->");
           String sk = words[0];
           String source = words[1];
           String sv = source.replace("\t""-->");
           k.set(sk);
           v.set(sv);
           context.write(k ,v);
      }
    }
  • 第二个reducer

    public class SecondIndexReducer extends Reducer<TextTextTextNullWritable> {

       private Text k = new Text();

       @Override
       protected void reduce(Text keyIterable<Text> valuesContext contextthrows IOExceptionInterruptedException {

           StringBuffer sb = new StringBuffer(key.toString());
           sb.append("\t");
           values.forEach(value -> {
               String sv = value.toString();
               sb.append(sv).append("\t");
          });
           k.set(sb.toString());
           context.write(kNullWritable.get());
      }
    }

7.9 寻找微博互相关注的两个人的共同好友

  • 需求:已知每个人的关注列表,找出所有互相关注的人和他们的共同好友,假设数据量非常大,切分为多个split。

  • 分析

    • 两个mr

    • 第一个:列举出每个人被关注列表

    • 第二个:组合被关注列表,汇总

  • 第一个mr

    • mapper

      public class WeiboReverseMapper extends Mapper<TextTextTextText> {

         Text k = new Text();
         @Override
         protected void map(Text keyText valueContext context) {

             String[] words2 = value.toString().split(",");
             Arrays.stream(words2).forEach(sk -> {
                 k.set(sk);
                 try {
                     context.write(k ,key);
                } catch (Exception e) {
                     e.printStackTrace();
                }
            });
        }
      }
    • reducer

      public class WeiboReverseReducer extends Reducer<TextTextTextNullWritable> {

         Text k = new Text();
         @Override
         protected void reduce(Text keyIterable<Text> valuesContext contextthrows IOExceptionInterruptedException {

             StringBuffer sb = new StringBuffer(key.toString());
             sb.append(":");
             for (Text starer : values) {
                 sb.append(starer.toString()).append("\t");
            }
             String sk = sb.substring(0sb.lastIndexOf("\t"));
             k.set(sk);
             context.write(kNullWritable.get());
        }
      }
    • 输出

      A:IKOGFBHDC
      B:FEAJ
      C:KEHGFBA
      D:AEGCHFLK
      E:LMFBDGAH
      F:ADGCLM
      G:M
      H:O
      I:OC
      J:O
      K:B
      L:ED
      M:FE
      O:FHAJI
  • 第二个mr

    • mapper

      public class WeiboMapper extends Mapper<TextTextTextText> {

         Text k = new Text();
         WeiboService weiboService = new WeiboService();
         @Override
         protected void map(Text keyText valueContext context) {

             String[] starers = value.toString().split("\t");
             if (starers.length <= 1) {
                 // 只有一个人关注她
                 return;
            }
             List<String> kList = weiboService.splitStarArray(starers);
             kList.forEachsk -> {
                 k.set(sk);
                 try {
                     context.write(kkey);
                } catch (IOException e) {
                     e.printStackTrace();
                } catch (InterruptedException e) {
                     e.printStackTrace();
                }
            });
        }
      }
    • reducer

      public class WeiboReducer extends Reducer<TextTextTextNullWritable> {

         Text k = new Text();
         @Override
         protected void reduce(Text keyIterable<Text> valuesContext contextthrows IOExceptionInterruptedException {
             StringBuffer sb = new StringBuffer(key.toString());
             sb.append("-->");
             for (Text value : values) {
                 sb.append(value).append(",");
            }
             String sk = sb.substring(0sb.lastIndexOf(","));
             k.set(sk);
             context.write(kNullWritable.get());
        }
      }
    • 工具类

      @Slf4j
      public class WeiboService {

         /**
          * 将数组排序,返回所有不重复的两个组合
          * @param stars
          * @return
          */
         public List<String> splitStarArray(String[] stars){
             // sort by alpha
             Arrays.sort(stars);
             // calcul capacity
             int cap = (int) (this.seldom(stars.length2));
             List<String> result = new ArrayList<>(cap);
             for (int i = 0i < stars.length - 1i++) {
                 for (int j = i + 1j < stars.lengthj++) {
                     String friendOne = stars[i];
                     String friendTwo = stars[j];
                     String resultItem = friendOne + "@" + friendTwo;
                     result.add(resultItem);
                }
            }
             return result;
        }

         /**
          * 实现n阶阶乘
          * @param n
          * @return
          */
         public long fact(int n){
            try {
                if (n <= 1) {
                    return 1;
                }
                long preFact = n * fact(n -1);
                if (preFact > Integer.MAX_VALUE) {
                    log.error("计算的乘阶数过高!!!");
                    return Integer.MAX_VALUE;
                }
                return preFact;
            } catch (Exception e) {
                e.printStackTrace();
            }
            return 0L;
        }

         /**
          * 实现Cmn
          * @return
          */
         public long seldom(int mint n){
             return fact(m/ fact(m - n/ fact(n);
        }
      }
    • 输出

      A@B-->E,C
      A@C-->D,F
      A@D-->E,F
      A@E-->D,B,C
      A@F-->O,B,C,D,E
      A@G-->F,E,C,D
      A@H-->E,C,D,O
      A@I-->O
      A@J-->O,B
      A@K-->D,C
      A@L-->F,E,D
      A@M-->E,F
      B@C-->A
      B@D-->A,E
      B@E-->C
      B@F-->E,A,C
      B@G-->C,E,A
      B@H-->A,E,C
      B@I-->A
      B@K-->C,A
      B@L-->E
      B@M-->E
      B@O-->A
      C@D-->A,F
      C@E-->D
      C@F-->D,A
      C@G-->D,F,A
      C@H-->D,A
      C@I-->A
      C@K-->A,D
      C@L-->D,F
      C@M-->F
      C@O-->I,A
      D@E-->L
      D@F-->A,E
      D@G-->E,A,F
      D@H-->A,E
      D@I-->A
      D@K-->A
      D@L-->E,F
      D@M-->F,E
      D@O-->A
      E@F-->D,M,C,B
      E@G-->C,D
      E@H-->C,D
      E@J-->B
      E@K-->C,D
      E@L-->D
      F@G-->D,C,A,E
      F@H-->A,D,O,E,C
      F@I-->O,A
      F@J-->B,O
      F@K-->D,C,A
      F@L-->E,D
      F@M-->E
      F@O-->A
      G@H-->D,C,E,A
      G@I-->A
      G@K-->D,A,C
      G@L-->D,F,E
      G@M-->E,F
      G@O-->A
      H@I-->O,A
      H@J-->O
      H@K-->A,C,D
      H@L-->D,E
      H@M-->E
      H@O-->A
      I@J-->O
      I@K-->A
      I@O-->A
      K@L-->D
      K@O-->A
      L@M-->E,F

      注:案例演示代码地址:https://github.com/ZhuKaishengy/hadoop-parent.git


文章转载自fuzzy maker,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论