gpt4 book ai didi

apache-kafka - 如何在 Webflux 应用程序中制作 Spring Cloud Stream 消费者?

转载 作者:行者123 更新时间:2023-12-04 14:09:45 25 4
gpt4 key购买 nike

我有一个基于 Webflux 的微服务,它有一个简单的响应式(Reactive)存储库:

    public interface NotificationRepository extends ReactiveMongoRepository<Notification, ObjectId> {
}
现在我想扩展这个微服务来消费来自 Kafka 的事件消息。该消息/事件随后将被保存到数据库中。
对于 Kafka 监听器,我使用了 Spring Cloud Stream。我创建了一些简单的 Consumer 并且效果很好 - 我能够使用消息并将其保存到数据库中。
    @Bean
public Consumer<KStream<String, Event>> documents(NotificationRepository repository) {
return input ->
input.foreach((key, value) -> {
LOG.info("Received event, Key: {}, value: {}", key, value);
repository.save(initNotification(value)).subscribe();
});
}
但这是连接 Spring Cloud Stream 消费者和响应式(Reactive)存储库的正确方法吗?看起来不像是我必须打电话的时候 subscribe()到底。
我读了 Spring Cloud Stream documentation (for 3.0.0 release)他们说
Native support for reactive programming - since v3.0.0 we no longer distribute spring-cloud-stream-reactive modules and instead relying on native reactive support provided by spring cloud function. For backward compatibility you can still bring spring-cloud-stream-reactive from previous versions.
还有 this presentation video他们提到他们有使用项目 react 器的响应式(Reactive)编程支持。所以我想有一种我不知道的方式。你能告诉我怎么做吗?
如果这一切听起来太愚蠢,我深表歉意,但我对 Spring Cloud Stream 和响应式(Reactive)编程很陌生,并且没有找到很多描述这一点的文章。

最佳答案

只需使用 Flux 作为消费类型,如下所示:

@Bean
public Consumer<Flux<Message<Event>>> documents(NotificationRepository repository) {
return input ->
input
.map(message-> /*map the necessary value like:*/ message.getPayload().getEventValue())
.concatMap((value) -> repository.save(initNotification(value)))
.subscribe();
}
如果您使用 Function如果返回类型为空( Function<Flux<Message<Event>>, Mono<Void>> )而不是 Consumer,则框架可以自动订阅。与 Consumer您必须手动订阅,因为框架没有对流的引用。但在 Consumer如果您订阅的不是存储库而是整个流,这没问题。

关于apache-kafka - 如何在 Webflux 应用程序中制作 Spring Cloud Stream 消费者?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65309096/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com