gpt4 book ai didi

java - react 堆项目 : ConnectableFlux auto-connecting on demand

转载 作者:搜寻专家 更新时间:2023-11-01 00:53:58 24 4
gpt4 key购买 nike

我有一个数据项源,我想与多个下游流共享该 Flux。

它与the example in the reference guide 非常相似, 但我觉得这个例子通过手动调用 .connect() 作弊。具体来说,我不知道会有多少下游订阅者,而且我无法控制“最后”调用 .connect() 。消费者应该能够订阅,但不能立即触发数据拉取。然后在未来的某个地方,当实际需要数据时,他们会根据需要提取数据。

此外,源对消费敏感,因此无法重新获取。
此外,它将非常大,因此缓冲和重放不是一种选择。

理想情况下,最重要的是,整个事情发生在一个线程中,因此没有并发或等待。
(给订阅者加入一个非常短的等待时间是不可取的)

我几乎能够为 Monos 实现预期的效果(单一最终结果值):

public class CoConsumptionTest {
@Test
public void convenientCoConsumption() {
// List used just for the example:
List<Tuple2<String, String>> source = Arrays.asList(
Tuples.of("a", "1"), Tuples.of("b", "1"), Tuples.of("c", "1"),
Tuples.of("a", "2"), Tuples.of("b", "2"), Tuples.of("c", "2"),
Tuples.of("a", "3"), Tuples.of("b", "3"), Tuples.of("c", "3")
);

// Source which is sensitive to consumption
AtomicInteger consumedCount = new AtomicInteger(0);
Iterator<Tuple2<String, String>> statefulIterator = new Iterator<Tuple2<String, String>>() {
private ListIterator<Tuple2<String, String>> sourceIterator = source.listIterator();

@Override
public boolean hasNext() {
return sourceIterator.hasNext();
}

@Override
public Tuple2<String, String> next() {
Tuple2<String, String> e = sourceIterator.next();
consumedCount.incrementAndGet();
System.out.println("Audit: " + e);
return e;
}
};

// Logic in the service:
Flux<Tuple2<String, String>> f = Flux.fromIterable(() -> statefulIterator);
ConnectableFlux<Tuple2<String, String>> co = f.publish();

Function<Predicate<Tuple2<String, String>>, Mono<Tuple2<String, String>>> findOne = (highlySelectivePredicate) ->
co.filter(highlySelectivePredicate)
.next() //gives us a Mono
.toProcessor() //makes it eagerly subscribe and demand from the upstream, so it wont miss emissions
.doOnSubscribe(s -> co.connect()); //when an actual user consumer subscribes

// Subscribing (outside the service)
assumeThat(consumedCount).hasValue(0);
Mono<Tuple2<String, String>> a2 = findOne.apply(select("a", "2"));
Mono<Tuple2<String, String>> b1 = findOne.apply(select("b", "1"));
Mono<Tuple2<String, String>> c1 = findOne.apply(select("c", "1"));
assertThat(consumedCount).hasValue(0);

// Data is needed
SoftAssertions softly = new SoftAssertions();

assertThat(a2.block()).isEqualTo(Tuples.of("a", "2"));
softly.assertThat(consumedCount).hasValue(4);

assertThat(b1.block()).isEqualTo(Tuples.of("b", "1"));
softly.assertThat(consumedCount).hasValue(4);

assertThat(c1.block()).isEqualTo(Tuples.of("c", "1"));
softly.assertThat(consumedCount).hasValue(4);

softly.assertAll();
}

private static Predicate<Tuple2<String, String>> select(String t1, String t2) {
return e -> e.getT1().equals(t1) && e.getT2().equals(t2);
}
}

问题:我想知道如何为 Flux 结果实现这一点,即应用过滤后的多个值,而不仅仅是第一个/下一个。 (还是按需索取)
(尝试天真地将 .toProcessor() 替换为 .publish().autoConnect(0) 但没有成功)

编辑 1:虽然不允许对源进行缓冲,但作为参数出现的过滤器应该具有高度选择性,因此可以在过滤后进行缓冲。

编辑 2:一段时间后回到这个问题上,我在较新版本的 reactor 上尝试了我发布的示例,它确实有效。

io.projectreactor:reactor-bom:Californium-SR8
> io.projectreactor:reactor-core:3.2.9.RELEASE

最佳答案

我不喜欢给出“非答案”式的答案,但我认为至少一个您的要求必须在这里给出。从你的问题来看,要求似乎是:

  • 不允许缓冲
  • 不允许放置元素
  • 未知数量的订阅者
  • 订阅者可以随时连接
  • 每个订阅者都必须在需要时拥有所有可用数据
  • 不从源重新获取

假设一个订阅者从 Flux 请求数据,该 Flux 中的前几个元素被消耗,然后最终另一个订阅者出现在任意点将来想要相同的数据。对于上述要求,这是不可能的 - 您要么必须重新获取数据,要么将其保存在某个地方,并且您已经排除了这两种选择。

但是,如果您准备稍微放宽这些要求,那么有几个可能的选择:

已知订阅者数量

如果你能算出你最终会得到多少订阅者,那么你可以使用 autoConnect(n) 自动连接到 ConnectableFlux已订阅数。

允许删除元素

如果您可以允许删除元素,那么您只需在原始 Flux 上调用 share(); 使其在第一次订阅时自动连接, 然后 future 的订阅者将删除以前的元素。

为订阅者留出连接时间

这可能是更有希望的策略之一,因为您说:

no concurrency or waiting. (Giving a very small wait time for subscribers to join is not desirable)

You can turn the Flux into a hot source that caches all emitted elements for a certain time period.这意味着您可以以一定量的内存为代价(但无需缓冲整个流),为订阅者提供一小段等待时间,以便他们可以订阅并仍然接收所有数据。

缓冲已知数量的元素

与上面类似,您可以使用another variant of the cache() method只缓存已知数量的元素。如果您知道可以安全地将 n 元素装入内存,但不能再多了,那么这可以为订阅者提供尽可能多的安全连接时间。

关于java - react 堆项目 : ConnectableFlux auto-connecting on demand,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56618984/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com