Featured image of post RabbitMQ學習筆記

RabbitMQ學習筆記

努力到無能為力,拼搏到感動至極

前言

今天是第二天,學習RabbitMQ。第一次接觸到Message Queue是在御諾的時候,當時有個同事做了一個這個東西,在Sprint的結尾展示,那時候就覺得MessageQueue這個東西也太酷了,但一直沒有機會(懶)去接觸。

line-up

什麼是Message Queue

Memphis.dev - What is a message queue?

Message Queue(訊息佇列、消息隊列),是一種用在應用程序之間傳遞消息的通訊是,Message Queue允許應用之間異步的發送和接收消息,並且不需要直接連接到對方

什麼是RabbitMQ

RabbitMQ是個基於AMQP(Advanced Message Queuing Protocal 高級消息隊列協議) ,用於應用程式之間通訊的中間層,Rabbit有四大核心

  1. 生產者:發送消息的應用程式
  2. 消費者:接受消息的應用程式
  3. 佇列:訊息在Message Queue中儲存的位置
  4. 交換機:訊息路由的一個組件,會依照我們的配置,把訊息分發給特定的佇列

AMQP也包含了四個核心組件

  1. 消息:包括消息頭、消息體、消息屬性
  2. 交換機:消息傳遞的中間件,將消息路由到一個或多個隊列中
  3. 佇列:用來儲存消息的資料結構
  4. 綁定:交換機和隊列的綁定

下載與安裝

以下適用於M1、M2

使用brew安裝

1
brew install rabbitmq

啟動rabbitMq

1
brew services start rabbitmq

image-20230908142910340

停止rabbitMq

1
brew services stop rabbitmq

啟動後,可以在瀏覽器中輸入

http://localhost:15672/

即可訪問rabbitMq的畫面

image-20230908144549913

帳號密碼都是guest

image-20230908144612130

就可以看到rabbitMq的畫面了

常用指令

創建使用者

tom為使用者名稱,tom60229為使用者密碼

1
rabbitmqctl add_user tom tom60229

賦予使用者權限

將tom設成管理員

1
rabbitmqctl set_user_tags tom administrator

實際使用

範例Code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //服務地址
        connectionFactory.setHost("127.0.0.1");
        //帳號
        connectionFactory.setUsername("guest");
        //密碼
        connectionFactory.setPassword("guest");
        //端口號
        connectionFactory.setPort(5672);
        //創建連接
        Connection connection = connectionFactory.newConnection();
        //創建channel
        Channel channel = connection.createChannel();

        /**
         * 創建交換機
         * 1. 交換機名稱
         * 2. 交換機類型 direct, topic, fanout, headers
         * 3. 指定交換機是否需要持久化,如果設置為True,那麼交換機的元數據要持久化
         * 4. 指定交換機在沒有隊列綁定時,是否刪除?
         * 5. Map<String,Object>類型,用來指定我們交換機其他的一些機構話參數,我們在這裡直接設置成Null
         */

        String exchangeName = "xc_exchange_name";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, null);

        /**
         * 生產一個隊列
         * 1. 隊列名稱
         * 2. 隊列是否需要持久化,但是要注意,這裡的持久化只是隊列名稱等這些元數據的持久化,不是隊列中消息的持久化
         * 3. 表示隊列是不是私有,如果是私有的,只有唱見他的應用程序才能消費消息
         * 4. 隊列在沒有消費者訂閱的情況下,是否自動刪除
         * 5. 隊列的一些結構化訊息,比如說聲明死信隊列,磁盤隊列會用到
         */
        String queueName = "xc_queue_name";
        channel.queueDeclare(queueName, true, false, false, null);

        /**
         * 將交換機與隊列綁定
         * 1. 隊列名稱
         * 2. 交換機名稱
         * 3. 路由鍵,在我們直連模式下,可以為我們的隊列名稱
         */
        channel.queueBind(queueName, exchangeName, queueName);
        // 要發送的消息
        String message = "Hello rabbitMQ";
        /**
         * 發送消息
         * 1. 發送到哪個交換機
         * 2. 隊列名稱
         * 3. 其他參數訊息
         * 4. 發送消息的消息體
         */
        channel.basicPublish(exchangeName, queueName, null, message.getBytes());
        channel.close();
        connection.close();
    }
}

