gpt4 book ai didi

spring - ThreadPoolTask​​Executor中的TaskRejectedException

转载 作者:行者123 更新时间:2023-12-04 16:23:01 29 4
gpt4 key购买 nike

我试图使用Spring的Async异步调用API,并在我的Thread Config中使用ThreadPoolTask​​Executor进行调用:

@Configuration
@EnableAsync
public class ThreadConfig extends AsyncConfigurerSupport {

@Value("${core.pool.size}")
private int corePoolSize;

@Value("${max.pool.size}")
private int maxPoolSize;

@Value("${queue.capacity}")
private int queueCapacity;

@Override
@Bean
public Executor getAsyncExecutor() {

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setThreadNamePrefix("default_task_executor_thread");
executor.initialize();

return executor;

}

此处的设置为:
corePoolSize = 5;
maxPoolSize = 10;
QueueCapacity = 10;

我正在调用异步服务,如下所示:
for (String path : testList) {
Future<Boolean> pro = services.invokeAPI(path);
}

testList大约有50条记录。

当我运行此命令时,编译器将处理10个线程并在其给出10次之后调用invokeAPI方法10次:
org.springframework.core.task.TaskRejectedException: Executor[java.util.concurrent.ThreadPoolExecutor@3234ad78[Running, pool size = 10, active threads = 10, queued tasks = 10, completed tasks = 0]] did not accept task: org.springframework.aop.interceptor.AsyncExecutionInterceptor$1@5c17b70

我以为它将等待当前任务完成并重新分配线程,而不是抛出异常并终止程序。

要使我的所有50条记录都调用invokeAPI方法,该怎么办?

编辑:testList中的记录数可以更改。

有什么建议吗?

最佳答案

发生这种情况是因为您要使用该池的大小。由于队列的大小为10,而您可以拥有的最大线程数为10,因此在执行20个任务(10个正在运行且队列中有10个)后,执行程序开始拒绝任务。

有多种方法可以解决此问题。

  • 使用无限制队列。即,不指定队列的大小,因此它将能够容纳所有任务。线程释放后,将提交任务。
  • 提供一个RejectedExecutionHandler来完成任务。即在调用者线程上运行它们,或者丢弃它们或其他(取决于用例)。 Java已经提供了其中的一些功能,例如CallerRunsPolicyAbortPolicyDiscardPolicyDiscardOldestPolicy。您可以像使用executor#setRejectedExecutionHandler一样指定它们。
  • 提供您自己的阻塞线程池执行器,该阻塞器将阻塞直到有更多任务空间(使用信号量)为止。

  • 这是阻止执行程序的示例
    public class BlockingExecutor extends ThreadPoolExecutor {

    private final Semaphore semaphore;

    public BlockingExecutor(final int corePoolSize, final int poolSize, final int queueSize) {
    super(corePoolSize, poolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    semaphore = new Semaphore(poolSize + queueSize);
    }

    @Override
    public void execute(final Runnable task) {
    boolean acquired = false;
    do {
    try {
    semaphore.acquire();
    acquired = true;
    } catch (final InterruptedException e) {
    //do something here
    }
    } while (!acquired);

    try {
    super.execute(task);
    } catch (final RejectedExecutionException e) {
    semaphore.release();
    throw e;
    }
    }

    protected void afterExecute(final Runnable r, final Throwable t) {
    super.afterExecute(r, t);
    semaphore.release();
    }
    }

    关于spring - ThreadPoolTask​​Executor中的TaskRejectedException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49290054/

    29 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com