gpt4 book ai didi

java - 同步异步后端

转载 作者:行者123 更新时间:2023-12-01 16:39:45 26 4
gpt4 key购买 nike

我需要创建一个 REST 端点,它将“同步”由 JMS 工作的后端服务的请求和响应。换句话说,我的端点应该向 JMS 输入队列发送消息,在 JMS 输出队列中等待响应。如果在超时时间内没有响应,则错误将返回给消费者。对于消费者来说,这个端点应该看起来像一个正常的同步请求/响应。

目前我已经使用 java.util.concurrent.Exchanger 实现了它。我的代码(简化):

REST 端点:

@RestController
public class Endpoint {

private ConcurrentMap<String, Exchanger> exchangers = new ConcurrentHashMap<>();

@GetMapping("/data/{requestId}")
public ResponseEntity<String> getData(@Parameter(in = ParameterIn.PATH, required = true) String requestId) {
Exchanger<String> syncExchanger = createAndPutIfNotExists(requestId);
sendToJMS(requestId);
int timeout = 30;
// wait for JMS response and return it
return waitForResponse(syncExchanger, requestId, timeout);
}

private synchronized Exchanger<String> createAndPutIfNotExists(String requestId) {
if (exchangers.get(requestId) != null) {
throw new BadHeaderException("Duplicate requestId");
}
Exchanger<String> exchanger = new Exchanger<>();
exchangers.put(requestId, exchanger);
return exchanger;
}

private String waitForResponse(Exchanger<String> exchanger, String requestId, int timeout) {
try {
return exchanger.exchange(null, timeout, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "interrupted";
} catch (TimeoutException e) {
throw new TimeoutException("timeout on waiting JMS response.", e);
} finally {
exchangers.remove(requestId);
}
}

@JmsListener(destination = "${jms.outputTopic}")
public void onMessage(Message m) throws JMSException {
String requestId = m.getStringProperty("RequestId");
String payload = m.getBody();
Exchanger<String> exchanger = exchangers.get(requestId );

if (exchanger != null) {
try {
exchanger.exchange(payload);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
exchangers.remove(requestId );
}
}
}
}

该解决方案有效。但它在等待响应时阻塞请求线程。那么Web服务器线程池在高负载时就会超出限制。

有没有办法以非阻塞的方式做到这一点?

类似这样的事情:

@GetMapping("/data/{requestId}")
public CompletableFuture<String> getData() {
return CompletableFuture.supplyAsync(() -> {
sendToJMS(requestId);

// How to wait for JMS response with some timeout ?

});
}

@JmsListener(destination = "${jms.outputTopic}")
public void onMessage(Message m) throws JMSException {
String requestId = m.getStringProperty("RequestId");
String payload = m.getBody();

// How to "complete" CompletableFuture ?

}

最佳答案

Spring 接受 CompletableFuture 作为 Controller 中的返回类型,因此您可以在 createAndPutIfNotExists() 中创建一个返回类型,并在 onMessage() 中完成它.

将您的交易所映射替换为 future 映射:

private ConcurrentMap<String, CompletableFuture<String>> futures = new ConcurrentHashMap<>();

然后调整发送部分:

@GetMapping("/data/{requestId}")
public CompletableFuture<String> getData(@PathParam("requestId") String requestId) {
CompletableFuture<String> future = createAndPutIfNotExists(requestId);
sendToJMS(requestId);
int timeout = 30;
CompletableFuture<String> result = future.orTimeout(timeout, TimeUnit.SECONDS);
result.thenRun(() -> futures.remove(requestId, future));
return result;
}

private synchronized CompletableFuture<String> createAndPutIfNotExists(String requestId) {
if (futures.get(requestId) != null) {
throw new BadHeaderException("Duplicate requestId");
}
CompletableFuture<String> future = new CompletableFuture<>();
futures.put(requestId, future);
return future;
}

请注意,超时处理是使用 Java 9 的 orTimeout() 方法执行的。如果您使用的是 Java 8,则需要 custom timeout hanlding .

您可能还需要执行一些 thenApplyAsync(s -> s, executor) 技巧,将响应提交移出 JMS/超时处理线程。

最后,在收到响应时只需complete() future:

@JmsListener(destination = "${jms.outputTopic}")
public void onMessage(Message m) throws JMSException {
String requestId = m.getStringProperty("RequestId");
String payload = m.getBody();
CompletableFuture<String> future = futures.get(requestId);

if (future != null) {
try {
future.complete(payload);
} finally {
futures.remove(requestId, future);
}
}
}

关于java - 同步异步后端,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61884367/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com