執行完後,訪問RabbitMQ的頁面,會發現Queue中確實存在一則Message

image-20230908160020482

這時在創建另一個Consumer

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.example.springbootinaction.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        String exchangeName = "xc_exchange_name";
        String queueName = "xc_queue_name";
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //服務地址
        connectionFactory.setHost("127.0.0.1");
        //帳號
        connectionFactory.setUsername("guest");
        //密碼
        connectionFactory.setPassword("guest");
        //端口號
        connectionFactory.setPort(5672);
        //創建連接
        Connection connection = connectionFactory.newConnection();
        //創建channel
        Channel channel = connection.createChannel();
        //接收消息的回調函數
        DeliverCallback deliverCallback = (consumerTage, message) -> {
            System.out.println("接收到消息" + new String(message.getBody()));
        };
        //取消消息的回調函數
        CancelCallback cancelCallback = consumerTage ->{
            System.out.println("消息被中斷");
        };

        /**
         * 消費消息
         * 1. 消費哪個隊列
         * 2. 消費成功之後是否需要自動應答,true:自動應答
         * 3. 接受消息的回調函數
         */
        channel.basicConsume(queueName,true,deliverCallback,cancelCallback);


    }
}

image-20230908160125807

確實有收到消息,重新整理rabbitMQ頁面後

image-20230908160203126

會發現已經沒有Messege在等待了

交換機介紹

Direct

路由鍵與隊列名「完全匹配」交換機,通過與RoutingKey路由鍵將交換機與隊列進行綁定,消息被發送到exchange時,需要根據消息的RountingKey,來進行匹配,只將消息發送到完全匹配到此RountingKey的隊列,如果想要模糊匹配的話要用Topic。

比如一個隊列綁定到交換機要求路由鍵為"key",則只轉發RountingKey標記為"key"的消息,不會轉發"key1"、“key." …等等,他是完全匹配、單撥的形式

image-20230908160823867

以下為範例Code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
public class ProducerDirect {
    public static void main(String[] args) throws IOException, TimeoutException {
        String exchangeName = "xc_exchange_name";
        String queueName_1 = "xc_queue_name_1";
        String queueName_2 = "xc_queue_name_2";
        String queueName_3 = "xc_queue_name_3";
        String queueName_4 = "xc_queue_name_4";

        String key_1 = "key_1";
        String key_3 = "key_3";
        String key_4 = "key_4";

        ConnectionFactory connectionFactory = new ConnectionFactory();
        //服務地址
        connectionFactory.setHost("127.0.0.1");
        //帳號
        connectionFactory.setUsername("guest");
        //密碼
        connectionFactory.setPassword("guest");
        //端口號
        connectionFactory.setPort(5672);
        //創建連接
        Connection connection = connectionFactory.newConnection();
        //創建channel
        Channel channel = connection.createChannel();

        /**
         * 創建交換機
         * 1. 交換機名稱
         * 2. 交換機類型 direct, topic, fanout, headers
         * 3. 指定交換機是否需要持久化,如果設置為True,那麼交換機的元數據要持久化
         * 4. 指定交換機在沒有隊列綁定時,是否刪除?
         * 5. Map<String,Object>類型,用來指定我們交換機其他的一些機構話參數,我們在這裡直接設置成Null
         */

        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, null);

        /**
         * 生產一個隊列
         * 1. 隊列名稱
         * 2. 隊列是否需要持久化,但是要注意,這裡的持久化只是隊列名稱等這些元數據的持久化,不是隊列中消息的持久化
         * 3. 表示隊列是不是私有,如果是私有的,只有唱見他的應用程序才能消費消息
         * 4. 隊列在沒有消費者訂閱的情況下,是否自動刪除
         * 5. 隊列的一些結構化訊息,比如說聲明死信隊列,磁盤隊列會用到
         */
        channel.queueDeclare(queueName_1, true, false, false, null);
        channel.queueDeclare(queueName_2, true, false, false, null);
        channel.queueDeclare(queueName_3, true, false, false, null);
        channel.queueDeclare(queueName_4, true, false, false, null);

