gpt4 book ai didi

java - 处理 ThreadPoolExecutor 的异常

转载 作者:太空狗 更新时间:2023-10-29 22:39:52 24 4
gpt4 key购买 nike

我有以下代码片段,基本上扫描需要执行的任务列表,然后将每个任务交给执行程序执行。

JobExecutor依次创建另一个执行器(用于执行数据库操作......读取数据并将数据写入队列)并完成任务。

JobExecutor返回 Future<Boolean>对于提交的任务。当其中一项任务失败时,我想优雅地中断所有线程并通过捕获所有异常来关闭执行程序。我需要做哪些改变?

public class DataMovingClass {
private static final AtomicInteger uniqueId = new AtomicInteger(0);

private static final ThreadLocal<Integer> uniqueNumber = new IDGenerator();

ThreadPoolExecutor threadPoolExecutor = null ;

private List<Source> sources = new ArrayList<Source>();

private static class IDGenerator extends ThreadLocal<Integer> {
@Override
public Integer get() {
return uniqueId.incrementAndGet();
}
}

public void init(){

// load sources list

}

public boolean execute() {

boolean succcess = true ;
threadPoolExecutor = new ThreadPoolExecutor(10,10,
10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1024),
new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("DataMigration-" + uniqueNumber.get());
return t;
}// End method
}, new ThreadPoolExecutor.CallerRunsPolicy());

List<Future<Boolean>> result = new ArrayList<Future<Boolean>>();

for (Source source : sources) {
result.add(threadPoolExecutor.submit(new JobExecutor(source)));
}

for (Future<Boolean> jobDone : result) {
try {
if (!jobDone.get(100000, TimeUnit.SECONDS) && success) {
// in case of successful DbWriterClass, we don't need to change
// it.
success = false;
}
} catch (Exception ex) {
// handle exceptions
}
}

}

public class JobExecutor implements Callable<Boolean> {

private ThreadPoolExecutor threadPoolExecutor ;
Source jobSource ;
public SourceJobExecutor(Source source) {
this.jobSource = source;
threadPoolExecutor = new ThreadPoolExecutor(10,10,10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1024),
new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("Job Executor-" + uniqueNumber.get());
return t;
}// End method
}, new ThreadPoolExecutor.CallerRunsPolicy());
}

public Boolean call() throws Exception {
boolean status = true ;
System.out.println("Starting Job = " + jobSource.getName());
try {

// do the specified task ;


}catch (InterruptedException intrEx) {
logger.warn("InterruptedException", intrEx);
status = false ;
} catch(Exception e) {
logger.fatal("Exception occurred while executing task "+jobSource.getName(),e);
status = false ;
}
System.out.println("Ending Job = " + jobSource.getName());
return status ;
}
}
}

最佳答案

当您向执行器提交任务时,它会返回一个 FutureTask 。实例。

FutureTask.get() 将重新抛出任务抛出的任何异常作为 ExecutorException .

因此,当您遍历 List<Future> 时并调用 get on each, catch ExecutorException并调用有序关闭。

关于java - 处理 ThreadPoolExecutor 的异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/2554549/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com