HDFS源码学习(2)——NameNode初始化

18 October 2012

main()

  public static void main(String argv[]) throws Exception {
    try {
      StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
      //创建nameNode
      NameNode namenode = createNameNode(argv, null);
      if (namenode != null)
        namenode.join();
    } catch (Throwable e) {
      LOG.error(StringUtils.stringifyException(e));
      System.exit(-1);
    }
  }

createNameNode()

  public static NameNode createNameNode(String argv[], 
                                 Configuration conf) throws IOException {
    if (conf == null)
      conf = new Configuration();
      //从命令行参数中提取启动配置项数据
    StartupOption startOpt = parseArguments(argv);
    if (startOpt == null) {
      printUsage();
      return null;
    }
    //设置启动参数
    setStartupOption(conf, startOpt);

    switch (startOpt) {
      case FORMAT:
        boolean aborted = format(conf, true);
        System.exit(aborted ? 1 : 0);
      case FINALIZE:
        aborted = finalize(conf, true);
        System.exit(aborted ? 1 : 0);
      default:
    }

    //新建NameNode
    NameNode namenode = new NameNode(conf);
    return namenode;
  }

NameNode()

  public NameNode(Configuration conf) throws IOException {
    super(conf);
    try {
    //初始化
      initialize(getConf());
    } catch (IOException e) {
      this.stop();
      throw e;
    }
  }

initialize()

  private void initialize(Configuration conf) throws IOException {
    InetSocketAddress socAddr = NameNode.getAddress(conf);
    int handlerCount = conf.getInt("dfs.namenode.handler.count", 10);
    // 关键->创建一个RPC Server
    this.server = RPC.getServer(this, socAddr.getHostName(), socAddr.getPort(),
                                handlerCount, false, conf);

    // The rpc-server port can be ephemeral... ensure we have the correct info
    this.serverAddress = this.server.getListenerAddress(); 
    FileSystem.setDefaultUri(conf, getUri(serverAddress));
    LOG.info("Namenode up at: " + this.serverAddress);

    myMetrics = new NameNodeMetrics(conf, this);

    //关键->创建一个FSNameSystem
    this.namesystem = new FSNamesystem(this, conf);
    //启动HTTP Server
    startHttpServer(conf);
    //启动RPC Server
    this.server.start();  //start RPC server   

    startTrashEmptier(conf);
  }

FSNameSystem()

  FSNamesystem(NameNode nn, Configuration conf) throws IOException {
    try {
      //初始化FSNameSystem
      initialize(nn, conf);
      userPasswordInformation = new UserPasswordInformation(conf);
      extendAccessControlList = new ExtendAccessControlList(conf);
    } catch(IOException e) {
      LOG.error(getClass().getSimpleName() + " initialization failed.", e);
      close();
      throw e;
    }
  }

