GitHub網址
2024/05/24更新
https://github.com/Hoxton019030/RabbitMQ_Practice/tree/prototype
Prototype分支是個MVP(Minimum Viable Product),使用了RabbitMq來接收前端傳來的一個訊息並打印出來,可以藉此對MessageQueue有個認識與回顧-
https://github.com/Hoxton019030/JobTalk
這個Job Talk則有比較詳細的MessageQueue內容
前言
今天是第二天,學習RabbitMQ。第一次接觸到Message Queue是在御諾的時候,當時有個同事做了一個這個東西,在Sprint的結尾展示,那時候就覺得MessageQueue這個東西也太酷了,但一直沒有機會(懶)去接觸。
什麼是Message Queue
Message Queue(訊息佇列、消息隊列),是一種用在應用程序之間傳遞消息的通訊是,Message Queue允許應用之間異步的發送和接收消息,並且不需要直接連接到對方
2025/1/10 補
現在對Message Queue的理解是這樣
- 削峰降流
假設再製作一個網購商城,從用戶下單到結算,中間會經歷很多過程,耗時非常長,此時可以將下單的行為放到MessageQueue中做處理,避免許多請求直接造成伺服器負擔,比如說可以設定1秒只處理5筆這樣子
- 避免請求丟失
分佈式系統中,不同模塊需要透過網路交互,如果一方的模塊網路突然出問題,那麼很多與之交互的模塊的請求送不到會發生許多問題,一樣以電商舉例,現在存貨已經扣了,要進到扣款流程,但扣款模塊壞了,導致存貨被扣,但支付卻還沒發生,並且支付模塊修好後,由於資料丟失,相關的扣款也無法繼續下去
- 服務解耦
比如說Java處理好的資料要給Python用,就可以讓Java Push 到Queue中,Python再去Consume這個消息就ok,達成服務解耦的目的
- 異步處理
沒啥特別好說的,就當一個大型異步處理來用
什麼是RabbitMQ
RabbitMQ是個基於AMQP(Advanced Message Queuing Protocal 高級消息隊列協議) ,用於應用程式之間通訊的中間層,Rabbit有四大核心
- 生產者:發送消息的應用程式
- 消費者:接受消息的應用程式
- 佇列:訊息在Message Queue中儲存的位置
- 交換機:訊息路由的一個組件,會依照我們的配置,把訊息分發給特定的佇列
AMQP也包含了四個核心組件
- 消息:包括消息頭、消息體、消息屬性
- 交換機:消息傳遞的中間件,將消息路由到一個或多個隊列中
- 佇列:用來儲存消息的資料結構
- 綁定:交換機和隊列的綁定
下載與安裝
以下適用於M1、M2
使用brew安裝
啟動rabbitMq
1
| brew services start rabbitmq
|
停止rabbitMq
1
| brew services stop rabbitmq
|
啟動後,可以在瀏覽器中輸入
http://localhost:15672/
即可訪問rabbitMq的畫面
帳號密碼都是guest
就可以看到rabbitMq的畫面了
常用指令
創建使用者
tom為使用者名稱,tom60229為使用者密碼
1
| rabbitmqctl add_user tom tom60229
|
賦予使用者權限
將tom設成管理員
1
| rabbitmqctl set_user_tags tom administrator
|
實際使用
範例Code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
| public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
//服務地址
connectionFactory.setHost("127.0.0.1");
//帳號
connectionFactory.setUsername("guest");
//密碼
connectionFactory.setPassword("guest");
//端口號
connectionFactory.setPort(5672);
//創建連接
Connection connection = connectionFactory.newConnection();
//創建channel
Channel channel = connection.createChannel();
/**
* 創建交換機
* 1. 交換機名稱
* 2. 交換機類型 direct, topic, fanout, headers
* 3. 指定交換機是否需要持久化,如果設置為True,那麼交換機的元數據要持久化
* 4. 指定交換機在沒有隊列綁定時,是否刪除?
* 5. Map<String,Object>類型,用來指定我們交換機其他的一些機構話參數,我們在這裡直接設置成Null
*/
String exchangeName = "xc_exchange_name";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, null);
/**
* 生產一個隊列
* 1. 隊列名稱
* 2. 隊列是否需要持久化,但是要注意,這裡的持久化只是隊列名稱等這些元數據的持久化,不是隊列中消息的持久化
* 3. 表示隊列是不是私有,如果是私有的,只有唱見他的應用程序才能消費消息
* 4. 隊列在沒有消費者訂閱的情況下,是否自動刪除
* 5. 隊列的一些結構化訊息,比如說聲明死信隊列,磁盤隊列會用到
*/
String queueName = "xc_queue_name";
channel.queueDeclare(queueName, true, false, false, null);
/**
* 將交換機與隊列綁定
* 1. 隊列名稱
* 2. 交換機名稱
* 3. 路由鍵,在我們直連模式下,可以為我們的隊列名稱
*/
channel.queueBind(queueName, exchangeName, queueName);
// 要發送的消息
String message = "Hello rabbitMQ";
/**
* 發送消息
* 1. 發送到哪個交換機
* 2. 隊列名稱
* 3. 其他參數訊息
* 4. 發送消息的消息體
*/
channel.basicPublish(exchangeName, queueName, null, message.getBytes());
channel.close();
connection.close();
}
}
|
執行完後,訪問RabbitMQ的頁面,會發現Queue中確實存在一則Message
這時在創建另一個Consumer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
| package com.example.springbootinaction.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
String exchangeName = "xc_exchange_name";
String queueName = "xc_queue_name";
ConnectionFactory connectionFactory = new ConnectionFactory();
//服務地址
connectionFactory.setHost("127.0.0.1");
//帳號
connectionFactory.setUsername("guest");
//密碼
connectionFactory.setPassword("guest");
//端口號
connectionFactory.setPort(5672);
//創建連接
Connection connection = connectionFactory.newConnection();
//創建channel
Channel channel = connection.createChannel();
//接收消息的回調函數
DeliverCallback deliverCallback = (consumerTage, message) -> {
System.out.println("接收到消息" + new String(message.getBody()));
};
//取消消息的回調函數
CancelCallback cancelCallback = consumerTage ->{
System.out.println("消息被中斷");
};
/**
* 消費消息
* 1. 消費哪個隊列
* 2. 消費成功之後是否需要自動應答,true:自動應答
* 3. 接受消息的回調函數
*/
channel.basicConsume(queueName,true,deliverCallback,cancelCallback);
}
}
|
確實有收到消息,重新整理rabbitMQ頁面後
會發現已經沒有Messege在等待了
交換機介紹(Exchange)
Direct
路由鍵與隊列名「完全匹配」交換機,通過與RoutingKey路由鍵將交換機與隊列進行綁定,消息被發送到exchange時,需要根據消息的RountingKey,來進行匹配,只將消息發送到完全匹配到此RountingKey的隊列,如果想要模糊匹配的話要用Topic。
比如一個隊列綁定到交換機要求路由鍵為"key",則只轉發RountingKey標記為"key"的消息,不會轉發"key1"、“key." …等等,他是完全匹配、單撥的形式
以下為範例Code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
| public class ProducerDirect {
public static void main(String[] args) throws IOException, TimeoutException {
String exchangeName = "xc_exchange_name";
String queueName_1 = "xc_queue_name_1";
String queueName_2 = "xc_queue_name_2";
String queueName_3 = "xc_queue_name_3";
String queueName_4 = "xc_queue_name_4";
String key_1 = "key_1";
String key_3 = "key_3";
String key_4 = "key_4";
ConnectionFactory connectionFactory = new ConnectionFactory();
//服務地址
connectionFactory.setHost("127.0.0.1");
//帳號
connectionFactory.setUsername("guest");
//密碼
connectionFactory.setPassword("guest");
//端口號
connectionFactory.setPort(5672);
//創建連接
Connection connection = connectionFactory.newConnection();
//創建channel
Channel channel = connection.createChannel();
/**
* 創建交換機
* 1. 交換機名稱
* 2. 交換機類型 direct, topic, fanout, headers
* 3. 指定交換機是否需要持久化,如果設置為True,那麼交換機的元數據要持久化
* 4. 指定交換機在沒有隊列綁定時,是否刪除?
* 5. Map<String,Object>類型,用來指定我們交換機其他的一些機構話參數,我們在這裡直接設置成Null
*/
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, null);
/**
* 生產一個隊列
* 1. 隊列名稱
* 2. 隊列是否需要持久化,但是要注意,這裡的持久化只是隊列名稱等這些元數據的持久化,不是隊列中消息的持久化
* 3. 表示隊列是不是私有,如果是私有的,只有唱見他的應用程序才能消費消息
* 4. 隊列在沒有消費者訂閱的情況下,是否自動刪除
* 5. 隊列的一些結構化訊息,比如說聲明死信隊列,磁盤隊列會用到
*/
channel.queueDeclare(queueName_1, true, false, false, null);
channel.queueDeclare(queueName_2, true, false, false, null);
channel.queueDeclare(queueName_3, true, false, false, null);
channel.queueDeclare(queueName_4, true, false, false, null);
/**
* 將交換機與隊列綁定
* 1. 隊列名稱
* 2. 交換機名稱
* 3. 路由鍵,在我們直連模式下,可以為我們的隊列名稱
*/
channel.queueBind(queueName_1, exchangeName, key_1);
channel.queueBind(queueName_2, exchangeName, key_1);
channel.queueBind(queueName_3, exchangeName, key_3);
channel.queueBind(queueName_4, exchangeName, key_4);
// 要發送的消息
String message = "Hello rabbitMQ";
/**
* 發送消息
* 1. 發送到哪個交換機
* 2. 隊列名稱
* 3. 其他參數訊息
* 4. 發送消息的消息體
*/
channel.basicPublish(exchangeName, key_1, null, "key_1 message".getBytes());
channel.basicPublish(exchangeName, key_3, null, "key_3 message".getBytes());
channel.basicPublish(exchangeName, key_4, null, "key_4 message".getBytes());
channel.close();
connection.close();
}
}
|
執行完後長這樣
接著去掉用Consumer去查看Messege
改成呼叫queueName_2,接收到的仍是key_1的訊息,因為我們在生產者那邊,將queueName2與key_1進行綁定
Fanout
扇出類型的exchange,會將消息分發給所有綁定了此exchange的隊列,此時RountingKey參數無效,因此不管給哪個Queue送消息、屬於同一個exchange底下的都會共享
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
| package com.example.springbootinaction.rabbitmq.fanout;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ProducerFanout {
public static void main(String[] args) throws IOException, TimeoutException {
String exchangeName = "xc_exchange_fanout_name";
String queueName_1 = "xc_queue_name_fanout_1";
String queueName_2 = "xc_queue_name_fanout_2";
String queueName_3 = "xc_queue_name_fanout_3";
String queueName_4 = "xc_queue_name_fanout_4";
String key_1 = "key_1";
String key_3 = "key_3";
String key_4 = "key_4";
ConnectionFactory connectionFactory = new ConnectionFactory();
//服務地址
connectionFactory.setHost("127.0.0.1");
//帳號
connectionFactory.setUsername("guest");
//密碼
connectionFactory.setPassword("guest");
//端口號
connectionFactory.setPort(5672);
//創建連接
Connection connection = connectionFactory.newConnection();
//創建channel
Channel channel = connection.createChannel();
/**
* 創建交換機
* 1. 交換機名稱
* 2. 交換機類型 direct, topic, fanout, headers
* 3. 指定交換機是否需要持久化,如果設置為True,那麼交換機的元數據要持久化
* 4. 指定交換機在沒有隊列綁定時,是否刪除?
* 5. Map<String,Object>類型,用來指定我們交換機其他的一些機構話參數,我們在這裡直接設置成Null
*/
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, null);
/**
* 生產一個隊列
* 1. 隊列名稱
* 2. 隊列是否需要持久化,但是要注意,這裡的持久化只是隊列名稱等這些元數據的持久化,不是隊列中消息的持久化
* 3. 表示隊列是不是私有,如果是私有的,只有唱見他的應用程序才能消費消息
* 4. 隊列在沒有消費者訂閱的情況下,是否自動刪除
* 5. 隊列的一些結構化訊息,比如說聲明死信隊列,磁盤隊列會用到
*/
channel.queueDeclare(queueName_1, true, false, false, null);
channel.queueDeclare(queueName_2, true, false, false, null);
channel.queueDeclare(queueName_3, true, false, false, null);
channel.queueDeclare(queueName_4, true, false, false, null);
/**
* 將交換機與隊列綁定
* 1. 隊列名稱
* 2. 交換機名稱
* 3. 路由鍵,在我們直連模式下,可以為我們的隊列名稱
*/
channel.queueBind(queueName_1, exchangeName, key_1);
channel.queueBind(queueName_2, exchangeName, key_1);
channel.queueBind(queueName_3, exchangeName, key_3);
channel.queueBind(queueName_4, exchangeName, key_4);
// 要發送的消息
String message = "Hello rabbitMQ";
/**
* 發送消息
* 1. 發送到哪個交換機
* 2. 隊列名稱
* 3. 其他參數訊息
* 4. 發送消息的消息體
*/
channel.basicPublish(exchangeName, key_1, null, "key_1 fanout message".getBytes());
//註解掉,其他的,來看看會變什麼樣
// channel.basicPublish(exchangeName, key_3, null, "key_3 message".getBytes());
// channel.basicPublish(exchangeName, key_4, null, "key_4 message".getBytes());
channel.close();
connection.close();
}
}
|
執行後會出現一個新的交換機
在Queue中多出了四個Queue
Warning
如果你的Queue有不該跳起來的卻跳了起來,很可能是你不小心綁定到了同一個exchange。如果你的xc_queue_name_fanout,一直沒跳起來,很可能是你的consumer持續在背景執行,一直消費你的Queue,記得關掉
接著啟動消費者,來看看他拿到什麼
Topic
主題類型交換機,此種交換機與Direct類似,也是需要透過routingKey路由鍵進行匹配分發,區別在Topic可以模糊匹配
- Topic中,將RoutingKey通過".“來分為多個部分
- “*":代表一部分
- “#":代表0個或多個部分,如果綁定的路由為#時,則接受所有消息,因為路由鍵所有都匹配
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
| public class ProducerTopic {
public static void main(String[] args) throws IOException, TimeoutException {
String exchangeName = "xc_exchange_topic _name";
String queueName_1 = "xc_queue_topic_name_1";
String queueName_2 = "xc_queue_topic_name_2";
String queueName_3 = "xc_queue_topic_name_3";
String queueName_4 = "xc_queue_topic_name_4";
//key的名稱綁定有變!!!
String key_1 = "key_1.key2.key3.*";
String key_2 = "key_1.#";
String key_3 = "*.key_2.*.key_4";
String key_4 = "#.key_3.key_4";
ConnectionFactory connectionFactory = new ConnectionFactory();
//服務地址
connectionFactory.setHost("127.0.0.1");
//帳號
connectionFactory.setUsername("guest");
//密碼
connectionFactory.setPassword("guest");
//端口號
connectionFactory.setPort(5672);
//創建連接
Connection connection = connectionFactory.newConnection();
//創建channel
Channel channel = connection.createChannel();
/**
* 創建交換機
* 1. 交換機名稱
* 2. 交換機類型 direct, topic, fanout, headers
* 3. 指定交換機是否需要持久化,如果設置為True,那麼交換機的元數據要持久化
* 4. 指定交換機在沒有隊列綁定時,是否刪除?
* 5. Map<String,Object>類型,用來指定我們交換機其他的一些機構話參數,我們在這裡直接設置成Null
*/
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, null);
/**
* 生產一個隊列
* 1. 隊列名稱
* 2. 隊列是否需要持久化,但是要注意,這裡的持久化只是隊列名稱等這些元數據的持久化,不是隊列中消息的持久化
* 3. 表示隊列是不是私有,如果是私有的,只有唱見他的應用程序才能消費消息
* 4. 隊列在沒有消費者訂閱的情況下,是否自動刪除
* 5. 隊列的一些結構化訊息,比如說聲明死信隊列,磁盤隊列會用到
*/
channel.queueDeclare(queueName_1, true, false, false, null);
channel.queueDeclare(queueName_2, true, false, false, null);
channel.queueDeclare(queueName_3, true, false, false, null);
channel.queueDeclare(queueName_4, true, false, false, null);
/**
* 將交換機與隊列綁定
* 1. 隊列名稱
* 2. 交換機名稱
* 3. 路由鍵,在我們直連模式下,可以為我們的隊列名稱
*/
channel.queueBind(queueName_1, exchangeName, key_1);
channel.queueBind(queueName_2, exchangeName, key_2);
channel.queueBind(queueName_3, exchangeName, key_3);
channel.queueBind(queueName_4, exchangeName, key_4);
// 要發送的消息
String message = "Hello rabbitMQ";
/**
* 發送消息
* 1. 發送到哪個交換機
* 2. 隊列名稱
* 3. 其他參數訊息
* 4. 發送消息的消息體
*/
channel.basicPublish(exchangeName, "key_1", null, "key 1 topic message".getBytes());
channel.close();
connection.close();
}
}
|
執行完後
幾乎跟Direct一樣,所以基本上用不到,有需要再來看
在SpringBoot中使用
加入依賴
1
2
3
4
5
6
7
8
9
| <dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.16.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
|
加入配置文件
注意,這邊記得要自己創建一個使用者,不要用guest,有可能會出錯== 不要問我為什麼會知道
1
2
3
4
5
| spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=hoxton
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=test1
|
生成關於exchange、queue的,再將兩者綁在一起Bean
2024/5/24 回頭來看,不知道為啥下面的都沒作用了,改用Configuration篇章的Code 就可以了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
| @Configuration
public class RabbitMQConfig {
private final static String EXCHANGE_NAME = "my_boot_fanout_exchange";
private final static String QUEUE_NAME = "my_boot_fanout_queue1";
/**
* 聲明交換機
* @return
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(EXCHANGE_NAME, true, false);
}
@Bean
public Queue queue() {
return new Queue(QUEUE_NAME, true, false, false);
}
@Bean
public Binding queueBinding(FanoutExchange fanoutExchange, Queue queue) {
return BindingBuilder.bind(queue).to(fanoutExchange);
}
}
|
Properties
1
2
3
| rabbitmq.queue.name :hoxton_queue_name
rabbitmq.exchange.name:hoxton_exchange_name
rabbitmq.routekey.name:hoxton_queue_name
|
Configuration
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
| package com.example.rabbitmq.configuration;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import jakarta.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
@Configuration
public class RabbitMessageQueueConfiguration {
private ConnectionFactory connectionFactory;
@Value("${rabbitmq.routekey.name}")
String routeKey;
@Value("${rabbitmq.exchange.name}")
String exchangeName;
@Value("rabbitmq.queue.name")
String queueName;
@Bean
public ConnectionFactory connectionFactory() {
return connectionFactory;
}
@PostConstruct
public void setupRabbitMQ() throws IOException, TimeoutException {
// 使用 connectionFactory bean 创建连接
connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setUsername("hoxton");
connectionFactory.setPassword("123456");
connectionFactory.setPort(5672);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName
, routeKey);
String message = "Rabbit Mq啟動成功";
channel.basicPublish(exchangeName, queueName, null, message.getBytes());
channel.close();
connection.close();
}
}
|
在配置
個
Controller
我們這邊用RestAPI的方式發送請求過去。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
| package com.example.rabbitmq.controller;
import com.example.rabbitmq.producer.RabbitMQProducer;
import com.example.rabbitmq.service.TestService;
import lombok.RequiredArgsConstructor;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("rabbit")
@RequiredArgsConstructor
public class TestController {
private final TestService testService;
@GetMapping("/")
public ResponseEntity<String> sendMessage() {
testService.sendMessageToRabbitMq("我是來自前端的訊息");
return ResponseEntity.ok().body("完成");
}
}
|
Service
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| package com.example.rabbitmq.service;
import com.example.rabbitmq.producer.RabbitMQProducer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@Service
@RequiredArgsConstructor
@Slf4j
public class TestService {
private final RabbitMQProducer rabbitMQProducer;
public void sendMessageToRabbitMq(String message){
log.info("發送消息到Rabbit Mq:{}",message);
rabbitMQProducer.sendMessage(message);
}
}
|
Producer
其中的rabbitTemplate是SpringBoot用來與rabbitMq交互的一個類
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
| package com.example.rabbitmq.producer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
@Slf4j
public class RabbitMQProducer {
private final RabbitTemplate rabbitTemplate;
@Value("${rabbitmq.routekey.name}")
String routeKey;
@Value("${rabbitmq.exchange.name}")
String exchangeName;
@Value("${rabbitmq.queue.name}")
String queueName;
public void sendMessage(String message) {
log.info("呼叫到MessageQueueProducer");
rabbitTemplate.convertAndSend(exchangeName, routeKey, message);
}
}
|
接著再寫一個
Consumer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| package com.example.rabbitmq.comsumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class RabbitConsumer {
/**
* 監聽隊列:當隊列中有消息,則監聽器工作,處理接收到的訊息
* @param message
*/
@RabbitListener(queues = "${rabbitmq.queue.name}")
public void process(Message message) {
byte[] body = message.getBody();
log.info("呼叫到Consumer");
log.info("接收到的消息" + new String(body));
}
}
|
這個方法會去監聽指定Queue的Messege,並且做相應的處理
目前這邊的方式就是把收到的訊息Print出來,並把那個Messege清空,請特別注意,這邊是由rabbit自動幫你ack(acknowledge)的,如果不想要的話也可以去設定把這部分關掉,這邊就不展示了。
範例圖
參考資料
1小时学会RabbitMQ!快速掌握RabbitMQ的使用与原理实战
编程不良人】MQ消息中间件之RabbitMQ以及整合SpringBoot2.x实战教程,已完结!