欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Hadoop源码分析六启动文件namenode原理详解

程序员文章站 2022-06-25 09:26:05
1、 namenode启动在中分析了hadoop的启动文件,其中提到了namenode启动的时候调用的类为org.apache.hadoop.hdfs.server.namenode.namenode...

1、 namenode启动

在中分析了hadoop的启动文件,其中提到了namenode启动的时候调用的类为

org.apache.hadoop.hdfs.server.namenode.namenode

其main方法的内容如下:

 public static void main(string argv[]) throws exception {
    if (dfsutil.parsehelpargument(argv, namenode.usage, system.out, true)) {
      system.exit(0);
    }
    try {
      stringutils.startupshutdownmessage(namenode.class, argv, log);
      namenode namenode = createnamenode(argv, null);
      if (namenode != null) {
        namenode.join();
      }
    } catch (throwable e) {
      log.error("failed to start namenode.", e);
      terminate(1, e);
    }
  }

这段代码的重点在第8行,这里createnamenode方法创建了一个namenode对象,然后调用其join方法阻塞等待请求。

createnamenode方法的内容如下:

 public static namenode createnamenode(string argv[], configuration conf)
      throws ioexception {
    log.info("createnamenode " + arrays.aslist(argv));
    if (conf == null)
      conf = new hdfsconfiguration();
    // parse out some generic args into configuration.
    genericoptionsparser hparser = new genericoptionsparser(conf, argv);
    argv = hparser.getremainingargs();
    // parse the rest, nn specific args.
    startupoption startopt = parsearguments(argv);
    if (startopt == null) {
      printusage(system.err);
      return null;
    }
    setstartupoption(conf, startopt);
    switch (startopt) {
      case format: {
        boolean aborted = format(conf, startopt.getforceformat(),
            startopt.getinteractiveformat());
        terminate(aborted ? 1 : 0);
        return null; // avoid javac warning
      }
      case genclusterid: {
        system.err.println("generating new cluster id:");
        system.out.println(nnstorage.newclusterid());
        terminate(0);
        return null;
      }
      case finalize: {
        system.err.println("use of the argument '" + startupoption.finalize +
            "' is no longer supported. to finalize an upgrade, start the nn " +
            " and then run `hdfs dfsadmin -finalizeupgrade'");
        terminate(1);
        return null; // avoid javac warning
      }
      case rollback: {
        boolean aborted = dorollback(conf, true);
        terminate(aborted ? 1 : 0);
        return null; // avoid warning
      }
      case bootstrapstandby: {
        string toolargs[] = arrays.copyofrange(argv, 1, argv.length);
        int rc = bootstrapstandby.run(toolargs, conf);
        terminate(rc);
        return null; // avoid warning
      }
      case initializesharededits: {
        boolean aborted = initializesharededits(conf,
            startopt.getforceformat(),
            startopt.getinteractiveformat());
        terminate(aborted ? 1 : 0);
        return null; // avoid warning
      }
      case backup:
      case checkpoint: {
        namenoderole role = startopt.tonoderole();
        defaultmetricssystem.initialize(role.tostring().replace(" ", ""));
        return new backupnode(conf, role);
      }
      case recover: {
        namenode.dorecovery(startopt, conf);
        return null;
      }
      case metadataversion: {
        printmetadataversion(conf);
        terminate(0);
        return null; // avoid javac warning
      }
      case upgradeonly: {
        defaultmetricssystem.initialize("namenode");
        new namenode(conf);
        terminate(0);
        return null;
      }
      default: {
        defaultmetricssystem.initialize("namenode");
        return new namenode(conf);
      }
    }
  }

这段代码很简单。主要做的操作有三个:

  • 1、 创建配置文件对象
  • 2、 解析命令行的参数
  • 3、 根据参数执行对应方法(switch块)

其中创建的配置文件的为hdfsconfiguration(第5行),这里的hdfsconfiguration继承于configuration类,它会加载hadoop的配置文件到内存中。然后解析传入main方法的参数,根据这个参数执行具体的方法。正常启动的时候执行的default里的内容。default的内容也很简单,就是创建一个namenode对象。

这里先从hdfsconfiguration开始分析,详细讲解hdfs的配置文件处理。

首先看hdfsconfiguration的初始化方法如下:

  public hdfsconfiguration() {
    super();
  }

这里是调用其父类的初始化方法。

