gpt4 book ai didi

@Async注解详解以及可能遇到的各种问题

转载 作者:我是一只小鸟 更新时间:2023-09-13 15:07:12 34 4
gpt4 key购买 nike

1、简介 1)在方法上使用该@Async注解,申明该方法是一个异步任务; 2)在类上面使用该@Async注解,申明该类中的所有方法都是异步任务; 3) 方法上一旦标记了这个@Async注解,当其它线程调用这个方法时,就会开启一个新的子线程去异步处理该业务逻辑。 4)使用此注解的方法的类对象,必须是spring管理下的bean对象; 5) 要想使用异步任务,需要在主类上开启异步配置,即配置上@EnableAsync注解; 。

注意事项

如下方式会使@Async失效 。

  • 异步方法使用static修饰
  • 异步类没有使用@Component注解(或其他注解)导致spring无法扫描到异步类
  • 异步方法不能与被调用的异步方法在同一个类中
  • 类中需要使用@Autowired或@Resource等注解自动注入,不能自己手动new对象
  • 如果使用SpringBoot框架必须在启动类中增加@EnableAsync注解

2、使用

1、基础代码示例

1)启动类中增加@EnableAsync

以Spring boot 为例,启动类中增加@EnableAsync:

                          
                            @EnableAsync
@SpringBootApplication

                          
                          
                            public
                          
                          
                            class
                          
                          
                             ManageApplication {
    
                          
                          
                            //
                          
                          
                            ...
                          
                          
}
                        

2)方法上加@Async注解:

                          
                            @Component

                          
                          
                            public
                          
                          
                            class
                          
                          
                             MyAsyncTask {
     @Async
    
                          
                          
                            public
                          
                          
                            void
                          
                          
                             asyncCpsItemImportTask(Long platformId, String jsonList){
        
                          
                          
                            //
                          
                          
                            ...具体业务逻辑
                          
                          
                                }
}
                          
                        

2、隐含问题一:默认线程池配置不合适,导致系统奔溃

 

@Async注解在使用时,如果不指定线程池的名称,则使用Spring默认的线程池,Spring默认的线程池为SimpleAsyncTaskExecutor.

 

该类型线程池的默认配置:

                               默认核心线程数:8
                              
                                ,
    
    最大线程数:Integet.MAX_VALUE,
    队列使用LinkedBlockingQueue,
    容量是:Integet.MAX_VALUE,
    空闲线程保留时间:60s,
    线程池拒绝策略:AbortPolicy。
                              
                            

从最大线程数的配置上,相信你也看到问题了:并发情况下,会 无限 创建线程 、然后OOM、然后系统崩溃。。.

 

1)问题一解决方法一:

可以通过修改线程池默认配置,来解决上述问题; 。

                                
                                  spring:
  task:
    execution:
      pool:
        max
                                
                                -size: 6
                                
                                  
        core
                                
                                -size: 3
                                
                                  
        keep
                                
                                -
                                
                                  alive: 3s
        queue
                                
                                -capacity: 1000
                                
                                  
        thread
                                
                                -name-prefix: name
                              

  。

2)问题一解决方法二:

@Async注解,支持使用 自定义线程池 ,所以通过自定义线程池解决上述问题。 或者说,有时候、实际开发中就是要求你必修使用指定的线程池,@Async注解是支持的.

                              
                                /**
                              
                              
                                
 * 
                              
                              
                                @author
                              
                              
                                 HWX
 
                              
                              
                                */
                              
                              
                                
