欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RocketMQ:索引源码分析

程序员文章站 2022-07-14 23:00:54
...

RocketMQ是阿里开源的一款高性能高吞吐的消息中间件,我们来研究下它是如何实现的,重点关注索引。

我们拿一个执行用例来测试,代码如下:

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.rocketmq.example.quickstart;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
    	SimpleDateFormat time=new SimpleDateFormat("yyyy/MM/dd HH:mm:ss"); 
    	final DefaultMQProducer producer = new DefaultMQProducer("Producer");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

    	final int num = 2;
        for (int i = 0; i < 1; i++) {
            try {

            	Message msg = new Message("Topic1" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + num + time.format(new Date()) + " " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                msg.putUserProperty("psly", "psly");

                producer.send(msg, new MessageQueueSelector(){
                	@Override
					public MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg){
                		System.out.println(arg);
                		return mqs.get(((Integer) arg) % mqs.size());
                	}
                }, num);

            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        producer.shutdown();
    }
}

我们接着在DefaultMessageStore里面打个断点,然后执行以上用例。

可以看到代码进入了这个方法。

RocketMQ:索引源码分析

我们跟着它的执行,最后会看到它到了关键的doAppend方法。

RocketMQ:索引源码分析

这个位置会真正开始组织消息数据,并且保存到commit文件对应的内存映射里面。

那么具体来说,消息数据是如何格式化的呢?我们可以直接看calMsgLength方法,注释中详细说明了消息存储所占有的字节数:

RocketMQ:索引源码分析

我们可以重点关注其中的几个重要数据:

  • TOTALSIZE,作为消息的最字节数,作为第一个成员,4个字节数。它用于界定消息的边界。
  • BODYCRC,通过循环冗余校验来查看消息内容是否已出错。
  • QUEUEOFFSET,根据topic名称取得对应的Long(默认0-4),将来作为存储索引文件的目录。
  • PHYSICALOFFSET,用于消息在查找持久化(文件)之后在文件块(MappedFile)中的偏移位置。
然后消息格式化之后,接着又要干什么呢?一般来说此时内存里面已经有了这条消息,但是我们不知道消息何时会被消费,所以我们得持久化这条消息。也就是将将消息flush到文件上。而事实上我们已经构造的消息内存正是关联到一个文件的,截图如下:
RocketMQ:索引源码分析
那么我们所要做的就是去flush这个文件映射,从而确保消息保存到磁盘上。

另一方面,RocketMQ的索引设计采取的方式是
  • 先格式化消息(计算此消息的总大小、topic名字、此key计算得到的queueoffset、放入磁盘中的偏移量等数据),然后放入消息块文件(比较大的文件,默认貌似1G)。
  • 一个线程异步地将上面构造的消息flush进硬盘
  • 一个线程将topic对应的physicaloffset放入索引的文件目录(内存映射)。physicaloffset用于从大块的文件存储中索引该消息。
  • 一个线程将上面构造的文件(含索引),刷新到硬盘中。
所以这里的消息将来怎么取得呢?
方式如下:
  • 首先根据topic直接取得对应的topic目录。
  • 再根据key计算对应的queueoffset值,默认(0-4)。
  • 该目录下的文件内容(默认大小6000000个字节,5860KB)为消息对应commit大文件的索引,默认一个消息20(CQ_STORE_UNIT_SIZE)个字节。
  • 所以先取得 索引文件的内容(20个字节),然后根据其中的offset字段、总长度字段,去commit文件中取得真正的消息内容。
  • (因为采用MappedByteBuffer来实现,所以以上的操作很可能不需要磁盘IO)
这里有个问题,为什么不为每个topic、queue建立一个文件来专门保存此类消息呢?
推测如下:
  • 假如topic过多,会导致文件数量过多,且每个文件都保存着大量数据,不好维护。
  • 分成多个topic文件的方式,并不能提高IO的效率。可能会导致同时打开多个文件I/O。
  • 将消息内容都存在一个目录。这样读取和写入时只需要打开一个文件I/O,提高效率。然后将物理位置的索引放到对应的topic目录。
  • 这种方式可以理解为:一个重量级的目录+多个轻量级的索引目录。
最后我们来看看实现的代码:
从消息内容中提取字段,构造字段存入索引文件(仅在内存中构造ConsumeQueue,以及存入消息索引,很快),由ReputMessageService线程来完成。
RocketMQ:索引源码分析
由于需要快速响应给消费者,可以看到这里轮询的时间间隔非常短(Thread.sleep(1))。

将消息内容commit到磁盘上,由FlushRealTimeService线程来完成:
RocketMQ:索引源码分析
这里的interval稍微久点,默认500毫秒。因为刷一次硬盘比较昂贵,尽量一次多干点活。

将ReputMessageService产生的索引刷到对应的topic目录文件中,由FlushConsumeQueueService线程完成,代码如下:
RocketMQ:索引源码分析
由于前面的ReputMessageService线程已经将索引数据保存在内容中了,所以这里的磁盘操作轮询间隔interval也比较大,默认1000毫秒。

最后还有个重要的问题,这四类线程是如何协作工作的呢,看如图代码:
RocketMQ:索引源码分析
  • SendMessageThread_*线程通过wrotePosition变量来通知ReputMessageService线程和FlushRealTimeService线程。
  • FlushRealTimeService执行消息内容持久化,ReputMessageService执行构建消息索引的内存映射。这两者可同时进行。
  • ReputMessageService完成任务之后,再次通过其对应的wrotePosition来通知FlushConsumeQueueService进行刷新索引的工作。代码如下:
  • RocketMQ:索引源码分析

所以这里的依赖如下:
  • FlushRealTimeService 依赖于SendMessageThread_*,通过wrotePosition变量;
  • ReputMessageService  依赖于SendMessageThread_*,通过wrotePosition变量;
  • FlushConsumeQueueService 依赖于ReputMessageService,通过wrotePosition变量。
以上为索引与存储的服务设计。