gpt4 book ai didi

java - Java 8 CompletableFuture中的默认值超时

转载 作者:行者123 更新时间:2023-12-02 00:36:59 26 4
gpt4 key购买 nike

假设我有一些异步计算,例如:

CompletableFuture
.supplyAsync(() -> createFoo())
.thenAccept(foo -> doStuffWithFoo(foo));


如果异步供应商根据某些指定的超时超时,是否有一种很好的方式为foo提供默认值?理想情况下,此类功能也会尝试取消运行缓慢的供应商。例如,是否存在类似于以下假设代码的标准库功能:

CompletableFuture
.supplyAsync(() -> createFoo())
.acceptEither(
CompletableFuture.completedAfter(50, TimeUnit.MILLISECONDS, DEFAULT_FOO),
foo -> doStuffWithFoo(foo));


甚至更好:

CompletableFuture
.supplyAsync(() -> createFoo())
.withDefault(DEFAULT_FOO, 50, TimeUnit.MILLISECONDS)
.thenAccept(foo -> doStuffWithFoo(foo));


我知道 get(timeout, unit),但是想知道是否有更好的标准方法可以按照上述代码中的建议以异步和响应方式应用超时。

编辑:这是一个受 Java 8: Mandatory checked exceptions handling in lambda expressions. Why mandatory, not optional?启发的解决方案,但不幸的是它阻塞了一个线程。如果我们依靠createFoo()异步检查超时并抛出自己的超时异常,它将在不阻塞线程的情况下工作,但会给供应商的创建者带来更多负担,并且仍然会产生创建异常的费用(没有“快速投掷”就很昂贵)

static <T> Supplier<T> wrapped(Callable<T> callable) {
return () -> {
try {
return callable.call();
} catch (RuntimeException e1) {
throw e1;
} catch (Throwable e2) {
throw new RuntimeException(e2);
}
};
}
CompletableFuture
.supplyAsync(wrapped(() -> CompletableFuture.supplyAsync(() -> createFoo()).get(50, TimeUnit.MILLISECONDS)))
.exceptionally(e -> "default")
.thenAcceptAsync(s -> doStuffWithFoo(foo));

最佳答案

CompletableFuture.supplyAsync只是一个帮助程序方法,可以为您创建一个CompletableFuture并将任务提交到ForkJoin池。

您可以按照如下要求创建自己的supplyAsync:

private static final ScheduledExecutorService schedulerExecutor = 
Executors.newScheduledThreadPool(10);
private static final ExecutorService executorService =
Executors.newCachedThreadPool();


public static <T> CompletableFuture<T> supplyAsync(
final Supplier<T> supplier, long timeoutValue, TimeUnit timeUnit,
T defaultValue) {

final CompletableFuture<T> cf = new CompletableFuture<T>();

// as pointed out by Peti, the ForkJoinPool.commonPool() delivers a
// ForkJoinTask implementation of Future, that doesn't interrupt when cancelling
// Using Executors.newCachedThreadPool instead in the example
// submit task
Future<?> future = executorService.submit(() -> {
try {
cf.complete(supplier.get());
} catch (Throwable ex) {
cf.completeExceptionally(ex);
}
});

//schedule watcher
schedulerExecutor.schedule(() -> {
if (!cf.isDone()) {
cf.complete(defaultValue);
future.cancel(true);
}

}, timeoutValue, timeUnit);

return cf;
}


使用该助手创建CompletableFuture就像在CompletableFuture中使用静态方法一样容易:

    CompletableFuture<String> a = supplyAsync(() -> "hi", 1,
TimeUnit.SECONDS, "default");


要测试它:

    a = supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e1) {
// ignore
}
return "hi";
}, 1, TimeUnit.SECONDS, "default");

关于java - Java 8 CompletableFuture中的默认值超时,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57967181/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com