- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
Source Code: https://github.com/joexu01/rsocket-demo 。
让我们来简单复习一下 RSocket 的四种通信模式:
即发即忘 - FireAndForget:立即发送一个请求,无需为这个请求发送响应报文。适用于监控埋点,日志上报等,这种场景下无需回执,丢失几个请求无伤大雅 。
请求响应 - RequestResponse:请求方发送一条请求消息,响应方收到请求后并返回一条响应消息。传统的HTTP是典型的Request-Response 。
流式响应 - RequestStream:请求方发送一个请求报文,响应方发回N个响应报文。传统的MQ是典型的RequestStream 。
双向通道 - Channel:创建一个通道上下文,双方可以互相发送消息。IM是个典型的RequestChannel通讯场景 。
*本篇文章的客户端示例文件在 rsocket-client-raw/src/main/java/org/example/FourCommunicationScheme.java 。
我们使用 decodeRoute 和 encodeRoute 函数来解码和编码路由信息.
static String decodeRoute(ByteBuf metadata) {
final RoutingMetadata routingMetadata = new RoutingMetadata(metadata);
return routingMetadata.iterator().next();
}
static ByteBuf encodeRoute(String route) {
return TaggingMetadataCodec.createTaggingContent(
ByteBufAllocator.DEFAULT,
Collections.singletonList(route));
}
服务端处理函数 。
在这里我们编写一个简单的 Handler,它的 Route 是 test.echo ,它接收一个请求并返回请求 Payload 的 data 中的字符串.
@MessageMapping("test.echo")
public Mono<String> simplyEcho(String data) throws InterruptedException {
Thread.sleep(1500);
logger.info("[test.echo]Received echo string from client: {}", data);
return Mono.just(String.format("[test.echo]I received your string: %s. Thank you.", data));
}
注意,这里的参数也可以是 Mono<String> ,然后对 Mono 进行操作并返回。事实上,如果严格按照响应式编程的策略,这里应该直接对 Mono 进行操作.
客户端发送请求 。
ByteBuf routeMetadata = encodeRoute("test.echo");
Payload echoPayload = ByteBufPayload.create(
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "This is a message from client using rsocket-java library."),
routeMetadata);
Mono<Payload>
。然后我们对这个 Mono 设定一些操作(具体操作请看代码注释):
Mono<Payload> requestResponse = socket.requestResponse(echoPayload);
requestResponse
// 当 subscribe() 操作开始执行时打印一下日志
.doOnSubscribe(subscription -> logger.info("Test1 subscribed to {}", subscription.toString()))
// 当携带的请求成功后要做的事情
.doOnSuccess(payload -> {
logger.info("Test1 - Successfully returned: {}", payload.getDataUtf8());
payload.release();
})
.doOnError(throwable -> logger.info("Test1 doOnError: {}", throwable.toString()))
// 可以使用 timeout 丢弃等待超时的 Mono
//.timeout(Duration.ofSeconds(1))
// 可以使用 doOnTerminate 在请求结束后做一些工作
// .doOnTerminate(() -> {})
// 但是一定要设置 doOnError
//.doOnError(TimeoutException.class, e -> logger.info("Test1 doOnError: {}", e.toString()))
// .onErrorReturn(TimeoutException.class, DefaultPayload.create("Payload: Test1 - timeout"))
// 可以使用 log() 来观察数据的状态
//.log()
// 客户端在执行 subscribe() 操作时才会开始从服务端接收数据流
// 在响应式编程中使用 subscribe 操作符是订阅一个数据流并处理发布的数据、错误和完成信号的核心方式之一
.subscribe();
请求发出后主线程不会阻塞,所以我们需要使用 socket.onClose().block(); 保持连接.
然后我们尝试运行服务端和客户端,看看一看客户端的输出:
[main] INFO org.example.RSocketClientRaw - My UUID is 0718ef3b-9ee0-42f1-9003-700a8aa9a98d
[main] INFO org.example.RSocketClientRaw - Test1 subscribed to RequestResponseRequesterMono
[reactor-tcp-epoll-2] INFO org.example.RSocketClientRaw - Test1 - Successfully returned: [test.echo]I received your string: This is a message from client using rsocket-java library.. Thank you.
服务端日志:
2023-03-12 21:47:29.291 INFO 32099 --- [or-http-epoll-2] o.example.controller.RSocketController : [connect.setup]Client connection: 0718ef3b-9ee0-42f1-9003-700a8aa9a98d
2023-03-12 21:47:32.304 INFO 32099 --- [or-http-epoll-2] o.example.controller.RSocketController : [test.echo]Received echo string from client: This is a message from client using rsocket-java library.
客户端成功地发出请求并收到来自服务端的回复.
服务端 。
@MessageMapping("upload.log")
public void fireAndForgetHandler(@Headers Map<String, Object> header, RSocketRequester requester, String data) {
header.forEach((k, v) -> System.out.printf("[upload.log]header key: %s, val: %s\n", k, v));
System.out.printf("[upload.log]UploadEventLogs: Received log string from client: %s\n", data);
}
服务端接受一个请求,不返回任何结果(Fire'n'Forget),只在服务端打印 Header 的内容.
客户端 。
// 测试 FnF
routeMetadata = TaggingMetadataCodec.createTaggingContent(ByteBufAllocator.DEFAULT, Collections.singletonList("upload.log"));
socket.fireAndForget(
ByteBufPayload.create(
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "This is a log from client using rsocket-java library."),
routeMetadata))
.doOnSubscribe(subscription -> logger.info("Test2 - Fire And Forget onSubscribe: {}", subscription.toString()))
.subscribe();
客户端输出:
[main] INFO org.example.RSocketClientRaw - Test2 - Fire And Forget onSubscribe: FireAndForgetRequesterMono
服务端输出:
2023-03-10 15:10:25.675 INFO 5318 --- [or-http-epoll-4] o.example.controller.RSocketController : [test.echo]Received echo string from client: This is a message from client using rsocket-java library.
[upload.log]header key: dataBufferFactory, val: NettyDataBufferFactory (PooledByteBufAllocator(directByDefault: true))
[upload.log]header key: rsocketRequester, val: org.springframework.messaging.rsocket.DefaultRSocketRequester@607cc59
[upload.log]header key: lookupDestination, val: upload.log
[upload.log]header key: contentType, val: application/binary
[upload.log]header key: rsocketFrameType, val: REQUEST_FNF
[upload.log]UploadEventLogs: Received log string from client: This is a log from client using rsocket-java library.
服务端 。
服务端接收一个 Mono<String> 然后返回给客户端包含 10 个 String 的 Flux .
事实上,严格按照响应式编程的策略,这里应该直接对 Mono 进行操作,可以使用 flatMapMany() 把生成的数据流通过异步方式处理,扩展出新的数据流。下面是扩展新数据流的简单示例:
Mono.just(3)
.flatMapMany(i -> Flux.range(0, i))
.subscribe(System.out::println);
在这里为了演示方便就先打印 Mono 然后新生成一个 Flux .
@MessageMapping("handler.request.stream")
public Flux<String> responseStreaming(Mono<String> request) {
request
.doOnNext(s -> logger.info("[handler.request.stream]: {}", s))
// 可以使用 then() 结束操作链
.then()
.subscribe();
return Flux
.range(1, 10)
.map(idx -> String.format("Resp from Server: %s, Thank you!", idx));
}
客户端 。
请看代码注释来理解对数据流 Flux 的各种操作:
// 测试 RequestStream
routeMetadata = encodeRoute("handler.request.stream");
Flux<Payload> requestStream = socket.requestStream(
ByteBufPayload.create(
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "TEST3 - Request&Stream"),
routeMetadata));
requestStream
// 当然可以使用 map 对每个 Payload 进行操作,这会改变数据
// .map(payload -> System.out.printf("%s\n", payload.getDataUtf8()))
.doOnSubscribe(subscription -> logger.info("Test3 subscribed to {}", subscription.toString()))
// 使用 doOnNext 不会对流的数据进行改变
// doOnNext()是一个 Reactor 式流操作符,它允许编写者注册一个在每次出现新元素时执行的回调函数
.doOnNext(nextPayload -> System.out.println("Test3 Received payload: " + nextPayload.getDataUtf8()))
// 当需要从流中选择一些特定的元素时,可以使用 Flux.take(long n) 操作符
// 该操作符将创建一个新的 Flux,该 Flux 包含原始 Flux 的前 n 个元素
// take 操作符发出了指定数量的元素之后,就不再接收任何元素,并且将取消其上游发布者的订阅
// 在这里服务端使用 Flux.range 来限定 Flux 流中的元素个数
// 如果服务端使用 Flux.interval 生成一个无限长度的流,客户端使用 take 接收限定个数的元素
// 便会取消发布者的订阅
.take(5)
.subscribe();
客户端输出结果:
[main] INFO org.example.RSocketClientRaw - My UUID is 28afc749-75e1-4289-8607-14810103de6c
[main] INFO org.example.RSocketClientRaw - Test3 subscribed to RequestStreamRequesterFlux
Test3 Received payload: Resp from Server: 1, Thank you!
Test3 Received payload: Resp from Server: 2, Thank you!
Test3 Received payload: Resp from Server: 3, Thank you!
Test3 Received payload: Resp from Server: 4, Thank you!
Test3 Received payload: Resp from Server: 5, Thank you!
服务端接收到了请求:
2023-03-12 22:01:33.520 INFO 32099 --- [or-http-epoll-3] o.example.controller.RSocketController : [handler.request.stream]: TEST3 - Request&Stream
服务端 。
服务端接收来自客户端的整数字符串,将它们乘以2以后发送回去。我们不妨把处理客户端请求流的函数封装为一个 Spring Service:
@Service
public class MathService {
public Flux<String> doubleInteger(Flux<String> request) {
return request
.map(s -> {
System.out.println("received " + s);
int i = Integer.parseInt(s);
return String.valueOf(i * 2);
});
}
}
编写处理函数:
@Autowired
private MathService mathService;
@MessageMapping("handler.request.channel")
public Flux<String> responseChannel(Flux<String> payloads) {
return this.mathService.doubleInteger(payloads);
}
客户端 。
Flux<Payload> payloadFlux = Flux.range(-5, 10)
.delayElements(Duration.ofMillis(500))
.map(obj ->
{
ByteBuf metadata = encodeRoute("handler.request.channel");
return ByteBufPayload.create(
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, obj.toString()), metadata);
});
Flux<Payload> channelResp = socket.requestChannel(payloadFlux);
channelResp
.doOnSubscribe(subscription -> logger.info("Test4 subscribed to {}", subscription.toString()))
.doOnError(throwable -> logger.info(throwable.toString()))
.doOnNext(nextPayload -> System.out.println("Test4 Received payload: " + nextPayload.getDataUtf8()))
.subscribe();
客户端输出:
[main] INFO org.example.RSocketClientRaw - My UUID is 96ff8fe7-416c-4607-9518-463114725a7a
[main] INFO org.example.RSocketClientRaw - Test4 subscribed to RequestChannelRequesterFlux
Test4 Received payload: -10
Test4 Received payload: -8
Test4 Received payload: -6
Test4 Received payload: -4
Test4 Received payload: -2
Test4 Received payload: 0
Test4 Received payload: 2
Test4 Received payload: 4
Test4 Received payload: 6
Test4 Received payload: 8
服务端输出:
2023-03-12 22:07:05.542 INFO 33083 --- [or-http-epoll-2] o.example.controller.RSocketController : [connect.setup]Client connection: 96ff8fe7-416c-4607-9518-463114725a7a
received -5
received -4
received -3
received -2
received -1
received 0
received 1
received 2
received 3
received 4
下一篇文章会展示服务端如何主动调用客户端的函数。如有错误欢迎在评论区批评指正! 。
最后此篇关于【RSocket】使用RSocket(二)——四种通信模式实践的文章就讲到这里了,如果你想了解更多关于【RSocket】使用RSocket(二)——四种通信模式实践的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
大多数语言都支持双向进程通信。例如,在 Python 中,我可以(草率地)执行以下操作: >>> from subprocess import * >>> p = Popen('nslookup',
致力于使用 C++ 在 arduino 和 PC (Win 7) 之间进行通信。使用 WriteFile 和 ReadFile 创建通信或简单地发送或接收数据没有问题。但是当我想以某种方式“协调”沟通
我们正在开发一个基于微服务的应用程序。它们将使用 Helm Package Manager 部署到 kubernetes,并且它们都存储了自己的存储库和 helm chart。以下是我们微服务的名称。
我正在开发一个大型 MVVM 应用程序。我为此使用了 MVVM 轻量级工具包。该应用程序就像一个带有后退和前进按钮的网络浏览器。主视图是一个用户控件。我在主视图用户控件中放置了后退和前进按钮。主视图又
我在 java 和 freepascal(lazarus) 应用程序之间的通信有问题。我使用套接字。它们正确连接。一切都很顺利,直到我想从一个应用程序向另一个应用程序发送一些东西。在java而不是“a
我已经使用客户端套接字和服务器套接字使用C#编写了群聊。 当我使用VS 2017在自己的PC中运行程序(服务器和客户端)时,客户端和服务器之间的通信工作正常。 当我在笔记本电脑中运行客户端程序,并在自
Kubernetes 中两个不同 Pod 之间的通信是如何发生的? 就我而言,我有两个 Pod:前端和后端,它们都有不同的容器。 我希望我的前端 pod 与后端 pod 通信,但我不想使用后端 pod
我正在尝试在浏览器中嵌入的 flash 实例与在 C# WinForms 应用程序中运行的 flash 实例之间进行通信...我收到一个编译错误,内容为: 1119 Access of possibl
鉴于网络上缺乏信息,请问一个问题:我要在 Android 中创建一个应用程序,使用一个数据库应用程序 rails 。为此,我需要一个手动 session 。所以如果有人准备好了示例/教程显示通信 an
我正在编写一个应用程序,它将通过 MySQL 数据库对用户进行身份验证。我已经用 Java (android) 编写了它,但现在正在移植到 Windows 手机。 PHP 文件使用 $get 然后回显
是否可以通过互联网在两个不同设备上的两个不同应用程序之间建立通信。我想从设备 A 上的应用程序点击一个设备 B 上的应用程序,然后从设备 B 上的应用程序获取数据到设备 A 上的应用程序。如果可能,如
这是脚本: 它被放置在其他网站上。 com 并显示一个 iframe。如果有人点击 iframe 中的某个内容,脚本应该将一个 div 写入 othersite 。 com. 所以我的问题是如何做到
你好我是 php 的新手,我用 c++ 编写了整个代码并想在 php 中使用这段代码。所以我为我的代码制作了 dll 以使用它。但是我不能在 php 中使用这个 dll,可以谁能给我完整的代码来使用
我确定之前已经有人问过(并回答过)此类问题,所以如果是这样,请将我链接到之前的讨论... 在 C++ 中,假设我有一个 ClassA 类型的对象,其中包含一个 ClassB 类型的私有(private
我正在尝试使用 ATmega32 进行串行通信。首先,我使用 RS232,使用 USB-to-RS232 建立使用串行终端的接收和传输(在我的例子中是 tera 术语)。无论我从串行终端 Atmega
我找不到适用于 Ruby 的 SSL 实现。 我的部分项目需要服务器和客户端之间的安全通信链接,我希望为此使用 SSL 以创建安全 session 。 谢谢 最佳答案 如果你使用 Ruby 1.9.x
我正在尝试在客户端/服务器之间进行 SSL 通信。 到目前为止,我已经从 keystore 创建了 java.security.cert.X509Certificate。接下来我应该怎么做才能使这次沟
我在与 Windows 上的 USB 设备 通信时遇到问题。我不能使用 libusb 或 WinUSB,因为我有一个特定的驱动程序(Silabs USB 到 UART,这是一个 USB 到串口的桥接器
按照目前的情况,这个问题不适合我们的问答形式。我们希望答案得到事实、引用或专业知识的支持,但这个问题可能会引发辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visit the
我发现 xcom 实际上是将数据写入数据库并从其他任务中提取数据。我的数据集很大,将其腌制并写入数据库会导致一些不必要的延迟。有没有办法在不使用 xcom 的情况下在同一 Airflow Dag 中的
我是一名优秀的程序员,十分优秀!