gpt4 book ai didi

Mongodb-hadoop-connector 会创建很多分区,即使我向它添加查询

转载 作者:可可西里 更新时间:2023-11-01 16:24:56 26 4
gpt4 key购买 nike

我有一个非常大的 mongo 表,我想使用 spark 对其进行一些分析,它太大了,我不想加载整个数据库。但看起来它总是扫描整个数据库并将它们分成大量的分区,即使我将 mongo.input.query 传递给它也是如此。我正在使用 mongo-hadoop加载它,我的代码如下所示:

val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)

val mongoConfig = new Configuration()

val beginDate = new Date(2016 - 1900,6,7)

println("the begin data is: =========== >" + beginDate)

val beginId = new ObjectId(beginDate, 0,0.toShort,0)

mongoConfig.set("mongo.input.uri",
"mongodb://mymongoduri/mongodb.mongocollection")

val queryStr = """{"_id": {"$gt" : {"$oid":"beginDate" }}}""".replace("beginDate", beginId.toString)
mongoConfig.set("mongo.input.query", queryStr)
mongoConfig.set("mongo.input.fields", """{ "its.src":-1, "its._id":-1, "its.cid": -1}""")

val documents = sc.newAPIHadoopRDD(
mongoConfig, // Configuration
classOf[MongoInputFormat], // InputFormat
classOf[Object], // Key type
classOf[BSONObject]) // Value type

val OUTPUT_PATH = if(ENV == Some("dev")){
s"./result"
} else{
s"s3://${OUTPUT_BUCKET}/output/graph/${beginDate}"
}

documents.saveAsNewAPIHadoopFile(
OUTPUT_PATH,
classOf[Object],
classOf[BSONObject],
classOf[BSONFileOutputFormat[Object, BSONObject]]
)

它最终在 s3 中产生了大量的空文件,这不是我预期的结果(而且它浪费了很多钱)。

我已经阅读了文档,它说 mongo.input.query使用查询过滤输入集合,我可以只加载查询的数据吗?不只是过滤它们。

或者,我可以只存储那些非空的分区吗?

最佳答案

mongo 的 spark hadoop 连接器总是读取整个集合并相应地进行分区,然后使用输入查询过滤对象。当您保存文档 RDD 时,它总是会保存分区,无论它是否为空。

您可以将 RDD 重新分区为 1。或者使用 documents.coalesce(1).saveAsNewAPIHadoopFile(....)

关于Mongodb-hadoop-connector 会创建很多分区,即使我向它添加查询,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38259056/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com