欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RocketMq系列之Producer顺序消息发送源码分析(四)

程序员文章站 2022-07-14 22:40:43
...

有序消息

消息有序指的是可以按照消息的发送顺序来消费。
RocketMQ可以严格的保证消息有序。但这个顺序,不是全局顺序,只是分区(queue)顺序。

顺序消息生产者

public static void main(String[] args) throws UnsupportedEncodingException {
        try {
            DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
            producer.setNamesrvAddr("localhost:9876");
            producer.start();

            String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 100; i++) {
                int orderId = i % 10;
                Message msg =
                    new Message("TopicTest2", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                // 发送消息时,需要实现MessageQueueSelector , 用来选择合适的queue
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Integer id = (Integer) arg;
                      	// 
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, orderId);

                System.out.printf("%s%n", sendResult);
            }

            producer.shutdown();
        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
            e.printStackTrace();
        }
    }

上面实现的顺序消息时,通过orderId来进行顺序消息,同一个订单ID的消息,发送到同一个Queue里面

顺序消息消费者

public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");

        // 设置NameServer地址
        consumer.setNamesrvAddr("localhost:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("TopicTest1", "TagA || TagC || TagD");

        consumer.registerMessageListener(new MessageListenerOrderly() {
            AtomicLong consumeTimes = new AtomicLong(0);

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }

需要注意,registerMessageListener 注册的消息监听器 , 需要使用MessageListenerOrderly , ConsumeOrderlyContext , 不可以使用

MessageListenerConcurrently , ConsumeConcurrentlyContext , 否则消费的顺序无法保证。

源码分析

/**
 * @param msg 消息
 * @param selector 消息队列选择器
 * @param arg 分片值 (类似分库分表里面的分片键)
 */
@Override
public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
  throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
  msg.setTopic(withNamespace(msg.getTopic()));
  return this.defaultMQProducerImpl.send(msg, selector, arg);
}

实际发送

public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.sendSelectImpl(msg, selector, arg, CommunicationMode.SYNC, null, timeout);
    }

    private SendResult sendSelectImpl(
            Message msg,
            MessageQueueSelector selector,
            Object arg,
            final CommunicationMode communicationMode,
            final SendCallback sendCallback, final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);
				// 1. 获取topic信息,
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
        		
            MessageQueue mq = null;
            try {
            		// 2. 获取当前topic的内部队列信息
                List<MessageQueue> messageQueueList = mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(
                        topicPublishInfo.getMessageQueueList());
                // 复制一个消息
                Message userMessage = MessageAccessor.cloneMessage(msg);
                // topic信息
                String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), 
                                    mQClientFactory.getClientConfig().getNamespace());
                userMessage.setTopic(userTopic);
								//3. 获取消息队列
                mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
            } catch (Throwable e) {
                throw new MQClientException("select message queue throwed exception.", e);
            }

            long costTime = System.currentTimeMillis() - beginStartTime;
            if (timeout < costTime) {
                throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
            }
            if (mq != null) {
            		// 获取到队列了,执行发送消息, 跟普通消息的发送一样的
                return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
            } else {
                throw new MQClientException("select message queue return null.", null);
            }
        }

        throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
    }

步骤说明:

  1. 获取当前topic的信息,内部包含消息队列
  2. 获取topic内部的队列信息
  3. 获取消息队列,这个其实就是顺序消息实现的核心点selector.select(messageQueueList, userMessage, arg) ,通过自定义的消息队列选择器,返回相应的队列。 内部完全自定义
  4. 获取到了消息队列之后,执行发送消息,跟普通消息一样,这里就没有什么重试的说法了。

总结: 顺序消息的核心就是将你希望按照顺序的消息,通过某种特定的条件,计算发送到对应的队列里面去。

顺序消息的缺点:

  1. 送顺序消息无法利用集群的Failover特性,因为不能更换broker,MessageQueue进行重试
  2. 存在队列热点问题,当一个场景下消息非常多的情况,会导致个别队列非常繁忙
  3. 消费失败时无法跳过, 会导致消费停止
  4. 消息的并行度依赖于对列数量,不过可以增加队列数量,动态调整

思考: 通过上面那种顺序消息的模式,在broker发生宕机 , 队列数量发生变化时,会造成消费乱序

比如在多master集群的情况下 ,

topic: TP_TEST  总共8个队列

MASTER-1 : 1,2,3,4
MASTER-2 : 5,6,7,8

一个topic分别在多个master上面有队列, 如果其中一个master宕机了,那么队列数会变成4个,那么顺序消息通过 orderId % queueSize 的这种方式,会造成原来往一个队列里面发送的,会发送到另外一个队列里面去,造成消费乱序。

所以如果是要严格的顺序消息,则不要使用rocketMq, 在极端情况下会造成消费乱序。
sharedCode源码交流群,欢迎喜欢阅读源码的朋友加群,添加下面的微信, 备注”加群“ 。

RocketMq系列之Producer顺序消息发送源码分析(四)

RocketMq系列之Producer顺序消息发送源码分析(四)