1. 应用场景

image-20240505182316011

以电商交易场景为例,⽤户⽀付订单这⼀核⼼操作的同时会涉及到下游物流发货、积分变更、购物⻋状态清空等多个⼦系统的变更。

当前业务的处理分⽀包括:

  • 主分⽀订单系统状态更新:由未⽀付变更为⽀付成功。
  • 物流系统状态新增:新增待发货物流记录,创建订单物流记录。
  • 积分系统状态变更:变更⽤户积分,更新⽤户积分表。
  • 购物⻋系统状态变更:清空购物⻋,更新⽤户购物⻋记录。

1.1 传统X事务方案:性能不足

为了保证上述四个分⽀的执⾏结果⼀致性,典型⽅案是基于 XA 协议的分布式事务系统来实现。将四个调⽤分⽀封装成包含四个独⽴事务分⽀的⼤事务。基于 XA 分布式事务的⽅案可以满⾜业务处理结果的正确性,但最⼤的缺点是多分⽀环境下资源锁定范围⼤,并发度低,随着下游分⽀的增加,系统性能会越来越差。

1.2 基于普通消息方案:一致性保证困难

image-20240505182524396

该⽅案中消息下游分⽀和订单系统变更的主分⽀很容易出现不⼀致的现象,例如:

  • 消息发送成功,订单没有执⾏成功,需要回滚整个事务。
  • 订单执⾏成功,消息没有发送成功,需要额外补偿才能发现不⼀致。
  • 消息发送超时未知,此时⽆法判断需要回滚订单还是提交订单变更。

1.3 基于 RocketMQ 分布式事务消息:⽀持最终⼀致性

上述普通消息⽅案中,普通消息和订单事务⽆法保证⼀致的原因,本质上是由于普通消息⽆法像单机数据库事务⼀样,具备提交、回滚和统⼀协调的能⼒。

⽽基于 RocketMQ 实现的分布式事务消息功能,在普通消息基础上,⽀持⼆阶段的提交能⼒。将⼆阶段提交和本地事务绑定,实现全局提交结果的⼀致性。

2. 功能原理

RocketMQ 事务消息是⽀持在分布式场景下保障消息⽣产和本地事务的最终⼀致性。交互流程如下图所示:

image-20240505182751683

1、⽣产者将消息发送⾄ Broker 。

2、Broker 将消息持久化成功之后,向⽣产者返回 Ack 确认消息已经发送成功,此时消息被标记为”暂不能投递“,这种状态下的消息即为半事务消息

3、⽣产者开始执⾏本地事务逻辑

4、⽣产者根据本地事务执⾏结果向服务端提交⼆次确认结果( Commit 或是 Rollback ),Broker 收到确认结果后处理逻辑如下:

  • ⼆次确认结果为 Commit :Broker 将半事务消息标记为可投递,并投递给消费者。

  • ⼆次确认结果为 Rollback :Broker 将回滚事务,不会将半事务消息投递给消费者。

5、在断⽹或者是⽣产者应⽤重启的特殊情况下,若 Broker 未收到发送者提交的⼆次确认结果,或 Broker 收到的⼆次确认结果为 Unknown 未知状态,经过固定时间后,服务端将对消息⽣产者即⽣产者集群中任⼀⽣产者实例发起消息回查

  1. ⽣产者收到消息回查后,需要检查对应消息的本地事务执⾏的最终结果。

  2. ⽣产者根据检查到的本地事务的最终状态再次提交⼆次确认,服务端仍按照步骤4对半事务消息进⾏处理。

3. 实战例子

为了便于⼤家理解事务消息 ,笔者新建⼀个⼯程⽤于模拟⽀付订单创建⽀付成功赠送积分的流程。

⾸先,我们创建⼀个真实的订单主题:order-topic 。然后在数据库中创建三张表 订单表事务⽇志表积分表。最后我们创建⼀个 Demo ⼯程,⽣产者模块⽤于创建⽀付订单、修改⽀付订单成功,消费者模块⽤于新增积分记录。

3.1 创建支付订单

调⽤订单⽣产者服务创建订单接⼝ ,在 t_order 表中插⼊⼀条⽀付订单记录。

