java - How to manage joins in Hadoop - MultipleInputPath


After a map-side join, the data I get in the reducer is:

key------ book
values
6
eraser=>book 2
pen=>book 4
pencil=>book 5

What I basically want to do is:

eraser=>book = 2/6
pen=>book = 4/6
pencil=>book = 5/6

What I initially did was this:

public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
    System.out.println("key------ " + key);
    System.out.println("Values");
    double BsupportCnt = 0; // denominator: the bare count, e.g. 6
    double UsupportCnt = 0; // numerator: the count attached to an "=>" entry
    double res = 0;
    for (Text value : values) {
        System.out.println("\t" + value.toString());
        String v = value.toString();
        if (!v.contains("=>")) {
            BsupportCnt = Double.parseDouble(v);
        } else {
            String parts[] = v.split(" ");
            UsupportCnt = Double.parseDouble(parts[1]);
        }
        // calculate here: only correct if the bare count arrived first
        res = UsupportCnt / BsupportCnt;
    }
}

This works fine if the incoming data arrives in the order shown above.

But if the data coming from the mapper is:

key------ book
values
eraser=>book 2
pen=>book 4
pencil=>book 5
6

then it doesn't work. Otherwise I would need to store all the => entries in a list (and that list could exhaust heap space if the incoming data is large) and do the calculation once I get the single number.

UPDATE: Since Vefthym suggested secondary sorting the values before they reach the reducer, I used htuple to do so, following this link.

In Mapper1 I emit eraser=>book 2 as the value, so:

public class AprioriItemMapper1 extends Mapper<Text, Text, Text, Tuple> {
    public void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        // Configurations and other stuff
        // allWords is an ArrayList
        if (allWords.size() <= 2) {
            Tuple outputKey = new Tuple();
            String LHS1 = allWords.get(1);
            String RHS1 = allWords.get(0) + "=>" + allWords.get(1) + " " + value.toString();
            outputKey.set(TupleFields.ALPHA, RHS1);
            context.write(new Text(LHS1), outputKey);
        }
        // other stuff
    }
}

Mapper2 emits the numbers as values:

public class AprioriItemMapper2 extends Mapper<Text, Text, Text, Tuple> {
    Text valEmit = new Text();
    public void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        // Configuration and other stuff
        if (cnt != supCnt && cnt < supCnt) {
            System.out.println("emit");
            Tuple outputKey = new Tuple();
            outputKey.set(TupleFields.NUMBER, value);

            System.out.println("v---" + value);
            System.out.println("outputKey.toString()---" + outputKey.toString());
            context.write(key, outputKey);
        }
    }
}
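
For reference, the DelegatingMapper frames in the stack trace below show the two mappers are wired together with MultipleInputs. A minimal driver sketch follows; the paths and the AprioriDriver/AprioriReducer names are placeholders I am assuming, not taken from the question. The point to note is that every mapper feeding the same shuffle must emit exactly the map output key and value classes declared on the job (here, the question's current Text/Tuple pairing).

// Hypothetical driver sketch; input/output paths and the AprioriDriver
// and AprioriReducer names are placeholders, not from the question.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.htuple.Tuple;

public class AprioriDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "apriori-join");
        job.setJarByClass(AprioriDriver.class);

        // Each input path gets its own mapper, but both feed one shuffle,
        // so both must write the key/value classes declared below.
        MultipleInputs.addInputPath(job, new Path(args[0]),
                KeyValueTextInputFormat.class, AprioriItemMapper1.class);
        MultipleInputs.addInputPath(job, new Path(args[1]),
                KeyValueTextInputFormat.class, AprioriItemMapper2.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Tuple.class);
        job.setReducerClass(AprioriReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}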

In the reducer I just try to print the keys and values,

but it hits this error:

Mapper 2: 
line book
Support Count: 2
count--- 1
emit
v---6
outputKey.toString()---[0]='6,
14/08/07 13:54:19 INFO mapred.LocalJobRunner: Map task executor complete.
14/08/07 13:54:19 WARN mapred.LocalJobRunner: job_local626380383_0003
java.lang.Exception: java.lang.ClassCastException: org.apache.hadoop.io.Text cannot be cast to org.htuple.Tuple
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:406)
Caused by: java.lang.ClassCastException: org.apache.hadoop.io.Text cannot be cast to org.htuple.Tuple
at org.htuple.TupleMapReducePartitioner.getPartition(TupleMapReducePartitioner.java:28)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:601)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:85)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:106)
at edu.am.bigdata.apriori.AprioriItemMapper1.map(AprioriItemMapper1.java:49)
at edu.am.bigdata.apriori.AprioriItemMapper1.map(AprioriItemMapper1.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
at org.apache.hadoop.mapreduce.lib.input.DelegatingMapper.run(DelegatingMapper.java:51)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:268)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
at java.util.concurrent.FutureTask.run(FutureTask.java:166)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:722)

The error is at context.write(new Text(LHS1), outputKey); from AprioriItemMapper1.java:49, but the printed details above come from Mapper 2.
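
Judging from the stack trace, TupleMapReducePartitioner.getPartition appears to cast the map output key to a Tuple, while both mappers write Text keys; that mismatch would produce exactly this ClassCastException. A possible fix, under the assumption that htuple expects to partition on Tuple keys, is to wrap the key in a Tuple as well, for example in Mapper1:

// Sketch assuming htuple's partitioner requires the map output KEY to be
// a Tuple; TupleFields is the enum already used in the snippets above.
Tuple outputKey = new Tuple();
outputKey.set(TupleFields.ALPHA, LHS1);   // natural key, e.g. "book"

Tuple outputValue = new Tuple();
outputValue.set(TupleFields.ALPHA, RHS1); // e.g. "eraser=>book 2"

context.write(outputKey, outputValue);    // Tuple on both sides

The job would then declare Tuple.class for both setMapOutputKeyClass and setMapOutputValueClass, and Mapper2 would need the same change.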

Is there a better way to do this? Please suggest.

Best Answer

I would suggest using secondary sorting, which would guarantee that the first value (sorted lexicographically) is the numeric one, assuming no words start with a digit.
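
For concreteness, here is a minimal plain-Hadoop secondary-sort sketch (without htuple). Rather than relying on lexicographic order, it uses an explicit order flag in a composite key: 0 for the bare count, 1 for the "=>" entries, so the denominator is guaranteed to reach reduce() first. All class names here are illustrative, not from the question.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Partitioner;

// Composite key: natural key ("book") plus an order flag the shuffle sorts
// on, so the bare count (flag 0) precedes the "=>" entries (flag 1).
public class OrderedKey implements WritableComparable<OrderedKey> {
    private final Text naturalKey = new Text();
    private final IntWritable order = new IntWritable();

    public void set(String key, int ord) {
        naturalKey.set(key);
        order.set(ord);
    }

    public Text getNaturalKey() { return naturalKey; }

    @Override
    public void write(DataOutput out) throws IOException {
        naturalKey.write(out);
        order.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        naturalKey.readFields(in);
        order.readFields(in);
    }

    @Override
    public int compareTo(OrderedKey other) { // full sort: key, then flag
        int cmp = naturalKey.compareTo(other.naturalKey);
        return cmp != 0 ? cmp : order.compareTo(other.order);
    }
}

// Partition and group on the natural key only, so a single reduce() call
// still sees all values for "book", with the count guaranteed to be first.
class NaturalKeyPartitioner extends Partitioner<OrderedKey, Text> {
    @Override
    public int getPartition(OrderedKey key, Text value, int numPartitions) {
        return (key.getNaturalKey().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

class NaturalKeyGroupingComparator extends WritableComparator {
    protected NaturalKeyGroupingComparator() { super(OrderedKey.class, true); }

    @Override
    @SuppressWarnings("rawtypes")
    public int compare(WritableComparable a, WritableComparable b) {
        return ((OrderedKey) a).getNaturalKey()
                .compareTo(((OrderedKey) b).getNaturalKey());
    }
}

The driver would register these with job.setPartitionerClass(NaturalKeyPartitioner.class), job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class) and job.setMapOutputKeyClass(OrderedKey.class), and the mappers would wrap their current Text key in an OrderedKey.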

If that doesn't work, then, given the scalability restrictions that you mention, I would store the reducer's values in a HashMap<String,Double> buffer, whose keys are the left-hand parts of "=>" and whose values are their numeric counts. You can buffer values until you get the value of the denominator BsupportCnt. Then you can emit all the buffer's contents with the correct fraction, and all the remaining values as they come, one by one, without using the buffer again (since you now know the denominator). Something like this:

public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
    Map<String, Double> buffer = new HashMap<>();
    double BsupportCnt = 0;
    double UsupportCnt;
    double res;
    for (Text value : values) {
        String v = value.toString();

        if (!v.contains("=>")) {
            BsupportCnt = Double.parseDouble(v);
        } else {
            String parts[] = v.split(" ");
            UsupportCnt = Double.parseDouble(parts[1]);

            if (BsupportCnt != 0) { // no need to add things to the buffer any more
                res = UsupportCnt / BsupportCnt;
                context.write(new Text(v), new DoubleWritable(res));
            } else {
                buffer.put(parts[0], UsupportCnt);
            }
        }
    }

    // now emit the buffer's contents
    for (Map.Entry<String, Double> entry : buffer.entrySet()) {
        context.write(new Text(entry.getKey()),
                new DoubleWritable(entry.getValue() / BsupportCnt));
    }
}

You can save even more space by storing only the left-hand part of "=>" as the HashMap's key, since the right-hand part is always the reducer's input key.

On java - How to manage joins in Hadoop - MultipleInputPath, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/25160703/
