gpt4 book ai didi

java - Websocket 通信与 Netty 环境

转载 作者:行者123 更新时间:2023-12-01 17:53:49 24 4
gpt4 key购买 nike

我需要实现两个Java环境之间的通信。接收者是一个 SpringBoot 响应式(Reactive)应用程序,处理通信的代码片段如下(我将跳过 bean 的配置)

@Override
public Mono<Void> handle(WebSocketSession webSocketSession) {
return webSocketSession.send(webSocketSession.receive() // <- Step 0
.map(message -> {
log.info("Step 1");
return message.getPayloadAsText();
})
.map(message -> {
log.info("Step 2");
return webSocketSession.textMessage(this.receiveMessage(message));
}));
}

客户端部分是使用java 11中的Http API实现的

WebSocket webSocket = HttpClient
.newBuilder().executor(executor).build()
.newWebSocketBuilder()
.buildAsync(URI.create(url), new WebSocket.Listener() {
@Override
public void onOpen(WebSocket webSocket) {
log.info("onOpen using subprotocol " + webSocket.getSubprotocol());
WebSocket.Listener.super.onOpen(webSocket);
}

@Override
public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
log.info("onText received with data " + data);
return WebSocket.Listener.super.onText(webSocket, data, last);
}

@Override
public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) {
log.info("Closed with status " + statusCode + ", reason: " + reason);
return WebSocket.Listener.super.onClose(webSocket, statusCode, reason);
}

@Override
public void onError(WebSocket webSocket, Throwable error) {
log.error("Error: " + error.getMessage());
WebSocket.Listener.super.onError(webSocket, error);
}

}).join();


webSocket.sendText(toJSON(List.of("Lorem", "Ipsum", "dolor", "sit", "amet")), true);
webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "ok").thenRun(() -> log.info("Sent close"));

使用调试,我可以注意到一旦 join()已完成,WebSocket返回实例,执行receiver第0步的方法,Mono<Void>返回实例。

但问题是,即使我发送一些文本,步骤 1 和 2 也永远不会执行!

如果我尝试反向通信(从 SpringBoot 应用程序向 Sender 应用程序发送某些内容),则会收到消息。

最后,这是来自onClose的日志sendClose之后回调执行声明。

Closed with status 1002, reason: Server internal error

最佳答案

解决方案

buildAsync方法返回 CompletableFuture<WebSocket> 的实例,我们需要在使用 join() 刷新消息队列之前发送消息链

解决办法在这里

WebSocket webSocket = HttpClient
.newBuilder().executor(executor).build()
.newWebSocketBuilder()
.buildAsync(URI.create(url), new WebSocket.Listener() {
@Override
public void onOpen(WebSocket webSocket) {
log.info("onOpen using subprotocol " + webSocket.getSubprotocol());
WebSocket.Listener.super.onOpen(webSocket);
}

@Override
public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
log.info("onText received with data " + data);
return WebSocket.Listener.super.onText(webSocket, data, last);
}

@Override
public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) {
log.info("Closed with status " + statusCode + ", reason: " + reason);
return WebSocket.Listener.super.onClose(webSocket, statusCode, reason);
}

@Override
public void onError(WebSocket webSocket, Throwable error) {
log.error("Error: " + error.getMessage());
WebSocket.Listener.super.onError(webSocket, error);
}

})
.thenCompose(ws -> ws.sendText(toJSON(List.of("Lorem", "Ipsum", "dolor", "sit", "amet")), false))
.thenCompose(ws -> ws.sendText(toJSON(List.of("Lorem", "Ipsum", "dolor", "sit", "amet")), true))
.thenCompose(ws -> webSocket.sendClose(WebSocket.NORMAL_CLOSURE, ""))
.join();

关于java - Websocket 通信与 Netty 环境,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60753498/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com