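Start the DataXceiverServer service
Start the HttpServer service
Start the DataNode RPC service
The DataNode registers with the NameNode and sends heartbeats periodically
The overall startup flow is shown in the figure below: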

/**
 * DataNode is a class (and program) that stores a set of
 * blocks for a DFS deployment. A single deployment can
 * have one or many DataNodes. Each DataNode communicates
 * regularly with a single NameNode. It also communicates
 * with client code and other DataNodes from time to time.
 *
 * TODO
 * DataNodes store the HDFS blocks. A file system can have many
 * DataNodes, and each one communicates with the NameNode periodically.
 * Clients can interact with DataNodes, and DataNodes can also
 * interact with each other.
 *
 * DataNodes store a series of named blocks. The DataNode
 * allows client code to read these blocks, or to write new
 * block data. The DataNode may also, in response to instructions
 * from its NameNode, delete blocks or copy blocks to/from other
 * DataNodes.
 *
 * TODO
 * A DataNode stores a series of blocks and lets clients read and
 * write them. It also responds to the NameNode by executing the
 * commands the NameNode sends, such as deleting or copying blocks.
 *
 * The DataNode maintains just one critical table:
 *   block-> stream of bytes (of BLOCK_SIZE or less)
 *
 * This info is stored on a local disk. The DataNode
 * reports the table's contents to the NameNode upon startup
 * and every so often afterwards.
 *
 * TODO
 * The DataNode maintains one important table: block -> stream of bytes
 * (each block and its data, at most BLOCK_SIZE bytes long). This
 * information is stored on local disk. The DataNode reports it to the
 * NameNode at startup and keeps reporting it periodically afterwards.
 *
 * DataNodes spend their lives in an endless loop of asking
 * the NameNode for something to do. A NameNode cannot connect
 * to a DataNode directly; a NameNode simply returns values from
 * functions invoked by a DataNode.
 *
 * TODO
 * The NameNode cannot operate on a DataNode directly. After startup,
 * the DataNode sends heartbeats to the NameNode. If the NameNode needs
 * the DataNode to do something, it puts a command in the heartbeat's
 * return value, and the DataNode carries out the command on receipt.
 *
 * DataNodes maintain an open server socket so that client code
 * or other DataNodes can read/write data. The host/port for
 * this server is reported to the NameNode, which then sends that
 * information to clients or other DataNodes that might be interested.
 *
 * TODO
 * The DataNode opens a socket service so that clients or other
 * DataNodes can read and write data. At startup it reports its host
 * name and port to the NameNode. A client or another DataNode that
 * wants to reach a given DataNode therefore first asks the NameNode
 * for that DataNode's host name and port, and only then connects to it.
 */
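A cluster can have many DataNodes, and these DataNodes store the data.
After startup, each DataNode communicates with the NameNode periodically (heartbeats and block reports).
The NameNode cannot operate on a DataNode directly; it issues commands through heartbeat responses instead.
After startup, the DataNode opens a socket service and waits for others to call it to read or write data.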
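The pull model summarized above can be sketched in a few lines of plain Java. In this model the NameNode never calls a DataNode. It only answers heartbeats and piggybacks queued commands on the reply. This is a toy model, not Hadoop code: `ToyNameNode`, `ToyDataNode`, `heartbeat` and the string commands are hypothetical names chosen for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical toy model of the heartbeat "pull" protocol (not Hadoop classes).
class ToyNameNode {
    // Commands waiting for each DataNode, keyed by a DataNode id.
    private final Map<String, Queue<String>> pending = new HashMap<>();

    // The NameNode cannot push work; it can only queue it for the next heartbeat.
    synchronized void schedule(String datanodeId, String command) {
        pending.computeIfAbsent(datanodeId, k -> new ArrayDeque<>()).add(command);
    }

    // Invoked BY the DataNode; the return value carries the queued commands.
    synchronized List<String> heartbeat(String datanodeId) {
        Queue<String> q = pending.getOrDefault(datanodeId, new ArrayDeque<>());
        List<String> commands = new ArrayList<>(q);
        q.clear(); // each command is delivered once
        return commands;
    }
}

class ToyDataNode {
    final String id;
    final List<String> executed = new ArrayList<>();

    ToyDataNode(String id) { this.id = id; }

    // One iteration of the DataNode's endless loop: ask, then act on the reply.
    void heartbeatOnce(ToyNameNode nn) {
        for (String cmd : nn.heartbeat(id)) {
            executed.add(cmd); // a real DataNode would delete or copy blocks here
        }
    }
}
```

Only the DataNode initiates contact in this model. A command scheduled on the NameNode therefore waits until that DataNode's next heartbeat.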
Executing the main method
main() mainly calls the secureMain() method:
public static void main(String args[]) {
  if (DFSUtil.parseHelpArgument(args, DataNode.USAGE, System.out, true)) {
    System.exit(0);
  }
  // TODO core code
  secureMain(args, null);
}
public static void secureMain(String args[], SecureResources resources) {
  int errorCode = 0;
  try {
    StringUtils.startupShutdownMessage(DataNode.class, args, LOG);
    // TODO initialize the DataNode
    DataNode datanode = createDataNode(args, null, resources);
    ...
}
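createDataNode consists of two main steps:
Instantiate the DataNode
Start the DataNode daemon threads, i.e. start the service objects created while the DataNode was instantiated, such as the RPC service and the BlockPoolManager
Instantiating the DataNode is fairly involved, so let's first look at what starting the DataNode daemon threads does:
It starts the BlockPoolManager service
It starts the dataXceiverServer service
It starts the DataNode RPC service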
public void runDatanodeDaemon() throws IOException {
  // start the BlockPoolManager service
  blockPoolManager.startAll();
  // start the dataXceiverServer service
  dataXceiverServer.start();
  if (localDataXceiverServer != null) {
    localDataXceiverServer.start();
  }
  // start the RPC service
  ipcServer.start();
  startPlugins(conf);
}
public static DataNode createDataNode(String args[], Configuration conf,
    SecureResources resources) throws IOException {
  // TODO instantiate the DataNode
  DataNode dn = instantiateDataNode(args, conf, resources);
  if (dn != null) {
    // TODO start the DataNode daemon threads
    dn.runDatanodeDaemon();
  }
  return dn;
}
instantiateDataNode calls makeInstance, which in turn calls the DataNode constructor. Let's go straight to the constructor:
DataNode(final Configuration conf,
         final List<StorageLocation> dataDirs,
         final SecureResources resources) throws IOException {
  ...
  try {
    hostName = getHostName(conf);
    LOG.info("Configured hostname is " + hostName);
    // TODO start the DataNode
    startDataNode(conf, dataDirs, resources);
  } catch (IOException ie) {
    shutdown();
    throw ie;
  }
  ...
}
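The constructor calls startDataNode, which mainly does the following:
Initializes DataStorage
Initializes the DataXceiverServer service
Starts the HttpServer service
Initializes the IpcServer, i.e. the RPC service
Initializes the BlockPoolManager object, which the DataNode uses to register with the NameNode and send periodic heartbeats
This corresponds to the following part of the flowchart: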

void startDataNode(Configuration conf,
                   List<StorageLocation> dataDirs,
                   SecureResources resources
                   ) throws IOException {
  ...
  // 1. initialize DataStorage
  storage = new DataStorage();
  // global DN settings
  registerMXBean();
  // TODO 2. initialize the DataXceiverServer
  initDataXceiver(conf);
  // TODO 3. start the HttpServer service
  startInfoServer(conf);
  pauseMonitor = new JvmPauseMonitor(conf);
  pauseMonitor.start();
  this.blockPoolTokenSecretManager = new BlockPoolTokenSecretManager();
  dnUserName = UserGroupInformation.getCurrentUser().getShortUserName();
  LOG.info("dnUserName = " + dnUserName);
  LOG.info("supergroup = " + supergroup);
  // TODO 4. initialize the RPC service
  initIpcServer(conf);
  metrics = DataNodeMetrics.create(conf, getDisplayName());
  metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
  // TODO 5. create the BlockPoolManager
  /**
   * BlockPool: normally a cluster has a single BlockPool.
   * With federation there are several NameNodes grouped into several
   * federated nameservices, and each nameservice is one BlockPool.
   * Example: a cluster with 4 NameNodes in 2 nameservices:
   *   nameservice 1: hadoop1 (Active), hadoop2 (Standby), same BlockPool
   *   nameservice 2: hadoop3 (Active), hadoop4 (Standby), same BlockPool
   */
  blockPoolManager = new BlockPoolManager(this);
  // TODO important!
  // 1) register with the NameNode
  // 2) send heartbeats to the NameNode
  blockPoolManager.refreshNamenodes(conf);
  readaheadPool = ReadaheadPool.getInstance();
  saslClient = new SaslDataTransferClient(dnConf.conf,
      dnConf.saslPropsResolver, dnConf.trustedChannelResolver);
  saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
}
a. Initialize DataStorage
// 1. initialize DataStorage
storage = new DataStorage();
b. Initialize the DataXceiverServer service
// TODO 2. initialize the DataXceiverServer
initDataXceiver(conf);
private void initDataXceiver(Configuration conf) throws IOException {
  ...
  // TODO instantiate a DataXceiverServer: the service the DataNode uses
  // to receive data sent by clients and by other DataNodes
  xserver = new DataXceiverServer(tcpPeerServer, conf, this);
  // run it as a daemon thread
  this.dataXceiverServer = new Daemon(threadGroup, xserver);
  this.threadGroup.setDaemon(true); // auto destroy when empty
  ...
}
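Conceptually, DataXceiverServer is an accept loop running on a daemon thread. It hands each incoming connection to its own worker; in Hadoop that worker is a DataXceiver. The sketch below shows that pattern with plain `java.net` sockets. `ToyXceiverServer`, the loopback binding on an OS-chosen port, and the one-line "ACK" exchange are all hypothetical stand-ins for the real block read/write protocol.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical stand-in for DataXceiverServer: accept loop + one thread per connection.
class ToyXceiverServer implements Runnable {
    final ServerSocket serverSocket;

    ToyXceiverServer() throws IOException {
        // Port 0 lets the OS pick a free port (the real server uses a configured address).
        this.serverSocket = new ServerSocket(0, 50, InetAddress.getLoopbackAddress());
    }

    int port() { return serverSocket.getLocalPort(); }

    @Override
    public void run() {
        while (!serverSocket.isClosed()) {
            try {
                Socket peer = serverSocket.accept();
                // Each connection gets its own worker, playing the role of a DataXceiver.
                Thread worker = new Thread(() -> handle(peer));
                worker.setDaemon(true);
                worker.start();
            } catch (IOException e) {
                return; // server socket closed: stop accepting
            }
        }
    }

    // Toy "operation": read one request line and acknowledge it.
    private void handle(Socket peer) {
        try (Socket s = peer;
             BufferedReader in = new BufferedReader(new InputStreamReader(s.getInputStream()));
             PrintWriter out = new PrintWriter(s.getOutputStream(), true)) {
            String request = in.readLine();
            out.println("ACK " + request);
        } catch (IOException ignored) {
            // a real server would log the error and clean up here
        }
    }

    // Run the accept loop on a daemon thread, analogous to new Daemon(threadGroup, xserver).
    static ToyXceiverServer startDaemon() throws IOException {
        ToyXceiverServer server = new ToyXceiverServer();
        Thread t = new Thread(server, "toy-xceiver-server");
        t.setDaemon(true);
        t.start();
        return server;
    }
}
```

Making the threads daemons means they never keep the JVM alive by themselves. That matches the intent of wrapping xserver in a Daemon.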
c. Start the HttpServer service
As in NameNode startup, the DataNode starts an HttpServer2 instance, infoServer, and binds several servlets to it.
private void startInfoServer(Configuration conf)
    throws IOException {
  Configuration confForInfoServer = new Configuration(conf);
  confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10);
  // TODO the server that receives HTTP requests
  HttpServer2.Builder builder = new HttpServer2.Builder()
      .setName("datanode")
      .setConf(conf)
      .setACL(new AccessControlList(conf.get(DFS_ADMIN, " ")))
      .addEndpoint(URI.create("http://localhost:0"))
      .setFindPort(true);
  this.infoServer = builder.build();
  // TODO bind several servlets to this HttpServer
  this.infoServer.addInternalServlet(null, "/streamFile/*", StreamFile.class);
  this.infoServer.addInternalServlet(null, "/getFileChecksum/*",
      FileChecksumServlets.GetServlet.class);
  this.infoServer.setAttribute("datanode", this);
  this.infoServer.setAttribute(JspHelper.CURRENT_CONF, conf);
  this.infoServer.addServlet(null, "/blockScannerReport",
      BlockScanner.Servlet.class);
  // TODO start the HttpServer
  this.infoServer.start();
  InetSocketAddress jettyAddr = infoServer.getConnectorAddress(0);
  ...
}
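HttpServer2 is Hadoop's own HTTP server wrapper, and its API is not reproduced here. The sketch below illustrates the same idea with the JDK's built-in `com.sun.net.httpserver.HttpServer` instead: bind to port 0 so the OS picks a free port (the role of `localhost:0` plus `setFindPort(true)`), mount handlers under paths, then read back the actual bound address. `ToyInfoServer`, the use of `127.0.0.1` in place of `localhost`, and the placeholder response body are assumptions for illustration.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch using the JDK HTTP server, NOT Hadoop's HttpServer2.
class ToyInfoServer {
    static HttpServer start() throws IOException {
        // Port 0: the OS assigns a free port, similar in spirit to "localhost:0" + setFindPort(true).
        HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        // Mount a handler under a path, as addServlet(...) does for servlets.
        server.createContext("/blockScannerReport", exchange -> {
            byte[] body = "placeholder report".getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream os = exchange.getResponseBody()) {
                os.write(body);
            }
        });
        server.start();
        return server;
    }
}
```

After start(), `server.getAddress().getPort()` returns the port that was actually bound. This plays the same role as `infoServer.getConnectorAddress(0)` in the code above.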
d. Initialize the RPC service
// TODO initialize the RPC service
initIpcServer(conf);
initIpcServer mainly initializes the RPC server object, ipcServer:
private void initIpcServer(Configuration conf) throws IOException {
  ...
  // the following code creates an RPC server object
  ipcServer = new RPC.Builder(conf)
      .setProtocol(ClientDatanodeProtocolPB.class)
      .setInstance(service)
      .setBindAddress(ipcAddr.getHostName())
      .setPort(ipcAddr.getPort())
      .setNumHandlers(
          conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY,
              DFS_DATANODE_HANDLER_COUNT_DEFAULT))
      .setVerbose(false)
      .setSecretManager(blockPoolTokenSecretManager)
      .build();
  InterDatanodeProtocolServerSideTranslatorPB interDatanodeProtocolXlator =
      new InterDatanodeProtocolServerSideTranslatorPB(this);
  // protocol for DataNode-to-DataNode communication
  service = InterDatanodeProtocolService
      .newReflectiveBlockingService(interDatanodeProtocolXlator);
  DFSUtil.addPBProtocol(conf, InterDatanodeProtocolPB.class, service,
      ipcServer);
  ...
}
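`setNumHandlers(...)` sets how many handler threads the RPC server runs; the value comes from `dfs.datanode.handler.count`. In Hadoop's ipc.Server, decoded calls are placed on a call queue that these handler threads drain. The sketch below shows only that queue-plus-handlers idea. `ToyRpcServer` and the use of a plain `Runnable` as the "call" are hypothetical simplifications; there is no listener, no reader and no protocol decoding.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: a fixed number of handler threads draining a shared call queue.
class ToyRpcServer {
    private final BlockingQueue<Runnable> callQueue = new LinkedBlockingQueue<>();
    private final List<Thread> handlers = new ArrayList<>();

    ToyRpcServer(int numHandlers) {
        for (int i = 0; i < numHandlers; i++) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        callQueue.take().run(); // block until a call arrives, then handle it
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // stop() interrupts the handlers
                }
            }, "toy-handler-" + i);
            t.setDaemon(true);
            handlers.add(t);
        }
    }

    void start() { handlers.forEach(Thread::start); }

    // In a real server, a listener/reader would decode a request into a call first.
    void enqueue(Runnable call) { callQueue.add(call); }

    void stop() { handlers.forEach(Thread::interrupt); }

    int handlerCount() { return handlers.size(); }
}
```

With this design, the handler count caps how many calls execute concurrently. Extra calls wait in the queue rather than spawning new threads.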
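e. Initialize the BlockPoolManager object, register with the NameNode, and send heartbeats
blockPoolManager = new BlockPoolManager(this);
// TODO important!
// 1) register with the NameNode
// 2) send heartbeats to the NameNode
blockPoolManager.refreshNamenodes(conf);
Suppose a cluster has 4 NameNodes in 2 federated nameservices, each running in HA mode:
nameservice 1: hadoop1 (Active), hadoop2 (Standby), sharing the same BlockPool
nameservice 2: hadoop3 (Active), hadoop4 (Standby), sharing the same BlockPool
refreshNamenodes reads the NameNode addresses from the configuration and passes them, grouped by nameservice, to doRefreshNamenodes: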
private void doRefreshNamenodes(
    Map<String, Map<String, InetSocketAddress>> addrMap) throws IOException {
  assert Thread.holdsLock(refreshNamenodesLock);
  Set<String> toRefresh = Sets.newLinkedHashSet();
  Set<String> toAdd = Sets.newLinkedHashSet();
  Set<String> toRemove;
  synchronized (this) {
    // iterate over the nameservices: normally there is only one,
    // with federation there are several
    for (String nameserviceId : addrMap.keySet()) {
      if (bpByNameserviceId.containsKey(nameserviceId)) {
        toRefresh.add(nameserviceId);
      } else {
        // toAdd ends up with one nameserviceId per new nameservice
        toAdd.add(nameserviceId);
      }
    }
    ...
    // Step 3. Start new nameservices
    if (!toAdd.isEmpty()) {
      LOG.info("Starting BPOfferServices for nameservices: " +
          Joiner.on(",").useForNull("<default>").join(toAdd));
      // TODO iterate over all nameservices; each has 2 NameNodes (HA)
      // BPOfferService: one per nameservice
      for (String nsToAdd : toAdd) {
        ArrayList<InetSocketAddress> addrs =
            Lists.newArrayList(addrMap.get(nsToAdd).values());
        // TODO key relationships:
        // one nameservice corresponds to one BPOfferService;
        // each NameNode in a nameservice corresponds to one BPServiceActor,
        // so normally (HA) one BPOfferService has two BPServiceActors
        BPOfferService bpos = createBPOS(addrs);
        bpByNameserviceId.put(nsToAdd, bpos);
        // with two nameservices, offerServices holds two BPOfferServices
        offerServices.add(bpos);
      }
    }
    // TODO the DataNode registers with and sends heartbeats to the NameNodes
    startAll();
  }
  ...
}
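The bookkeeping at the start of doRefreshNamenodes comes down to set arithmetic over nameservice IDs. IDs that already have a BPOfferService are refreshed, and new IDs get one. In the elided part, IDs that disappeared from the configuration end up in toRemove. The sketch below reproduces only that classification with standard collections. The class name `NameserviceDiff` is hypothetical, and so is the exact definition of toRemove ("known but no longer configured"), since that code is elided above.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical sketch of splitting nameservice IDs into refresh/add/remove sets.
class NameserviceDiff {
    final Set<String> toRefresh = new LinkedHashSet<>();
    final Set<String> toAdd = new LinkedHashSet<>();
    final Set<String> toRemove = new LinkedHashSet<>();

    NameserviceDiff(Set<String> known, Set<String> configured) {
        for (String ns : configured) {
            if (known.contains(ns)) {
                toRefresh.add(ns);   // already has a BPOfferService
            } else {
                toAdd.add(ns);       // needs a new BPOfferService
            }
        }
        // Assumption: IDs we know about but that are no longer configured get removed.
        for (String ns : known) {
            if (!configured.contains(ns)) {
                toRemove.add(ns);
            }
        }
    }
}
```

At first startup no nameservice is known yet, so every configured ID lands in toAdd. That is the branch the code above follows.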
A few concepts to keep in mind:
nameservice: one per federated namespace
BPOfferService: one per nameservice
BPServiceActor: one per NameNode within a nameservice
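These relationships can be modeled with a few plain Java classes. The model is hypothetical: `ToyBPOfferService`, `ToyBPServiceActor` and `ToyBlockPoolManager` are not the Hadoop classes. It builds one offer service per nameservice and one actor per NameNode address. The input is a map shaped like the addrMap parameter of doRefreshNamenodes (nameservice ID, then inner key, then address). Treating the inner key as a NameNode ID is our assumption.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the nameservice -> BPOfferService -> BPServiceActor hierarchy.
class ToyBPServiceActor {
    final InetSocketAddress nnAddr; // one actor per NameNode
    ToyBPServiceActor(InetSocketAddress nnAddr) { this.nnAddr = nnAddr; }
}

class ToyBPOfferService {
    // one actor per NameNode of this nameservice (two with HA)
    final List<ToyBPServiceActor> actors = new ArrayList<>();
    ToyBPOfferService(List<InetSocketAddress> nnAddrs) {
        for (InetSocketAddress addr : nnAddrs) {
            actors.add(new ToyBPServiceActor(addr));
        }
    }
}

class ToyBlockPoolManager {
    final Map<String, ToyBPOfferService> bpByNameserviceId = new LinkedHashMap<>();

    void addNameservices(Map<String, Map<String, InetSocketAddress>> addrMap) {
        for (Map.Entry<String, Map<String, InetSocketAddress>> e : addrMap.entrySet()) {
            List<InetSocketAddress> addrs = new ArrayList<>(e.getValue().values());
            bpByNameserviceId.put(e.getKey(), new ToyBPOfferService(addrs)); // one per nameservice
        }
    }

    int actorCount() {
        int n = 0;
        for (ToyBPOfferService bpos : bpByNameserviceId.values()) {
            n += bpos.actors.size();
        }
        return n;
    }
}
```

Take the two-nameservice layout used earlier (hadoop1/hadoop2 and hadoop3/hadoop4). This model yields two offer services and four actors.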
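For example, suppose the cluster uses federation with two nameservices, each containing two NameNodes. There are then two nameserviceIds, two BPOfferServices, and four BPServiceActors, structured as shown below: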


BlockPoolManager.startAll() starts every BPOfferService:
synchronized void startAll() throws IOException {
  try {
    UserGroupInformation.getLoginUser().doAs(
        new PrivilegedExceptionAction<Object>() {
          @Override
          public Object run() throws Exception {
            // TODO iterate over all BPOfferServices (i.e. all nameservices)
            for (BPOfferService bpos : offerServices) {
              // TODO important: start each nameservice's BPOfferService
              bpos.start();
            }
            return null;
          }
        });
  } catch (InterruptedException ex) {
    IOException ioe = new IOException();
    ioe.initCause(ex.getCause());
    throw ioe;
  }
}
BPOfferService.start() then starts each of its BPServiceActors:
void start() {
  // TODO iterate over all BPServiceActors (i.e. all NameNodes)
  for (BPServiceActor actor : bpServices) {
    // TODO start the actor
    actor.start();
  }
}
BPServiceActor.start() launches the actor's thread:
void start() {
  if ((bpThread != null) && (bpThread.isAlive())) {
    // Thread is started already
    return;
  }
  bpThread = new Thread(this, formatThreadName());
  bpThread.setDaemon(true); // needed for JUnit testing
  // TODO start the thread; the logic lives in run()
  bpThread.start();
}
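Each BPServiceActor owns a thread object, bpThread, so the interesting logic is in BPServiceActor's run() method, which does two things:
Registers the DataNode with the NameNode
Sends heartbeats once registration succeeds
Registration is done by connectToNNAndHandshake(), called inside a while (true) loop that keeps retrying until registration succeeds (registration can fail, for example because of network problems). After a failure the actor sleeps 5 seconds and retries, as long as shouldRetryInit() allows it; otherwise it gives up and its state becomes FAILED.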
public void run() {
  ...
  try {
    // the while (true) loop keeps retrying registration until it succeeds
    // (registration may fail, e.g. because of the network)
    while (true) {
      try {
        // TODO core registration code
        connectToNNAndHandshake();
        break;
      } catch (IOException ioe) {
        runningState = RunningState.INIT_FAILED;
        if (shouldRetryInit()) {
          // TODO after a failed registration, sleep 5 seconds and retry
          sleepAndLogInterrupts(5000, "initializing");
        } else {
          runningState = RunningState.FAILED;
          return;
        }
      }
    }
    // registration finished
    runningState = RunningState.RUNNING;
    while (shouldRun()) {
      try {
        // TODO send heartbeats
        offerService();
      } catch (Exception ex) {
        sleepAndLogInterrupts(5000, "offering service");
      }
    }
    runningState = RunningState.EXITED;
  } catch (Throwable ex) {
    runningState = RunningState.FAILED;
  } finally {
    cleanUp();
  }
}
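The control flow of run() can be reduced to the sketch below. It retries registration until it succeeds or retrying is no longer allowed, then loops on heartbeats. Several parts are hypothetical simplifications. The `NameNodeStub` interface replaces the real proxy. The `maxAttempts` counter stands in for shouldRetryInit(). A fixed heartbeat count replaces shouldRun(), and the sleep is configurable, whereas the real code sleeps 5000 ms. The state names mirror the RunningState values used above.

```java
import java.io.IOException;

// Hypothetical reduction of BPServiceActor.run(): register with retry, then heartbeat.
class ToyActorLoop {
    interface NameNodeStub {
        void register() throws IOException;  // may fail, e.g. on network errors
        void heartbeat() throws IOException;
    }

    enum RunningState { CONNECTING, INIT_FAILED, RUNNING, FAILED, EXITED }

    volatile RunningState state = RunningState.CONNECTING;
    int heartbeatsSent = 0;

    void run(NameNodeStub nn, int maxAttempts, int heartbeats, long retrySleepMs)
            throws InterruptedException {
        int attempts = 0;
        while (true) { // keep trying until registration succeeds
            try {
                nn.register();
                break;
            } catch (IOException e) {
                state = RunningState.INIT_FAILED;
                if (++attempts < maxAttempts) {  // stands in for shouldRetryInit()
                    Thread.sleep(retrySleepMs);  // the real code sleeps 5000 ms
                } else {
                    state = RunningState.FAILED;
                    return;
                }
            }
        }
        state = RunningState.RUNNING;
        for (int i = 0; i < heartbeats; i++) {   // stands in for while (shouldRun())
            try {
                nn.heartbeat();
                heartbeatsSent++;
            } catch (IOException e) {
                Thread.sleep(retrySleepMs);      // like sleepAndLogInterrupts(5000, ...)
            }
        }
        state = RunningState.EXITED;
    }
}
```

Registration and heartbeating are two separate loops. A registration failure therefore never reaches the heartbeat phase, and a heartbeat failure only delays the next heartbeat.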
private void connectToNNAndHandshake() throws IOException {
  // TODO get a proxy for the NameNode
  bpNamenode = dn.connectToNN(nnAddr);
  // fetch the namespace info
  NamespaceInfo nsInfo = retrieveNamespaceInfo();
  // TODO verify the namespace info
  bpos.verifyAndSetNamespaceInfo(nsInfo);
  // TODO register
  register(nsInfo);
}
void register(NamespaceInfo nsInfo) throws IOException {
  // TODO build the registration info
  bpRegistration = bpos.createRegistration();
  while (shouldRun()) {
    try {
      // TODO bpNamenode is a proxy, so this actually calls the NameNode's
      // registerDatanode method (more precisely, NameNodeRpcServer's)
      bpRegistration = bpNamenode.registerDatanode(bpRegistration);
      // reaching this line means the DataNode registration has completed
      bpRegistration.setNamespaceInfo(nsInfo);
      break;
    } catch(EOFException e) {
      ...
}
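The comment that bpNamenode "is a proxy" is the key to reading this code. Calling registerDatanode on it does not run NameNode logic inside the DataNode; the call is forwarded to the NameNode's RPC server. The sketch below shows the mechanism with a JDK dynamic proxy (`java.lang.reflect.Proxy`). Here the "transport" is an in-process method call, whereas a real RPC client serializes the call and sends it over the network. `ToyDatanodeProtocol`, `ToyNameNodeRpcServer`, `ToyRpc` and the string registration are hypothetical names.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical protocol interface (stand-in for the real DatanodeProtocol).
interface ToyDatanodeProtocol {
    String registerDatanode(String registration);
}

// Plays the role of NameNodeRpcServer: the object that actually does the work.
class ToyNameNodeRpcServer implements ToyDatanodeProtocol {
    final List<String> registered = new ArrayList<>();

    @Override
    public String registerDatanode(String registration) {
        registered.add(registration);
        return registration + "@registered";
    }
}

class ToyRpc {
    // Returns a proxy; every call is intercepted and forwarded to the "remote" server.
    static ToyDatanodeProtocol getProxy(ToyDatanodeProtocol server, List<String> wireLog) {
        InvocationHandler handler = (Object proxy, Method method, Object[] args) -> {
            wireLog.add(method.getName()); // a real RPC client would serialize and send here
            return method.invoke(server, args);
        };
        return (ToyDatanodeProtocol) Proxy.newProxyInstance(
                ToyDatanodeProtocol.class.getClassLoader(),
                new Class<?>[] {ToyDatanodeProtocol.class},
                handler);
    }
}
```

From the caller's point of view, `ToyRpc.getProxy(server, log).registerDatanode("dn1")` looks like a local call. The handler is the one place where a real client would insert serialization and networking.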
On the NameNode side, the call lands in NameNodeRpcServer.registerDatanode():
@Override
public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg)
    throws IOException {
  // check that the NameNode has started
  checkNNStartup();
  verifySoftwareVersion(nodeReg);
  // TODO register the DataNode
  namesystem.registerDatanode(nodeReg);
  return nodeReg;
}
which delegates to FSNamesystem.registerDatanode():
void registerDatanode(DatanodeRegistration nodeReg) throws IOException {
  writeLock();
  try {
    // DatanodeManager handles DataNode-related bookkeeping
    getBlockManager().getDatanodeManager().registerDatanode(nodeReg);
    checkSafeMode();
  } finally {
    writeUnlock();
  }
}
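DatanodeManager.registerDatanode() is long; its two most important steps are:
Register the DataNode's information with the NameNode. "Registration" essentially means adding the DataNode's information to the NameNode's various data structures.
Put the newly registered DataNode into the heartbeatManager, which manages it for the subsequent heartbeats between the DataNode and the NameNode.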
public void registerDatanode(DatanodeRegistration nodeReg)
    throws DisallowedDatanodeException, UnresolvedTopologyException {
  ...
  // TODO register the new DataNode; registration essentially means
  // adding its information to the NameNode's data structures
  addDatanode(nodeDescr);
  // TODO put the newly registered DataNode into the heartbeatManager
  heartbeatManager.addDatanode(nodeDescr);
  ...
}
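On the NameNode side, registration amounts to recording the DataNode in lookup structures and handing it to the heartbeat manager. Later, liveness can be judged from how long ago the last heartbeat arrived. The sketch below is a hypothetical reduction of those two steps. `ToyDatanodeManager`, the UUID key, the host:port string and the expiry parameter are assumptions for illustration. The real DatanodeManager also handles re-registration, topology resolution and much more.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical reduction of DatanodeManager.registerDatanode + HeartbeatManager.
class ToyDatanodeManager {
    static final class Descriptor {
        final String uuid;
        final String hostAndPort; // where clients can reach the DataNode's data service
        long lastHeartbeatMs;

        Descriptor(String uuid, String hostAndPort, long now) {
            this.uuid = uuid;
            this.hostAndPort = hostAndPort;
            this.lastHeartbeatMs = now;
        }
    }

    // "Registration" = inserting the node into the NameNode's lookup structures.
    final Map<String, Descriptor> datanodeMap = new HashMap<>();
    // Stand-in for heartbeatManager: the nodes whose heartbeats are tracked.
    final Map<String, Descriptor> heartbeatManager = new LinkedHashMap<>();
    final long expireMs;

    ToyDatanodeManager(long expireMs) { this.expireMs = expireMs; }

    void registerDatanode(String uuid, String hostAndPort, long now) {
        Descriptor d = new Descriptor(uuid, hostAndPort, now);
        datanodeMap.put(uuid, d);       // like addDatanode(nodeDescr)
        heartbeatManager.put(uuid, d);  // like heartbeatManager.addDatanode(nodeDescr)
    }

    // Returns false for a node that never registered.
    boolean heartbeat(String uuid, long now) {
        Descriptor d = heartbeatManager.get(uuid);
        if (d == null) {
            return false;
        }
        d.lastHeartbeatMs = now;
        return true;
    }

    boolean isDead(String uuid, long now) {
        Descriptor d = heartbeatManager.get(uuid);
        return d == null || now - d.lastHeartbeatMs > expireMs;
    }
}
```

Time is passed in explicitly rather than read from a clock. This keeps the liveness rule easy to follow: a node counts as dead once more than expireMs has passed since its last heartbeat.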




