
HBase 2.x Source Code Analysis: The HMaster Startup Process

大数据开发运维架构 (Big Data Development, Operations and Architecture), 2020-04-03

I. Overview

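This article walks through the HMaster startup flow in HBase 2.2.1. The startup code is long, so only the main functions are covered. In short, startup does the following:

    1). Loads the configuration and instantiates HMaster. Because HMaster extends HRegionServer, the HRegionServer constructor runs first.

    2). The HRegionServer constructor performs a number of sanity checks, logs in via Kerberos if security is enabled, connects to ZooKeeper and watches the HBase znodes, starts tracking changes to the master znode, and starts a tracker for cluster state changes.

    3). The HMaster constructor reads the balancer settings (maximum balancing time and maximum RIT percentage), instantiates MetricsMaster to collect master metrics that the web UI also reports, and creates the ActiveMasterManager.

    4). Finally, run() is called. A background thread tries to register this master as the active master in ZooKeeper and blocks until it succeeds; once it does, finishActiveMasterInitialization() completes the master's initialization.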
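The ordering in step 1) is plain Java semantics: the first statement of a subclass constructor is super(...), so the parent constructor body finishes before the child's body continues. A minimal sketch of that ordering, using two hypothetical stand-in classes (RegionServerSketch and MasterSketch are not HBase types) that record what each constructor does:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for HRegionServer: only constructor ordering is modeled.
class RegionServerSketch {
  final List<String> steps;

  RegionServerSketch(List<String> steps) {
    this.steps = steps;
    steps.add("RegionServer constructor: checks, ZooKeeper watcher, trackers, RPC services");
  }
}

// Hypothetical stand-in for HMaster.
class MasterSketch extends RegionServerSketch {
  MasterSketch(List<String> steps) {
    super(steps); // must come first, so the parent constructor body runs to completion
    steps.add("Master constructor: metrics, ActiveMasterManager");
  }
}

class ConstructorOrderDemo {
  public static void main(String[] args) {
    List<String> steps = new ArrayList<>();
    new MasterSketch(steps);
    steps.forEach(System.out::println); // RegionServer line first, then Master line
  }
}
```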

II. Source Code Analysis

1. The command usually used to start HMaster on its own is:

    /data/app/hbase-2.2.1/bin/hbase-daemon.sh start master

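2. The hbase-daemon.sh script does the following:

    1). Sets the directory for the master's pid file (default /tmp); the pid file is named hbase-${USER}-master.pid.

    2). Sets the master log directory, which defaults to $HBASE_HOME/logs.

    3). Locates JAVA_HOME, so JAVA_HOME must be set either in the environment or in hbase-env.sh.

    4). Finally, starts the master through this case branch: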

    (start)
        check_before_start
        hbase_rotate_log $HBASE_LOGOUT
        hbase_rotate_log $HBASE_LOGGC
        echo running $command, logging to $HBASE_LOGOUT
        # $thiscmd is hbase-daemon.sh itself: re-run it in the background as foreground_start
        $thiscmd --config "${HBASE_CONF_DIR}" \
            foreground_start $command $args < /dev/null > ${HBASE_LOGOUT} 2>&1 &
        disown -h -r
        sleep 1; head "${HBASE_LOGOUT}"
        ;;
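The pid and log locations from points 1) and 2) can be sketched in Java. HBASE_PID_DIR, HBASE_LOG_DIR and HBASE_HOME are the environment variables the script consults; the class and method names are hypothetical, and the .out/.gc log file names are not modeled.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;

// Hypothetical sketch of how hbase-daemon.sh chooses its pid file and log
// directory; defaults follow the description above (/tmp, $HBASE_HOME/logs).
class DaemonPaths {
  static Path pidFile(Map<String, String> env, String user, String command) {
    String pidDir = env.getOrDefault("HBASE_PID_DIR", "/tmp");
    return Paths.get(pidDir, "hbase-" + user + "-" + command + ".pid");
  }

  static Path logDir(Map<String, String> env) {
    String override = env.get("HBASE_LOG_DIR");
    if (override != null) {
      return Paths.get(override);
    }
    return Paths.get(env.getOrDefault("HBASE_HOME", "."), "logs");
  }

  public static void main(String[] args) {
    Map<String, String> env = System.getenv();
    String user = System.getProperty("user.name");
    System.out.println(pidFile(env, user, "master"));
    System.out.println(logDir(env));
  }
}
```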

3. The entry point is the main() method of the HMaster class:

    /**
     * @see org.apache.hadoop.hbase.master.HMasterCommandLine
     */
    public static void main(String [] args) {
      LOG.info("STARTING service " + HMaster.class.getSimpleName());
      // Nothing special here: just log the version information
      VersionInfo.logVersion();
      // Create an HMasterCommandLine object and call its doMain(args) method
      new HMasterCommandLine(HMaster.class).doMain(args);
    }

4. Next comes HMasterCommandLine.doMain(), which is defined in its parent class ServerCommandLine:

    public void doMain(String args[]) {
      try {
        // Run the start/stop/... command through Hadoop's ToolRunner
        int ret = ToolRunner.run(HBaseConfiguration.create(), this, args);
        if (ret != 0) {
          System.exit(ret);
        }
      } catch (Exception e) {
        LOG.error("Failed to run", e);
        System.exit(-1);
      }
    }

5. ToolRunner.run() wraps conf and args in a GenericOptionsParser and then hands control back to HMasterCommandLine.run(). The wrapping code:

    public static int run(Configuration conf, Tool tool, String[] args)
        throws Exception {
      if (conf == null) {
        conf = new Configuration();
      }
      // Wrap conf and args in a GenericOptionsParser, then call HMasterCommandLine.run() to start the master
      GenericOptionsParser parser = new GenericOptionsParser(conf, args);
      //set the configuration back, so that Tool can configure itself
      tool.setConf(conf);
      //get the args w/o generic hadoop args
      String[] toolArgs = parser.getRemainingArgs();
      return tool.run(toolArgs);
    }
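The ToolRunner pattern above amounts to: fold generic options into the configuration, then pass only the remaining arguments to the tool. A dependency-free sketch of that idea; SimpleTool and SimpleToolRunner are hypothetical, and only the separated "-D key=value" form is handled, whereas the real GenericOptionsParser supports more options (-conf, -fs, and others).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Hadoop's Tool interface.
interface SimpleTool {
  int run(Map<String, String> conf, String[] args);
}

// Hypothetical imitation of ToolRunner.run(): "-D key=value" pairs go into
// the configuration, everything else reaches the tool unchanged.
class SimpleToolRunner {
  static int run(Map<String, String> conf, SimpleTool tool, String[] args) {
    if (conf == null) {
      conf = new HashMap<>();
    }
    List<String> remaining = new ArrayList<>();
    for (int i = 0; i < args.length; i++) {
      if ("-D".equals(args[i]) && i + 1 < args.length) {
        String[] kv = args[++i].split("=", 2);
        conf.put(kv[0], kv.length > 1 ? kv[1] : "");
      } else {
        remaining.add(args[i]);
      }
    }
    return tool.run(conf, remaining.toArray(new String[0]));
  }
}
```

With arguments like -D hbase.master.port=16000 start, the tool sees only "start", which is the single positional argument HMasterCommandLine.run() then dispatches on.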

6. Now look at HMasterCommandLine.run() directly:

    @Override
    public int run(String args[]) throws Exception {
      Options opt = new Options();
      opt.addOption("localRegionServers", true,
          "RegionServers to start in master process when running standalone");
      opt.addOption("masters", true, "Masters to start in this process");
      opt.addOption("minRegionServers", true, "Minimum RegionServers needed to host user tables");
      opt.addOption("backup", false, "Do not try to become HMaster until the primary fails");

      CommandLine cmd; // parsed command line; here the only positional argument is "start"
      try {
        cmd = new GnuParser().parse(opt, args);
      } catch (ParseException e) {
        LOG.error("Could not parse: ", e);
        usage(null);
        return 1;
      }
      // A series of option checks follows; part of it is omitted here
      if (cmd.hasOption("minRegionServers")) {
        .....................................
      @SuppressWarnings("unchecked")
      List<String> remainingArgs = cmd.getArgList();
      if (remainingArgs.size() != 1) {
        usage(null);
        return 1;
      }

      String command = remainingArgs.get(0);

      if ("start".equals(command)) {
        // After all that indirection, this startMaster() call is what finally runs
        return startMaster();
      } else if ("stop".equals(command)) {
        return stopMaster();
      } else if ("clear".equals(command)) {
        return (ZNodeClearer.clear(getConf()) ? 0 : 1);
      } else {
        usage("Invalid command: " + command);
        return 1;
      }
    }
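The end of run() is a simple dispatch: exactly one positional argument must remain after option parsing, and it selects the action. A minimal sketch of that decision; the class and enum are hypothetical and only name the methods the real code calls.

```java
import java.util.List;

// Hypothetical sketch of the final dispatch in HMasterCommandLine.run().
class MasterCommandDispatch {
  enum Action { START_MASTER, STOP_MASTER, CLEAR_ZNODE, USAGE }

  static Action dispatch(List<String> remainingArgs) {
    if (remainingArgs.size() != 1) {
      return Action.USAGE;                      // usage(null); return 1
    }
    switch (remainingArgs.get(0)) {
      case "start": return Action.START_MASTER; // startMaster()
      case "stop":  return Action.STOP_MASTER;  // stopMaster()
      case "clear": return Action.CLEAR_ZNODE;  // ZNodeClearer.clear(getConf())
      default:      return Action.USAGE;        // usage("Invalid command: ...")
    }
  }
}
```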

7. startMaster() is where HMaster is actually started:

    private int startMaster() {
      // Get the configuration
      Configuration conf = getConf();
      TraceUtil.initTracer(conf);
      try {
        // If 'local', defer to LocalHBaseCluster instance. Starts master
        // and regionserver both in the one JVM.
        // In standalone mode master and regionserver share one JVM; we run a
        // distributed cluster, so this branch is skipped
        if (LocalHBaseCluster.isLocal(conf)) {
          .............................
        } else {
          // Distributed mode starts here: first log process and environment information
          logProcessInfo(getConf());
          // Construct the HMaster thread. HMaster extends HRegionServer, so the
          // HRegionServer constructor runs first
          HMaster master = HMaster.constructMaster(masterClass, conf);
          if (master.isStopped()) {
            LOG.info("Won't bring the Master up as a shutdown is requested");
            return 1;
          }
          // Start the master thread, then wait for it to exit
          master.start();
          master.join();
          if (master.isAborted())
            throw new RuntimeException("HMaster Aborted");
        }
      } catch (Throwable t) {
        LOG.error("Master exiting", t);
        return 1;
      }
      return 0;
    }
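The start()/join() pair is why startMaster() only returns once the master stops: start() runs run() on a new thread, and join() blocks the launching thread until run() returns. A minimal sketch with a hypothetical FakeMaster thread, where "aborting" is simulated with a flag:

```java
// Hypothetical stand-in for the master's thread lifecycle in startMaster().
class FakeMaster extends Thread {
  private final boolean abortDuringRun;
  private volatile boolean aborted;

  FakeMaster(boolean abortDuringRun) {
    super("FakeMaster"); // thread name, like super("RegionServer")
    this.abortDuringRun = abortDuringRun;
  }

  @Override
  public void run() {
    // A real master loops here until it is stopped; the sketch returns at once.
    if (abortDuringRun) {
      aborted = true;
    }
  }

  boolean isAborted() {
    return aborted;
  }

  // Mirrors the tail of startMaster(): start, join, then check for abort.
  static int startAndWait(boolean abortDuringRun) {
    FakeMaster master = new FakeMaster(abortDuringRun);
    master.start();
    try {
      master.join(); // the launching thread waits here until run() returns
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return 1;
    }
    if (master.isAborted()) {
      throw new RuntimeException("HMaster Aborted");
    }
    return 0;
  }
}
```

In the real startMaster() that RuntimeException is caught by the catch (Throwable t) block, so the method logs "Master exiting" and returns 1.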

8. Because HMaster extends HRegionServer, constructing it runs the HRegionServer constructor first. Here it is:

    public HRegionServer(Configuration conf) throws IOException {
      super("RegionServer"); // thread name
      TraceUtil.initTracer(conf);
      try {
        this.startcode = System.currentTimeMillis();
        this.conf = conf;
        this.fsOk = true;
        // Mostly configuration reads and sanity checks from here on
        this.masterless = conf.getBoolean(MASTERLESS_CONFIG_NAME, false);
        this.eventLoopGroupConfig = setupNetty(this.conf);
        // Check that enough heap is left after sizing the MemStore and the block cache.
        // The MemStore gets 40% of the heap by default (hbase.regionserver.global.memstore.size),
        // and the block cache also gets 40% by default (hfile.block.cache.size).
        // Together they may not exceed 80% of the heap, otherwise startup fails.
        MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf);
        // The HFile format version is set by hfile.format.version (default 3);
        // an unsupported value fails here
        HFile.checkHFileVersion(this.conf);
        checkCodecs(this.conf);
        this.userProvider = UserProvider.instantiate(conf);
        // Set up HDFS short-circuit reads; a warning is logged if
        // dfs.client.read.shortcircuit.skip.checksum is true (it should normally stay false)
        FSUtils.setupShortCircuitRead(this.conf);

        // Disable usage of meta replicas in the regionserver
        this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
        // Config'ed params
        // Number of client retries
        this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
            HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
        this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
        // How often the RegionServer checks whether regions need compaction
        this.compactionCheckFrequency = conf.getInt(PERIOD_COMPACTION, this.threadWakeFrequency);
        this.flushCheckFrequency = conf.getInt(PERIOD_FLUSH, this.threadWakeFrequency);
        this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);

        this.sleeper = new Sleeper(this.msgInterval, this);

        boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
        this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;

        this.numRegionsToReport = conf.getInt("hbase.regionserver.numregionstoreport", 10);

        this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
            HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);

        this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
            HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);

        this.abortRequested = false;
        this.stopped = false;

        // Important: createRpcServices() builds the RPC services object. HRegionServer
        // creates an RSRpcServices, while HMaster overrides it to create MasterRpcServices,
        // so the master and the regionserver each answer their own set of RPCs
        rpcServices = createRpcServices();
        useThisHostnameInstead = getUseThisHostnameInstead(conf);
        String hostName =
            StringUtils.isBlank(useThisHostnameInstead) ? this.rpcServices.isa.getHostName()
                : this.useThisHostnameInstead;
        // The server name combines host name, port and start code,
        // e.g. master.hadoop.ljs,16000,<start timestamp in ms>
        serverName = ServerName.valueOf(hostName, this.rpcServices.isa.getPort(), this.startcode);

        rpcControllerFactory = RpcControllerFactory.instantiate(this.conf);
        rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);

        // login the zookeeper client principal (if using security)
        // Only takes effect on a Kerberos-secured cluster; a no-op otherwise
        ZKUtil.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
            HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName);
        // login the server principal (if using secure Hadoop)
        // Likewise a no-op on a non-secure cluster
        login(userProvider, hostName);
        // init superusers and add the server principal (if using security)
        // or process owner as default super user.
        Superusers.initialize(conf);
        // RegionServerAccounting tracks the total MemStore size of all regions on this server
        regionServerAccounting = new RegionServerAccounting(conf);
        // In HBase 2.x the Master can also host tables; this is off by default
        boolean isMasterNotCarryTable =
            this instanceof HMaster && !LoadBalancer.isTablesOnMaster(conf);

        // A RegionServer, or a Master that carries tables, needs a block cache and a MOB file cache
        if (!isMasterNotCarryTable) {
          blockCache = BlockCacheFactory.createBlockCache(conf);
          mobFileCache = new MobFileCache(conf);
        }

        uncaughtExceptionHandler = new UncaughtExceptionHandler() {
          @Override
          public void uncaughtException(Thread t, Throwable e) {
            abort("Uncaught exception in executorService thread " + t.getName(), e);
          }
        };
        // Set up the file systems and HBase's HDFS directories (e.g. the WAL and data root dirs)
        initializeFileSystem();

        // Load the tracing span receivers configured in hbase-site.xml
        spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());

        this.configurationManager = new ConfigurationManager();
        setupWindows(getConfiguration(), getConfigurationManager());

        // Some unit tests don't need a cluster, so no zookeeper at all
        if (!conf.getBoolean("hbase.testing.nocluster", false)) {
          // Open the ZooKeeper connection and start watching the HBase znodes
          zooKeeper = new ZKWatcher(conf, getProcessName() + ":" +
              rpcServices.isa.getPort(), this, canCreateBaseZNode());
          // If no master in cluster, skip trying to track one or look for a cluster status.
          if (!this.masterless) {
            if (conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
                DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
              this.csm = new ZkCoordinatedStateManager(this);
            }
            // Build a tracker for the master's address as published in ZooKeeper
            masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
            // Start watching the master znode
            masterAddressTracker.start();
            // Build a tracker for the cluster state
            clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
            // Start watching it
            clusterStatusTracker.start();
          } else {
            masterAddressTracker = null;
            clusterStatusTracker = null;
          }
        } else {
          zooKeeper = null;
          masterAddressTracker = null;
          clusterStatusTracker = null;
        }
        // Start the RPC server so it can serve requests from regionservers and clients
        this.rpcServices.start(zooKeeper);
        // This violates 'no starting stuff in Constructor' but Master depends on the below chore
        // and executor being created and takes a different startup route. Lots of overlap between HRS
        // and M (An M IS A HRS now). Need to refactor so less duplication between M and its super
        // Master expects Constructor to put up web servers. Ugh.
        // class HRS. TODO.
        this.choreService = new ChoreService(getName(), true);
        this.executorService = new ExecutorService(getName());
        putUpWebUI();
      } catch (Throwable t) {
        // Make sure we log the exception. HRegionServer is often started via reflection and the
        // cause of failed startup is lost.
        LOG.error("Failed construction RegionServer", t);
        throw t;
      }
    }
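Two of the constructor's steps are easy to model in isolation: the free-heap rule (MemStore plus block cache may use at most 80% of the heap, leaving at least 20%) and the server-name string. The sketch below is a hypothetical helper, not an HBase API; truncating the fractions to whole percentages and lower-casing the host name reflect my reading of MemorySizeUtil and ServerName and should be treated as assumptions.

```java
import java.util.Locale;

// Hypothetical helpers modeled on two steps of the HRegionServer constructor.
class RegionServerStartupChecks {
  // At least 20% of the heap must stay free, i.e. MemStore + block cache <= 80%.
  static final int MIN_FREE_HEAP_PERCENT = 20;

  static void checkFreeHeap(float memstoreFraction, float blockCacheFraction) {
    // Assumption: fractions are truncated to whole percentages before comparing.
    int memstorePercent = (int) (memstoreFraction * 100);
    int blockCachePercent = (int) (blockCacheFraction * 100);
    if (100 - (memstorePercent + blockCachePercent) < MIN_FREE_HEAP_PERCENT) {
      throw new IllegalStateException("memstore (" + memstorePercent + "%) + block cache ("
          + blockCachePercent + "%) leaves less than " + MIN_FREE_HEAP_PERCENT + "% of the heap");
    }
  }

  // host,port,startcode, e.g. "master.hadoop.ljs,16000,1585900000000".
  static String serverName(String host, int port, long startcode) {
    return host.toLowerCase(Locale.ROOT) + "," + port + "," + startcode;
  }
}
```

With the defaults (0.4 and 0.4) exactly 20% remains, so the check passes; raising the MemStore share to 0.5 without lowering the block cache makes startup fail.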

9. Once the HRegionServer (parent) constructor returns from super(conf), the rest of the HMaster constructor runs:

    public HMaster(final Configuration conf)
        throws IOException, KeeperException {
      super(conf);
      TraceUtil.initTracer(conf);
      try {
        if (conf.getBoolean(MAINTENANCE_MODE, false)) {
          LOG.info("Detected {}=true via configuration.", MAINTENANCE_MODE);
          maintenanceMode = true;
        } else if (Boolean.getBoolean(MAINTENANCE_MODE)) {
          // Boolean.getBoolean() reads a JVM system property (-D...), despite the log wording
          LOG.info("Detected {}=true via environment variables.", MAINTENANCE_MODE);
          maintenanceMode = true;
        } else {
          maintenanceMode = false;
        }
        // Bounded in-memory buffer for fatal error reports sent by regionservers
        this.rsFatals = new MemoryBoundedLogMessageBuffer(
            conf.getLong("hbase.master.buffer.for.rs.fatals", 1 * 1024 * 1024));
        LOG.info("hbase.rootdir=" + getRootDir() +
            ", hbase.cluster.distributed=" + this.conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, false));

        // Disable usage of meta replicas in the master
        this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);

        // Adjust the master configuration, adding replication-related settings
        decorateMasterConfiguration(this.conf);

        // Hack! Maps DFSClient => Master for logs. HDFS made this
        // config param for task trackers, but we can piggyback off of it.
        if (this.conf.get("mapreduce.task.attempt.id") == null) {
          this.conf.set("mapreduce.task.attempt.id", "hb_m_" + this.serverName.toString());
        }
        // Master metrics
        this.metricsMaster = new MetricsMaster(new MetricsMasterWrapperImpl(this));

        // preload table descriptor at startup
        this.preLoadTableDescriptors = conf.getBoolean("hbase.master.preload.tabledescriptors", true);

        this.maxBlancingTime = getMaxBalancingTime();
        this.maxRitPercent = conf.getDouble(HConstants.HBASE_MASTER_BALANCER_MAX_RIT_PERCENT,
            HConstants.DEFAULT_HBASE_MASTER_BALANCER_MAX_RIT_PERCENT);

        // Cluster status publishing: e.g. when a regionserver dies, clients are told right away
        // instead of waiting for a socket timeout
        boolean shouldPublish = conf.getBoolean(HConstants.STATUS_PUBLISHED,
            HConstants.STATUS_PUBLISHED_DEFAULT);
        Class<? extends ClusterStatusPublisher.Publisher> publisherClass =
            conf.getClass(ClusterStatusPublisher.STATUS_PUBLISHER_CLASS,
                ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS,
                ClusterStatusPublisher.Publisher.class);

        if (shouldPublish) {
          if (publisherClass == null) {
            LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
                ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS +
                " is not set - not publishing status");
          } else {
            clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, publisherClass);
            getChoreService().scheduleChore(clusterStatusPublisherChore);
          }
        }

        // Some unit tests don't need a cluster, so no zookeeper at all
        if (!conf.getBoolean("hbase.testing.nocluster", false)) {
          // ActiveMasterManager will try to write this master's info to ZooKeeper; its
          // constructor registers it as a listener for ZooKeeper events on the master znode
          this.activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName, this);
        } else {
          this.activeMasterManager = null;
        }
      } catch (Throwable t) {
        // Make sure we log the exception. HMaster is often started via reflection and the
        // cause of failed startup is lost.
        LOG.error("Failed construction of Master", t);
        throw t;
      }
    }
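The maintenance-mode check at the top of the constructor consults two sources in order: the configuration first, then a JVM system property of the same name. A minimal sketch with a plain Map standing in for Configuration; the key string is believed to match HMaster.MAINTENANCE_MODE in 2.2.x, but treat it as an assumption, and the class is hypothetical.

```java
import java.util.Map;

// Hypothetical sketch of how HMaster resolves maintenance mode.
class MaintenanceModeSketch {
  // Assumed value of HMaster.MAINTENANCE_MODE.
  static final String MAINTENANCE_MODE = "hbase.master.maintenance_mode";

  static boolean resolve(Map<String, String> conf) {
    if (Boolean.parseBoolean(conf.getOrDefault(MAINTENANCE_MODE, "false"))) {
      return true; // set in hbase-site.xml
    }
    // Boolean.getBoolean reads System.getProperty(name), i.e. -Dname=true on the
    // JVM command line, not an OS environment variable.
    return Boolean.getBoolean(MAINTENANCE_MODE);
  }
}
```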

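10. Finally, HMaster.run() is called. Outside of test mode it starts a daemon thread named "<name>:becomeActiveMaster", which brings up the Jetty info server and then calls startActiveMasterManager(); that call blocks until this master becomes the active master. Meanwhile the calling thread enters super.run(), the regionserver main loop.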

    @Override
    public void run() {
      try {
        if (!conf.getBoolean("hbase.testing.nocluster", false)) {
          Threads.setDaemonThreadRunning(new Thread(() -> {
            try {
              int infoPort = putUpJettyServer();
              startActiveMasterManager(infoPort);
            } catch (Throwable t) {
              // Make sure we log the exception.
              String error = "Failed to become Active Master";
              LOG.error(error, t);
              // Abort should have been called already.
              if (!isAborted()) {
                abort(error, t);
              }
            }
          }), getName() + ":becomeActiveMaster");
        }
        // Fall in here even if we have been aborted. Need to run the shutdown services and
        // the super run call will do this for us.
        super.run();
      } finally {
        if (this.clusterSchemaService != null) {
          // If on way out, then we are no longer active master.
          this.clusterSchemaService.stopAsync();
          try {
            this.clusterSchemaService.awaitTerminated(
                getConfiguration().getInt(HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS,
                    DEFAULT_HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS), TimeUnit.SECONDS);
          } catch (TimeoutException te) {
            LOG.warn("Failed shutdown of clusterSchemaService", te);
          }
        }
        this.activeMaster = false;
      }
    }
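The blocking inside startActiveMasterManager() comes from the election on the master znode: whichever master creates it first becomes active, and the others wait until it disappears. The sketch below simulates that with an in-memory object; FakeMasterZNode is hypothetical and uses Java monitors (wait/notifyAll) in place of ZooKeeper's ephemeral nodes and watches, so it illustrates the idea rather than the ZooKeeper or HBase API.

```java
// Hypothetical in-memory stand-in for the ephemeral master znode.
class FakeMasterZNode {
  private String owner; // null means there is no active master

  synchronized boolean tryCreate(String serverName) {
    if (owner != null) {
      return false; // another master already holds the znode
    }
    owner = serverName;
    return true;
  }

  synchronized void delete(String serverName) {
    if (serverName.equals(owner)) {
      owner = null;
      notifyAll(); // wake up backup masters waiting for the node to go away
    }
  }

  // A backup master blocks here until it manages to create the node.
  synchronized void blockUntilActive(String serverName) throws InterruptedException {
    while (!tryCreate(serverName)) {
      wait(); // stands in for waiting on a ZooKeeper node-deleted event
    }
  }

  synchronized String activeMaster() {
    return owner;
  }
}
```

Because the real znode is ephemeral, it also disappears when the active master's ZooKeeper session expires, which is what lets a backup master take over after a crash.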

That completes the walkthrough of the HMaster startup flow. Only the main path is covered here; many details are left for you to read in the source.



This article was reposted from 大数据开发运维架构.
