§ 原理

借助MQ的消息可靠传递,实现业务间解耦、事务强一致

  • 生产者发送消息做可靠性检查,确保消息真正投递出去
  • 消费者做幂等,确保业务没有重复执行
  • 消费者做异常重试,反复出错时需要捕捉异常并记录,以便手工干预

§ 本文目标

借助MQ的消息可靠传递,实现一个分布式事务案例

§ 场景

以支付宝转账到余额宝为例,在支付宝已经扣款成功的情况下,余额宝一定收到转账

  • 支付宝和余额宝是两个微服务
  • 用户用支付宝转1万元到余额宝
支付宝账户先扣除金额,MQ通知余额宝账户添加金额;
>> 支付宝账户表: update A set amount = amount - 10000 where userId = 1;
>> 余额宝账户表: update B set amount = amount + 10000 where userId = 1;
1
2
3

§ 操作步骤

§ 创建队列

创建一个持久化的队列,名称为money

mq_01

§ maven引入

<dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1
2
3
4

§ 添加配置

spring:   
        rabbitmq:
        host: 110.80.17.26 #MQ地址
        port: 56672 #MQ端口
        username: admin #MQ用户名
        password: admin #MQ密码
        publisher-confirms: true #开启消息发送成功监听
        publisher-returns: true #开启消息发送失败监听
        listener:
            simple:
                acknowledge-mode: manual #手动提交事务
1
2
3
4
5
6
7
8
9
10
11

§ 余额宝端代码编写

监听支付宝发到消息队列中的消息,做余额宝账号金额更新

package cn.com.yusys.yusp.echain.server.clientdemo.yuer;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
/**
 * 余额宝端消息监听
 * @author figue
 * @since 2019-03-21
 */
@Component
public class YuErBaoMessageListeners {
          private final Logger log = LoggerFactory.getLogger(this.getClass());
          private ObjectMapper mapper = new ObjectMapper();
 
          /**
           * 监听消息队列 
           * @param message  消息内容
           * @param channel  消息渠道
           * @throws IOException  异常
           */
          @RabbitListener(queues = "money")
          @RabbitHandler
          public void receiveQueue(Message message, Channel channel) throws IOException {
                    String msg = "";
                    try {
                               // 业务处理逻辑
                              msg = new String(message.getBody());
                               Map data = mapper.readValue(msg, HashMap.class);
                               retry(data);
                               channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);// 手动应答消息已经处理
                    } catch (Exception e) {
                               log.error("MQ接收消息内容[" + msg + "],后处理异常:" + e);
                               channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);// 手动应答消息已经处理
                    }
          }
 
          /**
           * 更新余额宝账户金额
           * 
           * @param map
           */
          @Transactional
          public void bizOp(Map<String, Object> map) {
                    // B_MESSAGE交易流水表,主键是【交易编号】,重复插入会报异常,类似于幂等操作
                    System.out.println(
                               "【余额宝记账操作】重复插入会报异常,类似于幂等操作 insert into B_MESSAGE(transId,userId,money) values (000001,1,10000)");
                    // 更新余额宝账户金额
                    System.out.println("【余额宝账户入款】 update B set amount = amount +10000 where userId = 1");
 
          }
 
          /**
           * 一直报错,重试次数用完了,保存如下信息供人工干预
           * 
           * @param map
           * @throws JsonProcessingException
           */
          public void failed(Map<String, Object> map) throws JsonProcessingException {
                    System.out.println("一直报错,重试次数用完了,保存如下信息供人工干预:\n" + mapper.writeValueAsString(map));
          }
 
          /**
           * 异常时最多重试 3次,成功为止
           * 
           * @param map
           *            输入参数
           */
          private void retry(Map<String, Object> map) {
                    // 构建重试模板实例
                    RetryTemplate retryTemplate = new RetryTemplate();
                    // 设置重试次数
                    SimpleRetryPolicy policy = new SimpleRetryPolicy(3,
                                         Collections.<Class<? extends Throwable>, Boolean>singletonMap(Exception.class, true));
                    // 设置重试间隔时间
                    FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
                    fixedBackOffPolicy.setBackOffPeriod(100);
                    retryTemplate.setRetryPolicy(policy);
                    retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
                    // 编写业务处理代码逻辑
                    final RetryCallback<Object, Exception> retryCallback = new RetryCallback<Object, Exception>() {
                               public Object doWithRetry(RetryContext context) throws Exception {
                                         System.out.println("第" + (1 + context.getRetryCount()) + "次处理");
                                         try {
                                                   bizOp(map);
                                         } catch (Exception e) {
                                                   e.printStackTrace();
                                                   throw new Exception("捕捉到业务处理异常,需要抛出");// 这个点特别注意,重试的根源通过Exception返回
                                         }
                                         return null;
                               }
                    };
                    // 重试次数执行完依然报错,走如下逻辑
                    final RecoveryCallback<Object> recoveryCallback = new RecoveryCallback<Object>() {
                               public Object recover(RetryContext context) throws Exception {
                                         failed(map);
                                         return null;
                               }
                    };
                    try {
                               // 由retryTemplate 执行execute方法开始逻辑执行
                               retryTemplate.execute(retryCallback, recoveryCallback);
                    } catch (Exception e) {
                               e.printStackTrace();
                    }
          }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123

§ 支付宝端代码编写