        /**
         * 將交換機與隊列綁定
         * 1. 隊列名稱
         * 2. 交換機名稱
         * 3. 路由鍵,在我們直連模式下,可以為我們的隊列名稱
         */
        channel.queueBind(queueName_1, exchangeName, key_1);
        channel.queueBind(queueName_2, exchangeName, key_1);
        channel.queueBind(queueName_3, exchangeName, key_3);
        channel.queueBind(queueName_4, exchangeName, key_4);

        // 要發送的消息
        String message = "Hello rabbitMQ";
        /**
         * 發送消息
         * 1. 發送到哪個交換機
         * 2. 隊列名稱
         * 3. 其他參數訊息
         * 4. 發送消息的消息體
         */
        channel.basicPublish(exchangeName, key_1, null, "key_1 message".getBytes());
        channel.basicPublish(exchangeName, key_3, null, "key_3 message".getBytes());
        channel.basicPublish(exchangeName, key_4, null, "key_4 message".getBytes());
        channel.close();
        connection.close();
    }
}

執行完後長這樣

image-20230908162003791

接著去掉用Consumer去查看Messege

image-20230908162120977

image-20230908162146641

改成呼叫queueName_2,接收到的仍是key_1的訊息,因為我們在生產者那邊,將queueName2與key_1進行綁定

image-20230908162254880

Fanout

image-20230908173139493

扇出類型的exchange,會將消息分發給所有綁定了此exchange的隊列,此時RountingKey參數無效,因此不管給哪個Queue送消息、屬於同一個exchange底下的都會共享

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package com.example.springbootinaction.rabbitmq.fanout;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ProducerFanout {
    public static void main(String[] args) throws IOException, TimeoutException {
        String exchangeName = "xc_exchange_fanout_name";
        String queueName_1 = "xc_queue_name_fanout_1";
        String queueName_2 = "xc_queue_name_fanout_2";
        String queueName_3 = "xc_queue_name_fanout_3";
        String queueName_4 = "xc_queue_name_fanout_4";

        String key_1 = "key_1";
        String key_3 = "key_3";
        String key_4 = "key_4";

        ConnectionFactory connectionFactory = new ConnectionFactory();
        //服務地址
        connectionFactory.setHost("127.0.0.1");
        //帳號
        connectionFactory.setUsername("guest");
        //密碼
        connectionFactory.setPassword("guest");
        //端口號
        connectionFactory.setPort(5672);
        //創建連接
        Connection connection = connectionFactory.newConnection();
        //創建channel
        Channel channel = connection.createChannel();

        /**
         * 創建交換機
         * 1. 交換機名稱
         * 2. 交換機類型 direct, topic, fanout, headers
         * 3. 指定交換機是否需要持久化,如果設置為True,那麼交換機的元數據要持久化
         * 4. 指定交換機在沒有隊列綁定時,是否刪除?
         * 5. Map<String,Object>類型,用來指定我們交換機其他的一些機構話參數,我們在這裡直接設置成Null
         */

        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, null);

        /**
         * 生產一個隊列
         * 1. 隊列名稱
         * 2. 隊列是否需要持久化,但是要注意,這裡的持久化只是隊列名稱等這些元數據的持久化,不是隊列中消息的持久化
         * 3. 表示隊列是不是私有,如果是私有的,只有唱見他的應用程序才能消費消息
         * 4. 隊列在沒有消費者訂閱的情況下,是否自動刪除
         * 5. 隊列的一些結構化訊息,比如說聲明死信隊列,磁盤隊列會用到
         */
        channel.queueDeclare(queueName_1, true, false, false, null);
        channel.queueDeclare(queueName_2, true, false, false, null);
        channel.queueDeclare(queueName_3, true, false, false, null);
        channel.queueDeclare(queueName_4, true, false, false, null);

        /**
         * 將交換機與隊列綁定
         * 1. 隊列名稱
         * 2. 交換機名稱
         * 3. 路由鍵,在我們直連模式下,可以為我們的隊列名稱
         */
        channel.queueBind(queueName_1, exchangeName, key_1);
        channel.queueBind(queueName_2, exchangeName, key_1);
        channel.queueBind(queueName_3, exchangeName, key_3);
        channel.queueBind(queueName_4, exchangeName, key_4);

        // 要發送的消息
        String message = "Hello rabbitMQ";
        /**
         * 發送消息
         * 1. 發送到哪個交換機
         * 2. 隊列名稱
         * 3. 其他參數訊息
         * 4. 發送消息的消息體
         */
        channel.basicPublish(exchangeName, key_1, null, "key_1 fanout message".getBytes());
        //註解掉,其他的,來看看會變什麼樣
//        channel.basicPublish(exchangeName, key_3, null, "key_3 message".getBytes());
//        channel.basicPublish(exchangeName, key_4, null, "key_4 message".getBytes());
        channel.close();
        connection.close();
    }
}

