gpt4 book ai didi

java - Java中基于可变属性返回Mono的方法的线程安全

转载 作者:行者123 更新时间:2023-12-02 18:39:40 25 4
gpt4 key购买 nike

在我的 Spring Boot 应用程序中,我有一个组件应该监视另一个外部系统的健康状态。该组件还提供了一个公共(public)方法, react 链可以订阅该方法以等待外部系统启动。

@Component
public class ExternalHealthChecker {
private static final Logger LOG = LoggerFactory.getLogger(ExternalHealthChecker.class);

private final WebClient externalSystemWebClient = WebClient.builder().build(); // config omitted

private volatile boolean isUp = true;
private volatile CompletableFuture<String> completeWhenUp = new CompletableFuture<>();

@Scheduled(cron = "0/10 * * ? * *")
private void checkExternalSystemHealth() {
webClient.get() //
.uri("/health") //
.retrieve() //
.bodyToMono(Void.class) //
.doOnError(this::handleHealthCheckError) //
.doOnSuccess(nothing -> this.handleHealthCheckSuccess()) //
.subscribe(); //
}

private void handleHealthCheckError(final Throwable error) {
if (this.isUp) {
LOG.error("External System is now DOWN. Health check failed: {}.", error.getMessage());
}
this.isUp = false;
}

private void handleHealthCheckSuccess() {
// the status changed from down -> up, which has to complete the future that might be currently waited on
if (!this.isUp) {
LOG.warn("External System is now UP again.");
this.isUp = true;
this.completeWhenUp.complete("UP");
this.completeWhenUp = new CompletableFuture<>();
}
}


public Mono<String> waitForExternalSystemUPStatus() {
if (this.isUp) {
LOG.info("External System is already UP!");
return Mono.empty();
} else {
LOG.warn("External System is DOWN. Requesting process can now wait for UP status!");
return Mono.fromFuture(completeWhenUp);
}
}
}

方法waitForExternalSystemUPStatus是公共(public)的,可以从许多不同的线程调用。其背后的想法是为应用程序中的一些 react 通量链提供一种暂停其处理直到外部系统启动的方法。当外部系统出现故障时,这些链无法处理其元素。

someFlux
.doOnNext(record -> LOG.info("Next element")
.delayUntil(record -> externalHealthChecker.waitForExternalSystemUPStatus())
... // starting processing

这里的问题是我无法真正理解该代码的哪一部分需要同步。我认为多个线程同时调用 waitForExternalSystemUPStatus 应该不会有问题,因为此方法没有编写任何内容。所以我觉得这个方法不需要同步。但是,用 @Scheduled 注解的方法也将在其自己的线程上运行,并且实际上会写入 isUp 的值,并且还可能更改 completeWhenUp 的引用到一个新的、未完成的 future 实例。我用 volatile 标记了这两个可变属性,因为通过阅读 Java 中的这个关键字,我觉得它将有助于保证读取这两个值的线程看到最新值。但是,我不确定是否还需要将 synchronized 关键字添加到部分代码中。我也不确定同步关键字是否与 react 器代码配合得很好,我很难找到这方面的信息。也许还有一种方法可以以更完整、 react 性的方式提供 ExternalHealthChecker 的功能,但我想不出任何方法。

最佳答案

我强烈建议不要采用这种方法。像这样的线程代码的问题是它变得非常难以遵循和推理。我认为您至少需要同步 handleHealthCheckSuccess() 的部分和waitForExternalSystemUPStatus()引用您的 completeWhenUp字段,否则你可能会遇到种族危险(只有一个写入,但写入后可能会乱序读取) - 但很可能还有其他我遗漏的东西,如果是这样,它可能显示为这些恼人的“百万分之一”类型的错误之一,几乎不可能确定。

不过,应该有一种更可靠、更简单的方法来实现这一目标。当您的 ExternalHealthChecker 时,我会创建一个通量,而不是使用 Spring 调度程序。组件创建如下:

healthCheckStream = Flux.interval(Duration.ofMinutes(10))
.flatMap(i ->
webClient.get().uri("/health")
.retrieve()
.bodyToMono(String.class)
.map(s -> true)
.onErrorResume(e -> Mono.just(false)))
.cache(1);

...哪里healthCheckStreamFlux<Boolean> 类型的字段。 (请注意,它不需要是 volatile 的,因为您永远不会替换它,因此不存在跨线程担忧 - 它是同一个流,将根据运行状况检查状态每 10 分钟更新一次不同的结果,无论线程如何您将从以下位置访问它。)

这实际上每 10 分钟创建一个健康检查响应值流,始终缓存最新的响应,并将其变成热源。这意味着“订阅之前不会发生任何事情”在这种情况下不适用 - Flux 将立即开始执行,任何线程上的任何新订阅者将始终获得最新结果,无论是通过还是失败。 handleHealthCheckSuccess()handleHealthCheckError() , isUp ,和completeWhenUp那么都是多余的,他们可以去 - 然后你的waitForExternalSystemUPStatus()只能变成一行:

return healthCheckStream.filter(x -> x).next();

...然后工作完成,您可以从任何地方调用它,您将得到一个 Mono仅当系统启动时才会完成。

关于java - Java中基于可变属性返回Mono的方法的线程安全,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68194116/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com