gpt4 book ai didi

java - mapreduce 中 Join 操作的输出

转载 作者:行者123 更新时间:2023-12-02 21:54:28 24 4
gpt4 key购买 nike

我正在 map reduce 中执行连接操作。我正在输入两个文件,其值由分隔符(逗号)分隔。通过对一个公共(public)实体执行连接操作,我可以从两个输入文件中获取一个文件中的输出。

这是 map 减少代码:

public class EmpMapReduce {
public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, Text>
{
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException
{
String tokens [] = value.toString().split(",");
String empid = tokens[0];
String val = "";
if(tokens.length != 0)
{
for (int cnt = 1; cnt < tokens.length; cnt++)
{
val = val + tokens[cnt] + "\t";
}
}

context.write(new Text(empid), new Text(val));

}
}

public static class MyReducer extends Reducer<Text, Text, Text, Text>
{
public void reduce(Text key, Iterable<Text> values,
Context context) throws IOException, InterruptedException
{
String str = "";
for (Text val : values)
{
str = str + val.toString() + "\t";
}

context.write(key, new Text (str));

}
}

public static void main(String[] args) throws Exception
{
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
if (otherArgs.length != 3) {
System.err.println("Usage: EmpMapReduce <in1> <in2> <out>");
System.exit(2);
}
Job job = new Job(conf, "EmpMapReduce");



job.setJarByClass(EmpMapReduce.class);

job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(MyReducer.class);

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

job.setInputFormatClass(TextInputFormat.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));

System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

这是我使用的两个输入文件内容:
100,name100,10
101,name101,11
102,name102,12
103,name103,13
104,name104,14
105,name105,15
106,name106,16
107,name107,17

第二个输入文件:
100,100000
101,200000
102,300000
103,400000
104,500000
105,600000
106,700000
107,800000

我得到以下输出:
100,name100,10,100000
101,200000,name101,11
102,name102,12,300000
103,400000,name103,13
104,name104,14,500000
105,600000,name105,15
106,name106,16,700000
107,800000,name107,17

现在我担心的是为什么我会得到这样的输出:
100,name100,10,100000
101,200000,name101,11

即第一行数据首先从一个输入文件复制,而不是从另一个输入文件复制。但对于第二排,反之亦然。我无法弄清楚如何使每一行中的数据顺序相同。

另一个问题是:

一旦我以特定顺序在所有行中获得数据,我就可以执行各种操作,例如:替换 name100 ---> somenewname 或在每行末尾添加新的逗号分隔值,该值具有该行中所有值的总和。

最佳答案

两个映射器的输出到达 reducer 的顺序是未指定的。所以你需要一些方法来在reducer中识别它们。

一个简单的解决方案是:

  • 有两个映射器,每个输入一个
  • 每个映射器输出“[type]:[rest of value]”的值
  • 假设您有两种类型(用户、交易),现在每种类型都已识别。
  • 现在在你的 reducer 中(抱歉伪代码):

  • void reduce(..) {
    String user = "";
    String trans = "";

    for(value: values) {
    (type, payload) = value.split();
    if (type == "user") user = payload;
    if (type == "transaction") transaction = payload;
    }

    context.write(user + "\t" + transaction);
    }

    关于java - mapreduce 中 Join 操作的输出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15954276/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com