gpt4 book ai didi

spring-amqp - 在 spring-webflux 中使用 Spring AMQP 消费者

转载 作者:行者123 更新时间:2023-12-01 23:04:37 26 4
gpt4 key购买 nike

我有一个使用带有 webflux 的 Boot 2.0 的应用程序,并且有一个端点返回 ServerSentEvent 的 Flux。这些事件是通过利用 spring-amqp 从 RabbitMQ 队列中消费消息来创建的。我的问题是:如何最好地将 MessageListener 的已配置监听器方法连接到可以传递给我的 Controller 的 Flux?

Project Reactor 的 create部分提到它“将现有 API 与响应式(Reactive)世界连接起来非常有用 - 例如基于监听器的异步 API”,但我不确定如何直接连接到消息监听器,因为它包含在 DirectMessageListenerContainerMessageListenerAdapter。他们在创建部分的示例:

Flux<String> bridge = Flux.create(sink -> {
myEventProcessor.register(
new MyEventListener<String>() {

public void onDataChunk(List<String> chunk) {
for(String s : chunk) {
sink.next(s);
}
}

public void processComplete() {
sink.complete();
}
});
});

到目前为止,我最好的选择是创建一个 Processor 并在 RabbitMQ 监听器方法中每次调用 onNext() 以手动生成事件。

最佳答案

我有这样的事情:

@SpringBootApplication
@RestController
public class AmqpToWebfluxApplication {

public static void main(String[] args) {
ConfigurableApplicationContext applicationContext = SpringApplication.run(AmqpToWebfluxApplication.class, args);

RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);

for (int i = 0; i < 100; i++) {
rabbitTemplate.convertAndSend("foo", "event-" + i);
}

}

private TopicProcessor<String> sseFluxProcessor = TopicProcessor.share("sseFromAmqp", Queues.SMALL_BUFFER_SIZE);

@GetMapping(value = "/sseFromAmqp", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> getSeeFromAmqp() {
return this.sseFluxProcessor;
}

@RabbitListener(id = "fooListener", queues = "foo")
public void handleAmqpMessages(String message) {
this.sseFluxProcessor.onNext(message);
}

}

TopicProcessor.share() 允许有许多并发订阅者,当我们将此 TopicProcessor 作为 Flux 返回给我们的 /sseFromAmqp 通过 WebFlux 的 REST 请求。

@RabbitListener 只是将其接收到的消息委托(delegate)给该 TopicProcessor

main() 我有一个代码来确认即使没有订阅者我也可以发布到 TopicProcessor

使用两个单独的 curl session 进行测试,并通过 RabbitMQ 管理插件将消息发布到队列。

顺便说一下,我使用 share() 因为:https://projectreactor.io/docs/core/release/reference/#_topicprocessor

from multiple upstream Publishers when created in the shared configuration

那是因为 @RabbitListener 确实可以从不同的 ListenerContainer 线程同时调用。

更新

我还将这个示例移到了我的 Sandbox:https://github.com/artembilan/sendbox/tree/master/amqp-to-webflux

关于spring-amqp - 在 spring-webflux 中使用 Spring AMQP 消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49662157/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com