欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

【SpringBoot】【分布式事务】【RocketMQ】整合消息队列,从单机到集群

程序员文章站 2022-06-13 19:04:36
...

一、使用:

一、引入依赖:

 <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.3</version>
        </dependency>

二、举例:生产者创建订单---->生产者发送消息----->MQ服务接受消息----->消费者监听消息并减库存

【生产者】:

application.yml

rocketmq:
  name-server: 192.168.85.128:9876 # rocketMQ地址
  producer:
    group: producer-group-test  # 生产者的组名需要和消费者监听consumerGroup一致

业务代码:

@Service
public class OrderServiceImpl extends ServiceImpl<OrderMapper, TbOrder> implements OrderService {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    @Override
    public void create() {
        //创建订单--> 发送消息 --> 消息发送成功后调用本地事务提交 -->
        TbOrder order = new TbOrder();
        order.setCount(10);
        order.setMoney(BigDecimal.valueOf(10));
        order.setProductId(1L);
        order.setStatus(1);
        order.setUserId(1L);
        sendMsg(order);
    }
    @Override
    public void sendMsg(TbOrder order){
        /**
         * String txProducerGroup, 生产者分组
         * String destination,  topic
         *  Message<?> message, 消息
         *  Object arg  消息参数
         */
        Message<String> build = MessageBuilder.withPayload(JSONObject.toJSONString(order)).build();
        rocketMQTemplate.sendMessageInTransaction("tx-producer-group","txmsg-topic",build , null);
    }
}

创建  ProducerTxmsgListener 并实现 RocketMQLocalTransactionListener:

@Component
// txProducerGroup 的值和发送事务消息指定的 txProducerGroup 相同 
@RocketMQTransactionListener(txProducerGroup = "txmsg-producer-group")
public class ProducerTxmsgListener implements RocketMQLocalTransactionListener {
    @Resource
    private OrderService orderService;

    /**
     * @Description: 执行本地事务提交
     */
    @Override
    @Transactional
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        try {
            TbOrder tbOrder = JSONObject.parseObject(message.getPayload().toString(), TbOrder.class);
            System.out.println(tbOrder);
            orderService.save(tbOrder);
            return RocketMQLocalTransactionState.COMMIT; //变更消息状态为:可消费
        }catch (Exception e){
            return RocketMQLocalTransactionState.ROLLBACK; //本地事务执行异常,将消息遗弃
        }
    }

    /**
     * @Description: 检查本地事务是否执行成功
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {

        TbOrder tbOrder = JSONObject.parseObject(message.getPayload().toString(), TbOrder.class);
        TbOrder order = orderService.getById(tbOrder.getId());
        // 不为null 则表示执行成功
        if (order != null){
            return RocketMQLocalTransactionState.COMMIT; //变更消息状态为:可消费
        }
        // 执行本地事务发生问题或还没执行完成, UNKNOWN 表示会继续回查
        return RocketMQLocalTransactionState.UNKNOWN;
    }
}

【消费者】:

application.yml

rocketmq:
  name-server: 127.0.0.1:9876 # rocketMQ地址
  producer:
    group: producer-test-group  # 生产者的组名需要和消费者监听consumerGroup一致

创建MyListener 并实现 RocketMQListener 接口:

// topic 对应生产者发消息是的topic
@RocketMQMessageListener(topic = "test-topic" , consumerGroup = "consumer-group")
public class MyListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        //执行 减库存业务  如果发生异常,则消息会隔段时间再次消费 
        System.out.println(message);
    }
}

 

原理图:

【SpringBoot】【分布式事务】【RocketMQ】整合消息队列,从单机到集群

 

 

 

  

相关标签: SpringBoot相关