§ 学习目标

掌握基于注解@Async的异步功能开发,线程池配置等

§ 练习场景

额度管理超额,触发异步预警短信发送,短信异步发送,发送耗时随机1~5秒, 请整体请求的响应时间不超过3秒:

  1. 2s内完成短信发送,提示成功
  2. 2s内未完成短信发送,提示失败

前提:

  • 基于章节CRUD章节,进行本章练习
  • 准备测试数据
insert into limit_info (LIMIT_ID, LIMIT_CODE, LIMIT_AMT, LIMIT_DESC, LIMIT_USER_ID)values ('00000', '101010', '5000', 'airabl', 'air');
1

§ 操作步骤

§ 启用配置

启用Async配置,引入POM依赖:

 <dependency>
            <groupId>cn.com.yusys.yusp</groupId>
            <artifactId>yusp-common-app</artifactId>
</dependency>
1
2
3
4

修改boostrap配置

application:    
    async:
        enabled: true              #功能开关
1
2
3

修改apollo配置中心 appilcation的配置

application:    
    async:
        core-pool-size: 2          #线程的最小数量
        max-pool-size: 50          #线程的最大数量
        queue-capacity: 10000      #队列最大长度
        enabled: true              #功能开关
1
2
3
4
5
6

§ 增加异步消息发送功能

package cn.com.yusys.yusp.service
 
 
import java.util.concurrent.Future;
public interface MessageService {
          
          /**
           * 异步短信发送.
           * 
           * @param context
           * @return 响应短信执行结果
           */
          Future<String> sendMmsByAsync(String context);
}
package cn.com.yusys.yusp.service.impl;
import java.util.Random;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Component;
import org.springframework.util.StopWatch;
import cn.com.yusys.yusp.service.MessageService;
/**
 * 异步消息发送.
 * 
 * @since 2.1.1
 */
@Component
public class MessageServiceImpl implements MessageService {
          private static final Logger logger = LoggerFactory.getLogger(MessageServiceImpl.class);
          /**
           * 异步消息发送
           * <p>
           * 模拟消息发送延迟,随机等待0~5秒
           */
          @Override
          @Async("taskExecutor")
          public Future<String> sendMmsByAsync(String context) {
                    logger.info("[开始发送]");
                    StopWatch watch = new StopWatch();
                    watch.start();
                    // 模拟耗时0~5 秒
                    int sleepTime = new Random().nextInt(5);
                    logger.info("模拟耗时[{}]秒", sleepTime);
                    try {
                               TimeUnit.SECONDS.sleep(sleepTime);// do something  ......
                    } catch (InterruptedException e) {
                               logger.error("异常终止发送,msg:[{}]", e.getMessage());
                               e.printStackTrace();
                    }
                    watch.stop();
                    logger.info("[发送结束],{}", watch.shortSummary());
                    return new AsyncResult<String>("success");
          }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58

§ 基于LimitInfo服务层、资源层扩展,增加chkamt检查

为LimitInfoResource 增加/chkamt接口

@RestController
@RequestMapping("/api/limitinfo")
public class LimitInfoResource extends CommonResource<LimitInfo, String> {
          @Autowired
          private LimitInfoService limitInfoService;
          @Override
          protected CommonService getCommonService() {
                    // TODO 自动生成的方法存根
                    return limitInfoService;
          }
 
 
          //其他代码已省略....
 
