- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试学习一些 hadoop 并阅读了很多有关如何进行自然连接的内容。我有两个带有 key 和信息的文件,我想交叉并将最终结果呈现为 (a, b, c)。
我的问题是映射器正在为每个文件调用化简器。我期望收到类似 (10, [R1,S10, S22]) 的内容(10 是键,1, 10, 22 是以 10 作为键的不同行的值,并且 R 和 S 进行标记,以便我可以识别它们来自哪个表)。
问题是我的 reducer 收到 (10, [S10, S22]) 并且只有在完成所有 S 文件后我才得到另一个键值对,例如 (10, [R1])。这意味着,它对每个文件分别按键分组并调用 reducer
我不确定这是否是正确的行为,是否必须以不同的方式配置它,或者我是否做错了一切。
我也是java新手,所以代码对你来说可能看起来很糟糕。
我避免使用 TextPair 数据类型,因为我自己还无法想出这种方法,并且我认为这将是另一种有效的方法(以防万一您想知道)。谢谢
基于WordCount示例运行hadoop 2.4.1。
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleInputs;
public class TwoWayJoin {
public static class FirstMap extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
Text a = new Text();
Text b = new Text();
a.set(tokenizer.nextToken());
b.set(tokenizer.nextToken());
output.collect(b, relation);
}
}
public static class SecondMap extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
Text b = new Text();
Text c = new Text();
b.set(tokenizer.nextToken());
c.set(tokenizer.nextToken());
Text relation = new Text("S"+c.toString());
output.collect(b, relation);
}
}
public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
ArrayList < Text > RelationS = new ArrayList < Text >() ;
ArrayList < Text > RelationR = new ArrayList < Text >() ;
while (values.hasNext()) {
String relationValue = values.next().toString();
if (relationValue.indexOf('R') >= 0){
RelationR.add(new Text(relationValue));
} else {
RelationS.add(new Text(relationValue));
}
}
for( Text r : RelationR ) {
for (Text s : RelationS) {
output.collect(key, new Text(r + "," + key.toString() + "," + s));
}
}
}
}
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf(MultipleInputs.class);
conf.setJobName("TwoWayJoin");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
MultipleInputs.addInputPath(conf, new Path(args[0]), TextInputFormat.class, FirstMap.class);
MultipleInputs.addInputPath(conf, new Path(args[1]), TextInputFormat.class, SecondMap.class);
Path output = new Path(args[2]);
FileOutputFormat.setOutputPath(conf, output);
FileSystem.get(conf).delete(output, true);
JobClient.runJob(conf);
}
}
R.txt
(a b(key))
2 46
1 10
0 24
31 50
11 2
5 31
12 36
9 46
10 34
6 31
S.txt
(b(key) c)
45 32
45 45
46 10
36 15
45 21
45 28
45 9
45 49
45 18
46 21
45 45
2 11
46 15
45 33
45 6
45 20
31 28
45 32
45 26
46 35
45 36
50 49
45 13
46 3
46 8
31 45
46 18
46 21
45 26
24 15
46 31
46 47
10 24
46 12
46 36
此代码的输出成功但为空,因为我的数组 R 为空或数组 S 为空。
如果我只是将它们一一收集而不进行任何处理,那么我就已经映射了所有行。
预期输出为
key "a,b,c"
最佳答案
问题出在组合器上。请记住,combiner 在映射输出上应用归约函数。所以它间接做的是reduce函数分别应用于您的R和S关系,这就是您在不同的reduce调用中获得R和S关系的原因。注释掉
conf.setCombinerClass(Reduce.class);
然后再次尝试运行应该不会有任何问题。另一方面,只有当您觉得映射输出的聚合结果与排序和洗牌完成后应用于输入时的聚合结果相同时,组合器函数才会有帮助。
关于java - Hadoop 多个输入错误分组 - 双向连接练习,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33115228/
您好,我正在处理 BIRT 报告。我有一个查询,我必须对父级的重复数据进行分组,但子级也不能分组! 在我的查询中: item 是父项,item_ledger_entry 是子项。我有来自 item.N
我正在使用 GA API。 这是针对 MCF 目标报告(底部)的标准目标完成指标表(顶部) 看一下这个: 总数加起来 (12,238),但看看按 channel 分组的分割有多么不同!我以为这些会很接
我正在开发一个流量计数器,我想获得 IP 和重复计数,但是如何? 就像是 :select ip, count(ip) from Redirect 返回 : null total ip count 重定
我尝试编写一个正则表达式来匹配条件表达式,例如: a!=2 1+2=2+a 我尝试提取运算符。我当前的正则表达式是“.+([!=<>]+).+” 但问题是匹配器总是尝试匹配组中可能的最短字符串
在 MS Transact SQL 中,假设我有一个这样的表(订单): Order Date Order Total Customer # 09/30/2008 8
我想按 m.ID 分组,并对每个 m.id 求和 (pm.amount_construction* prod.anzahl) 实际上我有以下结果: Meterial_id | amount_const
我想根据多列中的值对值进行分组。这是一个例子: 我想得到输出: {{-30,-50,20},{-20,30,60},{-30,NULL or other value, 20}} 我设法到达: SELE
我正在尝试找出运行此查询的最佳方式。我基本上需要返回在我们的系统中只下了一个订单的客户的“登录”字段列表(登录字段基本上是客户 ID/ key )。 我们系统的一些背景...... 客户在同一日期下的
给定以下mysql结果集: id code name importance '1234', 'ID-CS-B', 'Chocolate Sauce'
大家好,我的数据框中有以下列: LC_REF 1 DT 16 2C 2 DT 16 2C 3 DT 16 2C 1 DT 16 3C 6 DT 16 3C 3
我有这样的 mongoDB 集合 { "_id" : "EkKTRrpH4FY9AuRLj", "stage" : 10, }, { "_id" : "EkKTRrpH4FY9
假设我有一组数据对,其中 index 0 是值,index 1 是类型: input = [ ('11013331', 'KAT'), ('9085267',
java中用stream进行去重,排序,分组 一、distinct 1. 八大基本数据类型 List collect = ListUtil.of(1, 2, 3, 1, 2).stream().fil
基本上,我从 TABLE_A 中的这个开始 France - 100 France - 200 France - 300 Mexico - 50 Mexico - 50 Mexico - 56 Pol
我希望这个正则表达式 ([A-Z]+)$ 将选择此示例中的最后一次出现: AB.012.00.022ABC-1 AB.013.00.022AB-1 AB.014.00.022ABAB-1 但我没有匹配
我创建了一个数据透视表,但数据没有组合在一起。 任何人都可以帮助我获得所需的格式吗? 我为获取数据透视表而编写的查询: DECLARE @cols AS NVARCHAR(MAX), -- f
我想按时间段(月,周,日,小时,...)选择计数和分组。例如,我想选择行数并将它们按 24 小时分组。 我的表创建如下。日期是时间戳。 CREATE TABLE MSG ( MSG_ID dec
在 SQL Server 2005 中,我有一个包含如下数据的表: WTN------------Date 555-111-1212 2009-01-01 555-111-1212 2009-
题 假设我有 k 个标量列,如果它们沿着每列彼此在一定距离内,我想对它们进行分组。 假设简单 k 是 2 并且它们是我唯一的列。 pd.DataFrame(list(zip(sorted(choice
问题 在以下数据框中 df : import random import pandas as pd random.seed(999) sz = 50 qty = {'one': 1, 'two': 2
我是一名优秀的程序员,十分优秀!