@Configuration
@EnableAsync  

                              
                              
                                public
                              
                              
                                class
                              
                              
                                 ThreadPoolTaskConfig {


                              
                              
                                /*
                              
                              
                                
    默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,
     当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
   当队列满了,就继续创建线程,当线程数量大于等于maxPoolSize后,开始使用拒绝策略拒绝
 
                              
                              
                                */
                              
                              
                                /**
                              
                              
                                 允许线程空闲时间(单位:默认为秒) 
                              
                              
                                */
                              
                              
                                private
                              
                              
                                static
                              
                              
                                final
                              
                              
                                int
                              
                               KEEP_ALIVE_TIME = 60
                              
                                ;
    
                              
                              
                                /**
                              
                              
                                 缓冲队列大小 
                              
                              
                                */
                              
                              
                                private
                              
                              
                                static
                              
                              
                                final
                              
                              
                                int
                              
                               QUEUE_CAPACITY = 1000
                              
                                ;
    
                              
                              
                                /**
                              
                              
                                 线程池名前缀 
                              
                              
                                */
                              
                              
                                private
                              
                              
                                static
                              
                              
                                final
                              
                               String THREAD_NAME_PREFIX = "Async-Service-"
                              
                                ;
  
    @Bean(
                              
                              "taskExecutor") 
                              
                                //
                              
                              
                                 bean的名称,默认为首字母小写的方法名  
                              
                              
                                public
                              
                              
                                 ThreadPoolTaskExecutor taskExecutor(){
        
                              
                              
                                //
                              
                              
                                 获取当前机器CPU核数
                              
                              
                                int
                              
                               cpuProcessors =
                              
                                 Runtime.getRuntime().availableProcessors();
        
                              
                              
                                if
                              
                               (cpuProcessors == 0
                              
                                ) {
            cpuProcessors 
                              
                              = 4
                              
                                ;
        }
        ThreadPoolTaskExecutor executor 
                              
                              = 
                              
                                new
                              
                              
                                 ThreadPoolTaskExecutor();  
        executor.setCorePoolSize(cpuProcessors);
        executor.setMaxPoolSize(cpuProcessors
                              
                              +1
                              
                                );
        executor.setQueueCapacity(QUEUE_CAPACITY);
        executor.setKeepAliveSeconds(KEEP_ALIVE_TIME);
        executor.setThreadNamePrefix(THREAD_NAME_PREFIX);
  
        
                              
                              
                                //
                              
                              
                                 线程池对拒绝任务的处理策略  
        
                              
                              
                                //
                              
                              
                                 CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务  
                              
                              
        executor.setRejectedExecutionHandler(
                              
                                new
                              
                              
                                 ThreadPoolExecutor.CallerRunsPolicy());  
        
                              
                              
                                //
                              
                              
                                 初始化  
                              
                              
                                        executor.initialize();  
        
                              
                              
                                return
                              
                              
                                 executor;  
    }  
}  
                              
                            

使用 。

                              
                                @Service  

                              
                              
                                public
                              
                              
                                class
                              
                              
                                 TranTest2Service {  
    Logger log 
                              
                              = LoggerFactory.getLogger(TranTest2Service.
                              
                                class
                              
                              
                                );  
  
    
                              
                              
                                //
                              
                              
                                 发送提醒短信 1  
                              
                              
        @PostConstruct 
                              
                                //
                              
                              
                                 加上该注解项目启动时就执行一次该方法  
                              
                              
                                @Async("taskExecutor")  
    
                              
                              
                                public
                              
                              
                                void
                              
                               sendMessage1() 
                              
                                throws
                              
                              
                                 InterruptedException {  
        log.info(
                              
                              "发送短信方法---- 1   执行开始"
                              
                                );  
        Thread.sleep(
                              
                              5000); 
                              
                                //
                              
                              
                                 模拟耗时  
                              
                              
        log.info("发送短信方法---- 1   执行结束"
                              
                                );  
    }  
  
    
                              
                              
                                //
                              
                              
                                 发送提醒短信 2  
                              
                              
        @PostConstruct 
                              
                                //
                              
                              
                                 加上该注解项目启动时就执行一次该方法  
                              
                              
    @Async("taskExecutor"
                              
                                )  
    
                              
                              
                                public
                              
                              
                                void
                              
                               sendMessage2() 
                              
                                throws
                              
                              
                                 InterruptedException {  
  
        log.info(
                              
                              "发送短信方法---- 2   执行开始"
                              
                                );  
        Thread.sleep(
                              
                              2000); 
                              
                                //
                              
                              
                                 模拟耗时  
                              
                              
        log.info("发送短信方法---- 2   执行结束"
                              
                                );  
    }  
}  
                              
                            

