hadoop jar example.jar WordCount /file/input /file/output
elif [ "$COMMAND" = "jar" ] ; thenCLASS=org.apache.hadoop.util.RunJar
public static void main(String[] args) throws Throwable {//构建一个RunJar对象并执行run方法new RunJar().run(args);}
获取 jar 文件,即 example.jar 拿到 jar 文件中的 MainClass 名称,即 WordCount.class 解压 jar 文件 通过反射拿到 MainClass 实例对象 拿到 main 方法的实例 通过反射调用 main 方法,也就是我们编写的应用程序的 mian 方法 在应用程序的 main 方法中,通过 Job.waitForCompletion() 等待应用程序执行完毕
public void run(String[] args) throws Throwable {String usage = "RunJar jarFile [mainClass] args...";...//获取到Jar文件,即提交的example.jar文件JarFile jarFile;try {jarFile = new JarFile(fileName);} catch(IOException io) {throw new IOException("Error opening job jar: " + fileName).initCause(io);}//拿到Jar文件的Main Class,如WordCount.classManifest manifest = jarFile.getManifest();if (manifest != null) {mainClassName = manifest.getMainAttributes().getValue("Main-Class");}jarFile.close();...mainClassName = mainClassName.replaceAll("/", ".");File tmpDir = new File(System.getProperty("java.io.tmpdir"));ensureDirectory(tmpDir);//声明一个临时文件final File workDir;try {workDir = File.createTempFile("hadoop-unjar", "", tmpDir);} catch (IOException ioe) {...}...//解压Jar文件unJar(file, workDir);ClassLoader loader = createClassLoader(file, workDir);Thread.currentThread().setContextClassLoader(loader);//通过反射拿到MainClass实例Class<?> mainClass = Class.forName(mainClassName, true, loader);//拿到main方法的实例Method main = mainClass.getMethod("main", new Class[] {Array.newInstance(String.class, 0).getClass()});String[] newArgs = Arrays.asList(args).subList(firstArg, args.length).toArray(new String[0]);try {//通过反射调用main方法main.invoke(null, new Object[] { newArgs });} catch (InvocationTargetException e) {throw e.getTargetException();}/*** TODO 到此为止,跳转到Driver的main方法,也就是自己编写的应用程序的main方法* 最后调用Job.waitForCompletion等待应用程序执行完毕*/}
public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException,ClassNotFoundException {if (state == JobState.DEFINE) {//TODO 提交任务submit();}//打印执行的进度信息...return isSuccessful();}
设置当前Job的状态为DEFINE 设置启用NewAPI,2.x之后属于NewAPI 连接Yarn集群,获取到ResourceManager的代理对象 获取一个作业提交器,将作业提交到 Yarn 集群 提交之后,设置MR的状态为RUNNING
public void submit()throws IOException, InterruptedException, ClassNotFoundException {//1.设置当前Job的状态为DEFINEensureState(JobState.DEFINE);//2.设置启用NewAPI,2.x之后属于NewAPIsetUseNewAPI();//3.获取提交客户端,连接Yarn集群,获取到ResourceManager的代理对象connect();//4.获取一个提交器//这里的cluster就是上面初始化的Cluster对象//cluster.getFileSystem() ==> HDFS//cluster.getClient() ==> YARNRunnerfinal JobSubmitter submitter =getJobSubmitter(cluster.getFileSystem(), cluster.getClient());status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {public JobStatus run() throws IOException, InterruptedException,ClassNotFoundException {//调用提交器的submitJobInternal方法执行Job的提交return submitter.submitJobInternal(Job.this, cluster);}});//5.提交之后,设置MR的状态为RUNNINGstate = JobState.RUNNING;LOG.info("The url to track the job: " + getTrackingURL());}
第3步:连接Yarn集群,获取到ResourceManager的代理对象
Job 的内部有一个 Cluster 类型的 cluster 成员变量
Cluster 的内部有一个 YARNRunner 类型的(ClientProtocol实现类) client 的成员变量
YARNRunner 的内部有一个 ResourceMgrDelegate 类型的 resMgrDelegate 成员变量
ResourceMgrDelegate 的内部有一个 YarnClientImpl 类型的(YarnClient子类) client 成员变量
YarnClientImpl 的内部有一个 ApplicationClientProtocol 类型的 rmClient 成员变量
a .在 connect() 方法中初始化了 Cluster 对象
private synchronized void connect()throws IOException, InterruptedException, ClassNotFoundException {if (cluster == null) {cluster =ugi.doAs(new PrivilegedExceptionAction<Cluster>() {public Cluster run()throws IOException, InterruptedException,ClassNotFoundException {//初始化了Cluster对象return new Cluster(getConfiguration());}});}}
本地模式:LocalJobRunner
Yarn模式:YARNRunner
private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)throws IOException {synchronized (frameworkLoader) {for (ClientProtocolProvider provider : frameworkLoader) {LOG.debug("Trying ClientProtocolProvider : "+ provider.getClass().getName());ClientProtocol clientProtocol = null;try {if (jobTrackAddr == null) {//local模式//provider=LocalClientProtocolProvider//clientProtocol=LocalJobRunnerclientProtocol = provider.create(conf);} else {//Yarn模式//provider=YarnClientProtocolProvider//clientProtocal=YARNRunnerclientProtocol = provider.create(jobTrackAddr, conf);}...}
public YARNRunner(Configuration conf) {this(conf, new ResourceMgrDelegate(new YarnConfiguration(conf)));}
public ResourceMgrDelegate(YarnConfiguration conf) {super(ResourceMgrDelegate.class.getName());this.conf = conf;//创建一个YarnClient,实现类是YarnClientImplthis.client = YarnClient.createYarnClient();init(conf);start();}
//ResourceManager的代理对象protected ApplicationClientProtocol rmClient;
第4步:获取一个作业提交器,将作业提交到 Yarn 集群
生成或验证各种路径,将作业所需的资源进行上传 对输入数据源进行逻辑切片,并设置Map Task的数量 进行作业的提交
//验证作业的输出路径是否存在,如果存在会报错,//正常情况是我们进行了配置,但是该路径在作业提交时是不存在的checkSpecs(job);//添加应用框架的路径到分布式缓存中//DistributedCache:把应用的一些资源添加到分布式缓存中,那么程序在执行的时候,//无论有多少个节点启动任务,这些节点都会自动把分布式缓存中的各种信息//(小数据文件,配置信息...)同步到节点本地addMRFrameworkToDistributedCache(conf);//通过静态方法getStagingDir()获取作业执行时相关资源的存放路径//默认是/tmp/hadoop-yarn/staging/提交作业用户名{$user}/.stagingPath jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);//TODO 记录作业提交的主机IP,主机名,并设置配置信息if (ip != null) {submitHostAddress = ip.getHostAddress();submitHostName = ip.getHostName();conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);}//生成JobIDJobID jobId = submitClient.getNewJobID();job.setJobID(jobId);//构造提交作业的路径,jobStagingArea 后面拼接 jobID//当提交一个 Job时,YARN的客户端会把该Job的一切要用的资源初始化并且存储在HDFS//的工作目录中,以后哪个节点要执行 Task,就从HDFS 目录中拉取资源文件//主要包括以下三类资源://1.xxx.jar//2.job.xml//3.shell启动命令Path submitJobDir = new Path(jobStagingArea, jobId.toString());try {//将jar文件和配置文件上传到上面获取的资源的提交目录 submitJobDircopyAndConfigureFiles(job, submitJobDir);//上传完成后获取完整的配置文件所在路径,也就是job.xml的路径Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);...
第二部分:对输入数据源进行逻辑切片,并设置Map Task的数量
...//获取逻辑切片的数量int maps = writeSplits(job, submitJobDir);//配置需要启动的 Map Task 的个数conf.setInt(MRJobConfig.NUM_MAPS, maps);...
private <T extends InputSplit>int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,InterruptedException, ClassNotFoundException {Configuration conf = job.getConfiguration();//TODO 根据配置的InputFormat类,通过反射创建对应的实例对象// 默认为TextInputFormat,是FileInputFormat的子类InputFormat<?, ?> input =ReflectionUtils.newInstance(job.getInputFormatClass(), conf);//TODO 核心方法,InputSplit就是切片对象,getSplits() 方法返回切片对象的集合// 这里的getSplits()在FileInputFormat中实现,TextInputFormat并未进行重写List<InputSplit> splits = input.getSplits(job);//将集合转为数组T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);//对数组中的元素根据切片大小进行排序Arrays.sort(array, new SplitComparator());//TODO 将逻辑切片信息写入Job提交的HDFS目录中JobSplitWriter.createSplitFiles(jobSubmitDir, conf,jobSubmitDir.getFileSystem(conf), array);//返回切片数量return array.length;}
public List<InputSplit> getSplits(JobContext job) throws IOException {...//初始化切片对象的集合容器List<InputSplit> splits = new ArrayList<InputSplit>();List<FileStatus> files = listStatus(job);//遍历输入文件for (FileStatus file: files) {Path path = file.getPath();//拿到文件的大小long length = file.getLen();if (length != 0) {BlockLocation[] blkLocations;if (file instanceof LocatedFileStatus) {blkLocations = ((LocatedFileStatus) file).getBlockLocations();} else {FileSystem fs = path.getFileSystem(job.getConfiguration());blkLocations = fs.getFileBlockLocations(file, 0, length);}//TODO 判断文件是否可以进行切分// 如果可以进行切分if (isSplitable(job, path)) {//获取配置的块大小long blockSize = file.getBlockSize();//计算逻辑切片的大小long splitSize = computeSplitSize(blockSize, minSize, maxSize);//bytesRemaining => 文件剩余大小long bytesRemaining = length;//TODO 判断文件的剩余大小是否大于切片大小的 1.1 倍while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);//进行切片splits.add(makeSplit(path, length-bytesRemaining, splitSize,blkLocations[blkIndex].getHosts(),blkLocations[blkIndex].getCachedHosts()));//更新剩余文件大小bytesRemaining -= splitSize;}//TODO 如果剩余大小小于切片大小的 1.1 倍,且剩余大小不为0,那么将剩余的文件作为一个逻辑切片if (bytesRemaining != 0) {int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,blkLocations[blkIndex].getHosts(),blkLocations[blkIndex].getCachedHosts()));}//TODO 如果无法切分,将文件整体作为一个逻辑切片} else { // not splitablesplits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),blkLocations[0].getCachedHosts()));}//如果文件大小为0,不需要进行切片,添加一个空的逻辑切片} else {//Create empty hosts array for zero length filessplits.add(makeSplit(path, 0, length, new String[0]));}}...}return splits;}
遍历输入的所有文件,获取文件大小 如果文件大小为 0 ,则不需要进行切片,直接添加一个空的逻辑切片 如果文件大小不为 0,判断该文件是否可以进行切分,如果不可以切分则将整体文件作为一个逻辑切片 如果文件可以进行切分,首先计算逻辑切片的大小,然后对文件进行切片,并更新剩余文件大小 如果剩余文件大小大于逻辑切片大小的 1.1 倍,则一直进行切分,直到该条件不满足位置 如果剩余文件不满足 1.1 倍逻辑切片大小,且剩余文件大小不为0,则将剩余文件作为一个逻辑切片 返回逻辑切片的集合,这里的逻辑切片用 InputSplit 对象表示
//将解析该job生成的配置文件,写入到HDFS上writeConf(conf, submitJobFile);
第三部分:进行作业的提交
//这里的 submitClient 为 YARNRunner 实例status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) throws IOException, InterruptedException {...try {//TODO 提交应用并获取 ApplicationIdApplicationId applicationId =resMgrDelegate.submitApplication(appContext);...}...}
文章转载自大数据记事本,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




