整个心跳的流程如下图所示:

//这里的 shouldRun()方法默认返回 turewhile (shouldRun()) {try {//TODO 发送心跳offerService();} catch (Exception ex) {LOG.error("Exception in BPOfferService for " + this, ex);sleepAndLogInterrupts(5000, "offering service");}}
private void offerService() throws Exception {...while (shouldRun()) {try {//记录开始时间,以毫秒为单位final long startTime = monotonicNow();//TODO 心跳是每3秒进行一次//startTime:当前时间 lastHeartbeat:上次心跳的时间 heartBeatInterval:3秒//当满足if条件时,说明需要发送心跳了if (startTime - lastHeartbeat >= dnConf.heartBeatInterval) {//更新最后一次心跳的时间lastHeartbeat = startTime;//areHeartbeatsDisabledForTests()方法默认返回 falseif (!dn.areHeartbeatsDisabledForTests()) {//TODO 发送心跳,返回来的响应是NameNode给的指令HeartbeatResponse resp = sendHeartBeat();assert resp != null;dn.getMetrics().addHeartbeat(monotonicNow() - startTime);bpos.updateActorStatesFromHeartbeat(this, resp.getNameNodeHaState());state = resp.getNameNodeHaState().getState();if (state == HAServiceState.ACTIVE) {handleRollingUpgradeStatus(resp);}long startProcessCommands = monotonicNow();//TODO 获取到一些NameNode发送过来的指令//然后执行这些指令if (!processCommand(resp.getCommands()))continue;long endProcessCommands = monotonicNow();if (endProcessCommands - startProcessCommands > 2000) {LOG.info("Took " + (endProcessCommands - startProcessCommands)+ "ms to process " + resp.getCommands().length+ " commands from NN");}}}...}
HeartbeatResponse sendHeartBeat() throws IOException {//TODO 每隔3秒就运行一次StorageReport[] reports =dn.getFSDataset().getStorageReports(bpos.getBlockPoolId());if (LOG.isDebugEnabled()) {LOG.debug("Sending heartbeat with " + reports.length +" storage reports from service actor: " + this);}VolumeFailureSummary volumeFailureSummary = dn.getFSDataset().getVolumeFailureSummary();int numFailedVolumes = volumeFailureSummary != null ?volumeFailureSummary.getFailedStorageLocations().length : 0;//TODO 发送心跳,bpNamenode是NameNodeRpcServer的代理对象,// DN发送心跳时会携带一些报告,reports里面保存了存储容量,使用量和剩余量,// 因此可以在50070界面看到DN的存储信息// 还会上报一些信息,比如DN有几块盘坏了return bpNamenode.sendHeartbeat(bpRegistration,reports,dn.getFSDataset().getCacheCapacity(),dn.getFSDataset().getCacheUsed(),dn.getXmitsInProgress(),dn.getXceiverCount(),numFailedVolumes,volumeFailureSummary);}
@Overridepublic HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg,StorageReport[] report, long dnCacheCapacity, long dnCacheUsed,int xmitsInProgress, int xceiverCount,int failedVolumes, VolumeFailureSummary volumeFailureSummary)throws IOException {//检查NN是否启动checkNNStartup();verifyRequest(nodeReg);//TODO 处理DataNode发送过来的心跳return namesystem.handleHeartbeat(nodeReg, report,dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,failedVolumes, volumeFailureSummary);}
HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,StorageReport[] reports, long cacheCapacity, long cacheUsed,int xceiverCount, int xmitsInProgress, int failedVolumes,VolumeFailureSummary volumeFailureSummary) throws IOException {readLock();try {final int maxTransfer = blockManager.getMaxReplicationStreams()- xmitsInProgress;//TODO NameNode处理DataNode发送过来的心跳,并返回指令集DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(nodeReg, reports, blockPoolId, cacheCapacity, cacheUsed,xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary);final NNHAStatusHeartbeat haState = new NNHAStatusHeartbeat(haContext.getState().getServiceState(),getFSImage().getLastAppliedOrWrittenTxId());//TODO 给DataNode返回响应return new HeartbeatResponse(cmds, haState, rollingUpgradeInfo);} finally {readUnlock();}}
public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,StorageReport[] reports, final String blockPoolId,long cacheCapacity, long cacheUsed, int xceiverCount,int maxTransfers, int failedVolumes,VolumeFailureSummary volumeFailureSummary) throws IOException {synchronized (heartbeatManager) {synchronized (datanodeMap) {DatanodeDescriptor nodeinfo = null;try {//TODO 从已有DataNodeMap里面获取到已经注册过来的DataNode信息,// 如果能获取到信息说明这个DataNode已经注册过了,// 但如果是第一次DataNodeMap里面是没有信息的nodeinfo = getDatanode(nodeReg);} catch(UnregisteredNodeException e) {return new DatanodeCommand[]{RegisterCommand.REGISTER};}//检查DataNode是否需要下线if (nodeinfo != null && nodeinfo.isDisallowed()) {setDatanodeDead(nodeinfo);throw new DisallowedDatanodeException(nodeinfo);}if (nodeinfo == null || !nodeinfo.isAlive) {return new DatanodeCommand[]{RegisterCommand.REGISTER};}//TODO 更新心跳的重要信息heartbeatManager.updateHeartbeat(nodeinfo, reports,cacheCapacity, cacheUsed,xceiverCount, failedVolumes,volumeFailureSummary);...}
然后更新这个 DataNode 的心跳信息。在 NameNode 启动过程中,启动了一些公共服务,其中就包括心跳管理器 HeartbeatManager,在 DataNode 注册的过程中,也将自身的信息保存到了心跳管理器中,这里就是要更新这些信息。
在 HeartbeatManager.updateHeartbeat() 方法中,调用了 DatanodeDescriptor.updateHeartbeat() 方法,之后又调用了updateHeartbeatState() 方法,这里我们直接看这个方法
public void updateHeartbeatState(StorageReport[] reports, long cacheCapacity,long cacheUsed, int xceiverCount, int volFailures,VolumeFailureSummary volumeFailureSummary) {//初始化一些存储的变量long totalCapacity = 0;long totalRemaining = 0;long totalBlockPoolUsed = 0;long totalDfsUsed = 0;...//TODO 更新该DataNode的存储信息setCacheCapacity(cacheCapacity);setCacheUsed(cacheUsed);setXceiverCount(xceiverCount);//TODO 重要!!修改该DataNode上一次心跳的时间,// DataNode每心跳一次,就要刷新最后心跳的时间为当前时间,// 根据这个时间来判断是否超过630秒DataNode没有进行心跳,从而判断DataNode下线setLastUpdate(Time.now());setLastUpdateMonotonic(Time.monotonicNow());...}}
该方法的作用主要有两个:
根据 DataNode 心跳中携带的存储信息,更新 NameNode 中该 DataNode 对应的存储信息,以便于 NameNode 统计整个集群的存储状况 更新该 DataNode 最后一次心跳的时间,NameNode 要根据 DataNode 最后一次心跳的时间来判定该 DataNode 是否还在线
private final Daemon heartbeatThread = new Daemon(new Monitor());
@Overridepublic void run() {while(namesystem.isRunning()) {try {//获取当前时间final long now = Time.monotonicNow();if (lastHeartbeatCheck + heartbeatRecheckInterval < now) {//如果超过了30秒,就进行心跳检查//TODO 心跳检查heartbeatCheck();lastHeartbeatCheck = now;}if (blockManager.shouldUpdateBlockKey(now - lastBlockKeyUpdate)) {synchronized(HeartbeatManager.this) {for(DatanodeDescriptor d : datanodes) {d.needKeyUpdate = true;}}lastBlockKeyUpdate = now;}} catch (Exception e) {LOG.error("Exception while checking heartbeat", e);}try {Thread.sleep(5000); // 5 seconds} catch (InterruptedException ie) {}}}
获取当前时间 判断距离最后一次心跳检查的时间是否超过 30 秒 如果超过 30 秒则执行心跳检查 休息 5 秒之后继续执行上面的逻辑
void heartbeatCheck() {final DatanodeManager dm = blockManager.getDatanodeManager();//如果处于安全模式,直接返回if (namesystem.isInStartupSafeMode()) {return;}//标识所有DN是否全部在线,如果全部在线,下面的while循环只执行一次boolean allAlive = false;while (!allAlive) {DatanodeID dead = null;DatanodeStorageInfo failedStorage = null;int numOfStaleNodes = 0;int numOfStaleStorages = 0;synchronized(this) {/*** 注册的时候就是将DataNode的信息添加到了datanodes这个数据结构里面* 修改datanode上一次心跳时间也是更新的这个数据结构里面的datanode信息*///遍历所有的DataNodefor (DatanodeDescriptor d : datanodes) {//isDatanodeDead(d)方法用来判断这个datanode是否已经下线if (dead == null && dm.isDatanodeDead(d)) {stats.incrExpiredHeartbeats();dead = d;}//如果DN超过630秒没发送心跳if (d.isStale(dm.getStaleInterval())) {//过期DN的数量+1numOfStaleNodes++;}//获取该节点的存储信息DatanodeStorageInfo[] storageInfos = d.getStorageInfos();for(DatanodeStorageInfo storageInfo : storageInfos) {if (storageInfo.areBlockContentsStale()) {//失败存储的数量+1numOfStaleStorages++;}if (failedStorage == null &&storageInfo.areBlocksOnFailedStorage() &&d != dead) {failedStorage = storageInfo;}}}//更新过期的节点数量dm.setNumStaleNodes(numOfStaleNodes);//更新过期的存储数量dm.setNumStaleStorages(numOfStaleStorages);}//allAlive标识DN是否全部在线,只有所有DN都发送了心跳,且不存在损坏的存储时该变量为trueallAlive = dead == null && failedStorage == null;//如果存在下线的节点if (dead != null) {namesystem.writeLock();try {if (namesystem.isInStartupSafeMode()) {return;}synchronized(this) {//TODO 将下线的DataNode移除dm.removeDeadDatanode(dead);}} finally {namesystem.writeUnlock();}}//如果存在失败的存储if (failedStorage != null) {namesystem.writeLock();try {if (namesystem.isInStartupSafeMode()) {return;}synchronized(this) {//将失败的存储进行移除blockManager.removeBlocksAssociatedTo(failedStorage);}} finally {namesystem.writeUnlock();}}}}
遍历 NameNode 中注册的所有 DataNode 信息,检查是否有下线的节点或者节点上有失败的存储 如果存在下线节点或者失败存储,将其从对应的数据结构中移除 如果不存在下线节点或者失败存储,循环结束,该方法返回 否则继续循环执行步骤1,2,3
其中 isDatanodeDead() 方法用来判断该 DataNode 是否离线,判定方法是:如果当前时间 - 630 秒 > 最后一次心跳时间,则说明该 DataNode 已经离线。也就是说如果一个 DataNode 超过 630 秒没有发送心跳,NameNode 就判定该 DataNode 下线了
boolean isDatanodeDead(DatanodeDescriptor node) {return (node.getLastUpdateMonotonic() <(monotonicNow() - heartbeatExpireInterval));//heartbeatExpireInterval:10分钟+30秒}
this.heartbeatExpireInterval =2 * heartbeatRecheckInterval//2*5分钟+ 10 * 1000 * heartbeatIntervalSeconds;//10*1000*3=30秒// 2 * 5 * 60 * 1000 + 10 * 1000 * 3 =630000 毫秒 = 630秒
//TODO 将下线的DataNode移除dm.removeDeadDatanode(dead);
这里调用了 DatanodeManager.removeDeadDatanode() 方法,继而调用 removeDatanode() 方法:
移除 DataNode 节点的本质就是从 NameNode 的各种数据结构中移除该 DataNode 的信息
private void removeDatanode(DatanodeDescriptor nodeInfo) {assert namesystem.hasWriteLock();//从各种数据结构中移除该DN的信息heartbeatManager.removeDatanode(nodeInfo);blockManager.removeBlocksAssociatedTo(nodeInfo);networktopology.remove(nodeInfo);decrementVersionCount(nodeInfo.getSoftwareVersion());if (LOG.isDebugEnabled()) {LOG.debug("remove datanode " + nodeInfo);}//再次进行安全模式的检查namesystem.checkSafeMode();}
DataNode 发送心跳的时间间隔为 3 秒 NameNode 的心跳管理器每隔 5 秒会检查一次距离上次心跳检查是否超过 30 秒,如果超过则执行心跳检查 如果一个 DataNode 超过 630 秒未向 NameNode 发送心跳,NameNode 会判定该 DataNode 下线 DataNode 注册的本质是向 NameNode 的各种数据结构中写入该 DataNode 的信息 DataNode 下线的本质是从 NameNode 的各种数据结构中移除该 DataNode 的信息
文章转载自大数据记事本,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