3、隐含问题二:异步任务的事务问题

@Async注解由于是异步执行的,在其进行数据库的操作之时,将无法控制事务管理。 解决办法:可以把 @Transactional注解放到内部的需要进行事务的方法上 ; 即将方法中对数据库的操作集中提取出来、放入一个方法中,对该方法加@Transactional注解进行事务控制 。

4、隐含问题三:在同类方法中调用@Async方法,没有异步执行

@Async的原理概括:

@Async 异步执行,是通过 Spring AOP 动态代理 的方式来实现的。Spring容器启动初始化bean时,判断类中是否使用了@Async注解,如果使用了则为其创建切入点和切入点处理器,根据切入点创建代理,在线程调用@Async注解标注的方法时,会调用代理,执行切入点处理器invoke方法,将方法的执行提交给线程池中的另外一个线程来处理,从而实现了异步执行.

所以,如果a方法调用它同类中的标注@Async的b方法,是不会异步执行的,因为从a方法进入调用的都是该类对象本身,不会进入代理类。因此, 相同类中的方法调用带@Async的方法是无法异步的,这种情况仍然是同步.

3、异步任务的返回结果

异步的业务逻辑处理场景 有两种:一个是不需要返回结果,另一种是需要接收返回结果.

不需要返回结果的比较简单,就不多说了.

需要接收返回结果的示例如下:

                              @Async("MyExecutor"
                              
                                )

                              
                              
                                public
                              
                               Future<Map<Long, List>>
                              
                                 queryMap(List ids) {
    List
                              
                              <> result =
                              
                                 businessService.queryMap(ids);
    ..............
    Map
                              
                              <Long, List> resultMap =
                              
                                 Maps.newHashMap();
    ...
    
                              
                              
                                return
                              
                              
                                new
                              
                               AsyncResult<>
                              
                                (resultMap);
}
                              
                            

调用异步方法的示例:

                              
                                public
                              
                               Map<Long, List> asyncProcess(List<BindDeviceDO> bindDevices,List<BindStaffDO>
                              
                                 bindStaffs, String dccId) {
        Map
                              
                              <Long, List> finalMap =
                              
                                null
                              
                              
                                ;
        
                              
                              
                                //
                              
                              
                                 返回值:
                              
                              
        Future<Map<Long, List>> asyncResult =
                              
                                 MyService.queryMap(ids);
        
                              
                              
                                try
                              
                              
                                 {
            finalMap 
                              
                              =
                              
                                 asyncResult.get();
        } 
                              
                              
                                catch
                              
                              
                                 (Exception e) {
            ...
        }
        
                              
                              
                                return
                              
                              
                                 finalMap;
}
                              
                            

我个人觉得,异步方法不该设置返回值;因为调用异步方法的地方,还要等待返回结果的话,那就差不多又成了串行执行了,失去了异步的意义.

 

4、检测给@Async配置自定义线程池、会是整个项目共用的吗?

 

1、线程池本身也会消耗内存资源,所以我们要控制线程池的规模,防止它占用过多资源、进而影响项目运行; 2、为了统一规划资源,线程池尽量统一配置,即全项目尽量使用同一个线程池。 3、那么使用@Async,并自定义线程池,会全局公用吗?

我们做如下测试:

                              
                                /**
                              
                              
                                application.yml配置*
                              
                              
                                */
                              
                              
                                
# 自定义线程池参数(用以@Async使用,可选)
execution:
  pool:
    core
                              
                              -size: 3
                              
                                
    queue
                              
                              -capacity: 500
                              
                                
    max
                              
                              -size: 10
                              
                                
    keep
                              
                              -alive: 3
                              
                                
    thread
                              
                              -name-prefix: customize-th-



                              
                                /**
                              
                              
                                线程池配置类*
                              
                              
                                */
                              
                              
                                
