gpt4 book ai didi

java - MapReduce:减少写入上下文时无限期停止

转载 作者:可可西里 更新时间:2023-11-01 16:50:47 26 4
gpt4 key购买 nike

下面是一个map reduce程序,在map函数中进行过滤,在reduce步骤中进行求和。

map 部分执行良好。但是当 reduce 部分运行时,它会卡在 context.write(key,value) 行。

只有当我尝试在 reduce 函数类型中编写与在 map 函数中编写的不同的输出时,才会发生这种情况

public class Filter3 {

public static class TokenizerMapper extends Mapper<Object, Text, Text, Contestant>{

public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

String[] cols = value.toString().split(",");

try {
Contestant val = new Contestant(cols[0],cols[1],cols[2]);

System.out.println();
System.out.println();
System.out.print(key+" ::: ");
System.out.println(val);
System.out.println();
System.out.println();

val.name = val.name.toUpperCase();

if(val.rating>=9) {
context.write(new Text(val.name), val); //write null if it is not required
}
} catch(Exception ex) {
ex.printStackTrace();
}

}
}

public static class AvgRatingReducer extends Reducer<Text,Contestant,Text,DoubleWritable> {

private DoubleWritable result = new DoubleWritable(0.0);

public void reduce(Text key, Iterable<Contestant> values, Context context ) throws IOException, InterruptedException {

double sum = 0.0;
int count = 0;

for (Contestant val : values) {
sum += val.rating;
count++;
}

if(count>0) {
result.set(sum/(double)count);
}

System.out.println(result);

context.write(key, result);

}
}

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "AvgMRJob"); //configuration and job name

job.setJarByClass(Filter3.class);

job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(AvgRatingReducer.class);
job.setReducerClass(AvgRatingReducer.class);

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(DoubleWritable.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);

Path inPath = new Path(args[0]);
Path outPath = new Path(args[1]);
outPath.getFileSystem(conf).delete(outPath,true);

FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);

System.exit(job.waitForCompletion(true) ? 0 : 1);
}

使用的可写对象在这里:

public class Contestant implements Writable {

long id;
String name;
double rating;

public Contestant() {}

public Contestant(long id, String name, double rating) {
this.id = id;
this.name = name;
this.rating = rating;
}

public Contestant(String id, String name, String rating) {
try {
this.id = Long.parseLong(id.trim());
} catch(Exception ex) {

}
this.name = name;
try {
this.rating = Double.parseDouble(rating.trim());
} catch(Exception ex) {

}

}

@Override
public void readFields(DataInput inp) throws IOException {

id = inp.readLong();
name = WritableUtils.readString(inp);
rating = inp.readDouble();
}

@Override
public void write(DataOutput out) throws IOException {

out.writeLong(id);
WritableUtils.writeString(out, name);
out.writeDouble(rating);
}

@Override
public String toString() {

return this.id + "," + this.name + "," + this.rating;
}
}

将输出写入上下文时,执行会卡在 reduce 函数中。我没有收到任何错误/异常。它只是无限期地挂起。我不知道是什么问题。我遵循了 MapReduce 的通常程序。

enter image description here

注意:如果我在 map 和 reduce 中写入相同类型的数据,则相同的程序会起作用。即如果我在 Map 和 Reduce 函数中都写 (key=Text,val=Contestant) 。 - 而不是在 reduce 中使用 DoubleWritable!!

最佳答案

删除组合器:

// job.setCombinerClass(AvgRatingReducer.class);

如果您使用组合器,则需要确保缩减器在组合器类的输出上工作,而不是映射器。

关于java - MapReduce:减少写入上下文时无限期停止,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33461476/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com