gpt4 book ai didi

java - 如何使用 vert.x-rx 创建响应式(Reactive)客户端-服务器 TCP 通信

转载 作者:行者123 更新时间:2023-11-30 06:05:08 26 4
gpt4 key购买 nike

我目前正在开发一个项目,该项目需要在外部系统和我将编写的应用程序之间进行 TCP 通信(用 Java 编写)。众所周知,使用常规 NIO 可以轻松实现这一点。然而,作为我正在开发的这个新项目的一部分,我必须使用 Vert.x 来提供 TCP 通信。请引用下图:

enter image description here

在右侧,我的应用程序作为 TCP 服务器运行,等待来自左侧外部系统的连接。我读过要创建 TCP 并监听连接,您只需执行以下操作即可:

NetServer server = vertx.createNetServer();
server.listen(1234, "localhost", res -> {
if (res.succeeded()) {
System.out.println("Server is now listening!");
} else {
System.out.println("Failed to bind!");
}
});

但是,我不明白的是当外部系统连接到我的应用程序并通过 TCP 发送 EchoRequestMessages 时如何处理。我的应用程序必须获取接收到的字节缓冲区,将其解码为 EchoRequestMessage POJO,然后将 EchoResponseMessage 编码为字节缓冲区以发送回外部系统。

如何使用 vert.x-rx 对 EchoRequestMessage 的接收、其解码、EchoResponseMessage 的编码执行响应式(Reactive)编程,然后将其发送回外部系统,所有这些都在一个构建器模式类型设置中。我读过有关 Observables 和订阅的内容,但我不知道要观察什么或订阅什么。任何帮助将不胜感激。

最佳答案

要从套接字读取数据,您可以使用RecordParser。在套接字连接上,数据通常由换行符分隔:

RecordParser parser = RecordParser.newDelimited("\n", sock);

RecordParser 是一个 Vert.x ReadStream,因此它可以转换为 Flowable:

FlowableHelper.toFlowable(parser)

现在,如果可以从 Buffer 创建 EchoRequestMessage:

public class EchoRequestMessage {
private String message;

public static EchoRequestMessage fromBuffer(Buffer buffer) {
// Deserialize
}

public String getMessage() {
return message;
}
}

并将 EchoResponseMessage 转换为 Buffer:

public class EchoResponseMessage {
private final String message;

public EchoResponseMessage(String message) {
this.message = message;
}

public Buffer toBuffer() {
// Serialize;
}
}

您可以使用 RxJava 运算符来实现回显服务器流程:

vertx.createNetServer().connectHandler(sock -> {

RecordParser parser = RecordParser.newDelimited("\n", sock);

FlowableHelper.toFlowable(parser)
.map(EchoRequestMessage::fromBuffer)
.map(echoRequestMessage -> {
return new EchoResponseMessage(echoRequestMessage.getMessage());
})
.subscribe(echoResponseMessage -> sock.write(echoResponseMessage.toBuffer()).write("\n"), throwable -> {
throwable.printStackTrace();
sock.close();
}, sock::close);

}).listen(1234);

[编辑]如果您的协议(protocol)消息不是行分隔而是长度前缀,那么您可以创建自定义ReadStream:

class LengthPrefixedStream implements ReadStream<Buffer> {
final RecordParser recordParser;
boolean prefix = false;

private LengthPrefixedStream(ReadStream<Buffer> stream) {
recordParser = RecordParser.newFixed(4, stream);
}

@Override
public ReadStream<Buffer> exceptionHandler(Handler<Throwable> handler) {
recordParser.exceptionHandler(handler);
return this;
}

@Override
public ReadStream<Buffer> handler(Handler<Buffer> handler) {
if (handler == null) {
recordParser.handler(null);
return this;
}
recordParser.handler(buffer -> {
if (prefix) {
prefix = false;
recordParser.fixedSizeMode(buffer.getInt(0));
} else {
prefix = true;
recordParser.fixedSizeMode(4);
handler.handle(buffer);
}
});
return this;
}

@Override
public ReadStream<Buffer> pause() {
recordParser.pause();
return this;
}

@Override
public ReadStream<Buffer> resume() {
recordParser.resume();
return this;
}

@Override
public ReadStream<Buffer> endHandler(Handler<Void> endHandler) {
recordParser.endHandler(endHandler);
return this;
}
}

并将其转换为Flowable:

FlowableHelper.toFlowable(new LengthPrefixedStream(sock))

关于java - 如何使用 vert.x-rx 创建响应式(Reactive)客户端-服务器 TCP 通信,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47534495/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com