gpt4 book ai didi

java - 执行者 : How to synchronously wait until all tasks have finished if tasks are created recursively?

转载 作者:搜寻专家 更新时间:2023-10-30 21:05:21 28 4
gpt4 key购买 nike

我的问题与 this one here 密切相关.正如在那里发布的那样,我希望主线程等到工作队列为空并且所有任务都已完成。然而,我的情况的问题是,每个任务都可能递归地导致提交新任务进行处理。这使得收集所有这些任务的 future 有点尴尬。

我们当前的解决方案使用忙等待循环来等待终止:

        do { //Wait until we are done the processing
try {
Thread.sleep(200);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} while (!executor.getQueue().isEmpty()
|| numTasks.longValue() > executor.getCompletedTaskCount());

numTasks 是一个随着每个新任务的创建而增加的值。这行得通,但由于等待繁忙,我认为这不是很好。我在想有没有什么好的方法可以让主线程同步等待,直到被显式唤醒。

最佳答案

非常感谢您的所有建议!

最后我选择了我认为相当简单的东西。我发现 CountDownLatch几乎是我需要的。它会阻塞,直到计数器达到 0。唯一的问题是它只能倒计时,不能倒计时,因此在我的动态设置中不起作用,任务可以提交新任务。因此,我实现了一个新类 CountLatch,它提供了额外的功能。 (见下文)我然后按如下方式使用这个类。

主线程调用 latch.awaitZero(),阻塞直到 latch 达到 0。

任何线程,在调用 executor.execute(..) 之前调用 latch.increment()

任何任务,在即将完成之前,都会调用 latch.decrement()

当最后一个任务终止时,计数器将达到 0,从而释放主线程。

欢迎提出进一步的建议和反馈!

public class CountLatch {

@SuppressWarnings("serial")
private static final class Sync extends AbstractQueuedSynchronizer {

Sync(int count) {
setState(count);
}

int getCount() {
return getState();
}

protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

protected int acquireNonBlocking(int acquires) {
// increment count
for (;;) {
int c = getState();
int nextc = c + 1;
if (compareAndSetState(c, nextc))
return 1;
}
}

protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}

private final Sync sync;

public CountLatch(int count) {
this.sync = new Sync(count);
}

public void awaitZero() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public boolean awaitZero(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public void increment() {
sync.acquireNonBlocking(1);
}

public void decrement() {
sync.releaseShared(1);
}

public String toString() {
return super.toString() + "[Count = " + sync.getCount() + "]";
}

}

请注意,increment()/decrement() 调用可以按照建议封装到自定义的 Executor 子类中,例如,由 Sami Korhonen 编写,或者按照 impl 的建议使用 beforeExecuteafterExecute。看这里:

public class CountingThreadPoolExecutor extends ThreadPoolExecutor {

protected final CountLatch numRunningTasks = new CountLatch(0);

public CountingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}

@Override
public void execute(Runnable command) {
numRunningTasks.increment();
super.execute(command);
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
numRunningTasks.decrement();
super.afterExecute(r, t);
}

/**
* Awaits the completion of all spawned tasks.
*/
public void awaitCompletion() throws InterruptedException {
numRunningTasks.awaitZero();
}

/**
* Awaits the completion of all spawned tasks.
*/
public void awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException {
numRunningTasks.awaitZero(timeout, unit);
}

}

关于java - 执行者 : How to synchronously wait until all tasks have finished if tasks are created recursively?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14535770/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com