gpt4 book ai didi

hadoop - 使用自定义可写从 Hadoop Map Reduce 作业输出列表

转载 作者:可可西里 更新时间:2023-11-01 14:10:48 29 4
gpt4 key购买 nike

我正在尝试通过更改 hadoop 给出的字数示例来创建一个简单的 map reduce 作业。

我试图列出一个列表而不是单词数。 wordcount 示例给出以下输出

hello 2
world 2

我正在努力让它以列表的形式输出,这将构成 future 工作的基础

hello 1 1
world 1 1

我认为我在正确的轨道上,但我在编写列表时遇到了问题。而不是上面的,我得到

Hello   foo.MyArrayWritable@61250ff2
World foo.MyArrayWritable@483a0ab1

这是我的 MyArrayWritable。我在 write(DataOuptut arg0) 中放了一个 sys out 但它从不输出任何东西所以我认为可能不会调用该方法而且我不知道为什么。

class MyArrayWritable extends ArrayWritable{

public MyArrayWritable(Class<? extends Writable> valueClass, Writable[] values) {
super(valueClass, values);
}
public MyArrayWritable(Class<? extends Writable> valueClass) {
super(valueClass);
}

@Override
public IntWritable[] get() {
return (IntWritable[]) super.get();
}

@Override
public void write(DataOutput arg0) throws IOException {
for(IntWritable i : get()){
i.write(arg0);
}
}
}

编辑 - 添加更多源代码

public class WordCount {

public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
context.write(word, one);
}
}
}

public static class Reduce extends Reducer<Text, IntWritable, Text, MyArrayWritable> {

public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
ArrayList<IntWritable> list = new ArrayList<IntWritable>();
for (IntWritable val : values) {
list.add(val);
}
context.write(key, new MyArrayWritable(IntWritable.class, list.toArray(new IntWritable[list.size()])));
}
}

public static void main(String[] args) throws Exception {
if(args == null || args.length == 0)
args = new String[]{"./wordcount/input","./wordcount/output"};
Path p = new Path(args[1]);
FileSystem fs = FileSystem.get(new Configuration());
fs.exists(p);
fs.delete(p, true);

Configuration conf = new Configuration();

Job job = new Job(conf, "wordcount");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setJarByClass(WordCount.class);
job.setInputFormatClass(TextInputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.waitForCompletion(true);
}

最佳答案

你的 reducer 中有一个“错误” - 值迭代器在整个循环中重复使用相同的 IntWritable,因此你应该按如下方式包装要添加到列表中的值:

public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
ArrayList<IntWritable> list = new ArrayList<IntWritable>();
for (IntWritable val : values) {
list.add(new IntWritable(val));
}
context.write(key, new MyArrayWritable(IntWritable.class, list.toArray(new IntWritable[list.size()])));
}

这实际上不是问题,因为您使用的是数组列表并且您的映射器只输出一个值(一个),但如果您扩展此代码,这可能会使您出错。

您还需要在作业中定义您的 map 和 reducer 输出类型不同:

// map output types
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// reducer output types

job.setOutputValueClass(Text.class);
job.setOutputValueClass(MyArrayWritable.class);

您可能想要明确定义 reducer 的数量(这可能就是为什么您永远看不到系统输出被写入任务日志的原因,尤其是当您的集群管理员将默认数量定义为 0 时):

job.setNumReduceTasks(1);

您使用默认的文本输出格式,它在输出键和值对上调用 toString() - MyArrayWritable 没有覆盖的 toString 方法,因此您应该在 MyArrayWritable 中放置一个:

@Override
public String toString() {
return Arrays.toString(get());
}

最后从 MyArrayWritable 中删除重写的 write 方法 - 这不是与免费的 readFields 方法兼容的有效实现。你不需要重写这个方法,但是如果你这样做了(比如你想看到一个系统输出来验证它被调用)然后做这样的事情:

@Override
public void write(DataOutput arg0) throws IOException {
System.out.println("write method called");
super.write(arg0);
}

关于hadoop - 使用自定义可写从 Hadoop Map Reduce 作业输出列表,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15810550/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com