RabbitMQ基础知识详解

初步认识

消息队列介绍

消息队列(Message Queue,简称MQ)是一种应用程序对应用程序的通信方法。应用程序通过写入和检索出入队列的针对应用程序的数据(消息)来通信,而无需专用连接来链接它们。
消息队列的主要组成部分:

  • 消息发送者:投递消息的一方,相当于远程调用中的调用者
  • 消息接收者:接收和处理消息的一方,相当于远程调用中的服务提供者
  • 消息队列:管理、暂存、转发消息的中间服务器

同步与异步通信

同步通信

同步调用就像你和朋友视频聊天,双方同时在线交流。在微服务架构中,使用OpenFeign等技术进行的远程调用属于同步调用。

特点

  • 时效性强,可以立即得到响应
  • 调用方等待服务方响应期间处于阻塞状态
  • 系统耦合度高

问题

  • 处理高并发时性能受限
  • 调用链过长时容易级联失败
  • 扩展性差

异步通信

异步调用就像你发送一条短信,对方可以在任何时间查看并回复。在微服务架构中,使用消息队列进行的通信属于异步调用。

优点

  • 解耦合:发送方和接收方不直接依赖,提高系统灵活性
  • 减少耗时:发送方无需等待接收方处理完成
  • 故障隔离:下游服务故障不会影响上游服务
  • 缓存消息:能够应对流量峰值,起到削峰填谷的作用

缺点

  • 时效性差,无法立即得到处理结果
  • 无法保证后续业务一定成功
  • 业务安全依赖于消息代理的可靠性

技术选型

目前市场上主流的消息队列技术有:

特性ActiveMQRabbitMQRocketMQKafka
开发语言JavaErlangJavaScala
单机吞吐量万级万级十万级十万级
时效性ms级μs级ms级ms级以内
可用性高(主从)高(主从)非常高(分布式)非常高(分布式)
功能特性成熟的产品,在很多公司得到应用轻量级,易于部署,管理界面友好功能丰富,扩展性好只支持主要的MQ功能,大数据领域应用广泛
适用场景传统项目中小型企业金融互联网大数据领域

RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据。RabbitMQ是使用Erlang语言来编写的,并且基于AMQP(Advanced Message Queuing Protocol,高级消息队列协议)协议。
结构:
8F9CCAA0AB2F1B50FAB05F61A06DF34F.png

安装部署

Docker安装

RabbitMQ可以通过Docker快速部署,以下是使用Docker安装RabbitMQ的命令:

docker run \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=password \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
--network host \
-d \
rabbitmq:3.8-management

参数说明:

  • -e RABBITMQ_DEFAULT_USER:设置登录用户名
  • -e RABBITMQ_DEFAULT_PASS:设置登录密码
  • -v:数据卷挂载
  • --name--hostname:容器名和主机名
  • -p:端口映射
    • 15672:控制台Web界面端口
    • 5672:AMQP协议端口,用于收发消息
  • --network:网络配置(可选)
  • -d:后台运行

访问控制台

安装完成后,可以通过浏览器访问RabbitMQ的管理界面:

http://服务器IP:15672

使用设置的用户名和密码登录。

控制台操作

RabbitMQ控制台主要包含以下几个部分:

  • Overview:系统概览,显示节点、连接等信息
  • Connections:当前的连接
  • Channels:通道信息
  • Exchanges:交换机管理
  • Queues:队列管理
  • Admin:用户和权限管理
    QQ20250607-005613.png

Queue(队列)

队列是RabbitMQ的核心组件,用于存储消息直到被消费者处理。

在队列页面中:

  1. 第一部分显示所有现有队列,可以点击队列名查看详细信息并进行操作
  2. 第二部分是新建队列的表单,其中队列名是必填项
    3CEAE71EDBC60441789114E70E3B031A.png

进入队列详情页面后,可以:

  • Bindings部分查看与交换机的绑定关系或创建新的绑定
  • 删除队列
  • 查看队列中的消息
  • 发布消息到队列
  • 清空队列
    QQ20250607-005817.png

Exchange(交换机)

交换机负责接收消息并将其路由到一个或多个队列。交换机本身不存储消息,如果没有队列与交换机绑定,消息将会丢失。

交换机页面与队列页面类似,可以:

  1. 查看所有交换机
  2. 创建新的交换机

进入交换机详情页面后,可以:

  • Bindings部分绑定队列,输入队列名称并指定路由键(routing key)
  • Publish message部分发送测试消息
  • 删除交换机

数据隔离

RabbitMQ通过虚拟主机(Virtual Host)实现数据隔离。不同的虚拟主机之间的队列和交换机是完全隔离的,互不可见。

