gpt4 book ai didi

java - ThreadpoolExecutor 具有重试能力,并在任务失败多次后关闭

转载 作者:行者123 更新时间:2023-12-01 19:38:55 26 4
gpt4 key购买 nike

我需要一个线程池执行器,它需要完成确切数量(相同)的任务。

它必须能够重新提交失败的任务 n 次。如果任何任务失败超过 n,则线程池应关闭并且不再继续处理任何其他任务。

我尝试结合在不同答案中找到的两种方法 - 一种通过重写 ThreadPoolExecutor.afterExecute 重新提交失败的任务,并子类化 CountDownLatch 以便等待锁存器的线程被中断并关闭执行器。

到目前为止,这是子类倒计时锁存器:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AbortableCountDownLatch extends CountDownLatch {
protected boolean aborted = false;

public AbortableCountDownLatch(int count) {
super(count);
}

/**
* Unblocks all threads waiting on this latch and cause them to receive an
* AbortedException. If the latch has already counted all the way down,
* this method does nothing.
*/
public void abort() {
if( getCount() == 0 )
return;

this.aborted = true;
while(getCount() > 0)
countDown();
}

@Override
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
final boolean rtrn = super.await(timeout,unit);
if (aborted)
throw new AbortedException();
return rtrn;
}

@Override
public void await() throws InterruptedException {
super.await();
if (aborted)
throw new AbortedException();
}


public static class AbortedException extends InterruptedException {
public AbortedException() {
}

public AbortedException(String detailMessage) {
super(detailMessage);
}
}
}

以及线程池执行器:

public class MyThreadPoolExecutor extends ThreadPoolExecutor {

private static final int RETRY_LIMIT = 3;

private Map<Runnable, Integer> retriedTasks = new ConcurrentHashMap<>();
private AbortableCountDownLatch latch;

public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, AbortableCountDownLatch latch) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
this.latch = latch;
}

@Override
public void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
// If submit() method is called instead of execute()
if (t == null && r instanceof Future<?>) {
try {
Object result = ((Future<?>) r).get();
} catch (CancellationException e) {
t = e;
} catch (ExecutionException e) {
t = e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
if (t != null) {
retriedTasks.put(r, retriedTasks.getOrDefault(r, 0) + 1);
System.out.println("Retries for " + r + " -> " + retriedTasks.get(r));
/* check to see if we have retried this task too many times, if so - shutdown */
if (retriedTasks.containsKey(r) && retriedTasks.get(r) > RETRY_LIMIT) {
System.err.println("Thread failed for more than " + RETRY_LIMIT + " times, aborting everything..");
this.latch.abort();
} else {
System.err.println("Thread threw exception " + t.getMessage() + ". Retry-ing task...");
execute(r);
}
} else {
/* clear any previous retry count for this runnable */
retriedTasks.remove(r);
}
}
}

主要会像这样使用它们:

import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MainProcessor {

public static void main(String[] args) {

AbortableCountDownLatch latch = new AbortableCountDownLatch(5);
ThreadPoolExecutor threadPoolExecutor = new MyThreadPoolExecutor(8, 8, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), latch);

for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(() -> {
System.out.println("Started thread " + Thread.currentThread().getName());
Random random = new Random();
try {
Thread.sleep(random.nextInt(7000));
} catch (InterruptedException e) {
e.printStackTrace();
}
if (random.nextBoolean()){
System.err.println("Thread " + Thread.currentThread().getName() + " failed - throwing exception..");
throw new RuntimeException("Thread " + Thread.currentThread().getName() + "failed! spectacularly :!");
}
else {
System.out.println("Thread " + Thread.currentThread().getName() + " finished.");
latch.countDown();
}
});
}

try {
latch.await();
} catch (InterruptedException e) {
threadPoolExecutor.shutdownNow();
}

threadPoolExecutor.shutdown();
}
}

这种方法看起来正确吗?我不太喜欢必须将闩锁传递给线程池执行器和实际的 Runnable。有实现这一目标的标准方法吗?我也很喜欢 Scala 版本。

我见过其他人建议任务应该在失败的情况下重新提交到池中,但这似乎不是一个好主意,因为任务应该只负责实际的运行逻辑,而不是执行细节。

最佳答案

您可以使用任务包装器来完成这项工作,那么它会相当简单:

public class TaskWrapper implements Runnable
{
private Runnable task;
private int maxResubmits;
private ThreadPoolExecutor executor;
private CountDownLatch latch;

public TaskWrapper(Runnable task, int maxResubmits, ThreadPoolExecutor executor, CountDownLatch latch) {
this.task=task;
this.maxResubmits=maxResubmits;
this.executor=executor;
this.latch=latch;
executor.submit(this);
}

public void run() {
try {
task.run();
latch.countdoun();
}
catch(Exception e) {
maxResubmits--;
if(maxResubmits>0)
executor.submit(this);
else
{
latch.countdoun();
executor.shutdownNow()
}
}
}
}

您现在只需要创建闩锁,调用您的任务,然后等待执行:


List<Runnable> tasks;
int maxResubmits;

CountDownLatch latch=new CountDownLatch(tasks.size());

tasks.forEach(task->new TaskWrapper(task,maxResubmits,executor,latch));

latch.await();

if(!executor.isShutdown())
executor.shutdown();

关于java - ThreadpoolExecutor 具有重试能力,并在任务失败多次后关闭,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56130677/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com