- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
采用二进制点对点数据传输,主要应用于分布式架构之中,是一种基于Reactive Stream规范标准实现的新的通信协议.
参考 阿里云开发者社区的介绍 。
相关文档和资料:
RSocket By Example 。
rsocket-java 原生库例子 。
Spring RSocket 支持文档 。
在这里我们在客户端使用 rsocket-java 原生库,在服务端使用 spring-boot-starter-rsocket.
点击查看源代码 。
新建一个 RSocketController 类来处理 RSocket 相关的请求.
@Controller
public class RSocketController {
private static Logger logger = LoggerFactory.getLogger(RSocketController.class);
// 对到来的连接做一些处理
@ConnectMapping("connect.setup")
public Mono<Void> setup(String data, RSocketRequester rSocketRequester) {
logger.info("[connect.setup]Client connection: {}\n", data);
return Mono.empty();
}
}
RSocket 的 metadata 中可以包含路由(Routing)信息,这和 一般 WEB 框架通过解析 URL 将请求导向不同的处理函数是一样的。在连接建立时,客户端会发送一个 SETUP Payload, @ConnectMapping 可以通过解析 SETUP Payload 的 metadata 中的路由信息来使用不同的连接建立阶段的处理函数。在这里,只要 SETUP Payload 的 metadata 中的路由信息是 connect.setup ,该函数就会处理建立连接后客户端发送的 SETUP Payload.
RSocket 协议支持双方主动调用对方的函数。如果服务端想要主动向客户端发送请求,他就可以在连接建立时保存 RSocketRequester 对象以便服务端在需要时向客户端发起请求.
首先在这里我们假设客户端建立连接时会将 UUID 放在 SETUP Payload 的 data 中。然后我们声明一个类来保存 RSocketRequester ,代码如下:
public class ConnectedClient {
public RSocketRequester requester;
public Date connectedTime;
ConnectedClient(RSocketRequester requester) {
this.requester = requester;
this.connectedTime = new Date();
}
}
然后我们建立一个 Service 来管理客户端的 RSocketRequester 。在这里使用 ConcurrentHashMap 来存储 Requester,键是客户端的 UUID,值是 ConnectedClient 对象.
@Service
public class ConnectedClientsManager {
private static Logger logger = LoggerFactory.getLogger(ConnectedClientsManager.class);
public final ConcurrentHashMap<String, ConnectedClient> clients;
public ConnectedClientsManager() {
this.clients = new ConcurrentHashMap<>();
}
public Set<String> getAllClientIdentifier() {
return this.clients.keySet();
}
public RSocketRequester getClientRequester(String clientIdentifier) {
return this.clients.get(clientIdentifier).requester;
}
public void putClientRequester(String clientIdentifier, RSocketRequester requester) {
requester.rsocket()
.onClose()
.doFirst(() -> this.clients.put(clientIdentifier, new ConnectedClient(requester)))
.doFinally(sig -> {
logger.info("Client closed, uuid is {}. signal is {}.", clientIdentifier, sig.toString());
this.clients.remove(clientIdentifier);
}).subscribe();
}
public void removeClientRequester(String clientIdentifier) {
this.clients.remove(clientIdentifier);
}
}
然后我们就可以在 RSocketController 中引入 ConnectedClientsManager 了.
@Controller
public class RSocketController {
private static Logger logger = LoggerFactory.getLogger(RSocketController.class);
public static ConnectedClientsManager clientsManager;
@Autowired
private void initializeClientsManager() {
clientsManager = new ConnectedClientsManager();
}
...
最后我们编写连接处理函数,将 Requester 保存起来:
@ConnectMapping("connect.setup")
public Mono<Void> setup(String data, RSocketRequester rSocketRequester) {
logger.info("[connect.setup]Client connection: {}\n", data);
clientsManager.putClientRequester(data, rSocketRequester);
return Mono.empty();
}
下面是 spring application 配置 application.yaml :
spring:
rsocket:
server:
port: 8099
transport: tcp
点击查看源代码 。
public class ConnectionSetup {
public static void main(String[] args) {
final Logger logger = LoggerFactory.getLogger(RSocketClientRaw.class);
UUID uuid = UUID.randomUUID();
......
ByteBuf setupRouteMetadata = TaggingMetadataCodec.createTaggingContent(
ByteBufAllocator.DEFAULT,
Collections.singletonList("connect.setup"));
RSocketConnector
建立 RSocket:
metadata
的内容 data
中存放 UUID 字符串, metadata
中存放路由信息 ClientTransport
和服务端建立连接 block()
在连接建立真正之前阻塞进程
RSocket socket = RSocketConnector.create()
// 设置 metadata MIME Type,方便服务端根据 MIME 类型确定 metadata 内容
.metadataMimeType(WellKnownMimeType.MESSAGE_RSOCKET_ROUTING.getString())
// SETUP 阶段的 Payload,data 里面存放 UUID
.setupPayload(ByteBufPayload.create(
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, uuid.toString()),
setupRouteMetadata))
// 设置重连策略
.reconnect(Retry.backoff(2, Duration.ofMillis(500)))
.connect(
TcpClientTransport.create(
TcpClient.create()
.host("127.0.0.1")
.port(8099)))
.block();
然后可以使用 socket.onClose().block(); 保持连接。此时如果我们运行客户端,然后再关闭客户端的话,会在服务端看到输出:
表明客户端和服务端建立了连接之后又关闭了连接.
最后此篇关于【RSocket】使用RSocket(一)——建立连接的文章就讲到这里了,如果你想了解更多关于【RSocket】使用RSocket(一)——建立连接的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
如何设置路由元数据(在服务器使用 Spring Boot Rsocket 时仅使用 RSocket-Java 的有效负载。Flux s = connection.flatMapMany(request
好的,我是 RSocket 的新手。我正在尝试创建一个简单的 RSocket 客户端和简单的 RSocket 服务器。从我所做的研究来看,RSocket 支持恢复: 它特别有用,因为当发送包含有关最后
我对 RSocket 完全陌生。 我阅读了常见问题解答和动机页面(并浏览了协议(protocol)页面)并了解 RSocket 可以在 TCP、WebSocket 和 Aeron 之上使用。但是我不明
我正在尝试在 Vue 中创建一个站点并在 Spring 上创建后端。我想使用 rsocket 来传输数据,但是只要我在 spring 中添加 rsocket seurity,我就会得到: 'metad
我在通过 TCP 连接到 Spring Boot RSocket 应用程序时遇到问题。使用 RSocketRequester 时的客户端工作正常,但是当我尝试使用 RSocketFactory 客户端
Is it somehow possible to use Spring's RSocket integration in a "traditional" servlet-b
目录 0. RSocket 简介 1. 服务端 1.1 SETUP阶段 - 处理客户端发起的连接请求
Source Code: https://github.com/joexu01/rsocket-demo 0. 四种通信模式 让我们来简单复习一下 RSocket 的四种通信模式:
1. 编写客户端接收请求的逻辑 我们可以在初始化 Rsocket 实例的时候指定客户端可以被调用的方法,使用 acceptor() 指定可被调用的方法和方法使用的通信模型类型:
rsocket 似乎是个很酷的主意。我有这个问题,但找不到更好的答案。 让我们考虑这个初始设置。客户端依次向server-1发送多个请求。 client --> server-1 server-1 正
我已经使用rsocket编写了简单的客户端和服务器程序-使用以下版本的librdmacm-dev和librdmacm1软件包(使用Ubuntu 14.04)的RDMA套接字API: librdmacm
在我的项目中,我希望有多个客户端连接到一个服务。我正在使用 java Rsocket 实现。 服务应该为每个客户端维护一个状态。现在在这一点上,我可以通过一些标识符来管理客户端。这个选项我已经实现了。
这是我的代码,它仅适用于我所在的选项卡。我收到了回复,似乎一切正常,但我仍然不完全了解该技术的操作,这就是我去找你的原因。 它以“responseHanlder”结尾 connect() {
有人可以告诉我或使用 提供现成的 CRUD 示例吗? WebFlux、RScoket 和 Spring(或 SpringBoot) ? 我研究了 RSocket 文档, WebFlux ,也写了我的简
我必须再次回到 Symbian 中的套接字。建立与远程服务器的连接的代码如下所示: TInetAddr serverAddr; TUint iPort=111; TRequestStatus iS
我似乎无法在 RSocket 上找到任何资源/教程,除了阅读他们在 GitHub 上的代码,我不明白。 我的服务器上有一个文件路径:String serverFilePath; 我希望能够从我的客户端
假设我有这个用于聊天消息的简单 Websocket 处理程序: @Override public Mono handle(WebSocketSession webSocketSession) {
我正在尝试为我的服务器编写一个客户端(在 Kotlin 和使用 Spring Reactive Web 中)。我在尝试使用 RSocket 时遇到了这个问题。如何使用 RSocket 获得 Flux?
我正在使用 Spring 对 RSocket 的支持,特别是请求流模型。 IE。: @MessageMapping("stream") Flux stream(final SubscriptionMe
解决摘要: 在目前的大多数 RSocket 示例中,即使在 SpringBoot 相关教程中,服务器端接受器也被简单地构造为一个新对象(如下面的 new MqttMessageService() )。
我是一名优秀的程序员,十分优秀!