暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

HDFS源码 | NameNode核心启动流程

伦少的博客 2022-11-13
319

前言

hadoop源码版本,选择2.7版本,因为该版本覆盖面广,含有hadoop大部分核心功能。整个hadoop项目,有一两百万行代码,功能点非持多,但是不必每个功能点每行代码都去了解,主要是看核心架构和核心功能点。通过阅读源码,以达到以下目的:

  1. 通过阅读核心源码,从本质上来理解hadoop的架构和功能。

  2. 帮助定位排查问题,正确理解配置项,能进行针对性的优化配置。

  3. 学习优秀开源项目的代码架构思想,借鉴优秀的代码模块。

  4. 对不可避免的bug和性能点,进行二次开发和代码优化。

hdfs源码的阅读,从main方法主线开始到最后启动完成,启动流程主线相当于纲领,主线上的核心功能会在其他章节依次展开,避免主线和分支杂糅一起,这样整个流程思路更清晰明了,本次单讲主线启动流程。

namenode启动脚本和启动类

根据namenode的启动脚本,可以看到最终脚本是通过java启动了org.apache.hadoop.hdfs.server.namenode.NameNode类,因此直接去该类下找main方法,来从头看namenode的启动流程。

启动流程

org.apache.hadoop.hdfs.server.namenode.NameNode类的main方法如下:

  /**
   * 
   * todo: namenode启动类的main方法
   * 
   */

  public static void main(String argv[]) throws Exception {
    if (DFSUtil.parseHelpArgument(argv, NameNode.USAGE, System.out, true)) {
      System.exit(0);
    }

    try {
      StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
      /**
       * todo: 创建namenode对象,进去看一看
       */

      NameNode namenode = createNameNode(argv, null);
      if (namenode != null) {
        namenode.join();
      }
    } catch (Throwable e) {
      LOG.error("Failed to start namenode.", e);
      terminate(1, e);
    }
  }

核心代码就一行,调用createNameNode方法,创建namenode对象,进去看一看

  public static NameNode createNameNode(String argv[], Configuration conf)
      throws IOException 
{
    LOG.info("createNameNode " + Arrays.asList(argv));
    if (conf == null)
      conf = new HdfsConfiguration();
    // Parse out some generic args into Configuration.
    GenericOptionsParser hParser = new GenericOptionsParser(conf, argv);
    argv = hParser.getRemainingArgs();
    // Parse the rest, NN specific args.
    StartupOption startOpt = parseArguments(argv);
    if (startOpt == null) {
      printUsage(System.err);
      return null;
    }
    setStartupOption(conf, startOpt);

    /**
     * todo: 这里都是在处理参数,像我们执行hdfs命令,可以format数据目录,执行升级,回滚等操作。这些我们都不关心,只关心正常的namenode启动流程。
     */

    switch (startOpt) {
      case FORMAT: {
        boolean aborted = format(conf, startOpt.getForceFormat(),
            startOpt.getInteractiveFormat());
        terminate(aborted ? 1 : 0);
        return null// avoid javac warning
      }
      case GENCLUSTERID: {
        System.err.println("Generating new cluster id:");
        System.out.println(NNStorage.newClusterID());
        terminate(0);
        return null;
      }
      case FINALIZE: {
        System.err.println("Use of the argument '" + StartupOption.FINALIZE +
            "' is no longer supported. To finalize an upgrade, start the NN " +
            " and then run `hdfs dfsadmin -finalizeUpgrade'");
        terminate(1);
        return null// avoid javac warning
      }
      case ROLLBACK: {
        boolean aborted = doRollback(conf, true);
        terminate(aborted ? 1 : 0);
        return null// avoid warning
      }
      case BOOTSTRAPSTANDBY: {
        String toolArgs[] = Arrays.copyOfRange(argv, 1, argv.length);
        int rc = BootstrapStandby.run(toolArgs, conf);
        terminate(rc);
        return null// avoid warning
      }
      case INITIALIZESHAREDEDITS: {
        boolean aborted = initializeSharedEdits(conf,
            startOpt.getForceFormat(),
            startOpt.getInteractiveFormat());
        terminate(aborted ? 1 : 0);
        return null// avoid warning
      }
      case BACKUP:
      case CHECKPOINT: {
        NamenodeRole role = startOpt.toNodeRole();
        DefaultMetricsSystem.initialize(role.toString().replace(" """));
        return new BackupNode(conf, role);
      }
      case RECOVER: {
        NameNode.doRecovery(startOpt, conf);
        return null;
      }
      case METADATAVERSION: {
        printMetadataVersion(conf);
        terminate(0);
        return null// avoid javac warning
      }
      case UPGRADEONLY: {
        DefaultMetricsSystem.initialize("NameNode");
        new NameNode(conf);
        terminate(0);
        return null;
      }
      default: {
        DefaultMetricsSystem.initialize("NameNode");
        /**
         * todo: 其他case都是特殊的管理操作命令,正常启动都不会进去,那么默认就执行下面的语句,来new一个Namenode对象
         */

        return new NameNode(conf);
      }
    }
  }

