java - Recursive reactive streams with Project Reactor

Reposted. Author: 搜寻专家. Updated: 2023-10-31 19:36:45
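My goal is to traverse a directory graph using reactive streams and Project Reactor, and to log the name of every directory.

Because the file system is remote, calls to it are blocking. I therefore want to keep the execution of those blocking calls separate from the rest of my non-blocking, asynchronous code. I am doing this by following this advice: http://projectreactor.io/docs/core/release/reference/#faq.wrap-blocking

This is the structure I need to traverse: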

/
/jupiter
    /phase-1
        /sub-phase-1
        /sub-phase-2
        /sub-phase-3
    /phase-2
    /phase-3
    /phase-4

/earth
    /phase-1
        /sub-phase-1
        /sub-phase-2
        /sub-phase-3
    /phase-2
    /phase-3
    /phase-4

/mars
    /phase-1
        /sub-phase-1
        /sub-phase-2
        /sub-phase-3
    /phase-2
    /phase-3
    /phase-4

This is the code I have come up with so far:

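import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class ReactorEngine {

    private static final Logger log = LoggerFactory.getLogger(ReactorEngine.class);

    public static void main(String[] args) throws InterruptedException {
        // Keeps the main thread alive until the stream terminates
        CountDownLatch latch = new CountDownLatch(1);

        Server server = new Server();

        Flux.fromIterable(server.getChildren("/"))
            // Wrap each blocking call and run it on the elastic scheduler
            .flatMap(parent -> Mono.fromCallable(() -> server.getChildren(parent)).subscribeOn(Schedulers.elastic()))
            .publishOn(Schedulers.elastic())
            .doOnTerminate(latch::countDown)
            .subscribe(ReactorEngine::handleResponse);

        latch.await();
    }

    private static void handleResponse(List<String> value) {
        log.info("Received: " + value);
    }

}

public class Server {

    public List<String> getChildren(final String path) {
        // Generate some I/O
        ...
    }
}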

So I start at the top-level directories and asynchronously request the next level down (their children). That works well; here is the output:

15:35:05.902 [main] INFO org.playground.async.mock.Server - Requesting children of: /...
15:35:07.062 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
15:35:07.140 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /jupiter/...
15:35:07.140 [elastic-4] INFO org.playground.async.mock.Server - Requesting children of: /earth/...
15:35:07.140 [elastic-5] INFO org.playground.async.mock.Server - Requesting children of: /mars/...
15:35:08.140 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: [/earth/phase-1/, /earth/phase-2/, /earth/phase-3/]
15:35:08.141 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: [/jupiter/phase-1/, /jupiter/phase-2/, /jupiter/phase-3/]
15:35:08.141 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: [/mars/phase-1/, /mars/phase-2/, /mars/phase-3/]

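My question now is: how do I feed the elements that come out as results back into the stream, so that the engine keeps calling server.getChildren(parent) recursively until the whole directory graph has been traversed?

Is recursion actually the way to go, or is there a better, more "reactive" way to do this, perhaps through an operator?

Thanks!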
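
One way to feed results back in is plain recursion: a method that fetches the children of a path and, for each child, merges in the result of calling itself. The following is only a minimal sketch under stated assumptions: the in-memory map TREE and the static getChildren are hypothetical stand-ins for the remote Server, and Schedulers.boundedElastic() is used in place of Schedulers.elastic(), which newer Reactor releases deprecate.

```java
import java.util.List;
import java.util.Map;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class RecursiveFlatMapSketch {

    // Hypothetical stand-in for the remote file system
    static final Map<String, List<String>> TREE = Map.of(
        "/", List.of("/jupiter/", "/earth/"),
        "/jupiter/", List.of("/jupiter/phase-1/", "/jupiter/phase-2/"),
        "/earth/", List.of("/earth/phase-1/"));

    // Hypothetical stand-in for the blocking server.getChildren(path)
    static List<String> getChildren(String path) {
        return TREE.getOrDefault(path, List.of());
    }

    // Emits every descendant of the given path
    static Flux<String> walk(String path) {
        return Mono.fromCallable(() -> getChildren(path))   // deferred until subscription
            .subscribeOn(Schedulers.boundedElastic())       // blocking call runs off the caller's thread
            .flatMapIterable(list -> list)                  // one element per child
            .flatMap(child -> walk(child).startWith(child)); // emit the child, then recurse into it
    }

    public static void main(String[] args) {
        walk("/").doOnNext(p -> System.out.println("Received: " + p)).blockLast();
    }
}
```

Because flatMap subscribes to its inner publishers eagerly, sibling lookups may run concurrently, so the order in which paths are emitted is not guaranteed. The recursion ends at directories whose child list is empty.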

Edit

The expand(Function) operator suggested by Simon is a great fit for traversing the graph. I have changed the code to:

public static void main(String[] args) throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);

    Server server = new Server();

    Flux.fromIterable(server.getChildren("/"))
        .expand(p -> Flux.fromIterable(server.getChildren(p)).subscribeOn(Schedulers.elastic()))
        .publishOn(Schedulers.elastic())
        .doOnTerminate(latch::countDown)
        .subscribe(ReactorEngine::handleResponse);

    latch.await();
}

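However, I have lost the asynchronous invocation of the server's blocking server.getChildren(String) method. As you can see in these logs, the subdirectories are fetched synchronously, one per second: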

15:57:55.398 [main] INFO org.playground.async.mock.Server - Requesting children of: /...
15:57:56.558 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
15:57:56.593 [main] INFO org.playground.async.mock.Server - Requesting children of: /jupiter/...
15:57:56.593 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /jupiter/
15:57:57.594 [main] INFO org.playground.async.mock.Server - Requesting children of: /earth/...
15:57:57.594 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /earth/
15:57:58.594 [main] INFO org.playground.async.mock.Server - Requesting children of: /mars/...
15:57:58.594 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /mars/
15:57:59.599 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /jupiter/phase-1/...
15:57:59.599 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /jupiter/phase-1/
15:58:00.600 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /jupiter/phase-2/...
15:58:00.600 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /jupiter/phase-2/
15:58:01.600 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /jupiter/phase-3/...
15:58:01.600 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /jupiter/phase-3/
15:58:02.601 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /jupiter/phase-4/...
15:58:02.601 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /jupiter/phase-4/
15:58:03.602 [elastic-4] INFO org.playground.async.mock.Server - Requesting children of: /earth/phase-1/...
15:58:03.603 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /earth/phase-1/
15:58:04.604 [elastic-4] INFO org.playground.async.mock.Server - Requesting children of: /earth/phase-2/...
15:58:04.604 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /earth/phase-2/
15:58:05.604 [elastic-4] INFO org.playground.async.mock.Server - Requesting children of: /earth/phase-3/...
15:58:05.604 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /earth/phase-3/
15:58:06.605 [elastic-4] INFO org.playground.async.mock.Server - Requesting children of: /earth/phase-4/...
15:58:06.605 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /earth/phase-4/
15:58:07.605 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /mars/phase-1/...
15:58:07.605 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /mars/phase-1/
15:58:08.606 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /mars/phase-2/...
15:58:08.606 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /mars/phase-2/
15:58:09.607 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /mars/phase-3/...
15:58:09.607 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /mars/phase-3/
15:58:10.608 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /mars/phase-4/...
15:58:10.608 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /mars/phase-4/

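Could you advise on how to bring the Mono.fromCallable(() -> server.getChildren(parent)).subscribeOn(Schedulers.elastic()) call back into this solution? There is no Flux.fromCallable() that I can call, perhaps for good reason.

I am quite new to reactive programming and Project Reactor, so I am struggling to understand this style of asynchronous execution.

Thanks.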
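
A likely cause of the synchronous behaviour: in expand(p -> Flux.fromIterable(server.getChildren(p)).subscribeOn(...)) the blocking call runs while the lambda is still building the Flux, on whichever thread emitted p, and subscribeOn only affects what happens at subscription time. Wrapping the call in Mono.fromCallable and flattening the resulting list with flatMapIterable defers it until subscription. The sketch below assumes a hypothetical in-memory map (TREE) instead of the real Server, and Schedulers.boundedElastic() instead of Schedulers.elastic(), which newer Reactor releases deprecate.

```java
import java.util.List;
import java.util.Map;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class DeferredExpandSketch {

    // Hypothetical stand-in for the remote file system
    static final Map<String, List<String>> TREE = Map.of(
        "/", List.of("/jupiter/", "/earth/"),
        "/jupiter/", List.of("/jupiter/phase-1/", "/jupiter/phase-2/"),
        "/earth/", List.of("/earth/phase-1/"));

    // Hypothetical stand-in for the blocking server.getChildren(path)
    static List<String> getChildren(String path) {
        return TREE.getOrDefault(path, List.of());
    }

    // The blocking call is not made here; it is made when the Flux is subscribed to
    static Flux<String> children(String path) {
        return Mono.fromCallable(() -> getChildren(path))
            .flatMapIterable(list -> list)
            .subscribeOn(Schedulers.boundedElastic());
    }

    static Flux<String> walk() {
        return children("/").expand(DeferredExpandSketch::children);
    }

    public static void main(String[] args) {
        walk()
            .doOnNext(p -> System.out.println(Thread.currentThread().getName() + " Received: " + p))
            .blockLast();
    }
}
```

As far as I can tell, expand subscribes to the expansions one at a time, so this keeps the blocking call off the emitting thread but does not by itself make sibling lookups run in parallel.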

Best answer

There is an operator for that :) Have a look at expand and expandDeep.
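
The two operators differ in traversal order: expand walks the graph breadth-first, expandDeep depth-first. A small sketch of that difference, assuming a hypothetical in-memory tree (the map TREE, with directory names borrowed from the question) rather than the real Server:

```java
import java.util.List;
import java.util.Map;

import reactor.core.publisher.Flux;

public class ExpandOrderSketch {

    // Hypothetical stand-in for the directory graph
    static final Map<String, List<String>> TREE = Map.of(
        "/jupiter/", List.of("/jupiter/phase-1/", "/jupiter/phase-2/"),
        "/jupiter/phase-1/", List.of("/jupiter/phase-1/sub-phase-1/"),
        "/earth/", List.of("/earth/phase-1/"));

    static Flux<String> children(String path) {
        return Flux.fromIterable(TREE.getOrDefault(path, List.of()));
    }

    // Level by level: all roots, then their children, then grandchildren
    static List<String> breadthFirst() {
        return Flux.just("/jupiter/", "/earth/")
            .expand(ExpandOrderSketch::children)
            .collectList()
            .block();
    }

    // Each branch is followed to its leaves before the next sibling
    static List<String> depthFirst() {
        return Flux.just("/jupiter/", "/earth/")
            .expandDeep(ExpandOrderSketch::children)
            .collectList()
            .block();
    }

    public static void main(String[] args) {
        // [/jupiter/, /earth/, /jupiter/phase-1/, /jupiter/phase-2/, /earth/phase-1/, /jupiter/phase-1/sub-phase-1/]
        System.out.println(breadthFirst());
        // [/jupiter/, /jupiter/phase-1/, /jupiter/phase-1/sub-phase-1/, /jupiter/phase-2/, /earth/, /earth/phase-1/]
        System.out.println(depthFirst());
    }
}
```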

Regarding "java - Recursive reactive streams with Project Reactor", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/47847132/
