gpt4 book ai didi

algorithm - 如何干净/安全地从异步通信链中删除链接

转载 作者:塔克拉玛干 更新时间:2023-11-03 03:01:50 30 4
gpt4 key购买 nike

我有一个 Akka Actor 链 K -> L -> M,其中 K 向 L 发送消息,L 向 M 发送消息,每个人都可以使用 sender 回复其前任收到消息。

我的问题是:我们如何才能安全地将 L 从链中解除链接,以便在操作后 K 直接向 M 发送消息,而 M 将 K 视为其发送者

如果每个 Actor 都稳定且长寿,我就能想出如何让 K 和 M 互相交谈。 L 可以逗留足够长的时间来转发她邮箱中已有的消息,直到收到一个信号,表明 K 和 M 不再与 L 通话。

但是,K 和 M 也可能正在考虑断开自己的链接,这就是一切变得棘手的地方。

是否有一个众所周知的协议(protocol)可以使这项工作安全进行?到目前为止,我还没有找到合适的搜索词。

我知道另一种选择是卡住系统并将整个东西(减去 L)复制到一个新链中。但是,我想实现一个更实时、更少中断的解决方案。

我一直在考虑某种锁交换,其中 K 和 M promise 在 L 完成取消链接并转发所有消息之前不会取消链接。但是任何带有“锁定”一词的解决方案在异步解决方案中似乎都很尴尬,即使参与者不会被完全锁定(只是将他们自己的取消链接操作延迟到方便的时间)。

最佳答案

一个粗略的解决方案是在每个节点内维护前一个和下一个节点的状态,然后支持从链中解开一个节点的消息,并告诉一个节点它的下一个节点已经改变。当下一个节点发生变化时,向通知您的节点发送毒丸(假设它是被取消链接的节点)以优雅地停止它。在取消链接和停止之前,充当可能传入的任何更多消息的纯直通。将所有这些放在一起,代码看起来像这样:

object ChainNode {
case object Unlink
case class Link(prev:Option[ActorRef], next:Option[ActorRef])
case class ChangeNextNode(node:Option[ActorRef])
}

trait ChainNode extends Actor{
import ChainNode._
import context._

override def postStop{
println(s"${self.path} has been stopped")
}

def receive = chainReceive()

def chainReceive(prevNode:Option[ActorRef] = None, nextNode:Option[ActorRef] = None):Receive = {
case Unlink =>
prevNode foreach{ node =>
println(s"unlinking node ${self.path} from sender ${node.path}")
node ! ChangeNextNode(nextNode)
}
become(unlinked(nextNode))

case Link(prev, next) =>
println(s"${self.path} is linking to $prev and $next")
become(chainReceive(prev, next))

case ChangeNextNode(newNext) =>
println(s"${self.path} is changing next node to $newNext")
become(chainReceive(prevNode, newNext))
sender ! PoisonPill

case other =>
println(s"${self.path} received message $other")
val msg = processOther(other)
nextNode foreach{ node =>
println(s"${self.path} forwarding on to ${node.path}")
node ! msg
}
}

def unlinked(nextNode:Option[ActorRef]):Receive = {
case any =>
println(s"${self.path} has been unlinked, just forwarding w/o processing...")
nextNode foreach (_ ! any)
}

def processOther(msg:Any):Any
}

class NodeA extends ChainNode{
def processOther(msg:Any) = "foo"
}

class NodeB extends ChainNode{
def processOther(msg:Any) = "bar"
}

class NodeC extends ChainNode{
def processOther(msg:Any) = "baz"
}

然后,一个简单的测试场景,其中链接在中途被更改:

object ChainTest{
import ChainNode._

def main(args: Array[String]) {
val system = ActorSystem("chain")
val a = system.actorOf(Props[NodeA])
val b = system.actorOf(Props[NodeA])
val c = system.actorOf(Props[NodeA])

a ! Link(None, Some(b))
b ! Link(Some(a), Some(c))
c ! Link(Some(b), None)

import system.dispatcher
Future{
for(i <- 1 until 10){
a ! "hello"
Thread.sleep(200)
}
}

Future{
Thread.sleep(300)
b ! Unlink
}
}
}

它并不完美,但可以作为您的良好起点。一个缺点是像 UnlinkChangeNextNode 这样的消息仍然会按照它们收到的顺序进行处理。如果邮箱中有一堆邮件在他们前面,则必须先处理这些邮件,然后更改(例如取消链接)才会生效。这可能会导致更改发生不必要的延迟。如果这是一个问题,那么您可能需要查看基于优先级的邮箱,其中 UnlinkChangeNextNode 等消息的优先级高于正在处理的其余消息。

关于algorithm - 如何干净/安全地从异步通信链中删除链接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20599099/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com