- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
我们可以在初始化 Rsocket 实例的时候指定客户端可以被调用的方法,使用 acceptor() 指定可被调用的方法和方法使用的通信模型类型:
RequestResponse
时:
.acceptor(SocketAcceptor.forRequestResponse(payload -> {}))
FireAndForget
时
.acceptor(SocketAcceptor.forFireAndForget(payload -> {}))
RequestStream
时
.acceptor(SocketAcceptor.forRequestStream(payload -> {}))
RequestStream
时
.acceptor(SocketAcceptor.forRequestChannel(
payloads ->
Flux.from(payloads)...));
接下来编写客户端方法的处理逻辑,以 RequestResponse 为例 。
https://github.com/joexu01/rsocket-demo/blob/master/rsocket-client-raw/src/main/java/org/example/CallingTheClientSide.java 。
public static void main(String[] args) {
final Logger logger = LoggerFactory.getLogger(RSocketClientRaw.class);
// 随机生成 UUID 标识客户端
UUID uuid = UUID.randomUUID();
logger.info("My UUID is {}", uuid);
// 生成 SETUP 阶段(建立连接时) Payload 使用的 route 信息
ByteBuf setupRouteMetadata = encodeRoute("connect.setup");
RSocket socket = RSocketConnector.create()
// 设置 metadata MIME Type,方便服务端根据 MIME 类型确定 metadata 内容
.metadataMimeType(WellKnownMimeType.MESSAGE_RSOCKET_ROUTING.getString())
// SETUP 阶段的 Payload,data 里面存放 UUID
.setupPayload(ByteBufPayload.create(
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, uuid.toString()),
setupRouteMetadata))
// 编写 Request&Response Acceptor
.acceptor(SocketAcceptor.forRequestResponse(
payload -> {
String route = decodeRoute(payload.sliceMetadata());
logger.info("[Client Acceptor] Received RequestResponse[route={}]", route);
String metadataUtf8 = payload.getMetadataUtf8();
String dataUtf8 = payload.getDataUtf8();
logger.info("[Client Acceptor] This Req&Resp contains data: {}, metadata: {}", dataUtf8, metadataUtf8);
payload.release();
if ("request.status.callback".equals(route)) {
return Mono.just(ByteBufPayload.create("Thanks for handling my task!"));
} else if ("request.server.call".equals(route)) {
return Mono.just(ByteBufPayload.create("You called my handler actively from server!"));
}
byte[] respBytes = String
.format("Client received your message, but no handler matched. Your meta is %s and data is %s",
metadataUtf8, dataUtf8).getBytes();
return Mono.just(DefaultPayload.create(respBytes));
}
))
// 设置重连策略
.reconnect(Retry.backoff(2, Duration.ofMillis(500)))
.connect(
TcpClientTransport.create(
TcpClient.create()
.host("127.0.0.1")
.port(8099)))
.block();
在这里我们设置客户端能够接收 RequestResponse 类型的服务端请求,仔细观察可以看到,服务端发送的请求也是可以携带包含路由信息的 metadata 的,在客户端,我们也可以根据 Payload 中的路由信息将请求分发到不同方法中处理.
为了方便演示,如果服务端调用时指定的路由信息是 request.status.callback ,那么服务端就是在完成一个由客户端发起的,异步执行的任务后调用客户端的回调函数返回任务执行结果.
如果服务端调用时指定的路由信息是 request.server.call ,那么服务端就是在主动调用客户端以获取一些状态信息.
当然,使用上面的代码设置客户端可被调用的 RSocket 方法有一个局限性,那就是我们只能设置 RequestResponse FireAndForget RequestStream Channel 这四种通信模式的其中一种。也就是说,用这种方法,服务端无法同时向服务端发出 RequestResponse FireAndForget RequestStream Channel 请求。本文会在第四部分展示如何让客户端支持同时响应这四种通信模式.
如果客户端提交一个耗时任务,服务端可以接受这个任务然后立刻返回响应:“任务提交成功”,然后执行任务。当任务执行完,服务端再使用回调函数将结果返回给客户端.
我们不妨将执行任务的模块封装成一个 Spring Service:
@Service
public class RequestProcessor {
private static final Logger logger = LoggerFactory.getLogger(RequestProcessor.class);
public void processRequests(RSocketRequester rSocketRequester, UUID uuid) {
logger.info("[RequestProcessor.processRequests]I'm handling this!");
ByteBuf routeMetadata = TaggingMetadataCodec.createTaggingContent(ByteBufAllocator.DEFAULT, Collections.singletonList("request.status.callback"));
Mono.just("Your request " + uuid + " is completed")
.delayElement(Duration.ofSeconds(ThreadLocalRandom.current().nextInt(10, 15)))
.flatMap(
m -> rSocketRequester.rsocketClient()
.requestResponse(
Mono.just(ByteBufPayload.create(
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT,
String.format("[TASK %s]This is a task result from server using spring.", uuid)),
routeMetadata
)))
.doOnSuccess(p -> logger.info("[RequestProcessor.processRequests]Received from client: {}", p.getDataUtf8()))
)
.subscribe();
}
}
这个 Service 中的方法接收一个 RSocketRequester 和一个 任务的 UUID,当任务完成时,这个方法会生成一个 Payload 存放任务结果,指定 metadata 中的路由信息为 request.status.callback 。这样客户端在收到这个 RequestResponse 时就能知道这是一个已经提交任务的回调。在这里我们使用 delayElement 模拟处理任务时耗时的操作.
值得注意的是, RSocketRequester 参数的来源,我们在编写服务端接收任务提交的方法时可以将其作为参数,这是 Spring RSocket 的固定用法,这样就可以拿到服务端-客户端连接的 RSocketRequester 实例,然后就可以在 Service 中通过 RSocketRequester 实例调用客户端的回调函数:
@MessageMapping("handler.task")
public Mono<String> task(String request, RSocketRequester rSocketRequester) {
logger.info("[handler.request]Client request: {}", request);
UUID uuid = UUID.randomUUID();
this.requestProcessor.processRequests(rSocketRequester, uuid);
return Mono.just(uuid.toString());
}
我们在 【RSocket】使用 RSocket (一)——建立连接 一文中已经在连接建立的时刻将客户端-服务端连接的 RSocketRequester 实例保存在一个 ConcurrentHashMap 中了。我们可以通过一些机制,比如定时任务,或者使用 REST API 向服务端下命令的方式,让服务端主动调用已经建立连接的客户端的 RSocket 方法.
在这个示例里,我们编写两个 REST API,一个 API 返回所有已连接到服务端的客户端信息,包括客户端 UUID、连接建立的时间等:
@ResponseBody
@GetMapping("/client/list")
public List<ConnectedClientDto> clientsInfo() {
List<ConnectedClientDto> info = new ArrayList<>();
RSocketController.clientsManager.clients.forEach((key, value) -> {
info.add(new ConnectedClientDto(key, value.connectedTime));
});
return info;
}
另一个 API 用于触发服务端向客户端发送请求:
@GetMapping("/client/call")
public ServerResponse callFromServer(String clientRoute, String clientUUID) {
RSocketRequester requester = RSocketController.clientsManager.getClientRequester(clientUUID);
if (requester == null) {
return new ServerResponse("failed: client rSocket has closed.");
}
ByteBuf routeMetadata = TaggingMetadataCodec
.createTaggingContent(ByteBufAllocator.DEFAULT, Collections.singletonList(clientRoute));
Mono.just("Server is calling you.")
// .delayElement(Duration.ofSeconds(ThreadLocalRandom.current().nextInt(5, 10)))
.flatMap(m -> requester.rsocketClient().requestResponse(
Mono.just(
ByteBufPayload.create(
ByteBufUtil.writeUtf8(
ByteBufAllocator.DEFAULT,
"This is a message from server using spring-stack."),
routeMetadata)))
.doOnSubscribe(subscription -> logger.info("subscribed."))
.doOnError(throwable -> logger.error("Error when calling client: {}", throwable.toString()))
.doOnSuccess(p -> logger.info("[test.connect.requester]Received from client: {}.", p.getDataUtf8()))
)
.subscribe();
return new ServerResponse(String.format("request from server has sent to the client %s.", clientUUID));
}
我们首先启动服务端再启动客户端,然后测试上述两个 API:
启动两个客户端和服务端后查看连接信息 。
向其中一个客户端发送一个请求 。
可以从客户端的输出看到客户端接收到了这次请求 。
前面我们提到如果使用 .acceptor(SocketAcceptor.for...) 来添加客户端可以被调用的方法时,只能指定四种通信模式中的一种.
这时候,我们可以实现 io.rsocket.SocketAcceptor 接口,重写 accept 方法, accept 方法的返回值是 Mono<RSocket> ,我们可以实现 RSocket 接口并重写其中 fireAndForget requestResponse requestStream requestChannel 四个方法来达到让客户端同时接收四种通信模式的目的.
首先实现 RSocket 接口,并重写其中的方法:
// https://github.com/joexu01/rsocket-demo/blob/master/rsocket-client-raw/src/main/java/org/example/service/ClientService.java
public class ClientService implements RSocket {
Logger logger = LoggerFactory.getLogger(ClientService.class);
static String decodeRoute(ByteBuf metadata) {
final RoutingMetadata routingMetadata = new RoutingMetadata(metadata);
return routingMetadata.iterator().next();
}
@Override
public Mono<Void> fireAndForget(Payload payload) {
logger.info("Receiving: " + payload.getDataUtf8());
return Mono.empty();
}
@Override
public Mono<Payload> requestResponse(Payload payload) {
logger.info("Receiving: " + payload.getDataUtf8());
return Mono.just(DefaultPayload.create("Client received your RequestResponse"));
}
@Override
public Flux<Payload> requestStream(Payload payload) {
return Flux.range(-5, 10)
.delayElements(Duration.ofMillis(500))
.map(obj ->
ByteBufPayload.create(
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, obj.toString())));
}
@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
return Flux.range(-5, 10)
.delayElements(Duration.ofMillis(500))
.map(obj ->
ByteBufPayload.create(
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, obj.toString())));
}
}
这只是一个示例,如果业务需要也可以解析 Payload 中的 metadata 来实现路由.
接下来我们实现 RSocketAcceptor 接口:
// https://github.com/joexu01/rsocket-demo/blob/master/rsocket-client-raw/src/main/java/org/example/SocketAcceptorImpl.java
public class SocketAcceptorImpl implements SocketAcceptor {
@Override
public Mono<RSocket> accept(ConnectionSetupPayload connectionSetupPayload, RSocket rSocket) {
return Mono.just(new ClientService());
}
}
然后我们在初始化客户端的时候这样设定 Acceptor 即可:
RSocket socket = RSocketConnector.create().acceptor(new SocketAcceptorImpl())
最后此篇关于【RSocket】使用RSocket(三)——服务端主动调用客户端方法的文章就讲到这里了,如果你想了解更多关于【RSocket】使用RSocket(三)——服务端主动调用客户端方法的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我有一个数组列表。 ArrayList arrayList = new ArrayList<>(); arrayList.add("timestamp"); arrayList.add("Code")
我想知道如何控制 Navlink 的类名,如果实际路径是 X,则 NavLink 的类是事件的。我在使用 Laravel 和简单的 bootstrap 之前做过它,但我不知道如何用 React 和 R
我的单页网站目前使用箭头键平滑滚动到 anchor 。问题是当您滚动到每个部分时,悬停的链接不会跟随每个部分。它仅遵循您的箭头键命令。我怎样才能改变这个?这是当前网站 ( http://www.jon
我们有一个主动/被动拓扑,其中有两个具有共享原始存储的 x86 复合体,其中在给定时刻只有一个节点可以访问共享存储(也称为主动节点)。如果主动节点发生故障转移,被动节点将启动接管并成为可以访问共享存储
有源 GPS 会消耗多少电量?没有 gps 导航器软件的开销。秒我想每 2 分钟对 gps 进行一次采样并将其保存到文件中。这将花费我多少电池电量? 我的生命周期会缩短 10% 吗? 20%? ..?
AWS今年针对主动-被动设置(全局数据存储)引入了跨区域复制。这意味着有一个用于读取/写入的主Redis群集和一个用于读取的辅助群集。 就我而言,我们想在不同区域中使用 Active-Active R
我正在使用斯坦福标记器来确定词性。但是,我想从文本中获取更多信息。是否有可能获得更多信息,例如句子的时态或是否处于主动/被动状态? 到目前为止,我使用的是非常基本的 PoS 标记方法: List> t
我尝试将 SSL 与 Active MQ 创建的 JMX 连接器一起使用,但没有成功。我能够让 SSL 与 JVM 平台 JMX 连接器一起工作,但这需要存储 keystore 和信任库密码明文,这对
RFC 1006 TCP 连接中主动连接和被动连接有何区别? 最佳答案 这里有解释:https://www.rfc-editor.org/rfc/rfc793 A passive OPEN reque
在 MarkLogic 7 中,副本是主动-主动还是主动-被动? 最佳答案 您是在询问本地磁盘故障转移(又名林复制)、数据库复制还是灵活复制? 事实上,这三个都是为主动-被动使用而设计的:换句话说,单
我正在使用 Windows Azure 网站和 Web 作业。 我有一个控制台应用程序,用于每晚下载 FTP 文件。他们最近从被动 FTP 切换为主动 FTP。我对此没有任何控制权。 所附代码在我的计
我是 JNDI 和 JMS 技术的初学者。 我的 JNDI 文件为: java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQIn
我使用 Active Android在我的项目中。试图了解更多关于它的信息,特别是关于表项属性的信息 - 类似于 ForeignKeyAction。 如果发生删除操作,我希望我的模型只删除它自己,而不
主-主双活mysql复制如何处理唯一键值。对于主键值,我们可以设置 auto_increment_increment 和 auro_increment_offset 参数以避免主键 auto_incr
我是 Linux 新手,我的应用程序在 Windows 上运行了一段时间,对于文件传输,我们使用第三方 sftp,它在主动和被动模式下都运行良好。 现在我正在迁移我的应用程序以支持 Linux 操作系
我在使用 SymmetricDS(开源版本)在 2 个 Postgres 服务器之间复制数据时遇到问题。以下是了解我的问题的相关信息: 我已经用 Vagrant 部署了 3 个服务器: symmetr
我们可以对 Azure SQL 数据库进行被动只读异步实时同步,以实现灾难恢复。 但我们的要求是在两个事件读写数据库之间实现实时同步,以便为世界不同地点的客户提供低延迟。 例如: I'm provid
我们可以对 Azure SQL 数据库进行被动只读异步实时同步,以实现灾难恢复。 但我们的要求是在两个事件读写数据库之间实现实时同步,以便为世界不同地点的客户提供低延迟。 例如: I'm provid
在我的 TCP 服务器上,我希望有: 非阻塞被动套接字具有非阻塞accept(); 接受连接后,我想执行一些身份验证,例如验证客户端提供的 ID 和密码。所以我有明确定义的协议(protocol),我
我的应用中有 anchor 链接。如果 anchor 链接处于事件状态,如何使事件样式发挥作用。 AnchorLink 最佳答案 对于当前事件链接的样式,您可以为参数 activeClassName
我是一名优秀的程序员,十分优秀!