This is a walkthrough of the core MapReduce source code, read in three parts:
    1. The client (Client) part
    2. The MapTask part
    3. The ReduceTask part

1. The Client part

    The client plays an important role in MapReduce: it is mainly responsible for job initialization and for computing the input splits.

    When we run a MapReduce job, the command is something like: hadoop jar wc.jar WordCount input output

    So the entry point of the MapReduce program is the WordCount driver class we wrote to configure the Job.

    In that class all we do is set the Job parameters and then call submit,

    so on the client side we can start reading the source from Job.waitForCompletion(). A minimal driver sketch is shown below.
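
For reference, a minimal WordCount-style driver might look like the following. This is only a sketch: TokenizerMapper and IntSumReducer stand for the user-written Mapper and Reducer (both are sketched later in this walkthrough), they are not part of the Hadoop code being read here.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

  public class WordCount {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      Job job = Job.getInstance(conf, "word count");
      job.setJarByClass(WordCount.class);
      job.setMapperClass(TokenizerMapper.class);   // user-written Mapper (sketched later)
      job.setReducerClass(IntSumReducer.class);    // user-written Reducer (sketched later)
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);
      FileInputFormat.addInputPath(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));
      // Everything the rest of this article walks through starts from this call.
      System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
  }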

Job class path: org.apache.hadoop.mapreduce.Job, note that it is under the mapreduce package

 /**
   * Submit the job to the cluster and wait for it to finish.
   * @param verbose print the progress to the user
   * @return true if the job succeeded
   * @throws IOException thrown if the communication with the 
   *         <code>JobTracker</code> is lost
   */
  public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit(); //提交任务,进入
    }
    if (verbose) { //是否打印日志
      monitorAndPrintJob();
	// 省略
  }

Then step into the submit() method:

  /**
   * Submit the job to the cluster and return immediately.
   * @throws IOException
   */
  public void submit() 
         throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI(); //是否使用新api
    connect();  //连接yarn,获取连接对象cluster
    final JobSubmitter submitter = 
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    //ugi是UserGroupInformation类的实例,表示Hadoop中的用户和组信息
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException, 
      ClassNotFoundException {
        return submitter.submitJobInternal(Job.this, cluster);  //提交job, 进入
      }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
   }

Step into submitter.submitJobInternal(Job.this, cluster):

  /**
   * Internal method for submitting jobs to the system.
   * 
   * <p>The job submission process involves:
   * <ol>
   *   <li>
   *   Checking the input and output specifications of the job. //检查输入输出路径
   *   </li>
   *   <li>
   *   Computing the {@link InputSplit}s for the job. // 计算split分片
   *   </li>
   *   <li>
   *   Setup the requisite accounting information for the 
   *   {@link DistributedCache} of the job, if necessary.
   *   </li>
   *   <li>
   *   Copying the job's jar and configuration to the map-reduce system //复制job jar文件和配置到集群
   *   directory on the distributed file-system. 
   *   </li>
   *   <li>
   *   Submitting the job to the <code>JobTracker</code> and optionally
   *   monitoring it's status.
   *   </li>
   * </ol></p>
   */
  JobStatus submitJobInternal(Job job, Cluster cluster) 
  throws ClassNotFoundException, InterruptedException, IOException {

    //validate the jobs output specs 
    checkSpecs(job); //检查输入输出路径,输出路径要为空

    Configuration conf = job.getConfiguration();
    addMRFrameworkToDistributedCache(conf); // 将MapReduce框架加入分布式缓存中

    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf); //初始化job的工作根目录
    //configure the command line options correctly on the submitting dfs
	
	// 省略
	
    JobID jobId = submitClient.getNewJobID(); // submitClient的类型是ClientProtocol接口,用来进行JobClient和JobTracker之间的RPC通信
    job.setJobID(jobId);
	
	// 省略

      copyAndConfigureFiles(job, submitJobDir); //复制配置文件

      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
      
      // Create the splits for the job
      LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
	  
      int maps = writeSplits(job, submitJobDir);  //获取mapTask数量==split数量,进入
	  
      conf.setInt(MRJobConfig.NUM_MAPS, maps); // 设置map数量
      LOG.info("number of splits:" + maps);

    // 省略

      // Write job file to submit dir
      writeConf(conf, submitJobFile); //写入配置文件到提交目录
      
      //
      // Now, actually submit the job (using the submit name)
      //
      printTokens(jobId, job.getCredentials());
      /**
       * 最终submitClient提交作业
       * 接下来查看 org.apache.hadoop.mapred.MapTask.run() ,注意是mapred不是mapreduce
       */
      status = submitClient.submitJob(
          jobId, submitJobDir.toString(), job.getCredentials());
		  
    // 省略
  }

 We can see that here submitJobInternal():

checks the input and output specs (the output path must not already exist);
adds the MapReduce framework to the distributed cache;
initializes the job's staging (working) directory;
computes the number of MapTasks, which is the number of splits;
writes the job configuration to the submit directory.

Step into int maps = writeSplits(job, submitJobDir) to see how the splits are computed and how the number of MapTasks is derived.

writeSplits() calls writeNewSplits(); step in:

  private <T extends InputSplit>
  int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();

    /**
     * 根据配置反射得到InputFormat的实现类,
     * 查看job.getInputFormatClass(),位置在JobContextImpl中,ctrl+alt+左键,进入
     */
    InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

    List<InputSplit> splits = input.getSplits(job); // 得到分片数组,进入(实现类为TextInputFormat父类FileInputFormat)
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

    // sort the splits into order based on size, so that the biggest go first
    // 按大小排序split数组,因为这些split不单单的一个文件,而是输入目录中所有文件的split
    Arrays.sort(array, new SplitComparator());
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf, 
        jobSubmitDir.getFileSystem(conf), array);
    return array.length;  //返回mapTask数量
  }

First, the InputFormat class is resolved here. Step into job.getInputFormatClass() to see the configuration; the implementation class of job here is JobContextImpl.

  public Class<? extends InputFormat<?,?>> getInputFormatClass() 
     throws ClassNotFoundException {
    /**
     * 如果conf中没有配置,默认为TextInputFormat
     * job.setInputFormatClass();
     * TextInputFormat继承自FileInputFormat
      */
    return (Class<? extends InputFormat<?,?>>) 
      conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class); // mapreduce.job.inputformat.class
  }

So the default InputFormat is TextInputFormat, which extends FileInputFormat, and the user can override it with job.setInputFormatClass().

Back in writeNewSplits(), step into input.getSplits(job) to look at the split computation; getSplits() is implemented in FileInputFormat:

  /** 
   * Generate the list of files and make them into FileSplits.
   * @param job the job context
   * @throws IOException
   */
  public List<InputSplit> getSplits(JobContext job) throws IOException {
    Stopwatch sw = new Stopwatch().start();
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); // 最小值配置SPLIT_MINSIZE,默认值为1
    long maxSize = getMaxSplitSize(job); // 最大值配置SPLIT_MAXSIZE,默认值Long.MAX_VALUE

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>(); //存放获取的split对象
    List<FileStatus> files = listStatus(job); //获取每个文件的信息
    for (FileStatus file: files) {  //遍历文件
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) { //如果是本地文件系统
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length); // 获取文件的block列表
        }
        if (isSplitable(job, path)) {
          long blockSize = file.getBlockSize(); //block size
          /**
           * split分片的大小
           * Math.max(minSize, Math.min(maxSize, blockSize)
           * 如果要比blockSize大,设置minSize比blockSize大
           * 如果要比blockSize小,设置MaxSize比blockSize小
           * 默认情况split大小就是block大小
           * FileInputFotmat.setMaxInputSplitSize(job, size)
           * conf.set("mapred.min.split.size", size);
           */
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);

          long bytesRemaining = length;
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { // 只要剩余字节数超过切片大小的110%(SPLIT_SLOP=1.1),就继续切出一个split
            // 获取一个split的最优的本地化位置,对应的block,规则是splitOffset在这个block上就行
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            // 构建split对象 (filePath, spiltOffset, splitSize, blockHosts) 关键是blockHosts, 向该host发送MapTask任务
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts(),
                       blkLocations[blkIndex].getCachedHosts()));
          }
        } else { // not splitable
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                      blkLocations[0].getCachedHosts()));
        }
      } else { 
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
	// 省略    
	return splits;
  }

  /**
   * Get the lower bound on split size imposed by the format.
   * @return the number of bytes of the minimal split for this format
   */
  protected long getFormatMinSplitSize() {
    return 1; // 最小Split为1
  }
  
  /**
   * Get the minimum split size
   * @param job the job
   * @return the minimum number of bytes that can be in a split
   */
  public static long getMinSplitSize(JobContext job) {
    // mapreduce.input.fileinputformat.split.minsize
    return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L); //返回用设置的SPLIT_MINSIZE,默认1
  }
  
  /**
   * Get the maximum split size.
   * @param context the job to look at.
   * @return the maximum number of bytes a split can include
   */
  public static long getMaxSplitSize(JobContext context) {
    // mapreduce.input.fileinputformat.split.maxsize
    return context.getConfiguration().getLong(SPLIT_MAXSIZE, 
                                              Long.MAX_VALUE); // 返回用设置的SPLIT_MAXSIZE,默认去Long.MAX_VALUE
  }
  
  protected long computeSplitSize(long blockSize, long minSize,
                                  long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

  // 计算在那个block块上
  protected int getBlockIndex(BlockLocation[] blkLocations, 
                              long offset) {
    for (int i = 0 ; i < blkLocations.length; i++) {
      // is the offset inside this block? splitOffset在这个block上就行
      if ((blkLocations[i].getOffset() <= offset) &&
          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
        return i;
      }
    }
    BlockLocation last = blkLocations[blkLocations.length -1];
    long fileLength = last.getOffset() + last.getLength() -1;
    // 省略
  }

From this we can see:

    The split size formula is: Math.max(minSize, Math.min(maxSize, blockSize))

    minSize is set by the user (mapreduce.input.fileinputformat.split.minsize or FileInputFormat.setMinInputSplitSize(job, size)); default 1;

    maxSize is set by the user (mapreduce.input.fileinputformat.split.maxsize or FileInputFormat.setMaxInputSplitSize(job, size)); default Long.MAX_VALUE;

    blockSize is set when the file is uploaded (dfs.blocksize); default 128 MB, so by default the split size equals the block size;

    a split is mapped to the block that contains its start offset (getBlockIndex only checks which block the split offset falls into);

    a split object consists of (filePath, splitOffset, splitSize, blockHosts); the key field is blockHosts, because the MapTask will be sent to those hosts. A configuration sketch follows.
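
As an illustration of these knobs (a sketch, not code from the Hadoop tree; the sizes are arbitrary), a driver can shrink or enlarge splits like this:

  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

  public class SplitSizeConfig {
    // splitSize = max(minSize, min(maxSize, blockSize))

    // Force splits of at most 64 MB: with a 128 MB block, each block yields two MapTasks.
    static void useSmallerSplits(Job job) {
      FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);  // maxSize = 64 MB
    }

    // Force splits of at least 256 MB: with 128 MB blocks, two blocks go to one MapTask.
    static void useLargerSplits(Job job) {
      FileInputFormat.setMinInputSplitSize(job, 256L * 1024 * 1024); // minSize = 256 MB
    }
  }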

Back in the writeNewSplits() method:

    List<InputSplit> splits = input.getSplits(job); // 得到分片数组,进入(实现类为TextInputFormat父类FileInputFormat)
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

    // sort the splits into order based on size, so that the biggest go first
    // 按大小排序split数组,因为这些split不单单是一个文件的,而是输入目录中所有文件的split
    Arrays.sort(array, new SplitComparator());
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf, 
        jobSubmitDir.getFileSystem(conf), array);
    return array.length;  //返回mapTask数量

So the number of MapTasks equals the number of splits.

Back in the submitJobInternal() method,

/**
       * 最终submitClient提交作业
       * 接下来查看 org.apache.hadoop.mapred.MapTask.run() ,注意是mapred不是mapreduce
       */
      status = submitClient.submitJob(
          jobId, submitJobDir.toString(), job.getCredentials());

At this point initialization is complete and the configuration has been copied to the job directory; status = submitClient.submitJob() is what actually submits the job.

That covers the main client-side source.


2. The MapTask part
    The framework dispatches MapTasks to the nodes, and each MapTask processes the split assigned to it;
    MapTasks are placed according to the split's blockHosts to achieve data locality, i.e. moving computation to the data.

MapTask class path: org.apache.hadoop.mapred.MapTask, note that it is under the mapred package, not mapreduce.

A MapTask runs as its own process; start with the run() method:

  @Override
  public void run(final JobConf job, final TaskUmbilicalProtocol umbilical) //入口
    throws IOException, ClassNotFoundException, InterruptedException {
    this.umbilical = umbilical;

    if (isMapTask()) {
      // If there are no reducers then there won't be any sort. Hence the map 
      // phase will govern the entire attempt's progress.
      if (conf.getNumReduceTasks() == 0) {
        mapPhase = getProgress().addPhase("map", 1.0f);
      } else {
        // If there are reducers then the entire attempt's progress will be 
        // split between the map phase (67%) and the sort phase (33%).
        mapPhase = getProgress().addPhase("map", 0.667f); //设置进度权重
        sortPhase  = getProgress().addPhase("sort", 0.333f);
      }
    }
	// 省略
    if (useNewApi) {
      runNewMapper(job, splitMetaInfo, umbilical, reporter);  // runNewMapper
    } else {
      runOldMapper(job, splitMetaInfo, umbilical, reporter);
    }
    // 省略
  }

Look at the runNewMapper() method:

private <INKEY,INVALUE,OUTKEY,OUTVALUE>
  void runNewMapper(final JobConf job,
                    final TaskSplitIndex splitIndex,
                    final TaskUmbilicalProtocol umbilical,
                    TaskReporter reporter
                    ) throws IOException, ClassNotFoundException,
                             InterruptedException {
    // make a task context so we can get the classes
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =  //创建taskContext,实现类为JobContextImpl
      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, 
                                                                  getTaskID(),
                                                                  reporter);
    // make a mapper
    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
      (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
        ReflectionUtils.newInstance(taskContext.getMapperClass(), job); // 通过反射得到mapper的实现类(用户设置的那个mapperClass,默认为Mapper.class)
    // make the input format
    org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
      (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
        ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);  //得到InputFormat, 默认为TextInputFormat
    // rebuild the input split
    org.apache.hadoop.mapreduce.InputSplit split = null;
    split = getSplitDetails(new Path(splitIndex.getSplitLocation()), //得到分配的split
        splitIndex.getStartOffset());
    LOG.info("Processing split: " + split);

    org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
      new NewTrackingRecordReader<INKEY,INVALUE>                 // 创建RecordReader,NewTrackingRecordReader为内部类
        (split, inputFormat, reporter, taskContext); // 里面默认调用TextInputFormat.createRecordReader(split, taskContext)创建LineRecordReader
    
    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
    org.apache.hadoop.mapreduce.RecordWriter output = null; // RecordWriter - context.write();
    
    // get an output object 新建一个OutputCollector封装类
    if (job.getNumReduceTasks() == 0) { // reduce的数量,job.setReduceTasks(num)
      output = 
        new NewDirectOutputCollector(taskContext, job, umbilical, reporter); //
    } else {
      // NewOutputCollector为内部类, 等看到代码context.write()时再来查看NewOutputCollector的构造方法
      output = new NewOutputCollector(taskContext, job, umbilical, reporter);
    }

    org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> 
    mapContext = 
      new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), 
          input, output, //传入了RecordReader(LineRecordReader),RecordWriter(MapOutputCollector)
          committer, 
          reporter, split); // 创建MapContext,实现类MapContextImpl,MapContextImpl是TaskInputOutputContextImpl的实现类

    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
        mapperContext = 
          new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
              mapContext); //mapperContext调用的MapContextImpl的方法,// 用户mapper里调用context.write(k, v),执行的是WrappedMapper里的方法

    try {
      input.initialize(split, mapperContext); //调用LineRecordReader的initialize()初始化input,进入,里面细节多
      mapper.run(mapperContext); // 调用mapper的run方法,传入MapContext,进入
      mapPhase.complete(); // map进度完成
      setPhase(TaskStatus.Phase.SORT);  //开始排序sort阶段
      statusUpdate(umbilical);
      input.close(); //关闭RecordReader
      input = null;
      output.close(mapperContext); //执行flush,溢写buffer中剩下的,并执行merge,进入
      output = null;
    } finally {
      closeQuietly(input);
      closeQuietly(output, mapperContext);
    }
  }

    The reflection lookup of the mapper: getMapperClass()

  public Class<? extends Mapper<?,?,?,?>> getMapperClass() 
     throws ClassNotFoundException {
    return (Class<? extends Mapper<?,?,?,?>>) 
      conf.getClass(MAP_CLASS_ATTR, Mapper.class);  //用户设置mapperClass,默认为Mapper.class
  }

    The code that creates NewTrackingRecordReader; the actual reader it wraps is a LineRecordReader:

  // 构造方法
  NewTrackingRecordReader(org.apache.hadoop.mapreduce.InputSplit split,
    org.apache.hadoop.mapreduce.InputFormat<K, V> inputFormat,
    TaskReporter reporter,
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
    throws InterruptedException, IOException {
    // 省略
    this.real = inputFormat.createRecordReader(split, taskContext); // 默认为调用TextInputFormat.createRecordReader(split, taskContext)创建LineRecordReader
    // 省略
  }
  // TextInputFormat类
  @Override
  public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
    // 省略
    return new LineRecordReader(recordDelimiterBytes); // LineRecordReader
  }

Step into input.initialize(split, mapperContext), i.e. the initialization of LineRecordReader:

  public void initialize(InputSplit genericSplit,
                         TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();
    this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);  // 一行的最大字节长度(不是split行数)
    start = split.getStart(); // split 起始位置
    end = start + split.getLength();  // split 结束位置
    final Path file = split.getPath();

    // open the file and seek to the start of the split
    final FileSystem fs = file.getFileSystem(job);
    fileIn = fs.open(file); // 打开了这个文件,返回文件流
    
    CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
    if (null!=codec) {  // 如果指定了压缩
      // 省略
    } else {  // 如果没有指定压缩
	
      fileIn.seek(start); // seek到split起始offset位置开始读取,因为mapTask已经优化成本地化执行了,所以相当于从本地读取
	  
      in = new UncompressedSplitLineReader( // SplitLineReader 切割行读取
          fileIn, job, this.recordDelimiterBytes, split.getLength());
      filePosition = fileIn;
    }
    // If this is not the first split, we always throw away first record
    // because we always (except the last split) read one extra line in
    // next() method.
    if (start != 0) {
      // 如果不是第一个split切片,在初始化时就舍弃掉第一行
      // 原因:split按字节切分,一条记录(一行)可能被切到两个split/block中(特别是多字节编码),
      // 前一个split会多读一行把这条记录补全,所以除第一个split外都跳过第一行
      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
    }
    this.pos = start; // split应该开始读的偏移量
  }

 Two key points here:

1. The stream's seek() is called with the split's offset, which is what lets each MapTask read only its own part of the data;

2. Every split except the first discards its first line, because a line may be cut in half across block/split boundaries (the previous split reads one extra line to compensate). A sketch of this rule follows.
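
To make the boundary rule concrete, here is a small self-contained simulation of it (my own illustration, not Hadoop code): a reader for the range [start, end) skips its first line unless start is 0, and reads one extra line past end.

  import java.nio.charset.StandardCharsets;
  import java.util.ArrayList;
  import java.util.List;

  public class SplitBoundaryDemo {
    static List<String> readSplit(byte[] data, int start, int end) {
      List<String> records = new ArrayList<>();
      int pos = start;
      if (start != 0) {                         // not the first split: discard the partial first line
        while (pos < data.length && data[pos] != '\n') pos++;
        pos++;                                  // move past the '\n'
      }
      while (pos <= end && pos < data.length) { // a line that starts at or before 'end' is read fully
        int lineStart = pos;
        while (pos < data.length && data[pos] != '\n') pos++;
        records.add(new String(data, lineStart, pos - lineStart, StandardCharsets.UTF_8));
        pos++;                                  // skip the '\n'
      }
      return records;
    }

    public static void main(String[] args) {
      byte[] file = "hello world\nfoo bar\nlorem ipsum\n".getBytes(StandardCharsets.UTF_8);
      int mid = file.length / 2;                // an arbitrary split point that cuts a line in half
      // Each record comes out of exactly one "split":
      System.out.println(readSplit(file, 0, mid));           // [hello world, foo bar]
      System.out.println(readSplit(file, mid, file.length)); // [lorem ipsum]
    }
  }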

Back in MapTask.runNewMapper(), step into mapper.run(mapperContext); this is where the user-written Mapper runs:

Mapper class path: org.apache.hadoop.mapreduce.Mapper

  /**
   * Expert users can override this method for more complete control over the
   * execution of the Mapper.
   * @param context
   * @throws IOException
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context); // 预先执行
    try { //context中的mapContext的实现类为mapContextImpl
      // mapContextImpl.nextKeyValue() 调用的 RecordReader.nextKeyValue(),RecordReader默认为LineRecordReader
      while (context.nextKeyValue()) { // 是否有下一行,同时更新currentKey, currentValue,进入
        map(context.getCurrentKey(), context.getCurrentValue(), context);
        // 在Mapper实现类里面我们会调用context.write(key, value), 回去MapTask查看new NewOutputCollector
      }
    } finally {
      cleanup(context); //最后执行
    }
  }

 From this we can see:

    this method uses nextKeyValue() (implemented by LineRecordReader) to check whether there is another line,

     and if there is, it passes currentKey and currentValue into the user Mapper's map() method. A typical user Mapper is sketched below.
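
For context, a typical user Mapper (the classic WordCount tokenizer, shown as an assumed example rather than code quoted from this article) looks like this; note that the input key is the byte offset produced by LineRecordReader:

  import java.io.IOException;
  import java.util.StringTokenizer;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Mapper;

  public class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    public void map(LongWritable offset, Text line, Context context)
        throws IOException, InterruptedException {
      // 'offset' is the position of this line in the file, 'line' is the line's text.
      StringTokenizer it = new StringTokenizer(line.toString());
      while (it.hasMoreTokens()) {
        word.set(it.nextToken());
        context.write(word, ONE);   // this call ends up in MapOutputBuffer.collect()
      }
    }
  }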

Step into nextKeyValue():

    context.nextKeyValue() is implemented by MapContextImpl.nextKeyValue();

    MapContextImpl delegates to RecordReader.nextKeyValue();

    the RecordReader is LineRecordReader by default.

  /**
   * 是否有下一行,同时更新currentKey, currentValue
   */
  public boolean nextKeyValue() throws IOException {
    if (key == null) {
      key = new LongWritable(); // 默认map输入key为long,记录offset
    }
    key.set(pos); // 修改key
    if (value == null) {
      value = new Text();
    }
    int newSize = 0; //记录读取的字节数
    // We always read one extra line, which lies outside the upper
    // split limit i.e. (end - 1)
    while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
      if (pos == 0) {
        newSize = skipUtfByteOrderMark(); // 文件开始跳过编码
      } else {
        newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos)); // 读取一行修改value
        pos += newSize;
      }

      if ((newSize == 0) || (newSize < maxLineLength)) {
        break;
      }

      // line too long. try again
      LOG.info("Skipped line of size " + newSize + " at pos " + 
               (pos - newSize));
    }
    if (newSize == 0) {
      key = null;
      value = null;
      return false; //没有行了,返回false
    } else {
      return true;
    }
  }
  
  private LongWritable key;
  private Text value;

  @Override
  public LongWritable getCurrentKey() {
    return key;
  }

  @Override
  public Text getCurrentValue() {
    return value;
  }

 From this we can see:

nextKeyValue() checks whether there is another line and at the same time updates currentKey and currentValue;

with the default TextInputFormat the key passed to the mapper is a LongWritable whose value is the line's byte offset;

once the data reaches map(), our Mapper implementation calls context.write(key, value) to emit output,

so now go back to the MapTask runNewMapper() method and look at new NewOutputCollector() to see how output is implemented:

  NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                   JobConf job,
                   TaskUmbilicalProtocol umbilical,
                   TaskReporter reporter
                   ) throws IOException, ClassNotFoundException {
    collector = createSortingCollector(job, reporter); //创造一个排序的collector输出池(MapOutputBuffer),进入
    partitions = jobContext.getNumReduceTasks(); // 得到partition的数量,与reduceTask数量配置相同(mapreduce.job.reduces),默认为1
    if (partitions > 1) {
      // 当partition数量大于1,就用户自己配置了时,反射获取用户配置的分区器,默认为HashPartitioner
      // job.setPartitionerClass();
      partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
        ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
    } else {
      // partition数量为1时,创建返回分区号为0的分区器
      partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
        @Override
        public int getPartition(K key, V value, int numPartitions) {
          return partitions - 1;
        }
      };
    }
  }

  // JobContextImpl
  public Class<? extends Partitioner<?,?>> getPartitionerClass() 
     throws ClassNotFoundException {
    // job.setPartitionerClass();
    // 反射获取用户配置的分区器,默认为HashPartitioner
    return (Class<? extends Partitioner<?,?>>)
      conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
  }

 When NewOutputCollector is constructed it:

creates a sorting output collector (MapOutputBuffer);

sets up the partitioner; the number of partitions equals the number of ReduceTasks, configured via mapreduce.job.reduces or job.setNumReduceTasks(), default 1;

the partitioner class is obtained by reflection from mapreduce.job.partitioner.class or job.setPartitionerClass(),

and defaults to HashPartitioner when not configured;

when there is only one ReduceTask, every record gets partition number 0. A custom partitioner sketch follows.
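
For reference, the default HashPartitioner simply returns (key.hashCode() & Integer.MAX_VALUE) % numPartitions. A custom partitioner (an illustrative sketch, assuming Text keys and two reducers) is plugged in like this:

  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Partitioner;

  // Routes keys starting with 'a'..'m' to partition 0 and the rest to partition 1.
  public class AlphabetPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
      if (numPartitions == 1) {
        return 0;                        // mirrors the single-reducer shortcut above
      }
      String s = key.toString();
      char first = s.isEmpty() ? 'z' : Character.toLowerCase(s.charAt(0));
      return (first <= 'm' ? 0 : 1) % numPartitions;
    }
  }

  // In the driver (sketch):
  //   job.setNumReduceTasks(2);
  //   job.setPartitionerClass(AlphabetPartitioner.class);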

Step into createSortingCollector() to look at the output buffer:

  private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
          createSortingCollector(JobConf job, TaskReporter reporter)
    throws IOException, ClassNotFoundException {
    MapOutputCollector.Context context =
      new MapOutputCollector.Context(this, job, reporter);

    // 用户配置(mapreduce.job.map.output.collector.class)或默认取MapOutputBuffer内存环
    Class<?>[] collectorClasses = job.getClasses(
      JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class);

    int remainingCollectors = collectorClasses.length;
    for (Class clazz : collectorClasses) {
      try {
        if (!MapOutputCollector.class.isAssignableFrom(clazz)) {
          throw new IOException("Invalid output collector class: " + clazz.getName() +
            " (does not implement MapOutputCollector)");
        }
        Class<? extends MapOutputCollector> subclazz =
          clazz.asSubclass(MapOutputCollector.class);
        LOG.debug("Trying map output collector class: " + subclazz.getName());
        MapOutputCollector<KEY, VALUE> collector =
          ReflectionUtils.newInstance(subclazz, job); // 最终在此反射
        /**
         * 设置溢写阈值(0.8),内存环大小(100M)
         * 设置排序算法,默认快排
         * 设置排序比较器(用户配置 || key比较器)
         * 设置combiner (用户配置 || null)
         * 设置merge时执行combine的最小文件数
         * 启动守护溢写进程
         */
        collector.init(context); // 内存环初始化,进入
    // 省略
  }

So the output collector implementation defaults to MapTask.MapOutputBuffer and can be changed via mapreduce.job.map.output.collector.class.

Step into collector.init(context) to look at the initialization of MapOutputBuffer:

  /**
   * 设置溢写阈值(0.8),内存环大小(100M)
   * 设置排序算法,默认快排
   * 设置排序比较器(用户配置 || key比较器)
   * 设置combiner (用户配置 || null)
   * 设置merge时执行combine的最小文件数
   * 启动守护溢写进程
   */
  @SuppressWarnings("unchecked")
  public void init(MapOutputCollector.Context context
                  ) throws IOException, ClassNotFoundException {
    job = context.getJobConf();
    reporter = context.getReporter();
    mapTask = context.getMapTask();
    mapOutputFile = mapTask.getMapOutputFile();
    sortPhase = mapTask.getSortPhase();
    spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);
    partitions = job.getNumReduceTasks();
    rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();

    //sanity checks
    // 溢写内存比例阈值,配置mapreduce.map.sort.spill.percent或默认0.8
    final float spillper =
      job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
    // 缓存环大小默认值,配置mapreduce.task.io.sort.mb,默认100M
    final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
    indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
                                       INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
    if (spillper > (float)1.0 || spillper <= (float)0.0) {
      throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
          "\": " + spillper);
    }
    if ((sortmb & 0x7FF) != sortmb) {
      throw new IOException(
          "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
    }
    // 设置排序算法,配置map.sort.class,默认快排
    sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
          QuickSort.class, IndexedSorter.class), job);
    // buffers and accounting
    int maxMemUsage = sortmb << 20;
    maxMemUsage -= maxMemUsage % METASIZE;// 可用空间调整为METASIZE既16的整数倍
    kvbuffer = new byte[maxMemUsage]; // 定义缓存数组,buff环的意思不是环队列,而是首尾同时使用的数组
    bufvoid = kvbuffer.length; //bufvoid 用于标识用于存放数据的最大位置。
    kvmeta = ByteBuffer.wrap(kvbuffer) // 将kvbuffer包装成int类型的数组
       .order(ByteOrder.nativeOrder())
       .asIntBuffer();
    // 设置 buf 和 kvmeta 的分界线,赤道
    // 以赤道为中心,序列化数据和meta数据分别向两边写,
    // 在溢写时,在buffindex和kvindex中重新画赤道,然后分别往两边写数据,这样可以在溢写时同时能就收受map输出
    setEquator(0); // 还初始化了kvindex, 其位置在环形数组中相当于按照逆时针方向减去METASIZE
    bufstart = bufend = bufindex = equator; // buff数组正向存序列化值的key value值
    kvstart = kvend = kvindex; // buff数组反向存 [val序列化值起始index,key序列化值起始index,partitionId的值,val序列化值的长度]
    /**
     * k1_ser_value占50字节
     * v1_ser_value占50字节
     * [k1_ser_value v1_ser_value, ..... ,  0, 50, 0, 50]
     * 数组左边存KeyValue序列化值,右边存[val序列化值起始index, key序列化值起始index, partitionId的值, val序列化值的长度],每个值用int存,长4个字节
     */

    // kvmeta 中存放元数据实体的最大个数
    maxRec = kvmeta.capacity() / NMETA;
    softLimit = (int)(kvbuffer.length * spillper);
    bufferRemaining = softLimit; // 可用空间设置为溢写阈值,默认0.8
    LOG.info(JobContext.IO_SORT_MB + ": " + sortmb);
    LOG.info("soft limit at " + softLimit);
    LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
    LOG.info("kvstart = " + kvstart + "; length = " + maxRec);

    // k/v serialization
    /**
     *  排序比较器
     *  默认取用户设置的排序比较器,job.setComparatorClass()
     *  没有配置,返回map输出中key对象的比较器
     */
    comparator = job.getOutputKeyComparator();
    keyClass = (Class<K>)job.getMapOutputKeyClass();
    valClass = (Class<V>)job.getMapOutputValueClass();
    serializationFactory = new SerializationFactory(job); // 序列化
    keySerializer = serializationFactory.getSerializer(keyClass);
    keySerializer.open(bb);
    valSerializer = serializationFactory.getSerializer(valClass);
    valSerializer.open(bb);

    // 省略

    // combiner map端合并
    final Counters.Counter combineInputCounter =
      reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
    // JobContextImpl.getCombinerClass()
    // 获取combiner类,并设置combiner分组比较器job.setCombinerKeyGroupingComparator()
    // 配置mapreduce.job.combine.class 或 job.setCombinerClass()
    // 默认没有,没有配置就不执行combiner
    combinerRunner = CombinerRunner.create(job, getTaskID(), 
                                           combineInputCounter,
                                           reporter, null);
    if (combinerRunner != null) {
      final Counters.Counter combineOutputCounter =
        reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
      combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job);
    } else {
      combineCollector = null;
    }
    spillInProgress = false;
    /**
     * 每次溢写就会执行一次combiner,最后map输出完了,merge的时候触发combine的最小文件个数
     * 配置mapreduce.map.combine.minspills,默认为3
     * 也就是merge时候多于或等于三个小文件,就会执行combine
      */
    minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
    spillThread.setDaemon(true); // SpillThread溢写守护进程
    spillThread.setName("SpillThread");
    spillLock.lock();
    try {
      spillThread.start(); // 启动溢写线程
    // 省略
  }

  // MapOutputBuffer的变量
  // k/v accounting
  // 存放 meta 数据的 IntBuffer,都是int类型,(包装自kvbuffer, 本质是同一个数组)
  private IntBuffer kvmeta; // metadata overlay on backing store
  int kvstart;            // marks origin of spill metadata 标记meta区域的边界起始
  int kvend;              // marks end of spill metadata 标记meta区域的边界结束
  int kvindex;            // marks end of fully serialized records 在kvbuffer中kvmeta的左边界索引

  // 分割 meta 和 key value 内容的标识
  // meta 数据和 key value 内容都存放在同一个环形缓冲区,所以需要分隔开
  int equator;            // marks origin of meta/serialization

  int bufstart;           // marks beginning of spill 标记序列化区域的边界起始
  int bufend;             // marks beginning of collectable 标记序列化区域的边界结束
  int bufmark;            // marks end of record
  int bufindex;           // marks end of collected 在kvbuffer中序列化数据的右边界索引
  int bufvoid;            // marks the point where we should stop reading at the end of the buffer , bufvoid 用于标识用于存放数据的最大位置。

  // 存放 key value 的 byte 数组,单位是 byte,注意与 kvmeta 区分
  byte[] kvbuffer;        // main output buffer
  private final byte[] b0 = new byte[0];

  // buff数组反向存 [val序列化值起始index, key序列化值起始index, partitionId的值, val序列化值的长度] kvindex代表左边界的值
  // key value索引 存放位置在kvbuffer中存放kvmata区域偏移自kvindex的距离,为int类型,转为byte,占4个byte字节
  private static final int VALSTART = 0;         // val offset in acct
  private static final int KEYSTART = 1;         // key offset in acct
  // PARTITION分区号(直接存值)存放位置在kvbuffer中存放kvmata区域偏移自kvindex的距离,为int类型,转为byte,占4个byte字节
  private static final int PARTITION = 2;        // partition offset in acct
  // value的序列化数据长度值 存放位置在kvbuffer中存放kvmata区域偏移自kvindex的距离,为int类型,转为byte,占4个byte字节
  private static final int VALLEN = 3;           // length of value
  // 一对输出的meta数据在kvmeta中占用的int个数
  private static final int NMETA = 4;            // num meta ints
  // 一对输出的meta数据在kvmeta中占用的byte个数
  private static final int METASIZE = NMETA * 4; // size in bytes

  public RawComparator getOutputKeyComparator() {
    // 默认取用户设置的排序比较器 mapreduce.job.output.key.comparator.class
      // job.setComparatorClass()
    Class<? extends RawComparator> theClass = getClass(
    JobContext.KEY_COMPARATOR, null, RawComparator.class);
    if (theClass != null)
    return ReflectionUtils.newInstance(theClass, this);
    // 返回map输出中key对象的比较器
    return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
  }

The data structure behind MapOutputBuffer is a byte array:

        logically it is treated as a circular buffer so that spilling does not block the Mapper's output; how the ring actually works is covered below when we get to spilling.

The init() method completes the following setup:

    the spill threshold of the ring buffer: default 0.8, configurable via mapreduce.map.sort.spill.percent;

    the ring buffer size: default 100 MB, configurable via mapreduce.task.io.sort.mb;

    the sort algorithm used when spilling: default QuickSort, configurable via map.sort.class;

    the sort comparator: defaults to the comparator of the map output key class, configurable via mapreduce.job.output.key.comparator.class or job.setSortComparatorClass();

    the combiner: none by default, it runs only if configured via mapreduce.job.combine.class or job.setCombinerClass()
        (the combiner can also be given its own grouping comparator: job.setCombinerKeyGroupingComparator());

    the minimum number of spill files that triggers combine during the final merge: default 3, configurable via mapreduce.map.combine.minspills;

    the daemon spill thread (SpillThread) is started; it spills whenever the configured threshold is reached. A driver-side tuning sketch follows.
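
Pulling these knobs together, a driver could tune the map-side buffer like this (a sketch; the values are arbitrary and the combiner class is a hypothetical user class):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.mapreduce.Job;

  public class MapBufferTuning {
    static Job newTunedJob() throws Exception {
      Configuration conf = new Configuration();
      conf.setInt("mapreduce.task.io.sort.mb", 200);           // ring buffer: 200 MB instead of 100 MB
      conf.setFloat("mapreduce.map.sort.spill.percent", 0.9f); // spill at 90% instead of 80%
      conf.setInt("mapreduce.map.combine.minspills", 3);       // run combine in the merge if >= 3 spills
      Job job = Job.getInstance(conf, "tuned job");
      // job.setCombinerClass(MyIntSumReducer.class);          // optional combiner (hypothetical class)
      return job;
    }
  }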

 With initialization covered, here is what happens when the user calls context.write():

 See org.apache.hadoop.mapreduce.lib.map.WrappedMapper:

  @Override
  public void write(KEYOUT key, VALUEOUT value) throws IOException,
      InterruptedException {
    mapContext.write(key, value); // 用户mapper里调用context.write(k, v),执行的是此处方法,MapContextImpl是TaskInputOutputContextImpl的实现类
  }

    TaskInputOutputContextImpl.write()

  /**
   * Generate an output key/value pair.
   */
  public void write(KEYOUT key, VALUEOUT value
                    ) throws IOException, InterruptedException {
    /**
     * 调用的是RecordWriter,而RecordWriter的实现对象为NewOutputCollector
     * NewOutputCollector里面是实例化了一个MapOutBuffer
     * 实际中调用的是MapOutBuffer.collect()
     */
    output.write(key, value);
  }

    MapTask.NewOutputCollector.write() :

  @Override
  public void write(K key, V value) throws IOException, InterruptedException {
     /**
     * 调用MapOutBuffer.collect()
     * 顺便在写入buffer的时候就用分区器计算好了分区号
     * collect(key, value, partitionId)
     */
     collector.collect(key, value,
                       partitioner.getPartition(key, value, partitions));
  }

When MapOutputBuffer.collect() is called, the partition number has already been computed by the partitioner:

    collector.collect(key, value, partitionId):

  /**
   * 往数组中存数据时的算法
   * Serialize the key, value to intermediate storage.
   * When this method returns, kvindex must refer to sufficient unused
   * storage to store one METADATA.
   */
  public synchronized void collect(K key, V value, final int partition
                                   ) throws IOException {
    // 省略
    /**
     * METASIZE = NMETA * 4
     * 每一对输出所占的meta空间,每一个值占4个字节,所以总共占16字节
     * 对于序列化值(k ,v),需要存的meta有四个,分别为 v.start, k.start, pId, v.len
     * 例:
     * k1_ser_value占50字节
     * v1_ser_value占50字节
     * [k1_ser_value v1_ser_value, ..... ,  0, 50, 0, 50]
     * 数组左边存KeyValue序列化值,右边存[val序列化值起始index, key序列化值起始index, partitionId的值, val序列化值的长度],每个值用int存,长4个字节
     */
    bufferRemaining -= METASIZE; // bufferRemaining记录可用空间
    if (bufferRemaining <= 0) {
      // start spill if the thread is not running and the soft limit has been
      // reached
      spillLock.lock();
      try {
        do {
          // 首次spill时,spillInProgress是false
          if (!spillInProgress) {
            final int kvbidx = 4 * kvindex; // byte位置
            final int kvbend = 4 * kvend;
            // serialized, unspilled bytes always lie between kvindex and
            // bufindex, crossing the equator. Note that any void space
            // created by a reset must be included in "used" bytes
            final int bUsed = distanceTo(kvbidx, bufindex);
            final boolean bufsoftlimit = bUsed >= softLimit;
            if ((kvbend + METASIZE) % kvbuffer.length !=
                equator - (equator % METASIZE)) {
              // spill finished, reclaim space
              resetSpill();
              bufferRemaining = Math.min(
                  distanceTo(bufindex, kvbidx) - 2 * METASIZE,
                  softLimit - bUsed) - METASIZE;
              continue;
            } else if (bufsoftlimit && kvindex != kvend) {
              // spill records, if any collected; check latter, as it may
              // be possible for metadata alignment to hit spill pcnt
              startSpill();
              final int avgRec = (int)
                (mapOutputByteCounter.getCounter() /
                mapOutputRecordCounter.getCounter());
              // leave at least half the split buffer for serialization data
              // ensure that kvindex >= bufindex
              final int distkvi = distanceTo(bufindex, kvbidx);
              final int newPos = (bufindex +
                Math.max(2 * METASIZE - 1,
                        Math.min(distkvi / 2,
                                 distkvi / (METASIZE + avgRec) * METASIZE)))
                % kvbuffer.length;
              setEquator(newPos);
              bufmark = bufindex = newPos;
              final int serBound = 4 * kvend;
              // bytes remaining before the lock must be held and limits
              // checked is the minimum of three arcs: the metadata space, the
              // serialization space, and the soft limit
              bufferRemaining = Math.min(
                  // metadata max
                  distanceTo(bufend, newPos),
                  Math.min(
                    // serialization max
                    distanceTo(newPos, serBound),
                    // soft limit
                    softLimit)) - 2 * METASIZE;
            }
          }
        } while (false);
      } finally {
        spillLock.unlock();
      }
    }

    try {
      // serialize key bytes into buffer
      int keystart = bufindex; //序列化后的key起始索引
      keySerializer.serialize(key);
      // key所占空间被bufvoid分隔,则移动key,
      if (bufindex < keystart) {
        // 将其值放在连续的空间中便于sort时key的对比
        // wrapped the key; must make contiguous
        bb.shiftBufferedKey();
        keystart = 0;
      }
      // serialize value bytes into buffer
      final int valstart = bufindex; //序列化后的value起始索引, 调用.serialize(), bufindex会移动的
      valSerializer.serialize(value);
      // It's possible for records to have zero length, i.e. the serializer
      // will perform no writes. To ensure that the boundary conditions are
      // checked and that the kvindex invariant is maintained, perform a
      // zero-length write into the buffer. The logic monitoring this could be
      // moved into collect, but this is cleaner and inexpensive. For now, it
      // is acceptable.
      bb.write(b0, 0, 0); //写入

      // the record must be marked after the preceding write, as the metadata
      // for this record are not yet written
      int valend = bb.markRecord();

      mapOutputRecordCounter.increment(1);
      mapOutputByteCounter.increment(
          distanceTo(keystart, valend, bufvoid));

      // write accounting info
      kvmeta.put(kvindex + PARTITION, partition); //存入partitionId,偏移kvindex2个位置,kvmata是int类型的
      kvmeta.put(kvindex + KEYSTART, keystart); //序列化后的key起始索引
      kvmeta.put(kvindex + VALSTART, valstart);
      kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend)); // 序列化后的value长度
      // advance kvindex
      kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity(); //kvindex往左移动METASIZE(16字节)
    } catch (MapBufferTooSmallException e) {
      LOG.info("Record too large for in-memory buffer: " + e.getMessage());
      spillSingleRecord(key, value, partition);
      mapOutputRecordCounter.increment(1);
      return;
    }
  }

  /**
   * Set the point from which meta and serialization data expand. The meta
   * indices are aligned with the buffer, so metadata never spans the ends of
   * the circular buffer.
   */
  private void setEquator(int pos) {  //设置赤道
    equator = pos;
    // set index prior to first entry, aligned at meta boundary
    final int aligned = pos - (pos % METASIZE);
    // Cast one of the operands to long to avoid integer overflow
    // 其位置在环形数组中相当于按照逆时针方向减去 METASIZE
    kvindex = (int)
      (((long)aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
    LOG.info("(EQUATOR) " + pos + " kvi " + kvindex +
        "(" + (kvindex * 4) + ")");
  }
  
  /**
   * Set position from last mark to end of writable buffer, then rewrite
   * the data between last mark and kvindex.
   * This handles a special case where the key wraps around the buffer.
   * If the key is to be passed to a RawComparator, then it must be
   * contiguous in the buffer. This recopies the data in the buffer back
   * into itself, but starting at the beginning of the buffer. Note that
   * this method should <b>only</b> be called immediately after detecting
   * this condition. To call it at any other time is undefined and would
   * likely result in data loss or corruption.
   * @see #markRecord()
   */
  protected void shiftBufferedKey() throws IOException { // 重新调整key的序列化位置,防止key被切割,这样就不方便对key排序
    // spillLock unnecessary; both kvend and kvindex are current
    int headbytelen = bufvoid - bufmark;
    bufvoid = bufmark;
    final int kvbidx = 4 * kvindex;
    final int kvbend = 4 * kvend;
    final int avail =
      Math.min(distanceTo(0, kvbidx), distanceTo(0, kvbend));
    if (bufindex + headbytelen < avail) {
      System.arraycopy(kvbuffer, 0, kvbuffer, headbytelen, bufindex);
      System.arraycopy(kvbuffer, bufvoid, kvbuffer, 0, headbytelen);
      bufindex += headbytelen;
      bufferRemaining -= kvbuffer.length - bufvoid;
    } else {
      byte[] keytmp = new byte[bufindex];
      System.arraycopy(kvbuffer, 0, keytmp, 0, bufindex);
      bufindex = 0;
      out.write(kvbuffer, bufmark, headbytelen);
      out.write(keytmp);
    }
  }

The basic implementation of MapOutputBuffer:

    The byte buffer is also wrapped as an int array (kvmeta):

    going forward, the array stores the serialized key/value bytes -- kvbuffer

    going backward, it stores each pair's metadata [value start index, key start index, partition id, value length] -- kvmeta

          for one serialized pair (k, v) four meta values are stored: v.start, k.start, pId, v.len

      Example:
        the serialized key k1 takes 50 bytes
        the serialized value v1 takes 50 bytes
        [k1_ser_value v1_ser_value, ..... , 50, 0, 0, 50]
        the left end of the array holds the serialized key/value bytes, the right end holds [value start index, key start index, partition id, value length] (here partition 0); each meta value is stored as an int, 4 bytes long,

        so each output pair's metadata occupies 4 ints, 16 bytes in total.

    The boundary between kvbuffer data and kvmeta data is called the equator, initially set to index 0:

        serialized data and metadata are written outward from the equator in opposite directions;

        when a spill starts, a new equator is drawn in the free region between bufindex and kvindex and writing continues from there, so the buffer can keep accepting map output while the spill is running.

    When a serialized key is split by bufvoid (wraps around the end of the array), the key is moved

        so that it is contiguous, which is required for comparing keys during the sort. A small layout demo follows.
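
As a self-contained illustration of the meta layout (my own demo mirroring the constants above, not Hadoop code), this writes one record's four meta ints into a byte array viewed as an IntBuffer:

  import java.nio.ByteBuffer;
  import java.nio.ByteOrder;
  import java.nio.IntBuffer;
  import java.util.Arrays;

  public class KvMetaDemo {
    // Same constants as MapOutputBuffer: four ints (16 bytes) of metadata per record.
    static final int VALSTART = 0, KEYSTART = 1, PARTITION = 2, VALLEN = 3, NMETA = 4;

    public static void main(String[] args) {
      byte[] kvbuffer = new byte[160];                 // tiny stand-in for the 100 MB buffer
      IntBuffer kvmeta = ByteBuffer.wrap(kvbuffer)
          .order(ByteOrder.nativeOrder())
          .asIntBuffer();                              // the same bytes viewed as ints

      // Pretend a record was serialized at the front of kvbuffer:
      int keystart = 0, valstart = 50, vallen = 50, partition = 0;

      int kvindex = kvmeta.capacity() - NMETA;         // metadata grows from the far end
      kvmeta.put(kvindex + PARTITION, partition);
      kvmeta.put(kvindex + KEYSTART, keystart);
      kvmeta.put(kvindex + VALSTART, valstart);
      kvmeta.put(kvindex + VALLEN, vallen);

      // Prints [50, 0, 0, 50]: [val start, key start, partition id, val length]
      int[] record = new int[NMETA];
      for (int i = 0; i < NMETA; i++) record[i] = kvmeta.get(kvindex + i);
      System.out.println(Arrays.toString(record));
    }
  }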

Now, as the map keeps emitting output, once the SpillThread detects that MapOutputBuffer has reached the spill threshold, a spill is started.

Class: MapTask.MapOutputBuffer.SpillThread

  protected class SpillThread extends Thread { // spill溢写线程

    @Override
    public void run() {
      spillLock.lock();
      spillThreadRunning = true;
      try {
        while (true) {
          spillDone.signal();
          while (!spillInProgress) {
            spillReady.await();
          }
          try {
            spillLock.unlock();
            sortAndSpill(); // 排序及溢写,进入
      // 省略
    }
  }

So it calls the sortAndSpill() method; step in:

  // 排序及溢写,有combiner执行combine
  private void sortAndSpill() throws IOException, ClassNotFoundException,
                                     InterruptedException {
      // 省略
      sorter.sort(MapOutputBuffer.this, mstart, mend, reporter); //排序
      int spindex = mstart;
      final IndexRecord rec = new IndexRecord();
      final InMemValBytes value = new InMemValBytes();
      for (int i = 0; i < partitions; ++i) { //按分区溢写
        IFile.Writer<K, V> writer = null;
        try {
          long segmentStart = out.getPos();
          FSDataOutputStream partitionOut = CryptoUtils.wrapIfNecessary(job, out);
          writer = new Writer<K, V>(job, partitionOut, keyClass, valClass, codec,
                                    spilledRecordsCounter);
          if (combinerRunner == null) { //如果未设置combiner,直接溢写
            // spill directly
            DataInputBuffer key = new DataInputBuffer();
            // 省略
          } else {
            int spstart = spindex;
            while (spindex < mend && //取这个分区的数据
                kvmeta.get(offsetFor(spindex % maxRec)
                          + PARTITION) == i) {
              ++spindex;
            }
            // Note: we would like to avoid the combiner if we've fewer
            // than some threshold of records for a partition
            if (spstart != spindex) {
              combineCollector.setWriter(writer);
              RawKeyValueIterator kvIter =
                new MRResultIterator(spstart, spindex);
              combinerRunner.combine(kvIter, combineCollector); //如果设置combiner了,则执行combine
            }
          }

          // close the writer
          writer.close();

      // 省略
      LOG.info("Finished spill " + numSpills);
      ++numSpills;
    } finally {
      if (out != null) out.close();
    }
  }

This method does two things:

    Sort:

          an optimization here is that only the kvmeta entries are sorted; keys are compared through the recorded offsets into kvbuffer (first by partition, then by key), and the serialized records in kvbuffer are then read out in that order without being moved. See the index-sort sketch below.

    Spill:

          the data is written out partition by partition, and if a combiner is configured, combine is run for each partition.
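
To illustrate the idea of sorting indices instead of moving records (a simplified sketch, not the IndexedSorter interface itself):

  import java.util.Arrays;
  import java.util.Comparator;

  public class IndexSortDemo {
    public static void main(String[] args) {
      // Stand-ins: records[i] stays where it was "serialized"; only the meta indices get sorted.
      String[] records = {"banana", "apple", "cherry"};
      int[] partition = {1, 0, 0};

      Integer[] meta = {0, 1, 2};                      // one "kvmeta" entry per record
      Comparator<Integer> byPartition = Comparator.comparingInt(i -> partition[i]);
      Comparator<Integer> byKey = Comparator.comparing(i -> records[i]);
      Arrays.sort(meta, byPartition.thenComparing(byKey));

      // The spill reads records in meta order; the records array itself was never shuffled.
      for (int i : meta) {
        System.out.println("partition " + partition[i] + " -> " + records[i]);
      }
    }
  }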

Once the map has emitted all of its output, go back to the MapTask runNewMapper() method and look at the code that follows:

  mapper.run(mapperContext); // 调用mapper的run方法,传入MapContext,进入
  mapPhase.complete(); // map进度完成
  setPhase(TaskStatus.Phase.SORT);  //标记开始排序sort阶段
  statusUpdate(umbilical);
  input.close(); //关闭RecordReader
  input = null;
  output.close(mapperContext); //执行flush,溢写buffer中剩下的,并执行merge,进入

 Step into the output.close(mapperContext) method; output here is NewOutputCollector:

  @Override
  public void close(TaskAttemptContext context
                    ) throws IOException,InterruptedException {
    try {
      collector.flush(); // 溢写剩下的
    } catch (ClassNotFoundException cnf) {
      throw new IOException("can't find class ", cnf);
    }
    collector.close();
  }

 Look at the collector.flush() method, located in MapOutputBuffer:

  /**
   * 执行flush,溢写buffer中剩下的,并执行merge
   */
  public void flush() throws IOException, ClassNotFoundException,
         InterruptedException {
    // 省略
      if (kvindex != kvend) {
        kvend = (kvindex + NMETA) % kvmeta.capacity();
        bufend = bufmark;
        // 省略
        sortAndSpill(); //溢写剩下的
      }
    // 省略
    kvbuffer = null;
    mergeParts(); // 执行merge
    // 省略
  }

Before closing, flush() is executed:

    it spills whatever is left in the buffer;

    then runs the merge.

Step into the mergeParts() method to see how the merge is done:

  // 最后执行merge
  private void mergeParts() throws IOException, InterruptedException, 
                                   ClassNotFoundException {
    // get the approximate size of the final output/index files
    // 省略

    // read in paged indices
    for (int i = indexCacheList.size(); i < numSpills; ++i) {
      Path indexFileName = mapOutputFile.getSpillIndexFile(i);
      indexCacheList.add(new SpillRecord(indexFileName, job));
    }

    //make correction in the length to include the sequence file header
    //lengths for each partition
    // 省略

    if (numSpills == 0) {
      // 省略
      return;
    }
    {
      sortPhase.addPhases(partitions); // Divide sort phase into sub-phases
      
      IndexRecord rec = new IndexRecord();
      final SpillRecord spillRec = new SpillRecord(partitions);
      for (int parts = 0; parts < partitions; parts++) { // 遍历分区
        //create the segments to be merged
        // 省略
        // sort the segments only if there are intermediate merges
        boolean sortSegments = segmentList.size() > mergeFactor;
        //merge
        // 省略
        //write merged output to disk
        // 省略
        if (combinerRunner == null || numSpills < minSpillsForCombine) { // 判断溢写文件数量
          Merger.writeFile(kvIter, writer, reporter, job); // 直接输出
        } else { //如果设置了combiner且spill小文件数超过设置的combiner最小个数,执行combiner合并文件
          combineCollector.setWriter(writer);
          combinerRunner.combine(kvIter, combineCollector);
        }

        //close
        writer.close();

    // 省略
  }

Merge:

     the spilled files are merged, partition by partition, into one final output file;

     if a combiner is configured and the number of spill files reaches the configured minimum (minSpillsForCombine), the combiner is run again during the merge.

 That covers the main MapTask code.


3. The ReduceTask part

Each ReduceTask pulls the shuffle files belonging to its own partition from every MapTask node;
it then merge-sorts them; to avoid running out of memory it does not merge everything into a single file: part of the data stays in memory and part stays on disk;
an iterator is then wrapped on top of this and the data is read back with a merge algorithm (see the sketch below).
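
Conceptually the merged view behaves like a k-way merge over several already-sorted segments; here is a minimal sketch of that idea (my own illustration, not the Hadoop Merger class):

  import java.util.Iterator;
  import java.util.List;
  import java.util.PriorityQueue;

  // Merges several individually sorted segments (some could live in memory, some on disk)
  // into one sorted stream without ever materializing the whole result.
  public class KWayMergeDemo {
    static Iterable<String> merge(List<? extends Iterable<String>> segments) {
      PriorityQueue<Segment> heap = new PriorityQueue<>((a, b) -> a.head.compareTo(b.head));
      for (Iterable<String> seg : segments) {
        Iterator<String> it = seg.iterator();
        if (it.hasNext()) heap.add(new Segment(it.next(), it));
      }
      return () -> new Iterator<String>() {
        public boolean hasNext() { return !heap.isEmpty(); }
        public String next() {
          Segment top = heap.poll();            // smallest current head across all segments
          String result = top.head;
          if (top.rest.hasNext()) {             // refill from the same segment
            top.head = top.rest.next();
            heap.add(top);
          }
          return result;
        }
      };
    }

    static class Segment {
      String head; final Iterator<String> rest;
      Segment(String head, Iterator<String> rest) { this.head = head; this.rest = rest; }
    }

    public static void main(String[] args) {
      // Two sorted "segments", e.g. one in memory and one spilled to disk.
      Iterable<String> merged = merge(List.of(
          List.of("apple", "cherry"), List.of("banana", "date")));
      merged.forEach(System.out::println);      // apple, banana, cherry, date
    }
  }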

ReduceTask class path: org.apache.hadoop.mapred.ReduceTask; start with the run() method:


  public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, InterruptedException, ClassNotFoundException {
    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());

    if (isMapOrReduce()) {
      copyPhase = getProgress().addPhase("copy");
      sortPhase  = getProgress().addPhase("sort");
      reducePhase = getProgress().addPhase("reduce");
    }
    // start thread that will handle communication with parent
    // 省略
    
    // 省略
    initialize(job, getJobID(), reporter, useNewApi);

    // check if it is a cleanupJobTask
    // 省略
    
    // Initialize the codec
    codec = initCodec();
    RawKeyValueIterator rIter = null;
    ShuffleConsumerPlugin shuffleConsumerPlugin = null;
    
    Class combinerClass = conf.getCombinerClass();
    CombineOutputCollector combineCollector = 
      (null != combinerClass) ? 
     new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;

     // mapreduce.job.reduce.shuffle.consumer.plugin.class
    Class<? extends ShuffleConsumerPlugin> clazz =
          job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
					
    shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
    LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);

    ShuffleConsumerPlugin.Context shuffleContext = 
      new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical, 
                  super.lDirAlloc, reporter, codec, 
                  combinerClass, combineCollector, 
                  spilledRecordsCounter, reduceCombineInputCounter,
                  shuffledMapsCounter,
                  reduceShuffleBytes, failedShuffleCounter,
                  mergedMapOutputsCounter,
                  taskStatus, copyPhase, sortPhase, this,
                  mapOutputFile, localMapFiles);
    shuffleConsumerPlugin.init(shuffleContext);

    /**
     * 输入迭代器:
     * shuffleConsumerPlugin实际就是yarn配置文件中配置的mapreduc的shuffle
     * 反正里面的拉取数据,然后归并,返回一个迭代器
     * 但是这个归并不是完全归并,而是一部分在内存,一部分在磁盘
     * 只是在上面封装了一个迭代器
     * 这样假如数据量很大话不会内存溢出
     * (因为每个文件都是内部有序的所以可以这样包装)
     */
    rIter = shuffleConsumerPlugin.run();

    // free up the data structures
    mapOutputFilesOnDisk.clear();
    
    sortPhase.complete();                         // sort is complete 排序阶段结束
    setPhase(TaskStatus.Phase.REDUCE); 
    statusUpdate(umbilical);
    Class keyClass = job.getMapOutputKeyClass(); // 设置的 job.setMapOutputKeyClass()
    Class valueClass = job.getMapOutputValueClass(); // 设置的 job.setMapOutputValueClass()
    /**
     * 取用户设置的分组比较器,
     * 如果没有设置,取用户设置的排序比较器,
     * 如没有,再取key的比较器
     */
    RawComparator comparator = job.getOutputValueGroupingComparator();

    if (useNewApi) {
      runNewReducer(job, umbilical, reporter, rIter, comparator, 
                    keyClass, valueClass); // 进入
    } else {
      runOldReducer(job, umbilical, reporter, rIter, comparator, 
                    keyClass, valueClass);
    }

    shuffleConsumerPlugin.close();
    done(umbilical, reporter);
  }

Look at the runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass) method:

  /**
   *  设置输入:rIter{RawKeyValueIterator}
   *  设置输出:RecordWriter{trackedRW}
   *  创建任务上下文:taskContext{TaskAttemptContextImpl}
   *  创建用户的reducer:reducer
   *  创建reducer上下文:reducerContext{Reducer.Context}
   *  调用用户reducer的run方法:reducer.run(reducerContext);
   */
  @SuppressWarnings("unchecked")
  private <INKEY,INVALUE,OUTKEY,OUTVALUE>
  void runNewReducer(JobConf job,
                     final TaskUmbilicalProtocol umbilical,
                     final TaskReporter reporter,
                     RawKeyValueIterator rIter,
                     RawComparator<INKEY> comparator,
                     Class<INKEY> keyClass,
                     Class<INVALUE> valueClass
                     ) throws IOException,InterruptedException, 
                              ClassNotFoundException {
    // wrap value iterator to report progress.
    final RawKeyValueIterator rawIter = rIter;
    rIter = new RawKeyValueIterator() { // 包装了一下迭代器
      public void close() throws IOException {
        rawIter.close();
      }
      public DataInputBuffer getKey() throws IOException {
        return rawIter.getKey();
      }
      public Progress getProgress() {
        return rawIter.getProgress();
      }
      public DataInputBuffer getValue() throws IOException {
        return rawIter.getValue();
      }
      public boolean next() throws IOException {
        boolean ret = rawIter.next();
        reporter.setProgress(rawIter.getProgress().getProgress()); // 此处不同
        return ret;
      }
    };
    // make a task context so we can get the classes
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = //创建task任务的context
      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
          getTaskID(), reporter);
    // make a reducer
    org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer = //从context中反射得到 用户编写的Reducer
      (org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
        ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
    org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW = // 创建输出 RecordWriter,TextOutputFormat中构建的LineRecordWriter
      new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(this, taskContext);
    job.setBoolean("mapred.skip.on", isSkipping());
    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
    /**
     *  创建用户编写的Reducer中的context
     *  reducerContext: 对象为WrappedReducer.context
     *  WrappedReducer.context包装的是ReduceContextImpl
     */
    org.apache.hadoop.mapreduce.Reducer.Context
         reducerContext = createReduceContext(reducer, job, getTaskID(),
                                               rIter, reduceInputKeyCounter, 
                                               reduceInputValueCounter, 
                                               trackedRW,
                                               committer,
                                               reporter, comparator, keyClass,
                                               valueClass);
    try {
      reducer.run(reducerContext); // 执行用户编写的Reducer中的run, 启动reduce,进入
    } finally {
      trackedRW.close(reducerContext);
    }
  }

  //NewTrackingRecordWriter类
  NewTrackingRecordWriter(ReduceTask reduce, org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
        throws InterruptedException, IOException {
      // 省略
      this.real = (org.apache.hadoop.mapreduce.RecordWriter<K, V>) reduce.outputFormat
          .getRecordWriter(taskContext); // TextOutputFormat中构建的LineRecordWriter
      // 省略
    }
    
  // TextOutputFormat
  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) 
        throws IOException, InterruptedException {
    Configuration conf = job.getConfiguration();
    // 省略
    if (!isCompressed) {
      FSDataOutputStream fileOut = fs.create(file, false);
      return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
    } else {
      FSDataOutputStream fileOut = fs.create(file, false);
      //LineRecordWriter
      return new LineRecordWriter<K, V>(new DataOutputStream (codec.createOutputStream(fileOut)), keyValueSeparator);
    }
  }
  
  // org.apache.hadoop.mapred.Task类
  protected static <INKEY,INVALUE,OUTKEY,OUTVALUE> 
  org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
    createReduceContext(org.apache.hadoop.mapreduce.Reducer
                        <INKEY,INVALUE,OUTKEY,OUTVALUE> reducer,
                      Configuration job,
                      org.apache.hadoop.mapreduce.TaskAttemptID taskId, 
                      RawKeyValueIterator rIter,
                      org.apache.hadoop.mapreduce.Counter inputKeyCounter,
                      org.apache.hadoop.mapreduce.Counter inputValueCounter,
                      org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output, 
                      org.apache.hadoop.mapreduce.OutputCommitter committer,
                      org.apache.hadoop.mapreduce.StatusReporter reporter,
                      RawComparator<INKEY> comparator,
                      Class<INKEY> keyClass, Class<INVALUE> valueClass
  ) throws IOException, InterruptedException {
    // ReduceContextImpl
    org.apache.hadoop.mapreduce.ReduceContext<INKEY, INVALUE, OUTKEY, OUTVALUE> 
    reduceContext = 
      new ReduceContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, taskId, rIter, inputKeyCounter, inputValueCounter, output, committer, reporter, comparator, keyClass, valueClass);

    // wrap the ReduceContextImpl in a WrappedReducer.Context
    org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
      reducerContext = new WrappedReducer<INKEY, INVALUE, OUTKEY, OUTVALUE>().getReducerContext(reduceContext);

    return reducerContext;
  }

Set up the input iterator: rIter, implemented as a RawKeyValueIterator;

Set up the output: a RecordWriter, implemented by NewTrackingRecordWriter, which wraps the LineRecordWriter built by the configured OutputFormat (TextOutputFormat by default);

Create the task context: taskContext, implemented by TaskAttemptContextImpl, which extends JobContextImpl implements TaskAttemptContext;

Create the user's reducer: reducer, obtained by reflection from the Reducer class recorded in taskContext;

Create the reducer context: reducerContext, a WrappedReducer.Context that wraps a ReduceContextImpl;

Call the user reducer's run method: reducer.run(reducerContext).
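
Before stepping into run(), it helps to keep a user-side Reducer in view, since that is what run(reducerContext) ends up driving. A minimal WordCount-style sketch (the class is illustrative, not part of the source above):

  import java.io.IOException;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Reducer;

  // Minimal WordCount-style reducer: run(reducerContext) calls setup() once,
  // then reduce() once per key group, then cleanup().
  public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable v : values) { // each iteration pulls one record via nextKeyValue()
        sum += v.get();
      }
      result.set(sum);
      context.write(key, result);    // goes through trackedRW -> LineRecordWriter
    }
  }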

Step into the reducer.run(reducerContext) method:

  /**
   * Advanced application writers can use the 
   * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
   * control how the reduce task works.
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context); // setup
    try {
      // nextKey(): one call per group; the input is already sorted, so each group's records are contiguous
      // this calls ReduceContextImpl.nextKey(), which in turn calls nextKeyValue()
      while (context.nextKey()) { // step in below
        // (reference to this group's key, iterator over this group's values, reduceContext)
        // iterating over the values also updates currentKey, so inside reduce() the key always matches the current record
        // getValues() is ReduceContextImpl.getValues(); values.iterator().next() again calls nextKeyValue()
        // so values is not an in-memory collection -- each call to next() reads one more record
        reduce(context.getCurrentKey(), context.getValues(), context);  // step in below
        // If a back up store is used, reset it
        Iterator<VALUEIN> iter = context.getValues().iterator();
        if(iter instanceof ReduceContext.ValueIterator) {
          ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore(); // reset the backup store for the values
        }
      }
    } finally {
      cleanup(context); // cleanup
    }
  }

The reduce primitive: reduce() is called once per group of identical keys, and "identical" is decided by the grouping comparator;

run(context):

     the context's nextKey() method decides whether another group remains;

     if so, reduce() is called with a reference to currentKey and an iterator over this group's values.
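
The grouping comparator is pluggable on the Job. A hedged sketch (the comparator below is hypothetical, and it only behaves as intended if the sort comparator keeps same-group keys adjacent, a point the article returns to shortly):

  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.io.WritableComparable;
  import org.apache.hadoop.io.WritableComparator;

  // Hypothetical grouping comparator: "Hello" and "hello" end up in the same reduce() call.
  // Note this only works if the sort comparator also orders keys case-insensitively,
  // because nextKeyIsSame only ever compares *adjacent* keys in sort order.
  public class CaseInsensitiveGrouping extends WritableComparator {
    protected CaseInsensitiveGrouping() {
      super(Text.class, true); // true: create key instances so the object compare() below is used
    }
    @Override
    @SuppressWarnings("rawtypes")
    public int compare(WritableComparable a, WritableComparable b) {
      return ((Text) a).toString().toLowerCase()
             .compareTo(((Text) b).toString().toLowerCase());
    }
  }

  // driver-side wiring:
  //   job.setSortComparatorClass(CaseInsensitiveGrouping.class);     // keep same-group keys adjacent
  //   job.setGroupingComparatorClass(CaseInsensitiveGrouping.class); // this is the comparator passed into createReduceContext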

First look at nextKey(); the implementation is in ReduceContextImpl (WrappedReducer.Context delegates to ReduceContextImpl):

  /**
   * Start processing next unique key.
   * Returns whether there is another record; a new record here means a new group, because:
   *  1. nextKeyIsSame is kept up to date while the user iterates the values
   *  2. if the user's reduce() did not consume all values of the group, this method drains the rest of the group first
   */
  public boolean nextKey() throws IOException,InterruptedException {
    // if the user's reduce() did not iterate through all values of the previous group,
    // this loop drains (skips) the rest of that group
    // nextKeyIsSame starts as false, so the loop does not run on the first call
    while (hasMore && nextKeyIsSame) {
      nextKeyValue();
    }
    if (hasMore) {  // if there is more input
      if (inputKeyCounter != null) {
        inputKeyCounter.increment(1);
      }
      return nextKeyValue(); // returns whether there is another record; whether it starts a new group is tracked while the user iterates the values
    } else {
      return false;
    }
  }

  // ReduceContextImpl fields
  private RawKeyValueIterator input;
  private Counter inputValueCounter;
  private Counter inputKeyCounter;
  private RawComparator<KEYIN> comparator;
  private KEYIN key;                                  // current key (the key of the current record)
  private VALUEIN value;                              // current value (the value of the current record)
  private boolean firstValue = false;                 // first value in key
  private boolean nextKeyIsSame = false;              // more w/ this key: does the next record belong to the same group?
  private boolean hasMore;                            // more in file: is there more input?
  protected Progressable reporter;
  private Deserializer<KEYIN> keyDeserializer;        // key deserializer
  private Deserializer<VALUEIN> valueDeserializer;    // value deserializer
  private DataInputBuffer buffer = new DataInputBuffer(); // the buffer the deserializers read from
  private BytesWritable currentRawKey = new BytesWritable();
  private ValueIterable iterable = new ValueIterable(); // the Iterable over one group's values, i.e. the values passed to reduce()
  private boolean isMarked = false;
  private BackupStore<KEYIN,VALUEIN> backupStore;
  private final SerializationFactory serializationFactory;
  // omitted
  
  // ReduceContextImpl constructor
  public ReduceContextImpl(Configuration conf, TaskAttemptID taskid,
                           RawKeyValueIterator input, 
                           Counter inputKeyCounter,
                           Counter inputValueCounter,
                           RecordWriter<KEYOUT,VALUEOUT> output,
                           OutputCommitter committer,
                           StatusReporter reporter,
                           RawComparator<KEYIN> comparator,
                           Class<KEYIN> keyClass,
                           Class<VALUEIN> valueClass
                          ) throws InterruptedException, IOException{
    super(conf, taskid, output, committer, reporter);
    this.input = input;
    this.inputKeyCounter = inputKeyCounter;
    this.inputValueCounter = inputValueCounter;
    this.comparator = comparator;
    this.serializationFactory = new SerializationFactory(conf);
    this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
    this.keyDeserializer.open(buffer); // the shared buffer is bound to the key deserializer here
    this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
    this.valueDeserializer.open(buffer); // and to the value deserializer here
    hasMore = input.next();
    this.keyClass = keyClass;
    this.valueClass = valueClass;
    this.conf = conf;
    this.taskid = taskid;
  }

  /**
   * Advance to the next key/value pair.
   * Returns whether there is another record.
   * It also updates the objects referenced by currentKey and currentValue,
   * then updates firstValue (is this the start of a new group?)
   * and nextKeyIsSame (does the next record belong to the same group?).
   */
  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    if (!hasMore) { // no more records: return false
      key = null;
      value = null;
      return false;
    }
    firstValue = !nextKeyIsSame; // if nextKeyIsSame is false, this record starts a new group
    DataInputBuffer nextKey = input.getKey(); // key of the record that the previous input.next() positioned on (in memory or on disk)
    currentRawKey.set(nextKey.getData(), nextKey.getPosition(), 
                      nextKey.getLength() - nextKey.getPosition());
    buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength()); // load the serialized key bytes into the buffer
    /**
     * Deserialize the key.
     * Passing only the key object looks odd, but keyDeserializer.open(buffer) was called in the constructor,
     * so the key is actually deserialized from the shared buffer.
     * The deserializer comes from the configured SerializationFactory; for Writable keys this is
     * WritableSerialization's deserializer (JavaSerialization and Avro serialization are alternatives).
     */
    key = keyDeserializer.deserialize(key);
    DataInputBuffer nextVal = input.getValue(); // value of the record that the previous input.next() positioned on
    buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength()
        - nextVal.getPosition());
    // deserialize the value
    value = valueDeserializer.deserialize(value);

    currentKeyLength = nextKey.getLength() - nextKey.getPosition();
    currentValueLength = nextVal.getLength() - nextVal.getPosition();

    if (isMarked) {
      backupStore.write(nextKey, nextVal);
    }

    hasMore = input.next(); // advance the underlying iterator; that record's data is still available via input.getKey()/input.getValue() on the next call
    if (hasMore) { // if there is another record
      nextKey = input.getKey();
      // use the configured grouping comparator to decide whether the next key still belongs to the current group
      nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0, 
                                     currentRawKey.getLength(),
                                     nextKey.getData(),
                                     nextKey.getPosition(),
                                     nextKey.getLength() - nextKey.getPosition()
                                         ) == 0;
    } else {
      nextKeyIsSame = false; // no more records, so nextKeyIsSame becomes false
    }
    inputValueCounter.increment(1);
    return true;  // there was another record, so return true
  }

  public KEYIN getCurrentKey() {
    return key; // return the reference directly
  }

  @Override
  public VALUEIN getCurrentValue() {
    return value; // return the reference directly
  }

nextKey(): returns whether there is another record, and a new record here means a new group.

 This works because:

    1. the input is already sorted, so each group's records are contiguous -- which also shows that the grouping comparator depends heavily on the sort comparator;

    2. it calls nextKeyValue() internally to check whether there is another record;

    3. nextKeyIsSame (whether the next record belongs to the same group) is updated while the user iterates the values;

    4. if the user's reduce() did not consume all values of the group, nextKey() first drains the rest of that group.

nextKeyValue():

    1. returns whether there is another record;

    2. updates the objects referenced by currentKey and currentValue;

    3. then updates firstValue (is this the start of a new group?);

    4. then updates nextKeyIsSame (does the next record belong to the same group?).
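
As a worked illustration, assuming a hypothetical reducer input of three already-sorted records grouped by the default comparator, the calls and flags evolve roughly like this:

  // hypothetical sorted reduce input: (a,1) (a,2) (b,5)
  nextKey()          -> nextKeyValue(): key=a, value=1, firstValue=true, nextKeyIsSame=true   => reduce(a, values)
    values.hasNext() -> true  (firstValue)
    values.next()    -> returns 1 without advancing; firstValue=false
    values.hasNext() -> true  (nextKeyIsSame)
    values.next()    -> nextKeyValue(): key=a, value=2, nextKeyIsSame=false (next key is b); returns 2
    values.hasNext() -> false (firstValue=false, nextKeyIsSame=false)
  nextKey()          -> nextKeyValue(): key=b, value=5, firstValue=true, nextKeyIsSame=false  => reduce(b, values)
    values.next()    -> returns 5; values.hasNext() -> false
  nextKey()          -> hasMore=false, returns false => run() leaves the loop and calls cleanup()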


Back in Reducer.run(): once nextKey() has returned true, the next call is

reduce(context.getCurrentKey(), context.getValues(), context)

Step into context.getValues(); the implementation is in ReduceContextImpl:

  /**
   * Iterate through the values for the current key, reusing the same value 
   * object, which is stored in the context.
   * @return the series of values associated with the current key. All of the 
   * objects returned directly and indirectly from this method are reused.
   */
  public 
  Iterable<VALUEIN> getValues() throws IOException, InterruptedException {
    return iterable;
  }
  
  private ValueIterable iterable = new ValueIterable(); // the Iterable over one group's values, i.e. the values passed to reduce()
  
  protected class ValueIterable implements Iterable<VALUEIN> {
    private ValueIterator iterator = new ValueIterator(); // the iterator over one group's values
    @Override
    public Iterator<VALUEIN> iterator() {
      return iterator;
    } 
  }

So the values passed to reduce() are backed by a ValueIterator:

  // the iterator over one group's values, i.e. the values passed to reduce()
  protected class ValueIterator implements ReduceContext.ValueIterator<VALUEIN> {

    private boolean inReset = false;
    private boolean clearMarkFlag = false;

    @Override
    public boolean hasNext() {  // does this group have more values?
      try {
        if (inReset && backupStore.hasNext()) {
          return true;
        } 
      } catch (Exception e) {
        e.printStackTrace();
        throw new RuntimeException("hasNext failed", e);
      }
      return firstValue || nextKeyIsSame; // true if this is the start of a new group or the next record still belongs to this group
    }

    @Override
    public VALUEIN next() { // returns the current value of this group; note that currentKey is updated as well
      if (inReset) {  // the user is re-iterating after a reset()
        try {
          if (backupStore.hasNext()) {  // read from the values cached in the backupStore
            backupStore.next();
            DataInputBuffer next = backupStore.nextValue();
            buffer.reset(next.getData(), next.getPosition(), next.getLength()
                - next.getPosition());
            value = valueDeserializer.deserialize(value);
            return value;
          } else {
            inReset = false;
            backupStore.exitResetMode();
            if (clearMarkFlag) {
              clearMarkFlag = false;
              isMarked = false;
            }
          }
        } catch (IOException e) {
          e.printStackTrace();
          throw new RuntimeException("next value iterator failed", e);
        }
      } 

      // if this is the first record, we don't need to advance
      if (firstValue) { // first record of the group: context.nextKey() already loaded its key/value, so just return it
        firstValue = false;
        return value;
      }
      // if this isn't the first record and the next key is different, they
      // can't advance it here.
      if (!nextKeyIsSame) { // not the first record, and the next record is not in this group: fail
        throw new NoSuchElementException("iterate past last value");
      }
      // otherwise, go to the next key/value pair
      try {
        nextKeyValue(); // advance: nextKeyValue() updates currentKey and currentValue
        return value; // return the current value
      } catch (IOException ie) {
        throw new RuntimeException("next value iterator failed", ie);
      } catch (InterruptedException ie) {
        // this is bad, but we can't modify the exception list of java.util
        throw new RuntimeException("next value iterator interrupted", ie);        
      }
    }

    @Override
    public void remove() {
      throw new UnsupportedOperationException("remove not implemented");
    }

    @Override
    public void mark() throws IOException {
      // omitted
    }

    @Override
    public void reset() throws IOException {
      // We reached the end of an iteration and user calls a 
      // reset, but a clearMark was called before, just throw
      // an exception
      // omitted
      inReset = true;
      backupStore.reset();
    }

    @Override
    public void clearMark() throws IOException {
      if (getBackupStore() == null) {
        return;
      }
      if (inReset) {
        clearMarkFlag = true;
        backupStore.clearMark();
      } else {
        inReset = isMarked = false;
        backupStore.reinitialize();
      }
    }

When the user iterates over values in reduce() (the enhanced for loop desugars to this), the calls are:

  Iterator<VALUEIN> iterator = getValues().iterator();
  while (iterator.hasNext()) {
      VALUEIN value = iterator.next();
  }

hasNext(): does this group have more values?

    It evaluates firstValue || nextKeyIsSame, i.e. it returns true if this is the start of a new group or the next record still belongs to this group.

    firstValue marks whether this is the first value of the group; it is set to true when nextKey() is called externally and nextKey() in turn calls nextKeyValue(),

    and nextKeyIsSame is likewise updated inside nextKeyValue();

next(): returns the current value of the group; note that currentKey is updated as well.

    If firstValue is set, it returns currentValue directly, because the nextKeyValue() call made by nextKey() already updated currentKey and currentValue;

    otherwise it calls nextKeyValue() to update currentKey and currentValue, which also updates nextKeyIsSame.
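
One practical consequence of this design (also stated in the getValues() Javadoc above) is that the key and value objects are reused on every iteration, so a reducer that wants to keep values around must copy them. A minimal sketch, assuming Text values (the class name is illustrative):

  import java.io.IOException;
  import java.util.ArrayList;
  import java.util.List;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Reducer;

  // Hedged sketch: the value object handed to reduce() is reused, so copy it before caching.
  public class CachingReducer extends Reducer<IntWritable, Text, IntWritable, Text> {
    @Override
    protected void reduce(IntWritable key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      List<Text> cached = new ArrayList<Text>();
      for (Text v : values) {
        cached.add(new Text(v)); // copy -- the framework keeps deserializing into the same Text instance
      }
      for (Text v : cached) {
        context.write(key, v);   // emit each cached value once
      }
    }
  }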

In addition, when the user needs to iterate the values more than once, the backupStore is used to back up the data:

  private BackupStore<KEYIN,VALUEIN> backupStore;
  
  // ValueIterator.next()
  public VALUEIN next() {
      if (inReset) {  // the user is re-iterating after a reset()
        try {
          if (backupStore.hasNext()) {  // read from the values cached in the backupStore
            backupStore.next();
            DataInputBuffer next = backupStore.nextValue();
            buffer.reset(next.getData(), next.getPosition(), next.getLength()
                - next.getPosition());
            value = valueDeserializer.deserialize(value);
            return value;
          } else {
            inReset = false;
            backupStore.exitResetMode();
            if (clearMarkFlag) {
              clearMarkFlag = false;
              isMarked = false;
            }
          }
        } catch (IOException e) {
          e.printStackTrace();
          throw new RuntimeException("next value iterator failed", e);
        }
      }
      // omitted
      // otherwise fall through to nextKeyValue() to read the next record
  }
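
The user-facing way to drive this mark/reset machinery is the MarkableIterator wrapper in org.apache.hadoop.mapreduce. A hedged sketch of a two-pass reducer (the class is illustrative and assumes Text values):

  import java.io.IOException;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.MarkableIterator;
  import org.apache.hadoop.mapreduce.Reducer;

  // Hedged sketch: iterating a group's values twice via mark()/reset(),
  // which is what exercises the BackupStore shown above.
  public class TwoPassReducer extends Reducer<IntWritable, Text, IntWritable, IntWritable> {
    @Override
    protected void reduce(IntWritable key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      MarkableIterator<Text> it = new MarkableIterator<Text>(values.iterator());
      it.mark();                 // remember the start of the group
      int count = 0;
      while (it.hasNext()) {     // first pass: count the values
        it.next();
        count++;
      }
      it.reset();                // second pass re-reads from the BackupStore
      while (it.hasNext()) {
        it.next();               // ... per-value work that needs the count ...
      }
      context.write(key, new IntWritable(count));
    }
  }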

Finally the user calls context.write(key, value) to emit the output; the LineRecordWriter writes it straight to the task's output file on HDFS (under the job output's _temporary directory), and the OutputCommitter renames it to the final output path when the task commits.

This completes the walk through the main ReduceTask source.


四、Summary

Client phase

set conf

plan the job:

check the input and output paths

have the InputFormat (FileInputFormat) compute the input splits,

        splitSize = Max( minSize, Min(blockSize, maxSize) )   (worked example after this list)

        the file is cut into split(path, splitOffset, splitLength, blockHosts) pieces of splitSize -- "move computation to the data", i.e. data locality

copy the job jar and configuration to the cluster

submit the job to the JobTracker (ResourceManager)
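
A worked example of the splitSize formula, assuming the usual defaults (blockSize = 128 MB, minSize = 1, maxSize = Long.MAX_VALUE; the class name is mine):

  // mirrors FileInputFormat.computeSplitSize(blockSize, minSize, maxSize)
  public class SplitSizeExample {
    public static void main(String[] args) {
      long blockSize = 128L * 1024 * 1024; // 128 MB HDFS block
      long minSize   = 1L;                 // mapreduce.input.fileinputformat.split.minsize (default 1)
      long maxSize   = Long.MAX_VALUE;     // mapreduce.input.fileinputformat.split.maxsize (default Long.MAX_VALUE)
      long splitSize = Math.max(minSize, Math.min(maxSize, blockSize));
      System.out.println(splitSize);       // 134217728, i.e. 128 MB
      // a 300 MB file therefore yields three splits: 128 MB, 128 MB and 44 MB
      // (FileInputFormat.getSplits also applies a 1.1 SPLIT_SLOP factor to the tail)
    }
  }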


Map phase

Viewed from the map task's perspective, a map first needs its input,

and the input is built from a split object plus the InputFormat (TextInputFormat).

The input is then initialized via LineRecordReader.initialize().

After initialization, mapContext wraps the nextKeyValue() of the input (LineRecordReader).

The output (RecordWriter) is initialized at the same time; the implementation is NewOutputCollector, and in its constructor

NewOutputCollector creates a MapOutputBuffer to receive the map's output (setting the buffer size and spill threshold, the sort method and the comparator), sets up the combiner and starts the spill thread; the circular-buffer design lets map keep writing without blocking, locking only the already-filled portion while it spills.

NewOutputCollector also creates the partitioner.

Then, in Mapper.run(), nextKeyValue() is called to obtain currentKey and currentValue (this actually calls into LineRecordReader),

and map() is invoked.

The user's map output goes through context.write(), which actually calls NewOutputCollector.write(): it computes the partition number and then calls MapOutputBuffer.collect(k, v, p) to store the record in the circular buffer.

map() keeps running; when the spill thread sees the threshold reached it spills (sort by partition, run the combiner, write a small spill file). A hedged sketch of the relevant configuration knobs follows this list.

Finally, when map() is done, the output spills whatever remains in the MapOutputBuffer and merges the spill files; if the number of spill files exceeds the configured minimum, the combiner runs one more time.
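
A hedged sketch of the configuration knobs behind this buffer/spill behaviour (property names as in Hadoop 2.x; the values are examples, not recommendations, and the helper class is mine):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.mapreduce.Job;

  // Hedged sketch: the main knobs behind the MapOutputBuffer and spill/merge path described above.
  public class SpillTuning {
    public static Job newJob() throws Exception {
      Configuration conf = new Configuration();
      conf.setInt("mapreduce.task.io.sort.mb", 200);            // size of the in-memory MapOutputBuffer, in MB
      conf.setFloat("mapreduce.map.sort.spill.percent", 0.80f); // buffer fill fraction that triggers a spill
      conf.setInt("mapreduce.task.io.sort.factor", 10);         // how many spill files are merged at once
      conf.setInt("mapreduce.map.combine.minspills", 3);        // minimum spill count before the combiner runs again at merge time
      return Job.getInstance(conf, "spill-tuning-example");
    }
  }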

Reduce phase

Input:

The ReduceTask first fetches the shuffle files from the map side and merge-sorts them. To avoid running out of memory it does not have to merge everything into one file; it is enough to merge into a few large segments, partly in memory and partly on disk.

An input iterator rIter {RawKeyValueIterator} is then wrapped on top, and records are pulled off it in merge order.

Then the grouping comparator is set,

        the output is set up: RecordWriter {trackedRW},

the task context is created: taskContext {TaskAttemptContextImpl},

the user's reducer is created: reducer,

the reducer context is created: reducerContext {Reducer.Context},

and the user reducer's run method is called: reducer.run(reducerContext);

    inside run():

nextKey() checks whether there is a new group (it calls nextKeyValue() internally to check for a new record, and a record whose key compares differently starts a new group),

and if there is a new group, reduce() is called with the values iterator.

When the user drives the iterator:

hasNext: checks firstValue and the nextKeyIsSame flag maintained by nextKeyValue(); if true, the group still has values,

next: calls nextKeyValue() to update currentKey and currentValue, which also determines nextKeyIsSame for the following record.

The user calls context.write() to emit output,

and the ReduceTask finishes.
