前言
hadoop源码版本,选择2.7版本,因为该版本覆盖面广,含有hadoop大部分核心功能。整个hadoop项目,有一两百万行代码,功能点非持多,但是不必每个功能点每行代码都去了解,主要是看核心架构和核心功能点。通过阅读源码,以达到以下目的:
通过阅读核心源码,从本质上来理解hadoop的架构和功能。
帮助定位排查问题,正确理解配置项,能进行针对性的优化配置。
学习优秀开源项目的代码架构思想,借鉴优秀的代码模块。
对不可避免的bug和性能点,进行二次开发和代码优化。
hdfs源码的阅读,从main方法主线开始到最后启动完成,启动流程主线相当于纲领,主线上的核心功能会在其他章节依次展开,避免主线和分支杂糅一起,这样整个流程思路更清晰明了,本次单讲主线启动流程。
namenode启动脚本和启动类
根据namenode的启动脚本,可以看到最终脚本是通过java启动了org.apache.hadoop.hdfs.server.namenode.NameNode类,因此直接去该类下找main方法,来从头看namenode的启动流程。
启动流程
org.apache.hadoop.hdfs.server.namenode.NameNode类的main方法如下:
/**
*
* todo: namenode启动类的main方法
*
*/
public static void main(String argv[]) throws Exception {
if (DFSUtil.parseHelpArgument(argv, NameNode.USAGE, System.out, true)) {
System.exit(0);
}
try {
StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
/**
* todo: 创建namenode对象,进去看一看
*/
NameNode namenode = createNameNode(argv, null);
if (namenode != null) {
namenode.join();
}
} catch (Throwable e) {
LOG.error("Failed to start namenode.", e);
terminate(1, e);
}
}
核心代码就一行,调用createNameNode方法,创建namenode对象,进去看一看
public static NameNode createNameNode(String argv[], Configuration conf)
throws IOException {
LOG.info("createNameNode " + Arrays.asList(argv));
if (conf == null)
conf = new HdfsConfiguration();
// Parse out some generic args into Configuration.
GenericOptionsParser hParser = new GenericOptionsParser(conf, argv);
argv = hParser.getRemainingArgs();
// Parse the rest, NN specific args.
StartupOption startOpt = parseArguments(argv);
if (startOpt == null) {
printUsage(System.err);
return null;
}
setStartupOption(conf, startOpt);
/**
* todo: 这里都是在处理参数,像我们执行hdfs命令,可以format数据目录,执行升级,回滚等操作。这些我们都不关心,只关心正常的namenode启动流程。
*/
switch (startOpt) {
case FORMAT: {
boolean aborted = format(conf, startOpt.getForceFormat(),
startOpt.getInteractiveFormat());
terminate(aborted ? 1 : 0);
return null; // avoid javac warning
}
case GENCLUSTERID: {
System.err.println("Generating new cluster id:");
System.out.println(NNStorage.newClusterID());
terminate(0);
return null;
}
case FINALIZE: {
System.err.println("Use of the argument '" + StartupOption.FINALIZE +
"' is no longer supported. To finalize an upgrade, start the NN " +
" and then run `hdfs dfsadmin -finalizeUpgrade'");
terminate(1);
return null; // avoid javac warning
}
case ROLLBACK: {
boolean aborted = doRollback(conf, true);
terminate(aborted ? 1 : 0);
return null; // avoid warning
}
case BOOTSTRAPSTANDBY: {
String toolArgs[] = Arrays.copyOfRange(argv, 1, argv.length);
int rc = BootstrapStandby.run(toolArgs, conf);
terminate(rc);
return null; // avoid warning
}
case INITIALIZESHAREDEDITS: {
boolean aborted = initializeSharedEdits(conf,
startOpt.getForceFormat(),
startOpt.getInteractiveFormat());
terminate(aborted ? 1 : 0);
return null; // avoid warning
}
case BACKUP:
case CHECKPOINT: {
NamenodeRole role = startOpt.toNodeRole();
DefaultMetricsSystem.initialize(role.toString().replace(" ", ""));
return new BackupNode(conf, role);
}
case RECOVER: {
NameNode.doRecovery(startOpt, conf);
return null;
}
case METADATAVERSION: {
printMetadataVersion(conf);
terminate(0);
return null; // avoid javac warning
}
case UPGRADEONLY: {
DefaultMetricsSystem.initialize("NameNode");
new NameNode(conf);
terminate(0);
return null;
}
default: {
DefaultMetricsSystem.initialize("NameNode");
/**
* todo: 其他case都是特殊的管理操作命令,正常启动都不会进去,那么默认就执行下面的语句,来new一个Namenode对象
*/
return new NameNode(conf);
}
}
}
createNameNode方法会解析命令行参数,根据不同的子命令,进入到不同的case,正常启动namenode,是不会进入case分支,最终就到default分支,new一个NameNode对象,实例化代码如下:
public NameNode(Configuration conf) throws IOException {
this(conf, NamenodeRole.NAMENODE);
}
protected NameNode(Configuration conf, NamenodeRole role)
throws IOException {
this.conf = conf;
this.role = role;
setClientNamenodeAddress(conf);
String nsId = getNameServiceId(conf);
String namenodeId = HAUtil.getNameNodeId(conf, nsId);
this.haEnabled = HAUtil.isHAEnabled(conf, nsId);
state = createHAState(getStartupOption(conf));
this.allowStaleStandbyReads = HAUtil.shouldAllowStandbyReads(conf);
this.haContext = createHAContext();
try {
// 设置nameservice之类的配置
initializeGenericKeys(conf, nsId, namenodeId);
/**
* todo: 重点,构建rpc服务端,加载数据文件都是在initialize方法里进行
*/
initialize(conf);
try {
haContext.writeLock();
state.prepareToEnterState(haContext);
state.enterState(haContext);
} finally {
haContext.writeUnlock();
}
} catch (IOException e) {
this.stop();
throw e;
} catch (HadoopIllegalArgumentException e) {
this.stop();
throw e;
}
this.started.set(true);
}
实例化NameNode方法做了一些选项处理和配置项处理,重点在initialize方法,里面加载image文件到内存,构建了FSNamesystem对象,启动了namenode的rpc服务端。其中FSNamesystem是非常重要的一个对象,它还持有很多namenode的核心对象。
/**
* Initialize name-node.
* todo: 初始化namenode,构建FSNamesystem对象,并从磁盘加载数据文件到内存,最后构建并启动rpc服务端。
*
* @param conf the configuration
*/
protected void initialize(Configuration conf) throws IOException {
if (conf.get(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS) == null) {
String intervals = conf.get(DFS_METRICS_PERCENTILES_INTERVALS_KEY);
if (intervals != null) {
conf.set(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS,
intervals);
}
}
UserGroupInformation.setConfiguration(conf);
loginAsNameNodeUser(conf);
NameNode.initMetrics(conf, this.getRole());
StartupProgressMetrics.register(startupProgress);
if (NamenodeRole.NAMENODE == role) {
// 启动http服务,就是namenode的50070服务
startHttpServer(conf);
}
this.spanReceiverHost =
SpanReceiverHost.get(conf, DFSConfigKeys.DFS_SERVER_HTRACE_PREFIX);
/**
* todo: 核心方法,构建FSNamesystem对象,并加载数据文件到内存
*/
loadNamesystem(conf);
/**
* todo: 创建rpc服务端
*/
rpcServer = createRpcServer(conf);
if (clientNamenodeAddress == null) {
// This is expected for MiniDFSCluster. Set it now using
// the RPC server's bind address.
clientNamenodeAddress =
NetUtils.getHostPortString(rpcServer.getRpcAddress());
LOG.info("Clients are to use " + clientNamenodeAddress + " to access"
+ " this namenode/service.");
}
if (NamenodeRole.NAMENODE == role) {
httpServer.setNameNodeAddress(getNameNodeAddress());
httpServer.setFSImage(getFSImage());
}
pauseMonitor = new JvmPauseMonitor(conf);
pauseMonitor.start();
metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
/**
* todo: 启动http服务,启动rpc服务,启动一系列管理线程,如副本管理线程,块管理线程,datanode管理线程等
*/
startCommonServices(conf);
}
比较重要的是loadNamesystem和startCommonServices方法,其中loadNamesystem方法构建了FSNamesystem,之后FSNamesystem对象加载数据文件到内存。然后startCommonServices方法启动了一系列重要管理线程。
1、loadNamesystem方法
protected void loadNamesystem(Configuration conf) throws IOException {
this.namesystem = FSNamesystem.loadFromDisk(conf);
}
loadFromDisk方法返回FSNamesystem对象,该方法的注释说明已经简要说明了该方法的功能,即构建一个加载了image和edits的FSNamesystem对象,该对象包含了namespace内存命名空间。
/**
* Instantiates an FSNamesystem loaded from the image and edits
* directories specified in the passed Configuration.
*
* @param conf the Configuration which specifies the storage directories
* from which to load
* @return an FSNamesystem which contains the loaded namespace
* @throws IOException if loading fails
*/
static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
checkConfiguration(conf);
/**
* todo: 先构建一个FSImage对象,通过类名就知道,文件映像类,真正执行加载数据文件的类。
* 它里面最终会通过protobuf格式的loader来加载数据文件,因为namenode的数据文件是protobuf格式序列化存储的。
*/
FSImage fsImage = new FSImage(conf,
FSNamesystem.getNamespaceDirs(conf),
FSNamesystem.getNamespaceEditsDirs(conf));
/**
* todo: 创建FSNamesystem对象,传入了刚刚创建的FSImage对象
*/
FSNamesystem namesystem = new FSNamesystem(conf, fsImage, false);
StartupOption startOpt = NameNode.getStartupOption(conf);
if (startOpt == StartupOption.RECOVER) {
namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
}
long loadStart = monotonicNow();
try {
/**
* todo: 加载数据文件到内存
*/
namesystem.loadFSImage(startOpt);
} catch (IOException ioe) {
LOG.warn("Encountered exception loading fsimage", ioe);
fsImage.close();
throw ioe;
}
long timeTakenToLoadFSImage = monotonicNow() - loadStart;
LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
NameNodeMetrics nnMetrics = NameNode.getNameNodeMetrics();
if (nnMetrics != null) {
nnMetrics.setFsImageLoadTime((int) timeTakenToLoadFSImage);
}
/**
* todo: 最后返回FSNamesystem对象
*/
return namesystem;
}
2、startCommonServices方法
/** Start the services common to active and standby states */
private void startCommonServices(Configuration conf) throws IOException {
/**
* todo: namesystem就是FSNamesystem对象,就是之前loadFromDisk创建的。
* 然后调用startCommonServices方法,启动一些公共服务线程。
* 另外还会做一些资源检查和块检查工作,来决定是否进入安全模式。
*/
namesystem.startCommonServices(conf, haContext);
registerNNSMXBean();
if (NamenodeRole.NAMENODE != role) {
/**
* todo: 启动http服务
*/
startHttpServer(conf);
httpServer.setNameNodeAddress(getNameNodeAddress());
httpServer.setFSImage(getFSImage());
}
/**
* todo: 启动rpc服务
*/
rpcServer.start();
plugins = conf.getInstances(DFS_NAMENODE_PLUGINS_KEY,
ServicePlugin.class);
for (ServicePlugin p: plugins) {
try {
p.start(this);
} catch (Throwable t) {
LOG.warn("ServicePlugin " + p + " could not be started", t);
}
}
LOG.info(getRole() + " RPC up at: " + rpcServer.getRpcAddress());
if (rpcServer.getServiceRpcAddress() != null) {
LOG.info(getRole() + " service RPC up at: "
+ rpcServer.getServiceRpcAddress());
}
}
http和rpc服务到此就完全启动了,然后再看看startCommonServices里面做了什么
/**
* Start services common to both active and standby states
*/
void startCommonServices(Configuration conf, HAContext haContext) throws IOException {
this.registerMBean(); // register the MBean for the FSNamesystemState
/**
* todo: 为了线程安全,这里加了写锁,因为要访问之前加载到内存的元数据
*/
writeLock();
this.haContext = haContext;
try {
/**
* todo: 创建一个资源检查器,然后执行checkAvailableResources方法来检查磁盘空间是否够用,设置hasResourcesAvailable标志,后面会用。
*/
nnResourceChecker = new NameNodeResourceChecker(conf);
checkAvailableResources();
assert safeMode != null && !isPopulatingReplQueues();
StartupProgress prog = NameNode.getStartupProgress();
prog.beginPhase(Phase.SAFEMODE);
prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS,
getCompleteBlocksTotal());
/**
* todo: 设置总的块数量,同时在该方法里判断是否要进入安全模式
*/
setBlockTotal();
/**
* todo: 启动一系列管理线程
*/
blockManager.activate(conf);
} finally {
writeUnlock();
}
registerMXBean();
DefaultMetricsSystem.instance().register(this);
if (inodeAttributeProvider != null) {
inodeAttributeProvider.start();
dir.setINodeAttributeProvider(inodeAttributeProvider);
}
snapshotManager.registerMXBean();
}
1) 看看如何判断进入安全模式
/**
* Set the total number of blocks in the system.
*/
public void setBlockTotal() {
// safeMode is volatile, and may be set to null at any time
SafeModeInfo safeMode = this.safeMode;
if (safeMode == null)
return;
safeMode.setBlockTotal((int)getCompleteBlocksTotal());
}
继续看看setBlockTotal方法
/**
* Set total number of blocks.
*/
private synchronized void setBlockTotal(int total) {
this.blockTotal = total;
this.blockThreshold = (int) (blockTotal * threshold);
this.blockReplQueueThreshold =
(int) (blockTotal * replQueueThreshold);
if (haEnabled) {
// After we initialize the block count, any further namespace
// modifications done while in safe mode need to keep track
// of the number of total blocks in the system.
this.shouldIncrementallyTrackBlocks = true;
}
if(blockSafe < 0)
this.blockSafe = 0;
/**
* todo: 判断是否进入安全模式
*/
checkMode();
}
checkMode是判断是否进入安全模式的方法,下面是checkMode的节选
if (smmthread == null && needEnter()) {
/**
* todo: 如果needEnter,那么就enter,进入安全模式
*/
enter();
// check if we are ready to initialize replication queues
if (canInitializeReplQueues() && !isPopulatingReplQueues()
&& !haEnabled) {
initializeReplQueues();
}
reportStatus("STATE* Safe mode ON.", false);
return;
}
needEnter会判断三件事:
汇报上来的块是否足够了
判断活的datanode数里是否达到要求
判断namenode的资源是否够,就是根据之前检查磁盘资源后设置的hasResourcesAvailable变量判断的
/**
* There is no need to enter safe mode
* if DFS is empty or {@link #threshold} == 0
*/
private boolean needEnter() {
return (threshold != 0 && blockSafe < blockThreshold) ||
(datanodeThreshold != 0 && getNumLiveDataNodes() < datanodeThreshold) ||
(!nameNodeHasResourcesAvailable());
}
如果满足上面三个任意一个条件,就会执行enter方法进入安全模式,该方法就是设置了标志位。
/**
* Enter safe mode.
*/
private void enter() {
this.reached = 0;
this.reachedTimestamp = 0;
}
2) 再看看启动了哪些线程
public void activate(Configuration conf) {
pendingReplications.start(); //PendingReplicationMonitor线程
datanodeManager.activate(conf); //datanode管理线程,如心跳线程,下线线程
this.replicationThread.start(); //ReplicationMonitor线程
this.blockReportThread.start(); //块汇报线程
}
到此整个namenode的启动就完成了。
最后
namenode的启动流程主线就是这些,其中有几个重要的分支,没有细化深入,比如rpc服务如何构建和使用的,namenode数据文件格式如何加载和组织的,内存如何管理Inode和块的等等,这些后续再逐个细化深入分析。




