gpt4 book ai didi

java - ThreadPoolExecutor:当一个任务返回错误时从 invokeAll() 取消任务

转载 作者:行者123 更新时间:2023-11-30 11:13:53 25 4
gpt4 key购买 nike

我有一个线程池执行器对成批出现的键列表执行相同的操作。所以我正在使用 invokeall() 方法批量处理键列表。用例是这样的,如果批处理中的任何任务返回错误,则没有必要继续处理其他键。所以

  1. 一旦任务返回错误,我如何取消批量执行的任务。
  2. 但不影响另一批 key 的执行。即取消应按批处理隔离。

感谢您的帮助。

最佳答案

如果不进行一些定制,我看不出如何做到这一点。我能想出的最简单的实现需要:

  • 一个专门的 Future 实现,基本上是 FutureTask 的子类它覆盖 setException() 方法,以便在任务抛出异常时取消所有其他任务
  • 专业ThreadPoolExecutor覆盖 invokeAll() 以使用自定义 future 的实现

它是这样的:

为定制的 future :

import java.util.Collection;
import java.util.concurrent.*;

public class MyFutureTask<V> extends FutureTask<V> {
private Callable<V> task;
private Collection<Future<V>> allFutures;

public MyFutureTask(Callable<V> task, Collection<Future<V>> allFutures) {
super(task);
this.task = task;
this.allFutures = allFutures;
}

@Override
protected void setException(Throwable t) {
super.setException(t);
synchronized(allFutures) {
for (Future<V> future: allFutures) {
if ((future != this) && !future.isDone()) {
future.cancel(true);
}
}
}
}
}

对于自定义线程池:

import java.util.*;
import java.util.concurrent.*;

public class MyThreadPool extends ThreadPoolExecutor {
public MyThreadPool(int size) {
super(size, size, 1L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
List<Future<T>> futures = new ArrayList<>(tasks.size());
for (Callable<T> callable: tasks) {
futures.add(new MyFutureTask<>(callable, futures));
}
for (Future<T> future: futures) {
execute((MyFutureTask<T>) future);
}
for (Future<T> future: futures) {
try {
future.get();
} catch (ExecutionException|CancellationException e) {
// ignore this exception
}
}
return futures;
}
}

代码示例来测试它:

import java.util.*;
import java.util.concurrent.*;

public class TestThreadPool {
public static void main(final String[] args) {
ExecutorService executor = null;
try {
int size = 10;
executor = new MyThreadPool(size);
List<Callable<String>> tasks = new ArrayList<>();
int count=1;
tasks.add(new MyCallable(count++, false));
tasks.add(new MyCallable(count++, true));
List<Future<String>> futures = executor.invokeAll(tasks);
System.out.println("results:");
for (int i=0; i<futures.size(); i++) {
Future<String> f = futures.get(i);
try {
System.out.println(f.get());
} catch (CancellationException e) {
System.out.println("CancellationException for task " + (i+1) +
": " + e.getMessage());
} catch (ExecutionException e) {
System.out.println("ExecutionException for task " + (i+1) +
": " + e.getMessage());
}
}
} catch(Exception e) {
e.printStackTrace();
} finally {
if (executor != null) executor.shutdownNow();
}
}

public static class MyCallable implements Callable<String> {
private final int index;
private final boolean simulateFailure;

public MyCallable(int index, boolean simulateFailure) {
this.index = index;
this.simulateFailure = simulateFailure;
}

@Override
public String call() throws Exception {
if (simulateFailure) {
throw new Exception("task " + index + " simulated failure");
}
Thread.sleep(2000L);
return "task " + index + " succesful";
}
}
}

最后是执行测试的结果,如输出控制台中所示:

results:
CancellationException for task 1: null
ExecutionException for task 2: java.lang.Exception: task 2 simulated failure

关于java - ThreadPoolExecutor:当一个任务返回错误时从 invokeAll() 取消任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26185013/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com