暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

MapReduce的安装和运行操作

糟老头修炼记 2020-05-16
763


Eclipse 是一个开放源代码的、基于Java的可扩展开发平台。就其本身而言,它只是一个框架和一组服务,用于通过插件组件构建开发环境。Eclipse最初主要用来Java语言开发,通过安装不同的插件Eclipse可以支持不同的计算机语言,比如C++和Python等开发工具。Eclipse的本身只是一个框架平台,但是众多插件的支持使得Eclipse拥有其他功能相对固定的IDE软件很难具有的灵活性。


1.安装 Hadoop-Eclipse-Plugin

Eclipse是一个插件化的集成开发工具,要在 Eclipse 上编译和运行 MapReduce 程序,首先需要安装hadoop插件hadoop-eclipse-plugin。

两种方式进行hadoop插件的安装:

(1)命令行安装方式

在确保未启动Eclipse的前提下,将hadoop插件的jar包直接在tools文件夹中解压,然后,在解压后的目录下将 release 中的jar包复制到Eclipse安装目录下(/usr/local/eclipse/plugins),然后启动Eclipse即可。

    # 解压到/tools文件夹下
    unzip -qo tools/hadoop2x-eclipse-plugin-master.zip -d tools
    # 复制到 eclipse 安装目录的 plugins 目录下
    cp tools/hadoop2x-eclipse-plugin-master/release/hadoop-eclipse-plugin-2.6.0.jar usr/local/eclipse/plugins/

    (2)界面操作方式


    2.验证是否成功安装hadoop插件

    启动Eclipse,默认可以进入/root/workspace目录下的工作区

    启动Eclipse之后,依次打开“Window”——“Preference”菜单,在左侧菜单中如果发现“Hadoop Map/Reduce”,即证明安装成功


    3.单词统计wordcount实验准备:

    启动Hadoop

      su hadoop # 切换hadoop用户
      /usr/local/hadoop/sbin/start-dfs.sh # 启动hadoop

      配置环境变量

        vim ~/.bashrc# 打开环境变量配置文件
          export PATH=$PATH:/usr/local/hadoop/bin:/usr/local/hadoop/sbin
          # 配置PATH变量,在PATH变量中加入Hadoop的bin和sbin目录
            source ~/.bashrc# 使配置文件生效


            JAVA源代码

              import java.io.IOException;
              import java.util.Iterator;
              import java.util.StringTokenizer;
              import org.apache.hadoop.conf.Configuration;
              import org.apache.hadoop.fs.Path;
              import org.apache.hadoop.io.IntWritable;
              import org.apache.hadoop.io.Text;
              import org.apache.hadoop.mapreduce.Job;
              import org.apache.hadoop.mapreduce.Mapper;
              import org.apache.hadoop.mapreduce.Reducer;
              import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
              import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
              import org.apache.hadoop.util.GenericOptionsParser;
              public class WordCount {
              public WordCount() {
              }
              public static void main(String[] args) throws Exception {
              Configuration conf = new Configuration();
              String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
              if(otherArgs.length < 2) {
              System.err.println("Usage: wordcount <in> [<in>...] <out>");
              System.exit(2);
              }
              Job job = Job.getInstance(conf, "word count");
              job.setJarByClass(WordCount.class);
              job.setMapperClass(WordCount.TokenizerMapper.class);
              job.setCombinerClass(WordCount.IntSumReducer.class);
              job.setReducerClass(WordCount.IntSumReducer.class);
              job.setOutputKeyClass(Text.class);
              job.setOutputValueClass(IntWritable.class);
              for(int i = 0; i < otherArgs.length - 1; ++i) {
              FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
              }
              FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
              System.exit(job.waitForCompletion(true)?0:1);
              }
              public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
              private IntWritable result = new IntWritable();
              public IntSumReducer() {
              }
              public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
              int sum = 0;
              IntWritable val;
              for(Iterator i$ = values.iterator(); i$.hasNext(); sum += val.get()) {
              val = (IntWritable)i$.next();
              }
              this.result.set(sum);
              context.write(key, this.result);
              }
              }
              public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
              private static final IntWritable one = new IntWritable(1);
              private Text word = new Text();
              public TokenizerMapper() {
              }
              public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
              StringTokenizer itr = new StringTokenizer(value.toString());
              while(itr.hasMoreTokens()) {
              this.word.set(itr.nextToken());
              context.write(this.word, one);
              }
              }
              }
              }


              配置 Hadoop-Eclipse-Plugin:

              在Eclipse中依次打开“window”——“Open Perspective”菜单,选择“Other”,在弹出窗口中选择“Map-Reduce”,点击OK完成。此时,Eclipse会打开一个Map-Reduce的视图。

              建立与 Hadoop 集群的连接:点击 Eclipse软件右下角的 Map/Reduce Locations面板,在面板中单击右键或点击右下角小象图标,选择 New Hadoop Location进行创建:

              在弹出来的 General 选项面板中,General 的设置要与 Hadoop 的配置一致。伪分布式,填写 localhost即可。使用的Hadoop伪分布式配置,设置 fs.defaultFS 为hdfs://localhost:9000, 则DFS Master 的 Port 要改为 9000。Map/Reduce(V2) Master的 Port 用默认的即可,Location Name 随意填写。用户名务必使用hadoop。

              点击 finish,Map/Reduce Location 就创建好了。

              配置好后,点击左侧 Project Explorer 中的 DFS Location(点击三角形展开)就能直接查看 HDFS 中的文件列表了,双击可以查看内容,右键点击可以上传、下载、删除 HDFS中的文件

              如果无法查看,可右键点击 Location 尝试 Reconnect 或重启 Eclipse。


              (1)HDFS 中的内容变动后,Eclipse 不会同步刷新,需要右键点击 Project Explorer中的MapReduce Location或者下级文件夹如hadoop或input,选择 Refresh,才能看到变动后的文件。

              (2)如果找不到 Project Explorer,看“Window”-“Show View”菜单下能否解决问题。

              (3)如果在Eclipse中连接HDFS出现“failed on connection exception”,检查一下Hadoop服务是否已经启动。


              4.在 Eclipse 中操作 HDFS 中的文件

                su hadoop# 切换用户
                /usr/local/hadoop/sbin/start-dfs.sh# 准备本地测试文件
                cd home/hadoop


                vim local.txt
                vim text.txt
                # 在HDFS新建一个目录,将本地文件local.txt上传,并查看内容
                hdfs dfs -mkdir -p mydir
                hdfs dfs -put home/hadoop/local.txt mydir
                hdfs dfs -cat mydir/local.txt
                # 向hdfs中的local.txt文件追加内容
                hdfs dfs -appendToFile home/hadoop/text.txt mydir/local.txt
                # 查看hdfs中local.txt的内容
                hdfs dfs -cat mydir/local.txt
                # 将hdfs中的local.txt文件下载到本地
                hdfs dfs -get mydir/local.txt home/hadoop/localnew.txt
                # 然后,请自己输入命令,查看一下本地路径/home/hadoop下的新文件localnew.txt吧!
                # 将hdfs中的local.txt文件删除
                hdfs dfs -rm mydir/local.txt
                # 将hdfs中的mydir文件夹删除
                hdfs dfs -rmdir mydir

                5.Eclipse中的HDFS操作

                (1)查看文件

                 点击左侧 Project Explorer 中的 DFS Location(点击三角形展开)能直接查看 HDFS 中的文件列表,双击可以查看内容

                (2)上传文件

                选中mydir文件夹,右键选择“Upload files to DFS”,选择某个本地文件(如/home/hadoop下的某个文件),上传。


                若Eclipse报错,是因为当前桌面版系统的用户root对HDFS中的/user/hadoop/mydir文件夹没有操作权限。因此需要在命令行执行以下命令:

                  su hadoop
                  # hdfs中子文件的访问权限也是要单独指定的,因此如果我们要对mydir文件夹进行操作,就要单独指定它的权限
                  hadoop fs -chmod 777 user/hadoop/mydir
                  # 同时我们也要修改默认的/user/hadoop文件夹的权限
                  hadoop fs -chmod 777 user/hadoop
                  # 后面你或许还会遇到类似的权限错误,请根据提示修改hdfs权限,如/user/hadoop/input
                  hadoop fs -chmod 777 user/hadoop/input

                  然后再回到Eclipse执行上传操作即可。上传完成,即可刷新查看到新上传的文件。


                  (3)删除文件

                         选中mydir目录下的某个文件,右键Delete


                  (4)新建文件夹

                  选中mydir目录,右键选择“Create new directory”,新建一个文件夹


                  6.通过 Eclipse 运行 MapReduce

                  复制配置文件:

                  在运行 MapReduce程序前,需要将/usr/local/hadoop/etc/hadoop 中将有修改过的配置文件(如伪分布式需要core-site.xml 和 hdfs-site.xml),以及 log4j.properties 复制到 WordCount项目下的 src 文件夹(~/workspace/WordCount/src)中,执行下列命令(也可直接在文件系统中复制)

                    cp usr/local/hadoop/etc/hadoop/core-site.xml ~/workspace/WordCount/src
                    cp usr/local/hadoop/etc/hadoop/hdfs-site.xml ~/workspace/WordCount/src
                    cp usr/local/hadoop/etc/hadoop/log4j.properties ~/workspace/WordCount/src

                    复制完成后,右键点击 WordCount 选择 refresh进行刷新,就可以看到文件结构了


                    运行WordCount

                    右键点击刚创建的 WordCount.java,选择 Run As -> RunConfigurations之后会弹出窗口,在此处可以设置运行时的相关参数。

                    如果Java Application下面没有 WordCount菜单,那么需要先双击 Java Application。

                    切换到“Arguments” 栏,在 Program arguments 处填写hdfs的输入、输出路径,即在“Programm Arguments”中填写:hdfs://localhost:9000/user/hadoop/input hdfs://localhost:9000/user/hadoop/output。这是运行WordCount程序是的输入参数,也就是main函数要接受的输入参数

                    设定参数后,再次运行程序,可以看到运行成功的提示,刷新 DFS Location后也能看到输出的 output 文件夹,你可以在Eclipse中双击打开output文件夹中的文件,查看程序输出结果。

                    如果出现“output directory already exists”的错误,那么需要先将hdfs中的output文件夹删除再运行。


                    至此,你就可以使用 Eclipse 方便的进行 MapReduce程序的开发了。


                    7.在 Eclipse 中运行 MapReduce 程序会遇到的问题

                    在使用 Eclipse 运行 MapReduce 程序时,会读取 Hadoop-Eclipse-Plugin 的 Advancedparameters 作为 Hadoop运行参数,如果未进行修改,则默认的参数其实就是单机(非分布式)参数,因此程序运行时是读取本地目录而不是HDFS 目录,就会提示 Input 路径不存在。

                      Exception in thread “main”org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does notexist: file:/home/hadoop/workspace/WordCountProject/input

                       所以需要将配置文件复制到项目中的 src目录,来覆盖这些参数,让程序能够正确运行。

                      log4j 用于记录程序的输出日记,需要 log4j.properties这个配置文件,如果没有复制该文件到项目中,运行程序后在 Console面板中会出现警告提示:

                        log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
                        log4j:WARN Please initialize the log4j system properly.
                        log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html\#noconfig for more info.

                        虽然不影响程序的正确运行的,但程序运行时无法看到任何提示消息(只能看到出错信息)。


                        8.使用命令行编译打包运行自己的MapReduce程序

                        (1)将hadoop插件复制到Eclipse安装目录

                        Eclipse是一个插件化的集成开发环境,所有的插件只需要复制到plugins目录下,启动Eclipse即可自动完成安装。

                        确认在root用户登录状态下,在命令行执行以下命令:

                          unzip -qo tools/hadoop2x-eclipse-plugin-master.zip -d tools # 解压到/tools文件夹下
                          cp tools/hadoop2x-eclipse-plugin-master/release/hadoop-eclipse-plugin-2.6.0.jar /usr/local/eclipse/plugins/ # 复制到 eclipse 安装目录的 plugins 目录下

                          (2)切换hadoop用户,启动hadoop

                            su hadoop # 切换hadoop用户
                            /usr/local/hadoop/sbin/start-dfs.sh # 启动hadoop

                            (3)配置环境变量

                            需要配置四个环境变量,包括HADOOP_HOME、PATH、HADOOP_CLASSPATH、CLASSPATH等。只有正确定义了环境变量,才能保证程序的正常编译和执行。

                              # 打开环境变量配置文件
                              vim ~/.bashrc
                              # 配置HADOOP_HOME变量(将下面一行代码加入到bashrc文件中,可以直接复制到某个export语句后面)
                              export HADOOP_HOME=/usr/local/hadoop
                              # 配置PATH变量。在PATH变量中加入Hadoop的bin和sbin目录,这样就可以直接在命令行执行hadoop的各个命令,不再需要写完整的路径。
                              export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
                              # 配置HADOOP_CLASSPATH变量
                              export HADOOP_CLASSPATH=$HADOOP_HOME/share/hadoop/common/hadoop-common-2.7.1.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.7.1.jar:$HADOOP_HOME/share/hadoop/common/lib/commons-cli-1.2.jar
                              # 配置CLASSPATH变量
                              export CLASSPATH=$HADOOP_CLASSPATH:$CLASSPATH
                                source ~/.bashrc#使变量生效。


                                编译、打包 Hadoop MapReduce 程序

                                (1)WordCount.java源文件(<上边>)

                                (2)编译WordCount.java源文件

                                 在当前目录(/home/hadoop)下,通过 javac 命令编译 WordCount.java ,执行下列命令进行编译:

                                  javac WordCount.java

                                  (3)将.class文件打包成jar包

                                  把 .class 文件打包成 jar,才能在 Hadoop 中运行:

                                    jar -cvf WordCount.jar ./WordCount*.class

                                    (4)测试运行

                                    打包完成后,测试运行。首先在当前文件夹下创建几个输入文件:

                                      # 新建一个本地文件夹input
                                      mkdir input
                                      # 新建一个文件file0,同时生成文件内容
                                      echo "echo of the rainbow" > ./input/file0
                                      # 新建一个文件file1,同时生成文件内容
                                      echo "the waiting game" > ./input/file1

                                      在上传之前,我们先执行下列命令检验一下/user/hadoop/下有没有input文件夹和output文件夹:

                                        hdfs dfs -ls
                                        hdfs dfs -ls /user/hadoop/ #可以显示定义hdfs的文件目录/user/hadoop
                                          #如果这两个文件夹存在,则执行下列命令进行删除:
                                          hdfs dfs -rm -r input #删除input文件夹
                                          hdfs dfs -rm -r output #删除output文件夹
                                            # 把本地文件夹input上传到伪分布式HDFS,HDFS中的文件夹也叫input
                                            hdfs dfs -put ./input input
                                              #运行:
                                              hadoop jar WordCount.jar WordCount input output
                                                查看output文件夹的输出:
                                                hdfs dfs -cat output/part-r-00000

                                                运行WordCount程序

                                                设置一下WordCount的运行参数,在WordCount.java文件上右键,依次点击“Run As”——“Run Configurations”

                                                选择左侧的“Java Application”,双击,在右侧窗口就会出现WordCount.java的运行参数设置窗口。

                                                修改Arguments标签,在“Programm Arguments”中填写:

                                                  hdfs://localhost:9000/user/hadoop/input hdfs://localhost:9000/user/hadoop/output

                                                  这是运行WordCount程序是的输入参数,也就是main函数要接受的输入参数。

                                                   点击“Run”,即可运行程序,并在控制台查看日志


                                                  9.MapReduce编程实践

                                                  (1)编程实现文件合并和去重操作

                                                  对于两个输入文件,即文件A和文件B,对两个文件进行合并,并剔除其中重复的内容,得到一个新的输出文件C

                                                    文件A:
                                                    20150101 x
                                                    20150102 y
                                                    20150103 x
                                                    20150104 y
                                                    20150105 z
                                                    20150106 x
                                                      文件B:
                                                      20150101 y
                                                      20150102 y
                                                      20150103 x
                                                      20150104 z
                                                      20150105 y

                                                      java代码

                                                        import java.io.IOException;
                                                        import org.apache.hadoop.conf.Configuration;
                                                        import org.apache.hadoop.fs.Path;
                                                        import org.apache.hadoop.io.IntWritable;
                                                        import org.apache.hadoop.io.Text;
                                                        import org.apache.hadoop.mapreduce.Job;
                                                        import org.apache.hadoop.mapreduce.Mapper;
                                                        import org.apache.hadoop.mapreduce.Reducer;
                                                        import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
                                                        import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
                                                        import org.apache.hadoop.util.GenericOptionsParser;
                                                        public class Merge {
                                                        /**
                                                        * @param args
                                                        * 对A,B两个文件进行合并,并剔除其中重复的内容,得到一个新的输出文件C
                                                        */
                                                        //重载map函数,直接将输入中的value复制到输出数据的key上
                                                        public static class Map extends Mapper<Object, Text, Text, Text>{
                                                        private static Text text = new Text();
                                                        public void map(Object key, Text value, Context context) throws
                                                        IOException,InterruptedException{
                                                        text = value;
                                                        context.write(text, new Text(""));
                                                        }
                                                        }
                                                        //重载reduce函数,直接将输入中的key复制到输出数据的key上
                                                        public static class Reduce extends Reducer<Text, Text, Text, Text>{
                                                        public void reduce(Text key, Iterable<Text> values, Context context ) throws
                                                        IOException,InterruptedException{
                                                        context.write(key, new Text(""));
                                                        }
                                                        }
                                                        public static void main(String[] args) throws Exception{
                                                        // TODO Auto-generated method stub
                                                        Configuration conf = new Configuration();
                                                        String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
                                                        if (otherArgs.length != 2) {
                                                        System.err.println("Usage: wordcount <in> <out>");
                                                        System.exit(2);
                                                        }
                                                        Job job = Job.getInstance(conf,"Merge and duplicate removal");
                                                        job.setJarByClass(Merge.class);
                                                        job.setMapperClass(Map.class);
                                                        job.setCombinerClass(Reduce.class);
                                                        job.setReducerClass(Reduce.class);
                                                        job.setOutputKeyClass(Text.class);
                                                        job.setOutputValueClass(Text.class);
                                                        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
                                                        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
                                                        System.exit(job.waitForCompletion(true) ? 0 : 1);
                                                        }
                                                        }
                                                        文章转载自糟老头修炼记,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                                        评论