1
2
3
4
5
6
7
8
9
10
11
@GetMapping("/order/insertPayOrder")
@ResponseBody
public ResponseEntity insert(Long userId) {
try {
Long orderId = orderService.insertOrder(userId);
return ResponseEntity.successResult(orderId);
} catch (Exception e) {
logger.error("insertPayOrder error:", e);
return ResponseEntity.failResult("生成订单失败");
}
}

3.2 调用生产者服务修改订单状态接口

接⼝的逻辑就是执⾏事务⽣产者的 sendMessageInTransaction ⽅法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@GetMapping("/order/updatePayOrderSuccess")
@ResponseBody
public ResponseEntity updatePayOrderSuccess(Long orderId) {
try {
OrderPO orderPO = orderService.getOrderById(orderId);
// 发送事务消息
Message message = new Message("order-topic", JSON.toJSONString(orderPO).getBytes());
TransactionSendResult sendResult = this.producer.sendMessageInTransaction(message, null);;
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
return ResponseEntity.successResult(orderId);
}
return ResponseEntity.failResult("修改订单失败");
} catch (Exception e) {
logger.error("updatePayOrderSuccess error:", e);
return ResponseEntity.failResult("修改订单失败");
}
}

⽣产者端需要配置事务⽣产者事务监听器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Configuration
public class RocketMQConfig {

// 生产者组
private final static String ORDER_PRODUCER_GROUP = "orderProducerGroup";

private final static Integer CORE_POOL_SIZE = 5;

private final static Integer MAXIMUM_POOL_SIZE = 10;

private final static Integer KEEP_ALIVE_TIME = 10;

// 执行事务任务的线程池
private static ThreadPoolExecutor TRANSACTION_EXECUTOR =
new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100));

@Autowired
private TransactionListener transactionListener;

@Bean(value = "transactionMQProducer")
public TransactionMQProducer createTransactionProducer() throws MQClientException {
TransactionMQProducer producer = new TransactionMQProducer(ORDER_PRODUCER_GROUP);
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setSendMsgTimeout(Integer.MAX_VALUE);
producer.setExecutorService(TRANSACTION_EXECUTOR);
producer.setTransactionListener(transactionListener);
producer.start();
return producer;
}

}

发送事务消息的⽅法内部包含三个步骤 :

image-20240508221415537

事务⽣产者⾸先发送半事务消息,发送成功后,⽣产者才开始执⾏本地事务逻辑

事务监听器实现了两个功能:执⾏本地事务 Broker 回查事务状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@Component
public class TransactionListenerImpl implements TransactionListener {

private static Logger logger = LoggerFactory.getLogger(TransactionListenerImpl.class);

@Autowired
private OrderService orderService;

@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String transactionId = msg.getTransactionId();
logger.info("开始执行本地事务,事务编号:" + transactionId);
try {
String orderPOJSON = new String(msg.getBody(), "UTF-8");
OrderPO orderPO = JSON.parseObject(orderPOJSON, OrderPO.class);
orderService.updateOrder(orderPO.getId(), transactionId);
logger.info("结束执行本地事务,事务编号:" + transactionId);
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
logger.info("update order error: ", e);
logger.info("结束执行本地事务,事务编号:" + transactionId);
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}

@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String transactionId = msg.getTransactionId();
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
try {
logger.info("检测本地事务,事务编号:" + transactionId);
TransactionLogPO transactionLogPO = orderService.getLogById(transactionId);
if (transactionLogPO != null) {
localTransactionState = LocalTransactionState.COMMIT_MESSAGE;
} else {
localTransactionState = LocalTransactionState.UNKNOW;
}
} catch (Exception e) {
logger.error("checkLocalTransaction error:", e);
localTransactionState = LocalTransactionState.UNKNOW;
}
logger.info("检测本地事务,事务编号:" + transactionId + " 事务状态:" + localTransactionState);
return localTransactionState;
}

}

执⾏本地事务的逻辑内部就是执⾏ orderService.updateOrder ⽅法。

