gpt4 book ai didi

java - react 堆项目 : Handling fast and slow publishers

转载 作者:行者123 更新时间:2023-11-30 05:32:30 26 4
gpt4 key购买 nike

考虑以下代码:

AtomicInteger counter1 = new AtomicInteger();
AtomicInteger counter2 = new AtomicInteger();

Flux<Object> source = Flux.generate(emitter -> {
emitter.next("item");
});

Executor executor1 = Executors.newFixedThreadPool(32);
Executor executor2 = Executors.newFixedThreadPool(32);

Flux<String> flux1 = Flux.merge(source).concatMap(item -> Mono.fromCallable(() -> {
Thread.sleep(1);
return "1_" + counter1.incrementAndGet();
}).subscribeOn(Schedulers.fromExecutor(executor1)));

Flux<String> flux2 = Flux.merge(source).concatMap(item -> Mono.fromCallable(() -> {
Thread.sleep(100);
return "2_" + counter2.incrementAndGet();
}).subscribeOn(Schedulers.fromExecutor(executor2)));

Flux.merge(flux1, flux2).subscribe(System.out::println);

您可以看到一个发布商的速度比另一发布商快 100 倍。尽管如此,当运行代码时,似乎所有数据都被打印出来,但是两个发布者之间存在巨大差距,这增加了加类时间。

值得注意的是,当更改数字时,executer2 将有 1024 线程,而 executer1 将只有 1 线程,那么我们仍然看到随着时间的推移差距越来越大。

我的期望是,在相应地调整线程池后,发布商将获得平衡。

  1. 我希望在发布商之间实现平衡(相对于线程池大小和处理时间)

  2. 如果我等待足够长的时间会发生什么?换句话说,是否会出现背压? (默认情况下我猜这是一个运行时异常,对吧?)

我不想删除项目,也不想出现运行时异常。相反,正如我提到的,我希望系统在其拥有的资源和处理时间方面保持平衡 - 上面的代码是否 promise 了这一点?

谢谢!

最佳答案

此示例中的 Flux 对象不是 ParallelFlux 对象,因此它们只会使用一个线程。

如果您创建一个能够处理数千个线程的调度程序,并将其传递给 Flux 对象之一,这并不重要 - 它们只会坐在那里不使用,这正是这个例子中发生了什么。没有背压,也不会导致异常 - 它只是使用一个线程尽可能快地运行。

如果您想确保 Flux 充分利用可用的 1024 个线程,那么您需要调用 .parallel(1024):

ParallelFlux<String> flux1 = Flux.merge(source).parallel(1).concatMap(item -> Mono.fromCallable(() -> {
Thread.sleep(1);
return "1_" + counter1.incrementAndGet();
}).subscribeOn(Schedulers.fromExecutor(executor1)));

ParallelFlux<String> flux2 = Flux.merge(source).parallel(1024).concatMap(item -> Mono.fromCallable(() -> {
Thread.sleep(100);
return "2_" + counter2.incrementAndGet();
}).subscribeOn(Schedulers.fromExecutor(executor1)));

如果您对代码执行此操作,那么您开始看到的结果更接近您的预期,2_ 超越了 1_,尽管事实上它是 sleep 时间延长 100 倍:

...
2_17075
2_17076
1_863
1_864
2_17077
1_865
2_17078
2_17079
...

但是,警告一下:

I'd like to achieve a balance between publishers (relative to the thread-pool sizes and processing time)

你不能在这里选择数字来平衡输出,至少不能可靠地或以任何有意义的方式 - 线程调度将是完全任意的。如果你想这样做,那么你可以使用 this variant of the subscribe method ,允许您在订阅使用者上显式调用 request()。这样您就可以通过仅请求您准备处理的尽可能多的元素来提供背压。

关于java - react 堆项目 : Handling fast and slow publishers,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57256014/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com