gpt4 book ai didi

multithreading - 如何保证akka中fork的顺序

转载 作者:行者123 更新时间:2023-12-01 10:44:18 36 4
gpt4 key购买 nike

我们正在为每个(小的)传入消息组创建一个参与者链,以保证它们的顺序处理和管道(组通过公共(public) ID 区分)。问题是我们的链有 fork ,比如 A1 -> (A2 -> A3 | A4 -> A5) 我们应该保证通过 A2 -> A3 A4 -> A5。当前的遗留解决方案是阻止 A1 actor 直到当前消息被完全处理(在其中一个子链中):

def receive { //pseudocode
case x => ...
val f = A2orA4 ? msg
Await.complete(f, timeout)
}

因此,应用程序中的线程数与正在处理的消息数成正比,无论这些消息是事件的还是正在异步等待来自外部服务的某些响应。它与 fork-join(或任何其他动态)池一起工作大约两年,但当然不能与固定池一起工作,并且在高负载的情况下会极大地降低性能。不仅如此,它还会影响 GC,因为每个阻塞的 fork-actor 都会在内部保存冗余的先前消息的状态。

即使有背压,它也会创建比收到的消息多 N 倍的线程(因为流中有 N 个顺序 fork ),这仍然很糟糕,因为处理一条消息需要很长时间,但 CPU 不多。所以我们应该处理尽可能多的消息,因为我们有足够的内存。我提出的第一个解决方案 - 将链线性化,如 A1 -> A2 -> A3 -> A4 -> A5。还有更好的吗?

最佳答案

更简单的解决方案是将最后收到的消息的 future 存储到参与者的状态中,并将其与之前的 future 链接起来:

def receive = process(Future{new Ack}) //completed future
def process(prevAck: Future[Ack]): Receive = { //pseudocode
case x => ...
context become process(prevAck.flatMap(_ => A2orA4 ? msg))
}

因此它将创建没有任何阻塞的 future 链。 future 完成后链将被删除(最后一个除外)。

关于multithreading - 如何保证akka中fork的顺序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28044971/

36 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com