gpt4 book ai didi

java - 如何并行调度多个 Flux 并限制 onNext 和 onRequest 事件

转载 作者:行者123 更新时间:2023-12-01 17:22:53 26 4
gpt4 key购买 nike

我想组合多个Flux同一类型的在一起。在订阅时,它们应该并行执行。订阅方法应该能够限制请求数量。

我尝试过Flux.merge(..)Flux.concat(..) 。后者似乎强制执行顺序请求生产,即使在并行调度程序上发布时,第一个以急切的方式请求所有元素并且仅尊重 take(n)方法onNext事件但不适用于onRequest Activity 。

我准备了一个最小的例子。三 Flux这些都将在 Schedulers.boundedElastic() 上执行繁重的 I/O 工作。 。每个记录他们的 onRequest Activity 。

Flux<String> f1 = Flux.just("1").publishOn(Schedulers.boundedElastic()).flatMap(i -> {
try {
//Long running, i/o task
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Flux.just(i);
}).doOnRequest(s -> LOGGER.info("onRequest {}", 1));
Flux<String> f2 = Flux.just("2").publishOn(Schedulers.boundedElastic()).flatMap(i -> {
try {
//Long running, i/o task
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Flux.just(i);
}).doOnRequest(s -> LOGGER.info("onRequest {}", 2));
Flux<String> f3 = Flux.just("3").publishOn(Schedulers.boundedElastic()).flatMap(i -> {
try {
//Long running, i/o task
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Flux.just(i);
}).doOnRequest(s -> LOGGER.info("onRequest {}", 3));

接近Flux.concattake(2)

Flux.concat(f1, f2, f3).take(2)
.doOnNext(i -> LOGGER.info("onNext {}", i))
.subscribeOn(Schedulers.boundedElastic()).subscribe();

结果:

00:16:11.980   [oundedElastic-1] d.f.B.Application                        : onRequest 1
00:16:13.981 [oundedElastic-2] d.f.B.Application : onNext 1
00:16:13.981 [oundedElastic-2] d.f.B.Application : onRequest 2
00:16:15.982 [oundedElastic-3] d.f.B.Application : onNext 2

两个 onRequest和两个onNext事件被传播。请求延迟了两秒但与预期并行

接近Flux.maptake(2)

Flux.merge(f1, f2, f3).take(2)
.doOnNext(i -> LOGGER.info("onNext {}", i))
.subscribeOn(Schedulers.boundedElastic()).subscribe();

结果

00:21:58.301   [oundedElastic-1] d.f.B.Application                        : onRequest 1
00:21:58.302 [oundedElastic-1] d.f.B.Application : onRequest 2
00:21:58.302 [oundedElastic-1] d.f.B.Application : onRequest 3
00:22:00.303 [oundedElastic-4] d.f.B.Application : onNext 3
00:22:00.304 [oundedElastic-3] d.f.B.Application : onNext 2

onRequest 即使只有两个预期的事件也会被发出take(n)方法只是限制了发出的 onNext事件。执行按预期并行完成。

未知方法的预期结果

00:21:58.302   [oundedElastic-1] d.f.B.Application                        : onRequest 2
00:21:58.302 [oundedElastic-1] d.f.B.Application : onRequest 3
00:22:00.303 [oundedElastic-4] d.f.B.Application : onNext 3
00:22:00.304 [oundedElastic-3] d.f.B.Application : onNext 2

只有两个onRequest事件已发出并完成

问题

是否可以将这三个结合起来 Flux以某种方式可以同时计算它们但在其请求行为中受到限制(take(n)?)?在我的真实场景中,请求会导致 WebClient 调用 WebService,从而启动耗时的 I/O 任务。因此,即使我的应用程序只接收两个 onNext 事件,第三个发出的 onRequest 也会导致请求进入涅槃状态。

最佳答案

我也是项目 react 堆的新手,但我想出了这个解决方法,有些人可能会称之为黑客:

Flux.fromIterable(Lists.partition(Arrays.asList(f1, f2, f3), 2))
.take(1)
.flatMap(partition -> Flux.merge(partition)
.doOnNext(i -> LOGGER.info("onNext {}", i)))
.subscribeOn(Schedulers.boundedElastic())
.subscribe();

它为我打印了这个:

2020-04-17 01:07:37.658  INFO 59488 --- [oundedElastic-1] com.example.demo.SomeService1            : onRequest 1
2020-04-17 01:07:37.659 INFO 59488 --- [oundedElastic-1] com.example.demo.SomeService1 : onRequest 2
2020-04-17 01:07:39.664 INFO 59488 --- [oundedElastic-3] com.example.demo.SomeService1 : onNext 2
2020-04-17 01:07:39.667 INFO 59488 --- [oundedElastic-2] com.example.demo.SomeService1 : onNext 1

关于java - 如何并行调度多个 Flux 并限制 onNext 和 onRequest 事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61261192/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com