什么是事务消息 事务消息就是将发送消息和本地数据库操作融合为同一个事务,二者要么都成功,要么都失败,不能出现一个操作成功另一操作失败的情况。 以用户注册成功时向用户发送欢迎邮件为例。有新用户注册时,Producer 向MQ发送新用户信息,消费者消费消息发送欢迎邮件。 此时 Producer 的操作可以简化为两步:① 将新用户插入到数据库 ② 发送MQ消息 一般情况下,我们会将发送消息的操作写在数据库的事务里,尤其是将发送消息的操作放在最后一步,这样当消息发送异常时可以回滚数据库事务。向下面这样:
1 2 3 4 5 6 7 8 @Transactional public void saveUser(User user){ // .... // 将用户保存到数据库 // ... // 发送MQ消息 }
这样看似没什么问题,如果插入数据库失败也不会发送消息,发送消息失败整个事务也会回滚。 但是有一种情况会导致二者不一致,就是当插入数据库成功,消息也发送成功,但是由于网络等原因,Producer超时未收到 Broker 的确认(rocketMQ 同步发送方式需要接收 Broker 的确认),此时 Producer 会抛出异常,认为消息发送失败,进而导致本地事务回滚。导致的最终结果就是消息被消费,但是数据库中却没有用户的信息。 这种方式还有一种缺陷,当消息发送成功后会立即被消费者消费,但是此时 Producer 本地的数据库事务可能还没有提交,即使我们将发送消息的操作放在最后一步,我们也不能保证消费者拿到消息时 Producer 的本地事务已经提交。如果消费业务依赖于 Producer 的本地事务,此时消费者就不能从数据库中获取到 Producer 保存的数据。
RocketMQ 事务消息的原理 RocketMQ 事务消息将消息的发送分解为两个阶段: 第一阶段:发送消息,Producer把消息发送到 Broker ,但是此时该消息还不能被投递给消费者,此时消息的状态被称为半消息(Half Message) 。 第二阶段:提交消息。类似于数据库事务的提交,当对半消息进行二次确认,对消息进行提交或者回滚,成功提交的消息才可以被投递给消费者,回滚的消息会被删除。 事务状态回查 一般情况下,Producer 根据本地事务的执行结果,主动对半消息发送二次确认(commit 或者 rollback),但是可能由于网络或者程序代码问题等原因 Broker 未收到二次确认的消息。此时 Broker 会主动向 Producer 发起事务状态回查,根据回查的结果决定消息的去留。 Tips:若未收到来自 Producer 的二次确认,Broker默认每隔 1分钟 回查一次,最多回查 15 次,若达到最大次数后仍未提交或者回滚,消息会被删除。 代码示例 以Spring Boot 整合 RocketMQ 为例,业务:新用户注册时发送MQ消息
UserService 提供保存以及查询用户的数据库操作方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @Service public class UserService { @Autowired private JdbcTemplate jdbcTemplate; @Transactional public void addUser(User user){ String sql = "INSERT INTO t_user (username, password, email) VALUES (?, ?, ?)" ; jdbcTemplate.update(sql, user.getUsername(), user.getPassword(), user.getEmail()); } public User getByUsername(String username){ String sql = "SELECT * FROM t_user WHERE username = ?" ; return jdbcTemplate.queryForObject(sql, new RowMapper<User>() { @Override public User mapRow(ResultSet resultSet, int i) throws SQLException { User user = new User(); user.setUsername(resultSet.getString("username" )); return user; } }, username); } }
实现 RocketMQLocalTransactionListener 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 @Slf4j @Component @RocketMQTransactionListener(txProducerGroup = "user_tx_producer_group" ) public class UserTransactionListener implements RocketMQLocalTransactionListener { @Autowired private UserService userService; @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { log.info("开始执行本地事务" ); try { User user = (User) arg; userService.addUser(user); }catch (Exception e){ log.error("本地事务执行异常, 回滚消息" ); return RocketMQLocalTransactionState.ROLLBACK; } log.info("本地事务执行成功, 提交消息" ); return RocketMQLocalTransactionState.COMMIT; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { log.info("开始本地事务状态回查" ); RocketMQLocalTransactionState localTransactionState; if (userService.getByUsername(((User) msg.getPayload()).getUsername()) != null) { localTransactionState = RocketMQLocalTransactionState.COMMIT; }else { localTransactionState = RocketMQLocalTransactionState.UNKNOWN; } log.info("本地事务状态回查结果:{}" , localTransactionState); return localTransactionState; } }
RocketMQLocalTransactionListener 接口定义了两个接口。分别是半消息发送成功后的本地事务回调方法,和事务状态回查方法。 其实现类要使用 @RocketMQTransactionListener 注解,并定义其 txProducerGroup 属性值,该属性值可以看作是Listener的标识,发送消息时需要指定该标识,然后才能找到对于的 RocketMQLocalTransactionListener 实现类。
UserController 接口 1 2 3 4 5 6 7 8 9 10 11 12 13 14 @RestController @RequestMapping("/user" ) public class UserController { @Autowired private RocketMQTemplate rocketMQTemplate; @PostMapping("/add" ) public String addUser(@RequestBody User user){ Message<User> message = MessageBuilder.withPayload(user).build(); rocketMQTemplate.sendMessageInTransaction("user_tx_producer_group" , "user-topic" , message, user); return "success" ; } }
发送事务消息使用 rocketMQTemplate.sendMessageInTransaction()方法,传递的四个参数从左至右依次为 ① 本地事务回调的实现类标识(即UserTransactionListener上面的@RocketMQTransactionListener(txProducerGroup = “user_tx_producer_group”))、 ② 消息的topic ③ 消息体 ④ 额外参数,回调本地事务时会传递该参数。即executeLocalTransaction(Message msg, Object arg)方法的第二个参数。 使用postman调用用户接口
事务状态回查模拟 我们先在执行本地事务时打上断点,在返回事务状态前一直阻塞Producer程序,来模拟发送二次确认失败的情况,从而触发Broker的事务状态回查。
使用postman调用接口,当程序执行到断点位置处时阻塞,半消息和本地事务都已执行成功但还未发送二次确认,Broker等到60s的时间间隔后就会触发事务回查。