關于RocketMQ消息拉取與重平衡

                        小編:管理員 122閱讀 2022.07.29

                        其實最好的學習方式就是互相交流,最近也有跟網友討論了一些關于 RocketMQ 消息拉取與重平衡的問題,我姑且在這里寫下我的一些總結。

                        關于 push 模式下的消息循環拉取問題

                        之前發表了一篇關于重平衡的文章:「Kafka重平衡機制」,里面有說到 RocketMQ 重平衡機制是每隔 20s 從任意一個 Broker 節點獲取消費組的消費 ID 以及訂閱信息,再根據這些訂閱信息進行分配,然后將分配到的信息封裝成 pullRequest 對象 pull 到 pullRequestQueue 隊列中,拉取線程喚醒后執行拉取任務,流程圖如下:

                        但是其中有一些是沒有詳細說的,比如每次拉消息都要等 20s 嗎?真的有個網友問了我如下問題:

                        很顯然他的項目是用了 push 模式進行消息拉取,要回答這個問題,就要從 RockeMQ 的消息拉取說起:

                        RocketMQ 的 push 模式的實現是基于 pull 模式,只不過在 pull 模式上套了一層,所以RocketMQ push 模式并不是真正意義上的 ”推模式“,因此,在 push 模式下,消費者拉取完消息后,立馬就有開始下一個拉取任務,并不會真的等 20s 重平衡后才拉取,至于 push 模式是怎么實現的,那就從源碼去找答案。

                        之前有寫過一篇文章:「RocketMQ為什么要保證訂閱關系的一致性?」,里面有說過 消息拉取是從 PullRequestQueue 阻塞隊列中取出 PullRequest 拉取任務進行消息拉取的,但 PullRequest 是怎么放進 PullRequestQueue 阻塞隊列中的呢?

                        RocketMQ 一共提供了以下方法:

                        org.apache.rocketmq.client.impl.consumer.PullMessageService#executePullRequestImmediately:

                        public void executePullRequestImmediately(final PullRequest pullRequest) {
                          try {
                            this.pullRequestQueue.put(pullRequest);
                          } catch (InterruptedException e) {
                            log.error("executePullRequestImmediately pullRequestQueue.put", e);
                          }
                        }
                        復制

                        從調用鏈發現,除了重平衡會調用該方法之外,在 push 模式下,PullCallback 回調對象中的 onSuccess 方法在消息消費時,也調用了該方法:

                        org.apache.rocketmq.client.consumer.PullCallback#onSuccess:

                        case FOUND:

                        // 如果本次拉取消息為空,則繼續將pullRequest放入阻塞隊列中
                        if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                          DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        } else {
                          // 將消息放入消費者消費線程去執行
                          DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
                            pullResult.getMsgFoundList(), //
                            processQueue, //
                            pullRequest.getMessageQueue(), //
                            dispathToConsume);
                          // 將pullRequest放入阻塞隊列中
                          DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);  
                        }
                        復制

                        當從 broker 拉取到消息后,如果消息被過濾掉,則繼續將pullRequest放入阻塞隊列中繼續循環執行消息拉取任務,否則將消息放入消費者消費線程去執行,在pullRequest放入阻塞隊列中。

                        case NO_NEW_MESSAGE:

                        case NO_MATCHED_MSG:

                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                        DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        復制

                        如果從 broker 端沒有可拉取的新消息或者沒有匹配到消息,則將pullRequest放入阻塞隊列中繼續循環執行消息拉取任務。

                        從以上消息消費邏輯可以看出,當消息處理完后,立即將 pullRequest 重新放入阻塞隊列中,因此這就很好解釋為什么 push 模式可以持續拉取消息了:

                        在 push 模式下消息消費完后,還會調用該方法重新將 PullRequest 對象放進 PullRequestQueue 阻塞隊列中,不斷地從 broker 中拉取消息,實現 push 效果。

                        重平衡后隊列被其它消費者分配后如何處理?

                        繼續再想一個問題,如果重平衡后,發現某個隊列被新的消費者分配了,怎么辦,總不能繼續從該隊列中拉取消息吧?

                        RocketMQ 重平衡后會檢查 pullRequest 是否還在新分配的列表中,如果不在,則丟棄,調用 isDrop() 可查出該pullRequest是否已丟棄:

                        org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage:

                        final ProcessQueue processQueue = pullRequest.getProcessQueue();
                        if (processQueue.isDropped()) {
                          log.info("the pull request[{}] is dropped.", pullRequest.toString());
                          return;
                        }
                        復制

                        在消息拉取之前,首先判斷該隊列是否被丟棄,如果已丟棄,則直接放棄本次拉取任務。

                        那什么時候隊列被丟棄呢?

                        org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance:

                        Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
                        while (it.hasNext()) {
                          Entry<MessageQueue, ProcessQueue> next = it.next();
                          MessageQueue mq = next.getKey();
                          ProcessQueue pq = next.getValue();
                        
                          if (mq.getTopic().equals(topic)) {
                            // 判斷當前緩存 MessageQueue 是否包含在最新的 mqSet 中,如果不存在則將隊列丟棄
                            if (!mqSet.contains(mq)) {
                              pq.setDropped(true);
                              if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                                it.remove();
                                changed = true;
                                log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
                              }
                            } else if (pq.isPullExpired()) {
                              // 如果隊列拉取過期則丟棄
                              switch (this.consumeType()) {
                                case CONSUME_ACTIVELY:
                                  break;
                                case CONSUME_PASSIVELY:
                                  pq.setDropped(true);
                                  if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                                    it.remove();
                                    changed = true;
                                    log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
                                              consumerGroup, mq);
                                  }
                                  break;
                                default:
                                  break;
                              }
                            }
                          }
                        }
                        復制

                        updateProcessQueueTableInRebalance 方法在重平衡時執行,用于更新 processQueueTable,它是當前消費者的隊列緩存列表,以上方法邏輯判斷當前緩存 MessageQueue 是否包含在最新的 mqSet 中,如果不包含其中,則說明經過這次重平衡后,該隊列被分配給其它消費者了,或者拉取時間間隔太大過期了,則調用 setDropped(true) 方法將隊列置為丟棄狀態。

                        可能你會問,processQueueTable 跟 pullRequest 里面 processQueue 有什么關聯,往下看:

                        org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance:

                        // 新建 ProcessQueue 
                        ProcessQueue pq = new ProcessQueue();
                        long nextOffset = this.computePullFromWhere(mq);
                        if (nextOffset >= 0) {
                          // 將ProcessQueue放入processQueueTable中
                          ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                          if (pre != null) {
                            log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                          } else {
                            log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                            PullRequest pullRequest = new PullRequest();
                            pullRequest.setConsumerGroup(consumerGroup);
                            pullRequest.setNextOffset(nextOffset);
                            pullRequest.setMessageQueue(mq);
                            // 將ProcessQueue放入pullRequest拉取任務對象中
                            pullRequest.setProcessQueue(pq);
                            pullRequestList.add(pullRequest);
                            changed = true;
                          }
                        }
                        復制

                        可以看出,重平衡時會創建 ProcessQueue 對象,將其放入 processQueueTable 緩存隊列表中,再將其放入 pullRequest 拉取任務對象中,也就是 processQueueTable 中的 ProcessQueue 與 pullRequest 的中 ProcessQueue 是同一個對象。

                        重平衡后會導致消息重復消費嗎?

                        之前在群里有個網友提了這個問題:

                        我當時回答他 RocketMQ 正常也是沒有重復消費,但后來發現其實 RocketMQ 在某些情況下,也是會出現消息重復消費的現象。

                        前面講到,RocketMQ 消息消費時,會將消息放進消費線程中去執行,代碼如下:

                        org.apache.rocketmq.client.consumer.PullCallback#onSuccess:

                        DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
                          pullResult.getMsgFoundList(), //
                          processQueue, //
                          pullRequest.getMessageQueue(), //
                          dispathToConsume);
                        復制

                        ConsumeMessageService 類實現消息消費的邏輯,它有兩個實現類:

                        // 并發消息消費邏輯實現類
                        org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
                        // 順序消息消費邏輯實現類
                        org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService;
                        復制

                        先看并發消息消費相關處理邏輯:

                        ConsumeMessageConcurrentlyService:

                        org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run:

                        if (this.processQueue.isDropped()) {
                          log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
                          return;
                        }
                        
                        // 消息消費邏輯
                        // ...
                        
                        // 如果隊列被設置為丟棄狀態,則不提交消息消費進度
                        if (!processQueue.isDropped()) {
                            ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
                        } else {
                            log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
                        }
                        復制

                        ConsumeRequest 是一個繼承了 Runnable 的類,它是消息消費核心邏輯的實現類,submitConsumeRequest 方法將 ConsumeRequest 放入 消費線程池中執行消息消費,從它的 run 方法中可看出,如果在執行消息消費邏輯中有節點加入,重平衡后該隊列被分配給其它節點進行消費了,此時的隊列被丟棄,則不提交消息消費進度,因為之前已經消費了,此時就會造成消息重復消費的情況。

                        再來看看順序消費相關處理邏輯:

                        ConsumeMessageOrderlyService:

                        org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService.ConsumeRequest#run:

                        public void run() {
                          // 判斷隊列是否被丟棄
                          if (this.processQueue.isDropped()) {
                            log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                            return;
                          }
                        
                          final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
                          synchronized (objLock) {
                            // 如果不是廣播模式,且隊列已加鎖且鎖沒有過期
                            if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                                || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
                              final long beginTime = System.currentTimeMillis();
                              for (boolean continueConsume = true; continueConsume; ) {
                                // 再次判斷隊列是否被丟棄
                                if (this.processQueue.isDropped()) {
                                  log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                                  break;
                                }
                        
                                // 消息消費處理邏輯
                                // ...
                        
                                  continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
                                } else {
                                  continueConsume = false;
                                }
                              }
                            } else {
                              if (this.processQueue.isDropped()) {
                                log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                                return;
                              }
                              ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
                            }
                          }
                        }
                        復制

                        RocketMQ 順序消息消費會將隊列鎖定,當隊列獲取鎖之后才能進行消費,所以,即使消息在消費過程中有節點加入,重平衡后該隊列被分配給其它節點進行消費了,此時的隊列被丟棄,依然不會造成重復消費。

                        關聯標簽:
                        少妇各种各样BBBⅩXX