gpt4 book ai didi

Hadoop - 使用 MultipleInputs 加入可能会跳过 Reducer

转载 作者:行者123 更新时间:2023-12-02 21:57:54 25 4
gpt4 key购买 nike

所以,我想与 MR 执行 reduce side join。 (没有 Hive 或任何东西,我正在尝试 Vanilla Hadoop atm)。

我有 2 个输入文件,首先是这样的:
12 13
12 15
12 16
12 23

第二个只是 12 1000。

因此,我将每个文件分配给一个单独的映射器,该映射器实际上根据其源文件将每个键值对标记为 0 或 1。这很好用。我怎么知道?
我按预期得到了 MapOutput:

|关键 | |值(value)|
12 0 1000
12 1 13
12 1 15
12 1 16 等

我的 Partitioner 分区基于 key 的第一部分(即 12)。
Reducer 应该按键加入。然而,这项工作似乎跳过了减少步骤。

我想知道我的驱动程序是否有问题?

我的代码(Hadoop v0.22,但与 0.20.2 的结果相同,带有来自主干的额外库):

映射器

public static class JoinDegreeListMapper extends
Mapper<Text, Text, TextPair, Text> {
public void map(Text node, Text degree, Context context)
throws IOException, InterruptedException {

context.write(new TextPair(node.toString(), "0"), degree);

}
}

public static class JoinEdgeListMapper extends
Mapper<Text, Text, TextPair, Text> {
public void map(Text firstNode, Text secondNode, Context context)
throws IOException, InterruptedException {

context.write(new TextPair(firstNode.toString(), "1"), secondNode);

}
}

reducer
public static class JoinOnFirstReducer extends
Reducer<TextPair, Text, Text, Text> {
public void reduce(TextPair key, Iterator<Text> values, Context context)
throws IOException, InterruptedException {

context.progress();
Text nodeDegree = new Text(values.next());
while (values.hasNext()) {
Text secondNode = values.next();
Text outValue = new Text(nodeDegree.toString() + "\t"
+ secondNode.toString());
context.write(key.getFirst(), outValue);
}
}
}

分区器
public static class JoinOnFirstPartitioner extends
Partitioner<TextPair, Text> {

@Override
public int getPartition(TextPair key, Text Value, int numOfPartitions) {
return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numOfPartitions;
}
}

司机
public int run(String[] args) throws Exception {


Path edgeListPath = new Path(args[0]);
Path nodeListPath = new Path(args[1]);
Path outputPath = new Path(args[2]);

Configuration conf = getConf();

Job job = new Job(conf);
job.setJarByClass(JoinOnFirstNode.class);
job.setJobName("Tag first node with degree");

job.setPartitionerClass(JoinOnFirstPartitioner.class);
job.setGroupingComparatorClass(TextPair.FirstComparator.class);
//job.setSortComparatorClass(TextPair.FirstComparator.class);
job.setReducerClass(JoinOnFirstReducer.class);

job.setMapOutputKeyClass(TextPair.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);


MultipleInputs.addInputPath(job, edgeListPath, EdgeInputFormat.class,
JoinEdgeListMapper.class);
MultipleInputs.addInputPath(job, nodeListPath, EdgeInputFormat.class,
JoinDegreeListMapper.class);

FileOutputFormat.setOutputPath(job, outputPath);


return job.waitForCompletion(true) ? 0 : 1;

}

最佳答案

我的 reduce 函数有 Iterator<> 而不是 Iterable,所以工作跳到了 Identity Reducer。
我不敢相信我忽略了这一点。菜鸟错误。

答案来自这个 Q/A
Using Hadoop for the First Time, MapReduce Job does not run Reduce Phase

关于Hadoop - 使用 MultipleInputs 加入可能会跳过 Reducer,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9035244/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com