gpt4 book ai didi

project-reactor - 如何并行处理 Flux 事件?

转载 作者:行者123 更新时间:2023-12-04 02:45:31 25 4
gpt4 key购买 nike

我有需要丰富的传入事件流,然后在它们到达时并行处理。

我在想 Project Reactor 是为这项工作量身定做的,但在我的测试中,所有处理似乎都是按顺序完成的。

这是一些测试代码:

ExecutorService executor = Executors.newFixedThreadPool(10);
System.out.println("Main thread: " + Thread.currentThread());
Flux<String> tick = Flux.interval(Duration.of(10, ChronoUnit.MILLIS))
.map(i-> {
System.out.println("ReactorTests.test " + Thread.currentThread());
sleep(1000L); // simulate IO delay
return String.format("String %d", i);
})
.take(3)
// .subscribeOn(Schedulers.elastic());
// .subscribeOn(Schedulers.newParallel("test"));
// .subscribeOn(Schedulers.fromExecutor(executor));
;
tick.subscribe(x ->System.out.println("Subscribe thread: " + Thread.currentThread()),
System.out::println,
()-> System.out.println("Done"));
System.out.println("DONE AND DONE");

我尝试取消注释每条注释行,但是在每种情况下,输出都表明同一线程用于处理所有事件
Main thread: Thread[main,5,main]
[DEBUG] (main) Using Console logging
DONE AND DONE
ReactorTests.test Thread[parallel-1,5,main]
Subscribe thread: Thread[parallel-1,5,main]
ReactorTests.test Thread[parallel-1,5,main]
Subscribe thread: Thread[parallel-1,5,main]
ReactorTests.test Thread[parallel-1,5,main]
Subscribe thread: Thread[parallel-1,5,main]
Done

(唯一的区别是没有调度器,它们在订阅线程上运行,而对于任何执行器,它们都在同一个线程中运行,而不是订阅线程。)

我错过了什么?

仅供引用,有一种“ sleep ”方法:
public static void sleep(long time) {
try {
Thread.sleep(time);
} catch (InterruptedException e) {
System.out.println("Exiting");
}
}

最佳答案

并行处理项目的一种方法是使用 .parallel/.runOn

flux
.parallel(10)
.runOn(scheduler)
//
// Work to be performed in parallel goes here. (e.g. .map, .flatMap, etc)
//
// Then, if/when you're ready to go back to sequential, call .sequential()
.sequential()

阻塞操作(例如阻塞 IO 或 Thread.sleep )将阻塞执行它们的线程。响应式(Reactive)流不能神奇地将阻塞方法变成非阻塞方法。因此,您需要确保在 Scheduler 上运行阻塞方法。适用于阻塞操作(例如 Schedulers.boundedElastic() )。
在上面的例子中,既然你知道你正在调用一个阻塞操作,你可以使用 .runOn(Schedulers.boundedElastic()) .
根据用例,您还可以使用异步运算符,如 .flatMap结合 .subscribeOn.publishOn将特定的阻塞操作委托(delegate)给另一个 Scheduler ,如 described in the project reactor docs .例如:
flux
.flatMap(i -> Mono.fromCallable(() -> {
System.out.println("ReactorTests.test " + Thread.currentThread());
sleep(1000L); // simulate IO delay
return String.format("String %d", i);
})
.subscribeOn(Schedulers.boundedElastic()))
事实上, .flatMap还有一个重载变体,它采用 concurrency参数,您可以在其中限制飞行中内部序列的最大数量。这可以用来代替 .parallel在某些用例中。它通常不适用于 Flux.interval不过,自从 Flux.interval不支持补充慢于滴答声的下游请求。

关于project-reactor - 如何并行处理 Flux 事件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55892246/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com