gpt4 book ai didi

java - 使用默认值中断 CompletableFuture

转载 作者:行者123 更新时间:2023-12-02 02:32:36 25 4
gpt4 key购买 nike

假设我有 3 项服务。首先,我调用 serviceA,它返回一个 CompletableFuture。之后,我将结果并行调用 serviceBserviceC (thenCompose())。在获得所有结果后,我想合并所有 3 个结果并将其返回给某个调用者。在调用者中,我想等待整个过程 X 毫秒,以便:

  • 如果我在 serviceA 调用正在进行时中断进程:抛出一些异常(因此这是强制性的)
  • 如果我在 serviceBserviceC 调用正在进行时中断进程:返回一些默认值(它们是可选的)。这就是我尝试使用 CompletableFuture
  • getNow(fallback) 方法的原因

请检查下面的代码片段,如果我在 serviceBserviceC 调用中使用较长的延迟,我总是会遇到 TimeoutException。我怎样才能做到这一点?

public CompletableFuture<Result> getFuture() {
CompletableFuture<A> resultA = serviceA.call();
CompletableFuture<B> resultB = resultA.thenCompose(a -> serviceB.call(a));
CompletableFuture<C> resultC = resultA.thenCompose(a -> serviceC.call(a));
return CompletableFuture.allOf(resultB, resultC)
.thenApply(ignoredVoid -> combine(
resultA.join(),
resultB.getNow(fallbackB),
resultC.getNow(fallbackC));
}

public Result extractFuture(CompletableFuture<Result> future) {
Result result;
try {
result = future.get(timeOut, MILLISECONDS);
} catch (ExecutionException ex) {
...
} catch (InterruptedException | TimeoutException ex) {
// I always ends up here...
}
return result;
}

最佳答案

.allOf(resultB, resultC) 返回的 future 仅当 resultBresultC 都完成时才完成,因此,依赖函数 ignoredVoid ->combine(resultA.join(), resultB.getNow(fallbackB), resultC.getNow(fallbackC) 仅当 resultB 时才会被求值resultC 已完成,提供后备根本没有任何效果。

通常不可能对这些函数中的 get() 调用使用react。考虑到将来可以在不同时间以不同的超时对任意数量的 get() 调用,这一点应该是显而易见的,但传递给 thenApply 的函数仅被评估一次。

getFuture() 中处理消费者指定超时的唯一方法是将其更改为返回接收超时的函数:

interface FutureFunc<R> {
R get(long time, TimeUnit u) throws ExecutionException;
}
public FutureFunc<Result> getFuture() {
CompletableFuture<A> resultA = serviceA.call();
CompletableFuture<B> resultB = resultA.thenCompose(a -> serviceB.call(a));
CompletableFuture<C> resultC = resultA.thenCompose(a -> serviceC.call(a));
CompletableFuture<Result> optimistic = CompletableFuture.allOf(resultB, resultC)
.thenApply(ignoredVoid -> combine(resultA.join(), resultB.join(), resultC.join()));
return (t,u) -> {
try {
return optimistic.get(t, u);
} catch (InterruptedException | TimeoutException ex) {
return combine(resultA.join(), resultB.getNow(fallbackB),
resultC.getNow(fallbackC));
}
};
}

public Result extractFuture(FutureFunc<Result> future) {
Result result;
try {
result = future.get(timeOut, MILLISECONDS);
} catch (ExecutionException ex) {
...
}
return result;
}

现在,只要 B 或 C 尚未完成,就可以进行具有不同超时的不同调用,可能会产生不同的结果。并不是说 combine 方法存在一些含糊之处,这可能也需要一些时间。

您可以将函数更改为

return (t,u) -> {
try {
if(resultB.isDone() && resultC.isDone()) return optimistic.get();
return optimistic.get(t, u);
} catch (InterruptedException | TimeoutException ex) {
return combine(resultA.join(), resultB.getNow(fallbackB),
resultC.getNow(fallbackC));
}
};

等待可能已经运行的组合完成。无论哪种情况,都无法保证在指定时间内交付结果,因为即使使用了 B 和 C 的后备值,也会执行 combine,这可能需要任意时间时间量。

如果您想要类似取消的行为,即所有结果查询都返回相同的结果,即使它是使用第一个查询的后备值计算的,您也可以使用

public FutureFunc<Result> getFuture() {
CompletableFuture<A> resultA = serviceA.call();
CompletableFuture<B> resultB = resultA.thenCompose(a -> serviceB.call(a));
CompletableFuture<C> resultC = resultA.thenCompose(a -> serviceC.call(a));
CompletableFuture<Void> bAndC = CompletableFuture.allOf(resultB, resultC);
CompletableFuture<Result> result = bAndC
.thenApply(ignoredVoid -> combine(resultA.join(), resultB.join(),
resultC.join()));
return (t,u) -> {
try {
bAndC.get(t, u);
} catch (InterruptedException|TimeoutException ex) {
resultB.complete(fallbackB);
resultC.complete(fallbackC);
}
try {
return result.get();
} catch (InterruptedException ex) {
throw new ExecutionException(ex);
}
};
}

这样,单个 FutureFunc 上的所有查询将始终返回相同的结果,即使它基于由于第一次超时而导致的回退值。此变体还始终将 combine 的执行排除在超时之外。

当然,如果根本没有打算使用不同的超时,您可以重构 getFuture() 以提前获得所需的超时,例如作为参数。这将大大简化实现,并且可以再次返回 future :

public CompletableFuture<Result> getFuture(long timeOut, TimeUnit u) {
CompletableFuture<A> resultA = serviceA.call();
CompletableFuture<B> resultB = resultA.thenCompose(a -> serviceB.call(a));
CompletableFuture<C> resultC = resultA.thenCompose(a -> serviceC.call(a));
ScheduledExecutorService e = Executors.newSingleThreadScheduledExecutor();
e.schedule(() -> resultB.complete(fallbackB), timeOut, u);
e.schedule(() -> resultC.complete(fallbackC), timeOut, u);
CompletableFuture<Void> bAndC = CompletableFuture.allOf(resultB, resultC);
bAndC.thenRun(e::shutdown);
return bAndC.thenApply(ignoredVoid ->
combine(resultA.join(), resultB.join(), resultC.join()));
}

关于java - 使用默认值中断 CompletableFuture,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46851235/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com