FSNameSystem.initialize()

  private void initialize(NameNode nn, Configuration conf) throws IOException {
    this.systemStart = now();
    this.fsLock = new ReentrantReadWriteLock(); // non-fair locking
    setConfigurationParameters(conf);

    this.nameNodeAddress = nn.getNameNodeAddress();
    this.registerMBean(conf); // register the MBean for the FSNamesystemStutus

    //创建FSDirectory
    this.dir = new FSDirectory(this, conf);
    StartupOption startOpt = NameNode.getStartupOption(conf);

    //加载FSImage
    this.dir.loadFSImage(getNamespaceDirs(conf),
                         getNamespaceEditsDirs(conf), startOpt);
    long timeTakenToLoadFSImage = now() - systemStart;
    LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
    NameNode.getNameNodeMetrics().fsImageLoadTime.set(
                              (int) timeTakenToLoadFSImage);
    this.safeMode = new SafeModeInfo(conf);
    setBlockTotal();
    //创建PendingReplicationBlocks
    pendingReplications = new PendingReplicationBlocks(
                            conf.getInt("dfs.replication.pending.timeout.sec", 
                                        -1) * 1000L);
    //创建心跳检查线程                                          
    this.hbthread = new Daemon(new HeartbeatMonitor());
    //创建租约管理线程
    this.lmthread = new Daemon(leaseManager.new Monitor());
    //创建副本管理线程
    this.replthread = new Daemon(new ReplicationMonitor());
    hbthread.start();
    lmthread.start();
    replthread.start();

    // 副本超额block管理线程
    this.overreplthread = new Daemon(new OverReplicationMonitor());
    overreplthread.start();

    this.hostsReader = new HostsFileReader(conf.get("dfs.hosts",""),
                                           conf.get("dfs.hosts.exclude",""));
    //创建退役节点管理线程
    this.dnthread = new Daemon(new DecommissionManager(this).new Monitor(
        conf.getInt("dfs.namenode.decommission.interval", 30),
        conf.getInt("dfs.namenode.decommission.nodes.per.interval", 5)));
    dnthread.start();

    this.dnsToSwitchMapping = ReflectionUtils.newInstance(
        conf.getClass("topology.node.switch.mapping.impl", ScriptBasedMapping.class,
            DNSToSwitchMapping.class), conf);

    /* If the dns to swith mapping supports cache, resolve network 
     * locations of those hosts in the include list, 
     * and store the mapping in the cache; so future calls to resolve
     * will be fast.
     */
    if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
      dnsToSwitchMapping.resolve(new ArrayList<String>(hostsReader.getHosts()));
    }
    //创建副本定位器用于定位副本存放位置
    this.replicator = BlockPlacementPolicy.getInstance(
        conf,
        this,
        this.clusterMap,
        this.hostsReader,
        this.dnsToSwitchMapping,
        this);
  }

FSDirectory(this, conf)

新建FSDirecotry

 FSDirectory(FSNamesystem ns, Configuration conf) {
    //创建一个FSImage,并实例化构建FSDirectory
    this(new FSImage(), ns, conf);
    fsImage.setCheckpointDirectories(FSImage.getCheckpointDirs(conf, null),
                                FSImage.getCheckpointEditsDirs(conf, null));
  }

this(new FSImage(), ns, conf);

FSDirectory(FSImage fsImage, FSNamesystem ns, Configuration conf) {
    this.bLock = new ReentrantReadWriteLock(); // non-fair
    this.cond = bLock.writeLock().newCondition();
    //创建根目录
    rootDir = new INodeDirectoryWithQuota(INodeDirectory.ROOT_NAME,
        ns.createFsOwnerPermissions(new FsPermission((short)0755)),
        Integer.MAX_VALUE, -1);
    this.fsImage = fsImage;
    namesystem = ns;
    initialize(conf);
  }

FSDirectory.initialize()

  private void initialize(Configuration conf) {
    MetricsContext metricsContext = MetricsUtil.getContext("dfs");
    directoryMetrics = MetricsUtil.createRecord(metricsContext, "FSDirectory");
    directoryMetrics.setTag("sessionId", conf.get("session.id"));
  }

FSDirectory.loadFSImage()

  void loadFSImage(Collection<File> dataDirs,
                   Collection<File> editsDirs,
                   StartupOption startOpt) throws IOException {
    // format before starting up if requested
    if (startOpt == StartupOption.FORMAT) {
      fsImage.setStorageDirectories(dataDirs, editsDirs);
      fsImage.format();
      startOpt = StartupOption.REGULAR;
    }
    try {
      //从datadir和editdirs加载FSImage
      if (fsImage.recoverTransitionRead(dataDirs, editsDirs, startOpt)) {
        fsImage.saveNamespace(true);
      }
      //初始化Editlog
      FSEditLog editLog = fsImage.getEditLog();
      assert editLog != null : "editLog must be initialized";
      if (!editLog.isOpen())
        editLog.open();
      fsImage.setCheckpointDirectories(null, null);
    } catch(IOException e) {
      fsImage.close();
      throw e;
    }
    writeLock();
    try {
      this.ready = true;
      cond.signalAll();
    } finally {
      writeUnlock();
    }
  }

