gpt4 book ai didi

java - Akka持久化自定义java插件

转载 作者:搜寻专家 更新时间:2023-11-01 03:22:43 26 4
gpt4 key购买 nike

我目前正在为 Akka 编写自己的插件SyncWriteJournal实现与 HSQLDB 连接的 API .

问题是我不明白 doAsyncReplayMessages 方法的要求.它声明它需要返回一个 future 并且所有消息都应该由 replayCallback 调用.

假设我有一个返回消息列表的查询:List<Message> messages .任何人都可以提供一个关于如何使用 replayCallback 的最小示例(带解释)吗? , 和 Future使用该列表正确实现该方法?怎么会replayCallbackFuture一起工作,方法应该返回什么doAsyncReplayMessages

谢谢!

-编辑-

在一些评论的帮助下,我已经完成了一个不完整的实现,但包含了所提出的想法:

public Future<Void> doAsyncReplayMessages(final String persistenceId, long fromSequenceNr, long toSequenceNr, long max,
final Procedure<PersistentRepr> replayCallback) {
final ExecutionContext ec = context().system().dispatcher();

final Future<Void> future = Futures.future(new Callable<Void>() {
@Override
public Void call() throws Exception {
final List<Message> messages = getMessages();
for (int i = 0; i < feedbackList.size(); i++) {
replayCallback.apply(
new PersistentImpl(messages.get(i), i, persistenceId, false, null, null));
}
return null;
}
}, ec);

return future;
}

如您所见,它遗漏了一些我仍然遗漏的关键概念。 PersistentImpl 需要一个参数 Seq<String> confirm这是null仍然。也许更重要的是我回来了 null因为 future 期待Void作为返回类型,我不确定如何实现它。它目前抛出一个 NPE:

[ERROR] [08/28/2014 12:31:19.582] [akkaSystem-akka.actor.default-dispatcher-7] [akka://akkaSystem/system/journal] null
java.lang.NullPointerException
at akka.persistence.journal.japi.AsyncRecovery.asyncReadHighestSequenceNr(AsyncRecovery.scala:26)
at akka.persistence.journal.SyncWriteJournal$$anonfun$receive$1.applyOrElse(SyncWriteJournal.scala:53)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at akka.persistence.journal.japi.SyncWriteJournal.aroundReceive(SyncWriteJournal.scala:16)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)

最佳答案

您可以简单地将阻塞操作包装在 Future 中,例如:Future { fetchStuff() }

可以引用dnvriend/akka-persistence-jdbc: JdbcSyncWriteJournal同步日志的全面实现。

关于java - Akka持久化自定义java插件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25500951/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com