分布式消息隊列 RocketMQ 源碼分析 —— Message 順序發送與消費

                        小編:管理員 73閱讀 2022.08.01

                        本文主要基于 RocketMQ 4.0.x 正式版

                        • 1. 概述
                        • 2.Producer順序發送
                        • 3.Consumer嚴格順序消費
                        • 3.1 獲得(鎖定)消息隊列
                        • 3.2 移除消息隊列
                        • 3.3 消費消息隊列
                          • 3.1.1 消費消息
                          • 3.1.2 處理消費結果
                          • 3.1.3 消息處理隊列核心方法

                        1. 概述

                        建議前置閱讀內容:

                        • 《RocketMQ 源碼分析 —— Message 發送與接收》
                        • <《RocketMQ 源碼分析 —— Message 拉取與消費(下)》

                        當然對Message發送與消費已經有一定了解的同學,可以選擇跳過。


                        RocketMQ提供了兩種順序級別:

                        • 普通順序消息 :Producer將相關聯的消息發送到相同的消息隊列。
                        • 完全嚴格順序 :在普通順序消息的基礎上,Consumer嚴格順序消費。

                        絕大部分場景下只需要用到普通順序消息。 例如說:給用戶發送短信消息 + 發送推送消息,將兩條消息發送到不同的消息隊列,若其中一條消息隊列消費較慢造成堵塞,用戶可能會收到兩條消息會存在一定的時間差,帶來的體驗會相對較差。當然類似這種場景,即使有一定的時間差,不會產生系統邏輯上BUG。另外,普通順序消息性能能更加好。 那么什么時候使用使用完全嚴格順序?如下是來自官方文檔的說明:

                        目前已知的應用只有數據庫binlog同步強依賴嚴格順序消息,其他應用絕大部分都可以容忍短暫亂序,推薦使用普通的順序消息


                        ?上代碼。!

                        2.Producer順序發送

                        官方發送順序消息的例子

                        1: package org.apache.rocketmq.example.ordermessage;
                          2: 
                          3: import java.io.UnsupportedEncodingException;
                          4: import java.util.List;
                          5: import org.apache.rocketmq.client.exception.MQBrokerException;
                          6: import org.apache.rocketmq.client.exception.MQClientException;
                          7: import org.apache.rocketmq.client.producer.DefaultMQProducer;
                          8: import org.apache.rocketmq.client.producer.MQProducer;
                          9: import org.apache.rocketmq.client.producer.MessageQueueSelector;
                         10: import org.apache.rocketmq.client.producer.SendResult;
                         11: import org.apache.rocketmq.common.message.Message;
                         12: import org.apache.rocketmq.common.message.MessageQueue;
                         13: import org.apache.rocketmq.remoting.common.RemotingHelper;
                         14: import org.apache.rocketmq.remoting.exception.RemotingException;
                         15: 
                         16: public class Producer {
                         17:     public static void main(String[] args) throws UnsupportedEncodingException {
                         18:         try {
                         19:             MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
                         20:             producer.start();
                         21: 
                         22:             String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
                         23:             for (int i = 0; i < 100; i++) {
                         24:                 int orderId = i % 10;
                         25:                 Message msg =
                         26:                     new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
                         27:                         ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                         28:                 SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                         29:                     @Override
                         30:                     public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                         31:                         Integer id = (Integer) arg;
                         32:                         int index = id % mqs.size();
                         33:                         return mqs.get(index);
                         34:                     }
                         35:                 }, orderId);
                         36: 
                         37:                 System.out.printf("%s%n", sendResult);
                         38:             }
                         39: 
                         40:             producer.shutdown();
                         41:         } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
                         42:             e.printStackTrace();
                         43:         }
                         44:     }
                         45: }
                        復制
                        • 第 28 至 35 行 :實現了根據id%mqs.size()來進行消息隊列的選擇。當前例子,我們傳遞 orderId 作為參數,那么相同的 orderId 能夠進入相同的消息隊列。

                        MessageQueueSelector接口的源碼

                        1: public interface MessageQueueSelector {
                          2: 
                          3:     /**
                          4:      * 選擇消息隊列
                          5:      *
                          6:      * @param mqs 消息隊列
                          7:      * @param msg 消息
                          8:      * @param arg 參數
                          9:      * @return 消息隊列
                         10:      */
                         11:     MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
                         12: }
                        復制

                        Producer選擇隊列發送消息方法的源碼

                        16: private SendResult sendSelectImpl(//
                         17:     Message msg, //
                         18:     MessageQueueSelector selector, //
                         19:     Object arg, //
                         20:     final CommunicationMode communicationMode, //
                         21:     final SendCallback sendCallback, final long timeout//
                         22: ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
                         23:     this.makeSureStateOK();
                         24:     Validators.checkMessage(msg, this.defaultMQProducer);
                         25: 
                         26:     TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
                         27:     if (topicPublishInfo != null && topicPublishInfo.ok()) {
                         28:         MessageQueue mq = null;
                         29:         try {
                         30:             mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
                         31:         } catch (Throwable e) {
                         32:             throw new MQClientException("select message queue throwed exception.", e);
                         33:         }
                         34: 
                         35:         if (mq != null) {
                         36:             return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout);
                         37:         } else {
                         38:             throw new MQClientException("select message queue return null.", null);
                         39:         }
                         40:     }
                         41: 
                         42:     throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
                         43: }
                        復制
                        • 第 30 行 :選擇消息隊列。
                        • 第 36 行 :發送消息。
                        3.Consumer嚴格順序消費

                        Consumer在嚴格順序消費時,通過 把鎖保證嚴格順序消費。

                        • Broker消息隊列鎖(分布式鎖) :
                          • 集群模式下,Consumer從Broker獲得該鎖后,才能進行消息拉取、消費。
                          • 廣播模式下,Consumer無需該鎖。
                        • Consumer消息隊列鎖(本地鎖) :Consumer獲得該鎖才能操作消息隊列。
                        • Consumer消息處理隊列消費鎖(本地鎖) :Consumer獲得該鎖才能消費消息隊列。

                        可能同學有疑問,為什么有Consumer消息隊列鎖還需要有Consumer消息隊列消費鎖呢??讓我們帶著疑問繼續往下看。


                        3.1 獲得(鎖定)消息隊列

                        集群模式下,Consumer更新屬于自己的消息隊列時,會向Broker鎖定該消息隊列(廣播模式下不需要)。如果鎖定失敗,則更新失敗,即該消息隊列不屬于自己,不能進行消費。核心代碼如下:

                        1: // ??????【RebalanceImpl.java】
                          2: private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) {
                          3: // ..... 此處省略部分代碼 
                          4:     // 增加 不在processQueueTable && 存在于mqSet 里的消息隊列。
                          5:     List<PullRequest> pullRequestList = new ArrayList<>(); // 拉消息請求數組
                          6:     for (MessageQueue mq : mqSet) {
                          7:         if (!this.processQueueTable.containsKey(mq)) {
                          8:             if (isOrder && !this.lock(mq)) { // 順序消息鎖定消息隊列
                          9:                 log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                         10:                 continue;
                         11:             }
                         12: 
                         13:             this.removeDirtyOffset(mq);
                         14:             ProcessQueue pq = new ProcessQueue();
                         15:             long nextOffset = this.computePullFromWhere(mq);
                         16:             if (nextOffset >= 0) {
                         17:                 ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                         18:                 if (pre != null) {
                         19:                     log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                         20:                 } else {
                         21:                     log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                         22:                     PullRequest pullRequest = new PullRequest();
                         23:                     pullRequest.setConsumerGroup(consumerGroup);
                         24:                     pullRequest.setNextOffset(nextOffset);
                         25:                     pullRequest.setMessageQueue(mq);
                         26:                     pullRequest.setProcessQueue(pq);
                         27:                     pullRequestList.add(pullRequest);
                         28:                     changed = true;
                         29:                 }
                         30:             } else {
                         31:                 log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
                         32:             }
                         33:         }
                         34:     }
                         35: 
                         36: // ..... 此處省略部分代碼 
                         37: }
                         38: 
                         39: // ??????【RebalanceImpl.java】
                         40: /**
                         41:  * 請求Broker獲得指定消息隊列的分布式鎖
                         42:  *
                         43:  * @param mq 隊列
                         44:  * @return 是否成功
                         45:  */
                         46: public boolean lock(final MessageQueue mq) {
                         47:     FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
                         48:     if (findBrokerResult != null) {
                         49:         LockBatchRequestBody requestBody = new LockBatchRequestBody();
                         50:         requestBody.setConsumerGroup(this.consumerGroup);
                         51:         requestBody.setClientId(this.mQClientFactory.getClientId());
                         52:         requestBody.getMqSet().add(mq);
                         53: 
                         54:         try {
                         55:             // 請求Broker獲得指定消息隊列的分布式鎖
                         56:             Set<MessageQueue> lockedMq =
                         57:                 this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
                         58: 
                         59:             // 設置消息處理隊列鎖定成功。鎖定消息隊列成功,可能本地沒有消息處理隊列,設置鎖定成功會在lockAll()方法。
                         60:             for (MessageQueue mmqq : lockedMq) {
                         61:                 ProcessQueue processQueue = this.processQueueTable.get(mmqq);
                         62:                 if (processQueue != null) {
                         63:                     processQueue.setLocked(true);
                         64:                     processQueue.setLastLockTimestamp(System.currentTimeMillis());
                         65:                 }
                         66:             }
                         67: 
                         68:             boolean lockOK = lockedMq.contains(mq);
                         69:             log.info("the message queue lock {}, {} {}",
                         70:                 lockOK ? "OK" : "Failed",
                         71:                 this.consumerGroup,
                         72:                 mq);
                         73:             return lockOK;
                         74:         } catch (Exception e) {
                         75:             log.error("lockBatchMQ exception, " + mq, e);
                         76:         }
                         77:     }
                         78: 
                         79:     return false;
                         80: }
                        復制
                        • ??????
                        • 第 8 至 11 行 :順序消費時,鎖定消息隊列。如果鎖定失敗,新增消息處理隊列失敗。

                        Broker消息隊列鎖會過期,默認配置 30s。因此,Consumer需要不斷向Broker刷新該鎖過期時間,默認配置 20s 刷新一次。核心代碼如下:

                        1: // ??????【ConsumeMessageOrderlyService.java】
                          2: public void start() {
                          3:     if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
                          4:         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                          5:             @Override
                          6:             public void run() {
                          7:                 ConsumeMessageOrderlyService.this.lockMQPeriodically();
                          8:             }
                          9:         }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
                         10:     }
                         11: }
                        復制3.2 移除消息隊列

                        集群模式下,Consumer移除自己的消息隊列時,會向Broker解鎖該消息隊列(廣播模式下不需要)。核心代碼如下:

                        1: // ??????【RebalancePushImpl.java】
                          2: /**
                          3:  * 移除不需要的隊列相關的信息
                          4:  * 1. 持久化消費進度,并移除之
                          5:  * 2. 順序消費&集群模式,解鎖對該隊列的鎖定
                          6:  *
                          7:  * @param mq 消息隊列
                          8:  * @param pq 消息處理隊列
                          9:  * @return 是否移除成功
                         10:  */
                         11: @Override
                         12: public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
                         13:     // 同步隊列的消費進度,并移除之。
                         14:     this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
                         15:     this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
                         16:     // 集群模式下,順序消費移除時,解鎖對隊列的鎖定
                         17:     if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
                         18:         && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
                         19:         try {
                         20:             if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) {
                         21:                 try {
                         22:                     return this.unlockDelay(mq, pq);
                         23:                 } finally {
                         24:                     pq.getLockConsume().unlock();
                         25:                 }
                         26:             } else {
                         27:                 log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}", //
                         28:                     mq, //
                         29:                     pq.getTryUnlockTimes());
                         30: 
                         31:                 pq.incTryUnlockTimes();
                         32:             }
                         33:         } catch (Exception e) {
                         34:             log.error("removeUnnecessaryMessageQueue Exception", e);
                         35:         }
                         36: 
                         37:         return false;
                         38:     }
                         39:     return true;
                         40: }
                         41: 
                         42: // ??????【RebalancePushImpl.java】
                         43: /**
                         44:  * 延遲解鎖 Broker 消息隊列鎖
                         45:  * 當消息處理隊列不存在消息,則直接解鎖
                         46:  *
                         47:  * @param mq 消息隊列
                         48:  * @param pq 消息處理隊列
                         49:  * @return 是否解鎖成功
                         50:  */
                         51: private boolean unlockDelay(final MessageQueue mq, final ProcessQueue pq) {
                         52:     if (pq.hasTempMessage()) { // TODO 疑問:為什么要延遲移除
                         53:         log.info("[{}]unlockDelay, begin {} ", mq.hashCode(), mq);
                         54:         this.defaultMQPushConsumerImpl.getmQClientFactory().getScheduledExecutorService().schedule(new Runnable() {
                         55:             @Override
                         56:             public void run() {
                         57:                 log.info("[{}]unlockDelay, execute at once {}", mq.hashCode(), mq);
                         58:                 RebalancePushImpl.this.unlock(mq, true);
                         59:             }
                         60:         }, UNLOCK_DELAY_TIME_MILLS, TimeUnit.MILLISECONDS);
                         61:     } else {
                         62:         this.unlock(mq, true);
                         63:     }
                         64:     return true;
                         65: }
                        復制
                        • ??????
                        • 第 20 至 32 行 :獲取消息隊列消費鎖,避免和消息隊列消費沖突。如果獲取鎖失敗,則移除消息隊列失敗,等待下次重新分配消費隊列時,再進行移除。如果未獲得鎖而進行移除,則可能出現另外的Consumer和當前Consumer同時消費該消息隊列,導致消息無法嚴格順序消費。
                        • 第 51 至 64 行 :解鎖Broker消息隊列鎖。如果消息處理隊列存在剩余消息,則延遲解鎖Broker消息隊列鎖。?為什么消息處理隊列存在剩余消息不能直接解鎖呢??我也不知道,百思不得其解。如果有知道的同學麻煩教育下俺。
                        3.3 消費消息隊列

                        ?本節會類比并發消費消費隊列,建議對照 PushConsumer并發消費消息 一起理解。

                        3.1.1 消費消息
                        1: // ??????【ConsumeMessageOrderlyService.java】
                          2: class ConsumeRequest implements Runnable {
                          3: 
                          4:     /**
                          5:      * 消息處理隊列
                          6:      */
                          7:     private final ProcessQueue processQueue;
                          8:     /**
                          9:      * 消息隊列
                         10:      */
                         11:     private final MessageQueue messageQueue;
                         12: 
                         13:     @Override
                         14:     public void run() {
                         15:         if (this.processQueue.isDropped()) {
                         16:             log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                         17:             return;
                         18:         }
                         19: 
                         20:         // 獲得 Consumer 消息隊列鎖
                         21:         final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
                         22:         synchronized (objLock) {
                         23:             // (廣播模式) 或者 (集群模式 && Broker消息隊列鎖有效)
                         24:             if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                         25:                 || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
                         26:                 final long beginTime = System.currentTimeMillis();
                         27:                 // 循環
                         28:                 for (boolean continueConsume = true; continueConsume; ) {
                         29:                     if (this.processQueue.isDropped()) {
                         30:                         log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                         31:                         break;
                         32:                     }
                         33: 
                         34:                     // 消息隊列分布式鎖未鎖定,提交延遲獲得鎖并消費請求
                         35:                     if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                         36:                         && !this.processQueue.isLocked()) {
                         37:                         log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
                         38:                         ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                         39:                         break;
                         40:                     }
                         41:                     // 消息隊列分布式鎖已經過期,提交延遲獲得鎖并消費請求
                         42:                     if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                         43:                         && this.processQueue.isLockExpired()) {
                         44:                         log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
                         45:                         ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                         46:                         break;
                         47:                     }
                         48: 
                         49:                     // 當前周期消費時間超過連續時長,默認:60s,提交延遲消費請求。默認情況下,每消費1分鐘休息10ms。
                         50:                     long interval = System.currentTimeMillis() - beginTime;
                         51:                     if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
                         52:                         ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
                         53:                         break;
                         54:                     }
                         55: 
                         56:                     // 獲取消費消息。此處和并發消息請求不同,并發消息請求已經帶了消費哪些消息。
                         57:                     final int consumeBatchSize = ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
                         58:                     List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
                         59:                     if (!msgs.isEmpty()) {
                         60:                         final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
                         61: 
                         62:                         ConsumeOrderlyStatus status = null;
                         63: 
                         64:                         // ....省略代碼:Hook:before
                         65: 
                         66:                         // 執行消費
                         67:                         long beginTimestamp = System.currentTimeMillis();
                         68:                         ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
                         69:                         boolean hasException = false;
                         70:                         try {
                         71:                             this.processQueue.getLockConsume().lock(); // 鎖定隊列消費鎖
                         72: 
                         73:                             if (this.processQueue.isDropped()) {
                         74:                                 log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
                         75:                                     this.messageQueue);
                         76:                                 break;
                         77:                             }
                         78: 
                         79:                             status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                         80:                         } catch (Throwable e) {
                         81:                             log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", //
                         82:                                 RemotingHelper.exceptionSimpleDesc(e), //
                         83:                                 ConsumeMessageOrderlyService.this.consumerGroup, //
                         84:                                 msgs, //
                         85:                                 messageQueue);
                         86:                             hasException = true;
                         87:                         } finally {
                         88:                             this.processQueue.getLockConsume().unlock(); // 鎖定隊列消費鎖
                         89:                         }
                         90: 
                         91:                         // ....省略代碼:解析消費結果狀態
                         92: 
                         93:                         // ....省略代碼:Hook:after
                         94: 
                         95:                         ConsumeMessageOrderlyService.this.getConsumerStatsManager()
                         96:                             .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
                         97: 
                         98:                         // 處理消費結果
                         99:                         continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
                        100:                     } else {
                        101:                         continueConsume = false;
                        102:                     }
                        103:                 }
                        104:             } else {
                        105:                 if (this.processQueue.isDropped()) {
                        106:                     log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                        107:                     return;
                        108:                 }
                        109: 
                        110:                 ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
                        111:             }
                        112:         }
                        113:     }
                        114: 
                        115: }
                        復制
                        • ??????
                        • 第 20 行 :獲得Consumer消息隊列鎖。
                        • 第 58 行 :從消息處理隊列順序獲得消息。和并發消費獲得消息不同。并發消費請求在請求創建時,已經設置好消費哪些消息。
                        • 第 71 行 :獲得Consumer消息處理隊列消費鎖。相比【Consumer消息隊列鎖】,其粒度較小。這就是上文提到的?為什么有 Consumer消息隊列鎖還需要有 Consumer 消息隊列消費鎖呢的原因。
                        • 第 79 行 :執行消費。
                        • 第 99 行 :處理消費結果。
                        3.1.2 處理消費結果

                        順序消費消息結果 (ConsumeOrderlyStatus) 有四種情況:

                        • SUCCESS:消費成功但不提交。
                        • ROLLBACK:消費失敗,消費回滾。
                        • COMMIT:消費成功提交并且提交。
                        • SUSPEND_CURRENT_QUEUE_A_MOMENT:消費失敗,掛起消費隊列一會會,稍后繼續消費。

                        考慮到ROLLBACK、COMMIT暫時只使用在MySQLbinlog場景,官方將這兩狀態標記為@Deprecated。當然,相應的實現邏輯依然保留。

                        并發消費場景時,如果消費失敗,Consumer會將消費失敗消息發回到Broker重試隊列,跳過當前消息,等待下次拉取該消息再進行消費。

                        但是在完全嚴格順序消費消費時,這樣做顯然不行。也因此,消費失敗的消息,會掛起隊列一會會,稍后繼續消費。

                        不過消費失敗的消息一直失敗,也不可能一直消費。當超過消費重試上限時,Consumer會將消費失敗超過上限的消息發回到Broker死信隊列。

                        讓我們來看看代碼:

                        1: // ??????【ConsumeMessageOrderlyService.java】
                          2: /**
                          3:  * 處理消費結果,并返回是否繼續消費
                          4:  *
                          5:  * @param msgs 消息
                          6:  * @param status 消費結果狀態
                          7:  * @param context 消費Context
                          8:  * @param consumeRequest 消費請求
                          9:  * @return 是否繼續消費
                         10:  */
                         11: public boolean processConsumeResult(//
                         12:     final List<MessageExt> msgs, //
                         13:     final ConsumeOrderlyStatus status, //
                         14:     final ConsumeOrderlyContext context, //
                         15:     final ConsumeRequest consumeRequest//
                         16: ) {
                         17:     boolean continueConsume = true;
                         18:     long commitOffset = -1L;
                         19:     if (context.isAutoCommit()) {
                         20:         switch (status) {
                         21:             case COMMIT:
                         22:             case ROLLBACK:
                         23:                 log.warn("the message queue consume result is illegal, we think you want to ack these message {}", consumeRequest.getMessageQueue());
                         24:             case SUCCESS:
                         25:                 // 提交消息已消費成功到消息處理隊列
                         26:                 commitOffset = consumeRequest.getProcessQueue().commit();
                         27:                 // 統計
                         28:                 this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                         29:                 break;
                         30:             case SUSPEND_CURRENT_QUEUE_A_MOMENT:
                         31:                 // 統計
                         32:                 this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                         33:                 if (checkReconsumeTimes(msgs)) { // 計算是否暫時掛起(暫停)消費N毫秒,默認:10ms
                         34:                     // 設置消息重新消費
                         35:                     consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
                         36:                     // 提交延遲消費請求
                         37:                     this.submitConsumeRequestLater(//
                         38:                         consumeRequest.getProcessQueue(), //
                         39:                         consumeRequest.getMessageQueue(), //
                         40:                         context.getSuspendCurrentQueueTimeMillis());
                         41:                     continueConsume = false;
                         42:                 } else {
                         43:                     commitOffset = consumeRequest.getProcessQueue().commit();
                         44:                 }
                         45:                 break;
                         46:             default:
                         47:                 break;
                         48:         }
                         49:     } else {
                         50:         switch (status) {
                         51:             case SUCCESS:
                         52:                 this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                         53:                 break;
                         54:             case COMMIT:
                         55:                 // 提交消息已消費成功到消息處理隊列
                         56:                 commitOffset = consumeRequest.getProcessQueue().commit();
                         57:                 break;
                         58:             case ROLLBACK:
                         59:                 // 設置消息重新消費
                         60:                 consumeRequest.getProcessQueue().rollback();
                         61:                 this.submitConsumeRequestLater(//
                         62:                     consumeRequest.getProcessQueue(), //
                         63:                     consumeRequest.getMessageQueue(), //
                         64:                     context.getSuspendCurrentQueueTimeMillis());
                         65:                 continueConsume = false;
                         66:                 break;
                         67:             case SUSPEND_CURRENT_QUEUE_A_MOMENT: // 計算是否暫時掛起(暫停)消費N毫秒,默認:10ms
                         68:                 this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                         69:                 if (checkReconsumeTimes(msgs)) {
                         70:                     // 設置消息重新消費
                         71:                     consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
                         72:                     // 提交延遲消費請求
                         73:                     this.submitConsumeRequestLater(//
                         74:                         consumeRequest.getProcessQueue(), //
                         75:                         consumeRequest.getMessageQueue(), //
                         76:                         context.getSuspendCurrentQueueTimeMillis());
                         77:                     continueConsume = false;
                         78:                 }
                         79:                 break;
                         80:             default:
                         81:                 break;
                         82:         }
                         83:     }
                         84: 
                         85:     // 消息處理隊列未dropped,提交有效消費進度
                         86:     if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
                         87:         this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
                         88:     }
                         89: 
                         90:     return continueConsume;
                         91: }
                         92: 
                         93: private int getMaxReconsumeTimes() {
                         94:     // default reconsume times: Integer.MAX_VALUE
                         95:     if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) {
                         96:         return Integer.MAX_VALUE;
                         97:     } else {
                         98:         return this.defaultMQPushConsumer.getMaxReconsumeTimes();
                         99:     }
                        100: }
                        101: 
                        102: /**
                        103:  * 計算是否要暫停消費
                        104:  * 不暫停條件:存在消息都超過最大消費次數并且都發回broker成功
                        105:  *
                        106:  * @param msgs 消息
                        107:  * @return 是否要暫停
                        108:  */
                        109: private boolean checkReconsumeTimes(List<MessageExt> msgs) {
                        110:     boolean suspend = false;
                        111:     if (msgs != null && !msgs.isEmpty()) {
                        112:         for (MessageExt msg : msgs) {
                        113:             if (msg.getReconsumeTimes() >= getMaxReconsumeTimes()) {
                        114:                 MessageAccessor.setReconsumeTime(msg, String.valueOf(msg.getReconsumeTimes()));
                        115:                 if (!sendMessageBack(msg)) { // 發回失敗,中斷
                        116:                     suspend = true;
                        117:                     msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                        118:                 }
                        119:             } else {
                        120:                 suspend = true;
                        121:                 msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                        122:             }
                        123:         }
                        124:     }
                        125:     return suspend;
                        126: }
                        127: 
                        128: /**
                        129:  * 發回消息。
                        130:  * 消息發回broker后,對應的消息隊列是死信隊列。
                        131:  *
                        132:  * @param msg 消息
                        133:  * @return 是否發送成功
                        134:  */
                        135: public boolean sendMessageBack(final MessageExt msg) {
                        136:     try {
                        137:         // max reconsume times exceeded then send to dead letter queue.
                        138:         Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
                        139:         String originMsgId = MessageAccessor.getOriginMessageId(msg);
                        140:         MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
                        141:         newMsg.setFlag(msg.getFlag());
                        142:         MessageAccessor.setProperties(newMsg, msg.getProperties());
                        143:         MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
                        144:         MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes()));
                        145:         MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
                        146:         newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
                        147: 
                        148:         this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(newMsg);
                        149:         return true;
                        150:     } catch (Exception e) {
                        151:         log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
                        152:     }
                        153: 
                        154:     return false;
                        155: }
                        復制
                        • ??????
                        • 第 21 至 29 行 :消費成功。在自動提交進度(AutoCommit)的情況下,COMMIT、ROLLBACK、SUCCESS邏輯已經統一。
                        • 第 30 至 45 行 :消費失敗。當消息重試次數超過上限(默認 :16次)時,將消息發送到Broker死信隊列,跳過這些消息。此時,消息隊列無需掛起,繼續消費后面的消息。
                        • 第 85 至 88 行 :提交消費進度。
                        3.13 消息處理隊列核心方法

                        ?涉及到的四個核心方法的源碼:

                        1: // ??????【ProcessQueue.java】
                          2: /**
                          3:  * 消息映射
                          4:  * key:消息隊列位置
                          5:  */
                          6: private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<>();    /**
                          7:  * 消息映射臨時存儲(消費中的消息)
                          8:  */
                          9: private final TreeMap<Long, MessageExt> msgTreeMapTemp = new TreeMap<>();
                         10: 
                         11: /**
                         12:  * 回滾消費中的消息
                         13:  * 邏輯類似于{@link #makeMessageToCosumeAgain(List)}
                         14:  */
                         15: public void rollback() {
                         16:     try {
                         17:         this.lockTreeMap.writeLock().lockInterruptibly();
                         18:         try {
                         19:             this.msgTreeMap.putAll(this.msgTreeMapTemp);
                         20:             this.msgTreeMapTemp.clear();
                         21:         } finally {
                         22:             this.lockTreeMap.writeLock().unlock();
                         23:         }
                         24:     } catch (InterruptedException e) {
                         25:         log.error("rollback exception", e);
                         26:     }
                         27: }
                         28: 
                         29: /**
                         30:  * 提交消費中的消息已消費成功,返回消費進度
                         31:  *
                         32:  * @return 消費進度
                         33:  */
                         34: public long commit() {
                         35:     try {
                         36:         this.lockTreeMap.writeLock().lockInterruptibly();
                         37:         try {
                         38:             // 消費進度
                         39:             Long offset = this.msgTreeMapTemp.lastKey();
                         40: 
                         41:             //
                         42:             msgCount.addAndGet(this.msgTreeMapTemp.size() * (-1));
                         43: 
                         44:             //
                         45:             this.msgTreeMapTemp.clear();
                         46: 
                         47:             // 返回消費進度
                         48:             if (offset != null) {
                         49:                 return offset + 1;
                         50:             }
                         51:         } finally {
                         52:             this.lockTreeMap.writeLock().unlock();
                         53:         }
                         54:     } catch (InterruptedException e) {
                         55:         log.error("commit exception", e);
                         56:     }
                         57: 
                         58:     return -1;
                         59: }
                         60: 
                         61: /**
                         62:  * 指定消息重新消費
                         63:  * 邏輯類似于{@link #rollback()}
                         64:  *
                         65:  * @param msgs 消息
                         66:  */
                         67: public void makeMessageToCosumeAgain(List<MessageExt> msgs) {
                         68:     try {
                         69:         this.lockTreeMap.writeLock().lockInterruptibly();
                         70:         try {
                         71:             for (MessageExt msg : msgs) {
                         72:                 this.msgTreeMapTemp.remove(msg.getQueueOffset());
                         73:                 this.msgTreeMap.put(msg.getQueueOffset(), msg);
                         74:             }
                         75:         } finally {
                         76:             this.lockTreeMap.writeLock().unlock();
                         77:         }
                         78:     } catch (InterruptedException e) {
                         79:         log.error("makeMessageToCosumeAgain exception", e);
                         80:     }
                         81: }
                         82: 
                         83: /**
                         84:  * 獲得持有消息前N條
                         85:  *
                         86:  * @param batchSize 條數
                         87:  * @return 消息
                         88:  */
                         89: public List<MessageExt> takeMessags(final int batchSize) {
                         90:     List<MessageExt> result = new ArrayList<>(batchSize);
                         91:     final long now = System.currentTimeMillis();
                         92:     try {
                         93:         this.lockTreeMap.writeLock().lockInterruptibly();
                         94:         this.lastConsumeTimestamp = now;
                         95:         try {
                         96:             if (!this.msgTreeMap.isEmpty()) {
                         97:                 for (int i = 0; i < batchSize; i++) {
                         98:                     Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();
                         99:                     if (entry != null) {
                        100:                         result.add(entry.getValue());
                        101:                         msgTreeMapTemp.put(entry.getKey(), entry.getValue());
                        102:                     } else {
                        103:                         break;
                        104:                     }
                        105:                 }
                        106:             }
                        107: 
                        108:             if (result.isEmpty()) {
                        109:                 consuming = false;
                        110:             }
                        111:         } finally {
                        112:             this.lockTreeMap.writeLock().unlock();
                        113:         }
                        114:     } catch (InterruptedException e) {
                        115:         log.error("take Messages exception", e);
                        116:     }
                        117: 
                        118:     return result;
                        119: }
                        復制
                        關聯標簽:
                        少妇各种各样BBBⅩXX