gpt4 book ai didi

apache-spark - 处理 spark streaming 中的数据库连接

转载 作者:行者123 更新时间:2023-12-01 11:29:41 24 4
gpt4 key购买 nike

我不确定我是否正确理解了 spark 如何处理数据库连接以及如何可靠地使用 spark 内部的大量数据库更新操作而不可能搞砸 spark 作业。这是我一直在使用的代码片段(为了便于说明):

val driver = new MongoDriver
val hostList: List[String] = conf.getString("mongo.hosts").split(",").toList
val connection = driver.connection(hostList)
val mongodb = connection(conf.getString("mongo.db"))
val dailyInventoryCol = mongodb[BSONCollection](conf.getString("mongo.collections.dailyInventory"))

val stream: InputDStream[(String,String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
ssc, kafkaParams, fromOffsets,
(mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message()));

def processRDD(rddElem: RDD[(String, String)]): Unit = {
val df = rdd.map(line => {
...
}).flatMap(x => x).toDF()

if (!isEmptyDF(df)) {
var mongoF: Seq[Future[dailyInventoryCol.BatchCommands.FindAndModifyCommand.FindAndModifyResult]] = Seq();

val dfF2 = df.groupBy($"CountryCode", $"Width", $"Height", $"RequestType", $"Timestamp").agg(sum($"Frequency")).collect().map(row => {
val countryCode = row.getString(0); val width = row.getInt(1); val height = row.getInt(2);
val requestType = row.getInt(3); val timestamp = row.getLong(4); val frequency = row.getLong(5);
val endTimestamp = timestamp + 24*60*60; //next day

val updateOp = dailyInventoryCol.updateModifier(BSONDocument("$inc" -> BSONDocument("totalFrequency" -> frequency)), false, true)

val f: Future[dailyInventoryCol.BatchCommands.FindAndModifyCommand.FindAndModifyResult] =
dailyInventoryCol.findAndModify(BSONDocument("width" -> width, "height" -> height, "country_code" -> countryCode, "request_type" -> requestType,
"startTs" -> timestamp, "endTs" -> endTimestamp), updateOp)

f
})

mongoF = mongoF ++ dfF2

//split into small chunk to avoid drying out the mongodb connection
val futureList: List[Seq[Future[dailyInventoryCol.BatchCommands.FindAndModifyCommand.FindAndModifyResult]]] = mongoF.grouped(200).toList

//future list
futureList.foreach(seqF => {
Await.result(Future.sequence(seqF), 40.seconds)
});
}

stream.foreachRDD(processRDD(_))

基本上,我使用的是 Reactive Mongo (Scala),对于每个 RDD,我将其转换为数据框,分组/提取必要的数据,然后针对 mongo 发起大量数据库更新查询。我想问:

  1. 我正在使用 mesos 在 3 台服务器上部署 spark,并且还有一台服务器用于 mongo 数据库。这是处理数据库连接的正确方法吗?我担心的是数据库连接/轮询是否在 spark 作业开始时打开并在整个 spark 持续时间(周,月......)期间正确维护(尽管超时/网络错误故障转移),以及是否会在每次关闭时关闭批完了吗?考虑到作业可能会安排在不同的服务器上?这是否意味着每个批处理都会打开不同的数据库连接集?

  2. 如果执行查询时出现异常怎么办。该批处理的 Spark 作业会失败吗?但是下一批还会继续吗?

  3. 如果有太多查询(2000->+)无法在 mongo-database 上运行更新,并且执行时间超过配置的 spark 批处理持续时间(2 分钟),是否会导致问题?我注意到,在我当前的设置中,大约 2-3 天后,所有批处理都在 Spark WebUI 上作为“进程”排队(如果我禁用 mongo 更新部分,那么我可以毫无问题地运行一周),没有能够正常退出。这基本上会挂断所有批处理作业,直到我重新启动/重新提交作业。

非常感谢。如果您能帮我解决这个问题,我将不胜感激。

最佳答案

请阅读 http://spark.apache.org/docs/latest/streaming-programming-guide.html 中的“使用 foreachRDD 的设计模式”部分.这将消除您对应如何使用/创建连接的疑虑。

其次,我建议将直接更新操作与您的 Spark 作业分开。更好的方法是你的 spark 作业,处理数据然后将其发布到 Kafka 队列中,然后有另一个专用进程/作业/代码从 Kafka 队列读取数据并在 Mongo DB 上执行插入/更新操作。

关于apache-spark - 处理 spark streaming 中的数据库连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33709769/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com