
java - Mongo hadoop mapreduce shows an error

Reposted · Author: 行者123 · Updated: 2023-12-01 14:22:48

I am new to the big data and NoSQL space, and I am trying out a sample program.

I am trying to fetch details from my Mongo database. Here is my database schema:

  { "_id" : ObjectId("51d11c95e82449edcf7640bc"), "Called_Number" : NumberLong("7259400112"), "Calling_Number" : NumberLong("9008496311"), "Date" : "22-Apr-13", "Time" : "10:21:43", "Duration" : "4:36" }

Now I am trying to fetch values from the database and run a map-reduce job, so that I can produce details like the following:

{ "Calling_Number": 7259400112, "Called_Number": "9008496311", "Frequency": "3" }
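For reference, the per-pair frequency count being asked for can be sketched outside Hadoop in plain Java. The records below are made-up samples following the schema above; this is only an illustration of the counting logic, not the Hadoop job itself:

```java
import java.util.HashMap;
import java.util.Map;

public class CallPairFrequency {
    public static void main(String[] args) {
        // Each record: {calling number, called number}, as in the CDR schema above.
        String[][] records = {
            {"9008496311", "7259400112"},
            {"9008496311", "7259400112"},
            {"9008496311", "7259400112"},
        };

        // Count how often each (calling, called) pair occurs.
        Map<String, Integer> freq = new HashMap<>();
        for (String[] r : records) {
            String pair = r[0] + "->" + r[1];
            freq.merge(pair, 1, Integer::sum);
        }

        freq.forEach((pair, count) ->
            System.out.println(pair + " frequency: " + count));
        // prints: 9008496311->7259400112 frequency: 3
    }
}
```

In MapReduce terms, the `merge` call plays the role of the reducer's sum over the `IntWritable(1)` values emitted per pair by the mapper.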

Here is what I am trying:

package callcircle;

import java.io.*;
import java.util.*;

import org.apache.commons.logging.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.bson.*;

import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.hadoop.*;
import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.util.*;

public class call {

    private static final Log log = LogFactory.getLog(call.class);

    public static class TokenizerMapper extends
            Mapper<Object, Object, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private final Text word = new Text();

        public void map(Object calling_number, Object called_number,
                Context context) throws IOException, InterruptedException {
            System.out.println("entering method");

            // calling_number = (Object) calling_number).get("Calling_Number");
            called_number = ((BSONWritable) called_number).get("Called_Number");

            String CallNumer01 = called_number.toString();

            String[] recips = CallNumer01.split(",");

            for (int i = 0; i < recips.length; i++) {
                String recip = recips[i].trim();
                if (recip.length() > 0) {
                    // context.write(new CallPair(calling_number, recip), new IntWritable(1));
                    // word.set(CallNumer01); context.write( word, one );
                    // System.out.println("After mapping");
                }
            }
        }
    }

    public class CallReducer extends
            Reducer<CallPair, IntWritable, BSONWritable, IntWritable> {

        public void reduce(final CallPair pKey,
                final Iterable<IntWritable> pValues, final Context pContext)
                throws IOException, InterruptedException {
            int sum = 0;
            for (final IntWritable value : pValues) {
                sum += value.get();
            }
            @SuppressWarnings("static-access")
            BSONObject outDoc = new BasicDBObjectBuilder().start()
                    .add("f", pKey.calling_number).add("t", pKey.called_number)
                    .get();
            BSONWritable pkeyOut = new BSONWritable(outDoc);
            pContext.write(pkeyOut, new IntWritable(sum));
        }

    }

    public static void main(String[] args) throws Exception {
        System.out.println("In Main");
        final Configuration conf = new Configuration();
        System.out.println("Conf1: " + conf);
        MongoConfigUtil.setInputURI(conf, "mongodb://localhost/CDR.in1");
        MongoConfigUtil.setOutputURI(conf, "mongodb://localhost/CDR.out");
        System.out.println("Conf: " + conf);

        final Job job = new Job(conf, "CDR");

        job.setJarByClass(call.class);
        System.out.println("Conf2: " + conf);

        job.setMapperClass(TokenizerMapper.class);

        job.setCombinerClass(CallReducer.class);
        job.setReducerClass(CallReducer.class);
        System.out.println("Conf3: " + conf);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        System.out.println("Conf3: " + conf);
        job.setInputFormatClass(MongoInputFormat.class);
        job.setOutputFormatClass(MongoOutputFormat.class);
        System.out.println("Conf4: " + conf);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
        System.out.println("Conf6: " + conf);
    }

}

But I am getting the following error:

In Main
Conf1: Configuration: core-default.xml, core-site.xml
Conf: Configuration: core-default.xml, core-site.xml
13/07/01 19:04:27 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
Conf2: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml
Conf3: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml
Conf3: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml
Conf4: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml
13/07/01 19:04:27 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
13/07/01 19:04:27 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
13/07/01 19:04:28 INFO util.MongoSplitter: Calculate Splits Code ... Use Shards? false, Use Chunks? true; Collection Sharded? false
13/07/01 19:04:28 INFO util.MongoSplitter: Creation of Input Splits is enabled.
13/07/01 19:04:28 INFO util.MongoSplitter: Using Unsharded Split mode (Calculating multiple splits though)
13/07/01 19:04:28 INFO util.MongoSplitter: Calculating unsharded input splits on namespace 'CDR.in1' with Split Key '{ "_id" : 1}' and a split size of '8'mb per
13/07/01 19:04:28 WARN util.MongoSplitter: WARNING: No Input Splits were calculated by the split code. Proceeding with a *single* split. Data may be too small, try lowering 'mongo.input.split_size' if this is undesirable.
13/07/01 19:04:28 INFO mapred.JobClient: Running job: job_local_0001
13/07/01 19:04:28 INFO util.MongoSplitter: Calculate Splits Code ... Use Shards? false, Use Chunks? true; Collection Sharded? false
13/07/01 19:04:28 INFO util.MongoSplitter: Creation of Input Splits is enabled.
13/07/01 19:04:28 INFO util.MongoSplitter: Using Unsharded Split mode (Calculating multiple splits though)
13/07/01 19:04:28 INFO util.MongoSplitter: Calculating unsharded input splits on namespace 'CDR.in1' with Split Key '{ "_id" : 1}' and a split size of '8'mb per
13/07/01 19:04:28 WARN util.MongoSplitter: WARNING: No Input Splits were calculated by the split code. Proceeding with a *single* split. Data may be too small, try lowering 'mongo.input.split_size' if this is undesirable.
should setup context
13/07/01 19:04:28 INFO input.MongoInputSplit: Deserialized MongoInputSplit ... { length = 9223372036854775807, locations = [localhost], keyField = _id, query = { "$query" : { }}, fields = { }, sort = { }, limit = 0, skip = 0, noTimeout = false}
13/07/01 19:04:28 INFO mapred.MapTask: io.sort.mb = 100
13/07/01 19:04:28 INFO mapred.MapTask: data buffer = 79691776/99614720
13/07/01 19:04:28 INFO mapred.MapTask: record buffer = 262144/327680
entering method
13/07/01 19:04:28 WARN mapred.LocalJobRunner: job_local_0001
java.lang.ClassCastException: com.mongodb.BasicDBObject cannot be cast to com.mongodb.hadoop.io.BSONWritable
at callcircle.call$TokenizerMapper.map(call.java:36)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:621)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:305)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:177)
13/07/01 19:04:29 INFO mapred.JobClient: map 0% reduce 0%
13/07/01 19:04:29 INFO mapred.JobClient: Job complete: job_local_0001
13/07/01 19:04:29 INFO mapred.JobClient: Counters: 0

Could someone please guide me on where I went wrong?

Thanks

Best Answer

If the mapper and reducer do not use the same output types, you must explicitly declare the mapper's key/value types, so you probably also need to add:

setMapOutputKeyClass(Text.class)
setMapOutputValueClass(IntWritable.class)
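Applied to the `main()` method in the question, that suggestion would look roughly like the fragment below. This is a sketch only: the surrounding job setup comes from the question, and changing the final output key class to `BSONWritable` is an assumption about the intended fix, since that is what `CallReducer` actually emits:

```java
// Mapper emits Text / IntWritable, while the reducer emits
// BSONWritable / IntWritable, so the intermediate (map-side)
// types must be declared explicitly:
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

// Final (reducer-side) output types:
job.setOutputKeyClass(BSONWritable.class);
job.setOutputValueClass(IntWritable.class);
```

Note also that with mismatched mapper/reducer output types, `CallReducer` cannot be used as a combiner, since a combiner must consume and produce the map output types.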

Regarding "java - Mongo hadoop mapreduce shows an error", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/17406063/
