I have two RSocket clients and one RSocket server. Both clients initially subscribe to the broadcast route listenCommand to receive commands from the server. One of the clients then sends a command on the route executeCommand, and the server should forward it to every other RSocketRequester, but not to the sender, over the listenCommand connection that each client has already established. In other words, I need to send the message to all RSocketRequesters except myself through the existing client-server connections. Currently I get the error "reactor.core.Exceptions$ErrorCallbackNotImplemented: ApplicationErrorException (0x201): Request-Stream not implemented", and the second client does not receive the command from its listenCommand subscription. I've seen it suggested to add a client-side handler, but is it possible to solve this on the server alone? Maybe I'm missing something important?
Expected result: the client that sends the command will not receive it over the broadcast connection, but the other client will.
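The expected result can be illustrated without RSocket at all. The following is only a minimal plain-Java sketch of the selection rule (everyone registered for the project except the sender). The class `CommandRecipients`, its methods, and the string client IDs are hypothetical stand-ins for the server-side registry of RSocketRequester instances; they are not part of the controller shown in this question.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the server-side registry: client ID -> project ID.
// In the real controller the keys would be RSocketRequester instances.
class CommandRecipients {
    private final Map<String, String> projectByClient = new ConcurrentHashMap<>();

    // Remember which project a connected client belongs to.
    void register(String clientId, String projectId) {
        projectByClient.put(clientId, projectId);
    }

    // Every client registered for projectId, except the sender itself.
    Set<String> recipientsFor(String projectId, String senderId) {
        Set<String> recipients = new TreeSet<>();
        projectByClient.forEach((clientId, project) -> {
            if (project.equals(projectId) && !clientId.equals(senderId)) {
                recipients.add(clientId);
            }
        });
        return recipients;
    }

    public static void main(String[] args) {
        CommandRecipients registry = new CommandRecipients();
        registry.register("client-A", "p1");
        registry.register("client-B", "p1");
        registry.register("client-C", "p2");
        // client-A sends a command for project p1: only client-B should get it.
        System.out.println(registry.recipientsFor("p1", "client-A")); // prints [client-B]
    }
}
```

With two clients on the same project, the sender is filtered out and only the other client remains in the recipient set, which is the behavior I expect from the broadcast.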
RSocket server controller:
// Shared sink backing the broadcast stream returned by listenCommand.
private Sinks.Many<String> executedCommandSink = Sinks.many().multicast()
        .directBestEffort();

// Broadcast route: each client subscribes here to receive commands.
@MessageMapping("project.{projectId}.command")
public Flux<String> listenCommand(@DestinationVariable String projectId, RSocketRequester requester) {
    return executedCommandSink.asFlux();
}

// Execute route: a client sends a command that should reach the other clients.
@MessageMapping("project.{projectId}.command.execute")
public Mono<?> executeCommand(
        @DestinationVariable String projectId, Mono<String> commandMono, RSocketRequester requester) {
    Set<RSocketRequester> requesters = getRequestersForSendCommand(projectId, requester);
    return commandMono.flatMap(command -> {
        sendCommand(requesters, command);
        return Mono.just(command);
    });
}

// Issues a request to each selected requester on the command route.
private void sendCommand(Set<RSocketRequester> requesters, String command) {
    requesters.forEach(requesterToSend -> requesterToSend
            .route("project.{projectId}.command", command)
            .data(Flux.just(command))
            .retrieveFlux(String.class)
            .subscribe());
}

// Collects every requester registered for the project, excluding the sender.
public Set<RSocketRequester> getRequestersForSendCommand(String projectId, RSocketRequester requester) {
    Set<RSocketRequester> sessionsByProject = ConcurrentHashMap.newKeySet();
    allRequesters.forEachEntry(1L, entry -> {
        if (entry.getValue().equals(projectId) && entry.getKey() != requester) {
            sessionsByProject.add(entry.getKey());
        }
    });
    return sessionsByProject;
}
RSocket client:
@Autowired
public RSocketManagerClient(RSocketRequester.Builder rsocketRequesterBuilder, RSocketStrategies strategies) {
    clientUUID = UUID.randomUUID().toString();
    log.info("Connecting using client ID: {}", clientUUID);

    // Connect to the server over WebSocket, sending the client ID as setup data.
    this.rsocketRequester = rsocketRequesterBuilder
            .setupData(clientUUID)
            .rsocketStrategies(strategies)
            .connectWebSocket(URI.create("ws://127.0.0.1:8307/rsocket"))
            .subscribeOn(Schedulers.parallel())
            .block();

    // Log connection lifecycle events.
    this.rsocketRequester.rsocket()
            .onClose()
            .doFirst(() -> log.info("Client: {} CONNECTED.", clientUUID))
            .doOnError(error -> log.error("Connection to client {} CLOSED", clientUUID))
            .doFinally(consumer -> log.info("Client {} DISCONNECTED", clientUUID))
            .subscribe();
}

// Sends a command on the execute route and waits for the server's reply.
public void executeCommand() {
    log.info("\nClient with id-{} subscribe on execute command", clientUUID);
    String block = this.rsocketRequester
            .route("project.db28981b-9ad4-4867-bc2d-11b4522f865c.command.execute")
            .data("Hello")
            .retrieveMono(String.class)
            .doOnNext(System.out::println)
            .block();
    log.info("Response: {}", block);
}

// Subscribes to the broadcast route and prints every received command.
public void subscribeOnCommand() {
    log.info("\nClient with id-{} subscribe on command", clientUUID);
    this.rsocketRequester
            .route("project.db28981b-9ad4-4867-bc2d-11b4522f865c.command")
            .retrieveFlux(String.class)
            .doOnNext(System.out::println)
            .subscribe();
}