暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

HDFS|DataNode 和 NameNode 的心跳流程分析

大数据记事本 2021-05-08
2294
    心跳机制是长连接场景下一种基本的验活手段,在 HDFS 中也不例外,DataNode 需要定时向 NameNode 发送心跳,以证明自己在线(存活),同时会上报自身的一些存储情况,以便 NameNode 统计整个集群的存储状况。除此之外,NameNode 交给 DataNode 的一系列操作指令,也是在 DataNode 发送心跳时和响应一起返回的(NameNode 无法主动向 DataNode 发送操作指令)。

整个心跳的流程如下图所示:

    在上一篇中,已经分析了 DataNode 如何向 NameNode 进行注册。当 DN 向 NN 注册成功后,会发送心跳,相关代码还是在 BPServiceActor.run() 方法中
    //这里的 shouldRun()方法默认返回 ture
    while (shouldRun()) {
    try {
    //TODO 发送心跳
    offerService();
    } catch (Exception ex) {
    LOG.error("Exception in BPOfferService for " + this, ex);
    sleepAndLogInterrupts(5000, "offering service");
    }
    }
        发送心跳的关键方法为 offerService() 。该方法通过一个 while 循环,不断地判断是否需要发送心跳,如果当前时间距离最后一次发送心跳的时间间隔超过3秒,就会调用 sendHeartBeat() 方法向 NameNode 发送心跳,该方法的返回值为 HeartbeatResponse 类型,里面包含了 NameNode 给 DataNode 返回的操作指令(如创建文件、删除文件的指令等)。之后 DataNode 会执行 NameNode 返回的指令。
      private void offerService() throws Exception {
      ...
      while (shouldRun()) {
      try {
      //记录开始时间,以毫秒为单位
      final long startTime = monotonicNow();
      //TODO 心跳是每3秒进行一次
      //startTime:当前时间 lastHeartbeat:上次心跳的时间 heartBeatInterval:3秒
      //当满足if条件时,说明需要发送心跳了
      if (startTime - lastHeartbeat >= dnConf.heartBeatInterval) {


      //更新最后一次心跳的时间
      lastHeartbeat = startTime;
      //areHeartbeatsDisabledForTests()方法默认返回 false
      if (!dn.areHeartbeatsDisabledForTests()) {
      //TODO 发送心跳,返回来的响应是NameNode给的指令
      HeartbeatResponse resp = sendHeartBeat();
      assert resp != null;
      dn.getMetrics().addHeartbeat(monotonicNow() - startTime);


      bpos.updateActorStatesFromHeartbeat(
      this, resp.getNameNodeHaState());
      state = resp.getNameNodeHaState().getState();


      if (state == HAServiceState.ACTIVE) {
      handleRollingUpgradeStatus(resp);
      }


      long startProcessCommands = monotonicNow();
      //TODO 获取到一些NameNode发送过来的指令
      //然后执行这些指令
      if (!processCommand(resp.getCommands()))
      continue;
      long endProcessCommands = monotonicNow();
      if (endProcessCommands - startProcessCommands > 2000) {
      LOG.info("Took " + (endProcessCommands - startProcessCommands)
      + "ms to process " + resp.getCommands().length
      + " commands from NN");
      }
      }
      }
         ...
      }
      这里重点关注 sentHeartBeat() 方法
        HeartbeatResponse sendHeartBeat() throws IOException {
        //TODO 每隔3秒就运行一次
        StorageReport[] reports =
        dn.getFSDataset().getStorageReports(bpos.getBlockPoolId());
        if (LOG.isDebugEnabled()) {
        LOG.debug("Sending heartbeat with " + reports.length +
        " storage reports from service actor: " + this);
        }


        VolumeFailureSummary volumeFailureSummary = dn.getFSDataset()
        .getVolumeFailureSummary();
        int numFailedVolumes = volumeFailureSummary != null ?
        volumeFailureSummary.getFailedStorageLocations().length : 0;
        //TODO 发送心跳,bpNamenode是NameNodeRpcServer的代理对象,
        // DN发送心跳时会携带一些报告,reports里面保存了存储容量,使用量和剩余量,
        // 因此可以在50070界面看到DN的存储信息
        // 还会上报一些信息,比如DN有几块盘坏了
        return bpNamenode.sendHeartbeat(bpRegistration,
        reports,
        dn.getFSDataset().getCacheCapacity(),
        dn.getFSDataset().getCacheUsed(),
        dn.getXmitsInProgress(),
        dn.getXceiverCount(),
        numFailedVolumes,
        volumeFailureSummary);
        }
            该方法通过 bpNamenode (NameNodeRPC代理对象,之前已经获取过)代理对象,调用 NameNodeRpcServer.sendHeartbeat() 方法,其参数列表中,会携带 DataNode 的一些上报信息,如存储总量,以及存储的使用量、剩余量,磁盘损坏情况等,这样 NameNode 就可以对这些信息进行统计。
            从这里开始,由于进行了 RPC 调用,查看的就应该是 NameNodeRpcServer.sendHeartbeat() 方法
          @Override 
          public HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg,
          StorageReport[] report, long dnCacheCapacity, long dnCacheUsed,
          int xmitsInProgress, int xceiverCount,
          int failedVolumes, VolumeFailureSummary volumeFailureSummary)
          throws IOException {
          //检查NN是否启动
          checkNNStartup();
          verifyRequest(nodeReg);
          //TODO 处理DataNode发送过来的心跳
          return namesystem.handleHeartbeat(nodeReg, report,
          dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
          failedVolumes, volumeFailureSummary);
          }
          其处理心跳的核心逻辑是调用 FSNamesystem.handleHeartbeat() 方法
            HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
            StorageReport[] reports, long cacheCapacity, long cacheUsed,
            int xceiverCount, int xmitsInProgress, int failedVolumes,
            VolumeFailureSummary volumeFailureSummary) throws IOException {
            readLock();
            try {
            final int maxTransfer = blockManager.getMaxReplicationStreams()
            - xmitsInProgress;
            //TODO NameNode处理DataNode发送过来的心跳,并返回指令集
            DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
            nodeReg, reports, blockPoolId, cacheCapacity, cacheUsed,
            xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary);


            final NNHAStatusHeartbeat haState = new NNHAStatusHeartbeat(
            haContext.getState().getServiceState(),
            getFSImage().getLastAppliedOrWrittenTxId());
            //TODO 给DataNode返回响应
            return new HeartbeatResponse(cmds, haState, rollingUpgradeInfo);
            } finally {
            readUnlock();
            }
            }
                在该方法内部,真正处理 DataNode 心跳的是 DataNodeManager.handleHeartbeat() 方法,该方法返回 NameNode 需要 DataNode 执行的一系列指令集合。最后将这个指令集封装到 HeartbeatResponse 响应对象中给 DataNode 返回。
              public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
              StorageReport[] reports, final String blockPoolId,
              long cacheCapacity, long cacheUsed, int xceiverCount,
              int maxTransfers, int failedVolumes,
              VolumeFailureSummary volumeFailureSummary) throws IOException {
              synchronized (heartbeatManager) {
              synchronized (datanodeMap) {
              DatanodeDescriptor nodeinfo = null;
              try {
              //TODO 从已有DataNodeMap里面获取到已经注册过来的DataNode信息,
              // 如果能获取到信息说明这个DataNode已经注册过了,
              // 但如果是第一次DataNodeMap里面是没有信息的
              nodeinfo = getDatanode(nodeReg);
              } catch(UnregisteredNodeException e) {
              return new DatanodeCommand[]{RegisterCommand.REGISTER};
              }


              //检查DataNode是否需要下线
              if (nodeinfo != null && nodeinfo.isDisallowed()) {
              setDatanodeDead(nodeinfo);
              throw new DisallowedDatanodeException(nodeinfo);
              }


              if (nodeinfo == null || !nodeinfo.isAlive) {
              return new DatanodeCommand[]{RegisterCommand.REGISTER};
              }
              //TODO 更新心跳的重要信息
              heartbeatManager.updateHeartbeat(nodeinfo, reports,
              cacheCapacity, cacheUsed,
              xceiverCount, failedVolumes,
              volumeFailureSummary);
              ...
              }
                  在这个方法中,首先根据参数列表中的 DataNode 注册信息,从 NameNode 的数据结构中获取到该 DataNode 的信息。在 DataNode 注册阶段,我们知道注册的本质就是将 DataNode 的信息存放到 NameNode 的一系列数据结构中,这里是反向将 DataNode 的信息从这些数据结构中取出来。

                  然后更新这个 DataNode 的心跳信息。在 NameNode 启动过程中,启动了一些公共服务,其中就包括心跳管理器 HeartbeatManager,在 DataNode 注册的过程中,也将自身的信息保存到了心跳管理器中,这里就是要更新这些信息。

                  在 HeartbeatManager.updateHeartbeat() 方法中,调用了 DatanodeDescriptor.updateHeartbeat() 方法,之后又调用了updateHeartbeatState() 方法,这里我们直接看这个方法

                public void updateHeartbeatState(StorageReport[] reports, long cacheCapacity,
                long cacheUsed, int xceiverCount, int volFailures,
                VolumeFailureSummary volumeFailureSummary) {
                //初始化一些存储的变量
                long totalCapacity = 0;
                long totalRemaining = 0;
                long totalBlockPoolUsed = 0;
                long totalDfsUsed = 0;
                ...
                //TODO 更新该DataNode的存储信息
                setCacheCapacity(cacheCapacity);
                setCacheUsed(cacheUsed);
                setXceiverCount(xceiverCount);


                //TODO 重要!!修改该DataNode上一次心跳的时间,
                // DataNode每心跳一次,就要刷新最后心跳的时间为当前时间,
                // 根据这个时间来判断是否超过630秒DataNode没有进行心跳,从而判断DataNode下线
                setLastUpdate(Time.now());
                setLastUpdateMonotonic(Time.monotonicNow());
                ...
                }
                }

                该方法的作用主要有两个:

                1. 根据 DataNode 心跳中携带的存储信息,更新 NameNode 中该 DataNode 对应的存储信息,以便于 NameNode 统计整个集群的存储状况
                2. 更新该 DataNode 最后一次心跳的时间,NameNode 要根据 DataNode 最后一次心跳的时间来判定该 DataNode 是否还在线
                    至此,DataNode 发送心跳以及 NameNode 处理心跳的流程基本已经结束了,那么它更新的最后一次心跳时间是在哪里用到的呢?也就是说,NameNode 如何判定一个 DataNode 是否在线呢?这里就是由 NameNode 的心跳管理器 HeartbeatManager 管理的,其内部会启动一个心跳线程 heartbeatThread
                  private final Daemon heartbeatThread = new Daemon(new Monitor());
                      该线程的任务类型为 Monitor,是 HeartbeatManager 的一个内部类,逻辑在其 run() 方法中:
                    @Override
                    public void run() {
                    while(namesystem.isRunning()) {
                    try {
                    //获取当前时间
                    final long now = Time.monotonicNow();
                    if (lastHeartbeatCheck + heartbeatRecheckInterval < now) {//如果超过了30秒,就进行心跳检查
                    //TODO 心跳检查
                    heartbeatCheck();
                    lastHeartbeatCheck = now;
                    }
                    if (blockManager.shouldUpdateBlockKey(now - lastBlockKeyUpdate)) {
                    synchronized(HeartbeatManager.this) {
                    for(DatanodeDescriptor d : datanodes) {
                    d.needKeyUpdate = true;
                    }
                    }
                    lastBlockKeyUpdate = now;
                    }
                    } catch (Exception e) {
                    LOG.error("Exception while checking heartbeat", e);
                    }
                    try {
                    Thread.sleep(5000); // 5 seconds
                    } catch (InterruptedException ie) {
                    }
                    }
                    }
                    该方法的逻辑是:
                    1. 获取当前时间
                    2. 判断距离最后一次心跳检查的时间是否超过 30 秒
                    3. 如果超过 30 秒则执行心跳检查
                    4. 休息 5 秒之后继续执行上面的逻辑
                    所以,心跳检查的关键方法为 heartbeatCheck(),代码如下:
                      void heartbeatCheck() {
                      final DatanodeManager dm = blockManager.getDatanodeManager();
                      //如果处于安全模式,直接返回
                      if (namesystem.isInStartupSafeMode()) {
                      return;
                      }
                      //标识所有DN是否全部在线,如果全部在线,下面的while循环只执行一次
                      boolean allAlive = false;
                      while (!allAlive) {
                      DatanodeID dead = null;


                      DatanodeStorageInfo failedStorage = null;


                      int numOfStaleNodes = 0;
                      int numOfStaleStorages = 0;
                      synchronized(this) {
                      /**
                      * 注册的时候就是将DataNode的信息添加到了datanodes这个数据结构里面
                      * 修改datanode上一次心跳时间也是更新的这个数据结构里面的datanode信息
                      */
                      //遍历所有的DataNode
                      for (DatanodeDescriptor d : datanodes) {
                      //isDatanodeDead(d)方法用来判断这个datanode是否已经下线
                      if (dead == null && dm.isDatanodeDead(d)) {
                      stats.incrExpiredHeartbeats();
                      dead = d;
                      }
                      //如果DN超过630秒没发送心跳
                      if (d.isStale(dm.getStaleInterval())) {
                      //过期DN的数量+1
                      numOfStaleNodes++;
                      }
                      //获取该节点的存储信息
                      DatanodeStorageInfo[] storageInfos = d.getStorageInfos();
                      for(DatanodeStorageInfo storageInfo : storageInfos) {
                      if (storageInfo.areBlockContentsStale()) {
                      //失败存储的数量+1
                      numOfStaleStorages++;
                      }


                      if (failedStorage == null &&
                      storageInfo.areBlocksOnFailedStorage() &&
                      d != dead) {
                      failedStorage = storageInfo;
                      }
                      }


                      }
                      //更新过期的节点数量
                      dm.setNumStaleNodes(numOfStaleNodes);
                      //更新过期的存储数量
                      dm.setNumStaleStorages(numOfStaleStorages);
                      }
                      //allAlive标识DN是否全部在线,只有所有DN都发送了心跳,且不存在损坏的存储时该变量为true
                      allAlive = dead == null && failedStorage == null;
                      //如果存在下线的节点
                      if (dead != null) {
                      namesystem.writeLock();
                      try {
                      if (namesystem.isInStartupSafeMode()) {
                      return;
                      }
                      synchronized(this) {
                      //TODO 将下线的DataNode移除
                      dm.removeDeadDatanode(dead);
                      }
                      } finally {
                      namesystem.writeUnlock();
                      }
                      }
                      //如果存在失败的存储
                      if (failedStorage != null) {
                      namesystem.writeLock();
                      try {
                      if (namesystem.isInStartupSafeMode()) {
                      return;
                      }
                      synchronized(this) {
                      //将失败的存储进行移除
                      blockManager.removeBlocksAssociatedTo(failedStorage);
                      }
                      } finally {
                      namesystem.writeUnlock();
                      }
                      }
                      }
                      }
                      该方法的整体逻辑是:
                      1. 遍历 NameNode 中注册的所有 DataNode 信息,检查是否有下线的节点或者节点上有失败的存储
                      2. 如果存在下线节点或者失败存储,将其从对应的数据结构中移除
                      3. 如果不存在下线节点或者失败存储,循环结束,该方法返回
                      4. 否则继续循环执行步骤1,2,3

                          其中 isDatanodeDead() 方法用来判断该 DataNode 是否离线,判定方法是:如果当前时间 - 630 秒  > 最后一次心跳时间,则说明该 DataNode 已经离线。也就是说如果一个 DataNode 超过 630 秒没有发送心跳,NameNode 就判定该 DataNode 下线了

                        boolean isDatanodeDead(DatanodeDescriptor node) {
                        return (node.getLastUpdateMonotonic() <
                        (monotonicNow() - heartbeatExpireInterval));//heartbeatExpireInterval:10分钟+30秒
                        }
                          this.heartbeatExpireInterval = 
                          2 * heartbeatRecheckInterval//2*5分钟
                          + 10 * 1000 * heartbeatIntervalSeconds;//10*1000*3=30秒
                          // 2 * 5 * 60 * 1000 + 10 * 1000 * 3 =630000 毫秒 = 630秒
                          将下线 DataNode 移除的代码如下:
                            //TODO 将下线的DataNode移除
                            dm.removeDeadDatanode(dead);

                                这里调用了 DatanodeManager.removeDeadDatanode() 方法,继而调用 removeDatanode() 方法:

                                移除 DataNode 节点的本质就是从 NameNode 的各种数据结构中移除该 DataNode 的信息

                              private void removeDatanode(DatanodeDescriptor nodeInfo) {
                              assert namesystem.hasWriteLock();
                              //从各种数据结构中移除该DN的信息
                              heartbeatManager.removeDatanode(nodeInfo);
                              blockManager.removeBlocksAssociatedTo(nodeInfo);
                              networktopology.remove(nodeInfo);
                              decrementVersionCount(nodeInfo.getSoftwareVersion());


                              if (LOG.isDebugEnabled()) {
                              LOG.debug("remove datanode " + nodeInfo);
                              }
                              //再次进行安全模式的检查
                              namesystem.checkSafeMode();
                              }
                              总结:
                              • DataNode 发送心跳的时间间隔为 3 秒
                              • NameNode 的心跳管理器每隔 5 秒会检查一次距离上次心跳检查是否超过 30 秒,如果超过则执行心跳检查
                              • 如果一个 DataNode 超过 630 秒未向 NameNode 发送心跳,NameNode 会判定该 DataNode 下线
                              • DataNode 注册的本质是向 NameNode 的各种数据结构中写入该 DataNode 的信息
                              • DataNode 下线的本质是从 NameNode 的各种数据结构中移除该 DataNode 的信息
                              文章转载自大数据记事本,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                              评论