完美解决,RocketMQ如何支持多事务消息?

本文解决了在 RocketMQ 2.1.0 版本以后,无法简单使用多个 @RocketMQTransactionListener​ 的问题。通过引入事务消息处理接口 TransactionMessageHandler,我们将原有的事务处理器改造成了一个分发器,使得在 DailyMart 项目中可以轻松处理多事务消息的场景。

本文解决了在 RocketMQ 2.1.0 版本以后,无法简单使用多个 @RocketMQTransactionListener​ 的问题。通过引入事务消息处理接口 TransactionMessageHandler,我们将原有的事务处理器改造成了一个分发器,使得在 DailyMart 项目中可以轻松处理多事务消息的场景。

今天我们将解决使用RocketMQ事务消息时可能遇到的一个常见问题:如何让其支持多事务消息?

1. 问题背景

在实际开发中,我们常常会面临多事务消息的场景,例如在DailyMart的订单模块中,用户支付后需要调用库存服务进行库存扣减,而在订单确认收货后需要调用用户服务实现积分赠送。这两个业务逻辑都需要通过事务消息来保证分布式事务。

为了处理这种情况,我们可能会考虑在订单模块中创建两个事务消息监听器,分别用于处理库存扣减和积分赠送的事务处理和事务回查。

@Component
@Slf4j
//处理订单支付的事务监听器
public class OrderPaidTransactionListener implements RocketMQLocalTransactionListener {
  @Override
  public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
    ......
    //处理订单支付逻辑
   }

  @Override
  public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
      ......
      //检查订单处理逻辑
   }
}

@Component
@Slf4j
//处理订单收货的事务监听器
public class OrderReceivedTransactionListener implements RocketMQLocalTransactionListener {
  @Override
  public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
    ......
   }

  @Override
  public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
      ......
   }
}

然而,当我们信心满满地完成业务逻辑编写并启动服务时,可能会遇到如下错误:rocketMQTemplate already exists RocketMQLocalTransactionListener

图片图片

在rocketmq-spring-boot-starter版本低于2.1.0的项目中,可以使用多个@RocketMQTransactionListener监听不同的txProducerGroup来发送不同类型的事务消息到topic。然而,从RocketMQ-Spring 2.1.0版本开始,注解@RocketMQTransactionListener不能设置txProducerGroup、ak、sk,这些值均需与对应的RocketMQTemplate保持一致。通过阅读源码RocketMQTransactionConfiguration#registerTransactionListener()方法,也可得知在RocketMQ如果已经存在了RocketMQTransactionListener则会出现上述错误。

图片图片

2. 如何解决

为了在保证系统只有一个RocketMQTransactionListener的前提下实现多事务消息,我们可以将RocketMQLocalTransactionListener不处理具体业务逻辑,而是将其作为一个分发器使用。

在生产者发送事务消息时指定对应的事务处理器,并将事务处理器放置在消息头上发送出去,在RocketMQTransactionListener中根据消息头选择具体的事务处理器来实现业务逻辑。

具体实现如下:

2.1 定义事务消息处理接口

首先,定义公共的事务消息处理接口,所有事务消息都实现此接口而非 RocketMQ 默认的RocketMQLocalTransactionListener。

public interface TransactionMessageHandler {
    
    /**
    * 执行本地事务
    * @param payload 消息体
    * @param arg 参数
    */
    RocketMQLocalTransactionState executeLocalTransaction(Object payload, Object arg);
    
    /**
     * 检查本地执行状态
     * @param payload 消息体
     * @return 执行结果
     */
    RocketMQLocalTransactionState checkLocalTransaction(Object payload);
    
}

2.2 修改事务消息发送工具类,指定消息处理器

public <T extends RemoteDomainEvent> TransactionSendResult sendTransaction(String topic, String tag, T message, Class<? extends TransactionMessageHandler> transactionMessageListener) {  
  if(transactionMessageListener == null){
    throw new IllegalArgumentException("transactionMessageListener must not null");
  }
  
  String destination = buildDestination(topic, tag);

  Message<T> sendMessage = MessageBuilder.withPayload(message)
    .setHeader(RocketMQHeaders.KEYS, message.getKey())
    .setHeader(SOURCE_HEADER, message.getSource())
    .setHeader(TRANSACTION_MESSAGE_HEADER, transactionMessageListener.getSimpleName())
    .build();
  TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction(destination, sendMessage, null);

  log.info("[{}]事务消息[{}]发送结果[{}]", destination, JSONObject.toJSON(message),JSONObject.toJSON(sendResult));

  return sendResult;
}

2.3 修改RocketMQ事务消息监听器