在Admin页面下可以:

  1. 管理用户

    • 查看所有用户
    • 添加新用户(设置用户名、密码和权限)
  2. 管理虚拟主机

    • 创建新的虚拟主机
    • 为用户分配虚拟主机的访问权限

Java客户端操作

发送消息(生产者)

使用Java客户端发送消息到RabbitMQ的基本步骤:

  1. 添加依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. 配置RabbitMQ连接信息
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
  1. 发送消息示例
@Service
public class MessageProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void sendMessage(String exchange, String routingKey, Object message) {
        rabbitTemplate.convertAndSend(exchange, routingKey, message);
    }
}

接收消息(消费者)

接收消息的基本方式:

  1. 使用@RabbitListener注解
@Component
public class MessageConsumer {
    
    @RabbitListener(queues = "myQueue")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
        // 处理消息的业务逻辑
    }
}
  1. 手动确认消息(一般用这个)
@Component
public class ManualAckConsumer {
    
    @RabbitListener(queues = "myQueue", ackMode = "MANUAL")
    public void receiveMessage(String message, Channel channel, 
                              @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        try {
            System.out.println("Received message: " + message);
            // 处理消息的业务逻辑
            
            // 手动确认消息已处理
            channel.basicAck(tag, false);
        } catch (Exception e) {
            // 处理失败,拒绝消息
            channel.basicNack(tag, false, true);
        }
    }
}

工作队列模式(work模式)

工作队列(Work Queue)模式用于在多个消费者之间分配任务,避免立即执行资源密集型任务。

  • 多个消费者绑定到同一个队列,可以加快消息处理速度
  • 同一条消息只会被一个消费者处理,避免重复消费
  • 通过prefetch配置控制消费者预取的消息数量,实现能者多劳

默认情况下,RabbitMQ会将消息依次轮询(Round-Robin)投递给绑定在队列上的每一个消费者。但这种方式并没有考虑到消费者的处理能力,可能会导致部分消费者消息堆积,而其他消费者却处于空闲状态。

为了优化这个问题,我们可以通过设置prefetch值来限制每个消费者能够预取的消息数量。例如,将prefetch设置为1,确保消费者在处理完当前消息之前不会收到新的消息:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1  # 每次只能获取一条消息,处理完成才能获取下一个消息

这样的配置可以实现"能者多劳":处理速度快的消费者可以处理更多的消息,而处理速度慢的消费者则会处理较少的消息,从而提高整体的处理效率。
QQ20250607-010310.png

实现工作队列模式:

  1. 生产者代码
@Component
public class TaskProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void sendTask(String message) {
        rabbitTemplate.convertAndSend("workQueue", message);
    }
}
  1. 消费者代码(可以启动多个实例)
@Component
public class Worker {
    
    @RabbitListener(queues = "workQueue")
    public void processTask(String message) throws InterruptedException {
        System.out.println("Processing task: " + message);
        // 模拟耗时操作
        Thread.sleep(1000);
    }
}

交换机类型(重点)

RabbitMQ支持多种类型的交换机,每种类型有不同的路由策略:

广播交换机(Fanout Exchange)

广播交换机会将消息发送到所有绑定的队列,忽略路由键。这是最简单的交换机类型,适合广播消息。

QQ20250607-012934.png

实现示例:
配置交换机和队列(消费者)

// 配置交换机和队列(消费者)
@Configuration
public class FanoutConfig {
    
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }
    
    @Bean
    public Queue queueA() {
        return new Queue("fanout.A");
    }
    
    @Bean
    public Queue queueB() {
        return new Queue("fanout.B");
    }
    
    @Bean
    public Binding bindingA(Queue queueA, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queueA).to(fanoutExchange);
    }
    
    @Bean
    public Binding bindingB(Queue queueB, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queueB).to(fanoutExchange);
    }
}

生产者发送消息

// 发送消息
@Service
public class FanoutSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void send(String message) {
        rabbitTemplate.convertAndSend("fanoutExchange", "", message);
    }
}

定向交换机(Direct Exchange)

定向交换机根据消息的路由键(routing key)将消息发送到特定的队列。队列通过特定的路由键绑定到交换机。

QQ20250607-013214.png

实现示例:

// 配置交换机和队列(消费者服务)
@Configuration
public class DirectConfig {
    
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("directExchange");
    }
    
    @Bean
    public Queue errorQueue() {
        return new Queue("direct.error");
    }
    
    @Bean
    public Queue infoQueue() {
        return new Queue("direct.info");
    }
    
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(errorQueue).to(directExchange).with("error");
    }
    
    @Bean
    public Binding infoBinding(Queue infoQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(infoQueue).to(directExchange).with("info");
    }
}