其父类为configuration类,它的初始化方法如下:

  /** a new configuration. */
  public configuration() {
    this(true);
  }

这里可以看见他是调用了一个重载方法,传入了一个参数:true。

接着细看这个重载方法:

  public configuration(boolean loaddefaults) {
    this.loaddefaults = loaddefaults;
    updatingresource = new concurrenthashmap<string, string[]>();
    synchronized(configuration.class) {
      registry.put(this, null);
    }
  }

这里也很简单,这里主要是为两个参数赋值,并将新创建的configuration添加到registry中。
至此便创建好了一个配置文件。但是关于配置文件的初始化解析还未完成。在java里可以使用static关键字声明一段代码块,这段代码块在类加载的时候会被执行。在configuration和hdfsconfiguration中都有静态代码块。
首先在configuration类中,在第682行有一段静态代码块,其内容如下:

Hadoop源码分析六启动文件namenode原理详解

这段代码的重点在第695行和第696行,这里调用了一个adddefaultresource方法,这里传入了两个参数core-default.xml和core-site.xml。其中core-site.xml就是在安装hadoop的时候设置的配置文件。而core-default.xml是hadoop自带的配置文件,这个文件可以在hadoop的官方文档里查到,文档链接如下:https://hadoop.apache.org/docs/r2.7.6/hadoop-project-dist/hadoop-common/core-default.xml
同样在hadoop的源码里也有这个文件,它在hadoop-common-xx.jar中。

接着继续分析调用的adddefaultresource方法

其内容如下:

  public static synchronized void adddefaultresource(string name) {
    if(!defaultresources.contains(name)) {
      defaultresources.add(name);
      for(configuration conf : registry.keyset()) {
        if(conf.loaddefaults) {
          conf.reloadconfiguration();
        }
      }
    }
  }

这段代码也很简单。首先是第二行先从defaultresources中判断是否已经存在该配置文件,

这里的defaultresources是一个list

其定义如下:

 private static final copyonwritearraylist<string> defaultresources =
    new copyonwritearraylist<string>();

若defaultresources中不存在这个配置文件,则继续向下执行,将这个配置文件添加到defaultresources中(第3行)。然后遍历registry中的key(第4行),这里的key就是在上文提到的configuration对象。然后根据其loaddefaults的值来判断是否执行reloadconfiguration方法。
这里的loaddefaults的值就是上文分析的传入重载方法的值,上文传入的为true,所以其创建的configuration对象在这里会执行reloadconfiguration方法。

reloadconfiguration方法内容如下:

  public synchronized void reloadconfiguration() {
    properties = null;                            // trigger reload
    finalparameters.clear();                      // clear site-limits
  }

这里可以看见这个reloadconfiguration方法并没有真正的重新加载配置文件而是将properties的值设置为空。

同样在hdfsconfiguration也有类似的静态代码块,在第30行,其内容如下:

Hadoop源码分析六启动文件namenode原理详解

这里首先调用了一个adddeprecatedkeys方法然后调用了一个adddefaultresource。这里的adddefaultresource传了两个文件hdfs-default.xml和hdfs-site.xml。其中hdfs-site.xml是安装时的配置文件,hdfs-default.xml是其自带的默认文件,同上文的core-default.xml一样。官网链接为:https://hadoop.apache.org/docs/r2.7.6/hadoop-project-dist/hadoop-hdfs/hdfs-default.xml。文件位于:hadoop-hdfs-2.7.6.jar。

