gpt4 book ai didi

Java 在线程池中安全地存储和恢复未完成的任务

转载 作者:行者123 更新时间:2023-12-01 09:45:44 25 4
gpt4 key购买 nike

我的 Java Web 应用程序中有一个固定的线程池。

 App.aes =  Executors.newFixedThreadPool(3);

它用于执行异步任务。这些任务可能需要几个小时才能完成。因此,如果我需要重新加载应用程序,我需要检查是否有异步任务正在运行,如果有,我需要将这些任务存储在等待队列中的某个位置,并在应用程序重新加载后恢复它们。

我做了一些测试:

public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(1);
for(int i=0; i< 5; i++){
final int c = i + 1;
es.submit(new Runnable(){
@Override
public void run() {
System.out.println("current running " + c);
try {
Thread.sleep(10000); // 10 sec
} catch (InterruptedException e) {
System.out.println("interrupted " + c);
}
}

});
}

Thread.sleep(15000);
List<Runnable> rems = es.shutdownNow();
System.out.println("Remaining " + rems.size());
System.out.println("--------- restore remaining task ----------");

es = Executors.newFixedThreadPool(1);
for(Runnable r : rems){
es.submit(r);
}
}

输出是:

current running 1
current running 2
interrupted 2
Remaining 4
--------- restore remaining task ----------
current running 3
current running 4
current running 5
current running 6

这不是我想要的结果。被中断的任务将不会被恢复。 API 文档证明了这一点:

Attempts to stop all actively executing tasks, halts the processing of waiting tasks, and returns a list of the tasks that were awaiting execution. 

我们如何在java线程池中安全地存储和恢复未完成的任务?我真正的任务是自动防故障的,这意味着每个任务都可以一次又一次地重新运行。并且执行的顺序并不重要。

我的Web应用程序部署在weblogic上,线程池由servlet启动,注册一个ServletContextListener来关闭线程池。

<小时/>

我期待两个选择:

  • 选项 1。无需中断 Activity 任务,等待它们完成,并且然后保存所有等待的任务,然后关闭线程池。

    优点:无需担心中断导致的任何不可预测的情况。
    缺点:这需要等待所有正在运行的任务完成。根据线程池大小和每个任务的时间成本,等待时间可能会很长。

  • 选项 2. 中断 Activity 任务,然后保存所有未完成的任务
    关闭线程池。

选项 1 对我来说是理想的解决方案。

最佳答案

创建一个中断任务列表怎么样?您已经拥有使您能够执行特定中断线程的代码的 catch:

public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(1);
List<Runnable> interruptedTasks = new CopyOnWriteArrayList<Runnable>(); //create the list of interrupted threads/tasks
//Edited - after the right comment by @Luke Lee (AbstractList cannot be instantiated and the operation in the catch block should be thread-safe)
for(int i=0; i< 5; i++){
final int c = i + 1;
es.submit(new Runnable(){
@Override
public void run() {
System.out.println("current running " + c);
try {
Thread.sleep(10000); // 10 sec
} catch (InterruptedException e) {
System.out.println("interrupted " + c);
interruptedTasks.add(this); //add this interrupted instance to the list
}
}

});
}

Thread.sleep(15000);
List<Runnable> rems = es.shutdownNow();
System.out.println("Remaining " + rems.size());
System.out.println("--------- restore remaining task ----------");

es = Executors.newFixedThreadPool(1);
for (Runnable r : interruptedTasks){ //add the interrupted Runnables to the new pool
es.submit(r);
}
for(Runnable r : rems){
es.submit(r);
}
}

我没有对此进行测试 - 但我相信这应该有效。如果您需要在重新运行任务之前进行某种清理,您当然可以在将实例添加到中断任务列表的同时在 catch 中进行清理。

显然,这有助于简化测试 - 但您可以在更复杂的设计中执行相同的操作:您可以将 Runnable 作为一个类,将池的对象作为构造函数中的参数,而不是使用匿名类。仅在创建实例后才将其提交到池中。这样,如果中断,它就能够将自己添加到池中。但是,您需要向 Runnable 添加一个 setPool(ExecutorSerive) 方法并调用它,将池的对象重置为新对象,然后重新运行它(在我添加的 for 循环中,就在提交行之前)。

<小时/>

编辑:刚刚看到您的编辑 - 关于选项。我的建议显然是第二个选择。对于第一个选项,我认为您可以使用文档 here 中的示例代码,具体来说 - 看看 awaitTermination

相关部分:

void shutdownAndAwaitTermination(ExecutorService pool) {
pool.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
pool.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(60, TimeUnit.SECONDS))
System.err.println("Pool did not terminate");
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
pool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}

关于Java 在线程池中安全地存储和恢复未完成的任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38053238/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com