gpt4 book ai didi

java - Spring WebClient 和长轮询

转载 作者:行者123 更新时间:2023-12-04 09:38:14 30 4
gpt4 key购买 nike

我想使用 Spring 的响应式 WebClient 来轮询使用长轮询的 REST 端点。

端点为聊天 channel 提供消息。当我调用它并且没有消息时,它会阻塞(即不返回),直到出现消息(或 30 秒过去)。

因此,在同步世界中,我会专门使用一个线程来监控这个 channel ,通过 RestTemplate 调用端点,等待结果,将其写入共享队列并开始下一个请求。然后,消费者可以对出现在队列中的新项目使用react。

在响应式(Reactive)世界中,这有点不同。理想情况下,消费者会订阅 Flux 消息。问题是如何构建这个 Flux。

逻辑应该是:

Mono<String> message = WebClient.get(). […] .bodyToMono(String.class);
// When the mono completes, create a new one just as described above
// Combine all of the monos into a Flux
flux.subscribe(message -> System.out.println("New message" + message);

我认为我需要某种 switch…运营商,但我可以找到正确的。

最佳答案

正如@123 指出的:

You can just use repeat, i.e WebClient.get(). […] .bodyToMono(String.class).repeat(), will give you a flux of the replies, and will only start the next one when the previous is done.



其实这里需要的是 defer()repeat() : defer()接受 Monos 和 repeat() 的供应商完成之前的订阅后,将重新订阅 Mono。这将导致供应商再次被调用,因此将启动一个新的 http 请求。

此外,由于它永远运行,它会导致应用程序不干净地关闭:如果发生关闭,则可能有一个正在运行的 http 请求。要彻底结束 Flux, takeUntilOther()可以使用,它需要另一个 Publisher(如 EmitterProcessor )。然后,在 @PreDestroy方法,可以调用 shutdown.onNext(true) ,这将导致http请求被取消。

我的解决方案现在看起来像这样:
   Mono.defer(() -> receiveMessage())
.repeat()
.takeUntilOther(shutdown)
.subscribe(message -> System.out.println("New message" + message);

关于java - Spring WebClient 和长轮询,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62445591/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com