其中adddeprecatedkeys方法内容如下:

 private static void adddeprecatedkeys() {
    configuration.adddeprecations(new deprecationdelta[] {
      new deprecationdelta("dfs.backup.address",
        dfsconfigkeys.dfs_namenode_backup_address_key),
      new deprecationdelta("dfs.backup.http.address",
        dfsconfigkeys.dfs_namenode_backup_http_address_key),
      new deprecationdelta("dfs.balance.bandwidthpersec",
        dfsconfigkeys.dfs_datanode_balance_bandwidthpersec_key),
      new deprecationdelta("dfs.data.dir",
        dfsconfigkeys.dfs_datanode_data_dir_key),
      new deprecationdelta("dfs.http.address",
        dfsconfigkeys.dfs_namenode_http_address_key),
      new deprecationdelta("dfs.https.address",
        dfsconfigkeys.dfs_namenode_https_address_key),
      new deprecationdelta("dfs.max.objects",
        dfsconfigkeys.dfs_namenode_max_objects_key),
      new deprecationdelta("dfs.name.dir",
        dfsconfigkeys.dfs_namenode_name_dir_key),
      new deprecationdelta("dfs.name.dir.restore",
        dfsconfigkeys.dfs_namenode_name_dir_restore_key),
      new deprecationdelta("dfs.name.edits.dir",
        dfsconfigkeys.dfs_namenode_edits_dir_key),
      new deprecationdelta("dfs.read.prefetch.size",
        dfsconfigkeys.dfs_client_read_prefetch_size_key),
      new deprecationdelta("dfs.safemode.extension",
        dfsconfigkeys.dfs_namenode_safemode_extension_key),
      new deprecationdelta("dfs.safemode.threshold.pct",
        dfsconfigkeys.dfs_namenode_safemode_threshold_pct_key),
      new deprecationdelta("dfs.secondary.http.address",
        dfsconfigkeys.dfs_namenode_secondary_http_address_key),
      new deprecationdelta("dfs.socket.timeout",
        dfsconfigkeys.dfs_client_socket_timeout_key),
      new deprecationdelta("fs.checkpoint.dir",
        dfsconfigkeys.dfs_namenode_checkpoint_dir_key),
      new deprecationdelta("fs.checkpoint.edits.dir",
        dfsconfigkeys.dfs_namenode_checkpoint_edits_dir_key),
      new deprecationdelta("fs.checkpoint.period",
        dfsconfigkeys.dfs_namenode_checkpoint_period_key),
      new deprecationdelta("heartbeat.recheck.interval",
        dfsconfigkeys.dfs_namenode_heartbeat_recheck_interval_key),
      new deprecationdelta("dfs.https.client.keystore.resource",
        dfsconfigkeys.dfs_client_https_keystore_resource_key),
      new deprecationdelta("dfs.https.need.client.auth",
        dfsconfigkeys.dfs_client_https_need_auth_key),
      new deprecationdelta("slave.host.name",
        dfsconfigkeys.dfs_datanode_host_name_key),
      new deprecationdelta("session.id",
        dfsconfigkeys.dfs_metrics_session_id_key),
      new deprecationdelta("dfs.access.time.precision",
        dfsconfigkeys.dfs_namenode_accesstime_precision_key),
      new deprecationdelta("dfs.replication.considerload",
        dfsconfigkeys.dfs_namenode_replication_considerload_key),
      new deprecationdelta("dfs.replication.interval",
        dfsconfigkeys.dfs_namenode_replication_interval_key),
      new deprecationdelta("dfs.replication.min",
        dfsconfigkeys.dfs_namenode_replication_min_key),
      new deprecationdelta("dfs.replication.pending.timeout.sec",
        dfsconfigkeys.dfs_namenode_replication_pending_timeout_sec_key),
      new deprecationdelta("dfs.max-repl-streams",
        dfsconfigkeys.dfs_namenode_replication_max_streams_key),
      new deprecationdelta("dfs.permissions",
        dfsconfigkeys.dfs_permissions_enabled_key),
      new deprecationdelta("dfs.permissions.supergroup",
        dfsconfigkeys.dfs_permissions_superusergroup_key),
      new deprecationdelta("dfs.write.packet.size",
        dfsconfigkeys.dfs_client_write_packet_size_key),
      new deprecationdelta("dfs.block.size",
        dfsconfigkeys.dfs_block_size_key),
      new deprecationdelta("dfs.datanode.max.xcievers",
        dfsconfigkeys.dfs_datanode_max_receiver_threads_key),
      new deprecationdelta("io.bytes.per.checksum",
        dfsconfigkeys.dfs_bytes_per_checksum_key),
      new deprecationdelta("dfs.federation.nameservices",
        dfsconfigkeys.dfs_nameservices),
      new deprecationdelta("dfs.federation.nameservice.id",
        dfsconfigkeys.dfs_nameservice_id),
      new deprecationdelta("dfs.client.file-block-storage-locations.timeout",
        dfsconfigkeys.dfs_client_file_block_storage_locations_timeout_ms),
    });
  }

这段代码很简单,只有一句话。调用了 configuration的静态方法adddeprecations,并向其中传入了一个参数,参数类型为deprecationdelta类的数组,并为数组中的数据进行赋值。

以上就是关于hadoop源码分析启动文件namenode原理详解的详细内容,更多关于hadoop源码分析的资料请持续关注其它相关文章!