createNameNode方法会解析命令行参数,根据不同的子命令,进入到不同的case,正常启动namenode,是不会进入case分支,最终就到default分支,new一个NameNode对象,实例化代码如下:

  public NameNode(Configuration conf) throws IOException {
    this(conf, NamenodeRole.NAMENODE);
  }

  protected NameNode(Configuration conf, NamenodeRole role) 
      throws IOException 

    this.conf = conf;
    this.role = role;
    setClientNamenodeAddress(conf);
    String nsId = getNameServiceId(conf);
    String namenodeId = HAUtil.getNameNodeId(conf, nsId);
    this.haEnabled = HAUtil.isHAEnabled(conf, nsId);
    state = createHAState(getStartupOption(conf));
    this.allowStaleStandbyReads = HAUtil.shouldAllowStandbyReads(conf);
    this.haContext = createHAContext();
    try {
      // 设置nameservice之类的配置
      initializeGenericKeys(conf, nsId, namenodeId);
      /**
       * todo: 重点,构建rpc服务端,加载数据文件都是在initialize方法里进行
       */

      initialize(conf);
      try {
        haContext.writeLock();
        state.prepareToEnterState(haContext);
        state.enterState(haContext);
      } finally {
        haContext.writeUnlock();
      }
    } catch (IOException e) {
      this.stop();
      throw e;
    } catch (HadoopIllegalArgumentException e) {
      this.stop();
      throw e;
    }
    this.started.set(true);
  }

实例化NameNode方法做了一些选项处理和配置项处理,重点在initialize方法,里面加载image文件到内存,构建了FSNamesystem对象,启动了namenode的rpc服务端。其中FSNamesystem是非常重要的一个对象,它还持有很多namenode的核心对象。

  /**
   * Initialize name-node.
   * todo: 初始化namenode,构建FSNamesystem对象,并从磁盘加载数据文件到内存,最后构建并启动rpc服务端。
   * 
   * @param conf the configuration
   */

  protected void initialize(Configuration conf) throws IOException {
    if (conf.get(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS) == null) {
      String intervals = conf.get(DFS_METRICS_PERCENTILES_INTERVALS_KEY);
      if (intervals != null) {
        conf.set(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS,
          intervals);
      }
    }

    UserGroupInformation.setConfiguration(conf);
    loginAsNameNodeUser(conf);

    NameNode.initMetrics(conf, this.getRole());
    StartupProgressMetrics.register(startupProgress);

    if (NamenodeRole.NAMENODE == role) {
      // 启动http服务,就是namenode的50070服务
      startHttpServer(conf);
    }

    this.spanReceiverHost =
      SpanReceiverHost.get(conf, DFSConfigKeys.DFS_SERVER_HTRACE_PREFIX);

    /**
     * todo: 核心方法,构建FSNamesystem对象,并加载数据文件到内存
     */

    loadNamesystem(conf);

    /**
     * todo: 创建rpc服务端
     */

    rpcServer = createRpcServer(conf);
    if (clientNamenodeAddress == null) {
      // This is expected for MiniDFSCluster. Set it now using 
      // the RPC server's bind address.
      clientNamenodeAddress = 
          NetUtils.getHostPortString(rpcServer.getRpcAddress());
      LOG.info("Clients are to use " + clientNamenodeAddress + " to access"
          + " this namenode/service.");
    }
    if (NamenodeRole.NAMENODE == role) {
      httpServer.setNameNodeAddress(getNameNodeAddress());
      httpServer.setFSImage(getFSImage());
    }

    pauseMonitor = new JvmPauseMonitor(conf);
    pauseMonitor.start();
    metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);

    /**
     * todo: 启动http服务,启动rpc服务,启动一系列管理线程,如副本管理线程,块管理线程,datanode管理线程等
     */

    startCommonServices(conf);
  }

