gpt4 book ai didi

java - akka 中的正确设计。 - 消息传递

转载 作者:行者123 更新时间:2023-12-02 10:57:53 25 4
gpt4 key购买 nike

我已经阅读了一些关于 akka 如何以及为什么不保证消息传递的帖子。 documentation , 这个 discussion其他关于 group 的讨论确实很好地解释了它。

我对 akka 还很陌生,想知道一个案例的合适设计。例如,假设我在不同的机器上有 3 个不同的 Actor 。一个负责食谱,另一个负责历史,最后一个负责技术书籍。

我在另一台机器上有一个主要 Actor 。假设有一个对主要 Actor 的查询来搜索我们是否有可用的书籍。主要参与者向 3 个远程参与者发送请求,并期待结果。所以我这样做:

  val scatter = system.actorOf(
Props[SearchActor].withRouter(ScatterGatherFirstCompletedRouter(
routees=someRoutees, within = 10 seconds)), "router")
implicit val timeout = Timeout(10 seconds)
val futureResult = scatter ? Text("Concurrency in Practice")
// What should I do here?.
//val result = Await.result(futureResult, timeout.duration) line(a)

简而言之,我已经向所有 3 个远程参与者发送了请求,并期望在 10 秒内得到结果。

应该采取什么行动?
  • 假设我在 10 秒内没有得到结果,我是否应该再次向所有人发送新请求?
  • 如果within怎么办以上时间为时尚早。但我不知道这可能需要多少时间。
  • 如果within怎么办时间足够了,但消息被丢弃了。

  • 如果我在 within 中没有得到回复时间并再次重新发送请求。像这样,它保持异步:
    futureResult onComplete{
    case Success(i) => println("Result "+i)
    case Failure(e) => //send again
    }

    但是在查询太多的情况下,会不会是调用线程太多,体积庞大?如果我取消注释 line(a) ,它会变得同步并且在负载下可能会表现不佳。

    说我在 10 秒内没有得到响应。如 within时间还早,然后又发生了大量无用的计算。如果消息被丢弃,则 10浪费了几秒钟的宝贵时间。万一,假设我知道消息已送达,我可能会等待更长的时间而不会持怀疑态度。

    人们如何解决这些问题? ACK ?但是我必须将状态存储在所有查询的参与者中。这一定是一件很常见的事情,我正在寻找合适的设计。

    最佳答案

    我将尝试为您回答其中的一些问题。我不会对所有事情都给出具体的答案,但希望我能引导您朝着正确的方向前进。

    首先,您需要改变向 3 个进行书籍搜索的参与者传达请求的方式。使用 ScatterGatherFirstCompletedRouter这里可能不是正确的方法。该路由器将仅等待其中一个路由(第一个响应的路由)的回答,因此您的结果集将不完整,因为它不包含来自其他 2 个路由的结果。还有一个BroadcastRouter ,但这也不符合您的需求,因为它只处理 tell (!)而不是 ask (?) .要做你想做的事,一种选择是将请求发送给每个接收者,得到 Futures用于响应,然后将它们组合成一个聚合 Future使用 Future.sequence .一个简化的示例可能如下所示:

    case class SearchBooks(title:String)
    case class Book(id:Long, title:String)

    class BookSearcher extends Actor{

    def receive = {
    case req:SearchBooks =>
    val routees:List[ActorRef] = ...//Lookup routees here
    implicit val timeout = Timeout(10 seconds)
    implicit val ec = context.system.dispatcher

    val futures = routees.map(routee => (routee ? req).mapTo[List[Book]])
    val fut = Future.sequence(futures)

    val caller = sender //Important to not close over sender
    fut onComplete{
    case Success(books) => caller ! books.flatten

    case Failure(ex) => caller ! Status.Failure(ex)
    }
    }
    }

    现在这不会是我们的最终代码,但它是您的示例尝试执行的操作的近似值。在这个例子中,如果任何一个下游路由失败/超时,我们将命中我们的 Failure阻塞,调用者也会失败。如果他们都成功,调用者将得到 Book的聚合列表。对象。

    现在回答你的问题。首先,如果您没有在超时内从其中一个路由中得到答案,您是否应该再次向所有参与者发送请求。这个问题的答案真的取决于你。您会允许另一端的用户看到部分结果(即 3 个 Actor 中的 2 个的结果),还是每次都必须是完整的结果集?如果答案是肯定的,您可以将发送到路由的代码调整为如下所示:
    val futures = routees.map(routee => (routee ? req).mapTo[List[Book]].recover{
    case ex =>
    //probably log something here
    List()
    })

    使用此代码,如果任何路由因任何原因超时或失败,“Book”的空列表将替换为响应而不是失败。现在,如果您不能接受部分结果,那么您可以再次重新发送整个请求,但您必须记住,另一端可能有人在等待他们的图书结果,他们不想永远等待。

    对于你的第二个问题,你问如果你的超时时间过早怎么办?您选择的超时值将完全由您决定,但很可能应基于两个因素。第一个因素将来自测试搜索的调用时间。找出平均需要多长时间并根据该值选择一个值,并带有一点缓冲以确保安全。第二个因素是另一端的某人愿意等待他们的结果多长时间。您可以在超时方面非常保守,为了安全起见,将其设置为 60 秒,但是如果另一端确实有人在等待结果,他们愿意等待多长时间?我宁愿得到一个失败响应,表明我应该再试一次,而不是永远等待。因此,考虑到这两个因素,您应该选择一个值,该值将允许您在非常高的时间内获得响应,同时仍然不会让另一端的调用者等待太长时间。

    对于问题 3,您询问如果消息被丢弃会发生什么。在这种情况下,我猜测接收该消息的人的 future 将只是超时,因为它不会得到响应,因为接收者参与者永远不会收到要响应的消息。 Akka 不是 JMS;它没有确认模式,如果收件人没有收到并确认消息,则可以多次重新发送消息。

    此外,正如您从我的示例中看到的,我同意不阻塞聚合 Future通过使用 Await .我更喜欢使用非阻塞回调。在接收函数中阻塞并不理想,因为 Actor实例将停止处理其邮箱,直到阻塞操作完成。通过使用非阻塞回调,您可以释放该实例以返回处理其邮箱,并允许结果的处理只是在 ExecutionContext 中执行的另一个作业。 ,与处理其邮箱的actor分离。

    现在,如果您真的不想在网络不可靠时浪费通信,您可以查看 Reliable Proxy在 Akka 2.2 中可用。如果你不想走这条路,你可以发送 ping 自己滚。定期向路由输入消息。如果一个人没有及时回复,您将其标记为关闭,并且在您获得可靠的(在很短的时间内)之前不要向其发送消息 ping从它,有点像每个路由的 FSM。如果您绝对需要此行为,则其中任何一个都可以工作,但您需要记住,这些解决方案会增加复杂性,并且仅应在您绝对需要此行为时使用。如果您正在开发银行软件并且您绝对需要有保证的交付语义,否则会导致不良的财务影响,请务必采用这种方法。在决定是否需要这样的东西时要明智,因为我打赌你 90% 的时间都不需要。在您的模型中,唯一可能因等待您可能已经知道不会成功的事情而受到影响的人是另一端的调用者。通过在 actor 中使用非阻塞回调,它不会因为某件事可能需要很长时间而停止;它已移至下一条消息。如果您决定在失败时重新提交,您也需要小心。您不想淹没接收 Actor 的邮箱。如果您决定重新发送,请将其设置为固定次数。

    如果您需要这些有保证的语义,另一种可能的方法可能是查看 Akka 的 Clustering Model .如果您对下游路由进行集群,并且其中一台服务器出现故障,则所有流量都将路由到仍然运行的节点,直到另一个节点恢复为止。

    关于java - akka 中的正确设计。 - 消息传递,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16812037/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com