gpt4 book ai didi

java - react 堆项目 : How to control Flux emission

转载 作者:行者123 更新时间:2023-11-30 05:32:49 24 4
gpt4 key购买 nike

我有一个能发出一些日期的通量。此日期映射到我在某个执行器上运行的1024个模拟HTTP请求。

我想要做的是在发出下一个 Date 之前等待所有 1024 个 HTTP 请求。

当前运行时,onNext()被调用多次,然后稳定在某个稳定的速率。

如何改变这种行为?

附注如果需要的话,我愿意转向架构。

private void run() throws Exception {

Executor executor = Executors.newFixedThreadPool(2);

Flux<Date> source = Flux.generate(emitter ->
emitter.next(new Date())
);

source
.log()
.limitRate(1)
.doOnNext(date -> System.out.println("on next: " + date))
.map(date -> Flux.range(0, 1024))
.flatMap(i -> Mono.fromCallable(Pipeline::simulateHttp)
.subscribeOn(Schedulers.fromExecutor(executor)))
.subscribe(s -> System.out.println(s));

Thread.currentThread().join();
}

HTTP请求模拟:

private static String simulateHttp() {
try {
System.out.println("start http call");
Thread.sleep(3_000);
} catch (Exception e) {}

return "HTML content";
}

编辑:改编自答案的代码:

  • 首先,我的代码中有一个错误(需要另一个 flatMap)
  • 其次,我给两个flatMap都添加了1concurrency参数(看来两个都需要)

    Executor executor = Executors.newSingleThreadExecutor();

    Flux<Date> source = Flux.generate(emitter -> {
    System.out.println("emitter called!");
    emitter.next(new Date());
    });

    source
    .limitRate(1)
    .map(date -> Flux.range(0, 16))
    .flatMap(Function.identity(), 1) # concurrency = 1
    .flatMap(i -> Mono.fromCallable(Pipeline::simulateHttp)
    .subscribeOn(Schedulers.fromExecutor(executor)), 1) # concurrency = 1
    .subscribe(s -> System.out.println(s));

    Thread.currentThread().join();

最佳答案

您应该看看这些方法:

concatMap 确保通量上的元素在运算符内按顺序处理:

Generation of inners and subscription: this operator waits for one inner to complete before generating the next one and subscribing to it.

flatMap 允许您通过公开 concurrencyprefetch 参数来执行相同的操作,这使您可以更好地控制此行为:

The concurrency argument allows to control how many Publisher can be subscribed to and merged in parallel. In turn, that argument shows the size of the first Subscription.request(long) to the upstream. The prefetch argument allows to give an arbitrary prefetch size to the merged Publisher (in other words prefetch size means the size of the first Subscription.request(long) to the merged Publisher).

关于java - react 堆项目 : How to control Flux emission,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57199893/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com