比较重要的是loadNamesystem和startCommonServices方法,其中loadNamesystem方法构建了FSNamesystem,之后FSNamesystem对象加载数据文件到内存。然后startCommonServices方法启动了一系列重要管理线程。

1、loadNamesystem方法

  protected void loadNamesystem(Configuration conf) throws IOException {
    this.namesystem = FSNamesystem.loadFromDisk(conf);
  }

loadFromDisk方法返回FSNamesystem对象,该方法的注释说明已经简要说明了该方法的功能,即构建一个加载了image和edits的FSNamesystem对象,该对象包含了namespace内存命名空间。

  /**
   * Instantiates an FSNamesystem loaded from the image and edits
   * directories specified in the passed Configuration.
   *
   * @param conf the Configuration which specifies the storage directories
   *             from which to load
   * @return an FSNamesystem which contains the loaded namespace
   * @throws IOException if loading fails
   */

  static FSNamesystem loadFromDisk(Configuration conf) throws IOException {

    checkConfiguration(conf);
    /**
     * todo: 先构建一个FSImage对象,通过类名就知道,文件映像类,真正执行加载数据文件的类。
     *       它里面最终会通过protobuf格式的loader来加载数据文件,因为namenode的数据文件是protobuf格式序列化存储的。
     */

    FSImage fsImage = new FSImage(conf,
        FSNamesystem.getNamespaceDirs(conf),
        FSNamesystem.getNamespaceEditsDirs(conf));

    /**
     * todo: 创建FSNamesystem对象,传入了刚刚创建的FSImage对象
     */

    FSNamesystem namesystem = new FSNamesystem(conf, fsImage, false);
    StartupOption startOpt = NameNode.getStartupOption(conf);
    if (startOpt == StartupOption.RECOVER) {
      namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
    }

    long loadStart = monotonicNow();
    try {
      /**
       * todo: 加载数据文件到内存
       */

      namesystem.loadFSImage(startOpt);
    } catch (IOException ioe) {
      LOG.warn("Encountered exception loading fsimage", ioe);
      fsImage.close();
      throw ioe;
    }
    long timeTakenToLoadFSImage = monotonicNow() - loadStart;
    LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
    NameNodeMetrics nnMetrics = NameNode.getNameNodeMetrics();
    if (nnMetrics != null) {
      nnMetrics.setFsImageLoadTime((int) timeTakenToLoadFSImage);
    }
    /**
     * todo: 最后返回FSNamesystem对象
     */

    return namesystem;
  }

2、startCommonServices方法

  /** Start the services common to active and standby states */
  private void startCommonServices(Configuration conf) throws IOException {
    /**
     * todo: namesystem就是FSNamesystem对象,就是之前loadFromDisk创建的。
     *       然后调用startCommonServices方法,启动一些公共服务线程。
     *       另外还会做一些资源检查和块检查工作,来决定是否进入安全模式。
     */

    namesystem.startCommonServices(conf, haContext);
    registerNNSMXBean();
    if (NamenodeRole.NAMENODE != role) {
      /**
       * todo: 启动http服务
       */

      startHttpServer(conf);
      httpServer.setNameNodeAddress(getNameNodeAddress());
      httpServer.setFSImage(getFSImage());
    }
    /**
     * todo: 启动rpc服务
     */

    rpcServer.start();
    plugins = conf.getInstances(DFS_NAMENODE_PLUGINS_KEY,
        ServicePlugin.class);
    for (ServicePlugin p: plugins) {
      try {
        p.start(this);
      } catch (Throwable t) {
        LOG.warn("ServicePlugin " + p + " could not be started", t);
      }
    }
    LOG.info(getRole() + " RPC up at: " + rpcServer.getRpcAddress());
    if (rpcServer.getServiceRpcAddress() != null) {
      LOG.info(getRole() + " service RPC up at: "
          + rpcServer.getServiceRpcAddress());
    }
  }

