RocketMQ 为什么这么快之存储结构
背景
接触过RocketMQ 的人都知道,RocketMQ的几大特性就是吞吐量高,可以堆积海量数据,和保证最少一次的消费,而且所有的消息都会落盘,从小就被老师教育说磁盘很慢的我们真的是很难理解,一个写磁盘的MQ队列能达到这么高的QPS. 而让其达到这么高吞吐量的主要原因就是他的存储结构
存储文件结构
熟悉RocketMQ的童鞋们都知道. RocketMQ的存储文件分别为如下三种,存储在store 目录下
分别是
-
commitlog
所有的生产者生产的消息都顺序的存在此类文件中 -
ConsumeQueue
消息消费队列存储的消息,有RocketMQ服务从commitlog中异步的重做复制到此文件夹下的文件中. -
IndexFile
消息的Key对应的消息消息偏移量.用于通过key快速的从磁盘中获取消息,具体的内容后面详细阐述. -
abort
此文件是在RocketMQ启动时产生,正常关闭时删除,所以当RocketMQ启动时发现有此文件就证明RocketMQ上一次是非正常退出,则要进行数据恢复 -
checkpoint
检查点,记录着RocketMQ的状态消息,如 commitlog 最后一次刷盘时间,consumequeue 最后一次刷盘位置 -
config 配置信息
6.1 consumerFilter.json 主题消息过滤
6.2 consumerOffset.json 集群消费模式消费进度
6.3 delayOffset.json 延时消息队列拉取进度
存储方式
发送过程
发送过程主要是跟commitlog文件打交道,commitlog文件是个 存储着多个 Message 内容的文件,中间没有任何的其他内容,直至commitlog 文件的最大大小1G
|--------|---------|---------|------|
| messge1| message2|message3 |..... |
|--------|---------|---------|------|
每一条消息Message 的机构如下:
|--------------|---------|--------------------------|
| 内容 |长度(byte)|说明 |
|--------------|---------|--------------------------|
|totalSize | 4 |消息总长度 |
|---------------------------------------------------|
|magicCode | 4 |魔数,固定值0xdaa320a7 |
|---------------------------------------------------|
|bodyCrc | 4 |crc 校验码 |
|---------------------------------------------------|
|queueId | 4 |队列ID 最大数(int.max) |
|---------------------------------------------------|
|flag | 4 |消息flag |
|---------------------------------------------------|
|queueOffset | 8 |消息在消费队列的偏移量 |
|---------------------------------------------------|
|physicalOffset| 8 |消息在commitLog的偏移量 |
|---------------------------------------------------|
|sysFlag | 4 |系统flag表示压缩,事物等信息 |
|---------------------------------------------------|
|bornTimeStamp | 8 |消息发送时间戳 |
|---------------------------------------------------|
|bornHost | 8 |发送者ip端口 |
|---------------------------------------------------|
|storeTimeStamp| 8 |消息存储时间戳 |
|---------------------------------------------------|
|storeHostAddress| 8 |broket服务器Ip+端口 |
|---------------------------------------------------|
|reconSumetimes | 4 |重试次数 |
|---------------------------------------------------|
|preparedTransactionOffset | 8 |事物消息偏移量 |
|---------------------------------------------------|
|bodyLength | 4 |消息内容长度(max:65536) |
|---------------------------------------------------|
|body | bodyLength* |消息内容 |
|---------------------------------------------------|
|topicLength | 1 |topic长度 (max:255) |
|---------------------------------------------------|
|topic | topicLengt* |队列名称 |
|---------------------------------------------------|
|propertiesLength| 2 |属性长度 max: 65536 |
|---------------------------------------------------|
|properties | 4 |队列ID 最大数(int.max) |
|---------------------------------------------------|
服务端或者说broket 在处理消息发送请求时的主要的任务就是生成commitlog文件并将响应返回给客户端.只要保证commitlog不丢失,即可保证数据发送的消息不掉.
这里commitlog文件,为了读写快速,使用了操作系统的的mmap技术,对应java的接口则是 net RandomAccessFile(new File(""),"rw").getChannel().map(MapMode.READ_WRITE,position,size);
在写 MappedByteBuffer 时数据不实时同步到磁盘,响应显示的调用 MappedByteBuffer.force(); 但是你如果每一次写入都调用这个方法,那性能就十分的惨不忍睹,会有大量的锁竞争.
然后broket的做法是, 把提交的日志都放到一个缓冲区, 让后等待异步线程去进行force 提交, 假设 位置 10, 1000 这样的位点.
A 生产者提交占用了 10 到 50
B 生产者提交占用了 51 到 100
当前两个生产者都是等待状态,
需要后台线程 一次性把 10 到 100 提交(即调用 force后) A 和 B 生产者才返回提交成功.