RocketMQ的数据存储部分也是一个重头戏,他主要用于存储Producer生产的消息,Consume的逻辑队列,索引,以及主从复制,这里也是一个非常好的范例,我们可以看到如何处理数据存储,如何提高IO效率。

东西太多信息量略大有点乱,之后再整理吧......


一、总体结构




队列逻辑结构:

逻辑上来看,每一个Topic有很多queue,他们各自处于自己的ConsumeQueue进行处理



二、基础数据结构


MappedFileQueue:包含了很多MapedFile,以及每个MappedFile的真实大小;

MappedFile:包含了具体的文件信息,包括文件路径,文件名,文件起始偏移,写位移,读位移等等信息,同事使用了虚拟内存映射来提高IO效率;

这两个数据结构是真实的保存了存放在物理机器上的文件信息,后续的很多模块如果涉及到文件存储,都会使用到这两个数据结构。

MappedFileQueue:


    // 每次触发删除文件,最多删除多少个文件  

    private static final int DELETE_FILES_BATCH_MAX = 10;

    // 文件存储位置  

    private final String storePath;

    // 每个文件的大小

    private final int mappedFileSize;

    // 各个文件,这里是一个CopyOnWriteArrayList的使用实例,用于读多写少的线程安全的list容器

    private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();

   // 预分配MapedFile对象服务  

    private final AllocateMappedFileService allocateMappedFileService;

   // 刷盘刷到哪里  

    private long flushedWhere = 0;

    private long committedWhere = 0;

   // 最后一条消息存储时间  

    private volatile long storeTimestamp = 0;


MappedFile:保存了文件的详细信息,包括:


TotalMapedVitualMemory:JVM中映射的虚拟内存总大小  

TotalMapedFiles:JVM中mmap的数量  

fileName:文件名  

fileFromOffset:文件的起始偏移量  

fileSize:文件大小  

file:文件句柄  

mappedByteBuffer:映射的内存对象  

wrotePostion:当前文件的写位置  

committedPosition:当前文件Flush到的位置  

fileChannel:映射的FileChannel对象  

storeTimestamp:最后一条消息保存时间  

firstCreateInQueue:是不是刚刚创建的Map  



三、主要模块介绍

3.1 CommitLog(物理队列):

CommitLog是用于存储真实的物理消息的结构,ConsumeQueue是逻辑队列,仅仅存储了CommitLog的位移而已,真实的存储都在本结构中。
1. 首先这里会使用CommitLog.this.topicQueueTable.put(key, queueOffset),其中的key是 topic-queueId, queueOffset是当前这个key中的消息数,每增加一个消息增加一(不会自减);
这里queueOffset的用途如下:每次用户请求putMessage的时候,将queueOffset返回给客户端使用

2. 其次就是将消息写入真正的MappedFile,MappedFile中,每条消息的格式如下:



MagicCode:MessageMagicCode = 0xAABBCCDD ^ 1880681586 + 8
BodyCRC:对Body的校验码
queueId:队列Id


queueOffset:topic-queueId对应的队列数目

physicalOffset:真实的地址偏移,包括所有文件的总偏移

 

3. putMessage所做操作

3.1 将数据写入mapedFile,并将topic-queueId,queueOffset写入topicQueueTable

3.2 创建DispatchRequest,将消息放入DispatchMessageService的List中,进行后续处理

3.3 进行同步刷盘,或者异步刷盘

FlushCommitLogService:刷盘基类;

GroupCommitService:同步刷盘;每次写入消息时,调用swapRequests进行数据交换,然后将所有的请求进行刷盘

FlushRealTimeService:异步刷盘;调用mappedByteBuffer.force()进行刷盘操作


3.2 ConsumeQueue(逻辑队列):


1.ConsumeQueue的结构

         该结构对应于消费者逻辑队列,为什么要将一个topic抽象出很多的queue呢?这样的话,对集群模式更有好处,可以使多个消费者共同消费,而不用上锁;

         对应于consumeQueueTable <topic, <queueId,ConsumeQueue>>,其中每一个ConsumeQueue的结构如下:


offset: CommitLog中的物理位移

size: CommitLog中的日志大小

tagsCode:和storeTimestamp相关

 

2. ConsumeQueue创建过程:

1. 首先会创建CommitLog,在将数据写入CommitLog之后,会创建DispatchRequest,调用defaultMessageStore.putDispatchRequest

2. DispatchMessageService调用putMessagePostionInfo将数据写入ConsumeQueue

3.3 IndexService(索引)

 IndexService用于创建索引文件集合,当用户想要查询某个topic下某个key的消息时,能够快速响应;这里注意不要与上述的ConsumeQueue混合,ConsumeQueue只是为了抽象出多个queue,方便并发情况下,用户put/get消息。

 

IndexService由一系列的IndexFile文件组成:


// 索引文件集合
private final ArrayList<IndexFile> indexFileList

 

1. IndexFile的结构:

IndexFile的格式如下所示:



其中:

A. IndexHeader的结构如下


beginTimestamp = new AtomicLong(0);  

endTimestamp = new AtomicLong(0);  

beginPhyOffset = new AtomicLong(0);  

endPhyOffset = new AtomicLong(0);  

hashSlotCount = new AtomicInteger(0);  

indexCount = new AtomicInteger(1);  



B. HashSlot里面的每一项保存了这个topic-key计算出的hash的Index,所有的链表,所以他的每个大小是4字节,一共有500W项

 

C.Index的结构如下:



keyHash:topic-key(key是消息的key)的hashCode组成

phyOffset:commitLog真实的物理位移

timeOffset:时间位移

