gpt4 book ai didi

java - 如何调用新的 @Async 方法但等待一个条件?

转载 作者:行者123 更新时间:2023-12-01 18:14:21 24 4
gpt4 key购买 nike

我有一个实现 AMQP MessageListener 的 Spring Boot 应用程序。此监听器调用由 ThreadPoolTask​​Executor 管理的具有池大小的 @Async 方法。当有很多传入消息时,就会出现问题,因此这些消息会因为没有可用的异步工作线程而丢失。

我正在使用 Spring Core 5.0.7-RELEASE,Java 8

这是我的代码:

异步配置器:

@EnableAsync
@Configuration
public class AsyncConfiguration extends AsyncConfigurerSupport {

@Override
@Bean("docThreadPoolTaskExecutor")
public Executor getAsyncExecutor() {
final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(8);
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setThreadNamePrefix("DocWorkerThread-");
executor.initialize();
return executor;
}

我的后台服务 (MyAsyncService):


@Async("docThreadPoolTaskExecutor")
@Override
public void generaDocumento(String idUser, int year) {
//... some heavy and slow process
}

我的消息监听器:

...
@Autowired
private MyAsyncService myAsyncService;

@Override
@EntryPoint
public void onMessage(Message message) {
try {
final String mensaje = new String(message.getBody(), StandardCharsets.UTF_8);
final MyPojo payload = JsonUtils.readFromJson(mensaje , MyPojo.class);

myAsyncService.generaDocumento(payload.getIdUser(), Integer.valueOf(payload.getYear()));

} catch ( Throwable t ) {
throw new AmqpRejectAndDontRequeueException( t );
}
}

我需要有人给我一些想法来解决这个问题。

最佳答案

ThreadPoolTask​​Executor 默认有一个队列。当达到 corePoolSize 时,消息应添加到队列中。当队列已满时,将启动更多工作线程,直到达到 maxPoolSize。您可以通过executor.getThreadPoolExecutor().getQueue().size()检查当前队列大小,以验证消息是否真的丢失或只是卡在队列中。

关于java - 如何调用新的 @Async 方法但等待一个条件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60403478/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com