gpt4 book ai didi

java - Project Reactor - 有状态地组合两个发布者并发出结果

转载 作者:行者123 更新时间:2023-12-01 06:01:49 25 4
gpt4 key购买 nike

我想用 Reactor 设计一个处理管道,其功能如下。

我有两个输入发布者 orderEntries (冷)和 hotBroadcasts (热)。我想将 hotBroadcasts 发出的项目聚合到(可变的)内存数据结构中 - 比如说 HashMap - 以及来自 orderEntries 的每个项目我想从该 Map 中选择一个相应的元素,创建结果项并将其推送到下游订阅者。

来自 hotBroadcasts 的事件以任意顺序出现,这就是为什么我想将它们存储在内存中以便于检索。

从概念上讲,它应该像这样工作:

       orderEntries                      hotBroadcasts
| |
| |
| |
\ /
----------------> <----------------
(aggregate events from hotBroadcasts)
|
|
resulting item
|
|
\/
downstream subcriber

到目前为止,我设法使用 ReplayProcessor 绘制了一个解决方案,如 Kotlin 伪示例所示:

val orderEntries = Flux.interval(Duration.of(1, ChronoUnit.SECONDS))
val hotBroadcasts = ReplayProcessor.create<String>(1000, false)

orderEntries.concatMap { entryId ->
// problematic filter - skims through all that ReplayProcessor has cached
hotBroadcasts.filter { broadcastId ->
"Broadcast:$entryId" == broadcastId
}
.take(1)
.map { "EntryId: $entryId, BroadcastId: $it" }
}.subscribe { LOG.info(it) }

Flux.interval(Duration.of(200, ChronoUnit.MILLIS))
.concatMap { Flux.just(it, it - 100000) }
.map { "Broadcast:$it" }
.subscribe {
hotBroadcasts.onNext(it)
}

这里的问题是,hotBroadcast 的过滤会浏览 orderEntries 中每个项目的所有项目。因此我想到将它们存储在 HashMap 中。

有人能指出我正确的方向吗?

最佳答案

可以聚合来自两个不同发布者的消息的对象是一个带有 2 个参数的异步过程调用。可以使用 io.reactivex.Single.zip(SingleSource arg1, SingleSource arg2, BiFunction func) 在 rxjava 中构造这样的调用,或在纯 Java 中使用 java.util.concurrent.CompletableFuture.thenCombine(CompletionStage arg2, BiFunction func) .

您需要一个特殊的 HashMap 来保存异步过程调用。当第一次使用给定标签访问此 HashMap 时,应该自动创建调用。

因此一位 Publicher 调用

asyncProc=callMap.get(label); // asyncProc is created and stored with the label as a key
asyncProc.arg1.complete(value);

以及其他 Publicher 调用

asyncProc=callMap.get(label); // previously created instance returned
asyncProc.arg2.complete(value);

两个发布者都提供了参数后,就会执行异步过程。

关于java - Project Reactor - 有状态地组合两个发布者并发出结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56367429/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com