- r - 以节省内存的方式增长 data.frame
- ruby-on-rails - ruby/ruby on rails 内存泄漏检测
- android - 无法解析导入android.support.v7.app
- UNIX 域套接字与共享内存(映射文件)
我正在尝试在 Actor 的 receive
方法中进行两次外部调用(对 Redis 数据库)。两个调用都返回一个 Future
,我需要第二个中第一个 Future
的结果。我将这两个调用都包装在 Redis 事务中,以避免其他人在我读取数据库时修改数据库中的值。
actor 的内部状态根据第二个 Future
的值进行更新。
这是我当前代码的样子,但我是不正确的,因为我正在 Future.onComplete
回调中更新 actor 的内部状态。
我不能使用 PipeTo
模式,因为我需要两个 Future 都必须在事务中。如果我对第一个 Future
使用 Await
,那么我的接收方法将阻塞。知道如何解决这个问题吗?
我的第二个问题与我如何使用Future
有关。下面这种 Future
的用法是否正确?一般来说,有没有更好的方法来处理多个 future ?想象一下,如果有 3 或 4 个 Future,每个 Future 都取决于前一个。
import akka.actor.{Props, ActorLogging, Actor}
import akka.util.ByteString
import redis.RedisClient
import scala.concurrent.Future
import scala.util.{Failure, Success}
object GetSubscriptionsDemo extends App {
val akkaSystem = akka.actor.ActorSystem("redis-demo")
val actor = akkaSystem.actorOf(Props(new SimpleRedisActor("localhost", "dummyzset")), name = "simpleactor")
actor ! UpdateState
}
case object UpdateState
class SimpleRedisActor(ip: String, key: String) extends Actor with ActorLogging {
//mutable state that is updated on a periodic basis
var mutableState: Set[String] = Set.empty
//required by Future
implicit val ctx = context dispatcher
var rClient = RedisClient(ip)(context.system)
def receive = {
case UpdateState => {
log.info("Start of UpdateState ...")
val tran = rClient.transaction()
val zf: Future[Long] = tran.zcard(key) //FIRST Future
zf.onComplete {
case Success(z) => {
//SECOND Future, depends on result of FIRST Future
val rf: Future[Seq[ByteString]] = tran.zrange(key, z - 1, z)
rf.onComplete {
case Success(x) => {
//convert ByteString to UTF8 String
val v = x.map(_.utf8String)
log.info(s"Updating state with $v ")
//update actor's internal state inside callback for a Future
//IS THIS CORRECT ?
mutableState ++ v
}
case Failure(e) => {
log.warning("ZRANGE future failed ...", e)
}
}
}
case Failure(f) => log.warning("ZCARD future failed ...", f)
}
tran.exec()
}
}
}
编译但是当我运行时它被击中了。
2014-08-07 INFO [redis-demo-akka.actor.default-dispatcher-3] a.e.s.Slf4jLogger - Slf4jLogger started
2014-08-07 04:38:35.106UTC INFO [redis-demo-akka.actor.default-dispatcher-3] e.c.s.e.a.g.SimpleRedisActor - Start of UpdateState ...
2014-08-07 04:38:35.134UTC INFO [redis-demo-akka.actor.default-dispatcher-8] r.a.RedisClientActor - Connect to localhost/127.0.0.1:6379
2014-08-07 04:38:35.172UTC INFO [redis-demo-akka.actor.default-dispatcher-4] r.a.RedisClientActor - Connected to localhost/127.0.0.1:6379
更新 1
为了使用 pipeTo
模式,我需要访问我所在的 actor 中的 tran
和 FIRST Future (zf
)我将 Future
传递给因为第二个 Future
取决于 FIRST 的值 (z
)。
//SECOND Future, depends on result of FIRST Future
val rf: Future[Seq[ByteString]] = tran.zrange(key, z - 1, z)
最佳答案
在不太了解您正在使用的 redis 客户端的情况下,我可以提供一个替代解决方案,该解决方案应该更简洁并且不会因关闭可变状态而出现问题。这个想法是使用一种主/ worker 的情况,其中主人(SimpleRedisActor)接收到完成工作的请求,然后委托(delegate)给执行工作并响应更新状态的 worker 。该解决方案看起来像这样:
object SimpleRedisActor{
case object UpdateState
def props(ip:String, key:String) = Props(classOf[SimpleRedisActor], ip, key)
}
class SimpleRedisActor(ip: String, key: String) extends Actor with ActorLogging {
import SimpleRedisActor._
import SimpleRedisWorker._
//mutable state that is updated on a periodic basis
var mutableState: Set[String] = Set.empty
val rClient = RedisClient(ip)(context.system)
def receive = {
case UpdateState =>
log.info("Start of UpdateState ...")
val worker = context.actorOf(SimpleRedisWorker.props)
worker ! DoWork(rClient, key)
case WorkResult(result) =>
mutableState ++ result
case FailedWorkResult(ex) =>
log.error("Worker got failed work result", ex)
}
}
object SimpleRedisWorker{
case class DoWork(client:RedisClient, key:String)
case class WorkResult(result:Seq[String])
case class FailedWorkResult(ex:Throwable)
def props = Props[SimpleRedisWorker]
}
class SimpleRedisWorker extends Actor{
import SimpleRedisWorker._
import akka.pattern.pipe
import context._
def receive = {
case DoWork(client, key) =>
val trans = client.transaction()
trans.zcard(key) pipeTo self
become(waitingForZCard(sender, trans, key) orElse failureHandler(sender, trans))
}
def waitingForZCard(orig:ActorRef, trans:RedisTransaction, key:String):Receive = {
case l:Long =>
trans.zrange(key, l -1, l) pipeTo self
become(waitingForZRange(orig, trans) orElse failureHandler(orig, trans))
}
def waitingForZRange(orig:ActorRef, trans:RedisTransaction):Receive = {
case s:Seq[ByteString] =>
orig ! WorkResult(s.map(_.utf8String))
finishAndStop(trans)
}
def failureHandler(orig:ActorRef, trans:RedisTransaction):Receive = {
case Status.Failure(ex) =>
orig ! FailedWorkResult(ex)
finishAndStop(trans)
}
def finishAndStop(trans:RedisTransaction) {
trans.exec()
context stop self
}
}
工作人员启动事务,然后调用 redis 并最终在停止自身之前完成事务。当它调用 redis 时,它会获取 future 并通过管道返回给自己以继续处理,在两者之间更改 receive 方法作为显示其状态进展的机制。在这样的模型中(我想这有点类似于错误内核模式),主人拥有并保护状态,将“有风险”的工作委托(delegate)给可以弄清楚状态应该发生什么变化的 child ,但零钱仍归主人所有。
再说一次,我不知道您正在使用的 Redis 客户端的功能,也不知道它是否足够安全,甚至可以执行此类操作,但这不是重点。关键是展示一个更安全的结构来做这样的事情,涉及需要安全更改的 future 和状态。
关于scala - Actor 的接收方法中的多个 Future 调用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25174504/
我有一个存储结构向量的应用程序。这些结构保存有关系统上每个 GPU 的信息,如内存和 giga-flop/s。每个系统上有不同数量的 GPU。 我有一个程序可以同时在多台机器上运行,我需要收集这些数据
我很好奇 MPI 中缺少此功能: MPI_Isendrecv( ... ); 即,非阻塞发送和接收,谁能告诉我其省略背后的基本原理? 最佳答案 我的看法是 MPI_SENDRECV存在是为了方便那些想
当我用以下方法监听TCP或UDP套接字时 ssize_t recv(int sockfd, void *buf, size_t len, int flags); 或者 ssize_t recvfrom
SUM:如何在 azure 事件网格中推迟事件触发或事件接收? 我设计的系统需要对低频对象状态(创建、启动、检查长时间启动状态、结束)使用react。它看起来像是事件处理的候选者。我想用azure函数
我正在 MPI 中实现一个程序,其中主进程(等级 = 0)应该能够接收来自其他进程的请求,这些进程要求只有根才知道的变量值。如果我按等级 0 进行 MPI_Recv(...),我必须指定向根发送请求的
我正在学习DX12,并在此过程中学习“旧版Win32”。 我在退出主循环时遇到问题,这似乎与我没有收到WM_CLOSE消息有关。 在C++,Windows 10控制台应用程序中。 #include
SUM:如何在 azure 事件网格中推迟事件触发或事件接收? 我设计的系统需要对低频对象状态(创建、启动、检查长时间启动状态、结束)使用react。它看起来像是事件处理的候选者。我想用azure函数
我想编写方法来通过号码发送短信并使用编辑文本字段中的文本。发送消息后,我想收到一些声音或其他东西来提醒我收到短信。我怎样才能做到这一点?先感谢您,狼。 最佳答案 这个网站似乎对两者都有很好的描述:ht
所以我正在用 Java 编写一个程序,在 DatagramSocket 和 DatagramPacket 的帮助下发送和接收数据。问题是,在我发送数据/接收数据之间的某个时间 - 我发送数据的程序中的
我是 Android 编程新手,我正在用 Java 编写一个应用程序,该应用程序可以打开相机拍照并保存。我通过 Intents 做到了,但看不到 onActivityResult 正在运行。 我已经在
我有一个套接字服务器和一个套接字客户端。客户端只有一个套接字。我必须使用线程在客户端发送/接收数据。 static int sock = -1; static std::mutex mutex; vo
我正在尝试使用 c 中的套接字实现 TCP 服务器/客户端。我以这样的方式编写程序,即我们在客户端发送的任何内容都逐行显示在服务器中,直到键入退出。该程序可以运行,但数据最后一起显示在服务器中。有人可
我正在使用微 Controller 与 SIM808 模块通信,我想发送和接收 AT 命令。 现在的问题是,对于某些命令,我只收到了我应该收到的答案的一部分,但对于其他一些命令,我收到了我应该
我用c设计了一个消息传递接口(interface),用于在我的系统中运行的不同进程之间提供通信。该接口(interface)为此目的创建 10-12 个线程,并使用 TCP 套接字提供通信。 它工作正
我需要澄清一下在套接字程序中使用多个发送/接收。我的客户端程序如下所示(使用 TCP SOCK_STREAM)。 send(sockfd,"Messgfromlient",15,0);
我正在构建一个真正的基本代理服务器到我现有的HTTP服务器中。将传入连接添加到队列中,并将信号发送到另一个等待线程队列中的一个线程。此线程从队列中获取传入连接并对其进行处理。 问题是代理程序真的很慢。
我正在使用 $routeProvider 设置一条类似 的路线 when('/grab/:param1/:param2', { controller: 'someController',
我在欧洲有通过 HLS 流式传输的商业流媒体服务器。http://europe.server/stream1/index.m3u8现在我在美国的客户由于距离而遇到一些网络问题。 所以我在美国部署了新服
我有一个长期运行的 celery 任务,该任务遍历一系列项目并执行一些操作。 任务应该以某种方式报告当前正在处理的项目,以便最终用户知道任务的进度。 目前,我的django应用程序和celery一起坐
我需要将音频文件从浏览器发送到 python Controller 。我是这样做的: var xmlHttp = new XMLHttpRequest(); xmlHttp.open( "POST",
我是一名优秀的程序员,十分优秀!