gpt4 book ai didi

mongodb - 如何使用 spark 查询 mongo?

转载 作者:IT老高 更新时间:2023-10-28 13:13:44 30 4
gpt4 key购买 nike

我正在使用 spark 和 mongo。我可以使用以下代码连接到 mongo:

val sc = new SparkContext("local", "Hello from scala")

val config = new Configuration()
config.set("mongo.input.uri", "mongodb://127.0.0.1:27017/dbName.collectionName")
val mongoRDD = sc.newAPIHadoopRDD(config, classOf[com.mongodb.hadoop.MongoInputFormat], classOf[Object], classOf[BSONObject])

上面的代码为我提供了集合中的所有文档。

现在我想对查询应用一些条件。

为此我使用了

config.set("mongo.input.query","{customerId: 'some mongo id'}")

这一次只需要一个条件。如果'usage' > 30,我想添加一个条件

1) 如何使用 spark 和 mongo 为 mongo 查询添加多个条件(包括大于和小于)??

我还想使用 scala 遍历查询结果的每个文档??

2) 如何使用 scala 遍历结果?

最佳答案

你好,你可以试试这个:

有一个项目集成了MongoDB和Spark

https://github.com/Stratio/deep-spark/tree/develop

1) 做一个 git clone

2) 进入 deep-spark,然后进入 deep-parent

3) mvn 安装

4) 使用此选项打开 spark-shell:

./spark-shell --jars YOUR_PATH/deep-core-0.7.0-SNAPSHOT.jar,YOUR_PATH/deep-commons-0.7.0-SNAPSHOT.jar,YOUR_PATH/deep-mongodb-0.7.0-SNAPSHOT .jar,YOUR_PATH/mongo-java-driver-2.12.4-sources.jar

记得用真实路径覆盖“YOUR_PATH”

5)在spark shell中执行一个简单的例子:

import com.stratio.deep.mongodb.config.MongoDeepJobConfig
import com.stratio.deep.mongodb.extractor.MongoNativeDBObjectExtractor
import com.stratio.deep.core.context.DeepSparkContext
import com.mongodb.DBObject
import org.apache.spark.rdd.RDD
import com.mongodb.QueryBuilder
import com.mongodb.BasicDBObject

val host = "localhost:27017"


val database = "test"

val inputCollection = "input";

val deepContext: DeepSparkContext = new DeepSparkContext(sc)

val inputConfigEntity: MongoDeepJobConfig[DBObject] = new MongoDeepJobConfig[DBObject](classOf[DBObject])


val query: QueryBuilder = QueryBuilder.start();

query.and("number").greaterThan(27).lessThan(30);


inputConfigEntity.host(host).database(database).collection(inputCollection).filterQuery(query).setExtractorImplClass(classOf[MongoNativeDBObjectExtractor])


val inputRDDEntity: RDD[DBObject] = deepContext.createRDD(inputConfigEntity)

最好的一点是您可以使用 QueryBuilder 对象进行查询

你也可以像这样传递一个 DBObject:

{ "number" : { "$gt" : 27 , "$lt" : 30}}

如果你想迭代你可以使用方法 yourRDD.collect()。你也可以使用你的RDD.foreach,但你必须提供一个函数。

还有另一种方法可以将 jar 添加到 spark 中。您可以修改 spark-env.sh 并将这一行放在最后:

CONFDIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
for jar in $(ls $CONFDIR/../lib/*.jar); do
SPARK_CLASSPATH=$SPARK_CLASSPATH:${jar}
done

在 lib 文件夹中,您可以放置​​您的库,仅此而已。

免责声明:我目前正在研究 Stratio

关于mongodb - 如何使用 spark 查询 mongo?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27523337/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com