// 发送消息(另一个生产者服务)
@Service
public class DirectSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void sendError(String message) {
        rabbitTemplate.convertAndSend("directExchange", "error", message);
    }
    
    public void sendInfo(String message) {
        rabbitTemplate.convertAndSend("directExchange", "info", message);
    }
}

话题交换机(Topic Exchange)

话题交换机根据通配符匹配路由键,将消息发送到匹配的队列。这是最灵活的交换机类型。
QQ20250607-013320.png

路由键通配符规则:

  • *:匹配一个单词
  • #:匹配零个或多个单词

实现示例:

// 配置交换机和队列
@Configuration
public class TopicConfig {
    
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("topicExchange");
    }
    
    @Bean
    public Queue allQueue() {
        return new Queue("topic.all");
    }
    
    @Bean
    public Queue userQueue() {
        return new Queue("topic.user");
    }
    
    @Bean
    public Binding allBinding(Queue allQueue, TopicExchange topicExchange) {
        return BindingBuilder.bind(allQueue).to(topicExchange).with("#");
    }
    
    @Bean
    public Binding userBinding(Queue userQueue, TopicExchange topicExchange) {
        return BindingBuilder.bind(userQueue).to(topicExchange).with("user.#");
    }
}

// 发送消息
@Service
public class TopicSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void sendUserMessage(String message) {
        rabbitTemplate.convertAndSend("topicExchange", "user.save", message);
    }
    
    public void sendOrderMessage(String message) {
        rabbitTemplate.convertAndSend("topicExchange", "order.create", message);
    }
}

总结

问:什么是RabbitMQ,它的核心组件有哪些?

关键知识点

RabbitMQ是一个开源的消息代理和队列服务器,基于AMQP协议实现。它的核心组件包括:

  1. Producer(生产者):发送消息的应用程序
  2. Consumer(消费者):接收和处理消息的应用程序
  3. Queue(队列):存储消息的缓冲区,直到消费者处理它们
  4. Exchange(交换机):接收生产者发送的消息,并根据路由规则将它们路由到队列
  5. Binding(绑定):定义交换机和队列之间的关系
  6. Virtual Host(虚拟主机):提供逻辑隔离,一个虚拟主机包含一组交换机、队列和绑定

深入解析

RabbitMQ的工作流程是:生产者将消息发送到交换机,交换机根据路由规则将消息路由到一个或多个队列,消费者从队列中获取消息并处理。

RabbitMQ支持多种交换机类型,每种类型有不同的路由策略:

  • Fanout Exchange:将消息广播到所有绑定的队列
  • Direct Exchange:根据精确的路由键匹配将消息发送到队列
  • Topic Exchange:根据通配符匹配路由键将消息发送到队列
  • Headers Exchange:根据消息头信息而非路由键进行路由

在高可用性方面,RabbitMQ支持集群部署,可以实现负载均衡和故障转移。它还提供了消息持久化机制,确保消息在服务器重启后不会丢失。

对于消息的可靠性,RabbitMQ提供了消息确认机制(ACK),确保消息被正确处理。同时,它还支持消息的TTL(生存时间)和死信队列,用于处理过期或无法处理的消息。

问:同步调用和异步调用(消息队列)的区别和各自的优缺点?

关键知识点

同步调用和异步调用(消息队列)的主要区别:

  1. 通信方式

    • 同步调用:调用方直接调用服务方,等待响应
    • 异步调用:调用方发送消息到消息队列,不等待响应
  2. 耦合度

    • 同步调用:系统间紧密耦合
    • 异步调用:系统间松散耦合
  3. 时效性

    • 同步调用:实时性强,立即得到响应
    • 异步调用:实时性弱,无法立即得到响应

深入解析

同步调用的优缺点:

  • 优点:实时性强,流程简单清晰,易于理解和实现
  • 缺点:系统耦合度高,调用方需等待服务方响应,性能受限,容易级联失败

异步调用(消息队列)的优缺点:

  • 优点:

    1. 系统解耦:发送方和接收方不直接依赖
    2. 削峰填谷:能够应对流量峰值
    3. 故障隔离:下游服务故障不会影响上游服务
    4. 异步处理:提高系统吞吐量
  • 缺点:

    1. 实时性差:无法立即得到处理结果
    2. 系统复杂度增加:需要考虑消息丢失、重复消费等问题
    3. 依赖消息中间件:增加了系统的复杂性和维护成本

在实际应用中,通常会根据业务场景选择合适的通信方式。对于需要立即响应的核心业务流程,可以使用同步调用;对于可以异步处理的非核心业务,可以使用消息队列实现异步调用,提高系统的整体性能和可用性。