          /**
           * 检查是否超额
           * 
           * @param limitId
           * @return
           */
          @PostMapping(value = "/chkamt")
          public String chkamt(@RequestBody ChkDto chkDto) {
                    boolean chkStatus = limitInfoService.chkLimitInfo(chkDto.getPayMoney(),chkDto.getLimitId());
                    
                    if (!chkStatus) {
                               return "failed";
                    }
                    
                    return "success";
          }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

为接口增加DTO数据传输对象,用于传递limitId和payMoney值

package cn.com.yusys.yusp.domain;
public class ChkDto {
          private String limitId;
          private double payMoney;
          public String getLimitId() {
                    return limitId;
          }
          public void setLimitId(String limitId) {
                    this.limitId = limitId;
          }
          public double getPayMoney() {
                    return payMoney;
          }
          public void setPayMoney(double payMoney) {
                    this.payMoney = payMoney;
          }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

修改额度管理业务逻辑层,增加额度校验逻辑,并处罚短信预警

package cn.com.yusys.yusp.service;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.CachePut;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;
import cn.com.yusys.yusp.commons.mapper.CommonMapper;
import cn.com.yusys.yusp.commons.service.CommonService;
import cn.com.yusys.yusp.domain.LimitInfo;
import cn.com.yusys.yusp.repository.mapper.LimitInfoMapper;
 
@Service
public class LimitInfoService extends CommonService {
          private static final Logger logger = LoggerFactory.getLogger(LimitInfoService.class);
          private static final long SEND_MSG_TIME_OUT = 2L;
          @Autowired
          private LimitInfoMapper limitInfoMapper;
          @Autowired
          private MessageService messageService;
          @Override
          protected CommonMapper<?> getMapper() {
                    return limitInfoMapper;
          }
          
     //已省略其他方法...
 
          /**
           * 根据配置的限制金额,检查本次支付是否超限,如果超限,触发短信告警,提示用户.
           * <p>
           * 短信发送,超过3秒,则失败.
           * 
           * @param limitInfo
           * @return false 检查失败 true 检查成功
           */
          public boolean chkLimitInfo(double payMony, String limitId) {
                    LimitInfo limitInfo = limitInfoMapper.selectByPrimaryKey(limitId);
                    if (limitInfo == null) {
                               return false;
                    }
                    // 检查是否超出限额
                    if (payMony > Double.parseDouble(limitInfo.getLimitAmt())) {
                               Future<String> result = messageService.sendMmsByAsync("超出限额,检查失败");
                               // do otherthing ...
                               logger.info("do otherthing ...");
                               
                               try {
                                         logger.info("短信发送结果:{}", result.get(SEND_MSG_TIME_OUT, TimeUnit.SECONDS));
                               } catch (Exception e) {
                                         logger.info("短信发送失败:{}", e.getMessage());
                                         e.printStackTrace();
                               }
                               return false;
                    }
                    // do otherthing ...
                    logger.info("do otherthing ...");
                    
                    return true;
          }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63

§ 测试

