萬字精華總結RocketMQ的常見用法

                        小編:管理員 68閱讀 2022.08.02

                        概述

                        上篇博文,我們介紹了什么是RocketMQ,以及如何安裝單機版的RocketMQ。在安裝的過程了,我們主要安裝了兩個服務,NameServer和Broker。在發送和接收消息時,又接觸了兩個概念,生產者和消費者。

                        那這些又代表什么含義呢?

                        對于單機版本的RocketMQ架構,如下圖所示:

                        主要分為四部分:

                        • 名字服務(Name Server)

                        Name Server充當路由消息的提供者。生產者或消費者能夠通過Name Server查找各主題相應的Broker IP列表。多個Namesrv實例組成集群,但相互獨立,沒有信息交換。

                        • 代理服務器(Broker Server)

                        Broker Server負責存儲消息、轉發消息。Broker在RocketMQ系統中負責接收從生產者發送來的消息并存儲、同時為消費者的拉取請求作準備。Broker也存儲消息相關的元數據,包括消費者組、消費進度偏移和主題和隊列消息等。

                        • 消息生產者(Producer)

                        負責生產消息,一般由業務系統負責生產消息。一個消息生產者會把業務應用系統里產生的消息發送到broker服務器。RocketMQ提供多種發送方式,同步發送、異步發送、順序發送、單向發送。同步和異步方式均需要Broker返回確認信息,單向發送不需要。

                        • 消息消費者(Consumer)

                        負責消費消息,一般是后臺系統負責異步消費。一個消息消費者會從Broker服務器拉取消息、并將其提供給應用程序。

                        對于上面的學習,我們知道了RocketMQ的核心模塊以及相應的概念。那么,RocketMQ都有哪些發送消息的方式呢,又如何使用,使用的場景是什么,又是如何消費的?

                        常見用法

                        在項目中添加MQ客戶端依賴

                        <dependency>
                            <groupId>org.apache.rocketmq</groupId>
                            <artifactId>rocketmq-client</artifactId>
                            <version>x.x.x</version>
                        </dependency>
                        復制1、基本消息1.1消息發送
                        • 在基本消息發送中,我們使用RocketMQ發送三種類型的消息:同步消息、異步消息和單向消息。其中前兩種消息是可靠的,因為會有發送是否成功的應答。
                        • 使用RocketMQ兩個不同模式,來消費接收到的消息。
                        1、同步消息

                        這種可靠性同步地發送方式使用的比較廣泛,比如:重要的消息通知,短信通知。

                        public class SyncProducer {
                            public static void main(String[] args) throws Exception {
                                //1.創建消息生產者producer,并制定生產者組名
                                DefaultMQProducer producer = new
                                        DefaultMQProducer("please_rename_unique_group_name");
                                //2.指定Nameserver地址
                                producer.setNamesrvAddr("localhost:9876");
                                //3.啟動producer
                                producer.start();
                                for (int i = 0; i < 10; i++) {
                                    //4.創建消息對象,指定主題Topic、Tag和消息體
                                    /**
                                     * 參數一:消息主題Topic
                                     * 參數二:消息Tag
                                     * 參數三:消息內容
                                     */
                                    Message msg = new Message("TopicTest" /* Topic */,
                                            "TagA" /* Tag */,
                                            ("Hello RocketMQ " +
                                                    i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                                    );
                                    //5.發送同步消息,將消息發送給其中一個broker
                                    SendResult sendResult = producer.send(msg);
                                    System.out.printf("%s%n", sendResult);
                                }
                                //6.關閉生產者producer
                                producer.shutdown();
                            }
                        }
                        復制

                        上面的案例中設計到兩個陌生的概念,含義如下所示:

                        生產者組(Producer Group):同一類Producer的集合,這類Producer發送同一類消息且發送邏輯一致。 標簽(Tag):為消息設置的標志,用于同一主題下區分不同類型的消息。來自同一業務單元的消息,可以根據不同業務目的在同一主題下設置不同標簽。消費者可以根據Tag實現對不同子主題的不同消費邏輯,實現更好的擴展性。

                        2、異步消息

                        異步消息通常用在對響應時間敏感的業務場景,即發送端不能容忍長時間地等待Broker的響應。

                        public class AsyncProducer {
                            public static void main(String[] args) throws Exception {
                                //1.創建消息生產者producer,并制定生產者組名
                                DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
                                //2.指定Nameserver地址
                                producer.setNamesrvAddr("localhost:9876");
                                //3.啟動producer
                                producer.start();
                        
                                int messageCount = 10;
                                final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
                                for (int i = 0; i < messageCount; i++) {
                                    try {
                                        final int index = i;
                                        //4.創建消息對象,指定主題Topic、Tag和消息體
                                        /**
                                         * 參數一:消息主題Topic
                                         * 參數二:消息Tag
                                         * 參數三:消息內容
                                         */
                                        Message msg = new Message("TopicTest",
                                                "TagA",
                                                "OrderID188",
                                                "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                        
                                        //5.發送異步消息,SendCallback接收異步返回結果的回調
                                        producer.send(msg, new SendCallback() {
                                            @Override
                                            public void onSuccess(SendResult sendResult) {
                                                countDownLatch.countDown();
                                                System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                                            }
                        
                                            @Override
                                            public void onException(Throwable e) {
                                                countDownLatch.countDown();
                                                System.out.printf("%-10d Exception %s %n", index, e);
                                                e.printStackTrace();
                                            }
                                        });
                                    } catch (Exception e) {
                                        e.printStackTrace();
                                    }
                                }
                                countDownLatch.await(5, TimeUnit.SECONDS);
                        
                                //6.關閉生產者producer
                                producer.shutdown();
                            }
                        
                        }
                        復制

                        keys:Message索引鍵,多個用空格隔開,RocketMQ可以根據這些key快速檢索到消息對消息關鍵字的提取方便查詢。

                        3、單向消息

                        這種方式主要用在不特別關心發送結果的場景,例如日志發送。

                        只發送消息,不等待服務器響應,只發送請求不等待應答。此方式發送消息的過程耗時非常短,一般在微秒級別。

                        public class OneWayProducer {
                        
                            public static void main(String[] args) throws Exception, MQBrokerException {
                                //1.創建消息生產者producer,并制定生產者組名
                                DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
                                //2.指定Nameserver地址
                                producer.setNamesrvAddr("localhost:9876");
                                //3.啟動producer
                                producer.start();
                        
                                for (int i = 0; i < 10; i++) {
                                    //4.創建消息對象,指定主題Topic、Tag和消息體
                                    /**
                                     * 參數一:消息主題Topic
                                     * 參數二:消息Tag
                                     * 參數三:消息內容
                                     */
                                    Message msg = new Message("TopicTest", "TagA", ("Hello World,單向消息" + i).getBytes());
                                    //5.發送單向消息
                                    producer.sendOneway(msg);
                        
                                    //線程睡1秒
                                    TimeUnit.SECONDS.sleep(5);
                                }
                        
                                //6.關閉生產者producer
                                producer.shutdown();
                            }
                        }
                        復制1.2消息消費

                        此時,RocketMQ中已經有我們需要發送的消息了,我們使用RocketMQ來消費隊列中的消息。接收消息有兩種模式:

                        • 負載均衡模式(Clustering)
                        • 廣播模式(Broadcasting)

                        啟動多個消費者,最直接的區別:模式不同,消費的消息不同。

                        1、負載均衡模式

                        默認模式,消費者采用負載均衡方式消費消息,相同消費者組的每個消費者共同消費隊列中的消息即每個Consumer實例平均分攤消息,每個消費者處理的消息不同。消費進度存儲在服務端。

                        /**
                         * @PROJECT_NAME: SpringCloud-Learning
                         * @USER: yuliang
                         * @DESCRIPTION:
                         * @DATE: 2021-04-14 15:22
                         *
                         * 異步消息,同步消息,單向消息 - 消費者 - 負載均衡模式
                         */
                        public class ClusteringConsumer {
                        
                            public static void main(String[] args) throws InterruptedException, MQClientException {
                        
                                //創建一個消息消費者,并設置一個消息消費者組
                                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
                        
                                //指定 NameServer 地址
                                consumer.setNamesrvAddr("localhost:9876");
                        
                                // Subscribe one more more topics to consume.
                                //訂閱指定 Topic 下的所有消息
                                consumer.subscribe("TopicTest", "*");
                        
                                //負載均衡模式,默認
                                consumer.setMessageModel(MessageModel.CLUSTERING);
                                // Register callback to execute on arrival of messages fetched from brokers.
                                // 注冊回調函數,處理消息
                                consumer.registerMessageListener(new MessageListenerConcurrently() {
                        
                                    @Override
                                    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                                                    ConsumeConcurrentlyContext context) {
                                        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                        
                                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                                    }
                                });
                        
                                // 啟動消費者
                                consumer.start();
                        
                                System.out.println("消息消費者已啟動");
                            }
                        }
                        復制2、廣播模式

                        消費者采用廣播的方式消費消息,相同Consumer Group的每個消費者消費的消息都是相同的。消費進度存儲在消費者本地。

                        /**
                         * @PROJECT_NAME: SpringCloud-Learning
                         * @USER: yuliang
                         * @DESCRIPTION:
                         * @DATE: 2021-04-19 19:02
                         *
                         *
                         *  異步消息,同步消息,單向消息 - 消費者 - 廣播模式
                         */
                        public class BroadcastConsumer {
                        
                            public static void main(String[] args) throws Exception {
                        
                                //創建一個消息消費者,并設置一個消息消費者組
                                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
                        
                                //指定 NameServer 地址
                                consumer.setNamesrvAddr("localhost:9876");
                                //設置廣播模式
                                consumer.setMessageModel(MessageModel.BROADCASTING);
                        
                                //訂閱指定 Topic 下的所有消息
                                consumer.subscribe("TopicTest", "*");
                        
                                // 注冊回調函數,處理消息
                                consumer.registerMessageListener(new MessageListenerConcurrently() {
                        
                                    @Override
                                    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                                                    ConsumeConcurrentlyContext context) {
                        //                System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
                        
                                        //默認 list 里只有一條消息,可以通過設置參數來批量接收消息
                                        if (msgs != null) {
                                            for (MessageExt ext : msgs) {
                                                try {
                                                    System.out.println(new Date() + ext.toString() + new String(ext.getBody(), "UTF-8"));
                                                } catch (UnsupportedEncodingException e) {
                                                    e.printStackTrace();
                                                }
                                            }
                                        }
                                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                                    }
                                });
                        
                                consumer.start();
                                System.out.println("消息消費者已啟動");
                            }
                        }
                        復制

                        消費者組(Consumer Group):同一類Consumer的集合,這類Consumer通常消費同一類消息且消費邏輯一致。消費者組使得在消息消費方面,實現負載均衡和容錯的目標變得非常容易。

                        2、順序消息

                        消息有序指的是一類消息消費時,能按照發送的順序來消費。例如:一個訂單產生了三條消息分別是訂單創建、訂單付款、訂單完成。消費時要按照這個順序消費才能有意義,但是同時訂單之間是可以并行消費的。RocketMQ可以嚴格的保證消息有序。

                        順序消息分為全局順序消息與分區順序消息,全局順序是指某個Topic下的所有消息都要保證順序;部分順序消息只要保證每一組消息被順序消費即可。

                        • 全局順序 對于指定的一個 Topic,所有消息按照嚴格的先入先出(FIFO)的順序進行發布和消費。 實現方式: 當發送和消費參與的queue只有一個 適用場景: 性能要求不高,所有的消息嚴格按照 FIFO 原則進行消息發布和消費的場景
                        • 分區順序 對于指定的一個 Topic,所有消息根據 sharding key 進行區塊分區。 同一個分區內的消息按照嚴格的 FIFO 順序進行發布和消費。 Sharding key 是順序消息中用來區分不同分區的關鍵字段,和普通消息的 Key 是完全不同的概念。 實現方式: 如果多個queue參與,按照Sharding key選擇隊列,則為分區有序,即相對每個queue,消息都是有序的。 適用場景: 性能要求高,以 sharding key 作為分區字段,在同一個區塊中嚴格的按照 FIFO 原則進行消息發布和消費的場景。

                        下面用訂單進行分區有序的示例。一個訂單的順序流程是:創建、付款、推送、完成。訂單號相同的消息會被先后發送到同一個隊列中,消費時,同一個OrderId獲取到的肯定是同一個隊列。

                        2.1順序消息生產
                        /**
                         * @PROJECT_NAME: SpringCloud-Learning
                         * @USER: yuliang
                         * @DESCRIPTION:
                         * @DATE: 2021-04-19 11:17
                         *
                         * 順序消息-生產者
                         */
                        public class ProducerInOrder {
                            public static void main(String[] args) throws Exception {
                                DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
                        
                                producer.setNamesrvAddr("localhost:9876");
                        
                                producer.start();
                        
                                String[] tags = new String[]{"TagA", "TagC", "TagD"};
                        
                                // 訂單列表
                                List<OrderStep> orderList = new ProducerInOrder().buildOrders();
                        
                                Date date = new Date();
                                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                String dateStr = sdf.format(date);
                                for (int i = 0; i < orderList.size(); i++) {
                                    // 加個時間前綴
                                    String body = dateStr + " Hello RocketMQ " + orderList.get(i);
                                    Message msg = new Message("OrderTopic", tags[i % tags.length], "KEY" + i, body.getBytes());
                        
                                    //自定義消息隊列選取規則
                        //            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                        //                @Override
                        //                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        //                    Long id = (Long) arg;  //根據訂單id選擇發送queue
                        //                    long index = id % mqs.size();
                        //                    return mqs.get((int) index);
                        //                }
                        //            }, orderList.get(i).getOrderId());//訂單id
                        
                                    //SelectMessageQueueByHash,官方提供的選取規則,還有其他實現,大家自行發現
                                    SendResult sendResult = producer.send(msg, new SelectMessageQueueByHash(), orderList.get(i).getOrderId());
                        
                                    System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
                                            sendResult.getSendStatus(),
                                            sendResult.getMessageQueue().getQueueId(),
                                            body));
                        
                                }
                                producer.shutdown();
                            }
                        
                            /**
                             * 訂單的步驟
                             */
                            private static class OrderStep {
                                private long orderId;
                                private String desc;
                        
                                public long getOrderId() {
                                    return orderId;
                                }
                        
                                public void setOrderId(long orderId) {
                                    this.orderId = orderId;
                                }
                        
                                public String getDesc() {
                                    return desc;
                                }
                        
                                public void setDesc(String desc) {
                                    this.desc = desc;
                                }
                        
                                @Override
                                public String toString() {
                                    return "OrderStep{" +
                                            "orderId=" + orderId +
                                            ", desc='" + desc + '\'' +
                                            '}';
                                }
                            }
                        
                            /**
                             * 生成模擬訂單數據
                             */
                            private List<OrderStep> buildOrders() {
                                List<OrderStep> orderList = new ArrayList<OrderStep>();
                        
                                OrderStep orderDemo = new OrderStep();
                                orderDemo.setOrderId(15103111039L);
                                orderDemo.setDesc("創建");
                                orderList.add(orderDemo);
                        
                                orderDemo = new OrderStep();
                                orderDemo.setOrderId(15103111065L);
                                orderDemo.setDesc("創建");
                                orderList.add(orderDemo);
                        
                                orderDemo = new OrderStep();
                                orderDemo.setOrderId(15103111039L);
                                orderDemo.setDesc("付款");
                                orderList.add(orderDemo);
                        
                                orderDemo = new OrderStep();
                                orderDemo.setOrderId(15103117235L);
                                orderDemo.setDesc("創建");
                                orderList.add(orderDemo);
                        
                                orderDemo = new OrderStep();
                                orderDemo.setOrderId(15103111065L);
                                orderDemo.setDesc("付款");
                                orderList.add(orderDemo);
                        
                                orderDemo = new OrderStep();
                                orderDemo.setOrderId(15103117235L);
                                orderDemo.setDesc("付款");
                                orderList.add(orderDemo);
                        
                                orderDemo = new OrderStep();
                                orderDemo.setOrderId(15103111065L);
                                orderDemo.setDesc("完成");
                                orderList.add(orderDemo);
                        
                                orderDemo = new OrderStep();
                                orderDemo.setOrderId(15103111039L);
                                orderDemo.setDesc("推送");
                                orderList.add(orderDemo);
                        
                                orderDemo = new OrderStep();
                                orderDemo.setOrderId(15103117235L);
                                orderDemo.setDesc("完成");
                                orderList.add(orderDemo);
                        
                                orderDemo = new OrderStep();
                                orderDemo.setOrderId(15103111039L);
                                orderDemo.setDesc("完成");
                                orderList.add(orderDemo);
                        
                                return orderList;
                            }
                        
                        }
                        復制2.2順序消息消費
                        /**
                         * @PROJECT_NAME: SpringCloud-Learning
                         * @USER: yuliang
                         * @DESCRIPTION:
                         * @DATE: 2021-04-19 11:28
                         *
                         * 順序消息-消費者
                         */
                        public class ConsumerInOrder {
                        
                            public static void main(String[] args) throws Exception {
                                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
                                consumer.setNamesrvAddr("localhost:9876");
                                /**
                                 * 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br>
                                 * 如果非第一次啟動,那么按照上次消費的位置繼續消費
                                 */
                                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                        
                                consumer.subscribe("OrderTopic", "TagA || TagC || TagD");
                        
                                consumer.registerMessageListener(new MessageListenerOrderly() {
                        
                                    Random random = new Random();
                        
                                    @Override
                                    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                                        context.setAutoCommit(true);
                                        for (MessageExt msg : msgs) {
                                            // 可以看到每個queue有唯一的consume線程來消費, 訂單對每個queue(分區)有序
                                            System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
                                        }
                        
                                        try {
                                            //模擬業務邏輯處理中...
                                            TimeUnit.SECONDS.sleep(random.nextInt(10));
                                        } catch (Exception e) {
                                            e.printStackTrace();
                                        }
                                        return ConsumeOrderlyStatus.SUCCESS;
                                    }
                                });
                        
                                // 啟動消費者
                                consumer.start();
                        
                                System.out.println("消息消費者已啟動");
                            }
                        }
                        復制

                        注意:MessageListenerOrderly是順序消息監聽器,每個隊列只有一個線程消費。

                        普通順序消息(Normal Ordered Message) 普通順序消費模式下,消費者通過同一個消費隊列收到的消息是有順序的,不同消息隊列收到的消息則可能是無順序的。 嚴格順序消息(Strictly Ordered Message) 嚴格順序消息模式下,消費者收到的所有消息均是有順序的。

                        • 順序消費的原理解析

                        在默認的情況下消息發送會采取Round Robin輪詢方式把消息發送到不同的queue(分區隊列);而消費消息的時候從多個queue上拉取消息,這種情況發送和消費是不能保證順序。但是如果控制發送的順序消息只依次發送到同一個queue中,消費的時候只從這個queue上依次拉取,則就保證了順序。當發送和消費參與的queue只有一個,則是全局有序;如果多個queue參與,則為分區有序,即相對每個queue,消息都是有序的。

                        3、延時消息

                        定時消息(延遲隊列)是指消息發送到broker后,不會立即被消費,等待特定時間投遞給真正的topic。 broker有配置項messageDelayLevel,默認值為“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18個level?梢耘渲米远xmessageDelayLevel。注意,messageDelayLevel是broker的屬性,不屬于某個topic。發消息時,設置delayLevel等級即可:msg.setDelayLevel(level)。level有以下三種情況:

                        • level == 0,消息為非延遲消息
                        • 1<=level<=maxLevel,消息延遲特定時間,例如level==1,延遲1s
                        • level > maxLevel,則level== maxLevel,例如level==20,延遲2h

                        定時消息會暫存在名為SCHEDULE_TOPIC_XXXX的topic中,并根據delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一個queue只存相同延遲的消息,保證具有相同發送延遲的消息能夠順序消費。broker會調度地消費SCHEDULE_TOPIC_XXXX,將消息寫入真實的topic。需要注意的是,定時消息會在第一次寫入和調度寫入真實topic時都會計數,因此發送數量、tps都會變高。

                        應用場景:

                        比如電商里,提交了一個訂單就可以發送一個延時消息,1h后去檢查這個訂單的狀態,如果還是未付款就取消訂單釋放庫存。

                        3.1延時消息生產
                        /**
                         * @PROJECT_NAME: SpringCloud-Learning
                         * @USER: yuliang
                         * @DESCRIPTION:
                         * @DATE: 2021-04-19 14:48
                         *
                         * 延時消息 - 生產者
                         */
                        public class ScheduledMessageProducer {
                        
                        
                            public static void main(String[] args) throws Exception {
                                // 實例化一個生產者來產生延時消息
                                DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
                                producer.setNamesrvAddr("localhost:9876");
                                // 啟動生產者
                                producer.start();
                                int totalMessagesToSend = 10;
                                for (int i = 0; i < totalMessagesToSend; i++) {
                                    Message message = new Message("DelayTopic", ("Hello scheduled message " + i).getBytes());
                                    // 設置延時等級3,這個消息將在10s之后發送(現在只支持固定的幾個時間,詳看delayTimeLevel)
                                    message.setDelayTimeLevel(3);
                                    // 發送消息
                                    producer.send(message);
                                }
                                // 關閉生產者
                                producer.shutdown();
                            }
                        
                        }
                        復制3.2延時消息消費
                        /**
                         * @PROJECT_NAME: SpringCloud-Learning
                         * @USER: yuliang
                         * @DESCRIPTION:
                         * @DATE: 2021-04-19 14:45
                         *
                         *
                         * 延時消息 - 消費者
                         */
                        public class ScheduledMessageConsumer {
                        
                            public static void main(String[] args) throws Exception {
                                // 實例化消費者
                                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
                                consumer.setNamesrvAddr("localhost:9876");
                        
                                // 訂閱Topics
                                consumer.subscribe("DelayTopic", "*");
                                // 注冊消息監聽者
                                consumer.registerMessageListener(new MessageListenerConcurrently() {
                                    @Override
                                    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                                        for (MessageExt message : messages) {
                                            // Print approximate delay time period
                                            System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
                                        }
                                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                                    }
                                });
                                // 啟動消費者
                                consumer.start();
                        
                                System.out.println("消息消費者已啟動");
                            }
                        }
                        復制3.3使用限制
                        // org/apache/rocketmq/store/config/MessageStoreConfig.java
                        private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
                        復制

                        現在RocketMq并不支持任意時間的延時,需要設置幾個固定的延時等級,從1s到2h分別對應著等級1到18,消息消費失敗會進入延時消息隊列,消息發送時間與設置的延時等級和重試次數有關

                        4、批量消息

                        批量發送消息能顯著提高傳遞小消息的性能。限制是這些批量消息應該有相同的topic,相同的waitStoreMsgOK,而且不能是延時消息。此外,這一批消息的總大小不應超過4MB。

                        4.1批量消息生產
                        /**
                         * @PROJECT_NAME: SpringCloud-Learning
                         * @USER: yuliang
                         * @DESCRIPTION:
                         * @DATE: 2021-04-19 15:10
                         *
                         * 批量消息 - 生產者
                         */
                        public class SimpleBatchProducer {
                        
                        
                            public static void main(String[] args) throws Exception {
                                //1、創建一個消息生產者,并設置一個消息生產者組
                                DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
                                //2、指定 NameServer 地址
                                producer.setNamesrvAddr("localhost:9876");
                                //3.啟動producer
                                producer.start();
                        
                                //If you just send messages of no more than 1MiB at a time, it is easy to use batch
                                //Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule support
                                String topic = "BatchTopic";
                                List<Message> messages = new ArrayList<>();
                                messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
                                messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
                                messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
                        
                                producer.send(messages);
                        
                                // 一旦生產者實例不再被使用則將其關閉,包括清理資源,關閉網絡連接等
                                producer.shutdown();
                            }
                        }
                        復制4.2批量消息消費
                        /**
                         * @PROJECT_NAME: SpringCloud-Learning
                         * @USER: yuliang
                         * @DESCRIPTION:
                         * @DATE: 2021-04-19 15:12
                         *
                         * 批量消息 - 消費者
                         */
                        public class SimpleBatchConsumer {
                        
                            public static void main(String[] args) throws Exception {
                                //創建一個消息消費者,并設置一個消息消費者組
                                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
                                //指定 NameServer 地址
                                consumer.setNamesrvAddr("localhost:9876");
                                //設置 TagFilterConsumer 第一次啟動時從隊列頭部開始消費還是隊列尾部開始消費
                        //        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                                //訂閱指定 Topic 下的所有消息
                                consumer.subscribe("BatchTopic", "*");
                        
                                //注冊消息監聽器
                                consumer.registerMessageListener(new MessageListenerConcurrently() {
                        
                                    @Override
                                    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
                                        //默認 list 里只有一條消息,可以通過設置參數來批量接收消息
                                        if (list != null) {
                                            for (MessageExt ext : list) {
                                                try {
                                                    System.out.println(new Date() + ext.toString() +  "   內容:" + new String(ext.getBody(), "UTF-8"));
                                                } catch (UnsupportedEncodingException e) {
                                                    e.printStackTrace();
                                                }
                                            }
                                        }
                                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                                    }
                                });
                        
                                // 消費者對象在使用之前必須要調用 start 初始化
                                consumer.start();
                                System.out.println("消息消費者已啟動");
                            }
                        }
                        復制5、過濾消息

                        RocketMQ的消費者可以根據Tag進行消息過濾,也支持自定義屬性過濾。消息過濾目前是在Broker端實現的,優點是減少了對于Consumer無用消息的網絡傳輸,缺點是增加了Broker的負擔、而且實現相對復雜。

                        5.1 根據Tag過濾

                        在大多數情況下,TAG是一個簡單而有用的設計,其可以來選擇您想要的消息。例如:

                        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
                        consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
                        復制5.1.1消息生產者
                        public class TagFilterProducer {
                        
                            public static void main(String[] args) throws Exception{
                        
                                //創建一個消息生產者,并設置一個消息生產者組
                                DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
                        
                                //指定 NameServer 地址
                                producer.setNamesrvAddr("localhost:9876");
                        
                                //初始化 ProducerInOrder,整個應用生命周期內只需要初始化一次
                                producer.start();
                        
                                String[] tags = new String[] {"TagA", "TagB", "TagC"};
                        
                        
                                for (int i = 0; i < 10; i++) {
                                    //創建一條消息對象,指定其主題、標簽和消息內容
                                    Message msg = new Message(
                                            "FilterTagTopic" /* 消息主題名 */,
                                            tags[i % tags.length] /* 消息標簽 */,
                                            ("sync producer Hello Java demo RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* 消息內容 */
                                    );
                        
                                    //發送消息并返回結果
                                    SendResult sendResult = producer.send(msg);
                        
                                    System.out.printf("%s%n", sendResult);
                                }
                        
                                // 一旦生產者實例不再被使用則將其關閉,包括清理資源,關閉網絡連接等
                                producer.shutdown();
                        
                            }
                        }
                        復制5.1.2消息消費者
                        /**
                         * @PROJECT_NAME: SpringCloud-Learning
                         * @USER: yuliang
                         * @DESCRIPTION:
                         * @DATE: 2021-04-19 15:19
                         *
                         * MQClientException: CODE: 1  DESC: The broker does not support consumer to filter message by SQL92
                         * 需要添加配置
                         * # 開啟對propertyFilter的支持
                         * enablePropertyFilter = true
                         *
                         */
                        public class TagFilterConsumer {
                        
                        
                            public static void main(String[] args) throws Exception {
                                //創建一個消息消費者,并設置一個消息消費者組
                                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
                                //指定 NameServer 地址
                                consumer.setNamesrvAddr("localhost:9876");
                                //設置 TagFilterConsumer 第一次啟動時從隊列頭部開始消費還是隊列尾部開始消費
                                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                                //訂閱指定 Topic 下的所有消息
                                consumer.subscribe("FilterTagTopic", "TagA || TagB");
                                //注冊消息監聽器
                                consumer.registerMessageListener(new MessageListenerConcurrently() {
                        
                                    @Override
                                    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
                                        //默認 list 里只有一條消息,可以通過設置參數來批量接收消息
                                        if (list != null) {
                                            for (MessageExt ext : list) {
                                                try {
                                                    System.out.println(new Date() + ext.toString() + new String(ext.getBody(), "UTF-8"));
                                                } catch (UnsupportedEncodingException e) {
                                                    e.printStackTrace();
                                                }
                                            }
                                        }
                                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                                    }
                                });
                        
                                // 啟動消費者
                                consumer.start();
                                System.out.println("消息消費者已啟動");
                            }
                        }
                        復制

                        消費者將接收包含TAGA或TAGB或TAGC的消息。但是限制是一個消息只能有一個標簽,這對于復雜的場景可能不起作用。

                        5.2根據SQL過濾

                        使用Tag有一定的局限性,也可以使用SQL表達式篩選消息。SQL特性可以通過發送消息時的屬性來進行計算。在RocketMQ定義的語法下,可以實現一些簡單的邏輯。下面是一個例子:

                        ------------
                        | message  |
                        |----------|  a > 5 AND b = 'abc'
                        | a = 10   |  --------------------> Gotten
                        | b = 'abc'|
                        | c = true |
                        ------------
                        ------------
                        | message  |
                        |----------|   a > 5 AND b = 'abc'
                        | a = 1    |  --------------------> Missed
                        | b = 'abc'|
                        | c = true |
                        ------------
                        復制5.2.1SQL基本語法

                        RocketMQ只定義了一些基本語法來支持這個特性。你也可以很容易地擴展它。

                        • 數值比較,比如:>,>=,<,<=,BETWEEN,=;
                        • 字符比較,比如:=,<>,IN;
                        • IS NULL 或者 IS NOT NULL;
                        • 邏輯符號 AND,OR,NOT;

                        常量支持類型為:

                        • 數值,比如:123,3.1415;
                        • 字符,比如:‘abc’,必須用單引號包裹起來;
                        • NULL,特殊的常量
                        • 布爾值,TRUEFALSE

                        只有使用push模式的消費者才能用使用SQL92標準的sql語句,接口如下:

                        public void subscribe(finalString topic, final MessageSelector messageSelector)
                        復制5.2.2 消息生產者

                        發送消息時,你能通過putUserProperty來設置消息的屬性

                        public class SqlFilterProducer {
                        
                        
                            public static void main(String[] args) throws Exception{
                        
                                //創建一個消息生產者,并設置一個消息生產者組
                                DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
                        
                                //指定 NameServer 地址
                                producer.setNamesrvAddr("localhost:9876");
                        
                                //初始化 ProducerInOrder,整個應用生命周期內只需要初始化一次
                                producer.start();
                        
                                String[] tags = new String[] {"TagA", "TagB", "TagC"};
                                for (int i = 0; i < 10; i++) {
                                    //創建一條消息對象,指定其主題、標簽和消息內容
                                    Message message = new Message(
                                            "FilterSQLTopic" /* 消息主題名 */,
                                            tags[i % tags.length] /* 消息標簽 */,
                                            ("sync producer Hello Java demo RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* 消息內容 */
                                    );
                        
                                    message.putUserProperty("a", String.valueOf(i));
                        
                                    //發送消息并返回結果
                                    SendResult sendResult = producer.send(message);
                        
                                    System.out.printf("%s%n", sendResult);
                                }
                        
                                // 一旦生產者實例不再被使用則將其關閉,包括清理資源,關閉網絡連接等
                                producer.shutdown();
                        
                            }
                        }
                        復制5.2.3 消息消費者

                        用MessageSelector.bySql來使用sql篩選消息

                        /**
                         * @PROJECT_NAME: SpringCloud-Learning
                         * @USER: yuliang
                         * @DESCRIPTION:
                         * @DATE: 2021-04-19 15:19
                         * MQClientException: CODE: 1  DESC: The broker does not support consumer to filter message by SQL92
                         * 需要添加配置
                         * # 開啟對propertyFilter的支持
                         * enablePropertyFilter = true
                         */
                        public class SqlFilterConsumer {
                        
                            public static void main(String[] args) throws Exception {
                                //創建一個消息消費者,并設置一個消息消費者組
                                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
                                //指定 NameServer 地址
                                consumer.setNamesrvAddr("localhost:9876");
                                //設置 TagFilterConsumer 第一次啟動時從隊列頭部開始消費還是隊列尾部開始消費
                                //訂閱指定 Topic 下的所有消息
                        //        consumer.subscribe("FilterSQLTopic", MessageSelector.bySql("a between 0 and 3"));
                        
                                // Don't forget to set enablePropertyFilter=true in broker
                                consumer.subscribe("FilterSQLTopic",
                                        MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
                                                "and (a is not null and a between 0 and 3)"));
                        
                                //注冊消息監聽器
                                consumer.registerMessageListener(new MessageListenerConcurrently() {
                        
                                    @Override
                                    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
                                        //默認 list 里只有一條消息,可以通過設置參數來批量接收消息
                                        if (list != null) {
                                            for (MessageExt ext : list) {
                                                try {
                                                    System.out.println(new Date() + ext.toString() + new String(ext.getBody(), "UTF-8"));
                                                } catch (UnsupportedEncodingException e) {
                                                    e.printStackTrace();
                                                }
                                            }
                                        }
                                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                                    }
                                });
                        
                                // 啟動消費者
                                consumer.start();
                                System.out.println("消息消費者已啟動");
                            }
                        }
                        復制6、事務消息

                        RocketMQ事務消息(Transactional Message)是指應用本地事務和發送消息操作可以被定義到全局事務中,要么同時成功,要么同時失敗。RocketMQ的事務消息提供類似 X/Open XA 的分布事務功能,通過事務消息能達到分布式事務的最終一致。

                        6.1 流程分析

                        上圖說明了事務消息的大致方案,其中分為兩個流程:正常事務消息的發送及提交、事務消息的補償流程。

                        1、事務消息發送及提交

                        (1) 發送消息(half消息)。

                        (2) 服務端響應消息寫入結果。

                        (3) 根據發送結果執行本地事務(如果寫入失敗,此時half消息對業務不可見,本地邏輯不執行)。

                        (4) 根據本地事務狀態執行Commit或者Rollback(Commit操作生成消息索引,消息對消費者可見)

                        2、事務補償

                        (1) 對沒有Commit/Rollback的事務消息(pending狀態的消息),從服務端發起一次“回查”

                        (2) Producer收到回查消息,檢查回查消息對應的本地事務的狀態

                        (3) 根據本地事務狀態,重新Commit或者Rollback

                        其中,補償階段用于解決消息Commit或者Rollback發生超時或者失敗的情況。

                        3、事務消息狀態

                        事務消息共有三種狀態,提交狀態、回滾狀態、中間狀態:

                        • TransactionStatus.CommitTransaction: 提交事務,它允許消費者消費此消息。
                        • TransactionStatus.RollbackTransaction: 回滾事務,它代表該消息將被刪除,不允許被消費。
                        • TransactionStatus.Unknown: 中間狀態,它代表需要檢查消息隊列來確定狀態。
                        6.2 發送事務消息1) 創建事務性生產者

                        使用TransactionMQProducer類創建生產者,并指定唯一的ProducerGroup,就可以設置自定義線程池來處理這些檢查請求。執行本地事務后、需要根據執行結果對消息隊列進行回復;貍鞯氖聞諣顟B在請參考前一節。

                        public class Producer {
                        
                            public static void main(String[] args) throws Exception{
                        
                                //創建一個消息生產者,并設置一個消息生產者組
                                TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
                        
                                //指定 NameServer 地址
                                producer.setNamesrvAddr("localhost:9876");
                        
                                TransactionListener transactionListener = new TransactionListenerImpl();
                        
                                producer.setTransactionListener(transactionListener);
                        
                                ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
                                    @Override
                                    public Thread newThread(Runnable r) {
                                        Thread thread = new Thread(r);
                                        thread.setName("client-transaction-msg-check-thread");
                                        return thread;
                                    }
                                });
                        
                                producer.setExecutorService(executorService);
                        
                        
                                //初始化 ProducerInOrder,整個應用生命周期內只需要初始化一次
                                producer.start();
                        
                        
                                String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
                                for (int i = 0; i < 10; i++) {
                                    try {
                                        Message msg =
                                                new Message("TransTopic", tags[i % tags.length], "KEY" + i,
                                                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                                        SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                                        System.out.printf("%s%n", sendResult);
                                        Thread.sleep(10);
                                    } catch (MQClientException e) {
                                        e.printStackTrace();
                                    }
                                }
                        
                                // 一旦生產者實例不再被使用則將其關閉,包括清理資源,關閉網絡連接等
                                producer.shutdown();
                        
                            }
                        }
                        復制2)實現事務的監聽接口

                        當發送半消息成功時,我們使用executeLocalTransaction方法來執行本地事務。它返回前一節中提到的三個事務狀態之一。checkLocalTranscation方法用于檢查本地事務狀態,并回應消息隊列的檢查請求。它也是返回前一節中提到的三個事務狀態之一。

                        public class TransactionListenerImpl implements TransactionListener {
                        
                          private AtomicInteger transactionIndex = new AtomicInteger(0);
                        
                          private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
                        
                          @Override
                          public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                              System.out.println("執行本地事務");
                              
                              System.out.println("消息內容 :" + msg.getKeys() +  new String(msg.getBody()));
                              int value = transactionIndex.getAndIncrement();
                              int status = value % 3;
                              localTrans.put(msg.getTransactionId(), status);
                              return LocalTransactionState.UNKNOW;
                          }
                        
                          @Override
                          public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                              Integer status = localTrans.get(msg.getTransactionId());
                              if (null != status) {
                                  switch (status) {
                                      case 0:
                                          return LocalTransactionState.UNKNOW;
                                      case 1:
                                          return LocalTransactionState.COMMIT_MESSAGE;
                                      case 2:
                                          return LocalTransactionState.ROLLBACK_MESSAGE;
                                  }
                              }
                              return LocalTransactionState.COMMIT_MESSAGE;
                          }
                        }
                        復制6.3 使用限制
                        1. 事務消息不支持延時消息和批量消息。
                        2. 為了避免單個消息被檢查太多次而導致半隊列消息累積,我們默認將單個消息的檢查次數限制為 15 次,但是用戶可以通過 Broker 配置文件的transactionCheckMax參數來修改此限制。如果已經檢查某條消息超過 N 次的話( N =transactionCheckMax) 則 Broker 將丟棄此消息,并在默認情況下同時打印錯誤日志。用戶可以通過重寫AbstractTransactionCheckListener類來修改這個行為。
                        3. 事務消息將在 Broker 配置文件中的參數 transactionMsgTimeout 這樣的特定時間長度之后被檢查。當發送事務消息時,用戶還可以通過設置用戶屬性 CHECK_IMMUNITY_TIME_IN_SECONDS 來改變這個限制,該參數優先于transactionMsgTimeout參數。
                        4. 事務性消息可能不止一次被檢查或消費。
                        5. 提交給用戶的目標主題消息可能會失敗,目前這依日志的記錄而定。它的高可用性通過 RocketMQ 本身的高可用性機制來保證,如果希望確保事務消息不丟失、并且事務完整性得到保證,建議使用同步的雙重寫入機制。
                        6. 事務消息的生產者 ID 不能與其他類型消息的生產者 ID 共享。與其他類型的消息不同,事務消息允許反向查詢、MQ服務器能通過它們的生產者 ID 查詢到消費者。
                        總結

                        我們對于Rocketmq發送、消費消息的方式進行了全方面的解析,并給出了相應的案例,消息生產者和消費者可以進行簡單的抽象:

                        • 消息生產者步驟
                        1. 創建消息生產者producer,并制定生產者組名
                        2. 指定Nameserver地址
                        3. 啟動producer
                        4. 創建消息對象,指定主題Topic、Tag和消息體
                        5. 發送消息
                        6. 關閉生產者producer
                        • 消息消費者步驟
                        1. 創建消費者Consumer,制定消費者組名
                        2. 指定Nameserver地址
                        3. 訂閱主題Topic和Tag
                        4. 設置回調函數,處理消息
                        5. 啟動消費者consumer
                        代碼示例

                        本文示例讀者可以通過查看下面倉庫的中的rocketmq-simple項目:

                        • Github:https://github.com/jiuqiyuliang/SpringCloud-Learning
                        關聯標簽:
                        少妇各种各样BBBⅩXX