
java - First time using Hadoop, MapReduce job not running the Reduce phase


I have written a simple MapReduce job that reads data from the DFS and runs a simple algorithm on it. While trying to debug it, I decided to simply have the mapper output one set of keys and values, and the reducer output an entirely different set. I am running this job on a single-node Hadoop 20.2 cluster. When the job finishes, the output contains only the values written by the mapper, which leads me to believe the reducer is not being run. I would appreciate any insight into why my code produces this output. I have tried setting outputKeyClass and outputValueClass to different things, as well as setMapOutputKeyClass and setMapOutputValueClass. The commented-out sections of the code are the algorithm I was originally running; I have changed the map and reduce methods to simply output fixed values. Even so, the job's output contains only the values written by the mapper. Here is the class I use to run the job:

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable; // needed by HistogramReduce
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class CalculateHistogram {

    public static class HistogramMap extends Mapper<LongWritable, Text, LongWritable, Text> {

        private static final int R = 100;
        private int n = 0;

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            if (n == 0) {
                StringTokenizer tokens = new StringTokenizer(value.toString(), ",");
                int counter = 0;
                while (tokens.hasMoreTokens()) {
                    String token = tokens.nextToken();
                    if (tokens.hasMoreTokens()) {
                        context.write(new LongWritable(-2), new Text("HI"));
                        //context.write(new LongWritable(counter), new Text(token));
                    }
                    counter++;
                    n++;
                }
            } else {
                n++;
                if (n == R) {
                    n = 0;
                }
            }
        }
    }

    public static class HistogramReduce extends Reducer<LongWritable, Text, LongWritable, HistogramBucket> {

        private final static int R = 10;

        public void reduce(LongWritable key, Iterator<Text> values, Context context)
                throws IOException, InterruptedException {
            if (key.toString().equals("-1")) {
                //context.write(key, new HistogramBucket(key));
            }
            Text t = values.next();
            for (char c : t.toString().toCharArray()) {
                if (!Character.isDigit(c) && c != '.') {
                    //context.write(key, new HistogramBucket(key)); // if this isn't a numerical attribute we ignore it
                }
            }
            context.setStatus("Building Histogram");
            HistogramBucket i = new HistogramBucket(key);
            i.add(new DoubleWritable(Double.parseDouble(t.toString())));
            while (values.hasNext()) {
                for (int j = 0; j < R; j++) {
                    t = values.next();
                }
                if (!i.contains(Double.parseDouble(t.toString()))) {
                    context.setStatus("Writing a value to the Histogram");
                    i.add(new DoubleWritable(Double.parseDouble(t.toString())));
                }
            }

            context.write(new LongWritable(55555555), new HistogramBucket(new LongWritable(55555555)));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }

        Job job = new Job(conf, "MRDT - Generate Histogram");
        job.setJarByClass(CalculateHistogram.class);
        job.setMapperClass(HistogramMap.class);
        job.setReducerClass(HistogramReduce.class);

        //job.setOutputValueClass(HistogramBucket.class);

        //job.setMapOutputKeyClass(LongWritable.class);
        //job.setMapOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Best Answer

The signature of your reduce method is wrong. Your method declares Iterator<Text>; it must take Iterable<Text>.

Because of this, your code does not override the reduce method of the Reducer base class. Instead, the default implementation provided by the base class is used, and that implementation is the identity function: it copies every key/value pair straight through to the output, which is exactly why your job output looks like the mapper's output.
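For reference, this is roughly what the default reduce inside org.apache.hadoop.mapreduce.Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> looks like in the 0.20-era API (sketched from the Hadoop source; treat it as illustrative rather than verbatim):

@SuppressWarnings("unchecked")
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context)
        throws IOException, InterruptedException {
    // Identity behavior: every incoming value is written back out unchanged.
    for (VALUEIN value : values) {
        context.write((KEYOUT) key, (VALUEOUT) value);
    }
}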

Use the @Override annotation to catch mistakes like this: the compiler will report an error if the annotated method does not actually override anything in the base class.
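A minimal sketch of the corrected HistogramReduce declaration, as a drop-in replacement for the class in the question (HistogramBucket is the asker's custom Writable and is unchanged; the method body is elided since only the signature needs to change):

public static class HistogramReduce extends Reducer<LongWritable, Text, LongWritable, HistogramBucket> {

    @Override // compiler error here if the signature doesn't match the base class
    public void reduce(LongWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Iterate with a for-each loop (or values.iterator()) instead of
        // calling next() on the parameter directly.
        for (Text t : values) {
            // ... original histogram logic ...
        }
    }
}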

Regarding "java - First time using Hadoop, MapReduce job not running the Reduce phase", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/4253286/