⽅法执⾏成功则返回 LocalTransactionState.COMMIT_MESSAGE , 若执⾏失败则返回LocalTransactionState.ROLLBACK_MESSAGE 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Transactional(rollbackFor = Exception.class)
public void updateOrder(Long orderId, String transactionId) {
OrderPO orderPO = orderMapper.getById(orderId);
// 修改订单状态为已支付
orderPO.setOrderStatus(1);
int affectedCount = orderMapper.update(orderPO);
if (affectedCount <= 0) {
throw new RuntimeException("updateOrder error");
}
// 插入到事务日志表
TransactionLogPO transactionLogPO = new TransactionLogPO();
transactionLogPO.setId(transactionId);
transactionLogPO.setBizType(0);
transactionLogPO.setBizId(String.valueOf(orderPO.getId()));
transactionLogPO.setCreateTime(new Date());
transactionLogMapper.insert(transactionLogPO);
}

需要注意的是: orderService.updateOrder ⽅法添加了事务注解,并将修改订单状态和插⼊事务⽇志表放进⼀个事务内,避免订单状态和事务⽇志表的数据不⼀致。

最后,⽣产者根据本地事务执⾏结果向 Broker 提交⼆次确认结果

Broker 收到⽣产者确认结果后处理逻辑如下:

  • ⼆次确认结果为 Commit :Broker 将半事务消息标记为可投递,并投递给消费者。

  • ⼆次确认结果为 Rollback :Broker 将回滚事务,不会将半事务消息投递给消费者。

3.3 积分消费者消费消息,添加积分记录

当 Broker 将半事务消息标记为可投递时,积分消费者就可以开始消费主题 order-topic 的消息了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Configuration
public class RocketMQConfig {

//消费者组
private final static String ORDER_POINT_CONSUMER_GROUP = "orderPointConsumerGroup";

private final static String PRODUCT_CONSUMER_GROUP = "productConsumerGroup";

// 事务消费者
@Autowired
private OrderPointMessageListener orderPointMessageListener;

// 商品异构到 Elasticsearch
@Autowired
private ProductToESMessageListener productToESMessageListener;

@Bean
public DefaultMQPushConsumer createTransactionConsumer() throws MQClientException {
DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(ORDER_POINT_CONSUMER_GROUP);
pushConsumer.setNamesrvAddr("127.0.0.1:9876");
pushConsumer.setConsumeMessageBatchMaxSize(1);
pushConsumer.subscribe("order-topic", "*");
pushConsumer.registerMessageListener(orderPointMessageListener);
pushConsumer.start();
return pushConsumer;
}
}

