gpt4 book ai didi

java - 如何在Reactor中进行多线程文件处理

转载 作者:行者123 更新时间:2023-11-30 05:25:37 27 4
gpt4 key购买 nike

我正在尝试使用 Reactor 的 Flux 并行处理多个文件。主要工作负载发生在对 flatMap 的调用中,然后对 Flux 进行转换和过滤。

每当我尝试订阅生成的 Flux 时,主线程都会在收到任何值之前退出。

Flux.fromStream(Files.list(Paths.get("directory"))
.flatMap(path -> {
return Flux.create(sink -> {
try (
RandomAccessFile file = new RandomAccessFile(new File(path), "r");
FileChannel fileChannel = file.getChannel()
) {
// Process file into tokens
sink.next(new Token(".."));
} catch (IOException e) {
sink.error(e);
} finally {
sink.complete();
}
}).subscribeOn(Schedulers.boundedElastic());
})
.map(token -> /* Transform tokens */)
.filter(token -> /* Filter tokens*/)
.subscribe(token -> /* Store tokens in list */)

我希望在列表中找到处理管道的输出,但程序立即退出。首先我想知道我是否正确使用 Flux 类,其次我如何等待订阅调用完成?

最佳答案

I'd expect to find the output of the processing pipeline in my list, but the program immediately exits.

您那里的代码在主线程上设置您的 react 链,然后......在主线程上不执行任何其他操作。因此,主线程完成了其工作,并且由于 boundedElastic() 线程是守护线程,因此没有其他线程阻止程序退出,因此它退出。

您可以通过一个更简单的示例看到相同的行为:

Flux<Integer> f = Flux.just(1, 2, 3, 4, 5)
.delayElements(Duration.ofMillis(500));
f.subscribe(System.out::println);

您当然可以调用 newBoundedElastic("name", false) 使其成为非守护进程支持的调度程序,但随后您必须跟踪它并在完成后调用 dispose,因此它实际上只是反转了问题(程序无限运行,直到您处置调度程序。)

快速的“n”肮脏解决方案只是将 Flux 的最后一个元素作为程序中的最后一行进行阻止 - 所以如果我们添加:

f.blockLast();

...然后程序在退出之前等待最后一个元素被发出,我们就得到了我们想要的行为。

对于简单的概念证明,这很好。然而,它在“生产”代码中并不理想。首先,“无阻塞”是响应式(Reactive)代码中的一般规则,因此,如果您有这样的阻塞调用,则很难确定它是否是有意的。如果您添加了其他链并希望它们完成,则必须为每个链添加阻塞调用。这很困惑,而且不可持续。

更好的解决方案是使用 CountDownLatch:

CountDownLatch cdl = new CountDownLatch(1);

Flux.just(1, 2, 3, 4, 5)
.delayElements(Duration.ofMillis(500))
.doFinally(s -> cdl.countDown())
.subscribe(System.out::println);

cdl.await();

这具有不显式阻塞的优点,并且还能够同时处理多个发布者(如果将初始值设置为高于 1)。这也往往是我认为通常推荐用于此类的方法的事情 - 所以如果你想要最广泛接受的解决方案,这可能就是它。

但是,对于需要等待多个发布者而不是只有一个发布者的所有示例,我倾向于使用 Phaser - 它的工作方式与 CountdownLatch 类似,但可以动态 register() 以及 deregister()。这意味着您可以创建单个移相器,然后根据需要轻松向其注册多个发布者,而无需更改初始值,例如:

Phaser phaser = new Phaser(1);

Flux.just(1, 2, 3, 4, 5)
.doOnSubscribe(s -> phaser.register())
.delayElements(Duration.ofMillis(500))
.doFinally(s -> phaser.arriveAndDeregister())
.subscribe(System.out::println);

Flux.just(1, 2, 3, 4, 5, 6, 7, 8)
.doOnSubscribe(s -> phaser.register())
.delayElements(Duration.ofMillis(500))
.doFinally(s -> phaser.arriveAndDeregister())
.subscribe(System.out::println);

phaser.arriveAndAwaitAdvance();

(如果需要,您当然也可以将 onSubscribedoFinally 逻辑包装在单独的方法中。)

关于java - 如何在Reactor中进行多线程文件处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58685089/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com