java - Why doesn't this Hadoop example that uses a Combiner class work properly? (the "local reduction" provided by the Combiner is not performed)

I am new to Hadoop and I am experimenting with the Combiner class to perform the reduce operation locally, on the same node as the mapper. I am using Hadoop 1.2.1.

So I have these 3 classes:

// Learning MapReduce by Nitesh Jain
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;

/*
 * Extend the Configured class
 * Implement the Tool interface
 */
public class WordCountWithCombiner extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        Job job = new Job(conf, "MyJob"); // Job is a "dashboard" with levers to control the execution of the job

        job.setJarByClass(WordCountWithCombiner.class); // Driver class used to locate the jar
        job.setJobName("Word Count With Combiners");     // Set the name of the job

        FileInputFormat.addInputPath(job, new Path(args[0]));   // The input path is the first parameter of the main() method
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // The output path is the second parameter of the main() method

        job.setMapperClass(WordCountMapper.class); // Set the mapper class

        /* Set the combiner: the combiner is a reducer performed locally on the same mapper node (we are reusing the
         * WordCountReducer class because it performs the same task, but locally to the mapper):
         */
        job.setCombinerClass(WordCountReducer.class);
        job.setReducerClass(WordCountReducer.class); // Set the reducer class

        // Output types of the job; they must match what the mapper and the reducer emit
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        /* The ToolRunner object is used to trigger the run() function, which contains all the batch execution logic.
         * It gives the ability to set properties at run time, so we do not need to write a single line of code to handle them.
         */
        int exitCode = ToolRunner.run(new Configuration(), new WordCountWithCombiner(), args);
        System.exit(exitCode);
    }
}

// Learning MapReduce by Nitesh J.
// Word Count Mapper.
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;  // Similar to Integer
import org.apache.hadoop.io.LongWritable; // Similar to Long
import org.apache.hadoop.io.Text;         // Similar to String

import org.apache.hadoop.mapreduce.Mapper;

/* Every mapper class extends the Hadoop Mapper class.
 * @param input key (the byte offset of the line in the input file)
 * @param input value (a line of text, so something like a String)
 * @param output key
 * @param output value
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    /* Override the map() function defined by the extended Mapper class.
     * The input parameters have to match those declared on the extended Mapper class.
     * @param context: is used to emit the output key/value pairs.
     * Tokenize the string into words and write these words into the context with the word as key and one (1) as value
     */
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String line = value.toString();
        StringTokenizer itr = new StringTokenizer(line);

        while (itr.hasMoreTokens()) {
            // convert everything to lower case
            word.set(itr.nextToken().toLowerCase());
            // only emit the word if it starts with a letter
            if (Character.isAlphabetic(word.toString().charAt(0))) {
                context.write(word, one);
            }
        }
    }
}

// Learning MapReduce by Nitesh Jain
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/* Every reducer class has to extend the Hadoop Reducer class.
 * @param the mapper output key (Text, the word)
 * @param the mapper output value (the number of occurrences of the related word: 1)
 * @param the reducer output key (the word)
 * @param the reducer output value (the number of occurrences of the related word)
 * The first two have to match the Mapper output types
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /* Override the reduce() function defined by the extended Reducer class.
     * @param key: the current word
     * @param Iterable<IntWritable> values: the input of the reduce() function is a key and the list of values associated with this key
     * @param context: collects the output <key, value> pairs
     */
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {

        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

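As you can see in the WordCountWithCombiner driver class, I have set the WordCountReducer class as the combiner, so that the reduction is performed directly on the mapper node, with this line:

job.setCombinerClass(WordCountReducer.class);

Then I have this input file on the Hadoop file system: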

andrea@andrea-virtual-machine:~/workspace/HadoopExperiment/bin$ hadoop fs -cat  in
to be or not to be


If I run this batch in the classic way, through both MapReduce phases, it works fine. Running this command in the Linux shell:

andrea@andrea-virtual-machine:~/workspace/HadoopExperiment/bin$ hadoop jar WordCount.jar WordCountWithCombiner in out6

Hadoop does its work and I get the expected result:

andrea@andrea-virtual-machine:~/workspace/HadoopExperiment/bin$ hadoop fs -cat  out6/p*
be 2
not 1
or 1
to 2


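The problem is that now I don't want to run the reduce phase, and I expect to get the same result, because I have set a combiner that performs the same operation on the same node as the mapper.

So, in the Linux shell, I run this command, which excludes the reduce phase:

hadoop jar WordCountWithCombiner.jar WordCountWithCombiner -D mapred.reduce.tasks=0 in out7

But it does not give the expected result. This is the full output: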

andrea@andrea-virtual-machine:~/workspace/HadoopExperiment/bin$ hadoop jar WordCountWithCombiner.jar WordCountWithCombiner -D mapred.reduce.tasks=0 in out7
16/02/13 19:43:44 INFO input.FileInputFormat: Total input paths to process : 1
16/02/13 19:43:44 INFO util.NativeCodeLoader: Loaded the native-hadoop library
16/02/13 19:43:44 WARN snappy.LoadSnappy: Snappy native library not loaded
16/02/13 19:43:45 INFO mapred.JobClient: Running job: job_201601242121_0008
16/02/13 19:43:46 INFO mapred.JobClient: map 0% reduce 0%
16/02/13 19:44:00 INFO mapred.JobClient: map 100% reduce 0%
16/02/13 19:44:05 INFO mapred.JobClient: Job complete: job_201601242121_0008
16/02/13 19:44:05 INFO mapred.JobClient: Counters: 19
16/02/13 19:44:05 INFO mapred.JobClient: Job Counters
16/02/13 19:44:05 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=18645
16/02/13 19:44:05 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
16/02/13 19:44:05 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
16/02/13 19:44:05 INFO mapred.JobClient: Launched map tasks=1
16/02/13 19:44:05 INFO mapred.JobClient: Data-local map tasks=1
16/02/13 19:44:05 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=0
16/02/13 19:44:05 INFO mapred.JobClient: File Output Format Counters
16/02/13 19:44:05 INFO mapred.JobClient: Bytes Written=31
16/02/13 19:44:05 INFO mapred.JobClient: FileSystemCounters
16/02/13 19:44:05 INFO mapred.JobClient: HDFS_BYTES_READ=120
16/02/13 19:44:05 INFO mapred.JobClient: FILE_BYTES_WRITTEN=55503
16/02/13 19:44:05 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=31
16/02/13 19:44:05 INFO mapred.JobClient: File Input Format Counters
16/02/13 19:44:05 INFO mapred.JobClient: Bytes Read=19
16/02/13 19:44:05 INFO mapred.JobClient: Map-Reduce Framework
16/02/13 19:44:05 INFO mapred.JobClient: Map input records=1
16/02/13 19:44:05 INFO mapred.JobClient: Physical memory (bytes) snapshot=93282304
16/02/13 19:44:05 INFO mapred.JobClient: Spilled Records=0
16/02/13 19:44:05 INFO mapred.JobClient: CPU time spent (ms)=2870
16/02/13 19:44:05 INFO mapred.JobClient: Total committed heap usage (bytes)=58195968
16/02/13 19:44:05 INFO mapred.JobClient: Virtual memory (bytes) snapshot=682741760
16/02/13 19:44:05 INFO mapred.JobClient: Map output records=6
16/02/13 19:44:05 INFO mapred.JobClient: SPLIT_RAW_BYTES=101
andrea@andrea-virtual-machine:~/workspace/HadoopExperiment/bin$ hadoop fs -cat out7/p*
to 1
be 1
or 1
not 1
to 1
be 1

As you can see, the local reduction provided by the Combiner does not seem to work.



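Do not assume that the combiner will run. Treat the combiner only as an optimization. The combiner is not guaranteed to run over all of your data. In some cases, when the data does not need to be spilled to disk, MapReduce skips the combiner entirely. Also note that the combiner may run multiple times over subsets of the data: it runs once per spill.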
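To make the point about spills concrete, here is a minimal plain-Java sketch with no Hadoop dependency. Everything in it is hypothetical: the class `CombinerSpillSketch`, its helpers `map`, `sumByKey` and `combinePerSpill`, and the idea of modelling a "spill" as a fixed-size slice of the map output are assumptions made for illustration, not Hadoop APIs. It suggests that a sum-style combiner leaves the final counts unchanged whether it runs zero times, once, or once per spill, provided a reduce step follows.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CombinerSpillSketch {

    // Map phase: emit (word, 1) for every token, in the spirit of WordCountMapper.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String token : line.trim().split("\\s+")) {
            out.add(new SimpleEntry<>(token.toLowerCase(), 1));
        }
        return out;
    }

    // Sum the values per key. Used as both "combiner" and "reducer",
    // mirroring how WordCountReducer is reused in the question.
    static Map<String, Integer> sumByKey(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> sums = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            sums.merge(pair.getKey(), pair.getValue(), Integer::sum);
        }
        return sums;
    }

    // Assumption: the map output is "spilled" every spillSize records and the
    // combiner runs once per spill. spillSize <= 0 models "combiner never runs".
    static List<Map.Entry<String, Integer>> combinePerSpill(
            List<Map.Entry<String, Integer>> mapOutput, int spillSize) {
        if (spillSize <= 0) {
            return mapOutput;
        }
        List<Map.Entry<String, Integer>> combined = new ArrayList<>();
        for (int i = 0; i < mapOutput.size(); i += spillSize) {
            int end = Math.min(i + spillSize, mapOutput.size());
            combined.addAll(sumByKey(mapOutput.subList(i, end)).entrySet());
        }
        return combined;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> mapOutput = map("to be or not to be");
        for (int spillSize : new int[] {0, 2, 4, 6}) {
            // The reduce step runs on whatever the combiner left behind.
            Map<String, Integer> result = sumByKey(combinePerSpill(mapOutput, spillSize));
            System.out.println("spillSize=" + spillSize + " -> " + result);
        }
    }
}
```

Each of the four lines printed by `main` ends with `{be=2, not=1, or=1, to=2}`. The intermediate data differs from run to run, which is why the result can only be relied on after the reduce step.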

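So setting the number of reducers to 0 does not mean that the job should still produce the correct result, because the combiner does not cover all of the mapper data. With zero reduce tasks the map output is written straight to the output files without the sort-and-spill step, which is consistent with the log above (Map output records=6, Spilled Records=0, and no combine counters), so the combiner never gets a chance to run in this job.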
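The difference between the two runs in the question can be imitated with a small plain-Java model. This is only a sketch under stated assumptions: `MapOnlyVsReduceSketch` and its `run` method are hypothetical names, the job is reduced to a single input line, and the map-only branch assumes that map output bypasses the combiner when there are no reduce tasks, as the out7 listing suggests.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class MapOnlyVsReduceSketch {

    // Hypothetical stand-in for a job run over one line of input.
    // numReduceTasks plays the role of mapred.reduce.tasks.
    static List<String> run(String line, int numReduceTasks) {
        // Map phase: one record per token.
        List<String> mapOutput = new ArrayList<>();
        for (String token : line.trim().split("\\s+")) {
            mapOutput.add(token.toLowerCase());
        }

        List<String> output = new ArrayList<>();
        if (numReduceTasks == 0) {
            // Map-only job (assumption): records go straight to the output,
            // unsorted and uncombined.
            for (String word : mapOutput) {
                output.add(word + " 1");
            }
            return output;
        }

        // With a reduce phase: records are grouped by key, sorted, and summed.
        Map<String, Integer> sums = new TreeMap<>();
        for (String word : mapOutput) {
            sums.merge(word, 1, Integer::sum);
        }
        for (Map.Entry<String, Integer> entry : sums.entrySet()) {
            output.add(entry.getKey() + " " + entry.getValue());
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(run("to be or not to be", 0)); // same records as out7
        System.out.println(run("to be or not to be", 1)); // same records as out6
    }
}
```

In this model, the aggregated counts only appear when `numReduceTasks` is at least 1, which matches the out6 and out7 listings in the question.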

