分布式消息隊列 RocketMQ 源碼分析 —— RPC 通信(二)

                        小編:管理員 88閱讀 2022.08.02

                        • 一、為何要使用Netty作為高性能的通信庫?
                        • 二、RocketMQ中RPC通信的Netty多線程模型
                          • 2.1、Netty的Reactor多線程模型設計概念與簡述
                          • 2.2、RocketMQ中RPC通信的1+N+M1+M2的Reactor多線程設計與實現
                        • 三、總結

                        文章摘要:如何設計RPC通信層模型是任何一款性能強勁的MQ所要重點考慮的問題

                        在(一)篇中主要介紹了RocketMQ的協議格式,消息編解碼,通信方式(同步/異步/單向)、消息發送/接收以及異步回調的主要通信流程。而本篇將主要對RocketMQ消息隊列RPC通信部分的Netty多線程模型進行重點介紹。

                        一、為何要使用Netty作為高性能的通信庫?

                        在看RocketMQ的RPC通信部分時候,可能有不少同學有這樣子的疑問,RocketMQ為何要選擇Netty而不直接使用JDK的NIO進行網絡編程呢?這里有必要先來簡要介紹下Netty。 Netty是一個封裝了JDK的NIO庫的高性能網絡通信開源框架。它提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。 下面主要列舉了下一般系統的RPC通信模塊會選擇Netty作為底層通信庫的理由(作者認為RocketMQ的RPC同樣也是基于此選擇了Netty):

                        (1)Netty的編程API使用簡單,開發門檻低,無需編程者去關注和了解太多的NIO編程模型和概念;

                        (2)對于編程者來說,可根據業務的要求進行定制化地開發,通過Netty的ChannelHandler對通信框架進行靈活的定制化擴展;

                        (3)Netty框架本身支持拆包/解包,異常檢測等機制,讓編程者可以從JAVA NIO的繁瑣細節中解脫,而只需要關注業務處理邏輯;

                        (4)Netty解決了(準確地說應該是采用了另一種方式完美規避了)JDK NIO的Bug(Epoll bug,會導致Selector空輪詢,最終導致CPU 100%);

                        (5)Netty框架內部對線程,selector做了一些細節的優化,精心設計的reactor多線程模型,可以實現非常高效地并發處理;

                        (6)Netty已經在多個開源項目(Hadoop的RPC框架avro使用Netty作為通信框架)中都得到了充分驗證,健壯性/可靠性比較好。

                        二、RocketMQ中RPC通信的Netty多線程模型

                        RocketMQ的RPC通信部分采用了"1+N+M1+M2"的Reactor多線程模式,對網絡通信部分進行了一定的擴展與優化,這一節主要讓我們來看下這一部分的具體設計與實現內容。

                        2.1、Netty的Reactor多線程模型設計概念與簡述

                        這里有必要先來簡要介紹下Netty的Reactor多線程模型。Reactor多線程模型的設計思想是分而治之+事件驅動。

                        (1)分而治之 一般來說,一個網絡請求連接的完整處理過程可以分為接受(accept)、數據讀。╮ead)、解碼/編碼(decode/encode)、業務處理(process)、發送響應(send)這幾步驟。Reactor模型將每個步驟都映射成為一個任務,服務端線程執行的最小邏輯單元不再是一次完整的網絡請求,而是這個任務,且采用以非阻塞方式執行。

                        (2)事件驅動 每個任務對應特定網絡事件。當任務準備就緒時,Reactor收到對應的網絡事件通知,并將任務分發給綁定了對應網絡事件的Handler執行。

                        2.2、RocketMQ中RPC通信的1+N+M1+M2的Reactor多線程設計與實現

                        (1)RocketMQ中RPC通信的Reactor多線程設計與流程 RocketMQ的RPC通信采用Netty組件作為底層通信庫,同樣也遵循了Reactor多線程模型,同時又在這之上做了一些擴展和優化。下面先給出一張RocketMQ的RPC通信層的Netty多線程模型框架圖,讓大家對RocketMQ的RPC通信中的多線程分離設計有一個大致的了解。

                        RocketMQ的RPC通信層—1+N+M1+M2模型.png

                        從上面的框圖中可以大致了解RocketMQ中NettyRemotingServer的Reactor 多線程模型。一個 Reactor 主線程(eventLoopGroupBoss,即為上面的1)負責監聽 TCP網絡連接請求,建立好連接后丟給Reactor 線程池(eventLoopGroupSelector,即為上面的“N”,源碼中默認設置為3),它負責將建立好連接的socket 注冊到 selector上去(RocketMQ的源碼中會自動根據OS的類型選擇NIO和Epoll,也可以通過參數配置),然后監聽真正的網絡數據。拿到網絡數據后,再丟給Worker線程池(defaultEventExecutorGroup,即為上面的“M1”,源碼中默認設置為8)。 為了更為高效的處理RPC的網絡請求,這里的Worker線程池是專門用于處理Netty網絡通信相關的(包括編碼/解碼、空閑鏈接管理、網絡連接管理以及網絡請求處理)。而處理業務操作放在業務線程池中執行(這個內容在“RocketMQ的RPC通信(一)篇”中也有提到),根據 RomotingCommand 的業務請求碼code去processorTable這個本地緩存變量中找到對應的 processor,然后封裝成task任務后,提交給對應的業務processor處理線程池來執行(sendMessageExecutor,以發送消息為例,即為上面的 “M2”)。 下面以表格的方式列舉了下上面所述的“1+N+M1+M2”Reactor多線程模型

                        線程數

                        線程名

                        線程具體說明

                        1

                        NettyBoss_%d

                        N

                        NettyServerEPOLLSelector_%d_%d

                        M1

                        NettyServerCodecThread_%d

                        M2

                        RemotingExecutorThread_%d

                        (2)RocketMQ中RPC通信的Reactor多線程的代碼具體實現 說完了Reactor多線程整體的設計與流程,大家應該就對RocketMQ的RPC通信的Netty部分有了一個比較全面的理解了,那接下來就從源碼上來看下一些細節部分(在看該部分代碼時候需要讀者對JAVA NIO和Netty的相關概念與技術點有所了解)。 在NettyRemotingServer的實例初始化時,會初始化各個相關的變量包括serverBootstrap、nettyServerConfig參數、channelEventListener監聽器并同時初始化eventLoopGroupBoss和eventLoopGroupSelector兩個Netty的EventLoopGroup線程池(這里需要注意的是,如果是Linux平臺,并且開啟了native epoll,就用EpollEventLoopGroup,這個也就是用JNI,調的c寫的epoll;否則,就用Java NIO的NioEventLoopGroup。),具體代碼如下:

                        public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
                                final ChannelEventListener channelEventListener) {
                                super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
                                this.serverBootstrap = new ServerBootstrap();
                                this.nettyServerConfig = nettyServerConfig;
                                this.channelEventListener = channelEventListener;
                              //省略部分代碼
                              //初始化時候nThreads設置為1,說明RemotingServer端的Disptacher鏈接管理和分發請求的線程為1,用于接收客戶端的TCP連接
                                this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
                                    private AtomicInteger threadIndex = new AtomicInteger(0);
                        
                                    @Override
                                    public Thread newThread(Runnable r) {
                                        return new Thread(r, String.format("NettyBoss_%d", this.threadIndex.incrementAndGet()));
                                    }
                                });
                        
                                /**
                                 * 根據配置設置NIO還是Epoll來作為Selector線程池
                                 * 如果是Linux平臺,并且開啟了native epoll,就用EpollEventLoopGroup,這個也就是用JNI,調的c寫的epoll;否則,就用Java NIO的NioEventLoopGroup。
                                 *
                                 */
                                if (useEpoll()) {
                                    this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
                                        private AtomicInteger threadIndex = new AtomicInteger(0);
                                        private int threadTotal = nettyServerConfig.getServerSelectorThreads();
                        
                                        @Override
                                        public Thread newThread(Runnable r) {
                                            return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                                        }
                                    });
                                } else {
                                    this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
                                        private AtomicInteger threadIndex = new AtomicInteger(0);
                                        private int threadTotal = nettyServerConfig.getServerSelectorThreads();
                        
                                        @Override
                                        public Thread newThread(Runnable r) {
                                            return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                                        }
                                    });
                                }
                                //省略部分代碼
                        復制

                        在NettyRemotingServer實例初始化完成后,就會將其啟動。Server端在啟動階段會將之前實例化好的1個acceptor線程(eventLoopGroupBoss),N個IO線程(eventLoopGroupSelector),M1個worker 線程(defaultEventExecutorGroup)綁定上去。前面部分也已經介紹過各個線程池的作用了。 這里需要說明的是,Worker線程拿到網絡數據后,就交給Netty的ChannelPipeline(其采用責任鏈設計模式),從Head到Tail的一個個Handler執行下去,這些 Handler是在創建NettyRemotingServer實例時候指定的。NettyEncoder和NettyDecoder 負責網絡傳輸數據和 RemotingCommand 之間的編解碼。NettyServerHandler 拿到解碼得到的 RemotingCommand 后,根據 RemotingCommand.type 來判斷是 request 還是 response來進行相應處理,根據業務請求碼封裝成不同的task任務后,提交給對應的業務processor處理線程池處理。

                        @Override
                            public void start() {
                                //默認的處理線程池組,使用默認的處理線程池組用于處理后面的多個Netty Handler的邏輯操作
                        
                                this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
                                        nettyServerConfig.getServerWorkerThreads(),
                                        new ThreadFactory() {
                        
                                            private AtomicInteger threadIndex = new AtomicInteger(0);
                        
                                            @Override
                                            public Thread newThread(Runnable r) {
                                                return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                                            }
                                        });
                                /**
                                 * 首先來看下 RocketMQ NettyServer 的 Reactor 線程模型,
                                 * 一個 Reactor 主線程負責監聽 TCP 連接請求;
                                 * 建立好連接后丟給 Reactor 線程池,它負責將建立好連接的 socket 注冊到 selector
                                 * 上去(這里有兩種方式,NIO和Epoll,可配置),然后監聽真正的網絡數據;
                                 * 拿到網絡數據后,再丟給 Worker 線程池;
                                 *
                                 */
                                //RocketMQ-> Java NIO的1+N+M模型:1個acceptor線程,N個IO線程,M1個worker 線程。
                                ServerBootstrap childHandler =
                                        this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                                                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                                                .option(ChannelOption.SO_BACKLOG, 1024)
                                                //服務端處理客戶端連接請求是順序處理的,所以同一時間只能處理一個客戶端連接,多個客戶端來的時候,服務端將不能處理的客戶端連接請求放在隊列中等待處理,backlog參數指定了隊列的大小
                                                .option(ChannelOption.SO_REUSEADDR, true)//這個參數表示允許重復使用本地地址和端口
                                                .option(ChannelOption.SO_KEEPALIVE, false)//當設置該選項以后,如果在兩小時內沒有數據的通信時,TCP會自動發送一個活動探測數據報文。
                                                .childOption(ChannelOption.TCP_NODELAY, true)//該參數的作用就是禁止使用Nagle算法,使用于小數據即時傳輸
                                                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())//這兩個參數用于操作接收緩沖區和發送緩沖區
                                                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                                                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                                                .childHandler(new ChannelInitializer<SocketChannel>() {
                                                    @Override
                                                    public void initChannel(SocketChannel ch) throws Exception {
                        
                                                        ch.pipeline()
                                                                .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
                                                                        new HandshakeHandler(TlsSystemConfig.tlsMode))
                                                                .addLast(defaultEventExecutorGroup,
                                                                        new NettyEncoder(),//rocketmq解碼器,他們分別覆蓋了父類的encode和decode方法
                                                                        new NettyDecoder(),//rocketmq編碼器
                                                                        new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),//Netty自帶的心跳管理器
                                                                        new NettyConnectManageHandler(),//連接管理器,他負責捕獲新連接、連接斷開、異常等事件,然后統一調度到NettyEventExecuter處理器處理。
                                                                        new NettyServerHandler()//當一個消息經過前面的解碼等步驟后,然后調度到channelRead0方法,然后根據消息類型進行分發
                                                                );
                                                    }
                                                });
                        
                                if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
                                    childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
                                }
                        
                                try {
                                    ChannelFuture sync = this.serverBootstrap.bind().sync();
                                    InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
                                    this.port = addr.getPort();
                                } catch (InterruptedException e1) {
                                    throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
                                }
                        
                                if (this.channelEventListener != null) {
                                    this.nettyEventExecutor.start();
                                }
                        
                                //定時掃描responseTable,獲取返回結果,并且處理超時
                                this.timer.scheduleAtFixedRate(new TimerTask() {
                        
                                    @Override
                                    public void run() {
                                        try {
                                            NettyRemotingServer.this.scanResponseTable();
                                        } catch (Throwable e) {
                                            log.error("scanResponseTable exception", e);
                                        }
                                    }
                                }, 1000 * 3, 1000);
                            }
                        復制

                        從上面的描述中可以概括得出RocketMQ的RPC通信部分的Reactor線程池模型框圖。

                        RocketMQ的RPC通信層—Reactor線程池.png

                        整體可以看出RocketMQ的RPC通信借助Netty的多線程模型,其服務端監聽線程和IO線程分離,同時將RPC通信層的業務邏輯與處理具體業務的線程進一步相分離。時間可控的簡單業務都直接放在RPC通信部分來完成,復雜和時間不可控的業務提交至后端業務線程池中處理,這樣提高了通信效率和MQ整體的性能。(ps:其中抽象出NioEventLoop來表示一個不斷循環執行處理任務的線程,每個NioEventLoop有一個selector,用于監聽綁定在其上的socket鏈路。)

                        三、總結

                        仔細閱讀RocketMQ的過程中收獲了很多關于網絡通信設計技術和知識點。對于剛接觸開源版的RocketMQ的童鞋來說,想要自己掌握RPC通信部分的各個技術知識點,還需要不斷地使用本地環境進行debug調試和閱讀源碼反復思考。

                        關聯標簽:
                        少妇各种各样BBBⅩXX