執行後會出現一個新的交換機

image-20230908164448373

在Queue中多出了四個Queue

image-20230908172557300

Warning

如果你的Queue有不該跳起來的卻跳了起來,很可能是你不小心綁定到了同一個exchange。如果你的xc_queue_name_fanout,一直沒跳起來,很可能是你的consumer持續在背景執行,一直消費你的Queue,記得關掉

joker-dont-say-i-didnt-warn-you-gif

接著啟動消費者,來看看他拿到什麼

Topic

image-20230908173439226

主題類型交換機,此種交換機與Direct類似,也是需要透過routingKey路由鍵進行匹配分發,區別在Topic可以模糊匹配

  1. Topic中,將RoutingKey通過".“來分為多個部分
  2. “*":代表一部分
  3. “#":代表0個或多個部分,如果綁定的路由為#時,則接受所有消息,因為路由鍵所有都匹配
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
public class ProducerTopic {
    public static void main(String[] args) throws IOException, TimeoutException {
        String exchangeName = "xc_exchange_topic _name";
        String queueName_1 = "xc_queue_topic_name_1";
        String queueName_2 = "xc_queue_topic_name_2";
        String queueName_3 = "xc_queue_topic_name_3";
        String queueName_4 = "xc_queue_topic_name_4";
				//key的名稱綁定有變!!!
        String key_1 = "key_1.key2.key3.*";
        String key_2 = "key_1.#";
        String key_3 = "*.key_2.*.key_4";
        String key_4 = "#.key_3.key_4";

        ConnectionFactory connectionFactory = new ConnectionFactory();
        //服務地址
        connectionFactory.setHost("127.0.0.1");
        //帳號
        connectionFactory.setUsername("guest");
        //密碼
        connectionFactory.setPassword("guest");
        //端口號
        connectionFactory.setPort(5672);
        //創建連接
        Connection connection = connectionFactory.newConnection();
        //創建channel
        Channel channel = connection.createChannel();

        /**
         * 創建交換機
         * 1. 交換機名稱
         * 2. 交換機類型 direct, topic, fanout, headers
         * 3. 指定交換機是否需要持久化,如果設置為True,那麼交換機的元數據要持久化
         * 4. 指定交換機在沒有隊列綁定時,是否刪除?
         * 5. Map<String,Object>類型,用來指定我們交換機其他的一些機構話參數,我們在這裡直接設置成Null
         */

        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, null);

        /**
         * 生產一個隊列
         * 1. 隊列名稱
         * 2. 隊列是否需要持久化,但是要注意,這裡的持久化只是隊列名稱等這些元數據的持久化,不是隊列中消息的持久化
         * 3. 表示隊列是不是私有,如果是私有的,只有唱見他的應用程序才能消費消息
         * 4. 隊列在沒有消費者訂閱的情況下,是否自動刪除
         * 5. 隊列的一些結構化訊息,比如說聲明死信隊列,磁盤隊列會用到
         */
        channel.queueDeclare(queueName_1, true, false, false, null);
        channel.queueDeclare(queueName_2, true, false, false, null);
        channel.queueDeclare(queueName_3, true, false, false, null);
        channel.queueDeclare(queueName_4, true, false, false, null);

