gpt4 book ai didi

java - 如何在准备就绪时使用响应式(Reactive) Flux/Mono 将消息推送到上游,而不是间隔轮询状态?

转载 作者:行者123 更新时间:2023-11-30 01:56:49 24 4
gpt4 key购买 nike

尝试在消息可用/准备就绪时将消息推送到上游,并在刷新后关闭连接,而不是使用 Spring React Flux 间隔轮询消息。

@GetMapping(value = "/getValue/{randomId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> statusCheck(@PathVariable("randomId") @NonNull String randomId) {

return Flux.<String>interval(Duration.ofSeconds(3))
.map(status -> {
if (getSomething(randomId).
equalsIgnoreCase("value"))
return "value";
return "ping";
}).take(Duration.ofSeconds(60)).timeout(Duration.ofSeconds(60));
}

Kafka 监听器在获取时更新映射中的 randomId 值,getSomething 方法按间隔检查映射中的 randomId 值。因此,我不想检查间隔并将数据存储在 map 中,而是想在监听器接收时将消息推送到客户端。

最佳答案

我基于此 stackoverflow 构建了解决方案 Spring 5 Web Reactive - Hot Publishing - How to use EmitterProcessor to bridge a MessageListener to an event stream答案是,使用 EmitterProcessor 来热发布可用的消息。

这是示例代码

@GetMapping(value = "/getValue/{randomId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> statusCheck(@PathVariable("randomId") @NonNull String randomId) {
EmitterProcessor<String> emitterProcessor = EmitterProcessor.create();
Flux<String> autoConnect = emitterProcessor.publish().autoConnect();
FluxSink<String> sink = emitterProcessor.sink();
//storing randomId and processor sink details
randomIdMap.putIfAbsent(randomId, emitterProcessor);
/** This will return ping status to notify client as
connection is alive until the randomId message received. **/
sendPingStatus(sink, randomId);
}

下面的方法展示了如何在消息到达 kafka 消费者时将消息推送到客户端并关闭通量连接。

@KafkaListener(topics = "some-subscription-id",
containerFactory = "kafkaListenerContainerFactory")
public void pushMessage(SomeMessage message, Acknowledgment acknowledgment) {
EmitterProcessor emitter = randomIdMap.get("randomId");
if (emitter != null ) {
emitter.onNext(message);
emitter.onComplete();
randomIdMap.remove("randomId");
acknowledgment.acknowledge();
}
}

关于java - 如何在准备就绪时使用响应式(Reactive) Flux/Mono 将消息推送到上游,而不是间隔轮询状态?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54161603/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com