gpt4 book ai didi

java - spring webflux - websocketclient 线程未终止

转载 作者:行者123 更新时间:2023-11-30 06:24:55 29 4
gpt4 key购买 nike

我正在使用 Spring WebFlux WebSocketClient 来订阅和处理来自远程 Web 套接字的消息。在处理来自远程套接字的消息 Flux 有时会意外完成(或因错误而终止),从而导致 Web 套接字客户端的 onComplete (或 onError)回调执行。发生这种情况时,我的 onCompleteonError 回调会发布一个事件。事件监听器通过调用创建另一个 Web 套接字客户端的函数来响应,该客户端连接到同一外部 Web 套接字,并且套接字处理重新开始。

我的问题是,我无法弄清楚如何在客户端完成处理后释放 WebSocketClient 资源。这会导致未使用的线程在 JVM 中累积。特别是第一个 WebSocketClient 运行的线程(WebSocketClient-SecureIO-1WebSocketClient-SecureIO-2parallel- 1) 保持等待状态,并为新的“WebSocketClient”启动新线程。我以为在 WebSocketSession 上调用 close() 可以解决问题,但事实并非如此。

我的实现模式是:

public void startProcessing() {
WebSocketClient client = new StandardWebSocketClient();
Mono<String> subscribeMsg = Mono.just("...");

client
.execute(uri, webSocketSession ->
webSocketSession
.send(subscribeMsg.map(webSocketSession::textMessage))
.thenMany(webSocketSession.receive())
.map(webSocketMessage -> ...)
.buffer(Duration.ofSeconds(bufferDuration))
.doOnNext(handler)
.doOnComplete(() -> webSocketSession.close())
.then())
.subscribe(
aVoid -> LOGGER.info("subscription started"),
throwable -> {... publish restart event ...},
() -> {... publish restart event ...});
}

public void restartEventListener() {
startProcessing();
}

有关如何防止未使用的 WebSocketClient 线程在 JVM 中累积的任何建议?

最佳答案

一些想法:

一个WebSocketClient正在池化资源,因此您应该为许多请求重用同一个客户端。

您应该避免在 doOn* 内进行处理运营商。这些是副作用运算符,在当前 Scheduler 上同步执行。 。为了提高效率,您应该使用其他运算符。您可以将 websocket 消息映射到 Flux<DataBuffer>然后使用 DataBufferUtils::write将它们写入文件并仍然利用相同的响应式(Reactive)管道,而不是使用副作用运算符。

关闭其中之一的 websocket session 并不是一个坏主意,尽管我会使用 doOnTerminate成功和错误情况都会触发。

我也不明白发布事件以重新启动处理阶段的目标。使用retryrepeat运算符(operator)和同一个客户端应该工作得很好并且效率更高。

关于java - spring webflux - websocketclient 线程未终止,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47382187/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com