gpt4 book ai didi

scala - Akka 流 : How do I model capacity/rate limiting within a system of 2 related streams?

转载 作者:行者123 更新时间:2023-12-01 00:37:00 24 4
gpt4 key购买 nike

假设我有一个披萨 toastr 和一系列我需要烘烤的披萨。我的 toastr 一次只能烤 4 个比萨饼,可以合理地预计一天中至少有 4 个比萨饼在排队,因此 toastr 需要尽可能地满负荷运转。

每次我将披萨放入 toastr 时,我都会在手机上设置一个计时器。一旦发生这种情况,我将披萨从 toastr 中取出,交给任何想要它的人,然后容量就可用了。

我在这里有 2 个来源,一个是要煮熟的比萨饼队列,另一个是在比萨饼煮熟后响起的鸡蛋计时器。系统中还有 2 个接收器,一个是煮熟的比萨的目的地,另一个是发送比萨已放入 toastr 的确认的地方。

我目前非常天真地代表这些,如下:

Source.fromIterator(() => pizzas)
.map(putInOven) // puts in oven and sets a timer
.runWith(Sink.actorRef(confirmationDest, EndSignal))

Source.fromIterator(() => timerAlerts)
.map(removePizza)
.runWith(Sink.actorRef(pizzaDest, EndSignal))

然而,这两个流目前是完全独立的。 eggTimer 运行正常,每次收集披萨时都会移除它。但它无法向先前的流程发出信号,表明容量已经可用。实际上,第一个流程根本没有容量概念,并且会尝试在他们加入线后立即将比萨饼塞入 toastr 。

Akka 概念可用于以这样一种方式组合这些流,即第一个流仅在有容量时从队列中取出比萨,而第二个流可以“提醒”第一个在从队列中取出比萨时容量发生变化 toastr 。

我的初步印象是实现这样的流程图:
   ┌─────────────┐                                                          
┌─>│CapacityAvail│>──┐
│ └─────────────┘ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ ┌─────────────┐ ├──>│ Zip │>─>│ PutInOven │>─>│ Confirm │
│ │ Queue │>──┘ └─────────────┘ └─────────────┘ └─────────────┘
│ └─────────────┘
│ ┌─────────────┐ ┌─────────────┐
│ │ Done │>─────>│ SendPizza │
│ └─────────────┘ └─────────────┘
│ v
│ │
└─────────┘

支持这一点的原则是有固定数量的 CapacityAvailable 对象填充 CapacityAvail来源。它们用进入 Pizza 队列的事件进行压缩,这意味着如果没有可用的事件,则没有比萨处理开始,因为 zip 操作将等待它们。

然后,一旦披萨完成,CapacityAvailable 对象就会被推回池中。

我看到这个实现的主要障碍是我不确定如何为 CapacityAvail 源创建和填充池,我也不确定源是否也可以是接收器。是否有任何源/接收器/流类型适合于此实现?

最佳答案

这个用例通常不能很好地映射到 Akka Streams。在引擎盖下,Akka Stream 是 reactive stream ;来自 documentation :

Akka Streams implementation uses the Reactive Streams interfaces internally to pass data between the different processing stages.



您的披萨示例不适用于流,因为您有一些外部事件与流的接收器一样是需求的广播器。您公开声明“第一个流根本没有容量概念”这一事实意味着您没有将流用于其预期目的。

总是可以使用一些奇怪的编码柔术来笨拙地弯曲流来解决并发问题,但是您可能很难在线下维护此代码。我建议您考虑使用 Futures、Actors 或普通线程作为并发机制。如果您的 toastr 有无限的容量来容纳正在 cooking 的比萨饼,那么开始就不需要流。

我还会重新检查您的整个设计,因为您使用时钟时间的流逝作为需求信号器(即您的“鸡蛋计时器”)。这通常表明流程设计存在缺陷。如果你不能绕过这个要求,那么你应该评估其他设计模式:
  • Periodic Message Scheduling
  • Non Thread Block Timeouts
  • 关于scala - Akka 流 : How do I model capacity/rate limiting within a system of 2 related streams?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40335040/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com