gpt4 book ai didi

带有 Kafka 和 Websockets 的 Spring WebFlux

转载 作者:行者123 更新时间:2023-12-04 14:09:22 27 4
gpt4 key购买 nike

现在我在我的 SpringBoot 应用程序中实现了一个简单的 Kafka 消费者和生产者,它工作正常我接下来要做的是我的消费者获取消费的消息并将其直接广播给所有订阅的客户端。我发现我不能将 STOMP Messaging 与 WebFlux 一起使用,所以我该如何完成这项任务,我看到了响应式 WebSocket 实现,但我不知道如何将我使用的数据发送到我的 websocket。
那是我的简单 KafkaProducer:

fun addMessage(message: Message){
val headers : MutableMap<String, Any> = HashMap()
headers[KafkaHeaders.TOPIC] = topicName
kafkaTemplate.send(GenericMessage<Message>(message, headers))
}
我的简单消费者看起来像这样:
@KafkaListener(topics = ["mytopic"], groupId = "test-consumer-group")
fun receiveData(message:Message) :Message{
//Take consumed data and send to websocket
}

最佳答案

我会考虑拥有一个 Sinks.many().multicast().onBackpressureBuffer()作为全局中间容器。然后在您的 receiveData()您只需将数据放入该 Reactor 抽象中即可。
对于您的 WebSocket 连接 session ,我建议实现 org.springframework.web.reactive.socket.WebSocketHandler并使用 Sinks.Many.asFlux()WebSocketSession.send(Publisher<WebSocketMessage> messages)应用程序接口(interface)。这样,只要连接到此 WebSocket 服务器,所有 session 都将使用相同的 Kafka 数据。
在文档中查看更多信息:https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html#webflux-websockethandler
更新
你可以在这里找到一些样本:https://github.com/artembilan/sandbox/tree/master/so-65667450

关于带有 Kafka 和 Websockets 的 Spring WebFlux,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65667450/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com