
java - Exception in the Reducer of a Hadoop job when run on a cluster


I have a MapReduce program that runs perfectly in standalone mode, but when I run it on my school's Hadoop cluster an exception occurs in the reducer. I have no idea what the exception is. I only know it is happening because when I wrap the reducer body in a try/catch the job passes but the output is empty, and when I remove the try/catch the job fails. Since this is a school cluster, I cannot access the JobTracker or any other files; whatever I find has to be found programmatically. Is there a way to discover what exception is thrown on Hadoop at runtime?

Here are snippets of my code:

public static class RowMPreMap extends MapReduceBase implements
        Mapper<LongWritable, Text, Text, Text> {

    private Text keyText = new Text();
    private Text valText = new Text();

    public void map(LongWritable key, Text value,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {

        // Input: (lineNo, lineContent)

        // Split each line using the separator defined for the dataset.
        String[] line = value.toString().split(Settings.INPUT_SEPERATOR);

        keyText.set(line[0]);
        valText.set(line[1] + "," + line[2]);

        // Output: (userid, "movieid,rating")
        output.collect(keyText, valText);
    }
}

public static class RowMPreReduce extends MapReduceBase implements
        Reducer<Text, Text, Text, Text> {

    private Text valText = new Text();

    public void reduce(Text key, Iterator<Text> values,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {

        // Input: (userid, List<movieid, rating>)

        float sum = 0.0F;
        int totalRatingCount = 0;

        ArrayList<String> movieID = new ArrayList<String>();
        ArrayList<Float> rating = new ArrayList<Float>();

        while (values.hasNext()) {
            String[] movieRatingPair = values.next().toString().split(",");
            movieID.add(movieRatingPair[0]);
            Float parseRating = Float.parseFloat(movieRatingPair[1]);
            rating.add(parseRating);

            sum += parseRating;
            totalRatingCount++;
        }

        float average = sum / totalRatingCount;

        for (int i = 0; i < movieID.size(); i++) {
            valText.set("M " + key.toString() + " " + movieID.get(i) + " "
                    + (rating.get(i) - average));
            output.collect(null, valText);
        }

        // Output: (null, <M userid, movieid, normalizedrating>)
    }
}

The exception occurs in the reducer above. Below is the job configuration:

public void normalizeM() throws IOException, InterruptedException {
    JobConf conf1 = new JobConf(UVDriver.class);
    conf1.setMapperClass(RowMPreMap.class);
    conf1.setReducerClass(RowMPreReduce.class);
    conf1.setJarByClass(UVDriver.class);

    conf1.setMapOutputKeyClass(Text.class);
    conf1.setMapOutputValueClass(Text.class);

    conf1.setOutputKeyClass(Text.class);
    conf1.setOutputValueClass(Text.class);

    conf1.setKeepFailedTaskFiles(true);

    conf1.setInputFormat(TextInputFormat.class);
    conf1.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.addInputPath(conf1, new Path(Settings.INPUT_PATH));
    FileOutputFormat.setOutputPath(conf1, new Path(Settings.TEMP_PATH + "/"
            + Settings.NORMALIZE_DATA_PATH_TEMP));

    JobConf conf2 = new JobConf(UVDriver.class);
    conf2.setMapperClass(ColMPreMap.class);
    conf2.setReducerClass(ColMPreReduce.class);
    conf2.setJarByClass(UVDriver.class);

    conf2.setMapOutputKeyClass(Text.class);
    conf2.setMapOutputValueClass(Text.class);

    conf2.setOutputKeyClass(Text.class);
    conf2.setOutputValueClass(Text.class);

    FileInputFormat.addInputPath(conf2, new Path(Settings.TEMP_PATH + "/"
            + Settings.NORMALIZE_DATA_PATH_TEMP));
    FileOutputFormat.setOutputPath(conf2, new Path(Settings.TEMP_PATH + "/"
            + Settings.NORMALIZE_DATA_PATH));

    Job job1 = new Job(conf1);
    Job job2 = new Job(conf2);

    JobControl jobControl = new JobControl("jobControl");
    jobControl.addJob(job1);
    jobControl.addJob(job2);
    job2.addDependingJob(job1);
    handleRun(jobControl);
}

Best answer

I caught the exception in the reducer and wrote its stack trace to a file on the file system. I know this is the dirtiest possible way to do it, but I have no other option at the moment. Here is the code, in case it helps anyone in the future; put it inside the catch block.

String valueString = "";
while (values.hasNext()) {
    valueString += values.next().toString();
}

StringWriter sw = new StringWriter();
e.printStackTrace(new PrintWriter(sw));
String exceptionAsString = sw.toString();

Path pt = new Path("errorfile");
FileSystem fs = FileSystem.get(new Configuration());
BufferedWriter br = new BufferedWriter(new OutputStreamWriter(fs.create(pt, true)));
br.write(exceptionAsString + "\nkey: " + key.toString() + "\nvalues: " + valueString);
br.close();

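One caveat with the snippet above: fs.create(pt, true) truncates errorfile on every call, so if more than one reduce task (or task attempt) hits the exception, only the last stack trace written survives, and concurrent attempts may even collide over the HDFS lease on that path. Putting something unique per attempt into the file name (for example the task attempt id) would avoid this.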
A cleaner way of accomplishing this is welcome.
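For what it is worth, one cleaner option, sketched below but not verified on the school cluster, is to drop the error file entirely: flag the bad record with a Reporter counter and rethrow the exception wrapped with the offending key and value, so the reason for the failure travels with the task's diagnostics instead of a hand-rolled file.

// Sketch only -- not verified on the school cluster. Uses the same old-API types
// (org.apache.hadoop.mapred.*) as the original RowMPreReduce. Instead of writing an
// error file, count the bad record and rethrow with context.
public void reduce(Text key, Iterator<Text> values,
        OutputCollector<Text, Text> output, Reporter reporter)
        throws IOException {

    while (values.hasNext()) {
        String raw = values.next().toString();
        try {
            String[] movieRatingPair = raw.split(",");
            Float.parseFloat(movieRatingPair[1]);   // the parse that was blowing up
            // ... the normal accumulation / normalization logic goes here ...
        } catch (RuntimeException e) {
            // Counters show up in the job summary even when logs are out of reach.
            reporter.incrCounter("RowMPreReduce", "BAD_RECORDS", 1);
            // Rethrow with the offending data so the task's failure diagnostics
            // say exactly which key/value broke the job.
            throw new IOException("Bad record for key '" + key + "': '" + raw + "'", e);
        }
    }
}

Counters can also be read back from the client after the job finishes, which fits the constraint of only having programmatic access to the cluster.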

By the way, I eventually found out that it was a NumberFormatException. Counters did not help me identify it. Later I realized that the input is split differently in standalone mode and on the cluster, and I have not yet found out why.
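Since the root cause turned out to be input lines splitting differently on the cluster, a defensive version of the map() body, again only an untested sketch, would have caught it at the source: check the field count and validate the rating field before emitting anything, and count whatever is malformed instead of letting it reach the reducer.

// Sketch only -- a defensive map() body (untested). Lines that do not split into
// at least three fields, or whose rating field is not numeric, are counted and
// skipped rather than passed on to the reducer.
public void map(LongWritable key, Text value,
        OutputCollector<Text, Text> output, Reporter reporter)
        throws IOException {

    String[] line = value.toString().split(Settings.INPUT_SEPERATOR);

    if (line.length < 3) {
        reporter.incrCounter("RowMPreMap", "MALFORMED_LINES", 1);
        return;
    }
    try {
        Float.parseFloat(line[2]);          // make sure the rating field is numeric
    } catch (NumberFormatException e) {
        reporter.incrCounter("RowMPreMap", "BAD_RATINGS", 1);
        return;
    }

    keyText.set(line[0]);
    valText.set(line[1] + "," + line[2]);
    output.collect(keyText, valText);
}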

Regarding java - Exception in the Reducer of a Hadoop job when run on a cluster, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/22922864/
