gpt4 book ai didi

scala - 在 Akka 中,如何将响应从下游参与者路由到正确的上游?

转载 作者:可可西里 更新时间:2023-11-01 11:15:20 25 4
gpt4 key购买 nike

新手问题:我试图让一个缓存管理器位于缓存的多个用户(= 上游)和 Redis 客户端(下游)之间,所以:

Client A  -----> |                
| Cache Manager <=====> Redis Connection --(tcp)--
Client B -----> |

想法是重新使用与 Redis 的单个连接。我可以异步发送 SET 命令,当 redis 客户端 actor 返回响应时,我如何知道将响应中继到哪个客户端?到目前为止,这是我的接收方法:

def receive: PartialFunction[Any, Unit] = {

case Store(key: ByteString, payload: ByteString, metadata: ByeString) => {
// WIP: yes, I could batch these two here
brandoClient ! Request(REDIS_SET, metadata_key(key), metadata)
brandoClient ! Request(REDIS_SET, key, payload)
}

case Some(Ok) => {
???
}
...
}

我能做到:

case Store(key: ByteString, payload: ByteString) => {
val future = brandoClient ? Request(REDIS_SET, key, payload)
sender() ! Await.result(future, request_timeout.duration)
}

但是,这会使缓存管理器阻塞。

我能想到的另一种方法是创建多个引用同一个 Redis 客户端 ActorRef 的缓存管理器 actor,这样我就可以通过这种方式对响应进行重复数据删除。像这样:

Client A  -----> Cache Manager A -----> |               
| Redis Connection --(tcp)--
Client B -----> Cache Manager B -----> |

这是唯一的方法吗?

谢谢,

最佳答案

您可以 pipe 而不是阻塞Future 的结果给发送者。以下示例假定您使用的是 Brando Redis客户端:

import akka.actor.Actor
import akka.pattern.{ ask, pipe }
import akka.util.{ ByteString, Timeout }
import brando.{ Request, StatusReply }
import scala.concurrent.duration._

case class Store(key: ByteString, payload: ByteString)

class CacheManager extends Actor {
import context.dispatcher
implicit val timeout = Timeout(5 seconds)

val brandoClient: ActorRef = ???

def receive = {
case Store(key, payload) =>
(brandoClient ? Request("SET", key, payload))
.mapTo[Some[StatusReply]]
.pipeTo(sender())

// case ...
}
}

关于scala - 在 Akka 中,如何将响应从下游参与者路由到正确的上游?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50437074/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com