gpt4 book ai didi

java - 无法使用自定义用户监护人从外部在 ActorSystem 上创建顶级 Actor [clusterSingletonManager]

转载 作者:行者123 更新时间:2023-12-02 08:47:47 24 4
gpt4 key购买 nike

我正在开发一项使用 Akka Persistence 进行事件溯源的服务。到目前为止,我们已经成功地将事件存储在 Cassandra 日志中。现在我们想利用 Akka Persistence Query 来实现 CQRS。作为第一种方法,我们尝试遵循集群单例模式,让参与者按标签流式传输存储的事件。现在我们有一个相当简单的 actor 被包装为一个单例:

public class EventProcessor extends AbstractLoggingActor {
private static final Logger LOG = LoggerFactory.getLogger(EventProcessor.class);

private final CassandraReadJournal journal;

public EventProcessor(ActorSystem system) {
journal =
PersistenceQuery.get(system)
.getReadJournalFor(CassandraReadJournal.class, CassandraReadJournal.Identifier());

journal
.eventsByTag(OnBoardingBehavior.ENTITY_TYPE_KEY.name(), Offset.noOffset())
.map(EventEnvelope::persistenceId)
.to(Sink.foreach(this::logMessage))
.run(system);
}

private void logMessage(String id) {
LOG.info(String.format("########## Received persistenceId %s", id));
}

@Override
public Receive createReceive() {
return null;
}
}

这就是我们将 actor 包裹在监护人内部的方式:

    akka.actor.ActorSystem classicSystem = context.getSystem().classicSystem();

ClusterSingletonManagerSettings settings =
ClusterSingletonManagerSettings.create(classicSystem);

Props clusterSingletonManagerProps =
ClusterSingletonManager.props(
Props.create(EventProcessor.class, classicSystem),
PoisonPill.getInstance(),
settings);

classicSystem.actorOf(clusterSingletonManagerProps, "clusterSingletonManager");

当我们运行服务时,我们收到以下异常(在 actorOf 行上):

java.lang.UnsupportedOperationException: cannot create top-level actor [clusterSingletonManager] from the outside on ActorSystem with custom user guardian
at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:900)
at r.service.onboarding.actor.Guardian.initializeEventProcessor(Guardian.java:95)
at r.service.onboarding.actor.Guardian.<init>(Guardian.java:56)
at r.service.onboarding.actor.Guardian.lambda$create$745d95f3$1(Guardian.java:66)
at akka.actor.typed.javadsl.Behaviors$.$anonfun$setup$1(Behaviors.scala:47)
at a.a.t.i.BehaviorImpl$DeferredBehavior$$anon$1.apply(BehaviorImpl.scala:118)
at akka.actor.typed.Behavior$.start(Behavior.scala:168)
at a.a.t.i.InterceptorImpl$$anon$1.start(InterceptorImpl.scala:48)
at akka.actor.typed.BehaviorInterceptor.aroundStart(BehaviorInterceptor.scala:55)
at a.a.typed.internal.InterceptorImpl.preStart(InterceptorImpl.scala:71)
at a.a.typed.internal.InterceptorImpl$.$anonfun$apply$1(InterceptorImpl.scala:28)
at a.a.t.i.BehaviorImpl$DeferredBehavior$$anon$1.apply(BehaviorImpl.scala:118)
at akka.actor.typed.Behavior$.start(Behavior.scala:168)
at akka.actor.typed.Behavior$.interpret(Behavior.scala:275)
at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:230)
at a.a.t.internal.adapter.ActorAdapter.handleMessage(ActorAdapter.scala:126)
at a.a.t.internal.adapter.ActorAdapter.aroundReceive(ActorAdapter.scala:106)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:573)
at akka.actor.ActorCell.invoke(ActorCell.scala:543)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:269)
at akka.dispatch.Mailbox.run(Mailbox.scala:230)
at akka.dispatch.Mailbox.exec(Mailbox.scala:242)
... 5 frames excluded

我想提一下,我们能够从同一个守护者中生成其他 Actor ,例如:

ActorRef<Command> actorRef =
context.spawn(OnBoardingBehavior.create(uuid), "OnBoardingBehavior-" + uuid);

我对 Akka 完全陌生,因此我们将不胜感激!

最佳答案

终于明白了。显然,当在类型化系统中运行时,我无法将旧的非类型化方法与 ClusterSingletonManager.props 一起使用。我在这里找到了类型系统上的正确方法: https://doc.akka.io/docs/akka/current/typed/cluster-singleton.html

所以我的 Actor 现在是一个 AbstractBehavior:

public class EventProcessor extends AbstractBehavior<Void>

这就是我将其包装为单例的方式:

    ClusterSingleton singleton = ClusterSingleton.get(context.getSystem());

singleton.init(
SingletonActor.of(EventProcessor.create(identityRequestAdapter), "eventProcessor"));

关于java - 无法使用自定义用户监护人从外部在 ActorSystem 上创建顶级 Actor [clusterSingletonManager],我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60973016/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com