gpt4 book ai didi

java - SpringBoot ListenableFuture中如何处理异常

转载 作者:行者123 更新时间:2023-12-02 11:12:24 31 4
gpt4 key购买 nike

所以我有一个像这样启动的 SpringBoot 端点 Controller :

@RequestMapping(value = "/post", method = RequestMethod.POST, produces = MediaType.APPLICATION_JSON_VALUE)
public Response post(@Valid @RequestBody Message message) throws FailedToPostException {
message.setRecieveTime(System.currentTimeMillis());

return this.service.post(message);
}

以及 post 函数:

public Response post(Message message) throws FailedToPostException{


ListenableFuture<SendResult<String, Message>> future = kafkaTemplate.send("topicName", message);
future.addCallback(new ListenableFutureCallback<SendResult<String, Message>>() {

@Override
public void onSuccess(SendResult<String, Message> result) {
LOGGER.info("Post Finished. '{}' with offset: {}", message,
result.getRecordMetadata().offset());
}

@Override
public void onFailure(Throwable ex) {
LOGGER.error("Message Post Failed. '{}'", message, ex);

long nowMillis = System.currentTimeMillis();
int diffSeconds = (int) ((nowMillis - message.getRecieveTime()) / 1000);

if (diffSeconds >= 10) {
LOGGER.debug("timeout sending message to Kafka, aborting.");
return;
}
else {
post(message);
}
}
});

LOGGER.debug("D: " + Utils.getMetricValue("buffer-available-bytes", kafkaTemplate));

return new Response("Message Posted");
}

现在您可以看到,我们正在尝试确保,如果 kafkaTemplate.send 失败,我们将再次递归调用 post(message)到 10 秒,直到生产者内存缓冲区被清除并且消息通过。

问题是:

  1. 我们希望能够向端点的客户端返回失败响应(例如:“无法确认消息”)。
  2. 是否有更好的方法来处理上述代码中来自 Future 的异常?
  3. 有没有办法避免在这里使用递归函数?我们这样做是因为我们想尝试将消息传递到 Kafka 大约 10 秒,然后再将其作为电子邮件发送以供查看。

旁注:我仍然没有使用 kafkaTemplate.metrics() 中的 buffer-available-bytes 属性,我打算使用它来最大程度地减少出现此问题的可能性,但仍然需要处理上述问题以防出现某些竞争条件

最佳答案

有几种方法可以做到这一点,但我真的很喜欢 Spring Retry 作为解决此类问题的方法。这里有一些伪代码,但如果您需要有关如何执行此操作的更多细节,我可以使事情变得更明确:

@Retryable(maxAttempts = 10, value = KafkaSendException.class)
public Response post(Message message) throws FailedToPostException{
ListenableFuture<SendResult<String, Message>> future = kafkaTemplate.send("topicName", message);

try {
future.get(1. TimeUnit.SECONDS);
} catch(SomeException ex) {
LOGGER.error("Message Post Failed. '{}'", ex.getCause().getMessage(), ex);
throw ex;
}

LOGGER.info("Post Finished. '{}' with offset: {}", message,
result.getRecordMetadata().offset());

}

无需递归即可有效地执行相同的操作。我不建议使用递归代码来处理错误。

Controller 应该能够使用漂亮的@ExceptionHandler来处理实际的KafkaSendException

关于java - SpringBoot ListenableFuture中如何处理异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50536117/

31 4 0