gpt4 book ai didi

java - Java 8 CompletableFuture 中的默认值超时

转载 作者:IT老高 更新时间:2023-10-28 20:22:20 34 4
gpt4 key购买 nike

假设我有一些异步计算,例如:

CompletableFuture
.supplyAsync(() -> createFoo())
.thenAccept(foo -> doStuffWithFoo(foo));

如果异步供应商根据某些指定的超时超时,是否有一种很好的方法可以为 foo 提供默认值?理想情况下,此类功能也会尝试取消运行缓慢的供应商。例如,是否有类似于以下假设代码的标准库功能:

CompletableFuture
.supplyAsync(() -> createFoo())
.acceptEither(
CompletableFuture.completedAfter(50, TimeUnit.MILLISECONDS, DEFAULT_FOO),
foo -> doStuffWithFoo(foo));

或者甚至更好:

CompletableFuture
.supplyAsync(() -> createFoo())
.withDefault(DEFAULT_FOO, 50, TimeUnit.MILLISECONDS)
.thenAccept(foo -> doStuffWithFoo(foo));

我知道 get(timeout, unit),但我想知道是否有更好的标准方法来按照上面代码中的建议以异步和 react 方式应用超时。

编辑:这是一个受 Java 8: Mandatory checked exceptions handling in lambda expressions. Why mandatory, not optional? 启发的解决方案,但不幸的是它阻塞了一个线程。如果我们依赖 createFoo() 来异步检查超时并抛出它自己的超时异常,它可以在不阻塞线程的情况下工作,但会给供应商的创建者带来更多负担,并且仍然会产生创建异常的成本(这可以没有“快速 throw ”就很贵)

static <T> Supplier<T> wrapped(Callable<T> callable) {
return () -> {
try {
return callable.call();
} catch (RuntimeException e1) {
throw e1;
} catch (Throwable e2) {
throw new RuntimeException(e2);
}
};
}
CompletableFuture
.supplyAsync(wrapped(() -> CompletableFuture.supplyAsync(() -> createFoo()).get(50, TimeUnit.MILLISECONDS)))
.exceptionally(e -> "default")
.thenAcceptAsync(s -> doStuffWithFoo(foo));

最佳答案

CompletableFuture.supplyAsync 只是一个帮助方法,它为您创建 CompletableFuture,并将任务提交到 ForkJoin Pool。

您可以根据自己的要求创建自己的 supplyAsync,如下所示:

private static final ScheduledExecutorService schedulerExecutor = 
Executors.newScheduledThreadPool(10);
private static final ExecutorService executorService =
Executors.newCachedThreadPool();


public static <T> CompletableFuture<T> supplyAsync(
final Supplier<T> supplier, long timeoutValue, TimeUnit timeUnit,
T defaultValue) {

final CompletableFuture<T> cf = new CompletableFuture<T>();

// as pointed out by Peti, the ForkJoinPool.commonPool() delivers a
// ForkJoinTask implementation of Future, that doesn't interrupt when cancelling
// Using Executors.newCachedThreadPool instead in the example
// submit task
Future<?> future = executorService.submit(() -> {
try {
cf.complete(supplier.get());
} catch (Throwable ex) {
cf.completeExceptionally(ex);
}
});

//schedule watcher
schedulerExecutor.schedule(() -> {
if (!cf.isDone()) {
cf.complete(defaultValue);
future.cancel(true);
}

}, timeoutValue, timeUnit);

return cf;
}

使用该助手创建 CompletableFuture 就像使用 CompletableFuture 中的静态方法一样简单:

    CompletableFuture<String> a = supplyAsync(() -> "hi", 1,
TimeUnit.SECONDS, "default");

测试它:

    a = supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e1) {
// ignore
}
return "hi";
}, 1, TimeUnit.SECONDS, "default");

关于java - Java 8 CompletableFuture 中的默认值超时,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23575067/

34 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com