HDFS源码学习(13)——DataNode启动过程
18 October 2012
##main()
public static void main(String args[]) {
try {
StringUtils.startupShutdownMessage(DataNode.class, args, LOG);
DataNode datanode = createDataNode(args, null);
if (datanode != null)
datanode.join();
} catch (Throwable e) {
LOG.error(StringUtils.stringifyException(e));
System.exit(-1);
}
}
##createDataNode()
public static DataNode createDataNode(String args[],
Configuration conf) throws IOException {
// 初始化datanode
DataNode dn = instantiateDataNode(args, conf);
// 启动datanode后台线程
runDatanodeDaemon(dn);
return dn;
}
1.instantiateDataNode()
public static DataNode instantiateDataNode(String args[],
Configuration conf) throws IOException {
// 处理配置
if (conf == null)
conf = new Configuration();
if (!parseArguments(args, conf)) {
printUsage();
return null;
}
if (conf.get("dfs.network.script") != null) {
LOG.error("This configuration for rack identification is not supported" +
" anymore. RackID resolution is handled by the NameNode.");
System.exit(-1);
}
// 获取data目录配置
String[] dataDirs = conf.getStrings("dfs.data.dir");
dnThreadName = "DataNode: [" +
StringUtils.arrayToString(dataDirs) + "]";
//创建datanode实例
return makeInstance(dataDirs, conf);
} #### 1.1. makeInfstance() 该方法主要用于检查给定的data目录中至少有一个可以创建,并实例化DataNode
public static DataNode makeInstance(String[] dataDirs, Configuration conf)
throws IOException {
ArrayList<File> dirs = new ArrayList<File>();
for (int i = 0; i < dataDirs.length; i++) {
File data = new File(dataDirs[i]);
try {
DiskChecker.checkDir(data);
dirs.add(data);
} catch(DiskErrorException e) {
LOG.warn("Invalid directory in dfs.data.dir: " + e.getMessage());
}
}
if (dirs.size() > 0)
return new DataNode(conf, dirs);
LOG.error("All directories in dfs.data.dir are invalid.");
return null;
} #### 1.2 new DataNode()
DataNode(Configuration conf,
AbstractList<File> dataDirs) throws IOException {
// 设置配置信息
super(conf);
datanodeObject = this;
supportAppends = conf.getBoolean("dfs.support.append", false);
this.conf = conf;
try {
// 启动DataNode
startDataNode(conf, dataDirs);
} catch (IOException ie) {
shutdown();
throw ie;
}
} ##### 1.2.1 startDataNode() 代码较长,仅列出主要步骤:
- 设置配置信息
- 向NameNode发起RPC请求,获取版本和StorageID信息
- 获取启动配置
- 初始化存储信息,构建FSDataSet
- 获取可用的端口号
- 调整注册信息中的机器名,加上端口号
- 初始化DataXceiverServer
- 设置blockReport和heartbeat各自的时间间隔
- 初始化blockScanner
- 初始胡并启动servlet info server,提供内容查询的http服务
- 初始化ipc server,该ipc server主要用于完成DataNode间的block recover。
runDatanodeDaemon()
public static void runDatanodeDaemon(DataNode dn) throws IOException {
if (dn != null) {
//register datanode
dn.register();
dn.dataNodeThread = new Thread(dn, dnThreadName);
dn.dataNodeThread.setDaemon(true); // needed for JUnit testing
dn.dataNodeThread.start();
}
}
2.1 向NameNode注册 —— dn.register();
private void register() throws IOException {
if (dnRegistration.getStorageID().equals("")) {
setNewStorageID(dnRegistration);
}
while(shouldRun) {
try {
// reset name to machineName. Mainly for web interface.
dnRegistration.name = machineName + ":" + dnRegistration.getPort();
// 通过NameProtocal向NameNode注册
dnRegistration = namenode.register(dnRegistration);
break;
} catch(SocketTimeoutException e) { // namenode is busy
LOG.info("Problem connecting to server: " + getNameNodeAddr());
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {}
}
}
assert ("".equals(storage.getStorageID())
&& !"".equals(dnRegistration.getStorageID()))
|| storage.getStorageID().equals(dnRegistration.getStorageID()) :
"New storageID can be assigned only if data-node is not formatted";
if (storage.getStorageID().equals("")) {
storage.setStorageID(dnRegistration.getStorageID());
storage.writeAll();
LOG.info("New storage id " + dnRegistration.getStorageID()
+ " is assigned to data-node " + dnRegistration.getName());
}
if(! storage.getStorageID().equals(dnRegistration.getStorageID())) {
throw new IOException("Inconsistent storage IDs. Name-node returned "
+ dnRegistration.getStorageID()
+ ". Expecting " + storage.getStorageID());
}
if (supportAppends) {
Block[] bbwReport = data.getBlocksBeingWrittenReport();
long[] blocksBeingWritten = BlockListAsLongs.convertToArrayLongs(bbwReport);
//如果支持append,则报告正在写入的block信息
namenode.blocksBeingWrittenReport(dnRegistration, blocksBeingWritten);
}
// 调整下一次的BR时间,使其在下次heartbeat时进行
scheduleBlockReport(initialBlockReportDelay);
}
2.2 启动datanode线程 —— dn.dataNodeThread.start();
datanode线程本身非常简单,不停调用offerSevice提供服务:
public void run() {
LOG.info(dnRegistration + "In DataNode.run, data = " + data);
// start dataXceiveServer
dataXceiverServer.start();
new Thread(new CrashVolumeChecker()).start();//added by wukong
while (shouldRun) {
try {
startDistributedUpgradeIfNeeded();
offerService();
} catch (Exception ex) {
LOG.error("Exception: " + StringUtils.stringifyException(ex));
if (shouldRun) {
try {
Thread.sleep(5000);
} catch (InterruptedException ie) {
}
}
}
}
LOG.info(dnRegistration + ":Finishing DataNode in: "+data);
shutdown();
}
2.2.1 offerService()
offerService的核心是周期性进行heartbeat和blockReport,主要流程如下:
public void offerService() throws Exception {
LOG.info("using BLOCKREPORT_INTERVAL of " + blockReportInterval + "msec" +
" Initial delay: " + initialBlockReportDelay + "msec");
LOG.info("using DELETEREPORT_INTERVAL of " + deletedReportInterval + "msec");
LOG.info("using HEARTBEAT_INTERVAL of " + heartBeatInterval + "msec");
LOG.info("using HEARTBEAT_EXPIRE_INTERVAL of " + heartbeatExpireInterval + "msec");
//
// Now loop for a long time....
//
while (shouldRun) {
try {
long startTime = now();
//
// Every so often, send heartbeat or block-report
//
if (startTime - lastHeartbeat > heartBeatInterval /* 3 secs*/) {
//
// All heartbeat messages include following info:
// -- Datanode name
// -- data transfer port
// -- Total capacity
// -- Bytes remaining
//
lastHeartbeat = startTime;
DatanodeCommand[] cmds = namenode.sendHeartbeat(dnRegistration,
data.getCapacity(),
data.getDfsUsed(),
data.getRemaining(),
xmitsInProgress.get(),
getXceiverCount());
myMetrics.heartbeats.inc(now() - startTime);
//LOG.info("Just sent heartbeat, with name " + localName);
if (!processCommand(cmds))
continue;
}
reportReceivedBlocks();
DatanodeCommand cmd = blockReport();
processCommand(cmd);
// start block scanner
if (blockScanner != null && blockScannerThread == null &&
upgradeManager.isUpgradeCompleted()) {
LOG.info("Starting Periodic block scanner.");
blockScannerThread = new Daemon(blockScanner);
blockScannerThread.start();
}
//
// There is no work to do; sleep until hearbeat timer elapses,
// or work arrives, and then iterate again.
//
long waitTime = heartBeatInterval - (System.currentTimeMillis() - lastHeartbeat);
synchronized(receivedAndDeletedBlockList) {
if (waitTime > 0 && receivedAndDeletedBlockList.size() == 0) {
try {
receivedAndDeletedBlockList.wait(waitTime);
} catch (InterruptedException ie) {
}
delayBeforeBlockReceived();
}
} // synchronized
} catch(RemoteException re) {
String reClass = re.getClassName();
if (UnregisteredDatanodeException.class.getName().equals(reClass) ||
DisallowedDatanodeException.class.getName().equals(reClass) ||
IncorrectVersionException.class.getName().equals(reClass)) {
LOG.warn("DataNode is shutting down: " +
StringUtils.stringifyException(re));
shutdown();
return;
}
LOG.warn(StringUtils.stringifyException(re));
} catch (IOException e) {
LOG.warn(StringUtils.stringifyException(e));
}
} // while (shouldRun)
} // offerService
blog comments powered by Disqus