gpt4 book ai didi

java - 当 ExecutorService 中的任何 Thread/Runnable/Callable 在等待终止时失败时如何捕获异常

转载 作者:行者123 更新时间:2023-12-03 12:47:57 24 4
gpt4 key购买 nike

我目前的代码看起来像这样:

public void doThings() {
int numThreads = 4;
ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);
for (int i = 0; i < numThreads; i++) {

final int index = i;
Runnable runnable = () -> {

// do things based on index
};

threadPool.execute(runnable);

}

threadPool.shutdown();

try {
// I'd like to catch exceptions here from any of the runnables
threadPool.awaitTermination(1, TimeUnit.HOURS);
} catch (InterruptedException e) {
Utils.throwRuntimeInterruptedException(e);
}
}

基本上,我会同时创建很多工作,然后等待所有工作完成。如果任何处理失败,我需要迅速知道并中止所有处理。 threadPool.awaitTermination似乎没有注意到其中一个线程内是否抛出了异常。我只是在控制台中看到一个堆栈跟踪。

我不太了解并发性,所以我对所有可用的接口(interface)/对象(例如 Callable)有点迷茫, Future , Task

我看到了 threadPool.invokeAll(callables)会给我一个List<Future>Future.get()可以从线程内抛出异常,但如果我调用它(如果可调用对象在其自己的线程中抛出异常)。但是如果我 .get我在顺序集合中拥有的每个可调用对象,然后在所有其他调用完成之前我不会知道最后一个失败。

我最好的猜测是有一个队列,runnables 把一个 Boolean成功或失败然后 take()从队列中取出的次数与线程的数量一样多。

我觉得对于一个看似非常常见、简单的用例来说,这过于复杂(即使只是我粘贴的代码也有点长得令人吃惊)。这甚至不包括在一个失败时中止可运行程序。必须有更好的方法,作为初学者我不知道。

最佳答案

我最终发现 ExecutorCompletionService 就是为此而设计的。然后我编写了以下类来抽象过程并稍微简化使用:

import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Wrapper around an ExecutorService that allows you to easily submit Callables, get results via iteration,
* and handle failure quickly. When a submitted callable throws an exception in its thread this
* will result in a RuntimeException when iterating over results. Typical usage is as follows:
*
* <ol>
* <li>Create an ExecutorService and pass it to the constructor.</li>
* <li>Create Callables and ensure that they respond to interruption, e.g. regularly call: <pre>{@code
* if (Thread.currentThread().isInterrupted()) {
throw new RuntimeException("The thread was interrupted, likely indicating failure in a sibling thread.");
* }}</pre></li>
* <li>Pass the callables to the submit() method.</li>
* <li>Call finishedSubmitting().</li>
* <li>Iterate over this object (e.g. with a foreach loop) to get results from the callables.
* Each iteration will block waiting for the next result.
* If one of the callables throws an unhandled exception or the thread is interrupted during iteration
* then ExecutorService.shutdownNow() will be called resulting in all still running callables being interrupted,
* and a RuntimeException will be thrown </li>
* </ol>
*/
public class ExecutorServiceResultsHandler<V> implements Iterable<V> {

private ExecutorCompletionService<V> completionService;
private ExecutorService executorService;
AtomicInteger taskCount = new AtomicInteger(0);

public ExecutorServiceResultsHandler(ExecutorService executorService) {
this.executorService = executorService;
completionService = new ExecutorCompletionService<V>(executorService);
}

public void submit(Callable<V> task) {
completionService.submit(task);
taskCount.incrementAndGet();
}

public void finishedSubmitting() {
executorService.shutdown();
}

@Override
public Iterator<V> iterator() {
return new Iterator<V>() {
@Override
public boolean hasNext() {
return taskCount.getAndDecrement() > 0;
}

@Override
public V next() {
Exception exception;
try {
return completionService.take().get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
exception = e;
} catch (ExecutionException e) {
exception = e;
}
executorService.shutdownNow();
executorService = null;
completionService = null;
throw new RuntimeException(exception);
}
};
}

/**
* Convenience method to wait for the callables to finish for when you don't care about the results.
*/
public void awaitCompletion() {
for (V ignored : this) {
// do nothing
}
}

}

关于java - 当 ExecutorService 中的任何 Thread/Runnable/Callable 在等待终止时失败时如何捕获异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32865615/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com