gpt4 book ai didi

java - 项目 react 堆通量的并发处理

转载 作者:塔克拉玛干 更新时间:2023-11-02 08:06:28 26 4
gpt4 key购买 nike

我对项目 react 堆或响应式(Reactive)编程非常陌生,所以我可能做错了什么。我正在努力构建执行以下操作的流程:

给定一个实体类:

Entity {
private Map<String, String> items;
public Map<String, String> getItems() {
return items;
}
}
  1. 从数据库读取实体(ListenableFuture<Entity> readEntity())
  2. 对每个项目执行一些并行异步处理 ( boolean processItem(Map.Entry<String, String> item) )
  3. 当所有完成调用 doneProcessing ( void doneProcessing(boolean b) )

目前我的代码是:

handler = this;
Mono
.fromFuture(readEntity())
.doOnError(t -> {
notifyError(“some err-msg” , t);
return;
})
.doOnSuccess(e -> log.info("Got the Entity: " + e))
.flatMap( e -> Flux.fromIterable(e.getItems().entrySet()))
.all(handler::processItem)
.consume(handler::doneProcessing);

这东西能用,但是 handler::processItem调用不会在所有项目上同时运行。我尝试使用 dispatchOnpublishOnioasync SchedulerGroup并具有各种参数,但调用仍然在一个线程上连续运行。我做错了什么?

除此之外,我确信总体上可以改进上述内容,因此我们将不胜感激任何建议。

谢谢

最佳答案

您需要另一个 flatMap 来为每个单独的 map 元素 fork 和合并计算:

Mono.fromFuture(readEntity())
.flatMap(v -> Flux.fromIterable(v.getItems().entrySet()))
.flatMap(v -> Flux.just(v)
.publishOn(SchedulerGroup.io())
.doOnNext(handler::processItem))
.consume(handler::doneProcessing);

关于java - 项目 react 堆通量的并发处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36126999/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com