积分消费者服务,我们定义了消费者组名,以及订阅主题消费监听器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Component
public class OrderPointMessageListener implements MessageListenerConcurrently {

private final Logger logger = LoggerFactory.getLogger(OrderPointMessageListener.class);

@Autowired
private PointsMapper pointsMapper;

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt messageExt : msgs) {
String orderJSON = new String(messageExt.getBody(), "UTF-8");
logger.info("orderJSON:" + orderJSON);
OrderPO orderPO = JSON.parseObject(orderJSON, OrderPO.class);
// 首先查询是否处理完成
PointsPO pointsPO = pointsMapper.getByOrderId(orderPO.getId());
if (pointsPO == null) {
Long id = SnowFlakeIdGenerator.getUniqueId(1023, 0);
pointsPO = new PointsPO();
pointsPO.setId(id);
pointsPO.setOrderId(orderPO.getId());
pointsPO.setUserId(orderPO.getUserId());
// 添加积分数 30
pointsPO.setPoints(30);
pointsPO.setCreateTime(new Date());
pointsPO.setRemarks("添加积分数 30");
pointsMapper.insert(pointsPO);
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
logger.error("consumeMessage error: ", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}

}

在消费监听器逻辑⾥, 幂等⾮常重要 。当收到订单信息后,⾸先判断该订单是否有积分记录,若没有记录,才插⼊积分记录。⽽且我们在创建积分表时,订单编号也是唯⼀键,数据库中也必然不会存在相同订单的多条积分记录

4. 实现细节

4.1 事务half消息对用户不可见

下图展示了 RocketMQ 的存储模型,RocketMQ 采⽤的是混合型的存储结构,Broker 单个实例下所有的队列共⽤⼀个⽇志数据⽂件(即为 CommitLog )来存储。

消息数据写⼊到 commitLog 后,通过分发线程异步构建 ConsumeQueue(逻辑消费队列)和 IndexFile(索引⽂件)数据。

Broker 在接受到发送消息请求后,如果消息是 half 消息,先备份原消息的主题与消息消费队列,然后改变主题为RMQ_SYS_TRANS_HALF_TOPIC 。

⽽该主题并不被消费者订阅,所以对于消费者是不可⻅的。

然后 RocketMQ 会开启⼀个定时任务,从 Topic 为RMQ_SYS_TRANS_HALF_TOPIC 中拉取消息进⾏消费,根据⽣产者组获取⼀个服务提供者发送回查事务状态请求,根据事务状态来决定是提交或回滚消息。

改变消息主题是 RocketMQ 的常⽤“套路”,延时消息的实现机制也是如此。

4.2 Commit和Rollback操作

RocketMQ 事务消息⽅案中引⼊了 Op 消息的概念,⽤ Op 消息标识事务消息已经确定的状态( Commit 或者 Rollback), Op 消息对应的主题是:RMQ_SYS_TRANS_OP_HALF_TOPIC 。

如果⼀条事务消息没有对应的 Op 消息,说明这个事务的状态还⽆法确定(可能是⼆阶段失败了)。

image-20240508215812897

引⼊ Op 消息后,事务消息⽆论是 Commit 或者 Rollback 都会记录⼀个 Op 操作。

Commit

Broker 写⼊ OP 消息,OP 消息的 body 指定 Commit 消息的 queueOffset,标记之前 Half 消息已被删除;同时,Broker 读取原 Half 消息,把 Topic 还原,重新写⼊ CommitLog,消费者则可以拉取消费;

Rollback

Broker 同样写⼊ OP 消息,流程和 Commit ⼀样。但后续不会读取和还原 Half 消息。这样消费者就不会消费到该消息。

4.3 事务消息状态回查

若⽣产者根据本地事务执⾏结果向 Broker 提交⼆次确认结果时,出现⽹络问题导致提交失败,那么需要通过⼀定的策略使这条消息最终被 Commit 或者 Rollback 。

Broker 采⽤了⼀种补偿机制,称为“状态回查”。

Broker 端对未确定状态的消息发起回查,将消息发送到对应的 Producer 端(同⼀个 Group 的 Producer ),由Producer 根据消息来检查本地事务的状态,进⽽执⾏ Commit 或者 Rollback 。

Broker 端通过对⽐ Half 消息和 Op 消息进⾏事务消息的回查并且推进 CheckPoint(记录那些事务消息的状态是确定的)。

image-20240508220047569

事务消息 check 流程扫描当前的 OP 消息队列,读取已经被标记删除的 Half 消息的 queueOffset 。如果发现某个 Half消息没有 OP 消息对应标记,并且已经超时( transactionTimeOut 默认 6 秒),则读取该 Half 消息重新写⼊ half 队列,并且发送 check 命令到原发送⽅检查事务状态;如果没有超时,则会等待后读取 OP 消息队列,获取新的 OP 消息。

值得注意的是,Broker 并不会⽆休⽌的的信息事务状态回查,默认回查15次,如果15次回查还是⽆法得知事务状态,Broker 默认回滚该消息。

5. 总结

事务消息也具备⼀定的局限性:

1、事务⽣产者和消费者共同协作才能保证最终⼀致性;

2、事务⽣产者需要实现事务监听器,并且保存事务的执⾏结果(⽐如事务⽇志表) ;

3、消费者要保证幂等。消费失败时,通过重试告警**+**⼈⼯介⼊等⼿段保证消费结果正确。

同时,由于事务消息的机制原因,我们在使⽤ RocketMQ 事务功能时,也需要注意如下两点:

1、避免⼤量未决事务导致超时

Broker 在事务提交阶段异常的情况下会发起事务回查,从⽽保证事务⼀致性。但⽣产者应该尽量避免本地事务返回未知结果,⼤量的事务检查会导致系统性能受损,容易导致事务处理延迟。

2、事务超时机制

半事务消息被⽣产者发送 Broker 后,如果在指定时间内服务端⽆法确认提交或者回滚状态,则消息默认会被回滚。


本站由 卡卡龙 使用 Stellar 1.29.1主题创建

本站访问量 次. 本文阅读量 次.