@Configuration

                              
                              
                                public
                              
                              
                                class
                              
                              
                                 ExecutorConfig {

    
                              
                              
                                /**
                              
                              
                                
     * 核心线程
     
                              
                              
                                */
                              
                              
                                
    @Value(
                              
                              "${execution.pool.core-size}"
                              
                                )
    
                              
                              
                                private
                              
                              
                                int
                              
                              
                                 corePoolSize;
    
                              
                              
                                /**
                              
                              
                                
     * 队列容量
     
                              
                              
                                */
                              
                              
                                
    @Value(
                              
                              "${execution.pool.queue-capacity}"
                              
                                )
    
                              
                              
                                private
                              
                              
                                int
                              
                              
                                 queueCapacity;
    
                              
                              
                                /**
                              
                              
                                
     * 最大线程
     
                              
                              
                                */
                              
                              
                                
    @Value(
                              
                              "${execution.pool.max-size}"
                              
                                )
    
                              
                              
                                private
                              
                              
                                int
                              
                              
                                 maxPoolSize;
    
                              
                              
                                /**
                              
                              
                                
     * 保持时间
     
                              
                              
                                */
                              
                              
                                
    @Value(
                              
                              "${execution.pool.keep-alive}"
                              
                                )
    
                              
                              
                                private
                              
                              
                                int
                              
                              
                                 keepAliveSeconds;
    
                              
                              
                                /**
                              
                              
                                
     * 名称前缀
     
                              
                              
                                */
                              
                              
                                
    @Value(
                              
                              "${execution.pool.thread-name-prefix}"
                              
                                )
    
                              
                              
                                private
                              
                              
                                 String preFix;

    @Bean(
                              
                              "MyExecutor"
                              
                                )
    
                              
                              
                                public
                              
                              
                                 Executor myExecutor() {
        ThreadPoolTaskExecutor executor 
                              
                              = 
                              
                                new
                              
                              
                                 ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setKeepAliveSeconds(keepAliveSeconds);
        executor.setThreadNamePrefix(preFix);
        executor.setRejectedExecutionHandler( 
                              
                              
                                new
                              
                              
                                 ThreadPoolExecutor.AbortPolicy());
        executor.initialize();
        
                              
                              
                                return
                              
                              
                                 executor;
    }
}
                              
                            

2)写两个测试类,使用@Async标记方法

                              
                                /**
                              
                              
                                测试类A*
                              
                              
                                */
                              
                              
                                
