gpt4 book ai didi

mongodb - 使用 ReactiveMongo 处理来自上限集合的作业直到被中断

转载 作者:可可西里 更新时间:2023-11-01 09:15:14 28 4
gpt4 key购买 nike

我在 MongoDB 中有一个 jobs_queue 集合。这是一个上限集合,我正在使用可尾游标进行轮询:

val cur =
jobsQueue
.find(Json.obj("done" -> Json.obj("$ne" -> true)))
.options(QueryOpts().tailable.awaitData)
.cursor[JsObject]

cur.enumerate() |>>> Iteratee.foreach { queuedDoc =>
// do some processing and store the results back in the DB
}

这是从常规 Scala App 调用的,因此根本没有 Akka 或 Play 包装。

确保 App 在我明确退出 Iteratee.foreach 之前不会退出的最合适方法是什么?此外,如果有更简单(即使稍微不那么优雅)的方法,我根本不必使用 play-iteratees


附言我确实确保收藏有上限:

val jobsQueueMaybe = db.collection[JSONCollection]("jobs_queue")
val jobsQueue: JSONCollection =
jobsQueueMaybe.stats()
.flatMap {
case stats if !stats.capped =>
jobsQueueMaybe.convertToCapped(size = 1024 * 1024, maxDocuments = None)
case _ =>
Future(jobsQueueMaybe)
}
.recover { case _ => jobsQueueMaybe.createCapped(size = 1024 * 1024, maxDocuments = None) }
.map { _ => jobsQueueMaybe }

P.P.S.

对于我如何设计这段逻辑,以及我如何通过重新思考我的方法和稍微修改实现来解决这个问题,我也将不胜感激。

最佳答案

作为当前的解决方法,我将 Iteratee.foreach 更改为 Iteratee.foldM 以便每次迭代都返回一个 Future;这样,它似乎强制 ReactiveMongo 继续计算直到被中断,而不是 foreach 似乎退出得太早:

cur.enumerate() |>>> Iteratee.foldM(()) { (acc, queuedDoc) =>
// always yield something like Future.successful(acc) or an actual `Future[Unit]`
}

然后,我只需要等到整个程序终止(通过将某些东西放入 stopSignal: ConcurrentLinkedQueue 来发出信号:

while (stopSignal.isEmpty) Thread.sleep(1000)

但是,虽然它运作良好,但我并不是特别喜欢这个解决方案。

也许我的担心是没有道理的,但我真的想要一个关于我应该如何解决这个问题的更权威的答案。

关于mongodb - 使用 ReactiveMongo 处理来自上限集合的作业直到被中断,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27922855/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com