RabbitMQ基础知识详解
RabbitMQ基础知识详解
初步认识
消息队列介绍
消息队列(Message Queue,简称MQ)是一种应用程序对应用程序的通信方法。应用程序通过写入和检索出入队列的针对应用程序的数据(消息)来通信,而无需专用连接来链接它们。
消息队列的主要组成部分:
- 消息发送者:投递消息的一方,相当于远程调用中的调用者
- 消息接收者:接收和处理消息的一方,相当于远程调用中的服务提供者
- 消息队列:管理、暂存、转发消息的中间服务器
同步与异步通信
同步通信
同步调用就像你和朋友视频聊天,双方同时在线交流。在微服务架构中,使用OpenFeign等技术进行的远程调用属于同步调用。
特点:
- 时效性强,可以立即得到响应
- 调用方等待服务方响应期间处于阻塞状态
- 系统耦合度高
问题:
- 处理高并发时性能受限
- 调用链过长时容易级联失败
- 扩展性差
异步通信
异步调用就像你发送一条短信,对方可以在任何时间查看并回复。在微服务架构中,使用消息队列进行的通信属于异步调用。
优点:
- 解耦合:发送方和接收方不直接依赖,提高系统灵活性
- 减少耗时:发送方无需等待接收方处理完成
- 故障隔离:下游服务故障不会影响上游服务
- 缓存消息:能够应对流量峰值,起到削峰填谷的作用
缺点:
- 时效性差,无法立即得到处理结果
- 无法保证后续业务一定成功
- 业务安全依赖于消息代理的可靠性
技术选型
目前市场上主流的消息队列技术有:
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
开发语言 | Java | Erlang | Java | Scala |
单机吞吐量 | 万级 | 万级 | 十万级 | 十万级 |
时效性 | ms级 | μs级 | ms级 | ms级以内 |
可用性 | 高(主从) | 高(主从) | 非常高(分布式) | 非常高(分布式) |
功能特性 | 成熟的产品,在很多公司得到应用 | 轻量级,易于部署,管理界面友好 | 功能丰富,扩展性好 | 只支持主要的MQ功能,大数据领域应用广泛 |
适用场景 | 传统项目 | 中小型企业 | 金融互联网 | 大数据领域 |
RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据。RabbitMQ是使用Erlang语言来编写的,并且基于AMQP(Advanced Message Queuing Protocol,高级消息队列协议)协议。
结构:
安装部署
Docker安装
RabbitMQ可以通过Docker快速部署,以下是使用Docker安装RabbitMQ的命令:
docker run \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=password \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
--network host \
-d \
rabbitmq:3.8-management
参数说明:
-e RABBITMQ_DEFAULT_USER
:设置登录用户名-e RABBITMQ_DEFAULT_PASS
:设置登录密码-v
:数据卷挂载--name
和--hostname
:容器名和主机名-p
:端口映射- 15672:控制台Web界面端口
- 5672:AMQP协议端口,用于收发消息
--network
:网络配置(可选)-d
:后台运行
访问控制台
安装完成后,可以通过浏览器访问RabbitMQ的管理界面:
http://服务器IP:15672
使用设置的用户名和密码登录。
控制台操作
RabbitMQ控制台主要包含以下几个部分:
- Overview:系统概览,显示节点、连接等信息
- Connections:当前的连接
- Channels:通道信息
- Exchanges:交换机管理
- Queues:队列管理
- Admin:用户和权限管理
Queue(队列)
队列是RabbitMQ的核心组件,用于存储消息直到被消费者处理。
在队列页面中:
- 第一部分显示所有现有队列,可以点击队列名查看详细信息并进行操作
- 第二部分是新建队列的表单,其中队列名是必填项
进入队列详情页面后,可以:
- 在Bindings部分查看与交换机的绑定关系或创建新的绑定
- 删除队列
- 查看队列中的消息
- 发布消息到队列
- 清空队列
Exchange(交换机)
交换机负责接收消息并将其路由到一个或多个队列。交换机本身不存储消息,如果没有队列与交换机绑定,消息将会丢失。
交换机页面与队列页面类似,可以:
- 查看所有交换机
- 创建新的交换机
进入交换机详情页面后,可以:
- 在Bindings部分绑定队列,输入队列名称并指定路由键(routing key)
- 在Publish message部分发送测试消息
- 删除交换机
数据隔离
RabbitMQ通过虚拟主机(Virtual Host)实现数据隔离。不同的虚拟主机之间的队列和交换机是完全隔离的,互不可见。
在Admin页面下可以:
-
管理用户
- 查看所有用户
- 添加新用户(设置用户名、密码和权限)
-
管理虚拟主机
- 创建新的虚拟主机
- 为用户分配虚拟主机的访问权限
Java客户端操作
发送消息(生产者)
使用Java客户端发送消息到RabbitMQ的基本步骤:
- 添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 配置RabbitMQ连接信息
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
- 发送消息示例
@Service
public class MessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String exchange, String routingKey, Object message) {
rabbitTemplate.convertAndSend(exchange, routingKey, message);
}
}
接收消息(消费者)
接收消息的基本方式:
- 使用
@RabbitListener
注解
@Component
public class MessageConsumer {
@RabbitListener(queues = "myQueue")
public void receiveMessage(String message) {
System.out.println("Received message: " + message);
// 处理消息的业务逻辑
}
}
- 手动确认消息(一般用这个)
@Component
public class ManualAckConsumer {
@RabbitListener(queues = "myQueue", ackMode = "MANUAL")
public void receiveMessage(String message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
try {
System.out.println("Received message: " + message);
// 处理消息的业务逻辑
// 手动确认消息已处理
channel.basicAck(tag, false);
} catch (Exception e) {
// 处理失败,拒绝消息
channel.basicNack(tag, false, true);
}
}
}
工作队列模式(work模式)
工作队列(Work Queue)模式用于在多个消费者之间分配任务,避免立即执行资源密集型任务。
- 多个消费者绑定到同一个队列,可以加快消息处理速度
- 同一条消息只会被一个消费者处理,避免重复消费
- 通过prefetch配置控制消费者预取的消息数量,实现能者多劳
默认情况下,RabbitMQ会将消息依次轮询(Round-Robin)投递给绑定在队列上的每一个消费者。但这种方式并没有考虑到消费者的处理能力,可能会导致部分消费者消息堆积,而其他消费者却处于空闲状态。
为了优化这个问题,我们可以通过设置prefetch值来限制每个消费者能够预取的消息数量。例如,将prefetch设置为1,确保消费者在处理完当前消息之前不会收到新的消息:
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
这样的配置可以实现"能者多劳":处理速度快的消费者可以处理更多的消息,而处理速度慢的消费者则会处理较少的消息,从而提高整体的处理效率。
实现工作队列模式:
- 生产者代码
@Component
public class TaskProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendTask(String message) {
rabbitTemplate.convertAndSend("workQueue", message);
}
}
- 消费者代码(可以启动多个实例)
@Component
public class Worker {
@RabbitListener(queues = "workQueue")
public void processTask(String message) throws InterruptedException {
System.out.println("Processing task: " + message);
// 模拟耗时操作
Thread.sleep(1000);
}
}
交换机类型(重点)
RabbitMQ支持多种类型的交换机,每种类型有不同的路由策略:
广播交换机(Fanout Exchange)
广播交换机会将消息发送到所有绑定的队列,忽略路由键。这是最简单的交换机类型,适合广播消息。
实现示例:
配置交换机和队列(消费者)
// 配置交换机和队列(消费者)
@Configuration
public class FanoutConfig {
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
@Bean
public Queue queueA() {
return new Queue("fanout.A");
}
@Bean
public Queue queueB() {
return new Queue("fanout.B");
}
@Bean
public Binding bindingA(Queue queueA, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queueA).to(fanoutExchange);
}
@Bean
public Binding bindingB(Queue queueB, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queueB).to(fanoutExchange);
}
}
生产者发送消息
// 发送消息
@Service
public class FanoutSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String message) {
rabbitTemplate.convertAndSend("fanoutExchange", "", message);
}
}
定向交换机(Direct Exchange)
定向交换机根据消息的路由键(routing key)将消息发送到特定的队列。队列通过特定的路由键绑定到交换机。
实现示例:
// 配置交换机和队列(消费者服务)
@Configuration
public class DirectConfig {
@Bean
public DirectExchange directExchange() {
return new DirectExchange("directExchange");
}
@Bean
public Queue errorQueue() {
return new Queue("direct.error");
}
@Bean
public Queue infoQueue() {
return new Queue("direct.info");
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange directExchange) {
return BindingBuilder.bind(errorQueue).to(directExchange).with("error");
}
@Bean
public Binding infoBinding(Queue infoQueue, DirectExchange directExchange) {
return BindingBuilder.bind(infoQueue).to(directExchange).with("info");
}
}
// 发送消息(另一个生产者服务)
@Service
public class DirectSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendError(String message) {
rabbitTemplate.convertAndSend("directExchange", "error", message);
}
public void sendInfo(String message) {
rabbitTemplate.convertAndSend("directExchange", "info", message);
}
}
话题交换机(Topic Exchange)
话题交换机根据通配符匹配路由键,将消息发送到匹配的队列。这是最灵活的交换机类型。
路由键通配符规则:
*
:匹配一个单词#
:匹配零个或多个单词
实现示例:
// 配置交换机和队列
@Configuration
public class TopicConfig {
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("topicExchange");
}
@Bean
public Queue allQueue() {
return new Queue("topic.all");
}
@Bean
public Queue userQueue() {
return new Queue("topic.user");
}
@Bean
public Binding allBinding(Queue allQueue, TopicExchange topicExchange) {
return BindingBuilder.bind(allQueue).to(topicExchange).with("#");
}
@Bean
public Binding userBinding(Queue userQueue, TopicExchange topicExchange) {
return BindingBuilder.bind(userQueue).to(topicExchange).with("user.#");
}
}
// 发送消息
@Service
public class TopicSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendUserMessage(String message) {
rabbitTemplate.convertAndSend("topicExchange", "user.save", message);
}
public void sendOrderMessage(String message) {
rabbitTemplate.convertAndSend("topicExchange", "order.create", message);
}
}
总结
问:什么是RabbitMQ,它的核心组件有哪些?
关键知识点
RabbitMQ是一个开源的消息代理和队列服务器,基于AMQP协议实现。它的核心组件包括:
- Producer(生产者):发送消息的应用程序
- Consumer(消费者):接收和处理消息的应用程序
- Queue(队列):存储消息的缓冲区,直到消费者处理它们
- Exchange(交换机):接收生产者发送的消息,并根据路由规则将它们路由到队列
- Binding(绑定):定义交换机和队列之间的关系
- Virtual Host(虚拟主机):提供逻辑隔离,一个虚拟主机包含一组交换机、队列和绑定
深入解析
RabbitMQ的工作流程是:生产者将消息发送到交换机,交换机根据路由规则将消息路由到一个或多个队列,消费者从队列中获取消息并处理。
RabbitMQ支持多种交换机类型,每种类型有不同的路由策略:
- Fanout Exchange:将消息广播到所有绑定的队列
- Direct Exchange:根据精确的路由键匹配将消息发送到队列
- Topic Exchange:根据通配符匹配路由键将消息发送到队列
- Headers Exchange:根据消息头信息而非路由键进行路由
在高可用性方面,RabbitMQ支持集群部署,可以实现负载均衡和故障转移。它还提供了消息持久化机制,确保消息在服务器重启后不会丢失。
对于消息的可靠性,RabbitMQ提供了消息确认机制(ACK),确保消息被正确处理。同时,它还支持消息的TTL(生存时间)和死信队列,用于处理过期或无法处理的消息。
问:同步调用和异步调用(消息队列)的区别和各自的优缺点?
关键知识点
同步调用和异步调用(消息队列)的主要区别:
-
通信方式:
- 同步调用:调用方直接调用服务方,等待响应
- 异步调用:调用方发送消息到消息队列,不等待响应
-
耦合度:
- 同步调用:系统间紧密耦合
- 异步调用:系统间松散耦合
-
时效性:
- 同步调用:实时性强,立即得到响应
- 异步调用:实时性弱,无法立即得到响应
深入解析
同步调用的优缺点:
- 优点:实时性强,流程简单清晰,易于理解和实现
- 缺点:系统耦合度高,调用方需等待服务方响应,性能受限,容易级联失败
异步调用(消息队列)的优缺点:
-
优点:
- 系统解耦:发送方和接收方不直接依赖
- 削峰填谷:能够应对流量峰值
- 故障隔离:下游服务故障不会影响上游服务
- 异步处理:提高系统吞吐量
-
缺点:
- 实时性差:无法立即得到处理结果
- 系统复杂度增加:需要考虑消息丢失、重复消费等问题
- 依赖消息中间件:增加了系统的复杂性和维护成本
在实际应用中,通常会根据业务场景选择合适的通信方式。对于需要立即响应的核心业务流程,可以使用同步调用;对于可以异步处理的非核心业务,可以使用消息队列实现异步调用,提高系统的整体性能和可用性。