gpt4 book ai didi

java - 通量平坦图背压

转载 作者:太空宇宙 更新时间:2023-11-04 09:32:56 26 4
gpt4 key购买 nike

如何应用反压来限制 Publisher 生成的项目数量超出并行运行的 flatMap 的处理能力?

1。设置

出于说明目的,有一个快速的 twitter 用户名生成器、一个慢速的 twitter 查找调用、一个慢速的 twitter 文件编写器和一个打印方法。

private Consumer<FluxSink<String>> twitterUsernameGenerator() {
return (sink) -> Stream.of("a", "b", "c", "d").forEach(sink::next);
}

private Flux<TwitterMessage>findTwitterMessagesByUsername() {
return Flux.create(sink -> {
sink.next(new TwitterMessage(...));
sleep(2000);
sink.next(new TwitterMessage(...));
}
});
}

private void print(Object o) {
System.out.println("[" + Thread.currentThread().getName() + "] " + o);
}

最终目标是并行运行 Twitter 查找,同时向生成器施加反压,以免发出超出可处理范围的用户名(预计会进行一些预取)。

2。生成 Twitter 用户名

Flux.create(twitterUsernameGenerator())
.publishOn(Schedulers.single())
.doOnNext(this::print)
.subscribe();

这很好地在一个单独的线程上生成 5 个 Twitter 用户名

[single-1] a
[single-1] b
[single-1] c
[single-1] d

3。查找 Twitter 消息

不确定它是否正确,但我认为,flatMap 从一个用户名生成许多 Twitter 消息,并行 在两个线程上执行此I/O 密集型操作。

Flux.create(twitterUsernameGenerator())
.publishOn(Schedulers.single())
.doOnNext(this::print)
.parallel(2)
.runOn(Schedulers.newParallel("p", 2))
.flatMap(username -> findTwitterMessagesByUsername(username))
.doOnNext(this::print)
.subscribe();

哇!生成器生成用户名的速度比我们能够处理的要快。

[single-1] a
[single-1] b
[single-1] c
[single-1] d
[p-1] TwitterMessage{...}
[p-2] TwitterMessage{...}
...

3。向生成器施加背压

如何将背压应用于生成器函数,以便结果变得更接近此

[single-1] a
[single-1] b
[p-1] TwitterMessage{...}
[single-1] c
[p-2] TwitterMessage{...}
[single-1] d
...

最佳答案

背压发生在大于 4 个元素的“批处理”中。如果您修改生成器以生成更多用户名,例如

  private Consumer<FluxSink<String>> twitterUsernameGenerator() {
return (sink) -> IntStream.rangeClosed(0, 100000)
.boxed().map(String::valueOf)
.collect(Collectors.toList())
.forEach(sink::next);
}

产生的通量与预期的通量有些相似。

您可以尝试将 onBackPressureError() 添加到原始 Flux:

Flux.create(twitterUsernameGenerator())
.onBackpressureError()
.publishOn(Schedulers.single())
.....

当背压发生时,它将通过抛出异常来清楚地发出信号。

关于java - 通量平坦图背压,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56869204/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com