gpt4 book ai didi

java - 使用 Mongo-Hadoop 连接器通过 Apache Spark 更新 MongoDb 中的集合

转载 作者:可可西里 更新时间:2023-11-01 09:10:52 24 4
gpt4 key购买 nike

我想通过 Java 中的 Spark 更新 MongoDb 中的特定集合。我正在使用 MongoDB Connector for HadoopApache Spark 检索和保存信息到 Java 中的 MongoDb。

在关注了 Sampo Niskanen 的优秀 post regarding retrieving and saving collections to MongoDb via Spark,我在更新 Collection 方面遇到了困难。

MongoOutputFormat.java包括一个采用 String[] updateKeys 的构造函数,我猜它指的是一个可能的键列表,用于比较现有集合并执行更新。但是,使用带有参数 MongoOutputFormat.class 的 Spark 的 saveAsNewApiHadoopFile() 方法,我想知道如何使用该更新构造函数。

save.saveAsNewAPIHadoopFile("file:///bogus", Object.class, Object.class, MongoOutputFormat.class, config);

在此之前,MongoUpdateWritable.java被用来执行集合更新。从我在 Hadoop 上看到的例子来看,这通常是在 mongo.job.output.value 上设置的,在 Spark 中可能是这样的:

save.saveAsNewAPIHadoopFile("file:///bogus", Object.class, MongoUpdateWritable.class, MongoOutputFormat.class, config);

但是,我仍然想知道如何在 MongoUpdateWritable.java 中指定更新 key 。

不可否认,作为一种 hacky 方式,我将对象的“_id”设置为我文档的键值,这样当执行保存时,集合将覆盖具有与 _id .

JavaPairRDD<BSONObject,?> analyticsResult; //JavaPairRdd of (mongoObject,result)
JavaPairRDD<Object, BSONObject> save = analyticsResult.mapToPair(s -> {
BSONObject o = (BSONObject) s._1;

//for all keys, set _id to key:value_
String id = "";
for (String key : o.keySet()){
id += key + ":" + (String) o.get(key) + "_";
}
o.put("_id", id);

o.put("result", s._2);
return new Tuple2<>(null, o);
});

save.saveAsNewAPIHadoopFile("file:///bogus", Object.class, Object.class, MongoOutputFormat.class, config);

我想使用 MongoOutputFormatMongoUpdateWritableConfiguration 通过 Spark 执行 mongodb 集合更新,最好使用 saveAsNewAPIHadoopFile( ) 方法。可能吗?如果没有,是否有任何其他方法不涉及专门将 _id 设置为我要更新的键值?

最佳答案

我尝试了 config.set("mongo.job.output.value","....") 的几种组合和

的几种组合
.saveAsNewAPIHadoopFile(
"file:///bogus",
classOf[Any],
classOf[Any],
classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]],
mongo_config
)

但没有一个有效。

我通过使用 MongoUpdateWritable 类作为我的 map 方法的输出使其工作:

items.map(row => {
val mongo_id = new ObjectId(row("id").toString)
val query = new BasicBSONObject()
query.append("_id", mongo_id)
val update = new BasicBSONObject()

update.append("$set", new BasicBSONObject().append("field_name", row("new_value")))
val muw = new MongoUpdateWritable(query,update,false,true)
(null, muw)
})
.saveAsNewAPIHadoopFile(
"file:///bogus",
classOf[Any],
classOf[Any],
classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]],
mongo_config
)

在 mongo 中执行的原始查询是这样的:

2014-11-09T13:32:11.609-0800 [conn438] update db.users query: { _id: ObjectId('5436edd3e4b051de6a505af9') } update: { $set: { value: 10 } } nMatched:1 nModified:0 keyUpdates:0 numYields:0 locks(micros) w:24 3ms

关于java - 使用 Mongo-Hadoop 连接器通过 Apache Spark 更新 MongoDb 中的集合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26526717/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com