gpt4 book ai didi

java - Flux.fromStream 拆分和合并时失败

转载 作者:行者123 更新时间:2023-11-30 05:41:19 24 4
gpt4 key购买 nike

我有这个示例代码:

Flux<Integer> range = Flux.range(0, 10);
Flux<Long> longs = Flux.fromStream(new Random().longs(100, 500).boxed()); // (1)
// Flux<Long> longs = Flux.fromIterable(new Random().longs(100, 500).boxed().limit(30).collect(Collectors.toList())); // (2)

Flux<Tuple2<Integer, Long>> flux1 = Flux.zip(range, longs);

Flux<Integer> flux2 = flux1.map(e -> 2);
Flux<Integer> flux3 = flux1.map(e -> 3);

CountDownLatch countDownLatch = new CountDownLatch(1);

Flux.merge(flux2, flux3)
.doOnComplete(() -> countDownLatch.countDown())
.subscribe(e -> log.info("{}", e));

countDownLatch.await(1, TimeUnit.MINUTES);

此操作失败:

Caused by: java.lang.IllegalStateException: stream has already been operated upon or closed
at java.util.stream.AbstractPipeline.spliterator(AbstractPipeline.java:343)
at java.util.stream.ReferencePipeline.iterator(ReferencePipeline.java:139)
at reactor.core.publisher.FluxStream.subscribe(FluxStream.java:57)
at reactor.core.publisher.Flux.subscribe(Flux.java:7777)
at reactor.core.publisher.FluxZip$ZipCoordinator.subscribe(FluxZip.java:579)
...

注释行 (1) 和取消注释行 (2) 可以解决该问题,但在我的用例中,longs 与 (1) 一样是无界的。我该如何解决这个问题?

真正的用例是在 flux2flux3 都完成时执行某些操作,它们在 map 调用中产生副作用 - 写入在本例中是一个文件,因此我需要确保在退出之前所有内容都已写入。

最佳答案

您可以使用延迟:

Flux<Long> longs = Flux.defer(() -> Flux.fromStream(new Random().longs(100, 500).boxed()));

即使您只有一个显式订阅,您也可以基于longs创建多个流。为每个创建一个订阅。

Flux.fromStream 只能订阅一次,因为 Java 流只能使用一次。

defer 通过为每个订阅者创建一个新流来解决这个问题。

关于java - Flux.fromStream 拆分和合并时失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55581462/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com