本次阅读的是MapReduce中的核心源码,主要分成三个部分查看:
1、客户端Client部分
2、MapTask部分
3、ReduceTask部分
一、客户端Client部分
客户端在 MapReduce 中是一个重要的角色,主要负责 任务的初始化和Split切片的划分。
当我们执行 MapReduce 任务时,输入的命令类似 hadoop jar wc.jar WordCount input output,
可以知道 MapReduce 程序的入口是我们编写的、用于配置 Job 的 WordCount 类,
在这个类中我们的工作就是设置 Job 的参数然后调用 submit 提交,
所以客户端的源码从 Job.waitForCompletion() 开始看即可
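为了方便与源码对照,先给出一个最小的 WordCount 示例(示意性质,类名、Mapper/Reducer 实现与输入输出路径均为假设的写法,并非本文阅读的源码),可以看到用户侧确实只是配置 Job 参数然后 waitForCompletion():
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
  public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private final Text word = new Text();
    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);                           // 对应后文MapTask中的MapOutputBuffer.collect()
      }
    }
  }

  public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {                      // 对应后文ReduceTask中的ValueIterator
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);              // 可选:combiner
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));   // input目录
    FileOutputFormat.setOutputPath(job, new Path(args[1])); // output目录(必须不存在)
    System.exit(job.waitForCompletion(true) ? 0 : 1);       // 下面的源码阅读就从这里开始
  }
}
其中 context.write() 与 values 迭代器的具体实现,正是后文 MapTask / ReduceTask 部分要读的源码。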
Job类路径:org.apache.hadoop.mapreduce.Job ,注意是 mapreduce 包下
/**
* Submit the job to the cluster and wait for it to finish.
* @param verbose print the progress to the user
* @return true if the job succeeded
* @throws IOException thrown if the communication with the
* <code>JobTracker</code> is lost
*/
public boolean waitForCompletion(boolean verbose
) throws IOException, InterruptedException,
ClassNotFoundException {
if (state == JobState.DEFINE) {
submit(); //提交任务,进入
}
if (verbose) { //是否打印日志
monitorAndPrintJob();
// 省略
}
然后再进入 submit() 方法:
/**
* Submit the job to the cluster and return immediately.
* @throws IOException
*/
public void submit()
throws IOException, InterruptedException, ClassNotFoundException {
ensureState(JobState.DEFINE);
setUseNewAPI(); //是否使用新api
connect(); //连接yarn,获取连接对象cluster
final JobSubmitter submitter =
getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
//ugi是UserGroupInformation类的实例,表示Hadoop中的用户和组信息
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException,
ClassNotFoundException {
return submitter.submitJobInternal(Job.this, cluster); //提交job, 进入
}
});
state = JobState.RUNNING;
LOG.info("The url to track the job: " + getTrackingURL());
}
进入 submitter.submitJobInternal(Job.this, cluster) :
/**
* Internal method for submitting jobs to the system.
*
* <p>The job submission process involves:
* <ol>
* <li>
* Checking the input and output specifications of the job. //检查输入输出路径
* </li>
* <li>
* Computing the {@link InputSplit}s for the job. // 计算split分片
* </li>
* <li>
* Setup the requisite accounting information for the
* {@link DistributedCache} of the job, if necessary.
* </li>
* <li>
* Copying the job's jar and configuration to the map-reduce system //复制job jar文件和配置到集群
* directory on the distributed file-system.
* </li>
* <li>
* Submitting the job to the <code>JobTracker</code> and optionally
* monitoring it's status.
* </li>
* </ol></p>
*/
JobStatus submitJobInternal(Job job, Cluster cluster)
throws ClassNotFoundException, InterruptedException, IOException {
//validate the jobs output specs
checkSpecs(job); //检查输入输出路径,输出路径要为空
Configuration conf = job.getConfiguration();
addMRFrameworkToDistributedCache(conf); // 将MapReduce框架加入分布式缓存中
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf); //初始化job的工作根目录
//configure the command line options correctly on the submitting dfs
// 省略
JobID jobId = submitClient.getNewJobID(); // submitClient的类型是ClientProtocol接口,用来进行JobClient和JobTracker之间的RPC通信
job.setJobID(jobId);
// 省略
copyAndConfigureFiles(job, submitJobDir); //复制配置文件
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
// Create the splits for the job
LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
int maps = writeSplits(job, submitJobDir); //获取mapTask数量==split数量,进入
conf.setInt(MRJobConfig.NUM_MAPS, maps); // 设置map数量
LOG.info("number of splits:" + maps);
// 省略
// Write job file to submit dir
writeConf(conf, submitJobFile); //写入配置文件到提交目录
//
// Now, actually submit the job (using the submit name)
//
printTokens(jobId, job.getCredentials());
/**
* 最终submitClient提交作业
* 接下来查看 org.apache.hadoop.mapred.MapTask.run() ,注意是mapred不是mapreduce
*/
status = submitClient.submitJob(
jobId, submitJobDir.toString(), job.getCredentials());
// 省略
}
可以看到在这里:
检查输入输出路径,输出路径要为空
将MapReduce框架加入分布式缓存中
初始化job的工作根目录
获取mapTask数量即split数量
写入配置文件到提交目录
进入 int maps = writeSplits(job, submitJobDir) 方法查看如何 划分Split 得到 MapTask的数目,
writeSplits() 调用了 writeNewSplits() ,进入
private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = job.getConfiguration();
/**
* 根据配置反射得到InputFormat的实现类,
* 查看job.getInputFormatClass(),位置在JobContextImpl中,ctrl+alt+左键,进入
*/
InputFormat<?, ?> input =
ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
List<InputSplit> splits = input.getSplits(job); // 得到分片数组,进入(实现类为TextInputFormat父类FileInputFormat)
T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
// sort the splits into order based on size, so that the biggest go first
// 按大小排序split数组,因为这些split不单单是一个文件的,而是输入目录中所有文件的split
Arrays.sort(array, new SplitComparator());
JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
jobSubmitDir.getFileSystem(conf), array);
return array.length; //返回mapTask数量
}
首先 这里配置了输入格式化类 InputFormat,进入 job.getInputFormatClass() 查看配置,job实现类为 JobContextImpl
public Class<? extends InputFormat<?,?>> getInputFormatClass()
throws ClassNotFoundException {
/**
* 如果conf中没有配置,默认为TextInputFormat
* job.setInputFormatClass();
* TextInputFormat继承自FileInputFormat
*/
return (Class<? extends InputFormat<?,?>>)
conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class); // mapreduce.job.inputformat.class
}
可以看到 InputFormat 默认为继承自 FileInputFormat 的 TextInputFormat,而用户可以通过 job.setInputFormatClass() 配置。
返回 writeNewSplits() 方法,进入 input.getSplits(job) 方法查看 分片过程 ,input.getSplits() 在 FileInputFormat 中实现的:
/**
* Generate the list of files and make them into FileSplits.
* @param job the job context
* @throws IOException
*/
public List<InputSplit> getSplits(JobContext job) throws IOException {
Stopwatch sw = new Stopwatch().start();
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); // 最小值配置SPLIT_MINSIZE,默认值为1
long maxSize = getMaxSplitSize(job); // 最大值配置SPLIT_MAXSIZE,默认值Long.MAX_VALUE
// generate splits
List<InputSplit> splits = new ArrayList<InputSplit>(); //存放获取的split对象
List<FileStatus> files = listStatus(job); //获取每个文件的信息
for (FileStatus file: files) { //遍历文件
Path path = file.getPath();
long length = file.getLen();
if (length != 0) {
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) { //如果文件信息中已经带有block位置(LocatedFileStatus)
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0, length); // 获取文件的block列表
}
if (isSplitable(job, path)) {
long blockSize = file.getBlockSize(); //block size
/**
* split分片的大小
* Math.max(minSize, Math.min(maxSize, blockSize))
* 如果要比blockSize大,把minSize设置得比blockSize大
* 如果要比blockSize小,把maxSize设置得比blockSize小
* 默认情况split大小就是block大小
* FileInputFormat.setMaxInputSplitSize(job, size) / setMinInputSplitSize(job, size)
* 或 conf 配置 mapreduce.input.fileinputformat.split.minsize / .maxsize
*/
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { // 只要剩余字节数超过切片大小的1.1倍(SPLIT_SLOP)就继续切出一个split
// 获取一个split的最优的本地化位置,对应的block,规则是splitOffset在这个block上就行
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
// 构建split对象 (filePath, splitOffset, splitSize, blockHosts) 关键是blockHosts, 向该host发送MapTask任务
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
// 省略
return splits;
}
/**
* Get the lower bound on split size imposed by the format.
* @return the number of bytes of the minimal split for this format
*/
protected long getFormatMinSplitSize() {
return 1; // 最小Split为1
}
/**
* Get the minimum split size
* @param job the job
* @return the minimum number of bytes that can be in a split
*/
public static long getMinSplitSize(JobContext job) {
// mapreduce.input.fileinputformat.split.minsize
return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L); //返回用户设置的SPLIT_MINSIZE,默认1
}
/**
* Get the maximum split size.
* @param context the job to look at.
* @return the maximum number of bytes a split can include
*/
public static long getMaxSplitSize(JobContext context) {
// mapreduce.input.fileinputformat.split.maxsize
return context.getConfiguration().getLong(SPLIT_MAXSIZE,
Long.MAX_VALUE); // 返回用户设置的SPLIT_MAXSIZE,默认取Long.MAX_VALUE
}
protected long computeSplitSize(long blockSize, long minSize,
long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}
// 计算offset落在哪个block块上
protected int getBlockIndex(BlockLocation[] blkLocations,
long offset) {
for (int i = 0 ; i < blkLocations.length; i++) {
// is the offset inside this block? splitOffset在这个block上就行
if ((blkLocations[i].getOffset() <= offset) &&
(offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
return i;
}
}
BlockLocation last = blkLocations[blkLocations.length -1];
long fileLength = last.getOffset() + last.getLength() -1;
// 省略
}
可以看到
Split切片大小的计算公式 为: Math.max(minSize, Math.min(maxSize, blockSize))
minSize,由用户设置( mapreduce.input.fileinputformat.split.minsize 或 FileInputFormat.setMinInputSplitSize(job, size) ),默认值 1;
maxSize,由用户设置( mapreduce.input.fileinputformat.split.maxsize 或 FileInputFormat.setMaxInputSplitSize(job, size) ),默认值 Long.MAX_VALUE(两者对splitSize的影响见列表后的小例子);
blockSize,由用户上传文件时设置( dfs.blocksize ),默认值 128M;
只要一个split的边界范围在一个block块上,那么这个block就被split包括;
split对象的组成 split(filePath, splitOffset, splitSize, blockHosts) , 关键是 blockHosts , 向该host发送MapTask任务
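下面用一段独立的小程序验证 splitSize 公式在默认、调小maxSize、调大minSize三种情况下的结果(数值均为假设,公式与 FileInputFormat.computeSplitSize() 相同):
// splitSize公式的演示(独立小程序,数值为假设)
public class SplitSizeDemo {
  static long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }
  public static void main(String[] args) {
    long blockSize = 128L * 1024 * 1024;                                        // dfs.blocksize 默认128M
    // 默认:minSize=1, maxSize=Long.MAX_VALUE => split大小 = block大小
    System.out.println(computeSplitSize(blockSize, 1L, Long.MAX_VALUE));        // 134217728
    // 想让split比block小:把maxSize调小,例如64M
    System.out.println(computeSplitSize(blockSize, 1L, 64L * 1024 * 1024));     // 67108864
    // 想让split比block大:把minSize调大,例如256M
    System.out.println(computeSplitSize(blockSize, 256L * 1024 * 1024, Long.MAX_VALUE)); // 268435456
  }
}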
回到 writeNewSplits() 方法:
List<InputSplit> splits = input.getSplits(job); // 得到分片数组,进入(实现类为TextInputFormat父类FileInputFormat)
T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
// sort the splits into order based on size, so that the biggest go first
// 按大小排序split数组,因为这些split不单单是一个文件的,而是输入目录中所有文件的split
Arrays.sort(array, new SplitComparator());
JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
jobSubmitDir.getFileSystem(conf), array);
return array.length; //返回mapTask数量
可知 mapTask与split分片的数量相同
回到 submitJobInternal() 方法,
/**
* 最终submitClient提交作业
* 接下来查看 org.apache.hadoop.mapred.MapTask.run() ,注意是mapred不是mapreduce
*/
status = submitClient.submitJob(
jobId, submitJobDir.toString(), job.getCredentials());
此时初始化配置完成,复制配置到Job目录,执行 status = submitClient.submitJob() 真正提交了任务;
客户端主要源码到此看完。
二、MapTask部分
框架分发MapTask到各个节点,处理属于它的split数据;
MapTask根据split的blockHosts来进行数据本地化分发,实现计算向数据移动;
MapTask类路径:org.apache.hadoop.mapred.MapTask ,注意是 mapred 包下,不是 mapreduce;
MapTask为一个进程,首先看 run() 方法:
@Override
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical) //入口
throws IOException, ClassNotFoundException, InterruptedException {
this.umbilical = umbilical;
if (isMapTask()) {
// If there are no reducers then there won't be any sort. Hence the map
// phase will govern the entire attempt's progress.
if (conf.getNumReduceTasks() == 0) {
mapPhase = getProgress().addPhase("map", 1.0f);
} else {
// If there are reducers then the entire attempt's progress will be
// split between the map phase (67%) and the sort phase (33%).
mapPhase = getProgress().addPhase("map", 0.667f); //设置进度权重
sortPhase = getProgress().addPhase("sort", 0.333f);
}
}
// 省略
if (useNewApi) {
runNewMapper(job, splitMetaInfo, umbilical, reporter); // runNewMapper
} else {
runOldMapper(job, splitMetaInfo, umbilical, reporter);
}
// 省略
}
查看 runNewMapper() 方法:
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewMapper(final JobConf job,
final TaskSplitIndex splitIndex,
final TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, ClassNotFoundException,
InterruptedException {
// make a task context so we can get the classes
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = //创建taskContext,实现类为JobContextImpl
new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
getTaskID(),
reporter);
// make a mapper
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
(org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
ReflectionUtils.newInstance(taskContext.getMapperClass(), job); // 通过反射得到mapper的实现类(用户设置的那个mapperClass,默认为Mapper.class)
// make the input format
org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
(org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job); //得到InputFormat, 默认为TextInputFormat
// rebuild the input split
org.apache.hadoop.mapreduce.InputSplit split = null;
split = getSplitDetails(new Path(splitIndex.getSplitLocation()), //得到分配的split
splitIndex.getStartOffset());
LOG.info("Processing split: " + split);
org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
new NewTrackingRecordReader<INKEY,INVALUE> // 创建RecordReader,NewTrackingRecordReader为内部类
(split, inputFormat, reporter, taskContext); // 里面默认调用TextInputFormat.createRecordReader(split, taskContext)创建LineRecordReader
job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
org.apache.hadoop.mapreduce.RecordWriter output = null; // RecordWriter - context.write();
// get an output object 新建一个OutputCollector封装类
if (job.getNumReduceTasks() == 0) { // reduce的数量,job.setReduceTasks(num)
output =
new NewDirectOutputCollector(taskContext, job, umbilical, reporter); //
} else {
// NewOutputCollector为内部类, 等看到代码context.write()时再来查看NewOutputCollector的构造方法
output = new NewOutputCollector(taskContext, job, umbilical, reporter);
}
org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
mapContext =
new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(),
input, output, //传入了RecordReader(LineRecordReader),RecordWriter(MapOutputCollector)
committer,
reporter, split); // 创建MapContext,实现类MapContextImpl,MapContextImpl是TaskInputOutputContextImpl的实现类
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
mapperContext =
new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
mapContext); //mapperContext调用的MapContextImpl的方法,// 用户mapper里调用context.write(k, v),执行的是WrappedMapper里的方法
try {
input.initialize(split, mapperContext); //调用LineRecordReader的initialize()初始化input,进入,里面细节多
mapper.run(mapperContext); // 调用mapper的run方法,传入MapContext,进入
mapPhase.complete(); // map进度完成
setPhase(TaskStatus.Phase.SORT); //开始排序sort阶段
statusUpdate(umbilical);
input.close(); //关闭RecordReader
input = null;
output.close(mapperContext); //执行flush,溢写buffer中剩下的,并执行merge,进入
output = null;
} finally {
closeQuietly(input);
closeQuietly(output, mapperContext);
}
}
反射mapper的代码: getMapperClass()
public Class<? extends Mapper<?,?,?,?>> getMapperClass()
throws ClassNotFoundException {
return (Class<? extends Mapper<?,?,?,?>>)
conf.getClass(MAP_CLASS_ATTR, Mapper.class); //用户设置mapperClass,默认为Mapper.class
}
创建 NewTrackingRecordReader 的代码,可知其实现类为 LineRecordReader
// 构造方法
NewTrackingRecordReader(org.apache.hadoop.mapreduce.InputSplit split,
org.apache.hadoop.mapreduce.InputFormat<K, V> inputFormat,
TaskReporter reporter,
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
throws InterruptedException, IOException {
// 省略
this.real = inputFormat.createRecordReader(split, taskContext); // 默认为调用TextInputFormat.createRecordReader(split, taskContext)创建LineRecordReader
// 省略
}
// TextInputFormat类
@Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
// 省略
return new LineRecordReader(recordDelimiterBytes); // LineRecordReader
}
进入 input.initialize(split, mapperContext) 即 LineRecordReader 的初始化:
public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
FileSplit split = (FileSplit) genericSplit;
Configuration job = context.getConfiguration();
this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE); // 一行的最大长度
start = split.getStart(); // split 起始位置
end = start + split.getLength(); // split 结束位置
final Path file = split.getPath();
// open the file and seek to the start of the split
final FileSystem fs = file.getFileSystem(job);
fileIn = fs.open(file); // 打开了这个文件,返回文件流
CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
if (null!=codec) { // 如果指定了压缩
// 省略
} else { // 如果没有指定压缩
fileIn.seek(start); // seek到split起始offset位置开始读取,因为mapTask已经优化成本地化执行了,所以相当于从本地读取
in = new UncompressedSplitLineReader( // SplitLineReader 切割行读取
fileIn, job, this.recordDelimiterBytes, split.getLength());
filePosition = fileIn;
}
// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
if (start != 0) {
// 如果不是第一个split切片,在初始化时就舍弃掉第一行
// 原因是按字节切分时,一条记录(一行)可能被切在两个block/split之间(特别是多字节编码),
// 而前一个split的MapTask会多读一行来补全,所以本split的第一行交给前一个处理
start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start; // split应该开始读的偏移量
}
这里两个重点:
1、通过流的 seek() 方法定位到split的 offset ,使每个MapTask只读取属于自己的那段数据,实现分布式运算;
2、除了第一个分片,都舍弃掉第一行,由前一个分片多读一行来补全被block/split边界切断的记录(见下面的小模拟);
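下面用一个脱离Hadoop的极简模拟验证这两条规则如何保证每一行恰好被处理一次(数据内容与split大小均为假设,仅示意逻辑,并非LineRecordReader源码):
// 极简模拟:非第一个split丢弃首行、每个split多读一行
public class SplitLineDemo {
  public static void main(String[] args) {
    String data = "hello world\nfoo bar\nmap reduce\nhadoop hdfs\n";
    int splitSize = 10;                                     // 假设的split大小
    for (int start = 0; start < data.length(); start += splitSize) {
      int end = Math.min(start + splitSize, data.length());
      int pos = start;
      if (start != 0) {                                     // 非第一个split:丢弃第一行(由前一个split补读)
        int nl = data.indexOf('\n', start);
        pos = (nl < 0) ? data.length() : nl + 1;
      }
      StringBuilder lines = new StringBuilder();
      while (pos < data.length() && pos <= end) {           // 读到越过end的那一行为止(相当于多读一行)
        int nl = data.indexOf('\n', pos);
        if (nl < 0) nl = data.length() - 1;
        lines.append(data, pos, nl + 1);
        pos = nl + 1;
      }
      System.out.println("split[" + start + "," + end + ") => "
          + lines.toString().replace("\n", "\\n"));
    }
  }
}
运行后可以看到:每一行只会出现在一个split的输出里,不会丢也不会重。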
回到 MapTask.runNewMapper(),进入 mapper.run(mapperContext) ,此时就执行 用户编写的mapper 了:
Mapper类路径:org.apache.hadoop.mapreduce.Mapper
/**
* Expert users can override this method for more complete control over the
* execution of the Mapper.
* @param context
* @throws IOException
*/
public void run(Context context) throws IOException, InterruptedException {
setup(context); // 预先执行
try { //context中的mapContext的实现类为mapContextImpl
// mapContextImpl.nextKeyValue() 调用的 RecordReader.nextKeyValue(),RecordReader默认为LineRecordReader
while (context.nextKeyValue()) { // 是否有下一行,同时更新currentKey, currentValue,进入
map(context.getCurrentKey(), context.getCurrentValue(), context);
// 在Mapper实现类里面我们会调用context.write(key, value), 回去MapTask查看new NewOutputCollector
}
} finally {
cleanup(context); //最后执行
}
}
可知
此方法里用 nextKeyValue()(实现类为 LineRecordReader ) 判断是否有下一行,
有下一行就传入 currentKey,currentValue 给 用户Mapper 的 map方法
进入 nextKeyValue() 查看:
context.nextKeyValue() 实现为 mapContextImpl.nextKeyValue(),
mapContextImpl 调用的 RecordReader.nextKeyValue(),
RecordReader 默认为 LineRecordReader
/**
* 是否有下一行,同时更新currentKey, currentValue
*/
public boolean nextKeyValue() throws IOException {
if (key == null) {
key = new LongWritable(); // 默认map输入key为long,记录offset
}
key.set(pos); // 修改key
if (value == null) {
value = new Text();
}
int newSize = 0; //记录读取的字节数
// We always read one extra line, which lies outside the upper
// split limit i.e. (end - 1)
while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
if (pos == 0) {
newSize = skipUtfByteOrderMark(); // 在文件开头跳过UTF-8的BOM标记
} else {
newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos)); // 读取一行修改value
pos += newSize;
}
if ((newSize == 0) || (newSize < maxLineLength)) {
break;
}
// line too long. try again
LOG.info("Skipped line of size " + newSize + " at pos " +
(pos - newSize));
}
if (newSize == 0) {
key = null;
value = null;
return false; //没有行了,返回false
} else {
return true;
}
}
private LongWritable key;
private Text value;
@Override
public LongWritable getCurrentKey() {
return key;
}
@Override
public Text getCurrentValue() {
return value;
}
可知
nextKeyValue() 判断是否有下一行的同时会更新 currentKey、currentValue
使用默认的 TextInputFormat(LineRecordReader) 时,mapper传入的 Key为LongWritable 类型,值为该行的 偏移量offset
数据传入map方法后,在 Mapper实现类 里面我们会调用 context.write(key, value) 输出,
这时先回去 MapTask 的 runNewMapper() 方法里 查看 new NewOutputCollector() ,看看输出怎么实现的:
NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
JobConf job,
TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, ClassNotFoundException {
collector = createSortingCollector(job, reporter); //创造一个排序的collector输出池(MapOutputBuffer),进入
partitions = jobContext.getNumReduceTasks(); // 得到partition的数量,与reduceTask数量配置相同(mapreduce.job.reduces),默认为1
if (partitions > 1) {
// 当partition数量大于1(即用户配置了多个reduce)时,反射获取用户配置的分区器,默认为HashPartitioner
// job.setPartitionerClass();
partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
} else {
// partition数量为1时,创建返回分区号为0的分区器
partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
@Override
public int getPartition(K key, V value, int numPartitions) {
return partitions - 1;
}
};
}
}
// JobContextImpl
public Class<? extends Partitioner<?,?>> getPartitionerClass()
throws ClassNotFoundException {
// job.setPartitionerClass();
// 反射获取用户配置的分区器,默认为HashPartitioner
return (Class<? extends Partitioner<?,?>>)
conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
}
NewOutputCollector在初始化时:
创建了一个 有序的collector输出池 (MapOutputBuffer)
设置了 分区数量:与reduceTask数量相同,通过 mapreduce.job.reduces 或 job.setNumReduceTasks() 设置,默认为 1
设置了 分区器:由配置 mapreduce.job.partitioner.class 或 job.setPartitionerClass() 反射得到,
未配置时默认为 HashPartitioner
当reduceTask数量为1时,直接使用一个返回分区号0的匿名分区器(自定义分区器的写法见下面的示例)
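HashPartitioner 的默认规则是 (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;如果想自定义分区规则,可以参考下面的示意(划分规则为假设,仅作参考写法):
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// 极简示例:按key首字母把数据分到0号或1号分区(划分规则为假设)
public class FirstLetterPartitioner extends Partitioner<Text, IntWritable> {
  @Override
  public int getPartition(Text key, IntWritable value, int numPartitions) {
    String k = key.toString();
    char first = k.isEmpty() ? 'a' : k.charAt(0);
    return ((first < 'n') ? 0 : 1) % numPartitions;   // 返回值必须落在[0, numPartitions)内
  }
}
// 使用(假设reduce数为2):job.setNumReduceTasks(2); job.setPartitionerClass(FirstLetterPartitioner.class);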
进入createSortingCollector() 查看输出池:
private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
createSortingCollector(JobConf job, TaskReporter reporter)
throws IOException, ClassNotFoundException {
MapOutputCollector.Context context =
new MapOutputCollector.Context(this, job, reporter);
// 用户配置(mapreduce.job.map.output.collector.class)或默认取MapOutputBuffer内存环
Class<?>[] collectorClasses = job.getClasses(
JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class);
int remainingCollectors = collectorClasses.length;
for (Class clazz : collectorClasses) {
try {
if (!MapOutputCollector.class.isAssignableFrom(clazz)) {
throw new IOException("Invalid output collector class: " + clazz.getName() +
" (does not implement MapOutputCollector)");
}
Class<? extends MapOutputCollector> subclazz =
clazz.asSubclass(MapOutputCollector.class);
LOG.debug("Trying map output collector class: " + subclazz.getName());
MapOutputCollector<KEY, VALUE> collector =
ReflectionUtils.newInstance(subclazz, job); // 最终在此反射
/**
* 设置溢写阈值(0.8),内存环大小(100M)
* 设置排序算法,默认快排
* 设置排序比较器(用户配置 || key比较器)
* 设置combiner (用户配置 || null)
* 设置merge时执行combine的最小文件数
* 启动溢写守护线程
*/
collector.init(context); // 内存环初始化,进入
// 省略
}
发现 输出池实现 默认为 MapTask.MapOutputBuffer,可由 mapreduce.job.map.output.collector.class 配置
进入 collector.init(context) 查看 MapOutputBuffer的初始化:
/**
* 设置溢写阈值(0.8),内存环大小(100M)
* 设置排序算法,默认快排
* 设置排序比较器(用户配置 || key比较器)
* 设置combiner (用户配置 || null)
* 设置merge时执行combine的最小文件数
* 启动溢写守护线程
*/
@SuppressWarnings("unchecked")
public void init(MapOutputCollector.Context context
) throws IOException, ClassNotFoundException {
job = context.getJobConf();
reporter = context.getReporter();
mapTask = context.getMapTask();
mapOutputFile = mapTask.getMapOutputFile();
sortPhase = mapTask.getSortPhase();
spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);
partitions = job.getNumReduceTasks();
rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();
//sanity checks
// 溢写内存比例阈值,配置mapreduce.map.sort.spill.percent或默认0.8
final float spillper =
job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
// 缓存环大小默认值,配置mapreduce.task.io.sort.mb,默认100M
final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
if (spillper > (float)1.0 || spillper <= (float)0.0) {
throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
"\": " + spillper);
}
if ((sortmb & 0x7FF) != sortmb) {
throw new IOException(
"Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
}
// 设置排序算法,配置map.sort.class,默认快排
sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
QuickSort.class, IndexedSorter.class), job);
// buffers and accounting
int maxMemUsage = sortmb << 20;
maxMemUsage -= maxMemUsage % METASIZE;// 可用空间调整为METASIZE即16的整数倍
kvbuffer = new byte[maxMemUsage]; // 定义缓存数组,buff环的意思不是环队列,而是首尾同时使用的数组
bufvoid = kvbuffer.length; //bufvoid 用于标识用于存放数据的最大位置。
kvmeta = ByteBuffer.wrap(kvbuffer) // 将kvbuffer包装成int类型的数组
.order(ByteOrder.nativeOrder())
.asIntBuffer();
// 设置 buf 和 kvmeta 的分界线,赤道
// 以赤道为中心,序列化数据和meta数据分别向两边写,
// 在溢写时,在bufindex和kvindex之间重新画赤道,然后分别往两边写数据,这样在溢写的同时还能继续接收map输出
setEquator(0); // 还初始化了kvindex, 其位置在环形数组中相当于按照逆时针方向减去METASIZE
bufstart = bufend = bufindex = equator; // buff数组正向存序列化值的key value值
kvstart = kvend = kvindex; // buff数组反向存 [val序列化值起始index,key序列化值起始index,partitionId的值,val序列化值的长度]
/**
* k1_ser_value占50字节
* v1_ser_value占50字节
* [k1_ser_value v1_ser_value, ..... , 0, 50, 0, 50]
* 数组左边存KeyValue序列化值,右边存[val序列化值起始index, key序列化值起始index, partitionId的值, val序列化值的长度],每个值用int存,长4个字节
*/
// kvmeta 中存放元数据实体的最大个数
maxRec = kvmeta.capacity() / NMETA;
softLimit = (int)(kvbuffer.length * spillper);
bufferRemaining = softLimit; // 可用空间设置为溢写阈值,默认0.8
LOG.info(JobContext.IO_SORT_MB + ": " + sortmb);
LOG.info("soft limit at " + softLimit);
LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
LOG.info("kvstart = " + kvstart + "; length = " + maxRec);
// k/v serialization
/**
* 排序比较器
* 默认取用户设置的排序比较器,job.setSortComparatorClass()
* 没有配置,返回map输出中key对象的比较器
*/
comparator = job.getOutputKeyComparator();
keyClass = (Class<K>)job.getMapOutputKeyClass();
valClass = (Class<V>)job.getMapOutputValueClass();
serializationFactory = new SerializationFactory(job); // 序列化
keySerializer = serializationFactory.getSerializer(keyClass);
keySerializer.open(bb);
valSerializer = serializationFactory.getSerializer(valClass);
valSerializer.open(bb);
// 省略
// combiner map端合并
final Counters.Counter combineInputCounter =
reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
// JobContextImpl.getCombinerClass()
// 获取combiner类,并可设置combiner分组比较器job.setCombinerKeyGroupingComparatorClass()
// 配置mapreduce.job.combine.class 或 job.setCombinerClass()
// 默认没有,没有配置就不执行combiner
combinerRunner = CombinerRunner.create(job, getTaskID(),
combineInputCounter,
reporter, null);
if (combinerRunner != null) {
final Counters.Counter combineOutputCounter =
reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job);
} else {
combineCollector = null;
}
spillInProgress = false;
/**
* 每次溢写就会执行一次combiner,最后map输出完了,merge的时候触发combine的最小文件个数
* 配置mapreduce.map.combine.minspills,默认为3
* 也就是merge时候多于或等于三个小文件,就会执行combine
*/
minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
spillThread.setDaemon(true); // SpillThread溢写守护线程
spillThread.setName("SpillThread");
spillLock.lock();
try {
spillThread.start(); // 启动溢写线程
// 省略
}
// MapOutputBuffer的变量
// k/v accounting
// 存放 meta 数据的 IntBuffer,都是int类型,(包装自kvbuffer, 本质是同一个数组)
private IntBuffer kvmeta; // metadata overlay on backing store
int kvstart; // marks origin of spill metadata 标记meta区域的边界起始
int kvend; // marks end of spill metadata 标记meta区域的边界结束
int kvindex; // marks end of fully serialized records 在kvbuffer中kvmeta的左边界索引
// 分割 meta 和 key value 内容的标识
// meta 数据和 key value 内容都存放在同一个环形缓冲区,所以需要分隔开
int equator; // marks origin of meta/serialization
int bufstart; // marks beginning of spill 标记序列化区域的边界起始
int bufend; // marks beginning of collectable 标记序列化区域的边界结束
int bufmark; // marks end of record
int bufindex; // marks end of collected 在kvbuffer中序列化数据的右边界索引
int bufvoid; // marks the point where we should stop reading at the end of the buffer , bufvoid 用于标识用于存放数据的最大位置。
// 存放 key value 的 byte 数组,单位是 byte,注意与 kvmeta 区分
byte[] kvbuffer; // main output buffer
private final byte[] b0 = new byte[0];
// buff数组反向存 [val序列化值起始index, key序列化值起始index, partitionId的值, val序列化值的长度] kvindex代表左边界的值
// key value索引 存放位置在kvbuffer的kvmeta区域中,偏移自kvindex的距离,为int类型,占4个byte字节
private static final int VALSTART = 0; // val offset in acct
private static final int KEYSTART = 1; // key offset in acct
// PARTITION分区号(直接存值)存放位置在kvbuffer的kvmeta区域中,偏移自kvindex的距离,为int类型,占4个byte字节
private static final int PARTITION = 2; // partition offset in acct
// value的序列化数据长度值 存放位置在kvbuffer的kvmeta区域中,偏移自kvindex的距离,为int类型,占4个byte字节
private static final int VALLEN = 3; // length of value
// 一对输出的meta数据在kvmeta中占用的int个数
private static final int NMETA = 4; // num meta ints
// 一对输出的meta数据在kvmeta中占用的byte个数
private static final int METASIZE = NMETA * 4; // size in bytes
public RawComparator getOutputKeyComparator() {
// 默认取用户设置的排序比较器 mapreduce.job.output.key.comparator.class
// job.setSortComparatorClass()
Class<? extends RawComparator> theClass = getClass(
JobContext.KEY_COMPARATOR, null, RawComparator.class);
if (theClass != null)
return ReflectionUtils.newInstance(theClass, this);
// 返回map输出中key对象的比较器
return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
}
MapOutputBuffer数据结构为数组:
其逻辑包装成一个缓存环,目的是在buffer溢写时不阻塞Mapper的输出,buffer环的具体实现下面讲溢写时再讲;
在 init() 方法中,完成了下列配置:
设置 buffer环的溢写阈值:默认 0.8,可由 mapreduce.map.sort.spill.percent 配置;
设置 buffer环内存大小:默认 100M,可由 mapreduce.task.io.sort.mb 配置;
设置 溢写时的排序算法:默认 QuickSort快排,可由 map.sort.class 配置;
设置 排序比较器:默认为 map输出key类型自带的比较器,可由 job.setSortComparatorClass() 或 mapreduce.job.output.key.comparator.class 配置;
设置 combiner:默认 没有combiner,设置了就运行,配置 mapreduce.job.combine.class 或 job.setCombinerClass() ;
combiner也可配置分组比较器:job.setCombinerKeyGroupingComparatorClass()
设置 Map输出完最后merge时执行combine的最小文件数:默认为 3 个,可由 mapreduce.map.combine.minspills 配置;
启动 溢写守护线程spillThread:当达到配置的溢写阈值时就会溢写(上述参数的设置方式见下面的示意);
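这些参数都可以在提交作业前通过 Configuration/Job 设置,下面是一个示意(数值为随意假设,并非调优建议):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;

// 示意:调整上述溢写相关参数(数值为假设)
public class SortBufferConfigDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.setInt("mapreduce.task.io.sort.mb", 200);             // buffer环大小:默认100M,这里改为200M
    conf.setFloat("mapreduce.map.sort.spill.percent", 0.9f);   // 溢写阈值:默认0.8
    conf.setInt("mapreduce.map.combine.minspills", 5);         // merge时执行combine的最小spill文件数:默认3
    Job job = Job.getInstance(conf, "spill-config-demo");
    job.setCombinerClass(IntSumReducer.class);                 // 等价于配置mapreduce.job.combine.class
    // 其余Mapper/Reducer/输入输出设置省略
  }
}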
初始化看完后,当用户调用 context.write() 的时候:
查看:org.apache.hadoop.mapreduce.lib.map.WrappedMapper
@Override
public void write(KEYOUT key, VALUEOUT value) throws IOException,
InterruptedException {
mapContext.write(key, value); // 用户mapper里调用context.write(k, v),执行的是此处方法,MapContextImpl是TaskInputOutputContextImpl的实现类
}
TaskInputOutputContextImpl.write() :
/**
* Generate an output key/value pair.
*/
public void write(KEYOUT key, VALUEOUT value
) throws IOException, InterruptedException {
/**
* 调用的是RecordWriter,而RecordWriter的实现对象为NewOutputCollector
* NewOutputCollector里面实例化了一个MapOutputBuffer
* 实际调用的是MapOutputBuffer.collect()
*/
output.write(key, value);
}
MapTask.NewOutputCollector.write() :
@Override
public void write(K key, V value) throws IOException, InterruptedException {
/**
* 调用MapOutputBuffer.collect()
* 顺便在写入buffer的时候就用分区器计算好了分区号
* collect(key, value, partitionId)
*/
collector.collect(key, value,
partitioner.getPartition(key, value, partitions));
}
在调用 MapOutputBuffer.collect() 时就用 分区器计算好了分区号 :
collector.collect(key, value, partitionId) :
/**
* 往数组中存数据时的算法
* Serialize the key, value to intermediate storage.
* When this method returns, kvindex must refer to sufficient unused
* storage to store one METADATA.
*/
public synchronized void collect(K key, V value, final int partition
) throws IOException {
// 省略
/**
* METASIZE = NMETA * 4
* 每一对输出所占的meta空间,每一个值占4个字节,所以总共占16字节
* 对于序列化值(k ,v),需要存的meta有四个,分别为 v.start, k.start, pId, v.len
* 例:
* k1_ser_value占50字节
* v1_ser_value占50字节
* [k1_ser_value v1_ser_value, ..... , 0, 50, 0, 50]
* 数组左边存KeyValue序列化值,右边存[val序列化值起始index, key序列化值起始index, partitionId的值, val序列化值的长度],每个值用int存,长4个字节
*/
bufferRemaining -= METASIZE; // bufferRemaining记录可用空间
if (bufferRemaining <= 0) {
// start spill if the thread is not running and the soft limit has been
// reached
spillLock.lock();
try {
do {
// 首次spill时,spillInProgress是false
if (!spillInProgress) {
final int kvbidx = 4 * kvindex; // byte位置
final int kvbend = 4 * kvend;
// serialized, unspilled bytes always lie between kvindex and
// bufindex, crossing the equator. Note that any void space
// created by a reset must be included in "used" bytes
final int bUsed = distanceTo(kvbidx, bufindex);
final boolean bufsoftlimit = bUsed >= softLimit;
if ((kvbend + METASIZE) % kvbuffer.length !=
equator - (equator % METASIZE)) {
// spill finished, reclaim space
resetSpill();
bufferRemaining = Math.min(
distanceTo(bufindex, kvbidx) - 2 * METASIZE,
softLimit - bUsed) - METASIZE;
continue;
} else if (bufsoftlimit && kvindex != kvend) {
// spill records, if any collected; check latter, as it may
// be possible for metadata alignment to hit spill pcnt
startSpill();
final int avgRec = (int)
(mapOutputByteCounter.getCounter() /
mapOutputRecordCounter.getCounter());
// leave at least half the split buffer for serialization data
// ensure that kvindex >= bufindex
final int distkvi = distanceTo(bufindex, kvbidx);
final int newPos = (bufindex +
Math.max(2 * METASIZE - 1,
Math.min(distkvi / 2,
distkvi / (METASIZE + avgRec) * METASIZE)))
% kvbuffer.length;
setEquator(newPos);
bufmark = bufindex = newPos;
final int serBound = 4 * kvend;
// bytes remaining before the lock must be held and limits
// checked is the minimum of three arcs: the metadata space, the
// serialization space, and the soft limit
bufferRemaining = Math.min(
// metadata max
distanceTo(bufend, newPos),
Math.min(
// serialization max
distanceTo(newPos, serBound),
// soft limit
softLimit)) - 2 * METASIZE;
}
}
} while (false);
} finally {
spillLock.unlock();
}
}
try {
// serialize key bytes into buffer
int keystart = bufindex; //序列化后的key起始索引
keySerializer.serialize(key);
// key所占空间被bufvoid分隔,则移动key,
if (bufindex < keystart) {
// 将其值放在连续的空间中便于sort时key的对比
// wrapped the key; must make contiguous
bb.shiftBufferedKey();
keystart = 0;
}
// serialize value bytes into buffer
final int valstart = bufindex; //序列化后的value起始索引, 调用.serialize(), bufindex会移动的
valSerializer.serialize(value);
// It's possible for records to have zero length, i.e. the serializer
// will perform no writes. To ensure that the boundary conditions are
// checked and that the kvindex invariant is maintained, perform a
// zero-length write into the buffer. The logic monitoring this could be
// moved into collect, but this is cleaner and inexpensive. For now, it
// is acceptable.
bb.write(b0, 0, 0); //写入
// the record must be marked after the preceding write, as the metadata
// for this record are not yet written
int valend = bb.markRecord();
mapOutputRecordCounter.increment(1);
mapOutputByteCounter.increment(
distanceTo(keystart, valend, bufvoid));
// write accounting info
kvmeta.put(kvindex + PARTITION, partition); //存入partitionId,偏移kvindex2个位置,kvmeta是int类型的
kvmeta.put(kvindex + KEYSTART, keystart); //序列化后的key起始索引
kvmeta.put(kvindex + VALSTART, valstart);
kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend)); // 序列化后的value长度
// advance kvindex
kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity(); //kvindex往左移动METASIZE(16字节)
} catch (MapBufferTooSmallException e) {
LOG.info("Record too large for in-memory buffer: " + e.getMessage());
spillSingleRecord(key, value, partition);
mapOutputRecordCounter.increment(1);
return;
}
}
/**
* Set the point from which meta and serialization data expand. The meta
* indices are aligned with the buffer, so metadata never spans the ends of
* the circular buffer.
*/
private void setEquator(int pos) { //设置赤道
equator = pos;
// set index prior to first entry, aligned at meta boundary
final int aligned = pos - (pos % METASIZE);
// Cast one of the operands to long to avoid integer overflow
// 其位置在环形数组中相当于按照逆时针方向减去 METASIZE
kvindex = (int)
(((long)aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
LOG.info("(EQUATOR) " + pos + " kvi " + kvindex +
"(" + (kvindex * 4) + ")");
}
/**
* Set position from last mark to end of writable buffer, then rewrite
* the data between last mark and kvindex.
* This handles a special case where the key wraps around the buffer.
* If the key is to be passed to a RawComparator, then it must be
* contiguous in the buffer. This recopies the data in the buffer back
* into itself, but starting at the beginning of the buffer. Note that
* this method should <b>only</b> be called immediately after detecting
* this condition. To call it at any other time is undefined and would
* likely result in data loss or corruption.
* @see #markRecord()
*/
protected void shiftBufferedKey() throws IOException { // 重新调整key的序列化位置,防止key被数组首尾切断,否则不便于对key做比较排序
// spillLock unnecessary; both kvend and kvindex are current
int headbytelen = bufvoid - bufmark;
bufvoid = bufmark;
final int kvbidx = 4 * kvindex;
final int kvbend = 4 * kvend;
final int avail =
Math.min(distanceTo(0, kvbidx), distanceTo(0, kvbend));
if (bufindex + headbytelen < avail) {
System.arraycopy(kvbuffer, 0, kvbuffer, headbytelen, bufindex);
System.arraycopy(kvbuffer, bufvoid, kvbuffer, 0, headbytelen);
bufindex += headbytelen;
bufferRemaining -= kvbuffer.length - bufvoid;
} else {
byte[] keytmp = new byte[bufindex];
System.arraycopy(kvbuffer, 0, keytmp, 0, bufindex);
bufindex = 0;
out.write(kvbuffer, bufmark, headbytelen);
out.write(keytmp);
}
}
MapOutputBuffer基本实现:
首先将 buffer包装成int类型的数组:
buff数组正向存:序列化key value后的值 -- kvbuffer
buff数组反向存:keyvalue的meta信息 [val序列化值起始index, key序列化值起始index, partitionId的值, val序列化值的长度] -- kvmeta
对于序列化值(k ,v),需要存的meta有四个,分别为 v.start, k.start, pId, v.len
例:
k1_ser_value占50字节
v1_ser_value占50字节
[k1_ser_value v1_ser_value, ..... , 0, 50, 0, 50]
数组左边存KeyValue序列化值,右边存[val序列化值起始index, key序列化值起始index, partitionId的值, val序列化值的长度],每个值用int存,长4个字节
每一对输出所占的meta空间,每一个值占4个字节,所以总共占16字节
kvbuffer 和 kvmeta 的分界线叫赤道,默认设置为索引0:
以赤道为中心,序列化数据和meta数据分别向两边写,
在溢写时,在bufindex和kvindex之间的空闲区域重新画赤道,然后分别往两边写数据,这样 在溢写的同时还能继续接收map输出
当序列化后的key被bufvoid分隔时,则移动key
防止key被切割是为了方便对key排序
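下面用一段独立的小代码模拟上面描述的布局:同一个byte数组,序列化数据正向写,meta通过int视图从尾部反向写(buffer大小与数据均为假设,仅为直观看到四个meta槽位):
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.IntBuffer;

// 极简模拟(非源码):kvbuffer正向写序列化数据,kvmeta(同一数组的int视图)反向写4个int的meta
public class KvBufferSketch {
  public static void main(String[] args) {
    byte[] kvbuffer = new byte[64];                        // 假设的小buffer
    IntBuffer kvmeta = ByteBuffer.wrap(kvbuffer)
        .order(ByteOrder.nativeOrder()).asIntBuffer();     // 与MapOutputBuffer相同的包装方式

    // 假设key序列化后占5字节、value占3字节,从数组头部正向写入
    int keystart = 0, valstart = 5, valend = 8, partition = 1;
    // meta从数组尾部(按int计)反向占4个槽位:VALSTART, KEYSTART, PARTITION, VALLEN
    int kvindex = kvmeta.capacity() - 4;
    kvmeta.put(kvindex + 0, valstart);                     // VALSTART
    kvmeta.put(kvindex + 1, keystart);                     // KEYSTART
    kvmeta.put(kvindex + 2, partition);                    // PARTITION
    kvmeta.put(kvindex + 3, valend - valstart);            // VALLEN

    System.out.println("meta共 " + kvmeta.capacity() + " 个int槽位,本条记录的meta起始于int下标 " + kvindex);
  }
}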
现在当map不断输出,SpillThread 监测到 MapOutputBuffer 达到阈值时,开始 溢写:
类:MapTask.MapOutputBuffer.SpillThread
protected class SpillThread extends Thread { // spill溢写线程
@Override
public void run() {
spillLock.lock();
spillThreadRunning = true;
try {
while (true) {
spillDone.signal();
while (!spillInProgress) {
spillReady.await();
}
try {
spillLock.unlock();
sortAndSpill(); // 排序及溢写,进入
// 省略
}
}
可见调用的是 sortAndSpill() 方法,进入:
// 排序及溢写,有combiner执行combine
private void sortAndSpill() throws IOException, ClassNotFoundException,
InterruptedException {
// 省略
sorter.sort(MapOutputBuffer.this, mstart, mend, reporter); //排序
int spindex = mstart;
final IndexRecord rec = new IndexRecord();
final InMemValBytes value = new InMemValBytes();
for (int i = 0; i < partitions; ++i) { //按分区溢写
IFile.Writer<K, V> writer = null;
try {
long segmentStart = out.getPos();
FSDataOutputStream partitionOut = CryptoUtils.wrapIfNecessary(job, out);
writer = new Writer<K, V>(job, partitionOut, keyClass, valClass, codec,
spilledRecordsCounter);
if (combinerRunner == null) { //如果未设置combiner,直接溢写
// spill directly
DataInputBuffer key = new DataInputBuffer();
// 省略
} else {
int spstart = spindex;
while (spindex < mend && //取这个分区的数据
kvmeta.get(offsetFor(spindex % maxRec)
+ PARTITION) == i) {
++spindex;
}
// Note: we would like to avoid the combiner if we've fewer
// than some threshold of records for a partition
if (spstart != spindex) {
combineCollector.setWriter(writer);
RawKeyValueIterator kvIter =
new MRResultIterator(spstart, spindex);
combinerRunner.combine(kvIter, combineCollector); //如果设置combiner了,则执行combine
}
}
// close the writer
writer.close();
// 省略
LOG.info("Finished spill " + numSpills);
++numSpills;
} finally {
if (out != null) out.close();
}
}
此方法执行:
排序:
这里的优化是只对kvmeta(每条16字节的元数据)排序:按 分区号、key 的顺序比较,比较key时通过meta里的索引定位kvbuffer中的序列化数据,排序移动的只是meta,溢写时再按meta顺序取kvbuffer里的数据即可(思路见下面的示意)
溢写:
按分区依次写出,设置了combiner时先对每个分区的数据执行combine再写出
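下面用一段与Hadoop无关的小示意说明"只排索引、按(partition, key)比较"的思路(数据为假设,用记录下标代替meta,kvbuffer中的序列化数据本身不动):
import java.util.Arrays;

// 极简示意(非源码):排序只移动指向记录的索引,按(partition, key)比较
public class SpillSortSketch {
  public static void main(String[] args) {
    String[] keys      = {"banana", "apple", "cherry", "apple"};   // 假设已写入kvbuffer的key
    int[]    partition = {1,        0,       1,        0      };   // collect时算好的分区号
    Integer[] meta = {0, 1, 2, 3};                                  // 只排"指向记录的索引"
    Arrays.sort(meta, (a, b) -> {
      int c = Integer.compare(partition[a], partition[b]);          // 先按分区号
      return (c != 0) ? c : keys[a].compareTo(keys[b]);             // 分区内再按key
    });
    for (int i : meta) {
      System.out.println("p" + partition[i] + " -> " + keys[i]);
    }
  }
}
这样同一分区的数据在溢写文件中连续且有序,后面reduce端按分区拉取、归并就很方便。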
当map输出全部执行完成后,回到 MapTask 的 runNewMapper() 方法看接下来的代码:
mapper.run(mapperContext); // 调用mapper的run方法,传入MapContext,进入
mapPhase.complete(); // map进度完成
setPhase(TaskStatus.Phase.SORT); //标记开始排序sort阶段
statusUpdate(umbilical);
input.close(); //关闭RecordReader
input = null;
output.close(mapperContext); //执行flush,溢写buffer中剩下的,并执行merge,进入
进入 output.close(mapperContext) 方法,output为 NewOutputCollector:
@Override
public void close(TaskAttemptContext context
) throws IOException,InterruptedException {
try {
collector.flush(); // 溢写剩下的
} catch (ClassNotFoundException cnf) {
throw new IOException("can't find class ", cnf);
}
collector.close();
}
查看 collector.flush() 方法,位于 MapOutputBuffer:
/**
* 执行flush,溢写buffer中剩下的,并执行merge
*/
public void flush() throws IOException, ClassNotFoundException,
InterruptedException {
// 省略
if (kvindex != kvend) {
kvend = (kvindex + NMETA) % kvmeta.capacity();
bufend = bufmark;
// 省略
sortAndSpill(); //溢写剩下的
}
// 省略
kvbuffer = null;
mergeParts(); // 执行merge
// 省略
}
关闭前执行flush:
溢写buffer中剩下的;
执行 merge;
进入 mergeParts() 方法查看如何merge:
// 最后执行merge
private void mergeParts() throws IOException, InterruptedException,
ClassNotFoundException {
// get the approximate size of the final output/index files
// 省略
// read in paged indices
for (int i = indexCacheList.size(); i < numSpills; ++i) {
Path indexFileName = mapOutputFile.getSpillIndexFile(i);
indexCacheList.add(new SpillRecord(indexFileName, job));
}
//make correction in the length to include the sequence file header
//lengths for each partition
// 省略
if (numSpills == 0) {
// 省略
return;
}
{
sortPhase.addPhases(partitions); // Divide sort phase into sub-phases
IndexRecord rec = new IndexRecord();
final SpillRecord spillRec = new SpillRecord(partitions);
for (int parts = 0; parts < partitions; parts++) { // 遍历分区
//create the segments to be merged
// 省略
// sort the segments only if there are intermediate merges
boolean sortSegments = segmentList.size() > mergeFactor;
//merge
// 省略
//write merged output to disk
// 省略
if (combinerRunner == null || numSpills < minSpillsForCombine) { // 判断溢写文件数量
Merger.writeFile(kvIter, writer, reporter, job); // 直接输出
} else { //如果设置了combiner且spill小文件数超过设置的combiner最小个数,执行combiner合并文件
combineCollector.setWriter(writer);
combinerRunner.combine(kvIter, combineCollector);
}
//close
writer.close();
// 省略
}
merge:
将溢写出的小文件按分区合并成一个大文件;
如果设置了combiner 且 spill小文件数超过设置的combiner最小个数,执行combiner合并文件;
到此,MapTask部分主要代码读完
三、ReduceTask部分
每个ReduceTask从各个MapTask节点拉取属于自己分区的shuffle小文件;
然后进行 归并排序,为了防止内存溢出,不会完全归并成一个文件,而是一部分在内存,一部分在磁盘;
然后在其上 逻辑包装一个迭代器,按归并算法读取;
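这种"多路各自有序、上层只包装一个迭代器按归并读取"的思路,可以用一段与Hadoop无关的小例子说明(段数据为假设,小根堆相当于归并时每次选最小的过程):
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// 极简示意:三路各自有序的段(类比内存段+磁盘段),用小根堆做归并读取
public class KWayMergeSketch {
  public static void main(String[] args) {
    int[][] segments = { {1, 4, 7}, {2, 5, 8}, {3, 6, 9} };     // 每段内部有序
    // 堆元素: [当前值, 段编号, 段内下标]
    PriorityQueue<int[]> heap = new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));
    for (int s = 0; s < segments.length; s++) {
      heap.offer(new int[] { segments[s][0], s, 0 });
    }
    List<Integer> merged = new ArrayList<>();
    while (!heap.isEmpty()) {                                    // 类比rIter.next()每次取全局最小
      int[] top = heap.poll();
      merged.add(top[0]);
      int seg = top[1], idx = top[2] + 1;
      if (idx < segments[seg].length) {
        heap.offer(new int[] { segments[seg][idx], seg, idx });
      }
    }
    System.out.println(merged);                                  // [1, 2, 3, ..., 9]
  }
}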
ReduceTask类位置:org.apache.hadoop.mapred.ReduceTask ,首先查看 run() 方法:
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, InterruptedException, ClassNotFoundException {
job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
if (isMapOrReduce()) {
copyPhase = getProgress().addPhase("copy");
sortPhase = getProgress().addPhase("sort");
reducePhase = getProgress().addPhase("reduce");
}
// start thread that will handle communication with parent
// 省略
// 省略
initialize(job, getJobID(), reporter, useNewApi);
// check if it is a cleanupJobTask
// 省略
// Initialize the codec
codec = initCodec();
RawKeyValueIterator rIter = null;
ShuffleConsumerPlugin shuffleConsumerPlugin = null;
Class combinerClass = conf.getCombinerClass();
CombineOutputCollector combineCollector =
(null != combinerClass) ?
new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;
// mapreduce.job.reduce.shuffle.consumer.plugin.class
Class<? extends ShuffleConsumerPlugin> clazz =
job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
ShuffleConsumerPlugin.Context shuffleContext =
new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical,
super.lDirAlloc, reporter, codec,
combinerClass, combineCollector,
spilledRecordsCounter, reduceCombineInputCounter,
shuffledMapsCounter,
reduceShuffleBytes, failedShuffleCounter,
mergedMapOutputsCounter,
taskStatus, copyPhase, sortPhase, this,
mapOutputFile, localMapFiles);
shuffleConsumerPlugin.init(shuffleContext);
/**
* 输入迭代器:
* shuffleConsumerPlugin的默认实现就是Shuffle类(可由mapreduce.job.reduce.shuffle.consumer.plugin.class配置)
* 它负责从各MapTask拉取属于本分区的数据,然后归并,返回一个迭代器
* 但是这个归并不是完全归并,而是一部分在内存,一部分在磁盘
* 只是在上面封装了一个迭代器
* 这样即使数据量很大也不会内存溢出
* (因为每个文件都是内部有序的所以可以这样包装)
*/
rIter = shuffleConsumerPlugin.run();
// free up the data structures
mapOutputFilesOnDisk.clear();
sortPhase.complete(); // sort is complete 排序阶段结束
setPhase(TaskStatus.Phase.REDUCE);
statusUpdate(umbilical);
Class keyClass = job.getMapOutputKeyClass(); // 设置的 job.setMapOutputKeyClass()
Class valueClass = job.getMapOutputValueClass(); // 设置的 job.setMapOutputValueClass()
/**
* 取用户设置的分组比较器,
* 如果没有设置,取用户设置的排序比较器,
* 如没有,再取key的比较器
*/
RawComparator comparator = job.getOutputValueGroupingComparator();
if (useNewApi) {
runNewReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass); // 进入
} else {
runOldReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
}
shuffleConsumerPlugin.close();
done(umbilical, reporter);
}
查看 runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass) 方法:
/**
* 设置输入:rIter{RawKeyValueIterator}
* 设置输出:RecordWriter{trackedRW}
* 创建任务上下文:taskContext{TaskAttemptContextImpl}
* 创建用户的reducer:reducer
* 创建reducer上下文:reducerContext{Reducer.Context}
* 调用用户reducer的run方法:reducer.run(reducerContext);
*/
@SuppressWarnings("unchecked")
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewReducer(JobConf job,
final TaskUmbilicalProtocol umbilical,
final TaskReporter reporter,
RawKeyValueIterator rIter,
RawComparator<INKEY> comparator,
Class<INKEY> keyClass,
Class<INVALUE> valueClass
) throws IOException,InterruptedException,
ClassNotFoundException {
// wrap value iterator to report progress.
final RawKeyValueIterator rawIter = rIter;
rIter = new RawKeyValueIterator() { // 包装了一下迭代器
public void close() throws IOException {
rawIter.close();
}
public DataInputBuffer getKey() throws IOException {
return rawIter.getKey();
}
public Progress getProgress() {
return rawIter.getProgress();
}
public DataInputBuffer getValue() throws IOException {
return rawIter.getValue();
}
public boolean next() throws IOException {
boolean ret = rawIter.next();
reporter.setProgress(rawIter.getProgress().getProgress()); // 与原迭代器的区别:next()时上报进度
return ret;
}
};
// make a task context so we can get the classes
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = //创建task任务的context
new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
getTaskID(), reporter);
// make a reducer
org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer = //从context中反射得到 用户编写的Reducer
(org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW = // 创建输出 RecordWriter,TextOutputFormat中构建的LineRecordWriter
new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(this, taskContext);
job.setBoolean("mapred.skip.on", isSkipping());
job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
/**
* 创建用户编写的Reducer中的context
* reducerContext: 对象为WrappedReducer.context
* WrappedReducer.context包装的是ReduceContextImpl
*/
org.apache.hadoop.mapreduce.Reducer.Context
reducerContext = createReduceContext(reducer, job, getTaskID(),
rIter, reduceInputKeyCounter,
reduceInputValueCounter,
trackedRW,
committer,
reporter, comparator, keyClass,
valueClass);
try {
reducer.run(reducerContext); // 执行用户编写的Reducer中的run, 启动reduce,进入
} finally {
trackedRW.close(reducerContext);
}
}
//NewTrackingRecordWriter类
NewTrackingRecordWriter(ReduceTask reduce, org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
throws InterruptedException, IOException {
// 省略
this.real = (org.apache.hadoop.mapreduce.RecordWriter<K, V>) reduce.outputFormat
.getRecordWriter(taskContext); // TextOutputFormat中构建的LineRecordWriter
// 省略
}
// TextOutputFormat
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
throws IOException, InterruptedException {
Configuration conf = job.getConfiguration();
// 省略
if (!isCompressed) {
FSDataOutputStream fileOut = fs.create(file, false);
return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
} else {
FSDataOutputStream fileOut = fs.create(file, false);
//LineRecordWriter
return new LineRecordWriter<K, V>(new DataOutputStream (codec.createOutputStream(fileOut)), keyValueSeparator);
}
}
// org.apache.hadoop.mapred.Task类
protected static <INKEY,INVALUE,OUTKEY,OUTVALUE>
org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
createReduceContext(org.apache.hadoop.mapreduce.Reducer
<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer,
Configuration job,
org.apache.hadoop.mapreduce.TaskAttemptID taskId,
RawKeyValueIterator rIter,
org.apache.hadoop.mapreduce.Counter inputKeyCounter,
org.apache.hadoop.mapreduce.Counter inputValueCounter,
org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output,
org.apache.hadoop.mapreduce.OutputCommitter committer,
org.apache.hadoop.mapreduce.StatusReporter reporter,
RawComparator<INKEY> comparator,
Class<INKEY> keyClass, Class<INVALUE> valueClass
) throws IOException, InterruptedException {
// ReduceContextImpl
org.apache.hadoop.mapreduce.ReduceContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
reduceContext =
new ReduceContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, taskId, rIter, inputKeyCounter, inputValueCounter, output, committer, reporter, comparator, keyClass, valueClass);
// 包装 ReduceContextImpl 成 WrappedReducer
org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
reducerContext = new WrappedReducer<INKEY, INVALUE, OUTKEY, OUTVALUE>().getReducerContext(reduceContext);
return reducerContext;
}
设置输入迭代器:rIter,实现:RawKeyValueIterator;
设置输出:RecordWriter,实现:NewTrackingRecordWriter,NewTrackingRecordWriter通过配置的 OutputFormat (默认 TextOutputFormat )中构建的 LineRecordWriter;
创建任务上下文:taskContext,实现:TaskAttemptContextImpl,继承自 JobContextImpl implements TaskAttemptContext;
创建用户的reducer:reducer,从taskContext中反射得到 用户编写的Reducer;
创建reducer上下文:reducerContext,reducerContext: 对象为WrappedReducer.context, WrappedReducer.context包装的是 ReduceContextImpl ;
调用用户reducer的run方法:reducer.run(reducerContext);
进入 reducer.run(reducerContext) 方法:
/**
* Advanced application writers can use the
* {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
* control how the reduce task works.
*/
public void run(Context context) throws IOException, InterruptedException {
setup(context); // setup
try {
// nextKey: 每一个组执行一个,因为已经排序的,所以每个组的数据连在一块
// 调用的是ReduceContextImpl.nextKey(),nextKey()调用nextKeyValue
while (context.nextKey()) { // 进入查看
// (这个组的key的引用, 这个组的value的迭代器, reduceContext)
// 组的value的迭代器在迭代的时候会更新CurrentKey,以确保reduce方法中key为每行的实际key
// 调用的是ReduceContextImpl.getValues, values.iterator().next()里调用的还是nextKeyValue
// 所以这个values并不是一个内存中的数据,而是每运行一次next()读取一行
reduce(context.getCurrentKey(), context.getValues(), context); // 进入查看
// If a back up store is used, reset it
Iterator<VALUEIN> iter = context.getValues().iterator();
if(iter instanceof ReduceContext.ValueIterator) {
((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore(); // 重置values的备份
}
}
} finally {
cleanup(context); // cleanup
}
}
Reduce原语 :相同的Key为一组,调用一次reduce,而判断key是否相同用的是分组比较器;
run(context):
这个context调用 nextKey() 方法判断还有没有组,
有的话调用 reduce方法 ,传入 CurrentKey引用 和 这组values的迭代器
首先来查看 nextKey() ,实现为 ReduceContextImpl(WrappedReducer.context调用的ReduceContextImpl):
/**
* Start processing next unique key.
* 返回是否有新的行,新的行即新的组
* 因为:
* 1、用户在执行迭代器时会更新nextKeyIsSame的值
* 2、当用户在reduce方法时未迭代完value时,此方法会清除掉这个组剩下的key
*/
public boolean nextKey() throws IOException,InterruptedException {
// 这个循环是假设用户的reducer中未迭代完一个组的values的数据,这个迭代完(清除)这个组剩下的
// nextKeyIsSame默认值为false, 所以第一次不会运行
while (hasMore && nextKeyIsSame) {
nextKeyValue();
}
if (hasMore) { // 如果还有输入
if (inputKeyCounter != null) {
inputKeyCounter.increment(1);
}
return nextKeyValue(); // 返回是否有新行,而 是不是一个组 在用户执行迭代器时更新
} else {
return false;
}
}
// ReduceContextImpl类变量
private RawKeyValueIterator input;
private Counter inputValueCounter;
private Counter inputKeyCounter;
private RawComparator<KEYIN> comparator;
private KEYIN key; // current key 下一行的key
private VALUEIN value; // current value 下一行的值
private boolean firstValue = false; // first value in key
private boolean nextKeyIsSame = false; // more w/ this key 下一个key是不是还是一个组的
private boolean hasMore; // more in file 是否还有输入
protected Progressable reporter;
private Deserializer<KEYIN> keyDeserializer; // key反序列化器
private Deserializer<VALUEIN> valueDeserializer; // value反序列化器
private DataInputBuffer buffer = new DataInputBuffer(); // IO读取的 buffer 数组
private BytesWritable currentRawKey = new BytesWritable();
private ValueIterable iterable = new ValueIterable(); // 一个组的values迭代器,既reduce方法中传入的values
private boolean isMarked = false;
private BackupStore<KEYIN,VALUEIN> backupStore;
private final SerializationFactory serializationFactory;
// 省略
// ReduceContextImpl构造方法
public ReduceContextImpl(Configuration conf, TaskAttemptID taskid,
RawKeyValueIterator input,
Counter inputKeyCounter,
Counter inputValueCounter,
RecordWriter<KEYOUT,VALUEOUT> output,
OutputCommitter committer,
StatusReporter reporter,
RawComparator<KEYIN> comparator,
Class<KEYIN> keyClass,
Class<VALUEIN> valueClass
) throws InterruptedException, IOException{
super(conf, taskid, output, committer, reporter);
this.input = input;
this.inputKeyCounter = inputKeyCounter;
this.inputValueCounter = inputValueCounter;
this.comparator = comparator;
this.serializationFactory = new SerializationFactory(conf);
this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
this.keyDeserializer.open(buffer); // 这里传入的buffer
this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
this.valueDeserializer.open(buffer); // 这里传入的buffer
hasMore = input.next();
this.keyClass = keyClass;
this.valueClass = valueClass;
this.conf = conf;
this.taskid = taskid;
}
/**
* Advance to the next key/value pair.
* 判断有没有新行
* 同时更新currentKey,currentValue引用的值
* 再更新firstValue:是否是一个新组
* 再更新nextKeyIsSame:下一行是否是同一个组的
*/
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (!hasMore) { // 没有新行返回false
key = null;
value = null;
return false;
}
firstValue = !nextKeyIsSame; // 当nextKeyIsSame为false时,标记为新的一组
DataInputBuffer nextKey = input.getKey(); //从内存或磁盘中取到先前执行input.next()读取的一行的key
currentRawKey.set(nextKey.getData(), nextKey.getPosition(),
nextKey.getLength() - nextKey.getPosition());
buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength()); // 添加序列化数据到buffer中
/**
* 反序列出key
* 这里只传入key有点奇怪,其实是:context初始化时keyDeserializer.open(buffer)传入了buffer,所以这是从buffer中反序列化出key
* keyDeserializer由SerializationFactory根据key的类型选取,Writable类型默认使用WritableSerialization提供的反序列化器(也可配置JavaSerialization、Avro等)
*/
key = keyDeserializer.deserialize(key);
DataInputBuffer nextVal = input.getValue(); //从内存或磁盘中取到先前执行input.next()读取的一行的value
buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength()
- nextVal.getPosition());
// 反序列出value
value = valueDeserializer.deserialize(value);
currentKeyLength = nextKey.getLength() - nextKey.getPosition();
currentValueLength = nextVal.getLength() - nextVal.getPosition();
if (isMarked) {
backupStore.write(nextKey, nextVal);
}
hasMore = input.next(); // 读取一行, 放心,这一行的数据可以通过下次运行时执行input.getKey(),input.getValue()得到
if (hasMore) { //如果还有一行
nextKey = input.getKey();
// 通过设置的组比较器判断这个key是不是同一个组的
nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0,
currentRawKey.getLength(),
nextKey.getData(),
nextKey.getPosition(),
nextKey.getLength() - nextKey.getPosition()
) == 0;
} else {
nextKeyIsSame = false; // 没有新行了,nextKeyIsSame赋值为false
}
inputValueCounter.increment(1);
return true; // 有新行直接返回true
}
public KEYIN getCurrentKey() {
return key; // 直接返回引用
}
@Override
public VALUEIN getCurrentValue() {
return value; // 直接返回引用
}
nextKey() : 返回是否有新的行,新的行即新的组
原因:
1、数据已经按key排序,所以每个组的数据连在一块,这也说明 分组比较器强依赖于排序比较器(自定义分组比较器的写法见下文示例);
2、里面调用 nextKeyValue() 来判断 是否有新行;
3、nextKeyIsSame(标记下一行是否同一个组)则是在用户遍历values时更新;
4、当用户在reduce方法中未迭代完values时,此方法会先消耗掉这个组剩下的行;
nextKeyValue() :
1、判断有没有新行;
2、同时 更新 currentKey,currentValue 引用的值;
3、再 更新 firstValue:是否是一个新组;
4、再 更新 nextKeyIsSame:下一行是否是同一个组的;
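既然分组比较器强依赖排序结果,一个常见用法是"排序更细、分组更粗"(二次排序):排序比较器比较完整key,分组比较器只比较其中一部分。下面是一个示意(假设key为Text、格式为"主键#次键",只按'#'前的主键分组,仅作参考写法):
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// 示意:分组时只比较Text key中'#'之前的主键部分(key格式为假设)
public class PrimaryKeyGroupingComparator extends WritableComparator {
  protected PrimaryKeyGroupingComparator() {
    super(Text.class, true);   // true表示创建key实例,用于反序列化后比较
  }
  @Override
  public int compare(WritableComparable a, WritableComparable b) {
    String ka = a.toString().split("#", 2)[0];
    String kb = b.toString().split("#", 2)[0];
    return ka.compareTo(kb);
  }
}
// 使用:job.setGroupingComparatorClass(PrimaryKeyGroupingComparator.class);
// 对应ReduceTask中的 job.getOutputValueGroupingComparator()
相应地,排序比较器(job.setSortComparatorClass)需要保证相同主键的记录排在一起,否则同一个"组"会被拆散。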
回到 Reducer.run() ,在判断 nextKey() 为 true 后,
看 reduce(context.getCurrentKey(), context.getValues(), context) 方法:
进入 context.getValues() 查看,实现类为 ReduceContextImpl :
/**
* Iterate through the values for the current key, reusing the same value
* object, which is stored in the context.
* @return the series of values associated with the current key. All of the
* objects returned directly and indirectly from this method are reused.
*/
public
Iterable<VALUEIN> getValues() throws IOException, InterruptedException {
return iterable;
}
private ValueIterable iterable = new ValueIterable(); // 一个组的values迭代器,既reduce方法中传入的values
protected class ValueIterable implements Iterable<VALUEIN> {
private ValueIterator iterator = new ValueIterator(); // 一个组的values迭代器,既reduce方法中传入的values
@Override
public Iterator<VALUEIN> iterator() {
return iterator;
}
}
可知 values 实际为迭代器 ValueIterator :
// 一个组的values迭代器,既reduce方法中传入的values
protected class ValueIterator implements ReduceContext.ValueIterator<VALUEIN> {
private boolean inReset = false;
private boolean clearMarkFlag = false;
@Override
public boolean hasNext() { // 判断这个组还有没有值
try {
if (inReset && backupStore.hasNext()) {
return true;
}
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("hasNext failed", e);
}
return firstValue || nextKeyIsSame; // 如果这是一个新组的第一个值或下一行也是这个组的,返回true
}
@Override
public VALUEIN next() { // 得到这个组当前一行的key,value,注意currentKey也会更新
if (inReset) { // 如果用户不是第一遍迭代
try {
if (backupStore.hasNext()) { // 从backupStore中缓存的取
backupStore.next();
DataInputBuffer next = backupStore.nextValue();
buffer.reset(next.getData(), next.getPosition(), next.getLength()
- next.getPosition());
value = valueDeserializer.deserialize(value);
return value;
} else {
inReset = false;
backupStore.exitResetMode();
if (clearMarkFlag) {
clearMarkFlag = false;
isMarked = false;
}
}
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException("next value iterator failed", e);
}
}
// if this is the first record, we don't need to advance
if (firstValue) { // 如果是第一行,在执行context.nextKey时就保存了这一行的key, value,直接返回
firstValue = false;
return value;
}
// if this isn't the first record and the next key is different, they
// can't advance it here.
if (!nextKeyIsSame) { // 如果不是第一行,但是下一行不是这个组的话,报错
throw new NoSuchElementException("iterate past last value");
}
// otherwise, go to the next key/value pair
try {
nextKeyValue(); // 调用nextKeyValue更新currentKey,currentValue
return value; // 返回currentValue
} catch (IOException ie) {
throw new RuntimeException("next value iterator failed", ie);
} catch (InterruptedException ie) {
// this is bad, but we can't modify the exception list of java.util
throw new RuntimeException("next value iterator interrupted", ie);
}
}
@Override
public void remove() {
throw new UnsupportedOperationException("remove not implemented");
}
@Override
public void mark() throws IOException {
// 省略
}
@Override
public void reset() throws IOException {
// We reached the end of an iteration and user calls a
// reset, but a clearMark was called before, just throw
// an exception
// 省略
inReset = true;
backupStore.reset();
}
@Override
public void clearMark() throws IOException {
if (getBackupStore() == null) {
return;
}
if (inReset) {
clearMarkFlag = true;
backupStore.clearMark();
} else {
inReset = isMarked = false;
backupStore.reinitialize();
}
}
而 用户迭代values时,是这样写的:
Iterator<VALUEIN> iterator = getValues().iterator();
while (iterator.hasNext()) {
VALUEIN value = iterator.next();
}
hasNext() : 判断这个组还有没有值
通过 firstValue || nextKeyIsSame 判断,即如果这是一个新组的第一个值或下一行也是这个组的,返回true
firstValue : 标记是否是这个组的第一个值,它会在外部调用 nextKey()、进而调用 nextKeyValue() 时被更新,
而 nextKeyIsSame 也是在 调用 nextKeyValue() 时更新;
next() : 得到这个组当前一行的key, value,注意currentKey也会更新
如果是 firstValue,就直接返回currentValue,原因是在调用nextKey()时,执行了一次nextKeyValue(),已经更新了currentKey,currentValue;
如果不是,调用 nextKeyValue() 更新 currentKey,currentValue,同时更新 nextKeyIsSame ;
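还有一点要注意:nextKeyValue() 反序列化时复用同一个 key/value 对象(deserialize(value) 传入的就是旧对象),所以在reduce里想把values缓存下来时必须自己拷贝,下面是一个示意(value类型假设为Text):
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// 示意:value对象在迭代中被复用,缓存时必须拷贝(value类型为假设)
public class CachingReducer extends Reducer<Text, Text, Text, IntWritable> {
  @Override
  public void reduce(Text key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    List<Text> cached = new ArrayList<>();
    for (Text val : values) {
      cached.add(new Text(val));   // 复制一份;直接add(val)会使列表里全是同一个被复用的对象
    }
    context.write(key, new IntWritable(cached.size()));
  }
}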
另外当用户需要对values迭代多次(先 mark() 再 reset())时,使用 backupStore 来备份数据:
private BackupStore<KEYIN,VALUEIN> backupStore;
// ValueIterator.next()方法
public VALUEIN next() {
if (inReset) { // 如果用户不是第一轮迭代
try {
if (backupStore.hasNext()) { // 从backupStore中缓存的取
backupStore.next();
DataInputBuffer next = backupStore.nextValue();
buffer.reset(next.getData(), next.getPosition(), next.getLength()
- next.getPosition());
value = valueDeserializer.deserialize(value);
return value;
} else {
inReset = false;
backupStore.exitResetMode();
if (clearMarkFlag) {
clearMarkFlag = false;
isMarked = false;
}
}
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException("next value iterator failed", e);
}
}
// 省略
// 调用 nextKeyValue() 读取
}
接着是用户调用 context.write(key, value) 输出数据,然后 ReduceTask将结果文件写入hdfs;
ReduceTask部分主要源码阅读完成
四、总结
Client阶段
set conf
规划
检查输入输出路径
由InputFormat(FileInputFormat)计算split分片,
splitSize = Max( minSize, Min(blockSize,maxSize) )
由splitSize切割文件的 split( path, splitOffset, splitLength, blockHosts)-- 计算向数据移动,数据本地化
复制job jar文件和配置到集群
提交任务到JobTracker(ResourceManager)
Map阶段
以map为中心来看的话,map首先需要输入(input)
而input由split对象和InputFormat(TextInputFormat)构成
然后调用input初始化方法( LineRecordReader.initialize() )
初始化完毕后,mapContext包装了input(LineRecordReader)里面的nextKeyValue()
同时也初始化output(RecordWriter),实现类为NewOutputCollector,在构造方法里
NewOutputCollector初始化了一个MapOutputBuffer来接收Map输出的数据(设置buffer大小、溢写阈值、排序算法、比较器),并设置了combiner,启动了溢写线程;逻辑上设计成环是为了溢写时不阻塞map输出,溢写只锁住已写入的数据区域
NewOutputCollector创建了分区器对象
然后在Mapper的run方法里调用nextKeyValue()获取currentKey,currentValue,(实际调用的是LineRecordReader的方法)
然后调用map方法
用户map输出调用context.write(),实际调用NewOutputCollector的write方法,获取分区号,然后再调用MapOutputBuffer.collect(k,v,p)存入缓存环
接着不断执行map, 溢写线程发现达到阈值,执行溢写,(按分区排序,执行combiner,溢写成小文件)
最后map执行完了,output将MapOutputBuffer里面剩下的数据溢写,并执行merge,如果发现spill小文件数量超过最小设置值,还会执行一次combiner
Reduce阶段
输入:
首先ReduceTask将shuffle小文件拉取过来,然后进行归并排序,这时为了防止内存溢出,不需要完全合成一个文件,只需要归并成几个大文件,一部分在内存一部分在磁盘
再在上层包装一层输入迭代器rIter{RawKeyValueIterator},按归并算法取就可以了。
然后设置分组比较器,
设置输出:RecordWriter{trackedRW}
创建任务上下文:taskContext{TaskAttemptContextImpl}
创建用户的reducer:reducer
创建reducer上下文:reducerContext{Reducer.Context}
调用用户reducer的run方法:reducer.run(reducerContext);
run方法里执行
nextKey判断是否有新的组(里面执行nextKeyValue判断是否有新行,此时的新行即新的一组)
如果有新组,调用reduce方法,传入values迭代器
用户执行迭代器时:
hasNext:判断firstValue或nextKeyValue里更新的nextKeyIsSame是否为true,为true则此组还有值
next:调用nextKeyValue更新currentKey,currentValue的值,同时判断下一行是否同组(更新nextKeyIsSame)
用户执行context.write()
ReduceTask结束