gpt4 book ai didi

java - Spring react 堆 : How to wait for multiple Flux's by key?

转载 作者:行者123 更新时间:2023-11-30 10:02:26 24 4
gpt4 key购买 nike

从概念上讲,我有一个源(无限期地)发出 IP 地址和两个处理器。

这些处理器本质上是在发出 IO 请求。我想做的是在这些处理器完成后合并它们的结果,并将它们传递给某个可以同时处理这两个结果的接收器。

我试着写了一些玩具示例,但它不起作用,因为 source Flux 永无止境。

正确的做法是什么?

public class Demo {

public static void main(String[] args) throws Exception {


Flux<String> source = Flux.fromIterable(Lists.newArrayList("1.1.1.1", "2.2.2.2", "3.3.3.3")).delayElements(Duration.ofMillis(500)).repeat();
ConnectableFlux<String> ipsFlux = source.publish();

Flux<Foo> fooFlux1 = Flux.from(ipsFlux)
.map(ip -> new Foo(ip, "1"));

Flux<Foo> fooFlux2 = Flux.from(ipsFlux)
.map(ip -> new Foo(ip, "2"));

Flux.merge(fooFlux1, fooFlux2)
.groupBy(Foo::getId, Function.identity())
.subscribe(flux -> flux.collectMap(foo -> foo.type).subscribe(System.out::println));

ipsFlux.connect();

Thread.currentThread().join();

}

static class Foo {
String id;
String type;

public Foo(String id, String type) {
this.id = id;
this.type = type;
}

public String getId() {
return id;
}

@Override
public String toString() {
return "Foo{" +
"id='" + id + '\'' +
", value='" + type + '\'' +
'}';
}
}
}

最佳答案

查看合并运算符 ( https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#merge-org.reactivestreams.Publisher...- ) 的文档,似乎合并不适合处理无限流:

Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.

我会尝试 zip 运算符 ( https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#zip-org.reactivestreams.Publisher-org.reactivestreams.Publisher- )

Flux<Tuple2<Foo, Foo>> zipped = Flux.zip(fooFlux1, fooFlux2);

然后您的接收器可以在 Foo 可用时立即消耗它。

关于java - Spring react 堆 : How to wait for multiple Flux's by key?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56953907/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com