
java - Empty output files when using MapReduce MultipleOutputs

Reposted. Author: 可可西里. Updated: 2023-11-01 15:31:05

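I am using MultipleOutputs in my Reducer because I want a separate result file for each key. However, every one of those per-key files is empty, even though the default result file part-r-xxxx is created and contains the correct values.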

Here is my JobDriver and Reducer code.

Main class

    public static void main(String[] args) throws Exception {
        int currentIteration = 0;
        int reducerCount, roundCount;

        Configuration conf = createConfiguration(currentIteration);
        cleanEnvironment(conf);
        Job job = new Job(conf, "cfim");

        // Input and output format configuration
        job.setMapperClass(TransactionsMapper.class);
        job.setReducerClass(PatriciaReducer.class);

        job.setInputFormatClass(TransactionInputFormat.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        reducerCount = roundCount = Math.floorDiv(getRoundCount(conf), Integer.parseInt(conf.get(MRConstants.mergeFactorSpecifier)));

        FileInputFormat.addInputPath(job, new Path("/home/cloudera/datasets/input"));
        Path outputPath = new Path(String.format(MRConstants.outputPathFormat, outputDir, currentIteration));
        FileOutputFormat.setOutputPath(job, outputPath);
        MultipleOutputs.addNamedOutput(job, "key", TextOutputFormat.class, LongWritable.class, Text.class);

        job.waitForCompletion(true);
    }

Reducer class

    public class PatriciaReducer extends Reducer<LongWritable, Text, LongWritable, Text> {

        private ITreeManager treeManager;
        private SerializationManager serializationManager;
        private MultipleOutputs<LongWritable, Text> mos;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            treeManager = new PatriciaTreeManager();
            serializationManager = new SerializationManager();
            mos = new MultipleOutputs<LongWritable, Text>(context);
        }

        @Override
        protected void reduce(LongWritable key, Iterable<Text> items, Context context)
                throws IOException, InterruptedException {

            Iterator<Text> patriciaIterator = items.iterator();
            PatriciaTree tree = new PatriciaTree();

            if (patriciaIterator.hasNext()) {
                Text input = patriciaIterator.next();
                tree = serializationManager.deserializePatriciaTree(input.toString());
            }

            while (patriciaIterator.hasNext()) {
                Text input = patriciaIterator.next();
                PatriciaTree mergeableTree = serializationManager.deserializePatriciaTree(input.toString());
                tree = treeManager.mergeTree(tree, mergeableTree, false);
            }

            Text outputValue = new Text(serializationManager.serializeAsJson(tree));
            mos.write("key", key, outputValue, generateOutputPath(key));
            context.write(key, outputValue);
        }

        @Override
        protected void finalize() throws Throwable {
            // TODO Auto-generated method stub
            super.finalize();
            mos.close();
        }

        private String generateOutputPath(LongWritable key) throws IOException {
            String outputPath = String.format("%s-%s", MRConstants.reduceResultValue, key.toString());
            return outputPath;
        }
    }

Am I doing something wrong?

Best answer

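I found that I was closing the MultipleOutputs object in the wrong method. After moving mos.close() out of finalize() and into the reducer's cleanup() method, everything works. finalize() is invoked only by the garbage collector, if at all, and is not part of the task lifecycle, so the named-output writers were never closed and their buffered records never reached the per-key files. cleanup() is called by the framework once at the end of each task, which makes it the right place to close them.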
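
The effect can be reproduced without a cluster. The following is a minimal sketch that uses only the JDK, not the Hadoop API: SideFileTask, NamedOutputDemo and measure() are hypothetical names invented for illustration, and a BufferedWriter stands in for the record writer behind a named output. It assumes the same lifecycle shape as a reducer (setup, reduce, cleanup) and shows that the file stays empty until the writer is closed in cleanup().

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class NamedOutputDemo {

    /** Hypothetical stand-in for a reducer that writes to a side file. */
    static class SideFileTask {
        private final Path target;
        private BufferedWriter writer;

        SideFileTask(Path target) {
            this.target = target;
        }

        /** Opens the writer, as setup() creates the MultipleOutputs object. */
        void setup() throws IOException {
            writer = Files.newBufferedWriter(target, StandardCharsets.UTF_8);
        }

        /** Small records only land in the in-memory buffer. */
        void reduce(long key, String value) throws IOException {
            writer.write(key + "\t" + value + "\n");
        }

        /** Closing flushes the buffer; this is the step finalize() never ran. */
        void cleanup() throws IOException {
            writer.close();
        }
    }

    /** Returns the file size before and after cleanup(), in that order. */
    static long[] measure() {
        try {
            Path file = Files.createTempFile("reduceResult-", ".txt");
            try {
                SideFileTask task = new SideFileTask(file);
                task.setup();
                long beforeClose;
                try {
                    task.reduce(1L, "tree-a");
                    task.reduce(2L, "tree-b");
                    beforeClose = Files.size(file);
                } finally {
                    task.cleanup();
                }
                long afterClose = Files.size(file);
                return new long[] {beforeClose, afterClose};
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long[] sizes = measure();
        System.out.println("bytes on disk before cleanup(): " + sizes[0]);
        System.out.println("bytes on disk after cleanup():  " + sizes[1]);
    }
}
```

In the actual reducer the equivalent change is to delete the finalize() override and override cleanup(Context context) instead, calling mos.close() there.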

Regarding java - Empty output files when using MapReduce MultipleOutputs, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/32811915/
