gpt4 book ai didi

当文件格式为自定义格式时,Hadoop MultipleOutputs 不会写入多个文件

转载 作者:可可西里 更新时间:2023-11-01 14:47:40 25 4
gpt4 key购买 nike

我正在尝试从 cassandra 中读取并使用 MultipleOutputs api(Hadoop 版本 1.0.3)将 reducers 输出写入多个输出文件。在我的案例中,文件格式是扩展 FileOutputFormat 的自定义输出格式。我已按照 MultipleOutputs api 中所示的类似方式配置了我的作业.但是,当我运行作业时,我只得到一个名为 part-r-0000 的输出文件,它是文本输出格式。如果未设置 job.setOutputFormatClass(),默认情况下它会将 TextOutputFormat 视为格式。此外,它只允许初始化两个格式类之一。它完全忽略了我在 MulitpleOutputs.addNamedOutput(job, "format1", MyCustomFileFormat1.class, Text.class, Text.class) 和 MulitpleOutputs.addNamedOutput(job, "format2", MyCustomFileFormat2.class, Text .class, Text.class).其他人是否面临类似问题,还是我做错了什么?

我还尝试编写一个非常简单的 MR 程序,该程序从文本文件读取并以 MultipleOutputs api 中所示的 TextOutputFormat 和 SequenceFileOutputFormat 两种格式写入输出。但是,那里也没有运气。我只得到 1 个文本输出格式的输出文件。

有人可以帮我解决这个问题吗?

Job job = new Job(getConf(), "cfdefGen");
job.setJarByClass(CfdefGeneration.class);

//read input from cassandra column family
ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
job.setInputFormatClass(ColumnFamilyInputFormat.class);
job.getConfiguration().set("cassandra.consistencylevel.read", "QUORUM");

//thrift input job configurations
ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
ConfigHelper.setInputInitialAddress(job.getConfiguration(), HOST);
ConfigHelper.setInputPartitioner(job.getConfiguration(), "RandomPartitioner");

SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(ByteBufferUtil.bytes("classification")));
//ConfigHelper.setRangeBatchSize(job.getConfiguration(), 2048);
ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);

//specification for mapper
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

//specifications for reducer (writing to files)
job.setReducerClass(ReducerToFileSystem.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//job.setOutputFormatClass(MyCdbWriter1.class);
job.setNumReduceTasks(1);

//set output path for storing output files
Path filePath = new Path(OUTPUT_DIR);
FileSystem hdfs = FileSystem.get(getConf());
if(hdfs.exists(filePath)){
hdfs.delete(filePath, true);
}
MyCdbWriter1.setOutputPath(job, new Path(OUTPUT_DIR));

MultipleOutputs.addNamedOutput(job, "cdb1', MyCdbWriter1.class, Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, "cdb2", MyCdbWriter2.class, Text.class, Text.class);

boolean success = job.waitForCompletion(true);
return success ? 0:1;

public static class ReducerToFileSystem extends Reducer<Text, Text, Text, Text>
{
private MultipleOutputs<Text, Text> mos;

public void setup(Context context){
mos = new MultipleOutputs<Text, Text>(context);
}

//public void reduce(Text key, Text value, Context context)
//throws IOException, InterruptedException (This was the mistake, changed the signature and it worked fine)
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException
{
//context.write(key, value);
mos.write("cdb1", key, value, OUTPUT_DIR+"/"+"cdb1");
mos.write("cdb2", key, value, OUTPUT_DIR+"/"+"cdb2");
context.progress();
}

public void cleanup(Context context) throws IOException, InterruptedException {
mos.close();
}
}

public class MyCdbWriter1<K, V> extends FileOutputFormat<K, V>
{
@Override
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException
{
}

public static void setOutputPath(Job job, Path outputDir) {
job.getConfiguration().set("mapred.output.dir", outputDir.toString());
}

protected static class CdbDataRecord<K, V> extends RecordWriter<K, V>
{
@override
write()
close()
}
}

最佳答案

我在调试后发现我的错误,我的 reduce 方法从未被调用过。我发现我的函数定义与 API 的定义不匹配,将其从 public void reduce(Text key, Text value, Context context) 更改为至 public void reduce(Text key, Iterable<Text> values, Context context) .我不知道为什么 reduce 方法没有 @Override 标签,它可以防止我的错误。

关于当文件格式为自定义格式时,Hadoop MultipleOutputs 不会写入多个文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/12981233/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com