gpt4 book ai didi

java - 合并来自 hadoop map-reduce 的结果

转载 作者:可可西里 更新时间:2023-11-01 15:31:52 25 4
gpt4 key购买 nike

我有一个 Mapper<AvroKey<Email>, NullWritable, Text, Text>它有效地接收电子邮件并多次吐出电子邮件地址的键和找到它的字段的值(发件人、收件人、抄送等)。

然后我有一个Reducer<Text, Text, NullWritable, Text>接受电子邮件地址和字段名称。它吐出一个 NullWritable 键和一个地址在给定字段中出现的次数的计数。例如...

{
"address": "joe.bloggs@gmail.com",
"toCount": 12,
"fromCount": 4
}

我正在使用 FileUtil.copyMerge 来合并作业的输出,但(显然)不同 reducer 的结果没有合并,所以在实践中我看到:

{
"address": "joe.bloggs@gmail.com",
"toCount": 12,
"fromCount": 0
}, {
"address": "joe.bloggs@gmail.com",
"toCount": 0,
"fromCount": 4
}

有没有更明智的方法来解决这个问题,以便我可以为每个电子邮件地址获得一个结果? (我收集到运行预缩减阶段的组合器仅在数据子集上运行,不能保证提供我想要的结果)?

编辑:

Reducer 代码类似于:

public class EmailReducer extends Reducer<Text, Text, NullWritable, Text> {

private static final ObjectMapper mapper = new ObjectMapper();

public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
Map<String, Map<String, Object>> results = new HashMap<>();

for (Text value : values) {
if (!results.containsKey(value.toString())) {
Map<String, Object> result = new HashMap<>();
result.put("address", key.toString());
result.put("to", 0);
result.put("from", 0);

results.put(value.toString(), result);
}

Map<String, Object> result = results.get(value.toString());

switch (value.toString()) {
case "TO":
result.put("to", ((int) result.get("to")) + 1);
break;
case "FROM":
result.put("from", ((int) result.get("from")) + 1);
break;
}

results.values().forEach(result -> {
context.write(NullWritable.get(), new Text(mapper.writeValueAsString(result)));
});
}
}

最佳答案

reducer 的每个输入键都对应一个唯一的电子邮件地址,因此您不需要 results 集合。每次调用 reduce 方法时,都是针对一个不同的电子邮件地址,所以我的建议是:

public class EmailReducer extends Reducer<Text, Text, NullWritable, Text> {

private static final ObjectMapper mapper = new ObjectMapper();

public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {

Map<String, Object> result = new HashMap<>();
result.put("address", key.toString());
result.put("to", 0);
result.put("from", 0);

for (Text value : values) {
switch (value.toString()) {
case "TO":
result.put("to", ((int) result.get("to")) + 1);
break;
case "FROM":
result.put("from", ((int) result.get("from")) + 1);
break;
}

context.write(NullWritable.get(), new Text(mapper.writeValueAsString(result)));

}
}

我不确定 ObjectMapper 类的作用,但我想您需要它来格式化输出。否则,我会将输入键打印为输出键(即电子邮件地址),并为每个电子邮件地址的“发件人”和“收件人”字段打印两个串联计数。

如果您的输入是数据集合(即不是流或类似的东西),那么您应该只获取每个电子邮件地址一次。如果您的输入是在流中给出的,并且您需要逐步构建最终输出,那么一个作业的输出可以是另一个作业的输入。如果是这种情况,我建议使用 MultipleInputs,其中一个 Mapper 是您之前描述的那个,另一个 IdentityMapper 将前一个作业的输出转发到 Reducer。这样,同一个电子邮件地址同样由同一个 reduce 任务处理。

关于java - 合并来自 hadoop map-reduce 的结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31702374/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com