RocketMQ为什么这么快之存储结构

RocketMQ 为什么这么快之存储结构

背景

接触过RocketMQ 的人都知道,RocketMQ的几大特性就是吞吐量高,可以堆积海量数据,和保证最少一次的消费,而且所有的消息都会落盘,从小就被老师教育说磁盘很慢的我们真的是很难理解,一个写磁盘的MQ队列能达到这么高的QPS. 而让其达到这么高吞吐量的主要原因就是他的存储结构

alt

存储文件结构

熟悉RocketMQ的童鞋们都知道. RocketMQ的存储文件分别为如下三种,存储在store 目录下
分别是

  1. commitlog
    所有的生产者生产的消息都顺序的存在此类文件中

  2. ConsumeQueue
    消息消费队列存储的消息,有RocketMQ服务从commitlog中异步的重做复制到此文件夹下的文件中.

  3. IndexFile
    消息的Key对应的消息消息偏移量.用于通过key快速的从磁盘中获取消息,具体的内容后面详细阐述.

  4. abort
    此文件是在RocketMQ启动时产生,正常关闭时删除,所以当RocketMQ启动时发现有此文件就证明RocketMQ上一次是非正常退出,则要进行数据恢复

  5. checkpoint
    检查点,记录着RocketMQ的状态消息,如 commitlog 最后一次刷盘时间,consumequeue 最后一次刷盘位置

  6. config 配置信息
    6.1 consumerFilter.json 主题消息过滤
    6.2 consumerOffset.json 集群消费模式消费进度
    6.3 delayOffset.json 延时消息队列拉取进度

存储方式

发送过程

发送过程主要是跟commitlog文件打交道,commitlog文件是个 存储着多个 Message 内容的文件,中间没有任何的其他内容,直至commitlog 文件的最大大小1G
|--------|---------|---------|------|
| messge1| message2|message3 |..... |
|--------|---------|---------|------|

每一条消息Message 的机构如下:
|--------------|---------|--------------------------|
| 内容          |长度(byte)|说明                      |
|--------------|---------|--------------------------|
|totalSize     | 4       |消息总长度                  | 
|---------------------------------------------------|
|magicCode     | 4       |魔数,固定值0xdaa320a7       | 
|---------------------------------------------------|
|bodyCrc       | 4       |crc 校验码                 | 
|---------------------------------------------------|
|queueId       | 4       |队列ID 最大数(int.max)      | 
|---------------------------------------------------|
|flag          | 4       |消息flag                   |  
|---------------------------------------------------|
|queueOffset   | 8       |消息在消费队列的偏移量        | 
|---------------------------------------------------|
|physicalOffset| 8       |消息在commitLog的偏移量      | 
|---------------------------------------------------|
|sysFlag       | 4       |系统flag表示压缩,事物等信息    | 
|---------------------------------------------------|
|bornTimeStamp | 8       |消息发送时间戳               | 
|---------------------------------------------------|
|bornHost      | 8       |发送者ip端口                | 
|---------------------------------------------------|
|storeTimeStamp| 8       |消息存储时间戳               | 
|---------------------------------------------------|
|storeHostAddress| 8     |broket服务器Ip+端口         | 
|---------------------------------------------------|
|reconSumetimes  | 4       |重试次数                  | 
|---------------------------------------------------|
|preparedTransactionOffset | 8    |事物消息偏移量      | 
|---------------------------------------------------|
|bodyLength    | 4       |消息内容长度(max:65536)      | 
|---------------------------------------------------|
|body          | bodyLength*  |消息内容              | 
|---------------------------------------------------|
|topicLength   | 1       |topic长度 (max:255)        | 
|---------------------------------------------------|
|topic         | topicLengt* |队列名称                | 
|---------------------------------------------------|
|propertiesLength| 2       |属性长度 max: 65536       | 
|---------------------------------------------------|
|properties      | 4       |队列ID 最大数(int.max)      | 
|---------------------------------------------------|

服务端或者说broket 在处理消息发送请求时的主要的任务就是生成commitlog文件并将响应返回给客户端.只要保证commitlog不丢失,即可保证数据发送的消息不掉.

这里commitlog文件,为了读写快速,使用了操作系统的的mmap技术,对应java的接口则是 net RandomAccessFile(new File(""),"rw").getChannel().map(MapMode.READ_WRITE,position,size);

在写 MappedByteBuffer 时数据不实时同步到磁盘,响应显示的调用 MappedByteBuffer.force(); 但是你如果每一次写入都调用这个方法,那性能就十分的惨不忍睹,会有大量的锁竞争.

然后broket的做法是, 把提交的日志都放到一个缓冲区, 让后等待异步线程去进行force 提交, 假设 位置 10, 1000 这样的位点.
A 生产者提交占用了 10 到 50
B 生产者提交占用了 51 到 100
当前两个生产者都是等待状态,
需要后台线程 一次性把 10 到 100 提交(即调用 force后) A 和 B 生产者才返回提交成功.

消费过程

落盘方式