源碼分析 RocketMQ DLedger多副本 之日志復制-下篇

                        小編:管理員 58閱讀 2022.08.03

                        3、EntryHandler 詳解

                        EntryHandler 同樣是一個線程,當節點狀態為從節點時激活。

                        3.1 核心類圖

                        其核心屬性如下:

                        • long lastCheckFastForwardTimeMs 上一次檢查主服務器是否有 push 消息的時間戳。
                        • ConcurrentMap>> writeRequestMap append 請求處理隊列。
                        • BlockingQueue>> compareOrTruncateRequests COMMIT、COMPARE、TRUNCATE 相關請求
                        3.2 handlePush

                        從上文得知,主節點會主動向從節點傳播日志,從節點會通過網絡接受到請求數據進行處理,其調用鏈如圖所示:

                        最終會調用 EntryHandler 的 handlePush 方法。

                        EntryHandler#handlePush

                        public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest request) throws Exception {
                            //The timeout should smaller than the remoting layer's request timeout
                            CompletableFuture<PushEntryResponse> future = new TimeoutFuture<>();      // @1
                            switch (request.getType()) {
                                case APPEND:                                                                                                          // @2
                                    PreConditions.check(request.getEntry() != null, DLedgerResponseCode.UNEXPECTED_ARGUMENT);
                                    long index = request.getEntry().getIndex();
                                    Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> old = writeRequestMap.putIfAbsent(index, new Pair<>(request, future));
                                    if (old != null) {
                                        logger.warn("[MONITOR]The index {} has already existed with {} and curr is {}", index, old.getKey().baseInfo(), request.baseInfo());
                                        future.complete(buildResponse(request, DLedgerResponseCode.REPEATED_PUSH.getCode()));
                                    }
                                    break;
                                case COMMIT:                                                                                                           // @3
                                    compareOrTruncateRequests.put(new Pair<>(request, future));
                                    break;
                                case COMPARE:
                                case TRUNCATE:                                                                                                     // @4
                                    PreConditions.check(request.getEntry() != null, DLedgerResponseCode.UNEXPECTED_ARGUMENT);
                                    writeRequestMap.clear();
                                    compareOrTruncateRequests.put(new Pair<>(request, future));
                                    break;
                                default:
                                    logger.error("[BUG]Unknown type {} from {}", request.getType(), request.baseInfo());
                                    future.complete(buildResponse(request, DLedgerResponseCode.UNEXPECTED_ARGUMENT.getCode()));
                                    break;
                            }
                            return future;
                        }
                        復制

                        從幾點處理主節點的 push 請求,其實現關鍵點如下。

                        代碼@1:首先構建一個響應結果Future,默認超時時間 1s。

                        代碼@2:如果是 APPEND 請求,放入到 writeRequestMap 集合中,如果已存在該數據結構,說明主節點重復推送,構建返回結果,其狀態碼為 REPEATED_PUSH。放入到 writeRequestMap 中,由 doWork 方法定時去處理待寫入的請求。

                        代碼@3:如果是提交請求, 將請求存入 compareOrTruncateRequests 請求處理中,由 doWork 方法異步處理。

                        代碼@4:如果是 COMPARE 或 TRUNCATE 請求,將待寫入隊列 writeRequestMap 清空,并將請求放入 compareOrTruncateRequests 請求隊列中,由 doWork 方法異步處理。

                        接下來,我們重點來分析 doWork 方法的實現。

                        3.3 doWork 方法詳解

                        EntryHandler#doWork

                        public void doWork() {
                            try {
                                if (!memberState.isFollower()) {     // @1
                                    waitForRunning();
                                    return;
                                }
                                if (compareOrTruncateRequests.peek() != null) {    // @2
                                    Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair = compareOrTruncateRequests.poll();
                                    PreConditions.check(pair != null, DLedgerResponseCode.UNKNOWN);
                                    switch (pair.getKey().getType()) {
                                        case TRUNCATE:
                                            handleDoTruncate(pair.getKey().getEntry().getIndex(), pair.getKey(), pair.getValue());
                                            break;
                                        case COMPARE:
                                            handleDoCompare(pair.getKey().getEntry().getIndex(), pair.getKey(), pair.getValue());
                                            break;
                                        case COMMIT:
                                            handleDoCommit(pair.getKey().getCommitIndex(), pair.getKey(), pair.getValue());
                                            break;
                                        default:
                                            break;
                                    }
                                } else { // @3
                                    long nextIndex = dLedgerStore.getLedgerEndIndex() + ;
                                    Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair = writeRequestMap.remove(nextIndex);
                                    if (pair == null) {
                                        checkAbnormalFuture(dLedgerStore.getLedgerEndIndex());
                                        waitForRunning();
                                        return;
                                    }
                                    PushEntryRequest request = pair.getKey();
                                    handleDoAppend(nextIndex, request, pair.getValue());
                                }
                            } catch (Throwable t) {
                                DLedgerEntryPusher.logger.error("Error in {}", getName(), t);
                                DLedgerUtils.sleep();
                            }
                        }
                        復制

                        代碼@1:如果當前節點的狀態不是從節點,則跳出。

                        代碼@2:如果 compareOrTruncateRequests 隊列不為空,說明有COMMIT、COMPARE、TRUNCATE 等請求,這類請求優先處理。值得注意的是這里使用是 peek、poll 等非阻塞方法,然后根據請求的類型,調用對應的方法。稍后詳細介紹。

                        代碼@3:如果只有 append 類請求,則根據當前節點最大的消息序號,嘗試從 writeRequestMap 容器中,獲取下一個消息復制請求(ledgerEndIndex + 1) 為 key 去查找。如果不為空,則執行 doAppend 請求,如果為空,則調用 checkAbnormalFuture 來處理異常情況。

                        接下來我們來重點分析各個處理細節。

                        3.3.1 handleDoCommit

                        處理提交請求,其處理比較簡單,就是調用 DLedgerStore 的 updateCommittedIndex 更新其已提交偏移量,故我們還是具體看一下DLedgerStore 的 updateCommittedIndex 方法。

                        DLedgerMmapFileStore#updateCommittedIndex

                        public void updateCommittedIndex(long term, long newCommittedIndex) {   // @1
                            if (newCommittedIndex == -
                                    || ledgerEndIndex == -
                                    || term < memberState.currTerm()
                                    || newCommittedIndex == this.committedIndex) {                               // @2
                                    return;
                            }
                            if (newCommittedIndex < this.committedIndex
                                    || newCommittedIndex < this.ledgerBeginIndex) {                             // @3
                                logger.warn("[MONITOR]Skip update committed index for new={} < old={} or new={} < beginIndex={}", newCommittedIndex, this.committedIndex, newCommittedIndex, this.ledgerBeginIndex);
                                return;
                            }
                            long endIndex = ledgerEndIndex;
                            if (newCommittedIndex > endIndex) {                                                       // @4
                                    //If the node fall behind too much, the committedIndex will be larger than enIndex.
                                newCommittedIndex = endIndex;
                            }
                            DLedgerEntry dLedgerEntry = get(newCommittedIndex);                        // @5                
                            PreConditions.check(dLedgerEntry != null, DLedgerResponseCode.DISK_ERROR);
                            this.committedIndex = newCommittedIndex;
                            this.committedPos = dLedgerEntry.getPos() + dLedgerEntry.getSize();     // @6
                        }
                        復制

                        代碼@1:首先介紹一下方法的參數:

                        • long term 主節點當前的投票輪次。
                        • long newCommittedIndex: 主節點發送日志復制請求時的已提交日志序號。

                        代碼@2:如果待更新提交序號為 -1 或 投票輪次小于從節點的投票輪次或主節點投票輪次等于從節點的已提交序號,則直接忽略本次提交動作。

                        代碼@3:如果主節點的已提交日志序號小于從節點的已提交日志序號或待提交序號小于當前節點的最小有效日志序號,則輸出警告日志[MONITOR],并忽略本次提交動作。

                        代碼@4:如果從節點落后主節點太多,則重置 提交索引為從節點當前最大有效日志序號。

                        代碼@5:嘗試根據待提交序號從從節點查找數據,如果數據不存在,則拋出 DISK_ERROR 錯誤。

                        代碼@6:更新 commitedIndex、committedPos 兩個指針,DledgerStore會定時將已提交指針刷入 checkpoint 文件,達到持久化 commitedIndex 指針的目的。

                        3.3.2 handleDoCompare

                        處理主節點發送過來的 COMPARE 請求,其實現也比較簡單,最終調用 buildResponse 方法構造響應結果。

                        EntryHandler#buildResponse

                        private PushEntryResponse buildResponse(PushEntryRequest request, int code) {
                            PushEntryResponse response = new PushEntryResponse();
                            response.setGroup(request.getGroup());
                            response.setCode(code);
                            response.setTerm(request.getTerm());
                            if (request.getType() != PushEntryRequest.Type.COMMIT) {
                                response.setIndex(request.getEntry().getIndex());
                            }
                            response.setBeginIndex(dLedgerStore.getLedgerBeginIndex());
                            response.setEndIndex(dLedgerStore.getLedgerEndIndex());
                            return response;
                        }
                        復制

                        主要也是返回當前從幾點的 ledgerBeginIndex、ledgerEndIndex 以及投票輪次,供主節點進行判斷比較。

                        3.3.3 handleDoTruncate

                        handleDoTruncate 方法實現比較簡單,刪除從節點上 truncateIndex 日志序號之后的所有日志,具體調用dLedgerStore 的 truncate 方法,由于其存儲與 RocketMQ 的存儲設計基本類似故本文就不在詳細介紹,簡單介紹其實現要點:根據日志序號,去定位到日志文件,如果命中具體的文件,則修改相應的讀寫指針、刷盤指針等,并將所在在物理文件之后的所有文件刪除。大家如有興趣,可以查閱筆者的《RocketMQ技術內幕》第4章:RocketMQ 存儲相關內容。

                        3.3.4 handleDoAppend
                        private void handleDoAppend(long writeIndex, PushEntryRequest request,
                            CompletableFuture<PushEntryResponse> future) {
                            try {
                                PreConditions.check(writeIndex == request.getEntry().getIndex(), DLedgerResponseCode.INCONSISTENT_STATE);
                                DLedgerEntry entry = dLedgerStore.appendAsFollower(request.getEntry(), request.getTerm(), request.getLeaderId());
                                PreConditions.check(entry.getIndex() == writeIndex, DLedgerResponseCode.INCONSISTENT_STATE);
                                future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode()));
                                dLedgerStore.updateCommittedIndex(request.getTerm(), request.getCommitIndex());
                            } catch (Throwable t) {
                                logger.error("[HandleDoWrite] writeIndex={}", writeIndex, t);
                                future.complete(buildResponse(request, DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
                            }
                        }
                        復制

                        其實現也比較簡單,調用DLedgerStore 的 appendAsFollower 方法進行日志的追加,與appendAsLeader 在日志存儲部分相同,只是從節點無需再轉發日志。

                        3.3.5 checkAbnormalFuture

                        該方法是本節的重點,doWork 的從服務器存儲的最大有效日志序號(ledgerEndIndex) + 1 序號,嘗試從待寫請求中獲取不到對應的請求時調用,這種情況也很常見,例如主節點并么有將最新的數據 PUSH 給從節點。接下來我們詳細來看看該方法的實現細節。

                        EntryHandler#checkAbnormalFuture

                        if (DLedgerUtils.elapsed(lastCheckFastForwardTimeMs) < ) {
                            return;
                        }
                        lastCheckFastForwardTimeMs  = System.currentTimeMillis();
                        if (writeRequestMap.isEmpty()) {
                            return;
                        }
                        復制

                        Step1:如果上一次檢查的時間距現在不到1s,則跳出;如果當前沒有積壓的append請求,同樣跳出,因為可以同樣明確的判斷出主節點還未推送日志。

                        EntryHandler#checkAbnormalFuture

                        for (Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair : writeRequestMap.values()) {
                            long index = pair.getKey().getEntry().getIndex();             // @1
                            //Fall behind
                            if (index <= endIndex) {                                                   // @2
                                try {
                                    DLedgerEntry local = dLedgerStore.get(index);
                                    PreConditions.check(pair.getKey().getEntry().equals(local), DLedgerResponseCode.INCONSISTENT_STATE);
                                    pair.getValue().complete(buildResponse(pair.getKey(), DLedgerResponseCode.SUCCESS.getCode()));
                                    logger.warn("[PushFallBehind]The leader pushed an entry index={} smaller than current ledgerEndIndex={}, maybe the last ack is missed", index, endIndex);
                                } catch (Throwable t) {
                                    logger.error("[PushFallBehind]The leader pushed an entry index={} smaller than current ledgerEndIndex={}, maybe the last ack is missed", index, endIndex, t);
                                    pair.getValue().complete(buildResponse(pair.getKey(), DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
                                }
                                writeRequestMap.remove(index);
                                continue;
                            }
                            //Just OK
                            if (index ==  endIndex + ) {    // @3
                                //The next entry is coming, just return
                                return;
                            }
                            //Fast forward
                            TimeoutFuture<PushEntryResponse> future  = (TimeoutFuture<PushEntryResponse>) pair.getValue();    // @4
                            if (!future.isTimeOut()) {
                                continue;
                            }
                            if (index < minFastForwardIndex) {                                                                                                                // @5
                                minFastForwardIndex = index;
                            }
                        }
                        復制

                        Step2:遍歷當前待寫入的日志追加請求(主服務器推送過來的日志復制請求),找到需要快速快進的的索引。其關鍵實現點如下:

                        • 代碼@1:首先獲取待寫入日志的序號。
                        • 代碼@2:如果待寫入的日志序號小于從節點已追加的日志(endIndex),并且日志的確已存儲在從節點,則返回成功,并輸出警告日志【PushFallBehind】,繼續監測下一條待寫入日志。
                        • 代碼@3:如果待寫入 index 等于 endIndex + 1,則結束循環,因為下一條日志消息已經在待寫入隊列中,即將寫入。
                        • 代碼@4:如果待寫入 index 大于 endIndex + 1,并且未超時,則直接檢查下一條待寫入日志。
                        • 代碼@5:如果待寫入 index 大于 endIndex + 1,并且已經超時,則記錄該索引,使用 minFastForwardIndex 存儲。

                        EntryHandler#checkAbnormalFuture

                        if (minFastForwardIndex == Long.MAX_VALUE) {
                            return;
                        }
                        Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair = writeRequestMap.get(minFastForwardIndex);
                        if (pair == null) {
                            return;
                        }
                        復制

                        Step3:如果未找到需要快速失敗的日志序號或 writeRequestMap 中未找到其請求,則直接結束檢測。

                        EntryHandler#checkAbnormalFuture

                        logger.warn("[PushFastForward] ledgerEndIndex={} entryIndex={}", endIndex, minFastForwardIndex);
                        pair.getValue().complete(buildResponse(pair.getKey(), DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
                        復制

                        Step4:則向主節點報告從節點已經與主節點發生了數據不一致,從節點并沒有寫入序號 minFastForwardIndex 的日志。如果主節點收到此種響應,將會停止日志轉發,轉而向各個從節點發送 COMPARE 請求,從而使數據恢復一致。

                        行為至此,已經詳細介紹了主服務器向從服務器發送請求,從服務做出響應,那接下來就來看一下,服務端收到響應結果后的處理,我們要知道主節點會向它所有的從節點傳播日志,主節點需要在指定時間內收到超過集群一半節點的確認,才能認為日志寫入成功,那我們接下來看一下其實現過程。

                        4、QuorumAckChecker

                        日志復制投票器,一個日志寫請求只有得到集群內的的大多數節點的響應,日志才會被提交。

                        4.1 類圖

                        其核心屬性如下:

                        • long lastPrintWatermarkTimeMs 上次打印水位線的時間戳,單位為毫秒。
                        • long lastCheckLeakTimeMs 上次檢測泄漏的時間戳,單位為毫秒。
                        • long lastQuorumIndex 已投票仲裁的日志序號。
                        4.2 doWork 詳解

                        QuorumAckChecker#doWork

                        if (DLedgerUtils.elapsed(lastPrintWatermarkTimeMs) > ) {    
                            logger.info("[{}][{}] term={} ledgerBegin={} ledgerEnd={} committed={} watermarks={}",
                                    memberState.getSelfId(), memberState.getRole(), memberState.currTerm(), dLedgerStore.getLedgerBeginIndex(), dLedgerStore.getLedgerEndIndex(), dLedgerStore.getCommittedIndex(), JSON.toJSONString(peerWaterMarksByTerm));
                            lastPrintWatermarkTimeMs = System.currentTimeMillis();
                        }
                        復制

                        Step1:如果離上一次打印 watermak 的時間超過3s,則打印一下當前的 term、ledgerBegin、ledgerEnd、committed、peerWaterMarksByTerm 這些數據日志。

                        QuorumAckChecker#doWork

                        if (!memberState.isLeader()) {   // @2
                            waitForRunning();
                            return;
                        }
                        復制

                        Step2:如果當前節點不是主節點,直接返回,不作為。

                        QuorumAckChecker#doWork

                        if (pendingAppendResponsesByTerm.size() > ) {   // @1
                            for (Long term : pendingAppendResponsesByTerm.keySet()) {
                                if (term == currTerm) {
                                    continue;
                                }
                                for (Map.Entry<Long, TimeoutFuture<AppendEntryResponse>> futureEntry : pendingAppendResponsesByTerm.get(term).entrySet()) {
                                    AppendEntryResponse response = new AppendEntryResponse();
                                    response.setGroup(memberState.getGroup());
                                    response.setIndex(futureEntry.getKey());
                                    response.setCode(DLedgerResponseCode.TERM_CHANGED.getCode());
                                    response.setLeaderId(memberState.getLeaderId());
                                    logger.info("[TermChange] Will clear the pending response index={} for term changed from {} to {}", futureEntry.getKey(), term, currTerm);
                                    futureEntry.getValue().complete(response);
                                }
                                pendingAppendResponsesByTerm.remove(term);
                            }
                        }
                        if (peerWaterMarksByTerm.size() > ) {
                            for (Long term : peerWaterMarksByTerm.keySet()) {
                                if (term == currTerm) {
                                    continue;
                                }
                                logger.info("[TermChange] Will clear the watermarks for term changed from {} to {}", term, currTerm);
                                peerWaterMarksByTerm.remove(term);
                            }
                        }
                        復制

                        Step3:清理pendingAppendResponsesByTerm、peerWaterMarksByTerm 中本次投票輪次的數據,避免一些不必要的內存使用。

                        Map<String, Long> peerWaterMarks = peerWaterMarksByTerm.get(currTerm);
                        long quorumIndex = -;
                        for (Long index : peerWaterMarks.values()) {  // @1
                            int num = ;
                            for (Long another : peerWaterMarks.values()) {  // @2
                                if (another >= index) {
                                    num++;
                                }
                            }
                            if (memberState.isQuorum(num) && index > quorumIndex) {  // @3
                                quorumIndex = index;
                            }
                        }
                        dLedgerStore.updateCommittedIndex(currTerm, quorumIndex);  // @4
                        復制

                        Step4:根據各個從節點反饋的進度,進行仲裁,確定已提交序號。為了加深對這段代碼的理解,再來啰嗦一下 peerWaterMarks 的作用,存儲的是各個從節點當前已成功追加的日志序號。例如一個三節點的 DLedger 集群,peerWaterMarks 數據存儲大概如下:

                        {
                        “dledger_group_01_0” : ,
                        "dledger_group_01_1" : ,
                        }
                        復制

                        其中 dledger_group_01_0 為從節點1的ID,當前已復制的序號為 100,而 dledger_group_01_1 為節點2的ID,當前已復制的序號為 101。再加上主節點,如何確定可提交序號呢?

                        • 代碼@1:首先遍歷 peerWaterMarks 的 value 集合,即上述示例中的 {100, 101},用臨時變量 index 來表示待投票的日志序號,需要集群內超過半數的節點的已復制序號超過該值,則該日志能被確認提交。
                        • 代碼@2:遍歷 peerWaterMarks 中的所有已提交序號,與當前值進行比較,如果節點的已提交序號大于等于待投票的日志序號(index),num 加一,表示投贊成票。
                        • 代碼@3:對 index 進行仲裁,如果超過半數 并且 index 大于 quorumIndex,更新 quorumIndex 的值為 index。quorumIndex 經過遍歷的,得出當前最大的可提交日志序號。
                        • 代碼@4:更新 committedIndex 索引,方便 DLedgerStore 定時將 committedIndex 寫入 checkpoint 中。
                        ConcurrentMap<Long, TimeoutFuture<AppendEntryResponse>> responses = pendingAppendResponsesByTerm.get(currTerm);
                        boolean needCheck = false;
                        int ackNum = ;
                        if (quorumIndex >= ) {
                            for (Long i = quorumIndex; i >= ; i--) {  // @1
                                try {
                                    CompletableFuture<AppendEntryResponse> future = responses.remove(i);   // @2
                                    if (future == null) {                                                                                              // @3
                                        needCheck = lastQuorumIndex != - && lastQuorumIndex != quorumIndex && i != lastQuorumIndex;
                                        break;
                                    } else if (!future.isDone()) {                                                                                // @4
                                        AppendEntryResponse response = new AppendEntryResponse();
                                        response.setGroup(memberState.getGroup());
                                        response.setTerm(currTerm);
                                        response.setIndex(i);
                                        response.setLeaderId(memberState.getSelfId());
                                        response.setPos(((AppendFuture) future).getPos());
                                        future.complete(response);
                                    }
                                    ackNum++;                                                                                                      // @5
                                } catch (Throwable t) {
                                    logger.error("Error in ack to index={} term={}", i, currTerm, t);
                                }
                            }
                        }
                        復制

                        Step5:處理 quorumIndex 之前的掛起請求,需要發送響應到客戶端,其實現步驟:

                        • 代碼@1:從 quorumIndex 開始處理,沒處理一條,該序號減一,直到大于0或主動退出,請看后面的退出邏輯。
                        • 代碼@2:responses 中移除該日志條目的掛起請求。
                        • 代碼@3:如果未找到掛起請求,說明前面掛起的請求已經全部處理完畢,準備退出,退出之前再 設置 needCheck 的值,其依據如下(三個條件必須同時滿足):
                          • 最后一次仲裁的日志序號不等于-1
                          • 并且最后一次不等于本次新仲裁的日志序號
                          • 最后一次仲裁的日志序號不等于最后一次仲裁的日志。正常情況一下,條件一、條件二通常為true,但這一條大概率會返回false。
                        • 代碼@4:向客戶端返回結果。
                        • 代碼@5:ackNum,表示本次確認的數量。
                        if (ackNum == ) {
                            for (long i = quorumIndex + ; i < Integer.MAX_VALUE; i++) {
                                TimeoutFuture<AppendEntryResponse> future = responses.get(i);
                                if (future == null) {
                                    break;
                                } else if (future.isTimeOut()) {
                                    AppendEntryResponse response = new AppendEntryResponse();
                                    response.setGroup(memberState.getGroup());
                                    response.setCode(DLedgerResponseCode.WAIT_QUORUM_ACK_TIMEOUT.getCode());
                                    response.setTerm(currTerm);
                                    response.setIndex(i);
                                    response.setLeaderId(memberState.getSelfId());
                                    future.complete(response);
                                } else {
                                    break;
                                }
                            }
                            waitForRunning();
                        }
                        復制

                        Step6:如果本次確認的個數為0,則嘗試去判斷超過該仲裁序號的請求,是否已經超時,如果已超時,則返回超時響應結果。

                        if (DLedgerUtils.elapsed(lastCheckLeakTimeMs) >  || needCheck) {
                            updatePeerWaterMark(currTerm, memberState.getSelfId(), dLedgerStore.getLedgerEndIndex());
                            for (Map.Entry<Long, TimeoutFuture<AppendEntryResponse>> futureEntry : responses.entrySet()) {
                                if (futureEntry.getKey() < quorumIndex) {
                                    AppendEntryResponse response = new AppendEntryResponse();
                                    response.setGroup(memberState.getGroup());
                                    response.setTerm(currTerm);
                                    response.setIndex(futureEntry.getKey());
                                    response.setLeaderId(memberState.getSelfId());
                                    response.setPos(((AppendFuture) futureEntry.getValue()).getPos());
                                    futureEntry.getValue().complete(response);
                                    responses.remove(futureEntry.getKey());
                                }
                            }
                            lastCheckLeakTimeMs = System.currentTimeMillis();
                        }
                        復制

                        Step7:檢查是否發送泄漏。其判斷泄漏的依據是如果掛起的請求的日志序號小于已提交的序號,則移除。

                        Step8:一次日志仲裁就結束了,最后更新 lastQuorumIndex 為本次仲裁的的新的提交值。

                        關聯標簽:
                        少妇各种各样BBBⅩXX