gpt4 book ai didi

multithreading - 等待 Scala Future 完成并继续下一个

转载 作者:行者123 更新时间:2023-12-03 22:41:47 25 4
gpt4 key购买 nike

我有一个包含 500,000 个元素的列表和一个包含 20 个消费者的队列。消息以不同的速度处理(1、15、30、60 秒;3、50 分钟;3、16 小时或更长时间。24 小时为超时)。我需要消费者的响应才能对数据进行一些处理。我将为此使用 Scala Future 和基于事件的 onComplete

为了不淹没队列,我想先向队列发送 30 条消息:20 条将由消费者挑选,10 条将在队列中等待。当其中一个 Future 完成时,我想向队列发送另一条消息。你能告诉我如何实现这一目标吗?这可以用 Akka Streams 完成吗?

这是错误的,我只是想让你知道我想要什么:

private def sendMessage(ids: List[String]): Unit = {
val id = ids.head

val futureResult = Future {
//send id among some message to the queue
}.map { result =>
//process the response
}

futureResult.onComplete { _ =>
sendMessage(ids.tail)
}
}

def migrateAll(): Unit = {
val ids: List[String] = //get IDs from the DB

sendMessage(ids)
}

最佳答案

下面是一个使用 Akka Streams 对您的用例进行建模的简单示例。

让我们将处理定义为一个接受 String 并返回 Future[String] 的方法:

def process(id: String): Future[String] = ???

然后我们从包含 500,000 个 String 元素的 List 创建一个 Source 并使用 mapAsync将元素提供给处理方法。并行级别设置为 20,这意味着在任何时间点运行的 Future 不超过 20 个。当每个 Future 完成时,我们执行额外的处理并打印结果:

Source((1 to 500000).map(_.toString).toList)
.mapAsync(parallelism = 20)(process)
// do something with the result of the Future; here we create a new string
// that begins with "Processed: "
.map(s => s"Processed: $s")
.runForeach(println)

您可以在 documentation 中阅读有关 mapAsync 的更多信息.

关于multithreading - 等待 Scala Future 完成并继续下一个,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46713131/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com