gpt4 book ai didi

hadoop - 分组相应的键和值

转载 作者:可可西里 更新时间:2023-11-01 16:43:58 24 4
gpt4 key购买 nike

我有一个编写 MapReduce 代码的用例,必须把对应于同一个键(key)的所有值分组在一起:

输入:

A,B  
A,C
B,A
B,D

输出:

A {B,C}  
B {A,D}

我写了这段代码:

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class GroupKeyValues {

public static class Map extends Mapper<LongWritable, Text, Text, Text> {

public void map(LongWritable key, Text value, Context con)
throws IOException, InterruptedException {

Text myKey = new Text();
Text myVal = new Text();
String line = value.toString();
StringTokenizer st = new StringTokenizer(line);

while (st.hasMoreTokens()) {

String thisH = st.nextToken();
String[] splitData = thisH.split(",");
myKey.set(splitData[0]);
myVal.set(splitData[1]);
}
con.write(myKey, myVal);

}

}

@SuppressWarnings("deprecation")
public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();
@SuppressWarnings("deprecation")
Job job = new Job(conf, "GroupKeyValues");

job.setJarByClass(GroupKeyValues.class);
job.setMapperClass(Map.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

Path outputPath = new Path(args[1]);

FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

outputPath.getFileSystem(conf).delete(outputPath);

System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

最佳答案

您缺少一个把同一键的所有值聚合为单个输出记录的 reducer。例如,您可以像下面这样使用 ArrayWritable(注意:Hadoop 在迭代 values 时会复用同一个 Text 对象,因此收集时必须拷贝每个值):

public static class AggregatingReducer extends Reducer<Text, Text, Text, ArrayWritable> {
private ArrayWritable result = new ArrayWritable(Text.class);

public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
List<Text> list = new ArrayList<>();
for (Text value : values) {
list.add(value);
}
result.set(list.toArray(new Text[list.size()]));
context.write(key, result);
}
}

在作业设置中,确保添加:

job.setReducerClass(AggregatingReducer.class);
job.setOutputValueClass(ArrayWritable.class); //instead of Text.class

或者(取决于您的需要)您可以将 reducer 值连接到 StringBuilder 中并发出 Text,而不是将其累积并作为 ArrayWritable 发出。

更新:下面是使用逗号分隔符的 StringBuilder 示例:

public static class AggregatingReducer extends Reducer<Text, Text, Text, Text> {
private Text result = new Text();

public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
StringBuilder sb = new StringBuilder();
for (Text value : values) {
if (sb.length() != 0) {
sb.append(',');
}
sb.append(value);
}
result.set(sb.toString());
context.write(key, result);
}
}

在驱动程序中值类型需要改回文本:

job.setOutputValueClass(Text.class);

关于hadoop - 分组相应的键和值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38095902/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com