gpt4 book ai didi

java - Hadoop 多个输入错误分组 - 双向连接练习

转载 作者:行者123 更新时间:2023-12-01 11:04:09 25 4
gpt4 key购买 nike

我正在尝试学习一些 hadoop 并阅读了很多有关如何进行自然连接的内容。我有两个带有 key 和信息的文件,我想交叉并将最终结果呈现为 (a, b, c)。

我的问题是映射器正在为每个文件调用化简器。我期望收到类似 (10, [R1,S10, S22]) 的内容(10 是键,1, 10, 22 是以 10 作为键的不同行的值,并且 R 和 S 进行标记,以便我可以识别它们来自哪个表)。

问题是我的 reducer 收到 (10, [S10, S22]) 并且只有在完成所有 S 文件后我才得到另一个键值对,例如 (10, [R1])。这意味着,它对每个文件分别按键分组并调用 reducer

我不确定这是否是正确的行为,是否必须以不同的方式配置它,或者我是否做错了一切。

我也是java新手,所以代码对你来说可能看起来很糟糕。

我避免使用 TextPair 数据类型,因为我自己还无法想出这种方法,并且我认为这将是另一种有效的方法(以防万一您想知道)。谢谢

基于WordCount示例运行hadoop 2.4.1。

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleInputs;

public class TwoWayJoin {

public static class FirstMap extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {

public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);

Text a = new Text();
Text b = new Text();

a.set(tokenizer.nextToken());
b.set(tokenizer.nextToken());

output.collect(b, relation);
}
}

public static class SecondMap extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {

public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);

Text b = new Text();
Text c = new Text();

b.set(tokenizer.nextToken());
c.set(tokenizer.nextToken());

Text relation = new Text("S"+c.toString());

output.collect(b, relation);

}
}

public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {

ArrayList < Text > RelationS = new ArrayList < Text >() ;
ArrayList < Text > RelationR = new ArrayList < Text >() ;

while (values.hasNext()) {
String relationValue = values.next().toString();
if (relationValue.indexOf('R') >= 0){
RelationR.add(new Text(relationValue));
} else {
RelationS.add(new Text(relationValue));
}
}

for( Text r : RelationR ) {
for (Text s : RelationS) {
output.collect(key, new Text(r + "," + key.toString() + "," + s));
}
}
}
}

public static void main(String[] args) throws Exception {
JobConf conf = new JobConf(MultipleInputs.class);
conf.setJobName("TwoWayJoin");

conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);

conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);

conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);

MultipleInputs.addInputPath(conf, new Path(args[0]), TextInputFormat.class, FirstMap.class);
MultipleInputs.addInputPath(conf, new Path(args[1]), TextInputFormat.class, SecondMap.class);

Path output = new Path(args[2]);

FileOutputFormat.setOutputPath(conf, output);

FileSystem.get(conf).delete(output, true);

JobClient.runJob(conf);

}
}

R.txt

(a  b(key))
2 46
1 10
0 24
31 50
11 2
5 31
12 36
9 46
10 34
6 31

S.txt

(b(key)  c)
45 32
45 45
46 10
36 15
45 21
45 28
45 9
45 49
45 18
46 21
45 45
2 11
46 15
45 33
45 6
45 20
31 28
45 32
45 26
46 35
45 36
50 49
45 13
46 3
46 8
31 45
46 18
46 21
45 26
24 15
46 31
46 47
10 24
46 12
46 36

此代码的输出成功但为空,因为我的数组 R 为空或数组 S 为空。

如果我只是将它们一一收集而不进行任何处理,那么我就已经映射了所有行。

预期输出为

key  "a,b,c"

最佳答案

问题出在组合器上。请记住,combiner 在映射输出上应用归约函数。所以它间接做的是reduce函数分别应用于您的R和S关系,这就是您在不同的reduce调用中获得R和S关系的原因。注释掉

conf.setCombinerClass(Reduce.class);

然后再次尝试运行应该不会有任何问题。另一方面,只有当您觉得映射输出的聚合结果与排序和洗牌完成后应用于输入时的聚合结果相同时,组合器函数才会有帮助。

关于java - Hadoop 多个输入错误分组 - 双向连接练习,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33115228/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com