§ 学习目标
掌握基于注解@Async的异步功能开发,线程池配置等
§ 练习场景
额度管理超额,触发异步预警短信发送,短信异步发送,发送耗时随机1~5秒, 请整体请求的响应时间不超过3秒:
- 2s内完成短信发送,提示成功
- 2s内未完成短信发送,提示失败
前提:
- 基于章节CRUD章节,进行本章练习
- 准备测试数据
insert into limit_info (LIMIT_ID, LIMIT_CODE, LIMIT_AMT, LIMIT_DESC, LIMIT_USER_ID)values ('00000', '101010', '5000', 'airabl', 'air');
1
§ 操作步骤
§ 启用配置
启用Async配置,引入POM依赖:
<dependency>
<groupId>cn.com.yusys.yusp</groupId>
<artifactId>yusp-common-app</artifactId>
</dependency>
1
2
3
4
2
3
4
修改boostrap配置
application:
async:
enabled: true #功能开关
1
2
3
2
3
修改apollo配置中心 appilcation的配置
application:
async:
core-pool-size: 2 #线程的最小数量
max-pool-size: 50 #线程的最大数量
queue-capacity: 10000 #队列最大长度
enabled: true #功能开关
1
2
3
4
5
6
2
3
4
5
6
§ 增加异步消息发送功能
package cn.com.yusys.yusp.service
import java.util.concurrent.Future;
public interface MessageService {
/**
* 异步短信发送.
*
* @param context
* @return 响应短信执行结果
*/
Future<String> sendMmsByAsync(String context);
}
package cn.com.yusys.yusp.service.impl;
import java.util.Random;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Component;
import org.springframework.util.StopWatch;
import cn.com.yusys.yusp.service.MessageService;
/**
* 异步消息发送.
*
* @since 2.1.1
*/
@Component
public class MessageServiceImpl implements MessageService {
private static final Logger logger = LoggerFactory.getLogger(MessageServiceImpl.class);
/**
* 异步消息发送
* <p>
* 模拟消息发送延迟,随机等待0~5秒
*/
@Override
@Async("taskExecutor")
public Future<String> sendMmsByAsync(String context) {
logger.info("[开始发送]");
StopWatch watch = new StopWatch();
watch.start();
// 模拟耗时0~5 秒
int sleepTime = new Random().nextInt(5);
logger.info("模拟耗时[{}]秒", sleepTime);
try {
TimeUnit.SECONDS.sleep(sleepTime);// do something ......
} catch (InterruptedException e) {
logger.error("异常终止发送,msg:[{}]", e.getMessage());
e.printStackTrace();
}
watch.stop();
logger.info("[发送结束],{}", watch.shortSummary());
return new AsyncResult<String>("success");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
§ 基于LimitInfo服务层、资源层扩展,增加chkamt检查
为LimitInfoResource 增加/chkamt接口
@RestController
@RequestMapping("/api/limitinfo")
public class LimitInfoResource extends CommonResource<LimitInfo, String> {
@Autowired
private LimitInfoService limitInfoService;
@Override
protected CommonService getCommonService() {
// TODO 自动生成的方法存根
return limitInfoService;
}
//其他代码已省略....
/**
* 检查是否超额
*
* @param limitId
* @return
*/
@PostMapping(value = "/chkamt")
public String chkamt(@RequestBody ChkDto chkDto) {
boolean chkStatus = limitInfoService.chkLimitInfo(chkDto.getPayMoney(),chkDto.getLimitId());
if (!chkStatus) {
return "failed";
}
return "success";
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
为接口增加DTO数据传输对象,用于传递limitId和payMoney值
package cn.com.yusys.yusp.domain;
public class ChkDto {
private String limitId;
private double payMoney;
public String getLimitId() {
return limitId;
}
public void setLimitId(String limitId) {
this.limitId = limitId;
}
public double getPayMoney() {
return payMoney;
}
public void setPayMoney(double payMoney) {
this.payMoney = payMoney;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
修改额度管理业务逻辑层,增加额度校验逻辑,并处罚短信预警
package cn.com.yusys.yusp.service;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.CachePut;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;
import cn.com.yusys.yusp.commons.mapper.CommonMapper;
import cn.com.yusys.yusp.commons.service.CommonService;
import cn.com.yusys.yusp.domain.LimitInfo;
import cn.com.yusys.yusp.repository.mapper.LimitInfoMapper;
@Service
public class LimitInfoService extends CommonService {
private static final Logger logger = LoggerFactory.getLogger(LimitInfoService.class);
private static final long SEND_MSG_TIME_OUT = 2L;
@Autowired
private LimitInfoMapper limitInfoMapper;
@Autowired
private MessageService messageService;
@Override
protected CommonMapper<?> getMapper() {
return limitInfoMapper;
}
//已省略其他方法...
/**
* 根据配置的限制金额,检查本次支付是否超限,如果超限,触发短信告警,提示用户.
* <p>
* 短信发送,超过3秒,则失败.
*
* @param limitInfo
* @return false 检查失败 true 检查成功
*/
public boolean chkLimitInfo(double payMony, String limitId) {
LimitInfo limitInfo = limitInfoMapper.selectByPrimaryKey(limitId);
if (limitInfo == null) {
return false;
}
// 检查是否超出限额
if (payMony > Double.parseDouble(limitInfo.getLimitAmt())) {
Future<String> result = messageService.sendMmsByAsync("超出限额,检查失败");
// do otherthing ...
logger.info("do otherthing ...");
try {
logger.info("短信发送结果:{}", result.get(SEND_MSG_TIME_OUT, TimeUnit.SECONDS));
} catch (Exception e) {
logger.info("短信发送失败:{}", e.getMessage());
e.printStackTrace();
}
return false;
}
// do otherthing ...
logger.info("do otherthing ...");
return true;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
§ 测试
- 启动应用
- url:http://localhost:6001/api/limitinfo/chkamt
- post方法
§ 测试场景1,校验规则0000,支付金额2000,预计结果:success,耗时不超过3秒
{
"limitId":"00000",
"payMoney": 2000
}
1
2
3
4
2
3
4


[custmng, 172.16.20.208] INFO 2019-03-18 11:58:31.212 [XNIO-2 task-1, 75a03534a08f55ed, 75a03534a08f55ed] cn.com.yusys.yusp.service.LimitInfoService.chkLimitInfo:124 - do otherthing ...
1
§ 测试场景2,校验规则0000,支付金额5001,预计结果:校验失败,短信超时(耗时4秒),请求整体耗时不超过3秒
{
"limitId":"00000",
"payMoney": 5001
}
1
2
3
4
2
3
4

[custmng, 172.16.20.208] INFO 2019-03-18 12:00:55.793 [XNIO-2 task-2, b0e06b994a1cceb8, b0e06b994a1cceb8] cn.com.yusys.yusp.service.LimitInfoService.chkLimitInfo:111 - do otherthing ...
[custmng, 172.16.20.208] INFO 2019-03-18 12:00:55.804 [microservice-Executor-1, b0e06b994a1cceb8, b918eac6552ee422] cn.com.yusys.yusp.service.impl.MessageServiceImpl.sendMmsByAsync:33 - [开始发送]
[custmng, 172.16.20.208] INFO 2019-03-18 12:00:55.805 [microservice-Executor-1, b0e06b994a1cceb8, b918eac6552ee422] cn.com.yusys.yusp.service.impl.MessageServiceImpl.sendMmsByAsync:39 - 模拟耗时[4]秒
[custmng, 172.16.20.208] INFO 2019-03-18 12:00:57.795 [XNIO-2 task-2, b0e06b994a1cceb8, b0e06b994a1cceb8] cn.com.yusys.yusp.service.LimitInfoService.chkLimitInfo:116 - 短信发送失败:null
[custmng, 172.16.20.208] INFO 2019-03-18 12:00:57.795 [XNIO-2 task-2, b0e06b994a1cceb8, b0e06b994a1cceb8] cn.com.yusys.yusp.service.LimitInfoService.chkLimitInfo:116 - 短信发送失败:null
java.util.concurrent.TimeoutException
at java.util.concurrent.FutureTask.get(Unknown Source)
at cn.com.yusys.yusp.service.LimitInfoService.chkLimitInfo(LimitInfoService.java:114)
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
§ 测试场景3,校验规则0000,支付金额5001,预计结果:校验失败,短信正常下发,请求整体耗时不超过3秒

[custmng, 172.16.20.208] INFO 2019-03-18 12:03:20.550 [XNIO-2 task-3, 21e236c0c46c9425, 21e236c0c46c9425] cn.com.yusys.yusp.service.LimitInfoService.chkLimitInfo:111 - do otherthing ...
[custmng, 172.16.20.208] INFO 2019-03-18 12:03:20.551 [microservice-Executor-2, 21e236c0c46c9425, d6ce728df83e1667] cn.com.yusys.yusp.service.impl.MessageServiceImpl.sendMmsByAsync:39 - 模拟耗时[1]秒
[custmng, 172.16.20.208] INFO 2019-03-18 12:03:21.552 [microservice-Executor-2, 21e236c0c46c9425, d6ce728df83e1667] cn.com.yusys.yusp.service.impl.MessageServiceImpl.sendMmsByAsync:48 - [发送结束],StopWatch '': running time (millis) = 1001
[custmng, 172.16.20.208] INFO 2019-03-18 12:03:21.552 [XNIO-2 task-3, 21e236c0c46c9425, 21e236c0c46c9425] cn.com.yusys.yusp.service.LimitInfoService.chkLimitInfo:114 - 短信发送结果:success
1
2
3
4
2
3
4
§ Async 进阶
§ Spring 异步线程池接口类
异步执行的前提,需要线程池提供执行保障,而Spring基于java.util.concurrent.Executor,提供了5类线程池的实现:
- SimpleAsyncTaskExecutor:不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程
- SyncTaskExecutor:这个类没有实现异步调用,只是一个同步操作。只适用于不需要多线程的地方
- ConcurrentTaskExecutor:Executor的适配类,不推荐使用。如果ThreadPoolTaskExecutor不满足要求时,才用考虑使用这个类
- SimpleThreadPoolTaskExecutor:是Quartz的SimpleThreadPool的类。线程池同时被quartz和非quartz使用,才需要使用此类
- ThreadPoolTaskExecutor :推荐。对java.util.concurrent.ThreadPoolExecutor的包装
§ @Async为方法提供异步支持
相对常用Runnable或Thread的方式,使用@Async方式更加便捷,支持如下三种调用:
- 异步调用,返回值为void
- 带参数的异步调用 异步方法可以传入参数
- 异常调用返回Future(get() 、get(long timeOut,TimeUnit timeUnit))
§ 异步方法处理
在异步中主要有有两种异常处理方法:
1.对于方法返回值是Futrue的异步方法:
- 在调用future的get时捕获异常
- 在异步方法中直接捕获异常
2.对于返回值是void的异步方法:通过AsyncUncaughtExceptionHandler处理异常