@Slf4j
@RocketMQTransactionListener
public class DefaultRocketMQTransactionListener implements RocketMQLocalTransactionListener {
    
    private final Map<String, TransactionMessageHandler> transactionMessageHandlerMap;
    
    public DefaultRocketMQTransactionListener(Map<String, TransactionMessageHandler> transactionMessageHandlerMap) {
        this.transactionMessageHandlerMap = transactionMessageHandlerMap;
    }
    
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
        log.info("消费者收到事务消息[{}]", JSONObject.toJSON(message));
        String listenerName = (String) message.getHeaders().get(MessageHeaderConstant.TRANSACTION_MESSAGE_HEADER);
        
        if (null == listenerName) {
            throw new RuntimeException("not params transactionMessageListener");
        }
        
        RocketMQLocalTransactionState state;
        Object payload = message.getPayload();
        try {
            TransactionMessageHandler messageHandler = transactionMessageHandlerMap.get(listenerName);
            if (null == messageHandler) {
                throw new RuntimeException("not match condition TransactionMessageHandler");
            }
            state = messageHandler.executeLocalTransaction(payload, arg);
        } catch (Exception e) {
            log.error("rocket transaction message executeLocal error:{}", e.getMessage());
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        
        return state;
    }
    
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        log.info("消费者收到事务回查消息[{}]", JsonUtils.obj2String(message.getHeaders()));
        String listenerName = (String) message.getHeaders().get(MessageHeaderConstant.TRANSACTION_MESSAGE_HEADER);
        if (null == listenerName) {
            throw new RuntimeException("not params transactionMessageListener");
        }
        RocketMQLocalTransactionState state;
        try {
            TransactionMessageHandler messageHandler = transactionMessageHandlerMap.get(listenerName);
            if (null == messageHandler) {
                throw new RuntimeException("not match condition TransactionMessageHandler");
            }
            state = messageHandler.checkLocalTransaction(message.getPayload());
        } catch (Exception e) {
            log.error("rocket transaction message executeLocal error:{}", e.getMessage());
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        
        return state;
    }
    
}

在上述代码中,根据消息头中的TRANSACTION_MESSAGE_HEADER参数选择对应的事务处理器来处理事务消息。

在 DailyMart 中有一个公共组件dailymart-rocketmq-spring-boot-starter专门用于 RocketMQ 消息发送监听的封装,因此我们也将事务消息的处理逻辑封装到了此组件中。

图片图片

2.4 修改事务消息处理逻辑

所有的事务消息处理逻辑都实现TransactionMessageHandler接口,以订单支付的处理逻辑为例:

@Component
@Slf4j
public class OrderPaidTransactionConsumer implements TransactionMessageHandler {
    
    @Resource
    private TransactionTemplate transactionTemplate;
    
    
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Object payload, Object arg) {
        final OrderPaidEvent orderPaidEvent = JsonUtils.byte2Obj((byte[]) payload, OrderPaidEvent.class);
        ...
    }
    
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Object payload) {
        final OrderPaidEvent orderPaidEvent = JsonUtils.byte2Obj((byte[]) payload, OrderPaidEvent.class);
        ...
    }
    
}

2.5 修改事务消息发送逻辑,指定事务处理器

TransactionSendResult sendResult = enhanceTemplate.sendTransaction("TRADE-ORDER", "ORDER-PAID", orderPaidEvent, OrderPaidTransactionConsumer.class);

小结

本文解决了在 RocketMQ 2.1.0 版本以后,无法简单使用多个@RocketMQTransactionListener的问题。通过引入事务消息处理接口TransactionMessageHandler,我们将原有的事务处理器改造成了一个分发器,使得在 DailyMart 项目中可以轻松处理多事务消息的场景。

©本文为清一色官方代发,观点仅代表作者本人,与清一色无关。清一色对文中陈述、观点判断保持中立,不对所包含内容的准确性、可靠性或完整性提供任何明示或暗示的保证。本文不作为投资理财建议,请读者仅作参考,并请自行承担全部责任。文中部分文字/图片/视频/音频等来源于网络,如侵犯到著作权人的权利,请与我们联系(微信/QQ:1074760229)。转载请注明出处:清一色财经

(0)
打赏 微信扫码打赏 微信扫码打赏 支付宝扫码打赏 支付宝扫码打赏
清一色的头像清一色管理团队
上一篇 2024年2月4日 17:07
下一篇 2024年2月4日 17:09

相关推荐

发表评论

登录后才能评论

联系我们

在线咨询:1643011589-QQbutton

手机:13798586780

QQ/微信:1074760229

QQ群:551893940

工作时间:工作日9:00-18:00,节假日休息

关注微信