gpt4 book ai didi

java - Mapreduce程序只输出一条记录

转载 作者:可可西里 更新时间:2023-11-01 16:41:59 26 4
gpt4 key购买 nike

我编写了一个 MapReduce 程序 来分析这种形式的用户的数据集

UserID::Gender::Age::MoviesRated::Zip Code
1::F::1::10::48067
2::M::56::16::70072
3::M::25::15::55117

我要

find the top 10 zipcodes based on the avarage age of users belonging to that zipcode, in the descending order of the avarage age. Top 10 means the youngest 10 avarage age of users of that zipcode.

我有一个 MapClass、一个 CombinerClass 和一个 ReducerClass

我的代码如下

public class TopTenYoungestAverageAgeRaters extends Configured implements Tool {
private static TreeSet<AverageAge> top10 = new TreeSet<AverageAge>();

public static class MapClass extends Mapper<LongWritable, Text, Text, AverageAge>
{

public boolean isNumeric(String value) // Checks if record is valid
{
try
{
Integer.parseInt(value);
return true;
}
catch(NumberFormatException e)
{
return false;
}
}

public AverageAge toCustomWritable(String[] line)
{
AverageAge record = new AverageAge(new IntWritable(Integer.parseInt(line[0])), new IntWritable(Integer.parseInt(line[2])), new Text(line[1]), new IntWritable(Integer.parseInt(line[3])), new Text(line[4]));
return record;
}

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
{
String line = value.toString();
String[] values = line.split("::");
if(isNumeric(values[0]))
{
AverageAge customTuple = toCustomWritable(values);
context.write(new Text(values[4]), customTuple);
}

}
}

public static class CombinerClass extends Reducer<Text, AverageAge, Text, AverageAge>
{
public void reduce(Text key, Iterable<AverageAge> values, Context context) throws IOException, InterruptedException
{
AverageAge newRecord = new AverageAge();
long age = 0;
int count = 0;
for(AverageAge value:values)
{
age += value.getUserAge();
count += 1;
}
newRecord.setZipCode(key.toString());
newRecord.setAverageAge((double)(age/count));
context.write(key, newRecord);
}
}


public static class ReducerClass extends Reducer<Text, AverageAge, NullWritable, AverageAge>
{

public void reduce(Text key, Iterable<AverageAge> values, Context context) throws IOException, InterruptedException
{

for(AverageAge value:values)
{
top10.add(value);
if(top10.size() > 10)
top10.remove(top10.last());
}
}

protected void cleanup(Context context) throws IOException, InterruptedException
{
for(AverageAge avg: top10)
{
context.write(NullWritable.get(), avg);
}
}
}

public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
int res = ToolRunner.run(new Configuration(), new TopTenYoungestAverageAgeRaters(), args);
System.exit(res);
}

@Override
public int run(String[] arg0) throws Exception {
// TODO Auto-generated method stub
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setMapperClass(MapClass.class);
job.setCombinerClass(CombinerClass.class);
job.setReducerClass(ReducerClass.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(AverageAge.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(AverageAge.class);

FileInputFormat.addInputPath(job, new Path(arg0[0]));
FileOutputFormat.setOutputPath(job, new Path(arg0[1]));
return job.waitForCompletion(true) ? 0 : 1;
}

}

MapClass 以 zipcode 作为 keyAverageAge(自定义可写类)作为 value 写入输出>

CombinerClass 计算属于该邮政编码的用户的平均年龄,并将 key 写为邮政编码,将值写为 AverageAge

ReducerClass 给出(应该给出)具有平均用户年龄的前 10 个邮政编码,但我只得到一个记录作为输出。。

我还尝试在 Reducer 类中执行 System.out.println() 以查看传递给 ReducerClass 的值,但 console< 上没有打印任何内容(我在 eclipse 环境中本地运行程序)

我是 MapReduce 的新手,无法找出该程序中的错误。

Dataset Source

最佳答案

问题陈述似乎自相矛盾:平均年龄递减的前 10 名将是最年长的 10 名,而不是最年轻的 10 名。最好在那里得到一些澄清。

不管怎么说,这里有很多很多错误。

  1. 不能保证组合器永远被调用
  2. 如果您有多个 reducer 任务,您将在不同的文件中从每个任务中获得最多 10 个输出
  3. 正如所写,您将获得的“前 10 个”将是 10 个最低的邮政编码(按字典顺序排序)。
  4. 通常到cleanup() 时间你就不再写记录了。

你想要的是使用 shuffle 将具有相同邮政编码的记录放在一起,并使用聚合类(Combiner 和 Reducer)计算平均值。在每个邮政编码都有年龄之前,无法确定“前 10 个”要求。不过,关键的一点是,为了以分布式方式计算平均值,你永远不能失去分母,直到你减少。您车队中的组合器可能会收到具有相同 key 的记录。

Mapper 获取一条记录并生成一个三元组:

k::g::a::z |=> z |-> ( 1, a )

Combiner 获取具有相同键的三元组的集合并对它们进行平均(并对分母求和):

z |-> [ ( d1, a1 ), ..., ( dn, an ) ] |=> z |-> ( sum( di ), sum( ai ) / sum ( di ) )

Reducer 获取具有相同键的三元组的集合并对它们进行平均,抛出分母:

z |-> [ ( d1, a1 ), ..., ( dn, an ) ] |=> z |-> sum( ai ) / sum ( di )

无论您是否提供组合器,您的算法都应该有效;组合器是一种优化,仅适用于某些 map-reduce 情况。

要限制为前 10 名,您现在需要按平均年龄对结果重新排序。

这意味着另一个映射器:

z |-> avg |=> avg |-> z

还有一个只输出前 10 个结果的缩减器(留给读者练习)。另外,只能有一个 reduce 任务,否则您将获得前 10x,其中 x 是 reduce 任务的数量。

关于java - Mapreduce程序只输出一条记录,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40091842/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com