gpt4 book ai didi

java - 响应式(Reactive)存储库

转载 作者:太空宇宙 更新时间:2023-11-04 10:37:52 25 4
gpt4 key购买 nike

我尝试创建一些基于响应式(Reactive)堆栈(Reactor + WebFlux)的基本 Spring 5 应用程序。

我的下一个目标是实现响应式(Reactive)存储库,它能够:

  1. 保存书籍。
  2. 查找所有书籍。

我的存储库需要涵盖以下场景:

场景A:

  1. 没有人订阅 FindAll
  2. 有人保存了一本书(id = 1)
  3. Client1 订阅 FindAll
  4. 图书 (id=1) 已推送至 Client1(Client1 保持订阅状态,流式传输未完成!)
  5. 有人保存了一本书(id = 2)
  6. 图书 (id=2) 已推送至 Client1(Client1 保持订阅状态,流式传输未完成!)

因此,在我看来,这种情况是冷源和热源概念的混合。在任何人订阅之前,我们都会收集某人在我们的存储库中的某个缓冲区中保存的数据(假设是普通列表)。对于将订阅 FindAll 的所有订阅者,我们需要推送缓冲列表(在订阅之前收集的列表),并且不要完成流以允许推送以后的集合更新。

我能够实现这一目标,但我仍然在想是否有更简单的方法可以做到这一点?也许 Reactor 项目中有一个解决方案已经涵盖了这个场景?

我的实现:

public class InMemoryBookRepository {

private final Map<String, Book> bookMap = new ConcurrentHashMap<>();
private final UnicastProcessor<Book> processor = UnicastProcessor.create();
private final FluxSink<Book> fluxSink = processor.sink(FluxSink.OverflowStrategy.LATEST);
private final Flux<Book> hotFlux = processor.publish().autoConnect();

@Override
public void save(Book book) {
bookMap.put(book.getId(), book);
fluxSink.next(book);
}

@Override
public Flux<Book> findAll() {
//without fromIterable I cannot push books that where saved before someone subscribed
return Flux.fromIterable(bookMap.values())
.concatWith(hotFlux)
//Unfortunately this solution produces duplicates so we need to filter them
.distinct();
}
}

Ofc,我不能只使用 Cold 出版商 - 因为流将在发布收集的书籍后完成。出于同样的原因,我不能使用 Hot one,因为我会错过某人订阅之前生成的元素。

旁注:在我的代码中,我的 map 没有任何清理机制,因此它会在某些时候产生异常,但现在这并不重要。

最佳答案

如此简单...我不知道为什么我错过了这个美丽的运算符:https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#replay--

所以基本上删除整个 List/Map 部分代码并使用 replay() 而不是 publish()

简化示例:

UnicastProcessor<String> processor = UnicastProcessor.create();
FluxSink<String> fluxSink = processor.sink(FluxSink.OverflowStrategy.LATEST);
//change 'publish()' to 'replay()'
Flux<String> hotFlux = processor.publish().autoConnect();

hotFlux.subscribe(n -> log.info("1st subscriber: {}", n));

fluxSink.next("one");

hotFlux.subscribe(n -> log.info("2nd subscriber: {}", n));

fluxSink.next("two");

使用publish()输出:

1st subscriber: one
1st subscriber: two
2nd subscriber: two

使用replay()输出:

1st subscriber: one
2nd subscriber: one
1st subscriber: two
2nd subscriber: two

关于java - 响应式(Reactive)存储库,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49295745/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com