深入理解RocketMq普通消息和順序消息使用,原理,優化

                        小編:管理員 94閱讀 2022.08.02

                        1. 背景

                        最近一直再做一些系統上的壓測,并對一些問題做了優化,從這些里面收獲了一些很多好的優化經驗,后續的文章都會以這方面為主。

                        這次打壓的過程中收獲比較的大的是,對RocketMq的一些優化。最開始我們公司使用的是RabbitMq,在一些流量高峰的場景下,發現隊列堆積比較嚴重,導致RabbitMq掛了。為了應對這個場景,最終我們引入了阿里云的RocketMq,RocketMq可以處理可以處理很多消息堆積,并且服務的穩定不掛也可以由阿里云保證。引入了RocketMq了之后,的確解決了隊列堆積導致消息隊列宕機的問題。

                        本來以為使用了RocketMq之后,可以萬事無憂,但是其實在打壓過程中發現了不少問題,這里先提幾個問題,大家帶著這幾個問題在文中去尋找答案:

                        1. 在RocketMq中,如果消息隊列發生堆積,consumer會發生什么樣的影響?
                        2. 在RocketMq中,普通消息和順序消息有沒有什么辦法提升消息消費速度?
                        3. 消息失敗重試次數怎么設置較為合理?順序消息和普通消息有不同嗎?
                        2. 普通消息 VS 順序消息

                        在RocketMq中提供了多種消息類型讓我們進行配置:

                        • 普通消息:沒有特殊功能的消息。
                        • 分區順序消息:以分區緯度保持順序進行消費的消息。
                        • 全局順序消息:全局順序消息可以看作是只分一個區,始終在同一個分區上進行消費。
                        • 定時/延時消息:消息可以延遲一段特定時間進行消費。
                        • 事務消息:二階段事務消息,先進行prepare投遞消息,此時不能進行消息消費,當二階段發出commit或者rollback的時候才會進行消息的消費或者回滾。

                        雖然配置種類比較繁多,但是使用的還是普通消息和分區順序消息。后續主要講的也是這兩種消息。

                        2.1 發送消息2.1.1 普通消息

                        普通消息的發送的代碼比較簡單,如下所示:

                        public static void main(String[] args) throws MQClientException, InterruptedException {
                                DefaultMQProducer producer = new DefaultMQProducer("test_group_producer");
                                producer.setNamesrvAddr("127.0.0.1:9876");
                                producer.start();
                        
                                Message msg =
                                        new Message("Test_Topic", "test_tag", ("Hello World").getBytes(RemotingHelper.DEFAULT_CHARSET));
                                SendResult sendResult = producer.send(msg);
                                System.out.printf("%s%n", sendResult);
                                producer.shutdown();
                            }
                        復制

                        其內部核心代碼為:

                        private SendResult sendDefaultImpl(Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout
                            ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
                                // 1. 根據 topic找到publishInfo
                                TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
                                if (topicPublishInfo != null && topicPublishInfo.ok()) {
                                    boolean callTimeout = false;
                                    MessageQueue mq = null;
                                    Exception exception = null;
                                    SendResult sendResult = null;
                                    // 如果是同步 就三次 否則就1次
                                    int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
                                    int times = 0;
                                    String[] brokersSent = new String[timesTotal];
                                    // 循環
                                    for (; times < timesTotal; times++) {
                                        String lastBrokerName = null == mq ? null : mq.getBrokerName();
                                        MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                                        if (mqSelected != null) {
                                            mq = mqSelected;
                                            brokersSent[times] = mq.getBrokerName();
                                            try {
                                                beginTimestampPrev = System.currentTimeMillis();
                                                if (times > 0) {
                                                    //Reset topic with namespace during resend.
                                                    msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                                                }
                                                long costTime = beginTimestampPrev - beginTimestampFirst;
                                                if (timeout < costTime) {
                                                    callTimeout = true;
                                                    break;
                                                }
                        
                                                sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                                                endTimestamp = System.currentTimeMillis();
                                                // 更新延遲
                                                this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                                                switch (communicationMode) {
                                                    case ASYNC:
                                                        return null;
                                                    case ONEWAY:
                                                        return null;
                                                    case SYNC:
                                                        if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                                            if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                                                continue;
                                                            }
                                                        }
                        
                                                        return sendResult;
                                                    default:
                                                        break;
                                                }
                                            } 
                                        } else {
                                            break;
                                        }
                                    }
                                // 省略
                        
                            }
                        復制

                        主要流程如下:

                        • Step 1:根據Topic 獲取TopicPublishInfo,TopicPublishInfo中有我們的Topic發布消息的信息(),這個數據先從本地獲取如果本地沒有,則從NameServer去拉取,并且定時每隔20s會去獲取TopicPublishInfo。
                        • Step 2:獲取總共執行次數(用于重試),如果發送方式是同步,那么總共次數會有3次,其他情況只會有1次。
                        • Step 3: 從MessageQueue中選取一個進行發送,MessageQueue的概念可以等同于Kafka的partion分區,看作發送消息的最小粒度。這個選擇有兩種方式:
                          • 根據發送延遲進行選擇,如果上一次發送的Broker是可用的,則從當前Broker選擇遍歷循環選擇一個,如果不可用那么需要選擇一個延遲最低的Broker從當前Broker上選擇MessageQueue。
                          • 通過輪訓的方式進行選擇MessageQueue。
                        • Step 4: 將Message發送至選擇出來的MessageQueue上的Broker。
                        • Step 5: 更新Broker的延遲。
                        • Step 6: 根據不同的發送方式來處理結果:
                          • Async: 異步發送,通過callBack關心結果,所以這里不進行處理。
                          • OneWay: 顧名思義,就是單向發送,只需要發給broker,不需要關心結果,這里連callback都不需要。
                          • Sync: 同步發送,需要關心結果,根據結果判斷是否需要進行重試,然后回到Step3。

                        可以看見Rocketmq發送普通消息的流程比較清晰簡單,下面來看看順序消息。

                        2.1.2 順序消息

                        順序消息分為分區順序消息和全局順序消息,全局順序消息比較容易理解,也就是哪條消息先進入,哪條消息就會先被消費,符合我們的FIFO,很多時候全局消息的實現代價很大,所以就出現了分區順序消息。分區順序消息的概念可以如下圖所示:

                        我們通過對消息的key,進行hash,相同hash的消息會被分配到同一個分區里面,當然如果要做全局順序消息,我們的分區只需要一個即可,所以全局順序消息的代價是比較大的。

                        對RocketMq熟悉的小伙伴會發現,它其實并沒有提供順序消息發送相關的API,但是在阿里云的RocketMq版本提供了順序消息的API,原理比較簡單,其實也是對現有API的一個封裝:

                        SendResult sendResultRMQ = this.defaultMQProducer.send(msgRMQ, new MessageQueueSelector() {
                                        @Override
                                        public MessageQueue select(List<MessageQueue> mqs, Message msg,
                                            Object shardingKey) {
                                            int select = Math.abs(shardingKey.hashCode());
                                            if (select < 0) {
                                                select = 0;
                                            }
                                            return mqs.get(select % mqs.size());
                                        }
                                    }, shardingKey);
                        復制

                        可以看見順序消息將MessageQueue的選擇交由我們發送方去做,所以我們直接利用我們shardingKey的hashCode進行發送分區。

                        3.1 消費消息3.1.1 普通消息

                        普通消息使用比較簡單,如下面代碼所示:

                        public static void main(String[] args) throws InterruptedException, MQClientException {
                                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Test_Consumer");
                                consumer.subscribe("TopicTest", "*");
                                consumer.setNamesrvAddr("127.0.0.1:9876");
                                consumer.registerMessageListener(new MessageListenerConcurrently() {
                        
                                    @Override
                                    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                                        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                                    }
                                });
                                consumer.setConsumeThreadMin(10);
                                consumer.setConsumeThreadMax(10);
                                consumer.start();
                                System.out.printf("Consumer Started.%n");
                            }
                        復制
                        • Step1:首先新建一個DefaultMQPushConsumer,并注冊對應的Topic和NameServer的信息。
                        • Step2: 注冊消息監聽器,在RocketMq中有兩種消息監聽器,一個是MessageListenerConcurrently,用于我們普通消息并發消費,還有一個是MessageListenerOrderly,用于我們順序消息。這里我們使用的MessageListenerConcurrently。
                        • Step3: 設置ConsumeThread大小,用于控制我們的線程池去消費他。
                        • Step4: 啟動Consumer。

                        啟動Consumer之后,我們就開始真正的從Broker去進行消費了,但是我們如何從Broker去消費的呢?首先在我們的第一步里面我們訂閱了一個Topic,我們就會定時去刷新Topic的相關信息比如MessageQueue的變更,然后將對應的MessageQueue分配給當前Consumer:

                        // 這個數據 是10s更新一次 從內存中獲取
                                        Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                                        // 這個數據實時去拉取
                                        List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                                        if (null == mqSet) {
                                            if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                                                log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                                            }
                                        }
                        
                                        if (null == cidAll) {
                                            log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
                                        }
                        
                                        if (mqSet != null && cidAll != null) {
                                            List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                                            mqAll.addAll(mqSet);
                        
                                            Collections.sort(mqAll);
                                            Collections.sort(cidAll);
                        
                                            AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
                        
                                            List<MessageQueue> allocateResult = null;
                                            try {
                                            //通過默認的分配策略進行分配
                                                allocateResult =
                                                        strategy.allocate(this.consumerGroup, this.mQClientFactory.getClientId(), mqAll,
                                                                cidAll);
                                            } catch (Throwable e) {
                                                log.error(
                                                        "AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}",
                                                        strategy.getName(), e);
                                                return;
                                            }
                        
                                            Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                                            if (allocateResult != null) {
                                                allocateResultSet.addAll(allocateResult);
                                            }
                        
                                            boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                        復制

                        這里首先向Broker拿到當前消費所有的ConsumerId默認是對應機器的Ip+實例名字,Broker中的ConsumerId信息是Consumer通過心跳定時進行上報得來的,然后根據消費分配策略將消息分配給Consumer,這里默認是平均分配,將我們分配到的消息隊列,記錄在 processQueueTable中,如果出現了新增,那么我們需要創建一個PullRequst代表這拉取消息的請求,異步去處理:

                        List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
                                for (MessageQueue mq : mqSet) {
                                    if (!this.processQueueTable.containsKey(mq)) {
                                        if (isOrder && !this.lock(mq)) {
                                            log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                                            continue;
                                        }
                        
                                        this.removeDirtyOffset(mq);
                                        ProcessQueue pq = new ProcessQueue();
                                        // 這里就是獲取我們第一次應該拿什么offset
                                        long nextOffset = this.computePullFromWhere(mq);
                                        if (nextOffset >= 0) {
                                            ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                                            if (pre != null) {
                                                log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                                            } else {
                                                log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                                                PullRequest pullRequest = new PullRequest();
                                                pullRequest.setConsumerGroup(consumerGroup);
                                                pullRequest.setNextOffset(nextOffset);
                                                pullRequest.setMessageQueue(mq);
                                                pullRequest.setProcessQueue(pq);
                                                pullRequestList.add(pullRequest);
                                                changed = true;
                                            }
                                        } else {
                                            log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
                                        }
                                    }
                                }
                        
                                this.dispatchPullRequest(pullRequestList);
                        復制

                        在PullService中會不斷的從PullRequestQueue拿取數據,然后進行拉取數據。

                        while (!this.isStopped()) {
                                    try {
                                        // rebalance 之后第一次向這個隊列放數據 后續消費的時候會繼續放
                                        PullRequest pullRequest = this.pullRequestQueue.take();
                                        this.pullMessage(pullRequest);
                                    } catch (InterruptedException ignored) {
                                    } catch (Exception e) {
                                        log.error("Pull Message Service Run Method exception", e);
                                    }
                                }
                        復制

                        拉取數據之后,這里會給PullCallBack進行響應:

                        PullCallback pullCallback = new PullCallback() {
                                    @Override
                                    public void onSuccess(PullResult pullResult) {
                                        if (pullResult != null) {
                                            pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                                                subscriptionData);
                        
                                            switch (pullResult.getPullStatus()) {
                                                case FOUND:
                                                        firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
                                                        boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                                                        DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                                            pullResult.getMsgFoundList(),
                                                            processQueue,
                                                            pullRequest.getMessageQueue(),
                                                            dispatchToConsume);
                        
                                                        if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                                            DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                                                DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                                        } else {
                                                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                                        }
                        復制

                        如果這里成功拉取到消息的話,我們首先將拉取的消息存入到我們的ProcessQueue中,ProcessQueue用于我們消費者處理的狀態以及待處理的消息,然后提交到我們的Consumer線程池中進行真正的業務邏輯消費,然后再提交一個PullRequest用于我們下次消費。

                        大家看到這里有沒有發現這個模式和我們的netty中的單線程accpet,多個線程來處理業務邏輯很相似,其原理都是一樣,由一個線程不斷的去拉取,然后由我們業務上定義的線程池進行處理。如下圖所示:

                        我們發現我們拉取消息其實是一個循環的過程,這里就來到了第一個問題,如果消息隊列消費的速度跟不上消息發送的速度,那么就會出現消息堆積,很多同學根據過程來看可能會以為,我們的拉取消息一直在進行,由于我們的消費速度比較慢,會有很多message以隊列的形式存在于我們的內存中,那么會導致我們的JVM出現OOM也就是內存溢出。

                        那么到底會不會出現OOM呢?其實是不會的,RocketMq對安全性方面做得很好,有下面兩段代碼:

                        if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
                                    System.out.println(cachedMessageCount + ":"+pullRequest);
                                    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
                                    return;
                                }
                        
                                if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
                                    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
                                    return;
                                }
                        復制

                        首先是會判斷當前內存緩存的Message數量是否大于限制的值默認是1000,如果大于則延遲一段時間再次提交pullRequest。 然后判斷當前內存緩存的Size大小是否大于了某個值,默認是100M,如果大于也會延遲一段時間再次提交pullRequest。 所以在我們consumer上如果出現消息堆積,基本也沒有什么影響。

                        那我們想想第二個問題應該怎么解決呢?在普通消息的場景下,如何提升消費速度?

                        • 首先肯定是需要提升我們本身的處理速度,處理速度提升,消費速度自然就會提升。
                        • 其次是要設置一個合理大小的consumer線程池,太小的話機器的資源得不到充分利用,太大的話線程的上下文切換可能會很快,一般來說根據消費者的業務來判斷,如果是cpu密集型線程設置cpu大小就好,如果是io密集型設置兩倍cpu大小。
                        • 還有個就是MessageQueue,細心的同學肯定在上面看見我們消費者消費消息之前,會被分配MessageQueue來進行獲取消費,所以自然而然就會想到,如果多分配一點MessageQueue數量是不是就會加快我們的消費速度,其實MessageQueue對于我們的普通消息消費提升幫助是很小的,因為所有的消費請求會被提交到線程池里面去消費,MessageQueue再多也無濟于事,除非當我們的Consumer機器很多的時候,MessageQueue數量小于Consumer機器的時候,這個時候增加MessageQueue才會有提升效果,正所謂讓我們的機器雨露均沾嘛。
                        3.1.1.1普通消息-消費結果處理

                        在rocketmq中對消息的消費結果處理也比較重要,這里還是先提三個問題:

                        • 我們的普通消息是怎么處理結果的呢?
                        • 如果消費失敗會怎么辦呢?
                        • 在普通消息消費的時候,是并發處理,如果出現offset靠后的消息先被消費完,但是我們的offset靠前的還沒有被消費完,這個時候出現了宕機,我們的offset靠前的這部分數據是否會丟失呢?也就是下次消費的時候是否會從offset靠后的沒有被消費的開始消費呢?如果不是的話,rocketmq是怎么做到的呢?

                        首先我們來看第一個問題,怎么處理消費結果,在processResult中有如下代碼:

                        public void processConsumeResult(
                                final ConsumeConcurrentlyStatus status,
                                final ConsumeConcurrentlyContext context,
                                final ConsumeRequest consumeRequest
                            ) {
                                int ackIndex = context.getAckIndex();
                                switch (status) {
                                    case CONSUME_SUCCESS:
                                        int ok = ackIndex + 1;
                                        int failed = consumeRequest.getMsgs().size() - ok;
                                        this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
                                        this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
                                        break;
                                    case RECONSUME_LATER:
                                        ackIndex = -1;
                                        this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                                            consumeRequest.getMsgs().size());
                                        break;
                                    default:
                                        break;
                                }
                        
                                switch (this.defaultMQPushConsumer.getMessageModel()) {
                                    case BROADCASTING:
                                        for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                                            MessageExt msg = consumeRequest.getMsgs().get(i);
                                            log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
                                        }
                                        break;
                                    case CLUSTERING:
                                        List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
                                        for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                                            MessageExt msg = consumeRequest.getMsgs().get(i);
                                            boolean result = this.sendMessageBack(msg, context);
                                            if (!result) {
                                                msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                                                msgBackFailed.add(msg);
                                            }
                                        }
                        
                                        if (!msgBackFailed.isEmpty()) {
                                            consumeRequest.getMsgs().removeAll(msgBackFailed);
                        
                                            this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                                        }
                                        break;
                                    default:
                                        break;
                                }
                        
                                long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
                                if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
                                    this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
                                }
                            }
                        復制
                        • Step 1: 首先獲取ackIndex,即確認成功的數量,默認是int的最大數,代表著全部成功。
                        • Step 2: 獲取 ConsumeConcurrentlyStatus,根據不同的狀態進行處理,ConsumeConcurrentlyStatus有兩個:
                        • CONSUME_SUCCESS: 代表著消費成功,記錄成功的TPS和失敗的TPS。
                        • RECONSUME_LATER: 代表著需要重新消費,一般是失敗才會返回這個狀態,記錄失敗的TPS。
                        • Step 3: 然后根據消息類型,進行不同的邏輯重試,消息消費類型有兩種:
                        • BROADCASTING: 廣播消費,廣播消費不會進行重試,這里會直接打一個warn日志然后丟棄。
                        • CLUSTERING:集群消費,這里會首先將失敗的消息發送回當前的topic,如果發送失敗,這里會繼續進行本地消費重試。如果在Broker中發現這個消息重試次數已經達到上限,就會將這個消息發送至RetryTopic,然后由RetryTopic發送至死信隊列。
                        • Step 4: 獲取message的offset,更新當前消費進度

                        在上面的第四步中,如果不深入進去看內部邏輯,這里會誤以為,他會將當前消息的offset給更新到最新的消費進度,那問題三中說的中間的offset是有可能被丟失的,但實際上是不會發生的,具體的邏輯保證在removeMessage中:

                        public long removeMessage(final List<MessageExt> msgs) {
                                long result = -1;
                                final long now = System.currentTimeMillis();
                                try {
                                    this.lockTreeMap.writeLock().lockInterruptibly();
                                    this.lastConsumeTimestamp = now;
                                    try {
                                        if (!msgTreeMap.isEmpty()) {
                                            result = this.queueOffsetMax + 1;
                                            int removedCnt = 0;
                                            for (MessageExt msg : msgs) {
                                                MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
                                                if (prev != null) {
                                                    removedCnt--;
                                                    msgSize.addAndGet(0 - msg.getBody().length);
                                                }
                                            }
                                            msgCount.addAndGet(removedCnt);
                        
                                            if (!msgTreeMap.isEmpty()) {
                                                result = msgTreeMap.firstKey();
                                            }
                                        }
                                    } finally {
                                        this.lockTreeMap.writeLock().unlock();
                                    }
                                } catch (Throwable t) {
                                    log.error("removeMessage exception", t);
                                }
                                return result;
                            }
                        復制

                        在removeMessage中通過msgTreeMap去做了一個保證,msgTreeMap是一個TreeMap,根據offset升序排序,如果treeMap中有值的話,他返回的offset就會是當前msgTreeMap中的firstKey,而不是當前的offset,從而就解決了問題三。

                        上面的過程總結為下圖所示:

                        3.1.2 順序消息

                        順序消息的消費前面過程和普通消息基本一樣,這里我們需要關注的是將消息丟給我們消費線程池之后的邏輯:

                        final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
                                    synchronized (objLock) {
                                        // 省略
                                        List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
                                        status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                                        // 省略
                                    }
                        復制

                        可以發現這里比普通消息多了一個步驟,那就是加鎖,這里會獲取到以messageQueue為緯度去加鎖,然后去我們的processQueue中獲取到我們的Message, 這里也是用的我們的msgTreeMap, 獲取的最小offset的Message。

                        所以我們之前的線程池提高并發速度的策略在這里沒有用了,那么應該怎么辦呢?既然我們加鎖是以messageQueue為緯度,那么增加MessageQueue就好了,所以這里的提升消費速度剛好和普通消息相反,再普通消息中提升Messagequeue可能效果并沒有那么大,但是在順序消息的消費中提升就很大了。

                        我們在壓測的時候,發現順序消息消費很慢,消息堆積很嚴重,經過調試發現阿里云上的rocketmq默認讀寫隊列為16,我們consumer機器有10臺,每個consumer線程池大小為10,理論并發應該有100,但是由于順序消息的原因導致實際并發只有16,最后找阿里的技術人員將讀寫隊列擴至100,這樣充分利用我們的資源,極大的增加了順序消息消費的速度,消息基本不會再堆積。

                        3.1.2.1 順序消息-消費結果處理

                        順序消息的結果處理和普通消息的處理流程,稍有不同,代碼如下:

                        public boolean processConsumeResult(
                                final List<MessageExt> msgs,
                                final ConsumeOrderlyStatus status,
                                final ConsumeOrderlyContext context,
                                final ConsumeRequest consumeRequest
                            ) {
                                boolean continueConsume = true;
                                long commitOffset = -1L;
                                if (context.isAutoCommit()) {
                                    switch (status) {
                                        case SUCCESS:
                                            commitOffset = consumeRequest.getProcessQueue().commit();
                                            this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                                            break;
                                        case SUSPEND_CURRENT_QUEUE_A_MOMENT:
                                            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                                            if (checkReconsumeTimes(msgs)) {
                                                consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
                                                this.submitConsumeRequestLater(
                                                    consumeRequest.getProcessQueue(),
                                                    consumeRequest.getMessageQueue(),
                                                    context.getSuspendCurrentQueueTimeMillis());
                                                continueConsume = false;
                                            } else {
                                                commitOffset = consumeRequest.getProcessQueue().commit();
                                            }
                                            break;
                                        default:
                                            break;
                                    }
                                } else {
                                    switch (status) {
                                        case SUCCESS:
                                            this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                                            break;
                                        case SUSPEND_CURRENT_QUEUE_A_MOMENT:
                                            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                                            if (checkReconsumeTimes(msgs)) {
                                                consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
                                                this.submitConsumeRequestLater(
                                                    consumeRequest.getProcessQueue(),
                                                    consumeRequest.getMessageQueue(),
                                                    context.getSuspendCurrentQueueTimeMillis());
                                                continueConsume = false;
                                            }
                                            break;
                                        default:
                                            break;
                                    }
                                }
                        
                                if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
                                    this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
                                }
                        
                                return continueConsume;
                            }
                        復制
                        • Step 1: 判斷當前offset是否是自動提交更新,一般autoCommit不需要設置,默認是自動提交,除非有特別的需求才會做這樣一個設置。
                        • Step 2: 如果是自動提交,需要判斷狀態:
                          • SUCCESS: 如果是成功狀態則獲取當前需要提交的offset,然后記錄到OK的TPS中
                          • SUSPEND_CURRENT_QUEUE_A_MOMENT:注意在普通消息中如果失敗會返回RECONSUME_LATER,有什么不同呢?在這個狀態下面,并不會向當前topic再次發送,而是會在本地線程池再次提交一個ConsumeRequest,延遲重試,這里默認時間是1s。如果大于了最大重試次數這里會將數據發送至RetryTopic。
                        • Step 3: 如果不是自動提交的話,和步驟2類似,但是不會獲取提交的offset。
                        • Step 4: 更新offset。

                        這里回到我們的第三個問題,如何設置消息消費的重試次數呢?由于我們直接使用的阿里云的mq,所以我們又包裝了一層,方便接入。在接入層中我們最開始統一配置了最大重試2000次,這里設置2000次的原因主要是想讓我們的消息隊列盡量無限重試,因為我們默認消息基本最終會成功,但是為了以防萬一,所以這里設置了一個較大的數值2000次。設置2000次對于我們的普通消息,基本沒什么影響,因為他會重新投遞至broker,但是我們的順序消息是不行的,如果順序消息設置重試2000次,當遇到了這種不可能成功的消息的時候就會導致消息一直在本地進行重試,并且由于對隊列加鎖了,所以當前MessageQueue將會一直被阻塞,導致后續消息不會被消費,如果設置2000次那么至少會阻塞半個小時以上。所以這里應該將順序消息設置一個較小的值,目前我們設置為16。

                        4. 最后

                        之前沒怎么看過Rocketmq的源碼,經過這次打壓,從Rocketmq中學習到了很多精妙優秀的設計,將一些經驗提煉成了文中的一些問題,希望大家能仔細閱讀,找到答案。

                        關聯標簽:
                        少妇各种各样BBBⅩXX