
MapReduce: Source Code Analysis of the Job Submission Flow and Input Splitting

Posted on 程序员文章站, 2023-04-04 22:27:56

Hadoop 2.7.2 MapReduce: analysis of the Job submission source code and the split source code

  1. Start from the waitForCompletion() method
boolean result = job.waitForCompletion(true);
  /**
   * Submit the job to the cluster and wait for it to finish.
   * @param verbose print the progress to the user
   * @return true if the job succeeded
   * @throws IOException thrown if the communication with the 
   *         <code>JobTracker</code> is lost
   */
  public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    // First check the state: the job can only be submitted while it is in DEFINE; then step into submit()
    if (state == JobState.DEFINE) {
      submit();
    }
    if (verbose) {
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis = 
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
        }
      }
    }
    return isSuccessful();
  }
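The non-verbose branch is a plain submit-then-poll loop: submit once if the job is still in DEFINE, then sleep for the poll interval (Job.getCompletionPollInterval reads mapreduce.client.completion.pollinterval, 5000 ms by default) until isComplete() returns true. The sketch below models only that control flow in plain Java. PollingJobSketch and its "finish after N polls" countdown are hypothetical stand-ins, not Hadoop classes.

```java
/** Hypothetical stand-in for Job: models only the DEFINE check and the poll loop. */
class PollingJobSketch {
    enum JobState { DEFINE, RUNNING }

    private JobState state = JobState.DEFINE;
    private int pollsUntilDone;     // pretend the job finishes after this many polls
    private final boolean succeeds; // pretend final outcome

    PollingJobSketch(int pollsUntilDone, boolean succeeds) {
        this.pollsUntilDone = pollsUntilDone;
        this.succeeds = succeeds;
    }

    JobState getState() { return state; }

    private void submit() {
        state = JobState.RUNNING; // the real submit() contacts the cluster first
    }

    private boolean isComplete() {
        // counts down once per poll; reports completion when the counter reaches zero
        return pollsUntilDone-- <= 0;
    }

    /** Same shape as the non-verbose branch of Job.waitForCompletion(). */
    boolean waitForCompletion(long pollIntervalMillis) {
        if (state == JobState.DEFINE) {
            submit();
        }
        while (!isComplete()) {
            try {
                Thread.sleep(pollIntervalMillis);
            } catch (InterruptedException ie) {
                // swallowed, as in the original loop
            }
        }
        return succeeds;
    }
}
```

In the real Job, isComplete() and isSuccessful() fetch the job status from the cluster; the countdown only replaces that remote call.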
  2. Step into the submit() method
  /**
   * Submit the job to the cluster and return immediately.
   * @throws IOException
   */
  public void submit() 
         throws IOException, InterruptedException, ClassNotFoundException {
    // Make sure the job is still in the DEFINE state; otherwise it cannot be submitted
    ensureState(JobState.DEFINE);
    // Use the new API (unless old-API mapper/reducer classes were configured)
    setUseNewAPI();
    // Step into connect(): MapReduce connects to the cluster through Job.connect(),
    // which actually constructs the Cluster instance `cluster`
    connect();
    // Once connect() returns, create the JobSubmitter
    final JobSubmitter submitter = 
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException, 
      ClassNotFoundException {
        // The core method is submitJobInternal(): as the name says, the internal submit
        // method, which implements all of the job submission logic. Step into it next.
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    // After submission, the state changes to RUNNING
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
   }
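The guard at the top of submit() is what makes a Job single-use: ensureState(JobState.DEFINE) throws IllegalStateException once the state has moved to RUNNING. A minimal model, assuming only the DEFINE to RUNNING transition matters (the real ensureState also checks that a RUNNING job is attached to a cluster); SubmitGuardSketch is a hypothetical name.

```java
/** Hypothetical model of the state guard in Job.submit(). */
class SubmitGuardSketch {
    enum JobState { DEFINE, RUNNING }

    private JobState state = JobState.DEFINE;
    private int submissions = 0; // counts how often the "real" submission ran

    private void ensureState(JobState expected) {
        if (expected != state) {
            throw new IllegalStateException(
                "Job in state " + state + " instead of " + expected);
        }
    }

    void submit() {
        ensureState(JobState.DEFINE); // only a DEFINE job may be submitted
        submissions++;                // stands in for connect() + submitJobInternal()
        state = JobState.RUNNING;     // flipped only after submission returns
    }

    JobState getState() { return state; }

    int getSubmissions() { return submissions; }
}
```

The same ensureState(JobState.DEFINE) check sits in Job's setters such as setMapperClass(), which is why a job cannot be reconfigured after it has been submitted.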
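  3. Step into the connect() method
  • When a MapReduce job is submitted, the connection to the cluster is made by Job.connect(), which essentially constructs the Cluster instance `cluster`.
  • Cluster is the client-side handle to a MapReduce cluster; it provides methods for retrieving information about that cluster.
  • Inside Cluster there is `client`, an instance of the client communication protocol ClientProtocol that talks to the cluster; it is built by the create() method of a ClientProtocolProvider.
  • Hadoop 2.x provides two ClientProtocol implementations that create() can return: YARNRunner for YARN mode and LocalJobRunner for local mode. The Cluster actually communicates with the cluster through these objects.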
  private synchronized void connect()
          throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) { // Cluster provides methods for remotely querying the MapReduce cluster
      cluster = 
        ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                   public Cluster run()
                          throws IOException, InterruptedException, 
                                 ClassNotFoundException {
                     // The only thing to focus on here is the Cluster constructor,
                     // which builds the Cluster instance
                     return new Cluster(getConfiguration());
                   }
                 });
    }
  }
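connect() is a synchronized lazy initializer: the Cluster is built on the first call and reused afterwards, so calling it again is cheap. The sketch below keeps only that pattern. The Supplier stands in for `new Cluster(getConfiguration())`, the ugi.doAs(...) wrapper is omitted, and LazyConnectSketch is a hypothetical name.

```java
import java.util.function.Supplier;

/** Hypothetical model of Job.connect(): create the cluster handle lazily, at most once. */
class LazyConnectSketch<C> {
    private final Supplier<C> clusterFactory; // stands in for `new Cluster(getConfiguration())`
    private C cluster;
    private int timesCreated = 0;

    LazyConnectSketch(Supplier<C> clusterFactory) {
        this.clusterFactory = clusterFactory;
    }

    /** synchronized + null check, as in Job.connect(); the ugi.doAs(...) wrapper is omitted. */
    synchronized void connect() {
        if (cluster == null) {
            cluster = clusterFactory.get();
            timesCreated++;
        }
    }

    synchronized C getCluster() { return cluster; }

    synchronized int getTimesCreated() { return timesCreated; }
}
```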
  4. Step into the Cluster constructor
  // The one-argument constructor delegates to the two-argument constructor
  public Cluster(Configuration conf) throws IOException {
    this(null, conf);
  }

  public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) 
      throws IOException {
    this.conf = conf;
    this.ugi = UserGroupInformation.getCurrentUser();
    // The key call is initialize()
    initialize(jobTrackAddr, conf);
  }
  
  // The two Cluster fields to watch are the ClientProtocolProvider clientProtocolProvider
  // and the ClientProtocol instance client
  private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
      throws IOException {

    synchronized (frameworkLoader) {
      for (ClientProtocolProvider provider : frameworkLoader) {
        LOG.debug("Trying ClientProtocolProvider : "
            + provider.getClass().getName());
        ClientProtocol clientProtocol = null; 
        try {
          // The provider is chosen by mapreduce.framework.name (MRConfig.FRAMEWORK_NAME):
          // "local" (the default): LocalClientProtocolProvider builds a LocalJobRunner, so the job runs locally;
          // "yarn": YarnClientProtocolProvider builds a YARNRunner, so the job runs on the YARN cluster
          if (jobTrackAddr == null) {
            // client is obtained by calling the ClientProtocolProvider's create() method
            clientProtocol = provider.create(conf);
          } else {
            clientProtocol = provider.create(jobTrackAddr, conf);
          }

          if (clientProtocol != null) {
            clientProtocolProvider = provider;
            client = clientProtocol;
            LOG.debug("Picked " + provider.getClass().getName()
                + " as the ClientProtocolProvider");
            break;
          }
          else {
            LOG.debug("Cannot pick " + provider.getClass().getName()
                + " as the ClientProtocolProvider - returned null protocol");
          }
        } 
        catch (Exception e) {
          LOG.info("Failed to use " + provider.getClass().getName()
              + " due to error: ", e);
        }
      }
    }

    if (null == clientProtocolProvider || null == client) {
      throw new IOException(
          "Cannot initialize Cluster. Please check your configuration for "
              + MRConfig.FRAMEWORK_NAME
              + " and the correspond server addresses.");
    }
  }
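The provider loop boils down to "ask each provider in turn and keep the first non-null ClientProtocol". The sketch below models that with plain maps and strings. It assumes the behaviour of the two stock providers described above (local applies when mapreduce.framework.name is unset or "local", YARN only when it is "yarn"); the Provider interface, the returned strings and the list ordering are simplifications, since the real loop iterates a ServiceLoader and also logs and swallows per-provider exceptions.

```java
import java.io.IOException;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model of the provider loop in Cluster.initialize(). */
class ProviderSelectionSketch {
    static final String FRAMEWORK_NAME = "mapreduce.framework.name";

    /** Stand-in for ClientProtocolProvider.create(conf): returns null if the provider does not apply. */
    interface Provider {
        String create(Map<String, String> conf);
    }

    // Modelled on LocalClientProtocolProvider: applies when the framework is "local" (also the default).
    static final Provider LOCAL = conf ->
        "local".equals(conf.getOrDefault(FRAMEWORK_NAME, "local")) ? "LocalJobRunner" : null;

    // Modelled on YarnClientProtocolProvider: applies only when the framework is explicitly "yarn".
    static final Provider YARN = conf ->
        "yarn".equals(conf.get(FRAMEWORK_NAME)) ? "YARNRunner" : null;

    /** Asks each provider in order and keeps the first non-null client; null if none applies. */
    static String select(List<Provider> providers, Map<String, String> conf) {
        for (Provider provider : providers) {
            String client = provider.create(conf);
            if (client != null) {
                return client;
            }
        }
        return null;
    }

    /** Like Cluster.initialize(): having no usable provider is a configuration error. */
    static String initialize(List<Provider> providers, Map<String, String> conf) throws IOException {
        String client = select(providers, conf);
        if (client == null) {
            throw new IOException("Cannot initialize Cluster. Please check your configuration for "
                + FRAMEWORK_NAME + " and the correspond server addresses.");
        }
        return client;
    }
}
```

An unrecognised value such as "classic" makes every provider return null, which is exactly the case that produces the "Cannot initialize Cluster" error.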
  5. Step into submitJobInternal(), the internal method that submits the job to the cluster
  JobStatus submitJobInternal(Job job, Cluster cluster) 
  throws ClassNotFoundException, InterruptedException, IOException {

    //validate the jobs output specs 
    // Check the output spec: if the output path already exists, an exception is thrown
    checkSpecs(job);

    // conf holds the configuration loaded from the cluster's XML config files
    Configuration conf = job.getConfiguration();
    // Add the MR framework to the distributed cache
    addMRFrameworkToDistributedCache(conf);

    // Get the staging directory that holds the resources for this submission.
    // If not configured, in YARN mode it defaults to
    // /tmp/hadoop-yarn/staging/<submitting user>/.staging (under the root of the file system in use)
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
    //configure the command line options correctly on the submitting dfs
    InetAddress ip = InetAddress.getLocalHost();
    if (ip != null) { // record the submitting host's IP and hostname in conf
      submitHostAddress = ip.getHostAddress();
      submitHostName = ip.getHostName();
      conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
      conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
    }
    // Ask the ClientProtocol (YARNRunner or LocalJobRunner) for a new JobID
    JobID jobId = submitClient.getNewJobID();
    // Assign the JobID to the job
    job.setJobID(jobId);
    // Submission directory: Path(Path parent, String child) joins the two into one path
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    // Job status; stays null unless the submission succeeds
    JobStatus status = null;
    try {
      conf.set(MRJobConfig.USER_NAME,
          UserGroupInformation.getCurrentUser().getShortUserName());
      conf.set("hadoop.http.filter.initializers", 
          "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
      LOG.debug("Configuring job " + jobId + " with " + submitJobDir 
          + " as the submit dir");
      // get delegation token for the dir
      TokenCache.obtainTokensForNamenodes(job.getCredentials(),
          new Path[] { submitJobDir }, conf);
      
      populateTokenCache(conf, job.getCredentials());

      // generate a secret to authenticate shuffle transfers
      if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
        KeyGenerator keyGen;
        try {
          keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
          keyGen.init(SHUFFLE_KEY_LENGTH);
        } catch (NoSuchAlgorithmException e) {
          throw new IOException("Error generating shuffle secret key", e);
        }
        SecretKey shuffleKey = keyGen.generateKey();
        TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
            job.getCredentials());
      }
      if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
        conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
        LOG.warn("Max job attempts set to 1 since encrypted intermediate" +
                "data spill is enabled");
      }

      // Copy the job jar (and any -files/-libjars/-archives resources) to the cluster.
      // Internally this calls rUploader.uploadFiles(job, jobSubmitDir),
      // and uploadFiles() is what copies the jar to the cluster.
      copyAndConfigureFiles(job, submitJobDir);

      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
      
      // Create the splits for the job
      LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
      // Compute the splits and write the split plan files (job.split, job.splitmetainfo)
      int maps = writeSplits(job, submitJobDir);
      conf.setInt(MRJobConfig.NUM_MAPS, maps);
      LOG.info("number of splits:" + maps);

      // write "queue admins of the queue to which job is being submitted"
      // to job file.
      String queue = conf.get(MRJobConfig.QUEUE_NAME,
          JobConf.DEFAULT_QUEUE_NAME);
      AccessControlList acl = submitClient.getQueueAdmins(queue);
      conf.set(toFullPropertyName(queue,
          QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

      // removing jobtoken referrals before copying the jobconf to HDFS
      // as the tasks don't need this setting, actually they may break
      // because of it if present as the referral will point to a
      // different job.
      TokenCache.cleanUpTokenReferral(conf);

      if (conf.getBoolean(
          MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
          MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
        // Add HDFS tracking ids
        ArrayList<String> trackingIds = new ArrayList<String>();
        for (Token<? extends TokenIdentifier> t :
            job.getCredentials().getAllTokens()) {
          trackingIds.add(t.decodeIdentifier().getTrackingId());
        }
        conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
            trackingIds.toArray(new String[trackingIds.size()]));
      }

      // Set reservation info if it exists
      ReservationId reservationId = job.getReservationId();
      if (reservationId != null) {
        conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
      }

      // Write job file (job.xml) to submit dir
      writeConf(conf, submitJobFile);
      
      //
      // Now, actually submit the job (using the submit name)
      //
      printTokens(jobId, job.getCredentials());
      status = submitClient.submitJob(
          jobId, submitJobDir.toString(), job.getCredentials());
      if (status != null) {
        return status;
      } else {
        throw new IOException("Could not launch Job");
      }
    } finally {
      if (status == null) {
        LOG.info("Cleaning up the staging area " + submitJobDir);
        if (jtFs != null && submitJobDir != null)
          jtFs.delete(submitJobDir, true);

      }
    }
  }
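For orientation, the paths built in submitJobInternal() compose as <staging root>/<user>/.staging/<jobId>, and the submit directory ends up holding the jar, the split files and job.xml. The helper below only assembles those strings; it assumes YARN mode with the default staging root (yarn.app.mapreduce.am.staging-dir), SubmitDirSketch is a hypothetical name, and the user name and JobID in the test are made-up values.

```java
/** Hypothetical helper that only assembles the submit-dir paths described above. */
class SubmitDirSketch {
    // Assumed default of yarn.app.mapreduce.am.staging-dir (YARN mode, Hadoop 2.x)
    static final String DEFAULT_STAGING_ROOT = "/tmp/hadoop-yarn/staging";

    /** <staging root>/<user>/.staging, i.e. what getStagingDir() resolves to in YARN mode. */
    static String stagingDir(String stagingRoot, String user) {
        return stagingRoot + "/" + user + "/.staging";
    }

    /** Mirrors new Path(jobStagingArea, jobId.toString()). */
    static String submitJobDir(String stagingDir, String jobId) {
        return stagingDir + "/" + jobId;
    }

    /** Files written into the submit dir; job.jar only exists if the job has a jar. */
    static String[] submittedFiles(String submitJobDir) {
        return new String[] {
            submitJobDir + "/job.jar",           // copyAndConfigureFiles()
            submitJobDir + "/job.split",         // writeSplits()
            submitJobDir + "/job.splitmetainfo", // writeSplits()
            submitJobDir + "/job.xml"            // writeConf()
        };
    }
}
```

If submitJob() fails and status is still null, the finally block deletes this whole submit directory.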
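  6. Step into writeSplits(job, submitJobDir), which computes the splits and writes the split plan files
  • With the new API, it calls writeNewSplits(job, jobSubmitDir) internally.
  • writeNewSplits(job, jobSubmitDir) creates `input`, an instance of the job's configured InputFormat class.
  • The main responsibilities of an InputFormat:
    • Validate the job's input specification.
    • Split the input files into logical InputSplits; each InputSplit is processed by one map task (MapTask).
    • Provide the RecordReader that turns the data of a split into key/value pairs.
  • `input` then calls getSplits(): List<InputSplit> splits = input.getSplits(job);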
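The RecordReader role is easiest to see with text input: TextInputFormat's LineRecordReader emits (byte offset, line) pairs, and because splits are cut at byte positions, a line can straddle two splits. The sketch below is a simplified pure-Java model of the boundary rule LineRecordReader uses: a split that does not start at offset 0 skips through its first newline, and every split keeps reading while the next line starts at or before its end. Assumptions: the whole file is a String, lines end with a single '\n', and compression, CRLF and custom delimiters are ignored. LineSplitReaderSketch is a hypothetical name.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model of how a line reader turns a byte-range split into (offset, line) pairs. */
class LineSplitReaderSketch {
    static List<Map.Entry<Long, String>> readSplit(String data, int start, int length) {
        int end = start + length;
        int pos = start;
        List<Map.Entry<Long, String>> records = new ArrayList<>();
        if (start != 0) {
            // skip up to and including the next newline: the previous split reads past
            // its own end to cover this line, so it must not be read twice
            int nl = data.indexOf('\n', pos);
            pos = (nl < 0) ? data.length() : nl + 1;
        }
        // keep reading while the next line starts at or before the split end
        while (pos <= end && pos < data.length()) {
            int nl = data.indexOf('\n', pos);
            int lineEnd = (nl < 0) ? data.length() : nl;
            records.add(new SimpleEntry<>((long) pos, data.substring(pos, lineEnd)));
            pos = (nl < 0) ? data.length() : nl + 1;
        }
        return records;
    }
}
```

For "aaa\nbb\ncccc\nd\n", readSplit(data, 0, 5) returns (0, aaa), (4, bb) and readSplit(data, 5, 9) returns (7, cccc), (12, d): every line is read exactly once even though the first split ends in the middle of "bb".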
  7. Step into getSplits(job) in the FileInputFormat class
  /** 
   * Generate the list of files and make them into FileSplits.
   * @param job the job context
   * @throws IOException
   */
  public List<InputSplit> getSplits(JobContext job) throws IOException {
    StopWatch sw = new StopWatch().start();
      
    // getFormatMinSplitSize() always returns 1; getMinSplitSize(job) returns
    // mapreduce.input.fileinputformat.split.minsize (default 1)
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    // getMaxSplitSize(job) returns mapreduce.input.fileinputformat.split.maxsize (default Long.MAX_VALUE)
    long maxSize = getMaxSplitSize(job);

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);
    // Iterate over every input file of the job
    for (FileStatus file: files) {
      // File path
      Path path = file.getPath();
      // File length in bytes
      long length = file.getLen();
      if (length != 0) {
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        // Can this file be split?
        if (isSplitable(job, path)) {
          // Block size of the file: 32 MB by default on the local file system;
          // on HDFS, 128 MB in newer Hadoop 2.x releases and 64 MB in older ones
          long blockSize = file.getBlockSize();
          // Logical split size, equal to the block size by default:
          //   return Math.max(minSize, Math.min(maxSize, blockSize));
          // with minSize = 1, maxSize = Long.MAX_VALUE and blockSize = the file's block size
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);

          long bytesRemaining = length;
          // Before cutting each split, check whether the remaining bytes exceed SPLIT_SLOP
          // (a constant, 1.1) times the split size; if not, stop cutting and put the remainder into one final split
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts(),
                       blkLocations[blkIndex].getCachedHosts()));
          }
        } else { // not splitable
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                      blkLocations[0].getCachedHosts()));
        }
      } else { 
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
    }
    return splits;
  }
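To see what the SPLIT_SLOP loop does with concrete sizes, the arithmetic can be lifted out of getSplits() into plain Java. The sketch below reproduces computeSplitSize() and the cutting loop for a single splittable file and returns only the split lengths; it ignores block-host locality and non-splittable inputs (which get one split per file), and SplitCalculatorSketch is a hypothetical name, not a Hadoop class.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical pure-Java re-creation of the split-size arithmetic in FileInputFormat.getSplits(). */
class SplitCalculatorSketch {
    static final double SPLIT_SLOP = 1.1; // same constant as FileInputFormat

    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    /** Lengths of the splits a splittable file of {@code length} bytes would get. */
    static List<Long> splitLengths(long length, long blockSize, long minSize, long maxSize) {
        List<Long> lengths = new ArrayList<>();
        if (length == 0) {          // zero-length files still get one (empty) split
            lengths.add(0L);
            return lengths;
        }
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);
        long bytesRemaining = length;
        // cut full-size splits while more than 1.1 split sizes remain
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            lengths.add(splitSize);
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {  // the tail (at most 1.1 * splitSize) becomes the last split
            lengths.add(bytesRemaining);
        }
        return lengths;
    }
}
```

With the default minSize = 1 and maxSize = Long.MAX_VALUE and 128 MB blocks, a 300 MB file yields 128 + 128 + 44 MB, while a 260 MB file yields only two splits (128 MB and 132 MB), because 132/128 is about 1.03 and does not exceed SPLIT_SLOP. To get smaller splits, lower mapreduce.input.fileinputformat.split.maxsize below the block size (FileInputFormat.setMaxInputSplitSize); to get larger ones, raise mapreduce.input.fileinputformat.split.minsize above it (FileInputFormat.setMinInputSplitSize).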