Anyone who has touched Hadoop has probably run the WordCount example, so this article uses WordCount as the running example throughout its analysis.
First, put the configuration files under etc/hadoop/ into HDFS as WordCount's input:
```
hadoop fs -put etc/hadoop/* /input
```
Then, with debugging enabled, run:
```
hadoop jar hadoop-mapreduce-examples-xxx.jar wordcount /input /output
```
The entry class WordCount.java looks like this:
```
public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  if (otherArgs.length < 2) {
    System.err.println("Usage: wordcount <in> [<in>...] <out>");
    System.exit(2);
  }
  Job job = Job.getInstance(conf, "word count");
  job.setJarByClass(WordCount.class);
  job.setMapperClass(TokenizerMapper.class);
  job.setCombinerClass(IntSumReducer.class);
  job.setReducerClass(IntSumReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  for (int i = 0; i < otherArgs.length - 1; ++i) {
    FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
  }
  FileOutputFormat.setOutputPath(job,
      new Path(otherArgs[otherArgs.length - 1]));
  System.exit(job.waitForCompletion(true) ? 0 : 1);
}
// ...
```
Note that once the Job is configured, job.waitForCompletion(true) is called. Tracing into it gives the call chain:
job.waitForCompletion(true) -> submit() -> submitter.submitJobInternal(Job.this, cluster)
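Before reading the real source below, the control flow of waitForCompletion can be pictured with a simplified, hypothetical sketch (this is not the actual Hadoop code): submit the job once, then poll its status until it completes, and finally report success or failure.

```java
// Hypothetical sketch of Job.waitForCompletion(true)'s control flow:
// submit once, poll until complete, return whether the job succeeded.
class JobSketch {
    private boolean submitted = false;
    private int polls = 0;

    void submit() {                 // stands in for submit() ->
        submitted = true;           //   submitter.submitJobInternal(Job.this, cluster)
    }

    boolean isComplete() {          // the real client asks the cluster for status;
        return ++polls >= 3;        // here we pretend the job finishes after 3 polls
    }

    boolean isSuccessful() {
        return true;
    }

    boolean waitForCompletion(boolean verbose) {
        if (!submitted) submit();
        while (!isComplete()) {
            if (verbose) System.out.println("polling job status ...");
        }
        return isSuccessful();
    }

    public static void main(String[] args) {
        System.out.println(new JobSketch().waitForCompletion(true)); // -> true
    }
}
```

The real method also prints map/reduce progress between polls when verbose is true; the sketch only keeps the submit-then-poll skeleton.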
```
JobStatus submitJobInternal(Job job, Cluster cluster)
    throws ClassNotFoundException, InterruptedException, IOException {
  // validate the job's output specs
  checkSpecs(job);

  Configuration conf = job.getConfiguration();
  addMRFrameworkToDistributedCache(conf);

  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
  // configure the command line options correctly on the submitting dfs
  InetAddress ip = InetAddress.getLocalHost();
  if (ip != null) {
    ......
  }
  JobID jobId = submitClient.getNewJobID();
  job.setJobID(jobId);
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());
  JobStatus status = null;
  try {
    ......
    copyAndConfigureFiles(job, submitJobDir);

    Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

    // Create the splits for the job
    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
    int maps = writeSplits(job, submitJobDir);
    conf.setInt(MRJobConfig.NUM_MAPS, maps);
    LOG.info("number of splits:" + maps);

    // write "queue admins of the queue to which job is being submitted"
    // to job file.
    String queue = conf.get(MRJobConfig.QUEUE_NAME,
        JobConf.DEFAULT_QUEUE_NAME);
    AccessControlList acl = submitClient.getQueueAdmins(queue);
    conf.set(toFullPropertyName(queue,
        QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

    // removing jobtoken referrals before copying the jobconf to HDFS
    // as the tasks don't need this setting, actually they may break
    // because of it if present as the referral will point to a
    // different job.
    TokenCache.cleanUpTokenReferral(conf);

    if (conf.getBoolean(
        MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
        MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
      // Add HDFS tracking ids
      ArrayList<String> trackingIds = new ArrayList<String>();
      for (Token<? extends TokenIdentifier> t :
          job.getCredentials().getAllTokens()) {
        trackingIds.add(t.decodeIdentifier().getTrackingId());
      }
      conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
          trackingIds.toArray(new String[trackingIds.size()]));
    }

    // Set reservation info if it exists
    ReservationId reservationId = job.getReservationId();
    if (reservationId != null) {
      conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
    }

    // Write job file to submit dir
    writeConf(conf, submitJobFile);

    //
    // Now, actually submit the job (using the submit name)
    //
    printTokens(jobId, job.getCredentials());
    status = submitClient.submitJob(
        jobId, submitJobDir.toString(), job.getCredentials());
    if (status != null) {
      return status;
    } else {
      throw new IOException("Could not launch job");
    }
  } finally {
    if (status == null) {
      LOG.info("Cleaning up the staging area " + submitJobDir);
      if (jtFs != null && submitJobDir != null)
        jtFs.delete(submitJobDir, true);
    }
  }
}
```
Four pieces of this method deserve a closer look:
1. jobStagingArea
2. copyAndConfigureFiles
3. writeSplits
4. writeConf

jobStagingArea
Recall how MR works: the job carries the code and configuration that every task needs, so the staging area must be a directory that all nodes of the distributed computation can reach. Tracing in, we find:
```
private static final String STAGING_CONSTANT = ".staging";

public static Path getStagingAreaDir(Configuration conf, String user) {
  return new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR,
      MRJobConfig.DEFAULT_MR_AM_STAGING_DIR)
      + Path.SEPARATOR + user + Path.SEPARATOR + STAGING_CONSTANT);
}
```
MR_AM_STAGING_DIR (yarn.app.mapreduce.am.staging-dir) defaults to /tmp/hadoop-yarn/staging, so when the job is submitted as root we can find the job directory under /tmp/hadoop-yarn/staging/root/.staging on the shared storage.
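The path construction above is simple string concatenation; the following sketch mirrors getStagingAreaDir with the default value inlined (StagingDirSketch and its method are illustrative names, not Hadoop classes):

```java
// Sketch of how the staging path is assembled from the configured base
// directory (or its default), the submitting user, and ".staging".
class StagingDirSketch {
    static final String DEFAULT_MR_AM_STAGING_DIR = "/tmp/hadoop-yarn/staging";
    static final String STAGING_CONSTANT = ".staging";

    static String getStagingAreaDir(String configured, String user) {
        String base = (configured != null) ? configured : DEFAULT_MR_AM_STAGING_DIR;
        return base + "/" + user + "/" + STAGING_CONSTANT;
    }

    public static void main(String[] args) {
        // With no override and the job submitted as root:
        System.out.println(getStagingAreaDir(null, "root"));
        // -> /tmp/hadoop-yarn/staging/root/.staging
    }
}
```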
copyAndConfigureFiles
The resources handled here fall into a few categories: tmpfiles, tmpjars, tmparchives, and the job JAR. For WordCount only the job JAR is set, and after this step it has been uploaded into the job's staging directory.
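As an illustration of this step, the sketch below enumerates the comma-separated resource lists and maps each entry to a target under the staging directory. The subdirectory names (files/, libjars/) and the class itself are assumptions for illustration, not the actual Hadoop upload code:

```java
// Toy upload plan: map each tmpfiles/tmpjars entry to a path under the
// job staging dir. Subdirectory names here are illustrative assumptions.
import java.util.LinkedHashMap;
import java.util.Map;

class UploadPlanSketch {
    static Map<String, String> plan(String submitJobDir, String tmpfiles, String tmpjars) {
        Map<String, String> copies = new LinkedHashMap<>();
        if (tmpfiles != null)
            for (String f : tmpfiles.split(","))
                copies.put(f, submitJobDir + "/files/" + baseName(f));
        if (tmpjars != null)
            for (String j : tmpjars.split(","))
                copies.put(j, submitJobDir + "/libjars/" + baseName(j));
        return copies;
    }

    static String baseName(String path) {
        return path.substring(path.lastIndexOf('/') + 1);
    }

    public static void main(String[] args) {
        Map<String, String> p = plan("/tmp/hadoop-yarn/staging/root/.staging/job_1",
                "/local/a.txt", "/local/dep.jar");
        p.forEach((src, dst) -> System.out.println(src + " -> " + dst));
    }
}
```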
writeSplits
writeSplits(job, submitJobDir) -> writeNewSplits(job, jobSubmitDir) -> input.getSplits(job)
WordCount's InputFormat is FileInputFormat, and digging into getSplits() shows that splitting is driven by each file's BlockLocation. Two concepts are worth understanding here: a file's BlockLocation and data-local computation.
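The size arithmetic behind getSplits() can be sketched on its own. The code below mimics FileInputFormat-style splitting: the split size is blockSize clamped between the configured min and max, and a final fragment smaller than 1.1 splits (the SPLIT_SLOP factor) is folded into the last split. It only computes (offset, length) pairs; the real code also records each split's block hosts:

```java
// FileInputFormat-style split computation (simplified, no host info).
import java.util.ArrayList;
import java.util.List;

class SplitSketch {
    static final double SPLIT_SLOP = 1.1; // tolerate a last split up to 10% larger

    static List<long[]> getSplits(long fileLen, long blockSize, long minSize, long maxSize) {
        long splitSize = Math.max(minSize, Math.min(maxSize, blockSize));
        List<long[]> splits = new ArrayList<>();
        long remaining = fileLen;
        while (((double) remaining) / splitSize > SPLIT_SLOP) {
            splits.add(new long[]{fileLen - remaining, splitSize});
            remaining -= splitSize;
        }
        if (remaining != 0) {
            splits.add(new long[]{fileLen - remaining, remaining}); // trailing piece
        }
        return splits;
    }

    public static void main(String[] args) {
        // A 300 MB file with 128 MB blocks -> three splits: 128 MB, 128 MB, 44 MB.
        for (long[] s : getSplits(300L << 20, 128L << 20, 1, Long.MAX_VALUE)) {
            System.out.println("offset=" + s[0] + " length=" + s[1]);
        }
    }
}
```

One map task is then launched per split, which is why conf.setInt(MRJobConfig.NUM_MAPS, maps) above records the split count.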
First consider this question: leaving the MR framework aside, how would we implement word count ourselves? Most likely we would cut the file into several pieces, ship one piece to each machine, and then compute, so that no machine has to download data at computation time.
HDFS works along the same lines. A file uploaded to HDFS carries BlockLocation metadata recording which nodes physically hold each block of data; when a block is read on one of those nodes, it comes off the local disk instead of over the network. MR exploits exactly this: it first fetches the file's block information, and then, when scheduling map tasks, tries to place each map on a machine that holds the corresponding block. This is what we usually call data locality.
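The scheduling preference can be illustrated with a toy example (this is not Hadoop's scheduler, just the idea): among the nodes with free capacity, prefer one that already stores the split's block.

```java
// Toy data-local scheduling: prefer a free host that holds the block.
import java.util.Arrays;
import java.util.List;

class LocalitySketch {
    static String pickHost(List<String> blockHosts, List<String> freeHosts) {
        for (String h : freeHosts)
            if (blockHosts.contains(h)) return h; // node-local: read from local disk
        return freeHosts.get(0);                  // otherwise the data crosses the network
    }

    public static void main(String[] args) {
        String chosen = pickHost(Arrays.asList("nodeB", "nodeC"),
                                 Arrays.asList("nodeA", "nodeC"));
        System.out.println(chosen); // -> nodeC
    }
}
```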
After this step the split files have been written to the job's staging directory.
writeConf
This step uploads job.xml; once it completes, the staging directory contains the full job configuration.
With all of the above done, the job is ready to be submitted.