FSImage.recoverTransitionRead()

  boolean recoverTransitionRead(Collection<File> dataDirs,
                             Collection<File> editsDirs,
                                StartupOption startOpt
                                ) throws IOException {
    assert startOpt != StartupOption.FORMAT : 
      "NameNode formatting should be performed before reading the image";

    // none of the data dirs exist
    if (dataDirs.size() == 0 || editsDirs.size() == 0)  
      throw new IOException(
        "All specified directories are not accessible or do not exist.");

    if(startOpt == StartupOption.IMPORT 
        && (checkpointDirs == null || checkpointDirs.isEmpty()))
      throw new IOException("Cannot import image from a checkpoint. "
                          + "\"fs.checkpoint.dir\" is not set." );

    if(startOpt == StartupOption.IMPORT 
        && (checkpointEditsDirs == null || checkpointEditsDirs.isEmpty()))
      throw new IOException("Cannot import image from a checkpoint. "
                          + "\"fs.checkpoint.edits.dir\" is not set." );

    setStorageDirectories(dataDirs, editsDirs);
    // 1.检查所有目录的状态和一致性
    Map<StorageDirectory, StorageState> dataDirStates = 
             new HashMap<StorageDirectory, StorageState>();
    boolean isFormatted = false;
    for (Iterator<StorageDirectory> it = 
                      dirIterator(); it.hasNext();) {
      StorageDirectory sd = it.next();
      StorageState curState;
      try {
        curState = sd.analyzeStorage(startOpt);
        // sd is locked but not opened
        switch(curState) {
        case NON_EXISTENT:
          // name-node fails if any of the configured storage dirs are missing
          throw new InconsistentFSStateException(sd.getRoot(),
                                                 "storage directory does not exist or is not accessible.");
        case NOT_FORMATTED:
          break;
        case NORMAL:
          break;
        default:  // recovery is possible
          sd.doRecover(curState);      
        }
        if (curState != StorageState.NOT_FORMATTED 
            && startOpt != StartupOption.ROLLBACK) {
          sd.read(); // read and verify consistency with other directories
          isFormatted = true;
        }
        if (startOpt == StartupOption.IMPORT && isFormatted)
          // import of a checkpoint is allowed only into empty image directories
          throw new IOException("Cannot import image from a checkpoint. " 
              + " NameNode already contains an image in " + sd.getRoot());
      } catch (IOException ioe) {
        sd.unlock();
        throw ioe;
      }
      dataDirStates.put(sd,curState);
    }

    if (!isFormatted && startOpt != StartupOption.ROLLBACK 
                     && startOpt != StartupOption.IMPORT)
      throw new IOException("NameNode is not formatted.");
    if (layoutVersion < LAST_PRE_UPGRADE_LAYOUT_VERSION) {
      checkVersionUpgradable(layoutVersion);
    }
    if (startOpt != StartupOption.UPGRADE
          && layoutVersion < LAST_PRE_UPGRADE_LAYOUT_VERSION
          && layoutVersion != FSConstants.LAYOUT_VERSION)
        throw new IOException(
                          "\nFile system image contains an old layout version " + layoutVersion
                          + ".\nAn upgrade to version " + FSConstants.LAYOUT_VERSION
                          + " is required.\nPlease restart NameNode with -upgrade option.");
    // check whether distributed upgrade is reguired and/or should be continued
    verifyDistributedUpgradeProgress(startOpt);

    // 2. Format unformatted dirs.
    this.checkpointTime = 0L;
    for (Iterator<StorageDirectory> it = 
                     dirIterator(); it.hasNext();) {
      StorageDirectory sd = it.next();
      StorageState curState = dataDirStates.get(sd);
      switch(curState) {
      case NON_EXISTENT:
        assert false : StorageState.NON_EXISTENT + " state cannot be here";
      case NOT_FORMATTED:
        LOG.info("Storage directory " + sd.getRoot() + " is not formatted.");
        LOG.info("Formatting ...");
        sd.clearDirectory(); // create empty currrent dir
        break;
      default:
        break;
      }
    }


blog comments powered by Disqus