欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

rocketmq分布式事务消息发送demo

程序员文章站 2022-07-14 23:38:31
...

一、分布式事务消息流程简图

rocketmq采用2pc实现的分布式事务中消息的原子性。
rocketmq分布式事务消息发送demo

二、实操demo案例

1、pom文件配置

引入rocket4.3+版本都可以,因为4.3+版本才开始支持分布式事务消息发送

2、代码以及流程简述:

producer端重写TransactionListener里的两个方法,

①一个是实现本地一系列事务的实现(executeLocalTransaction),如果本地事务执行时间过长,broker端可以通过回调(checkLocalTransaction)查看producer端本地事务执行状态。

②另一个是为broker端提供定时回调的实现,主要是查询事务执行状态(checkLocalTransaction)。

总之事务执行3种情况:
如果事务执行成功了直接提交即可(在控制台后台界面的消息一栏会出现显示),会告诉broker成功了,broker才好改变半消息状态以及把生产的消息放到可消费队列里。

如果事务执行失败,则告诉broker回滚之前发送的半消息。

半消息起到分布式消息事务一致流程控制的作用。

如果事务执行没返回结果,或者执行时间较长,或者producer所在服务器宕机了,都会导致broker端一直等待;
所以broker会有一个定时检查producer事务执行状态的定时任务

如果连几次都连不上producer端 ,就视为其放弃开启事务生产消息,不再维护其发送来的半消息

如果多次返回未知,也不再维护其发送来的半消息

如果执行本地事务已经提交了,但是broker没收到成功的消息,可以通过broker回调查看,事务执行的状态为成功的话, broker会继续走后续流程

public class Producer {
    public static void main(String[] args) throws MQClientException {
        TransactionMQProducer txMqPro=new TransactionMQProducer("gp1");
        txMqPro.setNamesrvAddr("192.168.252.188:9876");

        //本地事务执行和broker检查的回调都是由producer端来实现的
        txMqPro.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                //执行一系列本地业务逻辑(同步),方法内部写spring事务管理,然后如果出错失败,就抛异常,外层捕获到直接回滚通知broker
                //a()
                //b()
                //c()
                System.out.println("setTransactionListener   txID:"+msg.getTransactionId()+",body "+new String(msg.getBody()));
                return LocalTransactionState.UNKNOW;
                  /*   return LocalTransactionState.COMMIT_MESSAGE;
                return LocalTransactionState.ROLLBACK_MESSAGE;*/
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                System.out.println("checkLocalTransaction   txID:"+msg.getTransactionId()+",body "+new String(msg.getBody()));
                //broker 异步线程 回调 ,回调延迟策略:在控制台后台页面信息里,cluster->config中,messageDelayLevel设置了延时回调策略
                return LocalTransactionState.COMMIT_MESSAGE;

            }
        });

        txMqPro.start();

        TransactionSendResult topic001 = txMqPro.sendMessageInTransaction(new Message("topic001", "测试分布式消息事务流程".getBytes()),null);

        System.out.println("消息回执:"+topic001);

    }
}

3、测试结果

broker的回调策略可以在控制台后台页面里看到,在cluster->config中,messageDelayLevel设置了延时回调策略

setTransactionListener   txID:C0A80065000018B4AAC22C9636830000,body 测试分布式消息事务流程
消息回执:SendResult [sendStatus=SEND_OK, msgId=C0A80065000018B4AAC22C9636830000, offsetMsgId=null, messageQueue=MessageQueue [topic=topic001, brokerName=node02, queueId=3], queueOffset=32]
checkLocalTransaction   txID:C0A80065000018B4AAC22C9636830000,body 测试分布式消息事务流程

rocketmq分布式事务消息发送demo

如果提交,则会出现在消息一栏,如下
rocketmq分布式事务消息发送demo
rocketmq分布式事务消息发送demo