  • 启动应用
  • url:http://localhost:6001/api/limitinfo/chkamt
  • post方法

§ 测试场景1,校验规则0000,支付金额2000,预计结果:success,耗时不超过3秒

{
 "limitId":"00000",
 "payMoney": 2000
 }
1
2
3
4

Async_01

Async_02

[custmng, 172.16.20.208] INFO  2019-03-18 11:58:31.212 [XNIO-2 task-1, 75a03534a08f55ed, 75a03534a08f55ed] cn.com.yusys.yusp.service.LimitInfoService.chkLimitInfo:124 - do otherthing ...
1

§ 测试场景2,校验规则0000,支付金额5001,预计结果:校验失败,短信超时(耗时4秒),请求整体耗时不超过3秒

{
"limitId":"00000",
"payMoney": 5001
}
1
2
3
4

Async_03

[custmng, 172.16.20.208] INFO  2019-03-18 12:00:55.793 [XNIO-2 task-2, b0e06b994a1cceb8, b0e06b994a1cceb8] cn.com.yusys.yusp.service.LimitInfoService.chkLimitInfo:111 - do otherthing ...
[custmng, 172.16.20.208] INFO  2019-03-18 12:00:55.804 [microservice-Executor-1, b0e06b994a1cceb8, b918eac6552ee422] cn.com.yusys.yusp.service.impl.MessageServiceImpl.sendMmsByAsync:33 - [开始发送]
[custmng, 172.16.20.208] INFO  2019-03-18 12:00:55.805 [microservice-Executor-1, b0e06b994a1cceb8, b918eac6552ee422] cn.com.yusys.yusp.service.impl.MessageServiceImpl.sendMmsByAsync:39 - 模拟耗时[4][custmng, 172.16.20.208] INFO  2019-03-18 12:00:57.795 [XNIO-2 task-2, b0e06b994a1cceb8, b0e06b994a1cceb8] cn.com.yusys.yusp.service.LimitInfoService.chkLimitInfo:116 - 短信发送失败:null
[custmng, 172.16.20.208] INFO  2019-03-18 12:00:57.795 [XNIO-2 task-2, b0e06b994a1cceb8, b0e06b994a1cceb8] cn.com.yusys.yusp.service.LimitInfoService.chkLimitInfo:116 - 短信发送失败:null
java.util.concurrent.TimeoutException
          at java.util.concurrent.FutureTask.get(Unknown Source)
          at cn.com.yusys.yusp.service.LimitInfoService.chkLimitInfo(LimitInfoService.java:114)
1
2
3
4
5
6
7
8

§ 测试场景3,校验规则0000,支付金额5001,预计结果:校验失败,短信正常下发,请求整体耗时不超过3秒

Async_04

[custmng, 172.16.20.208] INFO  2019-03-18 12:03:20.550 [XNIO-2 task-3, 21e236c0c46c9425, 21e236c0c46c9425] cn.com.yusys.yusp.service.LimitInfoService.chkLimitInfo:111 - do otherthing ...
[custmng, 172.16.20.208] INFO  2019-03-18 12:03:20.551 [microservice-Executor-2, 21e236c0c46c9425, d6ce728df83e1667] cn.com.yusys.yusp.service.impl.MessageServiceImpl.sendMmsByAsync:39 - 模拟耗时[1][custmng, 172.16.20.208] INFO  2019-03-18 12:03:21.552 [microservice-Executor-2, 21e236c0c46c9425, d6ce728df83e1667] cn.com.yusys.yusp.service.impl.MessageServiceImpl.sendMmsByAsync:48 - [发送结束],StopWatch '': running time (millis) = 1001
[custmng, 172.16.20.208] INFO  2019-03-18 12:03:21.552 [XNIO-2 task-3, 21e236c0c46c9425, 21e236c0c46c9425] cn.com.yusys.yusp.service.LimitInfoService.chkLimitInfo:114 - 短信发送结果:success
1
2
3
4

§ Async 进阶

§ Spring 异步线程池接口类

异步执行的前提,需要线程池提供执行保障,而Spring基于java.util.concurrent.Executor,提供了5类线程池的实现:

  1. SimpleAsyncTaskExecutor:不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程
  2. SyncTaskExecutor:这个类没有实现异步调用,只是一个同步操作。只适用于不需要多线程的地方
  3. ConcurrentTaskExecutor:Executor的适配类,不推荐使用。如果ThreadPoolTaskExecutor不满足要求时,才用考虑使用这个类
  4. SimpleThreadPoolTaskExecutor:是Quartz的SimpleThreadPool的类。线程池同时被quartz和非quartz使用,才需要使用此类
  5. ThreadPoolTaskExecutor :推荐。对java.util.concurrent.ThreadPoolExecutor的包装

§ @Async为方法提供异步支持

相对常用Runnable或Thread的方式,使用@Async方式更加便捷,支持如下三种调用:

  1. 异步调用,返回值为void
  2. 带参数的异步调用 异步方法可以传入参数
  3. 异常调用返回Future(get() 、get(long timeOut,TimeUnit timeUnit))

§ 异步方法处理

在异步中主要有有两种异常处理方法:

1.对于方法返回值是Futrue的异步方法:

  • 在调用future的get时捕获异常
  • 在异步方法中直接捕获异常

2.对于返回值是void的异步方法:通过AsyncUncaughtExceptionHandler处理异常

最后更新于: 4/28/2022, 5:10:13 PM