gpt4 book ai didi

java - 如何防止 CompletableFuture#whenComplete 在上下文线程中执行

转载 作者:行者123 更新时间:2023-11-30 02:24:26 25 4
gpt4 key购买 nike

我有以下代码:

ConcurrentHashMap taskMap= new ConcurrentHashMap();
....
taskMap.compute(key, (k, queue) -> {
CompletableFuture<Void> future = (queue == null)
? CompletableFuture.runAsync(myTask, poolExecutor)
: queue.whenCompleteAsync((r, e) -> myTask.run(), poolExecutor);
//to prevent OutOfMemoryError in case if we will have too much keys
future.whenComplete((r, e) -> taskMap.remove(key, future));
return future;
});

此代码的问题是,如果 future 已经完成 whenComplete 函数参数在与 compute 调用相同的线程中调用。在此方法的主体中,我们从 map 中删除条目。但计算方法文档禁止这样做,应用程序会卡住。

如何解决这个问题?

最佳答案

最明显的解决方案是使用 whenCompleteAsync 而不是 whenComplete,因为前者保证使用提供的 Executor 执行操作,而不是调用线程。可以通过以下方式演示

Executor ex = r -> { System.out.println("job scheduled"); new Thread(r).start(); };
for(int run = 0; run<2; run++) {
boolean completed = run==0;
System.out.println("*** "+(completed? "with already completed": "with async"));
CompletableFuture<String> source = completed?
CompletableFuture.completedFuture("created in "+Thread.currentThread()):
CompletableFuture.supplyAsync(() -> {
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
return "created in "+Thread.currentThread();
}, ex);

source.thenApplyAsync(s -> s+"\nprocessed in "+Thread.currentThread(), ex)
.whenCompleteAsync((s,t) -> {
if(t!=null) t.printStackTrace(); else System.out.println(s);
System.out.println("consumed in "+Thread.currentThread());
}, ex)
.join();
}

这将打印类似的内容

*** with already completed
job scheduled
job scheduled
created in Thread[main,5,main]
processed in Thread[Thread-0,5,main]
consumed in Thread[Thread-1,5,main]
*** with async
job scheduled
job scheduled
job scheduled
created in Thread[Thread-2,5,main]
processed in Thread[Thread-3,5,main]
consumed in Thread[Thread-4,5,main]

所以你可以使用

taskMap.compute(key, (k, queue) -> {
CompletableFuture<Void> future = (queue == null)
? CompletableFuture.runAsync(myTask, poolExecutor)
: queue.whenCompleteAsync((r, e) -> myTask.run(), poolExecutor);
//to prevent OutOfMemoryError in case if we will have too much keys
future.whenCompleteAsync((r, e) -> taskMap.remove(key, future), poolExecutor);
return future;
});

如果提前完成的可能性很大,您可以使用以下方法减少开销

taskMap.compute(key, (k, queue) -> {
CompletableFuture<Void> future = (queue == null)
? CompletableFuture.runAsync(myTask, poolExecutor)
: queue.whenCompleteAsync((r, e) -> myTask.run(), poolExecutor);
//to prevent OutOfMemoryError in case if we will have too much keys
if(future.isDone()) future = null;
else future.whenCompleteAsync((r, e) -> taskMap.remove(key, future), poolExecutor);
return future;
});

也许,您没有找到这个明显的解决方案,因为您不喜欢依赖操作始终被安排为池中的新任务,即使完成已经发生在不同的任务中。您可以使用专门的执行程序来解决此问题,该执行程序只会在必要时重新安排任务:

Executor inPlace = Runnable::run;
Thread forbidden = Thread.currentThread();
Executor forceBackground
= r -> (Thread.currentThread()==forbidden? poolExecutor: inPlace).execute(r);



future.whenCompleteAsync((r, e) -> taskMap.remove(key, future), forceBackground);

但是您可能会重新考虑这种复杂的每个映射清理逻辑是否真的需要。它不仅很复杂,而且可能会产生显着的开销,可能会安排大量清理操作,而这些操作在执行时已经过时了,而这些操作并不是真正需要的。

执行起来可能会更简单、更高效

taskMap.values().removeIf(CompletableFuture::isDone);

不时清理整个 map 。

关于java - 如何防止 CompletableFuture#whenComplete 在上下文线程中执行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46034531/

25 4 0