http和rpc服务到此就完全启动了,然后再看看startCommonServices里面做了什么

  /** 
   * Start services common to both active and standby states
   */

  void startCommonServices(Configuration conf, HAContext haContext) throws IOException {
    this.registerMBean(); // register the MBean for the FSNamesystemState
    /**
     * todo: 为了线程安全,这里加了写锁,因为要访问之前加载到内存的元数据
     */

    writeLock();
    this.haContext = haContext;
    try {
      /**
       * todo: 创建一个资源检查器,然后执行checkAvailableResources方法来检查磁盘空间是否够用,设置hasResourcesAvailable标志,后面会用。
       */

      nnResourceChecker = new NameNodeResourceChecker(conf);
      checkAvailableResources();
      assert safeMode != null && !isPopulatingReplQueues();
      StartupProgress prog = NameNode.getStartupProgress();
      prog.beginPhase(Phase.SAFEMODE);
      prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS,
        getCompleteBlocksTotal());

      /**
       * todo: 设置总的块数量,同时在该方法里判断是否要进入安全模式
       */

      setBlockTotal();
      /**
       * todo: 启动一系列管理线程
       */

      blockManager.activate(conf);
    } finally {
      writeUnlock();
    }

    registerMXBean();
    DefaultMetricsSystem.instance().register(this);
    if (inodeAttributeProvider != null) {
      inodeAttributeProvider.start();
      dir.setINodeAttributeProvider(inodeAttributeProvider);
    }
    snapshotManager.registerMXBean();
  }

1) 看看如何判断进入安全模式
  /**
   * Set the total number of blocks in the system. 
   */

  public void setBlockTotal() {
    // safeMode is volatile, and may be set to null at any time
    SafeModeInfo safeMode = this.safeMode;
    if (safeMode == null)
      return;
    safeMode.setBlockTotal((int)getCompleteBlocksTotal());
  }

继续看看setBlockTotal方法

    /**
     * Set total number of blocks.
     */

    private synchronized void setBlockTotal(int total) {
      this.blockTotal = total;
      this.blockThreshold = (int) (blockTotal * threshold);
      this.blockReplQueueThreshold = 
        (int) (blockTotal * replQueueThreshold);
      if (haEnabled) {
        // After we initialize the block count, any further namespace
        // modifications done while in safe mode need to keep track
        // of the number of total blocks in the system.
        this.shouldIncrementallyTrackBlocks = true;
      }
      if(blockSafe < 0)
        this.blockSafe = 0;
      /**
       * todo: 判断是否进入安全模式
       */

      checkMode();
    }

checkMode是判断是否进入安全模式的方法,下面是checkMode的节选

      if (smmthread == null && needEnter()) {
        /**
         * todo: 如果needEnter,那么就enter,进入安全模式
         */

        enter();
        // check if we are ready to initialize replication queues
        if (canInitializeReplQueues() && !isPopulatingReplQueues()
            && !haEnabled) {
          initializeReplQueues();
        }
        reportStatus("STATE* Safe mode ON."false);
        return;
      }

needEnter会判断三件事:

  1. 汇报上来的块是否足够了

  2. 判断活的datanode数里是否达到要求

  3. 判断namenode的资源是否够,就是根据之前检查磁盘资源后设置的hasResourcesAvailable变量判断的

    /** 
     * There is no need to enter safe mode 
     * if DFS is empty or {@link #threshold} == 0
     */

    private boolean needEnter() {
      return (threshold != 0 && blockSafe < blockThreshold) ||
        (datanodeThreshold != 0 && getNumLiveDataNodes() < datanodeThreshold) ||
        (!nameNodeHasResourcesAvailable());
    }

如果满足上面三个任意一个条件,就会执行enter方法进入安全模式,该方法就是设置了标志位。

    /**
     * Enter safe mode.
     */

    private void enter() {
      this.reached = 0;
      this.reachedTimestamp = 0;
    }

2) 再看看启动了哪些线程
  public void activate(Configuration conf) {
    pendingReplications.start();  //PendingReplicationMonitor线程
    datanodeManager.activate(conf); //datanode管理线程,如心跳线程,下线线程
    this.replicationThread.start(); //ReplicationMonitor线程
    this.blockReportThread.start(); //块汇报线程
  }

到此整个namenode的启动就完成了。

最后

namenode的启动流程主线就是这些,其中有几个重要的分支,没有细化深入,比如rpc服务如何构建和使用的,namenode数据文件格式如何加载和组织的,内存如何管理Inode和块的等等,这些后续再逐个细化深入分析。

IT优秀博客推荐

文章转载自伦少的博客,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论