@Service

                              
                              
                                public
                              
                              
                                class
                              
                               TestServiceAImpl 
                              
                                implements
                              
                              
                                 TestServiceA {

    @Async(
                              
                              "MyExecutor"
                              
                                )
    @Override
    
                              
                              
                                public
                              
                              
                                void
                              
                              
                                 testMethod1() {
        
                              
                              
                                for
                              
                               (
                              
                                int
                              
                               i = 0; i < 10; i++
                              
                                ) {
            
                              
                              
                                final
                              
                              
                                int
                              
                               index =
                              
                                 i;
            System.out.println(
                              
                              "A类方法一的 " + index + " 被执行,线程名:" +
                              
                                 Thread.currentThread().getName());
            
                              
                              
                                try
                              
                              
                                 {
                Thread.sleep(
                              
                              1000
                              
                                );
            } 
                              
                              
                                catch
                              
                              
                                 (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    @Async(
                              
                              "MyExecutor"
                              
                                )
    @Override
    
                              
                              
                                public
                              
                              
                                void
                              
                              
                                 testMethod2() {
        
                              
                              
                                for
                              
                               (
                              
                                int
                              
                               i = 0; i < 10; i++
                              
                                ) {
            
                              
                              
                                final
                              
                              
                                int
                              
                               index =
                              
                                 i;
            System.out.println(
                              
                              "A类方法二的 " + index + " 被执行,线程名:" +
                              
                                 Thread.currentThread().getName());
            
                              
                              
                                try
                              
                              
                                 {
                Thread.sleep(
                              
                              1000
                              
                                );
            } 
                              
                              
                                catch
                              
                              
                                 (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}


                              
                              
                                /**
                              
                              
                                测试类B*
                              
                              
                                */
                              
                              
                                
@Service

                              
                              
                                public
                              
                              
                                class
                              
                               TestServiceBImpl 
                              
                                implements
                              
                              
                                 TestServiceB {

    @Async(
                              
                              "MyExecutor"
                              
                                )
    @Override
    
                              
                              
                                public
                              
                              
                                void
                              
                              
                                 testMethod1() {
        
                              
                              
                                for
                              
                               (
                              
                                int
                              
                               i = 0; i < 10; i++
                              
                                ) {
            
                              
                              
                                final
                              
                              
                                int
                              
                               index =
                              
                                 i;
            System.out.println(
                              
                              "B类方法一的 " + index + " 被执行,线程名:" +
                              
                                 Thread.currentThread().getName());
            
                              
                              
                                try
                              
                              
                                 {
                Thread.sleep(
                              
                              1000
                              
                                );
            } 
                              
                              
                                catch
                              
                              
                                 (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    @Async(
                              
                              "MyExecutor"
                              
                                )
    @Override
    
                              
                              
                                public
                              
                              
                                void
                              
                              
                                 testMethod2() {
        
                              
                              
                                for
                              
                               (
                              
                                int
                              
                               i = 0; i < 10; i++
                              
                                ) {
            
                              
                              
                                final
                              
                              
                                int
                              
                               index =
                              
                                 i;
            System.out.println(
                              
                              "B类方法二的 " + index + " 被执行,线程名:" +
                              
                                 Thread.currentThread().getName());
            
                              
                              
                                try
                              
                              
                                 {
                Thread.sleep(
                              
                              1000
                              
                                );
            } 
                              
                              
                                catch
                              
                              
                                 (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}
                              
                            

3)写一个测试Controller接口,异步调用两个测试类的方法

                              
                                @RestController

                              
                              
                                public
                              
                              
                                class
                              
                              
                                 TestController{

    @Autowired
    
                              
                              
                                private
                              
                              
                                 TestServiceA testServiceA;

    @Autowired
    
                              
                              
                                private
                              
                              
                                 TestServiceB testServiceB;

    
                              
                              
                                /**
                              
                              
                                
     * 测试线程01
     
                              
                              
                                */
                              
                              
                                
    @GetMapping(value 
                              
                              = "/threadTest"
                              
                                )
    
                              
                              
                                public
                              
                              
                                void
                              
                              
                                 threadTest01() {
        System.out.println(
                              
                              "【线程一】" + "group:"+ Thread.currentThread().getThreadGroup() + "; id:" +Thread.currentThread().getId()+"; name:"+
                              
                                 Thread.currentThread().getName());

        testServiceA.testMethod1();
        testServiceA.testMethod2();

        testServiceB.testMethod1();
        testServiceB.testMethod2();
    }
}
                              
                            

  。

4)分析执行结果

 
                                2023-04-12 14:03:38.945 [http-nio-8085-exec-2] INFO  o.a.c.c.C.[.[.[/] - [log,173] - Initializing Spring DispatcherServlet 'dispatcherServlet'
                                
                                  
【线程一】group:java.lang.ThreadGroup[name
                                
                                =main,maxpri=10]; id:85; name:http-nio-8085-exec-2
                                
                                  
A类方法一的 
                                
                                0 被执行,线程名:customize-th-1
                                
                                  
A类方法二的 
                                
                                0 被执行,线程名:customize-th-2
                                
                                  
B类方法一的 
                                
                                0 被执行,线程名:customize-th-3
                                
                                  
A类方法一的 
                                
                                1 被执行,线程名:customize-th-1
                                
                                  
B类方法一的 
                                
                                1 被执行,线程名:customize-th-3
                                
                                  
A类方法二的 
                                
                                1 被执行,线程名:customize-th-2
                                
                                  
B类方法一的 
                                
                                2 被执行,线程名:customize-th-3
                                
                                  
A类方法二的 
                                
                                2 被执行,线程名:customize-th-2
                                
                                  
A类方法一的 
                                
                                2 被执行,线程名:customize-th-1
                                
                                  
A类方法一的 
                                
                                3 被执行,线程名:customize-th-1
                                
                                  
A类方法二的 
                                
                                3 被执行,线程名:customize-th-2
                                
                                  
B类方法一的 
                                
                                3 被执行,线程名:customize-th-3
                                
                                  
A类方法二的 
                                
                                4 被执行,线程名:customize-th-2
                                
                                  
A类方法一的 
                                
                                4 被执行,线程名:customize-th-1
                                
                                  
B类方法一的 
                                
                                4 被执行,线程名:customize-th-3
                                
                                  
A类方法二的 
                                
                                5 被执行,线程名:customize-th-2
                                
                                  
B类方法一的 
                                
                                5 被执行,线程名:customize-th-3
                                
                                  
A类方法一的 
                                
                                5 被执行,线程名:customize-th-1
                                
                                  
A类方法一的 
                                
                                6 被执行,线程名:customize-th-1
                                
                                  
B类方法一的 
                                
                                6 被执行,线程名:customize-th-3
                                
                                  
A类方法二的 
                                
                                6 被执行,线程名:customize-th-2
                                
                                  
A类方法一的 
                                
                                7 被执行,线程名:customize-th-1
                                
                                  
A类方法二的 
                                
                                7 被执行,线程名:customize-th-2
                                
                                  
B类方法一的 
                                
                                7 被执行,线程名:customize-th-3
                                
                                  
B类方法一的 
                                
                                8 被执行,线程名:customize-th-3
                                
                                  
A类方法二的 
                                
                                8 被执行,线程名:customize-th-2
                                
                                  
A类方法一的 
                                
                                8 被执行,线程名:customize-th-1
                                
                                  
B类方法一的 
                                
                                9 被执行,线程名:customize-th-3
                                
                                  
A类方法一的 
                                
                                9 被执行,线程名:customize-th-1
                                
                                  
A类方法二的 
                                
                                9 被执行,线程名:customize-th-2
                                
                                  
B类方法二的 
                                
                                0 被执行,线程名:customize-th-1
                                
                                  
B类方法二的 
                                
                                1 被执行,线程名:customize-th-1
                                
                                  
B类方法二的 
                                
                                2 被执行,线程名:customize-th-1
                                
                                  
B类方法二的 
                                
                                3 被执行,线程名:customize-th-1
                                
                                  
B类方法二的 
                                
                                4 被执行,线程名:customize-th-1
                                
                                  
B类方法二的 
                                
                                5 被执行,线程名:customize-th-1
                                
                                  
B类方法二的 
                                
                                6 被执行,线程名:customize-th-1
                                
                                  
B类方法二的 
                                
                                7 被执行,线程名:customize-th-1
                                
                                  
B类方法二的 
                                
                                8 被执行,线程名:customize-th-1
                                
                                  
B类方法二的 
                                
                                9 被执行,线程名:customize-th-1
                              

  。

【分析】 从结果可以看到,A类、B类的方法交替执行,但是他们的线程都来自同一个线程池“customize-th-”、也就是我自己配置的线程池。 不仅如此,它们还遵循我对线程池的配置(核心线程数3),每当正在运行的线程满3,不论是A类还是B类、接下来的任务就先放入队列,等有空余线程再执行。 从以上两点可以确认,A类和B类用的是同一个线程池 ,@Async注解使用自定义线程池异步执行任务,只要在注解后添加线程池配置名称@Async(“MyExecutor”)、就可以实现整个项目公用同一个线程池.

最后此篇关于@Async注解详解以及可能遇到的各种问题的文章就讲到这里了,如果你想了解更多关于@Async注解详解以及可能遇到的各种问题的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。

34 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com