- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在开发一项使用 Akka Persistence 进行事件溯源的服务。到目前为止,我们已经成功地将事件存储在 Cassandra 日志中。现在我们想利用 Akka Persistence Query 来实现 CQRS。作为第一种方法,我们尝试遵循集群单例模式,让参与者按标签流式传输存储的事件。现在我们有一个相当简单的 actor 被包装为一个单例:
public class EventProcessor extends AbstractLoggingActor {
private static final Logger LOG = LoggerFactory.getLogger(EventProcessor.class);
private final CassandraReadJournal journal;
public EventProcessor(ActorSystem system) {
journal =
PersistenceQuery.get(system)
.getReadJournalFor(CassandraReadJournal.class, CassandraReadJournal.Identifier());
journal
.eventsByTag(OnBoardingBehavior.ENTITY_TYPE_KEY.name(), Offset.noOffset())
.map(EventEnvelope::persistenceId)
.to(Sink.foreach(this::logMessage))
.run(system);
}
private void logMessage(String id) {
LOG.info(String.format("########## Received persistenceId %s", id));
}
@Override
public Receive createReceive() {
return null;
}
}
这就是我们将 actor 包裹在监护人内部的方式:
akka.actor.ActorSystem classicSystem = context.getSystem().classicSystem();
ClusterSingletonManagerSettings settings =
ClusterSingletonManagerSettings.create(classicSystem);
Props clusterSingletonManagerProps =
ClusterSingletonManager.props(
Props.create(EventProcessor.class, classicSystem),
PoisonPill.getInstance(),
settings);
classicSystem.actorOf(clusterSingletonManagerProps, "clusterSingletonManager");
当我们运行服务时,我们收到以下异常(在 actorOf 行上):
java.lang.UnsupportedOperationException: cannot create top-level actor [clusterSingletonManager] from the outside on ActorSystem with custom user guardian
at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:900)
at r.service.onboarding.actor.Guardian.initializeEventProcessor(Guardian.java:95)
at r.service.onboarding.actor.Guardian.<init>(Guardian.java:56)
at r.service.onboarding.actor.Guardian.lambda$create$745d95f3$1(Guardian.java:66)
at akka.actor.typed.javadsl.Behaviors$.$anonfun$setup$1(Behaviors.scala:47)
at a.a.t.i.BehaviorImpl$DeferredBehavior$$anon$1.apply(BehaviorImpl.scala:118)
at akka.actor.typed.Behavior$.start(Behavior.scala:168)
at a.a.t.i.InterceptorImpl$$anon$1.start(InterceptorImpl.scala:48)
at akka.actor.typed.BehaviorInterceptor.aroundStart(BehaviorInterceptor.scala:55)
at a.a.typed.internal.InterceptorImpl.preStart(InterceptorImpl.scala:71)
at a.a.typed.internal.InterceptorImpl$.$anonfun$apply$1(InterceptorImpl.scala:28)
at a.a.t.i.BehaviorImpl$DeferredBehavior$$anon$1.apply(BehaviorImpl.scala:118)
at akka.actor.typed.Behavior$.start(Behavior.scala:168)
at akka.actor.typed.Behavior$.interpret(Behavior.scala:275)
at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:230)
at a.a.t.internal.adapter.ActorAdapter.handleMessage(ActorAdapter.scala:126)
at a.a.t.internal.adapter.ActorAdapter.aroundReceive(ActorAdapter.scala:106)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:573)
at akka.actor.ActorCell.invoke(ActorCell.scala:543)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:269)
at akka.dispatch.Mailbox.run(Mailbox.scala:230)
at akka.dispatch.Mailbox.exec(Mailbox.scala:242)
... 5 frames excluded
我想提一下,我们能够从同一个守护者中生成其他 Actor ,例如:
ActorRef<Command> actorRef =
context.spawn(OnBoardingBehavior.create(uuid), "OnBoardingBehavior-" + uuid);
我对 Akka 完全陌生,因此我们将不胜感激!
最佳答案
终于明白了。显然,当在类型化系统中运行时,我无法将旧的非类型化方法与 ClusterSingletonManager.props 一起使用。我在这里找到了类型系统上的正确方法: https://doc.akka.io/docs/akka/current/typed/cluster-singleton.html
所以我的 Actor 现在是一个 AbstractBehavior:
public class EventProcessor extends AbstractBehavior<Void>
这就是我将其包装为单例的方式:
ClusterSingleton singleton = ClusterSingleton.get(context.getSystem());
singleton.init(
SingletonActor.of(EventProcessor.create(identityRequestAdapter), "eventProcessor"));
关于java - 无法使用自定义用户监护人从外部在 ActorSystem 上创建顶级 Actor [clusterSingletonManager],我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60973016/
我有一个数据框: title | cast ------------------------------ movie1 | cast1,cast2,cast3 movie2
据我所知,Actor 模型是一种关于并发的理论。 Erlang 和 Scala 都实现了这个理论模型,但它们的实现都不完全符合 Actor 模型。 从计算机科学家的角度来看,Erlang、Scala
我是 akka 流的新手。我将 kafka 用作源(使用 ReactiveKafka 库)并通过流对数据进行一些处理并使用订阅者 (EsHandler) 作为接收器。 现在我需要处理错误并通过错误处理
考虑以下代码示例(版本 1)。此处父 actor (ActorA) 向子 actor (ActorB) 发送消息,然后停止自身。由于父 actor 的自停止,在高负载下,子 actor 甚至在从邮箱中
我有以下 Scala 代码: package dummy import javax.servlet.http.{HttpServlet, HttpServletRequest => H
如何描述“数据流编程”和“ Actor 模型”之间的区别?据我了解,它们并非无关,但又不相同。 DF 是否是一个更广泛的概念,其要点是与控制流模型的区别,而 Actor 模型是更详细的和理论上有根基的
GPars 中的 Actor 有自己的消息队列(邮箱)。假设一个参与者有 15 条待处理的消息,然后系统突然宕机(比如由于电源故障)。这 15 条消息会发生什么。当系统再次启动并运行时,消息队列会自动
我刚刚开始使用 AKKA,并且有一个关于非参与者代码如何与参与者代码对话的基本问题。 非参与者代码如何调用参与者并获得响应?我试过使用 Patterns.ask 从非 Actor 调用 Actor ,
这个问题在这里已经有了答案: 关闭 9 年前。 Possible Duplicate: Casting vs using the ‘as’ keyword in the CLR 我最近了解到一种不同
我在设置边界矩形位置时遇到麻烦,这就是我问这个的原因。当我设置 Actor 类型的对象的边界矩形位置时,看来我传递给 [setBounds(x,y,with,height)] 2 的坐标相对于 Act
我是 Actor 模型的新手。任何人都可以解释 Actor 模型中 Actor 的生命周期吗?我一直在文档中寻找答案,但找不到任何令人满意的内容。 我对 Actor 完成后做什么很感兴趣onRecei
例如,我有两个 Actor ——一个父 Actor 和一个子 Actor 。当 parent 收到一条消息时,它会产生与消息中指定的一样多的子actor。如何测试此功能?有没有办法模拟上下文或其他方法
我目前在 Futures 编程,我对 Actor 很好奇。我想听听有经验的声音: Actor 相对于 future 有什么优势? 我什么时候应该使用一种而不是另一种? 据我所知, Actor 持有状态
Actor 模型框架(例如 Orleans )和复杂事件处理 (CEP)(例如 Apache Storm )之间有什么区别? 其中每种方法都适用(或者一种方法比另一种方法更适用)的使用示例肯定会有所帮
你们中的任何人都可以帮助我理解传入的基本消息吗scala 使用 Actor 模型? 我正在尝试编写一个包含 3 个 Actor 的简单程序。 Actor “BossActor”创建了 2 个 Acto
Akka 1.1.3 文档指出“become 方法对于许多不同的事情都很有用,但它的一个特别好的例子是它用于实现有限状态机 (FSM) 的示例”。在运行时热交换 Akka actor 的实现还有哪些其
我正在尝试使用参与者实现消息处理管道。管道的步骤包括读取、过滤、扩充以及最后存储到数据库中等功能。类似于此:http://sujitpal.blogspot.nl/2013/12/akka-conte
可以使用 actor.send() 或 actor.offer() 向 Actor 发送消息一旦我的 Actor 从其 channel 收到消息,我想返回响应。我该怎么做呢?我没有看到任何明显的内置方
你们中的任何人都可以帮助我理解传入的基本消息吗scala 使用 Actor 模型? 我正在尝试编写一个包含 3 个 Actor 的简单程序。 Actor “BossActor”创建了 2 个 Acto
我目前正在尝试保存特殊 Actors所以如果加载旧 map ,我可以再次将它们放在 map 上。因此我想将它们放入 HashMap> monsterAtMap 中并从那里删除它们Stages .所以我
我是一名优秀的程序员,十分优秀!