暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

HDFS|DataNode 启动流程详解

大数据记事本 2021-05-07
1785
    DataNode 作为 HDFS 集群的数据节点,用于存储集群的实际数据。DataNode 的启动流程主要分为以下四个步骤:
  • 启动 DataXceiverServer 服务

  • 启动 HttpServer 服务

  • 启动 DataNode RPC 服务

  • DataNode 向 NameNode 注册并定时进行心跳

整个启动流程如下图所示:

对于 DataNode,同样需要重点关注该类的注释信息
    DataNode is a class (and program) that stores a set of
    * blocks for a DFS deployment. A single deployment can
    * have one or many DataNodes. Each DataNode communicates
    * regularly with a single NameNode. It also communicates
    * with client code and other DataNodes from time to time.
    *
    * TODO
    * DataNode存储HDFS上的block文件块,在一个文件系统中可以有多个
    * DataNode,每个DataNode周期性的跟NameNode进行通信,客户端也可以
    * 和DataNode进行交互,或者DataNode之间也可以进行交互
    *
    * DataNodes store a series of named blocks. The DataNode
    * allows client code to read these blocks, or to write new
    * block data. The DataNode may also, in response to instructions
    * from its NameNode, delete blocks or copy blocks to/from other
    * DataNodes.
    *
    * TODO
    * DataNode存储一系列block,DataNode允许客户端去读写block
    * DataNode也会去响应NameNode。响应NameNode发送过来的一系列指令,
    * 包括删除block,复制block等
    *
    * The DataNode maintains just one critical table:
    * block-> stream of bytes (of BLOCK_SIZE or less)
    *
    * This info is stored on a local disk. The DataNode
    * reports the table's contents to the NameNode upon startup
    * and every so often afterwards.
    *
    * TODO
    * DataNode管理了一个重要的表:block-> stream of bytes(块和对应的块大小)
    * 这些信息存储在本地磁盘,DataNode启动的时候会把这些信息汇报给NameNode
    * 启动后也会不断地进行汇报
    *
    * DataNodes spend their lives in an endless loop of asking
    * the NameNode for something to do. A NameNode cannot connect
    * to a DataNode directly; a NameNode simply returns values from
    * functions invoked by a DataNode.
    *
    * TODO
    * NameNode是不能直接去操作DataNode的。DataNode启动以后,会跟NameNode进行心跳,NameNode
    * 接收到心跳以后,如果需要这个DataNode做什么事,就会给DataNode一个返回值(指令),DataNode
    * 接收到这些指令以后就知道NameNode想要它做什么了
    *
    * DataNodes maintain an open server socket so that client code
    * or other DataNodes can read/write data. The host/port for
    * this server is reported to the NameNode, which then sends that
    * information to clients or other DataNodes that might be interested.
    *
    * TODO
    * DataNode开放了socket服务,让客户端或者别的DataNode进行读写数据
    * DataNode启动的时候会把自己的主机名和端口号汇报给NameNode
    * 也就是说如果client或者另外的DataNode想要去访问某个DataNode,首先要和NameNode进行通信
    * 从NameNode那里获取目标Data的主机名和端口号,这样才能访问到对应的DataNode
    从注释中可以知道:
    • 一个集群中可以有多个DataNode,这些DataNode就是用来存储数据的 
    • DataNode启动了以后会周期性地和NameNode进行通信(心跳,块汇报) 
    • NameNode不能直接操作DataNode,而是通过心跳返回指令的方式去操作 DataNode 
    • DataNode启动以后开放了一个socket服务,等待别人去调用它进行读写数据
    DataNode 启动流程

    执行main方法

    • 主要调用了 secureMain() 方法

      public static void main(String args[]) {
      if (DFSUtil.parseHelpArgument(args, DataNode.USAGE, System.out, true)) {
      System.exit(0);
      }
      //TODO 核心代码
      secureMain(args, null);
      }
          secureMain 方法,该方法最主要的就是进行 DataNode 的初始化,即调用 createDataNode 方法
        public static void secureMain(String args[], SecureResources resources) {
        int errorCode = 0;
        try {
        StringUtils.startupShutdownMessage(DataNode.class, args, LOG);
        //TODO 初始化DataNode
        DataNode datanode = createDataNode(args, null, resources);
        ...
        }

        createDataNode 方法主要分为两步:

        • 实例化 DataNode

        • 启动 DataNode 后台线程,也就是启动实例化 DataNode 过程中初始化的一些服务对象,如RPC服务,BlockPoolManager服务等

            实例化 DataNode 的过程相对复杂,这里先看一下启动 DataNode 后台线程具体做了什么:

        • 启动了 BlockPoolManager服务
        • 启动了 dataXceiveServer服务
        • 启动了 DataNode RPC服务
            从这里也可以看出,在实例化 DataNode 的过程中,这些服务只是进行了初始化,并没有进行启动。
          public void runDatanodeDaemon() throws IOException {
          //启动BlockPoolManager服务
          blockPoolManager.startAll();


          // 启动dataXceiveServer服务
          dataXceiverServer.start();
          if (localDataXceiverServer != null) {
          localDataXceiverServer.start();
          }
          //启动RPC服务
          ipcServer.start();
          startPlugins(conf);
          }
              实例化 DataNode:
            public static DataNode createDataNode(String args[], Configuration conf,
            SecureResources resources) throws IOException {
            //TODO 实例化DataNode
            DataNode dn = instantiateDataNode(args, conf, resources);
            if (dn != null) {
            //TODO 启动DataNode后台线程
            dn.runDatanodeDaemon();
            }
            return dn;
            }

                instantiateDataNode 方法内部依次调用了 makeInstance 方法以及 DataNode 的构造方法,这里直接看构造方法

              DataNode(final Configuration conf,
              final List<StorageLocation> dataDirs,
              final SecureResources resources) throws IOException {
              ...
              try {
              hostName = getHostName(conf);
              LOG.info("Configured hostname is " + hostName);
              //TODO 启动DataNode
              startDataNode(conf, dataDirs, resources);
              } catch (IOException ie) {
              shutdown();
              throw ie;
              }
              ...
              }
                  构造方法主要调用了 startDataNode 方法,其是DataNode启动过程中一个极其重要的方法,主要做了5件事
              1. 初始化 DataStorage
              2. 初始化 DataXceiverServer 服务
              3. 启动 HttpServer 服务
              4. 初始化 IpcServer,即RPC 服务
              5. 初始化 BlockPoolManager 对象,用于 DataNode 向 NameNode 注册并定时进行心跳

              对应流程图中的如下部分:

                void startDataNode(Configuration conf, 
                List<StorageLocation> dataDirs,
                SecureResources resources
                ) throws IOException {


                ...
                //1.初始化DataStorage
                storage = new DataStorage();


                // global DN settings
                registerMXBean();
                //TODO 2.初始化DataXceiverServer
                initDataXceiver(conf);
                //TODO 3.启动HttpServer服务
                startInfoServer(conf);
                pauseMonitor = new JvmPauseMonitor(conf);
                pauseMonitor.start();


                this.blockPoolTokenSecretManager = new BlockPoolTokenSecretManager();


                dnUserName = UserGroupInformation.getCurrentUser().getShortUserName();
                LOG.info("dnUserName = " + dnUserName);
                LOG.info("supergroup = " + supergroup);
                //TODO 4.初始化RPC服务
                initIpcServer(conf);


                metrics = DataNodeMetrics.create(conf, getDisplayName());
                metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);




                //TODO 5.创建了BlockPoolManager
                /**
                * BlockPool:正常情况下,一个集群只有一个BlockPool
                * 如果我们采用的是联邦机制,就会有多个namenode,也就会有多个联邦
                * 一个联邦就是一个BlockPool
                * 假设一个集群里面4个NameNode,2个联邦
                * 联邦一:hadoop1(Active);hadoop2(StandBy)(BlockPool是同一个)
                * 联邦二:hadoop3(Active);hadoop4(StandBy)(BlockPool是同一个)
                */
                blockPoolManager = new BlockPoolManager(this);
                //TODO 重要!!
                // 1)向NameNode注册
                // 2)跟NameNode进行心跳
                blockPoolManager.refreshNamenodes(conf);


                readaheadPool = ReadaheadPool.getInstance();
                saslClient = new SaslDataTransferClient(dnConf.conf,
                dnConf.saslPropsResolver, dnConf.trustedChannelResolver);
                saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
                }


                a.初始化 DataStorage
                  //1.初始化DataStorage
                  storage = new DataStorage();

                  b.初始化 DataXceiverServer 服务

                    //TODO 2.初始化DataXceiverServer
                    initDataXceiver(conf);
                        这个 DataXceiverServer 服务的作用就是和客户端或者其它 DataNode 进行交互,完成数据的读写操作
                      private void initDataXceiver(Configuration conf) throws IOException {
                      ...


                      //TODO 实例化了一个DataXceiverServer
                      // 这个东西就是DataNode用来接收客户端和其他DataNode传过来数据的服务
                      xserver = new DataXceiverServer(tcpPeerServer, conf, this);
                      //设置为后台线程
                      this.dataXceiverServer = new Daemon(threadGroup, xserver);
                      this.threadGroup.setDaemon(true); // auto destroy when empty


                      ...
                      }
                      c.启动HttpServer

                          和 NameNode 启动过程一样,DataNode 启动过程中也会启动一个 HttpServer2 服务 infoServer,然后在上面绑定多个 servlet

                        private void startInfoServer(Configuration conf)
                        throws IOException {
                        Configuration confForInfoServer = new Configuration(conf);
                        confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10);
                        //TODO 用来接收Http请求
                        HttpServer2.Builder builder = new HttpServer2.Builder()
                        .setName("datanode")
                        .setConf(conf).setACL(new AccessControlList(conf.get(DFS_ADMIN, " ")))
                        .addEndpoint(URI.create("http://localhost:0"))
                        .setFindPort(true);


                        this.infoServer = builder.build();
                        //TODO 往这个HttpServer上面绑定了多个servlet
                        this.infoServer.addInternalServlet(null, "/streamFile/*", StreamFile.class);
                        this.infoServer.addInternalServlet(null, "/getFileChecksum/*",
                        FileChecksumServlets.GetServlet.class);


                        this.infoServer.setAttribute("datanode", this);
                        this.infoServer.setAttribute(JspHelper.CURRENT_CONF, conf);
                        this.infoServer.addServlet(null, "/blockScannerReport",
                        BlockScanner.Servlet.class);
                        //TODO 启动HttpServer
                        this.infoServer.start();
                        InetSocketAddress jettyAddr = infoServer.getConnectorAddress(0);
                        ...
                        }
                        d.初始化 RPC 服务
                          //TODO 初始化RPC服务
                          initIpcServer(conf);

                              initIpcServer 方法主要初始化了 RPC 服务的对象 ipcServer

                            private void initIpcServer(Configuration conf) throws IOException {
                            ...
                            //下面代码就是用来创建一个RPC服务端对象
                            ipcServer = new RPC.Builder(conf)
                            .setProtocol(ClientDatanodeProtocolPB.class)
                            .setInstance(service)
                            .setBindAddress(ipcAddr.getHostName())
                            .setPort(ipcAddr.getPort())
                            .setNumHandlers(
                            conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY,
                            DFS_DATANODE_HANDLER_COUNT_DEFAULT)).setVerbose(false)
                            .setSecretManager(blockPoolTokenSecretManager)
                            .build();


                            InterDatanodeProtocolServerSideTranslatorPB interDatanodeProtocolXlator =
                            new InterDatanodeProtocolServerSideTranslatorPB(this);
                            //DataNode和DataNode之间进行通信协议
                            service = InterDatanodeProtocolService
                            .newReflectiveBlockingService(interDatanodeProtocolXlator);
                            DFSUtil.addPBProtocol(conf, InterDatanodeProtocolPB.class, service,
                            ipcServer);


                            ...
                            }

                            e.初始化 BlockPoolManager 对象,向 NameNode 注册并发送心跳

                              blockPoolManager = new BlockPoolManager(this);
                              //TODO 重要!!
                              // 1)向NameNode注册
                              // 2)跟NameNode进行心跳
                              blockPoolManager.refreshNamenodes(conf);
                                  这里注意一下 BlockPool ,一般情况下,一个集群只有一个 BlockPool。在 Hadoop 2.x 版本,为了解决 NameNode 内存受限的问题,引入了联邦机制(一般集群节点数上千才会用到),也就是将 HDFS 集群的元数据信息分片存储在多个 NameNode 节点上(这里区别于HA,HA是两个NN的元数据信息是一致的,而联邦机制下多个NN上的元数据信息是不一致的)。在这种情况下,一个联邦就是一个 BlockPool。
                              举个例子:
                                假设一个集群里面4个NameNode,2个联邦,每个联邦都是HA的
                                联邦一:hadoop1(Active);hadoop2(StandBy)(BlockPool是同一个)
                                联邦二:hadoop3(Active);hadoop4(StandBy)(BlockPool是同一个)
                                    blockPoolManager.refreshNamenodes()  内部调用了 doRefreshNamenodes() 方法:
                                  private void doRefreshNamenodes(
                                  Map<String, Map<String, InetSocketAddress>> addrMap) throws IOException {
                                  assert Thread.holdsLock(refreshNamenodesLock);


                                  Set<String> toRefresh = Sets.newLinkedHashSet();
                                  Set<String> toAdd = Sets.newLinkedHashSet();
                                  Set<String> toRemove;


                                  synchronized (this) {
                                  //遍历nameservices,通常来说,nameservice只有一个,而对于联邦机制,则会有多个
                                  for (String nameserviceId : addrMap.keySet()) {
                                  if (bpByNameserviceId.containsKey(nameserviceId)) {
                                  toRefresh.add(nameserviceId);
                                  } else {
                                  //有多少个联邦,toAdd里面就对应有多少个nameserviceId
                                  toAdd.add(nameserviceId);
                                  }
                                  }
                                  ...
                                  // Step 3. Start new nameservices
                                  if (!toAdd.isEmpty()) {
                                  LOG.info("Starting BPOfferServices for nameservices: " +
                                  Joiner.on(",").useForNull("<default>").join(toAdd));


                                  //TODO 遍历所有的联邦,每个联邦里面会有2个NameNode(HA)
                                  // BPOfferService:一个联邦对应一个
                                  for (String nsToAdd : toAdd) {
                                  ArrayList<InetSocketAddress> addrs =
                                  Lists.newArrayList(addrMap.get(nsToAdd).values());
                                  //TODO 重要的关系
                                  //一个联邦对应一个BPOfferService
                                  //一个联邦里面的一个NameNode就是一个BPServiceActor
                                  //也就是正常来说,一个BPOfferService对应两个BPServiceActor
                                  BPOfferService bpos = createBPOS(addrs);
                                  bpByNameserviceId.put(nsToAdd, bpos);
                                  //假设有两个联邦,那么offerServices这个集合中就有两个BPOfferService
                                  offerServices.add(bpos);
                                  }
                                  }
                                  //TODO DataNode向NameNode进行注册和心跳
                                  startAll();
                                  }
                                  ...
                                  }

                                  这里注意几个概念:

                                  • nameservice:一个联邦对应一个

                                  • BPOfferService:一个联邦对应一个

                                  • BPServiceActor:联邦中的每个 NameNode 对应一个

                                      假设集群采用了联邦机制,存在两个联邦,每个联邦内部包含两个 NameNode,那么这里就有两个 nameserviceId两个BPOfferService 以及 四个 BPServiceActor,结构图如下:

                                  之后会调用 BlockPoolManager.startAll() 方法向 NameNode 进行注册和心跳
                                  其中注册的过程对应流程图中的如下部分:

                                      startAll() 方法如下:
                                    synchronized void startAll() throws IOException {
                                    try {
                                    UserGroupInformation.getLoginUser().doAs(
                                    new PrivilegedExceptionAction<Object>() {
                                    @Override
                                    public Object run() throws Exception {
                                    //TODO 遍历所有的BPOfferService(即遍历所有的联邦)
                                    for (BPOfferService bpos : offerServices) {
                                    //TODO 重要,启动所有的联邦
                                    bpos.start();
                                    }
                                    return null;
                                    }
                                    });
                                    } catch (InterruptedException ex) {
                                    IOException ioe = new IOException();
                                    ioe.initCause(ex.getCause());
                                    throw ioe;
                                    }
                                    }
                                        内部调用 BPOfferService.start() 方法
                                      void start() {
                                      //TODO 遍历所有的BPServiceActor(即遍历所有的NameNode)
                                      for (BPServiceActor actor : bpServices) {
                                      //TODO 启动
                                      actor.start();
                                      }
                                      }
                                          该方法会遍历所有的 BPServiceActor,也就是每个 NameNode,调用其 start() 方法
                                        void start() {
                                        if ((bpThread != null) && (bpThread.isAlive())) {
                                        //Thread is started already
                                        return;
                                        }
                                        bpThread = new Thread(this, formatThreadName());
                                        bpThread.setDaemon(true); // needed for JUnit testing
                                        //TODO 启动线程,直接观察run方法的逻辑
                                        bpThread.start();
                                        }

                                            这里每个 BPServiceActor 内部有一个 bpThread 线程对象,直接查看 BPServiceActor 的 run() 方法,该方法主要做了两件事:

                                        • DataNode 向 NameNode 注册

                                        • 注册成功后发送心跳

                                            其中,注册调用 connectToNNAndHandshake() 方法,这里通过一个 while(true) 循环来进行不间断地注册,直到注册成功(网络可能导致注册失败,注册失败后间隔 5 秒进行重试

                                          public void run() {
                                          ...
                                          try {
                                          //通过while (true) 来保证不间断注册,直到注册成功(网络可能导致注册失败)
                                          while (true) {
                                          try {
                                          //TODO 注册的核心代码
                                          connectToNNAndHandshake();
                                          break;
                                                } catch (IOException ioe) {
                                          runningState = RunningState.INIT_FAILED;
                                          if (shouldRetryInit()) {
                                          //TODO 如果注册失败后休息5秒重试
                                          sleepAndLogInterrupts(5000, "initializing");
                                          } else {
                                          runningState = RunningState.FAILED;
                                          return;
                                          }
                                          }
                                          }
                                          //注册结束了
                                          runningState = RunningState.RUNNING;
                                          while (shouldRun()) {
                                          try {
                                          //TODO 发送心跳
                                          offerService();
                                          } catch (Exception ex) {
                                          sleepAndLogInterrupts(5000, "offering service");
                                          }
                                          }
                                          runningState = RunningState.EXITED;
                                          } catch (Throwable ex) {
                                          runningState = RunningState.FAILED;
                                          } finally {
                                          cleanUp();
                                          }
                                          }
                                              注册的过程调用了 connectToNNAndHandshake() 方法:
                                            private void connectToNNAndHandshake() throws IOException {
                                            //TODO 获取到NameNode的代理
                                            bpNamenode = dn.connectToNN(nnAddr);
                                            //获取namespace信息
                                            NamespaceInfo nsInfo = retrieveNamespaceInfo();


                                            //TODO 校验Namespace的信息
                                            bpos.verifyAndSetNamespaceInfo(nsInfo);


                                            //TODO 注册
                                            register(nsInfo);
                                            }
                                                首先获取 NameNode 的 RPC 代理对象,通过这个代理对象就可以调用 NameNode 的方法,获取namespace信息并进行校验,然后执行 DataNode 的注册,即调用 register() 方法
                                              void register(NamespaceInfo nsInfo) throws IOException {  //TODO 创建注册信息  bpRegistration = bpos.createRegistration();  while (shouldRun()) {    try {      /TODO 这里的bpNamenode是代理对象,所以下面调用的其实是NameNode的registerDatanode方法      //准确来说应该是NameNodeRpcServer      bpRegistration = bpNamenode.registerDatanode(bpRegistration);      //如果代码执行到这里,说明DataNode的注册已经完成了      bpRegistration.setNamespaceInfo(nsInfo);      break;    } catch(EOFException e) {     ...}
                                                  register 方法中,首先会初始化 DataNode 的注册信息 bpRegistration,然后通过 NameNode RPC代理对象调用其 registerDatanode 方法进行注册(准确来说这里调用的是 NameNodeRpcServer 的 registerDatanode 方法)
                                                @Override 
                                                public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg)
                                                throws IOException {
                                                //检查NN是否启动
                                                checkNNStartup();
                                                verifySoftwareVersion(nodeReg);
                                                //TODO 注册DataNode
                                                namesystem.registerDatanode(nodeReg);
                                                return nodeReg;
                                                }
                                                    该方法首先检查 NameNode 是否启动,然后调用 FSNamesystem.registerDatanode 进行 DataNode 的注册
                                                  void registerDatanode(DatanodeRegistration nodeReg) throws IOException {
                                                  writeLock();
                                                  try {
                                                  //DatanodeManager用于处理关于Datanode的事
                                                  getBlockManager().getDatanodeManager().registerDatanode(nodeReg);
                                                  checkSafeMode();
                                                  } finally {
                                                  writeUnlock();
                                                  }
                                                  }

                                                      DataNodeManager.registerDatanode() 方法,该方法内容很多,最主要的两个步骤如下:

                                                  • 将 DataNode 信息注册到 NameNode 中,所谓注册的本质就是在 NameNode 的各种数据结构中添加该 DataNode 的信息

                                                  • 将新注册的 DataNode 放到 heartbeatManager 心跳管理器中进行管理,用于后续 DN 和 NN 进行心跳

                                                    public void registerDatanode(DatanodeRegistration nodeReg)
                                                    throws DisallowedDatanodeException, UnresolvedTopologyException {
                                                    ...
                                                    //TODO 注册新的DataNode,注册的本质就是往NameNode的各种数据结构中添加信息
                                                    addDatanode(nodeDescr);


                                                    //TODO 把新注册的DataNode放到heartbeatManager里面
                                                    heartbeatManager.addDatanode(nodeDescr);
                                                    ...
                                                    }
                                                        此时 DataNode 已经向 NameNode 注册成功,接着就是定时进行心跳,所以继续回到 BPServiceActor.run() 方法。
                                                        心跳的过程相对复杂,后续会单独写一篇进行详细分析。
                                                    文章转载自大数据记事本,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                                    评论