gpt4 book ai didi

websocket - 使用带有 Rsocket 和 Spring Webflux 的 websockets 仅向特定客户端发送消息

转载 作者:行者123 更新时间:2023-12-04 01:31:04 54 4
gpt4 key购买 nike

我正在尝试在我的一个 POC 项目中将 Rsocket 与 websocket 结合使用。在我的例子中,不需要用户登录。当我收到来自其他服务的消息时,我想只向某些客户发送消息。基本上,我的流程是这样的。

                                  Service A                               Service B   
|--------| websocket |------------------| Queue based comm |---------------|
| Web |----------------->| Rsocket server |--------------------->| Another |
| |<-----------------| using Websocket |<---------------------| service |
|--------| websocket |------------------| Queue based comm |---------------|

就我而言,我正在考虑为每个连接和每个请求使用一个唯一的 ID。将两个标识符合并为相关 ID,并将消息发送到 Service B,当我从 Service B 收到消息时,确定它需要转到哪个客户端并发送它。现在我知道我可能不需要 2 项服务来执行此操作,但我这样做是出于其他一些原因。虽然我对如何实现其他部分有一个粗略的想法。我是 Rsocket 概念的新手。是否可以使用Spring Boot Webflux、Rsocket和websocket通过某个id向唯一的某个客户端发送消息?

最佳答案

基本上,我认为您有两个选择。第一个是过滤来自Service B的Flux , 第二个是使用 RSocketRequesterMap如@NikolaB 所述。

第一个选项:

data class News(val category: String, val news: String)
data class PrivateNews(val destination: String, val news: News)

class NewsProvider {

private val duration: Long = 250

private val externalNewsProcessor = DirectProcessor.create<News>().serialize()
private val sink = externalNewsProcessor.sink()

fun allNews(): Flux<News> {
return Flux
.merge(
carNews(), bikeNews(), cosmeticsNews(),
externalNewsProcessor)
.delayElements(Duration.ofMillis(duration))
}

fun externalNews(): Flux<News> {
return externalNewsProcessor;
}

fun addExternalNews(news: News) {
sink.next(news);
}

fun carNews(): Flux<News> {
return Flux
.just("new lambo!!", "amazing ferrari!", "great porsche", "very cool audi RS4 Avant", "Tesla i smarter than you")
.map { News("CAR", it) }
.delayElements(Duration.ofMillis(duration))
.log()
}

fun bikeNews(): Flux<News> {
return Flux
.just("specialized enduro still the biggest dream", "giant anthem fast as hell", "gravel long distance test")
.map { News("BIKE", it) }
.delayElements(Duration.ofMillis(duration))
.log()
}

fun cosmeticsNews(): Flux<News> {
return Flux
.just("nivea - no one wants to hear about that", "rexona anti-odor test")
.map { News("COSMETICS", it) }
.delayElements(Duration.ofMillis(duration))
.log()
}

}

@RestController
@RequestMapping("/sse")
@CrossOrigin("*")
class NewsRestController() {
private val log = LoggerFactory.getLogger(NewsRestController::class.java)

val newsProvider = NewsProvider()

@GetMapping(value = ["/news/{category}"], produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
fun allNewsByCategory(@PathVariable category: String): Flux<News> {
log.info("hello, getting all news by category: {}!", category)
return newsProvider
.allNews()
.filter { it.category == category }
}
}

NewsProvider类是对你的 Service B 的模拟,应该返回 Flux<> .每当您调用 addExternalNews它会插入 NewsallNews 返回方法。在NewsRestController类,我们按类别过滤新闻。在 localhost:8080/sse/news/CAR 上打开浏览器仅查看汽车新闻。

如果你想改用RSocket,你可以使用这样的方法:

    @MessageMapping("news.{category}")
fun allNewsByCategory(@DestinationVariable category: String): Flux<News> {
log.info("RSocket, getting all news by category: {}!", category)
return newsProvider
.allNews()
.filter { it.category == category }
}

第二个选项:

让我们存储 RSocketRequesterHashMap (我使用 vavr.io)与 @ConnectMapping .

@Controller
class RSocketConnectionController {

private val log = LoggerFactory.getLogger(RSocketConnectionController::class.java)

private var requesterMap: Map<String, RSocketRequester> = HashMap.empty()

@Synchronized
private fun getRequesterMap(): Map<String, RSocketRequester> {
return requesterMap
}

@Synchronized
private fun addRequester(rSocketRequester: RSocketRequester, clientId: String) {
log.info("adding requester {}", clientId)
requesterMap = requesterMap.put(clientId, rSocketRequester)
}

@Synchronized
private fun removeRequester(clientId: String) {
log.info("removing requester {}", clientId)
requesterMap = requesterMap.remove(clientId)
}

@ConnectMapping("client-id")
fun onConnect(rSocketRequester: RSocketRequester, clientId: String) {
val clientIdFixed = clientId.replace("\"", "") //check serialezer why the add " to strings
// rSocketRequester.rsocket().dispose() //to reject connection
rSocketRequester
.rsocket()
.onClose()
.subscribe(null, null, {
log.info("{} just disconnected", clientIdFixed)
removeRequester(clientIdFixed)
})
addRequester(rSocketRequester, clientIdFixed)
}

@MessageMapping("private.news")
fun privateNews(news: PrivateNews, rSocketRequesterParam: RSocketRequester) {
getRequesterMap()
.filterKeys { key -> checkDestination(news, key) }
.values()
.forEach { requester -> sendMessage(requester, news) }
}

private fun sendMessage(requester: RSocketRequester, news: PrivateNews) {
requester
.route("news.${news.news.category}")
.data(news.news)
.send()
.subscribe()
}

private fun checkDestination(news: PrivateNews, key: String): Boolean {
val list = destinations(news)
return list.contains(key)
}

private fun destinations(news: PrivateNews): List<String> {
return news.destination
.split(",")
.map { it.trim() }
}
}

请注意,我们必须在 rsocket-js 中添加两件事客户端:SETUP 框架中的有效载荷,用于提供客户端 ID 并注册响应程序,以处理由 RSocketRequester 发送的消息.

const client = new RSocketClient({
// send/receive JSON objects instead of strings/buffers
serializers: {
data: JsonSerializer,
metadata: IdentitySerializer
},
setup: {
//for connection mapping on server
payload: {
data: "provide-unique-client-id-here",
metadata: String.fromCharCode("client-id".length) + "client-id"
},
// ms btw sending keepalive to server
keepAlive: 60000,

// ms timeout if no keepalive response
lifetime: 180000,

// format of `data`
dataMimeType: "application/json",

// format of `metadata`
metadataMimeType: "message/x.rsocket.routing.v0"
},
responder: responder,
transport
});

有关这方面的更多信息,请参阅此问题:How to handle message sent from server to client with RSocket?

关于websocket - 使用带有 Rsocket 和 Spring Webflux 的 websockets 仅向特定客户端发送消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61045066/

54 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com