RocketMQ 主機磁盤空間有限,如何無限期延長消息存儲?

                        小編:管理員 70閱讀 2022.08.01

                        前言

                        RocketMQ作為國人開源的一款消息引擎,相對kafka也更加適合在線的業務場景,在業內使用的也是非常廣泛,很多同學也是非常熟悉它及它的存儲機制,所以這里不再對它的原理性東西作太多說明。

                        我們也知道,RocketMQ所有的數據如消息信息都是以文件形式保存到broker節點所在主機上指定的分區目錄下,比如消息的數據都是保存在commitlog中,默認保存72小時(在磁盤使用率未達到閾值的情況下)會在指定時間清理過期數據,釋放磁盤空間。

                        當然,如果消息量不大且所在磁盤的分區夠大,我們可以增加消息的保存時間。但受限于磁盤大小,這個保存時間總歸有限,如果消息比較重要,或者我們想保存的更久一些就需要一些其它方案解決。

                        背景

                        我們線上的幾個集群目前消息保存時間在2-3天,實在是磁盤空間大小有限,消息量相對不算小。比如,有個比較核心的集群,部署方式是6個高配物理機采用DLedger模式4主8從交叉部署,發送的tps在10000多,所以每個節點的日消息量目前應該是在600G吧。老大給我說他現在設置的線上保存時間是2天,業務量一直在增加,繼續增長下去,就要設置保存1天了,目前每個節點的磁盤使用率將近50%,年初我搭建監控平臺的時候,注意過還沒這么高。

                        還有其它集群上的業務,有些業務相關開發人員想要他們消息保存7天甚至更久。

                        基于這些原因,所以我們也的確需要一種過期消息備份的解決方案。

                        解決思路

                        如果需要對過期消息進行備份,然后支持過期消息檢索及重新消費的能力,我們想到的,常規的方案有如下兩種:

                        • 將發送到broker的消息持久化一份到第三方存儲介質,如mysql
                        • 備份將要過期的commitlog到其它地方,重新恢復

                        業內大廠是采用哪些更好的方案,時間問題也沒有具體調研過,我不得而知。關于第一種方案,老大也跟我聊過,我是不傾向的,原因如下:

                        • 我們的消息代理平臺還沒有建設出來,業務用的基本都是原生的,如果想要在消息生命周期中鏡像一份出來到其它存儲系統,在不改源碼的情況下,確實沒有很好的切入點
                        • 依賴其它存儲介質,復雜性,開發成本也高,我的開發時間也不充裕,短期內實現這個,有點難
                        • 全量保存的話,消息體的減少很難有質的變化,當然可以在處理的時候,去掉一些元數據信息,消息體也可以壓縮減少存儲空間的占用,但無論存哪,質量守恒,不會換個地方,用的硬盤資源就能等比減少很多倍

                        當然,這種方案的好處也很明顯,可以更精細化的控制保存時間及消息類別,設定對哪些topic或哪類消息的保存時限。另外如果我們的MQ代理層建設完,無論是RocketMQ還是kafka等都可以采用一種通用方案備份。

                        我目前主要采用第2種解決方案并進行實現,備份commitlog,支持檢索和重新消費。主要思路就是,開發一個應用,備份集群里將要過期的commitlog到更大的磁盤空間的主機(一臺主機,備份整個集群的數據,且硬件配置不需要太高,硬盤盡量大即可),并提供接口,支持檢索消息。

                        解決方案基本實現

                        我們的主要目標是讓消息保存的更久一些,不是為了災備什么的,所以不需要雙活、冷備這樣搭建一個同等的部署模型的集群。況且資源有限,不可能再申請同配置或者低配的主機資源解決,比如上面那個4主8從Dledger模式,如果需要同樣的集群來解析commitlog檢索消息,至少也需要4主4從部署8個節點才行,雙活太浪費,冷備維護也不方便。主要原因是資源也不好申請。

                        我用了一周的時間,緊趕趕的寫了一個工具能支持備份commitlog及檢索消息:rocketmq-reput。

                        該工具支持3種模式:客戶端、服務器及混合模式

                        • 客戶端:部署在broker節點,定時掃描上傳將要過期的commitlog
                        • 服務器:保存過期的commitlog并支持消息檢索
                        • 混合模式:同時開啟客戶端和服務器模式,無限期備份的關鍵

                        主要流程如下:

                        • 將reput client部署到rokcetmq集群的各個broker的從節點上,配置監聽的commitlog目錄,定時掃描將要過期的commitlog上傳到reput server上。
                        • reput server接收client傳來的commit log并根據不同的broker存放在不同的目錄下。
                        • 重新分發commit log的消息(所以我起名reput),構建索引文件(消息檢索使用)和邏輯消費隊列。
                        • 在reput server端可以通過restful接口查詢指定topic的歷史消息(根據時間范圍、消息ID[客戶端ID/服務端ID],消息key等)
                        數據上傳

                        從方案到開發,因為時間上的原因,我也沒太多時間花費在這上面,所以在實現上并沒有太注意細節,開發上也比較粗糙。

                        數據上傳這里也是很簡單的壓縮->傳輸->校驗->保存,基本流程如下:

                        如果上傳到一半服務器關閉等原因導致客戶端當前文件上傳失敗,會重置隊列,重新檢查上傳文件,避免有commitlog遺漏。

                        主機配置

                        該工具在執行時,大多情況下不需要太多算力,所以CPU是雙核的即可,內存4G足夠,堆內存配置2G就行,需要留一些物理內存給操作系統的page cache。我目前測試的時候,堆內存只配置了512M,挺好。

                        reput client盡量部署在從節點上,可以減少對master的影響。

                        另外開發的時候,為了節省時間,減少開發的代碼,像文件壓縮和md5檢查,都是直接調用的shell 命令,這也導致不支持在windows平臺下使用,只能在mac 和linux上運行,mac os不檢查md5,只檢查文件長度是否一致。

                        因為執行腳本命令的原因,會占用一些額外的性能,我觀測的有以下幾點:

                        • 壓縮的時候一個cpu的核心使用率達到100%,所以要求最低雙核cpu,單核會影響broker的處理性能
                        • 網絡傳輸帶寬占用在50M/s,其實壓縮比挺高,一般在72%-92%吧,100M-300M之間,所以傳輸時間大概在2-6秒吧,如果本身帶寬是瓶頸,需要注意
                        • 硬盤,硬盤得夠大,畢竟要保存整個集群的commitlog
                        無限期備份方案

                        硬盤即使再大,但空間大小也有上限,所以能保存消息量也有限,比如一個節點消息量600-700G左右,4個節點一天的量就在2.5T左右,即使申請了一個8T的硬盤,也只能保存2天(3天是不可能了)。

                        reput自身也是和rockemq一樣的過期刪除策略(這部分代碼直接copy rocketmq的實現的),所以數據在reput server上過期也要被清除釋放磁盤空間。

                        所以目前reput支持混合模式,可以再申請一臺主機,當前reput作為客戶端,新reput作為server,將快要過期的文件以同樣方式傳輸過去保存,完整流程如下:

                        就以這種接力的方式一直保存下去,一個主機保存2天,想要保存多久,就申請多少主機吧。

                        消息檢索

                        消息檢索,為了方便和省事,我直接在rocketmq-console控制臺新開發一個歷史消息的頁面用來查詢消息,reput server會以心跳的方式將自己可查詢的時間段及地址注冊到控制臺上。

                        在控制臺上選擇topic和時間段,然后根據選擇的時間段符合條件的一個或多個reput server上獲取消息。如果是消息ID或消息key,那就只能到所有的server上一起查了,只要消息還在,總能查到返回。

                        效果如下,我還可以查到4天前的消息(測試的這個集群配置的是保存2天的數據):

                        重新消費

                        重新消費可以將要消費的歷史消息檢索出來,重新發回broker。

                        寫在最后

                        其實開發上還是遇到不少問題點,比如因為commtlog的生成方式和rocketmq自身的生成是不一樣的,rocketmq是在寫入消息的時候,commitlog寫不下了才會創建。在重新構建索引和消息隊列的時候基于原有流程有些場景走不通,無法直接滾到下個文件等。

                        我是每個環節一一開發進行驗證的,最終把所有環節走通,寫了個完整流程的demo。

                        https://github.com/xxd763795151/rocketmq-reput

                        我把基本啟停腳本也簡單補充了下,只是上面有些bug后來就沒在修改。

                        整個流程走通后,我就修改包名提交到私服了,后續的開發包括和rocketmq-console的聯調,支持可視化檢索消息等都是在私服的代碼倉庫上,這部分功能及后續的bug修復,這個demo上是沒有了。但是這份demo代碼已支持消息檢索,也提供的有接口,可以直接調用接口檢索消息看結果,接口說明如下:

                        /**
                             * get the total of message between startTime and endTime.
                             *
                             * @param topic     topic name.
                             * @param startTime start time.
                             * @param endTime   end time.
                             * @return a long value, the total of message between startTime and endTime.
                             */
                            @GetMapping("/total/{topic}/{startTime}/{endTime}")
                            public Object getMessageTotalByTime(@PathVariable String topic, @PathVariable long startTime,
                                @PathVariable long endTime) {
                                return ResponseData.create().success().data(messageService.getMessageTotalByTime(topic, startTime, endTime));
                            }
                         
                            /**
                             * get the message list between startTime and endTime.
                             *
                             * @param topic     topic name.
                             * @param startTime start time.
                             * @param endTime   end time.
                             * @return List(MessageExt),  he message list between startTime and endTime.
                             */
                            @GetMapping("/list/{topic}/{startTime}/{endTime}")
                            public Object getMessageByTime(@PathVariable String topic, @PathVariable long startTime,
                                @PathVariable long endTime) {
                                return ResponseData.create().success().data(messageService.getMessageByTime(topic, startTime, endTime));
                            }
                         
                            /**
                             * get the message list between startTime and endTime. It differs from the above getMessageByTime is that the
                             * message body is null , as a result,  the size is smaller when return the same messages.
                             *
                             * @param topic     topic name.
                             * @param startTime start time.
                             * @param endTime   end time.
                             * @return List(MessageExt),  he message list between startTime and endTime.
                             */
                            @GetMapping("/view/{topic}/{startTime}/{endTime}")
                            public Object viewMessageList(@PathVariable String topic, @PathVariable long startTime,
                                @PathVariable long endTime) {
                                return ResponseData.create().success().data(messageService.viewMessageList(topic, startTime, endTime));
                            }
                         
                            /**
                             * get message by message id(server id(offset id) or client id(unique key)).
                             *
                             * @param topic topic name
                             * @param msgId msg id: server id/ client id.
                             * @return {@link org.apache.rocketmq.common.message.MessageExt}
                             */
                            @GetMapping("/id/{topic}/{msgId}")
                            public Object queryMessageByMsgId(@PathVariable final String topic, @PathVariable final String msgId) {
                                return ResponseData.create().success().data(messageService.queryMessageByMsgId(topic, msgId));
                            }
                         
                            /**
                             * get message by message key.
                             *
                             * @param topic topic name
                             * @param key   msg key: custom business key/ client id.
                             * @return {@link org.apache.rocketmq.common.message.MessageExt}
                             */
                            @GetMapping("/key/{topic}/{key}")
                            public Object queryMessageByKey(@PathVariable final String topic, @PathVariable final String key) {
                                return ResponseData.create().success().data(messageService.queryMessageByKey(topic, key));
                            }
                        復制

                        這個實現是支持Dledger模式與常規的部署模型的。最近在測試環境(2主2從非DLedger模式)運行了幾天,看了下效果,結果挺預期的,可以驗證該方案是完全可行的。

                        關聯標簽:
                        少妇各种各样BBBⅩXX