消息隊列中間件 RocketMQ 源碼分析 —— Message 存儲

                        小編:管理員 53閱讀 2022.08.03

                        • 1、概述
                        • 2、CommitLog 結構
                        • 3、CommitLog 存儲消息
                          • MappedFile#落盤
                          • FlushRealTimeService
                          • CommitRealTimeService
                          • GroupCommitService
                          • CommitLog#putMessage(...)
                          • MappedFileQueue#getLastMappedFile(...)
                          • MappedFile#appendMessage(...)
                          • DefaultAppendMessageCallback#doAppend(...)
                          • FlushCommitLogService
                        1、概述

                        本文接《RocketMQ 源碼分析 —— Message 發送與接收》。 主要解析CommitLog存儲消息部分。

                        2、CommitLog 結構

                        CommitLog、MappedFileQueue、MappedFile的關系如下:

                        CommitLog:MappedFileQueue:MappedFile= 1 : 1 : N。

                        反應到系統文件如下:

                        Yunai-MacdeMacBook-Pro-2:commitlog yunai$ pwd/Users/yunai/store/commitlog
                        Yunai-MacdeMacBook-Pro-2:commitlog yunai$ ls -ltotal 10485760
                        -rw-r--r--  1 yunai  staff  1073741824  4 21 16:27 00000000000000000000
                        -rw-r--r--  1 yunai  staff  1073741824  4 21 16:29 00000000001073741824
                        -rw-r--r--  1 yunai  staff  1073741824  4 21 16:32 00000000002147483648
                        -rw-r--r--  1 yunai  staff  1073741824  4 21 16:33 00000000003221225472
                        -rw-r--r--  1 yunai  staff  1073741824  4 21 16:32 00000000004294967296
                        復制

                        CommitLog、MappedFileQueue、MappedFile的定義如下:

                        • MappedFile:00000000000000000000、00000000001073741824、00000000002147483648等文件。
                        • MappedFileQueue:MappedFile所在的文件夾,對MappedFile進行封裝成文件隊列,對上層提供可無限使用的文件容量。
                          • 每個MappedFile統一文件大小。
                          • 文件命名方式:fileName[n] = fileName[n - 1] + mappedFileSize。在CommitLog里默認為 1GB。
                        • CommitLog:針對MappedFileQueue的封裝使用。

                        CommitLog目前存儲在MappedFile有兩種內容類型:

                        1. MESSAGE :消息。
                        2. BLANK :文件不足以存儲消息時的空白占位。

                        CommitLog存儲在MappedFile的結構:

                        MESSAGE[1]

                        MESSAGE[2]

                        ...

                        MESSAGE[n - 1]

                        MESSAGE[n]

                        BLANK

                        MESSAGE在CommitLog存儲結構:

                        第幾位

                        字段

                        說明

                        數據類型

                        字節數

                        1

                        MsgLen

                        消息總長度

                        Int

                        4

                        2

                        MagicCode

                        MESSAGE_MAGIC_CODE

                        Int

                        4

                        3

                        BodyCRC

                        消息內容CRC

                        Int

                        4

                        4

                        QueueId

                        消息隊列編號

                        Int

                        4

                        5

                        Flag

                        flag

                        Int

                        4

                        6

                        QueueOffset

                        消息隊列位置

                        Long

                        8

                        7

                        PhysicalOffset

                        物理位置。在 CommitLog 的順序存儲位置。

                        Long

                        8

                        8

                        SysFlag

                        MessageSysFlag

                        Int

                        4

                        9

                        BornTimestamp

                        生成消息時間戳

                        Long

                        8

                        10

                        BornHost

                        生效消息的地址+端口

                        Long

                        8

                        11

                        StoreTimestamp

                        存儲消息時間戳

                        Long

                        8

                        12

                        StoreHost

                        存儲消息的地址+端口

                        Long

                        8

                        13

                        ReconsumeTimes

                        重新消費消息次數

                        Int

                        4

                        14

                        PreparedTransationOffset

                        Long

                        8

                        15

                        BodyLength + Body

                        內容長度 + 內容

                        Int + Bytes

                        4 + bodyLength

                        16

                        TopicLength + Topic

                        Topic長度 + Topic

                        Byte + Bytes

                        1 + topicLength

                        17

                        PropertiesLength + Properties

                        拓展字段長度 + 拓展字段

                        Short + Bytes

                        2 + PropertiesLength

                        BLANK在CommitLog存儲結構:

                        第幾位

                        字段

                        說明

                        數據類型

                        字節數

                        1

                        maxBlank

                        空白長度

                        Int

                        4

                        2

                        MagicCode

                        BLANK_MAGIC_CODE

                        Int

                        4

                        3、CommitLog 存儲消息
                        CommitLog#putMessage(...)
                        // 省略代碼
                        復制
                        • 說明 :存儲消息,并返回存儲結果。
                        • 第 2 行 :設置存儲時間等。
                        • 第 16 至 36 行 :事務消息相關,暫未了解。
                        • 第 45 & 97 行 :獲取鎖與釋放鎖。
                        • 第 52 行 :再次設置存儲時間。目前會有多處地方設置存儲時間。
                        • 第 55 至 62 行 :獲取MappedFile,若不存在或已滿,則進行創建。詳細解析見:MappedFileQueue#getLastMappedFile(...)。
                        • 第 65 行 :插入消息到MappedFile,解析解析見:MappedFile#appendMessage(...)。
                        • 第 69 至 80 行 :MappedFile已滿,創建新的,再次插入消息。
                        • 第 116 至 140 行 :消息刷盤,即持久化到文件。上面插入消息實際未存儲到硬盤。此處,根據不同的刷盤策略,執行會有不同。詳細解析見:FlushCommitLogService。
                        • 第 143 至 173 行 :Broker主從同步。后面的文章會詳細解析?。
                        MappedFileQueue#getLastMappedFile(...)
                        // 省略代碼
                        復制
                        • 說明 :獲取最后一個MappedFile,若不存在或文件已滿,則進行創建。
                        • 第 5 至 11 行 :計算當文件不存在或已滿時,新創建文件的createOffset。
                        • 第 14 行 :計算文件名。從此處我們可 以得知,MappedFile的文件命名規則:
                        > fileName[n] = fileName[n - 1] + n * mappedFileSize
                        > fileName[0] = startOffset - (startOffset % this.mappedFileSize)
                        
                        目前 `CommitLog` 的 `startOffset` 為 0。
                        此處有個**疑問**,為什么需要 `(startOffset % this.mappedFileSize)`。例如:
                        
                        | startOffset  | mappedFileSize | createOffset |
                        | --- | :-- | :-- |
                        | 5 | 1 | 5 |
                        | 5 | 2 | 4 |
                        | 5 | 3 | 3  |
                        | 5 | 4 | 4 |
                        | 5 | > 5 | 0 |
                        
                        _如果有知道的同學,麻煩提示下。?_*解答:fileName[0] = startOffset - (startOffset % this.mappedFileSize) 計算出來的是,以 `this.mappedFileSize` 為每個文件大小時,`startOffset` 所在文件的開始`offset`*
                        復制
                        • 第 30 至 35 行 :設置MappedFile是否是第一個創建的文件。該標識用于ConsumeQueue對應的MappedFile,詳見ConsumeQueue#fillPreBlank。
                        MappedFile#appendMessage(...)
                        // 省略代碼
                        復制
                        • 說明 :插入消息到MappedFile,并返回插入結果。
                        • 第 8 行 :獲取需要寫入的字節緩沖區。為什么會有writeBuffer != null的判斷后,使用不同的字節緩沖區,見:FlushCommitLogService。
                        • 第 9 至 11 行 :設置寫入position,執行寫入,更新wrotePosition(當前寫入位置,下次開始寫入開始位置)。
                        DefaultAppendMessageCallback#doAppend(...)
                        // 省略代碼
                        復制
                        • 說明 :插入消息到字節緩沖區。
                        • 第 45 行 :計算物理位置。在CommitLog的順序存儲位置。
                        • 第 47 至 49 行 :計算CommitLog里的offsetMsgId。這里一定要和msgId區分開。

                        計算方式

                        長度

                        offsetMsgId

                        Broker存儲時生成

                        Hex(storeHostBytes, wroteOffset)

                        32

                        msgId

                        Client發送消息時生成

                        Hex(進程編號, IP, ClassLoader, startTime, currentTime, 自增序列)

                        32

                        《RocketMQ 源碼分析 —— Message 基礎》

                        • 第 51 至 61 行 :獲取隊列位置(offset)。
                        • 第 78 至 95 行 :計算消息總長度。
                        • 第 98 至 112 行 :當文件剩余空間不足時,寫入BLANK占位,返回結果。
                        • 第 114 至 161 行 :寫入MESSAGE。
                        • 第 173 行 :更新隊列位置(offset)。
                        FlushCommitLogService

                        線程服務

                        場景

                        插入消息性能

                        CommitRealTimeService

                        異步刷盤 && 開啟內存字節緩沖區

                        第一

                        FlushRealTimeService

                        異步刷盤 && 關閉內存字節緩沖區

                        第二

                        GroupCommitService

                        同步刷盤

                        第三

                        MappedFile#落盤

                        方式

                        方式一

                        寫入內存字節緩沖區(writeBuffer)

                        從內存字節緩沖區(write buffer)提交(commit)到文件通道(fileChannel)

                        文件通道(fileChannel)flush

                        方式二

                        寫入映射文件字節緩沖區(mappedByteBuffer)

                        映射文件字節緩沖區(mappedByteBuffer)flush

                        flush相關代碼

                        考慮到寫入性能,滿足flushLeastPages * OS_PAGE_SIZE才進行flush。

                        // 省略代碼
                        復制

                        commit相關代碼:

                        考慮到寫入性能,滿足commitLeastPages * OS_PAGE_SIZE才進行commit。

                        // 省略代碼
                        復制FlushRealTimeService

                        消息插入成功時,異步刷盤時使用。

                        // 省略代碼
                        復制
                        • 說明:實時flush線程服務,調用MappedFile#flush相關邏輯。
                        • 第 23 至 29 行 :每flushPhysicQueueThoroughInterval周期,執行一次flush。因為不是每次循環到都能滿足flushCommitLogLeastPages大小,因此,需要一定周期進行一次強制flush。當然,不能每次循環都去執行強制flush,這樣性能較差。
                        • 第 33 行 至 37 行 :根據flushCommitLogTimed參數,可以選擇每次循環是固定周期還是等待喚醒。默認配置是后者,所以,每次插入消息完成,會去調用commitLogService.wakeup()。
                        • 第 45 行 :調用MappedFile進行flush。
                        • 第 61 至 65 行 :Broker關閉時,強制flush,避免有未刷盤的數據。
                        CommitRealTimeService

                        消息插入成功時,異步刷盤時使用。 和FlushRealTimeService類似,性能更好。

                        // 省略代碼
                        復制GroupCommitService

                        消息插入成功時,同步刷盤時使用。

                        // 省略代碼
                        復制
                        • 說明:批量寫入線程服務。
                        • 第 16 至 25 行 :添加寫入請求。方法設置了sync的原因:this.requestsWrite會和this.requestsRead不斷交換,無法保證穩定的同步。
                        • 第 27 至 34 行 :讀寫隊列交換。
                        • 第 38 至 60 行 :循環寫入隊列,進行flush。
                          • 第 43 行 :考慮到有可能每次循環的消息寫入的消息,可能分布在兩個 MappedFile(寫第N個消息時,MappedFile已滿,創建了一個新的),所以需要有循環2次。
                          • 第 51 行 :喚醒等待寫入請求線程,通過CountDownLatch實現
                        • 第 61 至 66 行 :直接刷盤。此處是由于發送的消息的isWaitStoreMsgOK未設置成TRUE,導致未走批量提交。
                        • 第 73 至 80 行 :每 10ms 執行一次批量提交。當然,如果wakeup()時,則會立即進行一次批量提交。當Broker設置成同步落盤 && 消息isWaitStoreMsgOK=true,消息需要略大于 10ms 才能發送成功。當然,性能相對異步落盤較差,可靠性更高,需要我們在實際使用時去取舍。
                        關聯標簽:
                        少妇各种各样BBBⅩXX