gpt4 book ai didi

java - 如何在项目reactor中设置阻塞异步请求/响应?

转载 作者:行者123 更新时间:2023-12-02 02:17:31 29 4
gpt4 key购买 nike

我正在与 ANT+ USB 棒连接,并用项目 react 器替换我自己天真的“MessageBus”,因为它看起来非常合适。
USB接口(interface)本质上是异步的(单独的输入/输出管道),我想以阻塞方式处理一组请求/响应消息。

我已经设置了一个单独的线程,它不断地从 USB 管道读取消息并将它们写入接收器,该接收器提供任何人都可以订阅的共享 Flux。这似乎工作正常。

目前,我向 USB 管道发送一条消息,然后在共享通量上使用 .filter() 和 .blockFirst():(人为代码)

    /**
* Puts a message on the Usb out Pipe and waits for the relevant asynchronous response on the {@link AntUsbReader#antMessages()} {@link Flux}
*
* @param message Message to send.
* @return related response message.
*/

public AntMessage sendBlocking(AntBlockingMessage message) {
send(message); // in essence, calls usbOutPipe.syncSubmit(message.getBytes()), returns void
// bug: ant dongle can reply to message even before following Flux is activated, meaning .blockFirst() goes in timeout.
return this.antUsbMessageReader.antMessages() // .antMessages() is an (infinite) Flux<AntMessage>
.filter(antMessage -> antMessage.getMessageId() == message.getMessageId())
.blockFirst(Duration.ofSeconds(10));
}

问题在于,即使在激活 Flux 之前,U 盘也可以响应,从而导致 TimeoutException。
向 USB 读卡器添加 Thread.sleep(10) 可以“解决”该问题,但是实现这种阻塞行为的正确方法是什么?

  • 设置订阅(使用 .take(1))、发送消息然后阻止订阅?
  • 设置一个 Flux 来完成发送和等待正确响应?

我想不通……

最佳答案

我找到了一个可行的解决方案,但我不确定它是否是最好的:

我设置了一个 Mono 来发送异步消息,并将其与一个过滤匹配消息的 Flux 合并。看到 Mono 从不发出值,我知道合并中的第一个对象是来自 Flux 的响应消息,因此我可以将其转换为正确的类型。

这仍然感觉有点脏,但话又说回来,尝试使用用于异步工作的框架来获得阻塞行为总是会感觉有点脏......

    public AntMessage sendBlocking(AntBlockingMessage requestMessage) {
Flux<AntMessage> response = this.antUsbReader.antMessages()
.filter(responseMessage -> isMatchingResponse(requestMessage, responseMessage))
.take(1);

Mono<Void> messageSender = Mono.fromRunnable(() -> this.antUsbWriter.write(requestMessage));
return (AntMessage) Flux.merge(response, messageSender).blockFirst(Duration.ofSeconds(1));
}

private boolean isMatchingResponse(AntBlockingMessage message, AntMessage response) {
if (message instanceof RequestMessage) {
return response.getMessageId() == ((RequestMessage) message).getMsgIdRequested();
}
return response.getMessageId() == message.getMessageId();
}

关于java - 如何在项目reactor中设置阻塞异步请求/响应?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57294242/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com