本文最后更新于:9 天前

破冰
思维碰撞
基础知识介绍
2025 年 3 月 17 日
RabbitMQ详解,用心看完这一篇就够了【重点】-CSDN博客
2024 年 7 月 8 日
推荐阅读:
RabbitMQ 消息队列_rabbitmq 消息队列-CSDN 博客
消息队列——RabbitMq_rabbitmq 发送消息-CSDN 博客
吐血总结——消息队列之 RocketMQ 知识梳理_12856610 的技术博客_51CTO 博客
RocketMQ 分布式消息队列(最详细)_rocketmq 如何动态增加 topic 的队列数-CSDN 博客
RocketMQ 的一万字全面总结,带你快速入门消息队列_mq 消息队列-CSDN 博客
小试牛刀(实操)
RabbitMQ
下载安装
🍝 RabbitMQ 官网:Documentation: Table of Contents — RabbitMQ
Windows10 上 RabbitMQ 安装和启动详细步骤 - 行业资讯 - 电子产品设计开发与电子技术学习交流! (52dianzi.com)
🍜 RabbitMQ 下载(如下图所示):
下载地址:Installing on Windows | RabbitMQ
因为 RabbitMQ 服务端是使用并发式语言 Erlang 编写的,安装 Rabbit MQ 的前提是安装 Erlang

上个版本是 3.12.7 现在都 4.1.0 了。

执行安装失败,本机 Erlang 环境过于古老,得下载个可兼容版本。
卸载旧版本。
🍖 Erlang 下载(如下图所示):

Erlang 版本都 27 了。
Eelang 安装完毕,RabbitMQ 同样顺利安装完成。

在 sbin 目录下执行以下命令,安装 rabbitmq 监控面板:(2023/10/25 晚)
1
| rabbitmq-plugins enable rabbitmq_management
|

在 sbin 目录下执行以下命令,启动 rabbitmq:
- 可以在任务管理器处检查 RabbitMQ 服务是否正常启动:

登入监控面板
- 访问
http://localhost:15672
,输入初始账号密码,登入监控面板:
默认账号:guest
默认密码:guest

SpringBoot 集成 RabbitMQ
简单的 demo 演示
导入依赖
🥣 RabbitMQ 的相关依赖坐标可以在 mvn 中找到,依赖坐标可以在官方文档中查到:


1 2 3 4 5 6
| <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.19.0</version> </dependency>
|
简单的 Producer / Receiver
🍟 定义简单的生产者和消费者,完成 demo 代码的编写。当然,这部分代码也可以在官方文档中找到
🍛
生产消息 demo
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
|
public class MessageProducer {
private final static String QUEUE_NAME = "hello3";
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [x] Sent '" + message + "'"); } } }
|
消费消息 demo
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
|
public class MessageReceiver {
private final static String QUEUE_NAME = "hello3";
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } }
|
效果展示
🍿 启动 MessageProducer 分别生产 hello、hello1、hello2 三条消息
🌭 再启动 MessageReceiver 分别消费 hello、hello2 两条消息

- 在 RabbitMQ 监控面板中的显示效果如下:(2023/10/26 午)

Work Queue 多消费者
🍖 推荐阅读:RabbitMQ tutorial - Work Queues — RabbitMQ
- 生产者
MultiMesProducer
:(2024/01/15 午)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public class MultiMesProducer {
private static final String TASK_QUEUE_NAME = "multi_queue";
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost");
try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String message = scanner.next(); channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); } } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| public class MultiMesReceiver {
private static final String TASK_QUEUE_NAME = "multi_queue";
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost");
final Connection connection = factory.newConnection();
for (int i = 0; i < 2; i++) { final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); channel.basicQos(1);
int finalI = i; DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8");
try { System.out.println(" [x] Received '" + "编号:" + finalI + ":" + message + "'");
Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } } }
|


Publish / Subcribe 发布订阅
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public class FanoutProducer { private static final String EXCHANGE_NAME = "fanout-exchange";
public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost");
try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String message = scanner.next(); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); } } catch (TimeoutException | IOException e) { throw new RuntimeException(e); } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| public class FanoutConsumer { private static final String EXCHANGE_NAME = "fanout-exchange";
public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost");
Connection connection = factory.newConnection(); Channel channel1 = connection.createChannel(); Channel channel2 = connection.createChannel(); channel1.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName1 = "nice_queue"; channel1.queueDeclare(queueName1, true, false, false, null); channel2.queueBind(queueName1, EXCHANGE_NAME, "");
String queueName2 = "fuck_queue"; channel1.queueDeclare(queueName2, true, false, false, null); channel2.queueBind(queueName2, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback1 = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println(" nice " + message + "'"); };
DeliverCallback deliverCallback2 = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println(" fuck " + message + "'"); };
channel1.basicConsume(queueName1, true, deliverCallback1, consumer -> { }); channel2.basicConsume(queueName2, true, deliverCallback2, consumer -> { }); } }
|
- 生产者生产消息,所有消费者都能够收到:(2024/01/15 午)


项目实践
2025 年 4 月 27 日
时隔一年多,在毕设项目中的订单提交流程中使用到了 RabbitMQ,从零开始再学习一遍吧。
本地启动 RabbitMQ 失败了,也许是 Erlang 卸载或者版本不兼容了。
Windows10 上 RabbitMQ 安装和启动详细步骤 - 行业资讯 - 电子产品设计开发与电子技术学习交流! (52dianzi.com)
🍜 RabbitMQ 下载(如下图所示):

🍖 Erlang 下载(如下图所示):

亮点集锦