
hadoop - How to send the output of one mapper-reducer directly to another mapper-reducer without saving the output to HDFS

Reposted. Author: 可可西里. Updated: 2023-11-01 14:16:55

The problem has finally been solved; see my solution at the bottom.


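Recently I have been trying to run the recommender-system example from Chapter 6 (Listings 6.1 to 6.4) of Mahout in Action. I ran into a problem that I could not solve by searching Google.

Here is the problem: I have a mapper-reducer pair: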

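import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.math.VarLongWritable;

public final class WikipediaToItemPrefsMapper extends
        Mapper<LongWritable, Text, VarLongWritable, VarLongWritable> {

    private static final Pattern NUMBERS = Pattern.compile("(\\d+)");

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        Matcher m = NUMBERS.matcher(line);
        // The first number on the line is the user ID; skip lines without one.
        if (!m.find()) {
            return;
        }
        VarLongWritable userID = new VarLongWritable(Long.parseLong(m.group()));
        VarLongWritable itemID = new VarLongWritable();
        // Every remaining number is an item ID; emit one (userID, itemID) pair each.
        while (m.find()) {
            itemID.set(Long.parseLong(m.group()));
            context.write(userID, itemID);
        }
    }
}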

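import java.io.IOException;

import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

public class WikipediaToUserVectorReducer extends
        Reducer<VarLongWritable, VarLongWritable, VarLongWritable, VectorWritable> {

    @Override
    public void reduce(VarLongWritable userID,
            Iterable<VarLongWritable> itemPrefs, Context context)
            throws IOException, InterruptedException {
        // Sparse vector indexed by item ID; each seen item gets a preference of 1.0.
        Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
        for (VarLongWritable itemPref : itemPrefs) {
            userVector.set((int) itemPref.get(), 1.0f);
        }
        context.write(userID, new VectorWritable(userVector));
    }
}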

The reducer outputs a userID and a userVector, which looks like this: 98955 {590:1.0 22:1.0 9059:1.0 3:1.0 2:1.0 1:1.0}
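To make the data flow of this first pair concrete, here is a minimal plain-Java sketch of what the mapper and reducer do for a single input line. It does not use Hadoop or Mahout: the class name `UserVectorSketch`, the sample input line, and the use of a `TreeMap` in place of Mahout's sparse vector are all assumptions made for illustration.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-in for the mapper-reducer pair above; not Mahout code.
class UserVectorSketch {
    private static final Pattern NUMBERS = Pattern.compile("(\\d+)");

    /** Returns the first number on the line (the user ID), or -1 if there is none. */
    static long parseUserId(String line) {
        Matcher m = NUMBERS.matcher(line);
        return m.find() ? Long.parseLong(m.group()) : -1L;
    }

    /** Mimics map + reduce for one user: every later number becomes index -> 1.0. */
    static Map<Integer, Double> toUserVector(String line) {
        Map<Integer, Double> vector = new TreeMap<>();
        Matcher m = NUMBERS.matcher(line);
        if (!m.find()) {
            return vector; // no user ID on this line, so nothing is emitted
        }
        while (m.find()) {
            vector.put((int) Long.parseLong(m.group()), 1.0);
        }
        return vector;
    }

    public static void main(String[] args) {
        String line = "98955: 590 22 9059 3 2 1"; // assumed input format
        System.out.println(parseUserId(line) + " " + toUserVector(line));
    }
}
```

The sketch keeps the same regular expression as the mapper, so the first match is treated as the user and every later match as an item.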

I then want to process this data with another mapper-reducer pair:

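import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

public class UserVectorSplitterMapper extends
        Mapper<VarLongWritable, VectorWritable, IntWritable, VectorOrPrefWritable> {

    @Override
    public void map(VarLongWritable key, VectorWritable value, Context context)
            throws IOException, InterruptedException {
        long userID = key.get();
        Vector userVector = value.get();
        Iterator<Vector.Element> it = userVector.iterateNonZero();
        IntWritable itemIndexWritable = new IntWritable();
        // Emit one (itemIndex, (userID, preference)) record per non-zero entry.
        while (it.hasNext()) {
            Vector.Element e = it.next();
            int itemIndex = e.index();
            float preferenceValue = (float) e.get();
            itemIndexWritable.set(itemIndex);
            context.write(itemIndexWritable,
                    new VectorOrPrefWritable(userID, preferenceValue));
        }
    }
}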

When I try to run the job, it fails with this error:

org.apache.hadoop.io.Text cannot be cast to org.apache.mahout.math.VectorWritable

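The first mapper-reducer pair writes its output to HDFS, and the second pair tries to read that output. The mapper can convert 98955 to a VarLongWritable, but it cannot convert {590:1.0 22:1.0 9059:1.0 3:1.0 2:1.0 1:1.0} to a VectorWritable. So I am wondering whether there is a way for the first mapper-reducer pair to send its output directly to the second pair, so that no data conversion is needed. I have looked through Hadoop in Action and Hadoop: The Definitive Guide, and there does not seem to be a way to do this. Any suggestions?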
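For comparison, the manual conversion that the question hopes to avoid would look roughly like the sketch below: the second mapper would receive the vector as text and have to parse it itself. This is plain Java with a hypothetical `TextVectorParser` class, and it assumes the text layout is exactly the `{index:value index:value ...}` form quoted above; it is not a Mahout API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; shows the hand-written parsing a text round trip would require.
class TextVectorParser {
    /** Parses "{590:1.0 22:1.0}" into index -> value, preserving the written order. */
    static Map<Integer, Double> parse(String text) {
        String body = text.trim();
        if (!body.startsWith("{") || !body.endsWith("}")) {
            throw new IllegalArgumentException("Not a vector literal: " + text);
        }
        body = body.substring(1, body.length() - 1).trim();
        Map<Integer, Double> vector = new LinkedHashMap<>();
        if (body.isEmpty()) {
            return vector; // "{}" is an empty vector
        }
        for (String entry : body.split("\\s+")) {
            int colon = entry.indexOf(':');
            if (colon < 0) {
                throw new IllegalArgumentException("Missing ':' in entry: " + entry);
            }
            int index = Integer.parseInt(entry.substring(0, colon));
            double value = Double.parseDouble(entry.substring(colon + 1));
            vector.put(index, value);
        }
        return vector;
    }

    public static void main(String[] args) {
        // Value half of the reducer output line quoted in the question.
        System.out.println(parse("{590:1.0 22:1.0 9059:1.0 3:1.0 2:1.0 1:1.0}"));
    }
}
```

Parsing like this works but ties the second job to the exact string layout of the first job's output, which is the coupling a binary intermediate format removes.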


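Problem solved

Solution: By using SequenceFileOutputFormat, the reduce output of the first MapReduce workflow can be written to and stored on the DFS. The second MapReduce workflow can then read that temporary file as its input by passing the SequenceFileInputFormat class as a parameter when creating the mapper. Because the vector is saved in a binary sequence file with a specific format, SequenceFileInputFormat can read it and convert it back into a vector.

Here is some sample code: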
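The reason a binary intermediate file avoids the cast error can be illustrated without Hadoop. The sketch below is only an analogy: it writes a user ID and a sparse vector as typed binary fields and reads them back, in the same spirit as a Hadoop Writable's `write(DataOutput)` and `readFields(DataInput)`. The `BinaryRoundTrip` class and its record layout are assumptions for illustration and are not the actual SequenceFile or VectorWritable format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of a typed binary round trip; not Hadoop's file format.
class BinaryRoundTrip {
    /** Writes the user ID, the entry count, then each (index, value) pair as typed fields. */
    static byte[] write(long userId, Map<Integer, Double> vector) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(userId);
            out.writeInt(vector.size());
            for (Map.Entry<Integer, Double> e : vector.entrySet()) {
                out.writeInt(e.getKey());
                out.writeDouble(e.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads the fields back in the order they were written; no string parsing is involved. */
    static Map<Integer, Double> readVector(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            in.readLong(); // skip the user ID
            int size = in.readInt();
            Map<Integer, Double> vector = new TreeMap<>();
            for (int i = 0; i < size; i++) {
                int index = in.readInt();
                double value = in.readDouble();
                vector.put(index, value);
            }
            return vector;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Map<Integer, Double> vector = new TreeMap<>();
        vector.put(590, 1.0);
        vector.put(22, 1.0);
        byte[] record = write(98955L, vector);
        System.out.println(readVector(record).equals(vector));
    }
}
```

Because the reader knows the field types in advance, the value comes back as a vector-like structure rather than as a line of text, which is the property the second mapper relies on.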

confFactory ToItemPrefsWorkFlow = new confFactory(
    new Path("/dbout"),               // input file path
    new Path("/mahout/output.txt"),   // output file path
    TextInputFormat.class,            // input format
    VarLongWritable.class,            // mapper key format
    Item_Score_Writable.class,        // mapper value format
    VarLongWritable.class,            // reducer key format
    VectorWritable.class,             // reducer value format
    SequenceFileOutputFormat.class    // the reducer output format
);
ToItemPrefsWorkFlow.setMapper(WikipediaToItemPrefsMapper.class);
ToItemPrefsWorkFlow.setReducer(WikipediaToUserVectorReducer.class);
JobConf conf1 = ToItemPrefsWorkFlow.getConf();

confFactory UserVectorToCooccurrenceWorkFlow = new confFactory(
    new Path("/mahout/output.txt"),
    new Path("/mahout/UserVectorToCooccurrence"),
    SequenceFileInputFormat.class,    // note: the second workflow's mapper input format is now SequenceFileInputFormat.class
    //UserVectorToCooccurrenceMapper.class,
    IntWritable.class,
    IntWritable.class,
    IntWritable.class,
    VectorWritable.class,
    SequenceFileOutputFormat.class
);
UserVectorToCooccurrenceWorkFlow.setMapper(UserVectorToCooccurrenceMapper.class);
UserVectorToCooccurrenceWorkFlow.setReducer(UserVectorToCooccurrenceReducer.class);
JobConf conf2 = UserVectorToCooccurrenceWorkFlow.getConf();

// Run the two jobs in sequence; the second reads the first job's output.
JobClient.runJob(conf1);
JobClient.runJob(conf2);

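If you have any questions about this, feel free to contact me.

Best answer

You need to explicitly configure the output of the first job to use SequenceFileOutputFormat and define the output key and value classes: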

job.setOutputFormat(SequenceFileOutputFormat.class);
job.setOutputKeyClass(VarLongWritable.class);
job.setOutputValueClass(VectorWritable.class);

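Without seeing your driver code, I am guessing that you are using TextOutputFormat for the first job's output and TextInputFormat for the second job's input. That input format sends <LongWritable, Text> pairs (the byte offset and the line) to the second mapper, so the value arrives as Text rather than VectorWritable.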

Regarding "hadoop - How to send the output of one mapper-reducer directly to another mapper-reducer without saving the output to HDFS", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/10266367/