        /**
         * 將交換機與隊列綁定
         * 1. 隊列名稱
         * 2. 交換機名稱
         * 3. 路由鍵,在我們直連模式下,可以為我們的隊列名稱
         */
        channel.queueBind(queueName_1, exchangeName, key_1);
        channel.queueBind(queueName_2, exchangeName, key_2);
        channel.queueBind(queueName_3, exchangeName, key_3);
        channel.queueBind(queueName_4, exchangeName, key_4);

        // 要發送的消息
        String message = "Hello rabbitMQ";
        /**
         * 發送消息
         * 1. 發送到哪個交換機
         * 2. 隊列名稱
         * 3. 其他參數訊息
         * 4. 發送消息的消息體
         */
        channel.basicPublish(exchangeName, "key_1", null, "key 1 topic message".getBytes());
        channel.close();
        connection.close();
    }
}

執行完後

image-20230908173950517

Headers

幾乎跟Direct一樣,所以基本上用不到,有需要再來看

在SpringBoot中使用

加入依賴

1
2
3
4
5
6
7
8
9
<dependency>
	<groupId>com.rabbitmq</groupId>
	<artifactId>amqp-client</artifactId>
	<version>5.16.0</version>
</dependency>
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

加入配置文件

注意,這邊記得要自己創建一個使用者,不要用guest,有可能會出錯== 不要問我為什麼會知道

image-20230909111727803

1
2
3
4
5
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=hoxton
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=test1

生成關於exchange、queue的,再將兩者綁在一起Bean

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Configuration
public class RabbitMQConfig {
    private final static String EXCHANGE_NAME = "my_boot_fanout_exchange";
    private final static String QUEUE_NAME = "my_boot_fanout_queue1";

    /**
     * 聲明交換機
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(EXCHANGE_NAME, true, false);

    } 

    @Bean
    public Queue queue() {
        return new Queue(QUEUE_NAME, true, false, false);
    }

    @Bean  
    public Binding queueBinding(FanoutExchange fanoutExchange, Queue queue) {
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }
}

在配置個

Producer

我們這邊用RestAPI的方式發送請求過去。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.example.springbootinaction.controller;

import com.example.springbootinaction.service.RabbitMQProducer;
import lombok.RequiredArgsConstructor;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("rabbit")
@RequiredArgsConstructor
public class MessageController {

    private final RabbitMQProducer rabbitMQProducer;

    @GetMapping()
    public ResponseEntity<String> senedMessage() {
        rabbitMQProducer.sendMessage("我是來自前端的訊息");
        return ResponseEntity.ok().body("完成");
    }
}

其中的rabbitTemplate是SpringBoot用來與rabbitMq交互的一個類

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
package com.example.springbootinaction.service;

import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

import static com.example.springbootinaction.config.RabbitMQConfig.EXCHANGE_NAME;

@Service
@RequiredArgsConstructor
public class RabbitMQProducer {

    private final RabbitTemplate rabbitTemplate;

    public void sendMessage(String message) {
        System.out.println("送出訊息");
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, "", message);
    }
}

接著再寫一個

Consumer

Listener

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.example.springbootinaction.listener;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import static com.example.springbootinaction.config.RabbitMQConfig.QUEUE_NAME;

@Component
public class ConsumerListener {

    /**
     * 監聽隊列:當隊列中有消息,則監聽器工作,處理接收到的訊息
     * @param message
     */
    @RabbitListener(queues = QUEUE_NAME)
    public void process(Message message) {
        byte[] body = message.getBody();
        System.out.println("接收到的消息" + new String(body));

    }
}

這個方法會去監聽指定Queue的Messege,並且做相應的處理

目前這邊的方式就是把收到的訊息Print出來,並把那個Messege清空,請特別注意,這邊是由rabbit自動幫你ack(acknowledge)的,如果不想要的話也可以去設定把這部分關掉,這邊就不展示了。

範例圖

iShot_2023-09-09_22.28.49

參考資料

1小时学会RabbitMQ!快速掌握RabbitMQ的使用与原理实战

编程不良人】MQ消息中间件之RabbitMQ以及整合SpringBoot2.x实战教程,已完结!

Licensed under CC BY-NC-SA 4.0