向支付宝账号扣款,同时发消息到队列中,通知余额宝账号金额更新

配置MQ发送过程监听

package cn.com.yusys.yusp.echain.server.clientdemo.zhifu;
import javax.annotation.PostConstruct;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * 配置MQ发送过程监听
 * @author figue
 *
 */
@Configuration
public class RabbmitMqConfig {
          
          @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @PostConstruct
    public void init() {
          // 设置发送成功回调
          rabbitTemplate.setConfirmCallback(initConfirmCallback());
          // 设置发送失败回调
          rabbitTemplate.setReturnCallback(initReturnCallback());
    }
          
          @Bean
          public ReturnCallback initReturnCallback(){
                    return new FailedListener();
          }
          
          @Bean
          public ConfirmCallback initConfirmCallback(){
                    return new SuccessListener();
          }
}
package cn.com.yusys.yusp.echain.server.clientdemo.zhifu;
import java.util.HashMap;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
 * 消息发送失败时,监听被触发
 * 一般像队列不存在,或者网络中断时会触发,且第一个触发
 * @author figue
 *
 */
public class FailedListener implements ReturnCallback{
          private ObjectMapper mapper = new ObjectMapper();
          @Override
          public void returnedMessage(Message message, int state, String result, String arg3, String queneName) {
                    System.out.println(message+","+state+","+result+","+arg3+","+queneName);
                    
                    try {
                               String msg = new String(message.getBody());
                               HashMap data = mapper.readValue(msg, HashMap.class);// 失败的消息内容
                               System.out.println("失败的转账记录:"+mapper.writeValueAsString(data));
                               System.out.println("【支付宝账户不扣款】根据业务主键更新支付宝流水表状态为失败,update A_MESSAGE set sts = fail where transId=000001");
                    } catch (Exception e) {
                               e.printStackTrace();
                    }
          }  
    
}
package cn.com.yusys.yusp.echain.server.clientdemo.zhifu;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;
/**
 * 消息发送成功时触发
 * 测试发现队列不存在也会触发,但会在FailedListener之后触发
 * @author figue
 *
 */
public class SuccessListener implements ConfirmCallback{
          @Override
          public void confirm(CorrelationData data, boolean success, String result) {
                    System.out.println(data+","+success+","+result);
                    if(null!=data){
                               System.out.println("业务主键:"+data.getId());
                    }
                    if(success){        
                               boolean resultT = true;// 获取流水表状态,为fail则失败
                               if(resultT){
                                         //更新支付宝账户金额
                                         System.out.println("消息发送到MQ成功");
                                         System.out.println("【支付宝账户扣款】  update A set amount = amount - 10000 where userId = 1");
                               }else{
                                         System.out.println("消息发送到MQ失败");
                                         System.out.println("【支付宝账户不扣款】根据业务主键更新支付宝流水表状态为失败,update A_MESSAGE set sts = fail where transId=000001");
                               }                   
                    }else{
                               System.out.println("消息发送到MQ失败原因:"+result);
                               System.out.println("【支付宝账户不扣款】根据业务主键更新支付宝流水表状态为失败,update A_MESSAGE set sts = fail where transId=000001");
                    }                   
          }
          
} 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98

转账接口编写

package cn.com.yusys.yusp.echain.server.clientdemo.zhifu;
import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
 * 支付宝端业务测试
 * @author figue
 * @since 2018-03-21
 */
@RestController
@RequestMapping("/api/test")
public class ZhiFuBaoResource {
          /**
           * 测试接口
           * @throws AmqpException
           * @throws JsonProcessingException
           */
    @GetMapping("/mq")
    public void trans1() throws AmqpException, JsonProcessingException {
          trans();
    }
    @Autowired
          private RabbitTemplate rabbitTemplate;
          private ObjectMapper mapper = new ObjectMapper();
          /**
           * 转账方法
           * @return
           * @throws AmqpException
           * @throws JsonProcessingException
           */
          @Transactional
          public boolean trans() throws AmqpException, JsonProcessingException {
                    Map data = new HashMap<String, Object>();
                    String transId = "00000001";//生成业务流水号
                    data.put("transId", transId);
                    data.put("userId", "1");
                    data.put("money", 10000);
                    
                    //A_MESSAGE交易流水表,主键是【交易编号】,重复插入会报异常,类似于幂等操作
                    System.out.println("【支付宝记账操作】重复插入会报异常,类似于幂等操作  insert into A_MESSAGE(transId,userId,money) values (000001,1,10000)");
                               
                    // 流水号传入,方便消息发送失败时做操作
                    CorrelationData correlationData = new CorrelationData(transId);
                    rabbitTemplate.convertAndSend("","money",mapper.writeValueAsString(data),correlationData);
                    return true;
          }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56

§ 测试

浏览器直接访问 /api/test/mq

控制台结果如下:

【支付宝记账操作】重复插入会报异常,类似于幂等操作  insert into A_MESSAGE(transId,userId,money) values (000001,1,10000)
【余额宝记账操作】重复插入会报异常,类似于幂等操作 insert into B_MESSAGE(transId,userId,money) values (000001,1,10000)
【余额宝账户入款】 update B set amount = amount + 10000 where userId = 1
【支付宝账户扣款】 update A set amount = amount - 10000 where userId = 1
1
2
3
4
最后更新于: 4/28/2022, 5:10:13 PM