slotValue:下一个记录的slot位置

 

2. IndexFile的创建过程:

1. 首先在DispatchMessageService写入ConsumeQueue后,会再调用indexService.putRequest,创建索引请求

2. 调用IndexService的buildIndex创建索引

 

 

3.4 HAService(主从复制模块)

他的启动有3个部分:

3.4.1

AcceptSocketService:作为HAServer进行监听

beginAccept,start;

主要就是在socketAddressListen监听Slave的请求服务,如果有请求进入,新建HAConnection,并start;

同时将其加入到HAService的connectionList中

 

3.4.2

HAConnection:用于管理Master接收的连接,readSocketService和writeSocketService分别启动;

有slaveRequestOffset和slaveAckOffset两个变量:

slaveRequestOffset:它只使用一次,即第一次slave发送给master时,表示slave请求的起始offset;

slaveAckOffset:由slave上传到master,表示其确认接收到的offset

 

A. ReadSocketService:接收处理slave的请求

首先接收到slave的请求后,slave的请求是一个8字节的Long:

先判断slaveRequestOffset是否小于0,小于0则表示第一次请求,将slaveRequestOffset置为此值,需要从这个位置进行commitLog拉取;

否则将slaveAckOffset置为slave请求的offset,表示确认接收到该请求;

再调用notifyTransferSome(slaveAckOffset)

 

B. WriteSocketService:发送给slave物理数据CommitLog

A. 初始化:如果nextTransferFromWhere=-1,则表示是第一次传输;slaveRequestOffset如果也为0,表示slave上面没有数据,直接从master的最后一个文件开始上传,将nextTransferFromWhere置为最后一个文件的位移;如果slaveRequestOffset不为0,则从slave请求的位置开始传输;

B. 心跳包:如果本次传输距离上次的时间超过了5S的心跳时间,那么发送心跳包:8(下次传输的位置)+4(固定值0)

C. 数据包:获取从nextTransferFromWhere开始的SelectMapedBufferResult(MapFile,size, startOffset);如果size大于传输的BatchSize,则设置size为BatchSize;接下来:

设置nextTranserFromWhere;

设置byteBufferHeader设置头部:8(传输的offset)+4(内容长度)

调用transferData()传输数据:首先传输头部,再根据selectMapedBufferResult传输body部分;

如果没有数据,则

 

WaitNotifyObject:感觉用于多个线程之间的同步

 

notifyTransferSome(slaveAckOffset)

如果客户端ack的值比当前push2SlaveMaxOffset的值大,那么将push2SLaveMaxOffset的值设置为ack,然后调用groupTranserServer唤起线程

 

groupTransferService:它用于同步slave-master的模式,当master收到commitLog,同时又是同步复制的情况,就会将Requets放入,随后进行复制

push2SlaveMaxOffset的值为slaveAckOffset

 

3.4.3

HaClient

1. 首先连接到master,同时获取到最大的offset,到currentReportedOffset

 

2. 向master发送心跳

3. 不断向master发送当前的offset,同时读取master的答复,将master的答复写入commitLog中;如果master的答复不满足我们slave的offset,则关闭与master之间的连接


四、其他模块


4.1 FlushConsumeQueueService:用于将ConsumeQueue的File文件写入入里磁盘;

         首先判断是否到达了刷盘时间,如果到达了,那么全盘通刷;

         否则,遍历所有的ConsumeQueue,调用cq.commit(flushConsumeQueueLeastPages)进行刷盘,flushConsumeQueueLeastPages是目前文件的未刷盘大小达到flushConsumeQueueLeastPages*OS_PAGE_SIZE(1024*4)个,才进行刷盘

 

4.2 CleanCommitLogService:用于定期删除CommitLog的物理文件;

如果到达删除时间或者磁盘大小达到阈值:

         文件保存时长:72小时

         删除文件间隔时间:100ms

         强制删除文件间隔时间:1000* 120ms

         一次最多删除文件数:10个

 

4.3 CleanConsumeQueueService

获取commitLog的最小offset,遍历所有的consumeQueue,将小于offset的consumeQueue删除;

同时遍历IndexFile,将offset小于commitLog的索引删除

 

4.4 DispatchMessageService:这个服务主要是处理DispatchRequest,当写入CommitLog后,会将Dispatch请求写入Queue,然后该服务进行ConsumeQueue和IndexService的写入

 

 

4.5 AllocateMapedFileService:包含在MapedFileQueue中,主要用于每次getLastMapedFile(startOffset)时,如果最后一个文件满了,那么调用allocateMapedFileService.putRequestAndReturnMapedFile,将文件创建请求放入allocateMapedFileService的requestTable和requestQueue中,后续会进行文件创建,如果超过5S还没创建,则报错;

 

4.6 ReputMessageService:这个用于Slave中,当Slave模式时,会启动这个模块,然后首先设置reputFormOffset为初始值0;不断遍历commitLog,得到其SelectMapedBufferResult,然后对每一个Message创建DispatchRequest,创建对应的ConsumeQueue和IndexFile

 

4.7 ScheduleMessageService

4.8 StoreStatsService

4.9 StoreCheckpoint

4.10 BrokerStatsManager

 

附:

 

A.

同步请求的处理方式和异步请求的处理方式统一:

同步请求和异步请求都是将Request放入Queue中,然后再起一个线程进行处理

B. 双缓冲

可以起2个队列,一个读,一个写,这两个队列不断交换;这样可以解决一个问题,写和读不会出现并发问题,不需要上锁,能提高效率,但是仅仅适用于对put顺序没有要求的情况

C. 为什么要逻辑队列:可以使写效率更高