消息队列

消息队列的使用场景

  1. 解耦:在不同系统之间,把原来通过网络传输换为MQ进行消息的异步通信。只要该操作不需要同步,就可以换成MQ,这样项目间不存在耦合。

  2. 异步:一个操作可能需要好几个步骤,比如说用户创建一个订单,之后还需要更新库存等步骤,这样可能会需要大量时间,这样客户是无法接受的,而后面的几个操作其实不需要同步进行,我们就可以将其放入消息队列中去异步操作,加快系统的访问速度,提供更好的客户体验。

  3. 削峰:一个系统的访问流量有高峰期也有低峰期,在流量过大的时候,我们可以将用户的大量消息放入消息队列中,然后按我们系统的最大消费能力去消费这些消息,这样可以保证系统的稳定性

消息丢失怎么解决

首先我们在一开始创建消息队列的时候,就要将durable设置为true,即开启持久化,确保队列在重启后仍然存在

并且我们在代码逻辑中必须确保拿到消息的唯一标识后手动向MQ发送确认。

如果没有成功,我们需要在抛出的异常中将消息重新入队。

 Producer
   └─ 发布确认 ✅
       └─ 持久化消息 deliveryMode = 2 ✅
           └─ RabbitMQ durable Queue ✅
               └─ 手动ACK ✅
                   └─ 失败重试/死信队列 ✅
                       └─ 幂等检查 ✅
                           └─ 成功处理 ✅
 ​

企业中,为了避免消息丢失,RabbitMQ 不仅仅需要 durable 和 ack, 更关键的是:生产端确认 + 消息持久化 + 消费幂等 + 失败补偿 + 全链路监控 + 死信机制,一个都不能少。

RabbitMQ

简单队列模式是一个生产者一个队列一个消费者,他比较适合任务量不大的情况,比如说下单后通知消费者下单成功

工作队列是一个生产者一个队列多个消费者,我认为工作队列是简单队列的升级版,主要应对于任务量较大的情况,我们采用工作队列的公平分发模式可以很好解决这个问题

fanout模式,指发布订阅模式,他采用交换机将信息传递给多个队列,比较适合一条消息需要多个系统处理的情况,比如说用户注册成功通知多个系统

路由模式其实就是在fanout模式的基础上,提供了更强的定向分发能力,适合分发给特定的系统,更加的个性化,比如说按照不同的业务类型推送消息

而主体模式这是增加了模糊匹配功能,提供了一个动态分发的功能

3种交换机

fanout:

他会将所有发送到改交换机的消息发送到与他绑定的所有queue中

direct:

他会将消息发送到BindingKey(一个队列可以有多个Binding key)和RoutingKey完全匹配的Queue中

topic:

这其实就是在direct的基础上支持了模糊匹配,*只能代表一个单词,#代表0个或多个单词

5种工作模式

简单队列模式:

只包含一个生产者和一个消费者

工作队列模式:

多个消费者绑定到同一个队列上

轮询分发和公平分发

轮询分发:一个消费者一条,按均分配

公平分发:按照处理消息的能力来分配

发布订阅模式

就是fanout交换器

路由模式

direct交换器

主题模式

topic交换器

实操

前置工作

依赖导入

 <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>

配置文件

 server:
   port: 8080
 ​
 spring:
   rabbitmq:
     host: 47.108.159.244
     port: 5672
     username: admin
     password: admin

简单队列模式

配置类

这样启动时就会自动识别,然后创建一个名为"simple.queue"的简单队列

 @Configuration
 public class RabbitConfig {
 ​
     public static final String SIMPLE_QUEUE = "simple.queue";
 ​
     @Bean
     public Queue simpleQueue() {
         // 持久化队列
         return new Queue(SIMPLE_QUEUE, true);
     }
 }

生产者

 @Service//一般都用@Service不用@Component
 public class Producer {
 ​
     @Autowired
     private RabbitTemplate rabbitTemplate;
 ​
     public void sendMessage(String msg) {
         //队列名称和信息就行
         rabbitTemplate.convertAndSend(RabbitConfig.SIMPLE_QUEUE, msg);
         System.out.println("发送消息: " + msg);
     }
 }

消费者

 @Service
 public class Consumer {
 ​
     //这个注解会持续监听名为"simple.queue"的消息队列
     @RabbitListener(queues = "simple.queue")
     public void receiveMessage(String message) {
         System.out.println("收到消息: " + message);
         // 这里写你的业务逻辑
     }
 }

Controller

 @RestController
 public class Controller {
 ​
     @Autowired
     private Producer producer;
 ​
     @GetMapping("/simple")
     public String send(String msg) {
         producer.sendMessage(msg);
         return "消息发送成功: " + msg;
     }
 }

这样调用接口-Producer-mq-Consumer

如果有多个消费者监听同一个队列,那么是竞争消费,只会被消费一次。但是一般只是一个队列一个消费者

工作队列模式

Rabbit队列本身就是队列,不区分简单队列和工作队列,区别在于你怎么使用他,像上面那样一个队列多个消费者那就是工作队列的轮询分发模式了

配置类

     public static final String WORK_QUEUE = "work.queue";
 ​
     @Bean
     public Queue workQueue() {
         // durable = true 表示持久化
         return new Queue(WORK_QUEUE, true);
     }
 ​
 ​
     //自定义工厂
     @Bean
     public SimpleRabbitListenerContainerFactory manualAckListenerContainerFactory(ConnectionFactory connectionFactory) {
         SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
         factory.setConnectionFactory(connectionFactory);
 ​
         // 设置手动ACK
         factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
 ​
         // 设置prefetchCount实现公平分发,一次只派发1条消息给消费者
         factory.setPrefetchCount(1);
 ​
         return factory;
     }
 ​

生产者

     @Autowired
     private RabbitTemplate rabbitTemplate;
 ​
     public void sendTask(String task) {
         // 发送消息到名为 work.queue 的队列
         rabbitTemplate.convertAndSend("work.queue", task);
         System.out.println("任务已发送:" + task);
     }

消费者

     @RabbitListener(queues = "work.queue", containerFactory = "manualAckListenerContainerFactory")
     public void handleWorkQueue(Message message, Channel channel) throws IOException {
         long deliveryTag = message.getMessageProperties().getDeliveryTag();
         try {
             String msg = new String(message.getBody(), StandardCharsets.UTF_8);
             System.out.println("收到工作队列消息: " + msg);
 ​
             // 模拟业务处理耗时
             Thread.sleep(1000);
 ​
             // 处理成功,手动ACK
             channel.basicAck(deliveryTag, false);
 ​
         } catch (Exception e) {
             System.err.println("处理失败,消息重新入队: " + e.getMessage());
 ​
             // 处理失败,消息重新入队(NACK)
             channel.basicNack(deliveryTag, false, true);
         }
     }

fanout模式

配置类

     public static final String FANOUT_EXCHANGE = "user.register.exchange";
     public static final String EMAIL_QUEUE = "user.register.email.queue";
     public static final String LOG_QUEUE = "user.register.log.queue";
 ​
     //配置交换机
     @Bean
     public FanoutExchange fanoutExchange() {
         return new FanoutExchange(FANOUT_EXCHANGE);
     }
     //配置两个队列
     @Bean
     public Queue emailQueue() {
         return new Queue(EMAIL_QUEUE, true);
     }
     @Bean
     public Queue logQueue() {
         return new Queue(LOG_QUEUE, true);
     }
     //让交换机连接到队列
     @Bean
     public Binding emailBinding(FanoutExchange fanoutExchange, Queue emailQueue) {
         return BindingBuilder.bind(emailQueue).to(fanoutExchange);
     }
 ​
     @Bean
     public Binding logBinding(FanoutExchange fanoutExchange, Queue logQueue) {
         return BindingBuilder.bind(logQueue).to(fanoutExchange);
     }

生产者

     @Autowired
     private RabbitTemplate rabbitTemplate;
 ​
     public void sendRegisterMessage(String username) {
         String message = "用户【" + username + "】注册成功,发送通知广播";
         // 发送到 fanout 交换机,routingKey 忽略
         rabbitTemplate.convertAndSend(RabbitConfig.FANOUT_EXCHANGE, "", message);
     }

消费者

     @RabbitListener(queues = RabbitConfig.EMAIL_QUEUE)
     public void handleEmail(String message) {
         System.out.println("📧 [邮件服务] 接收到消息:" + message);
         // 模拟发送邮件...
     }
 ​
     @RabbitListener(queues = RabbitConfig.LOG_QUEUE)
     public void handleLog(String message) {
         System.out.println("📝 [日志服务] 接收到消息:" + message);
         // 模拟写日志...
     }

疑问

  • 消费者端只要写Consumer就可以了,记得加上@Service就行

  • 对于RabbitConfig,一般需要写该项目中用了哪些就写哪些,不用像生产者那里全写

  • 企业中也是直接用默认的RabbitTemplate就可以了,不像Redis一样

  • 一般企业中都是一个业务的生产者和消费者放在一个类里

对于同一队列中传入的不同类型的怎么分辨呢?

一个队列一个类,Spring 会根据消息反序列化后的类型自动选择哪个 @RabbitHandler 方法来执行。

注意,这里@RabbitListener放类上,@RabbitHandler放方法上

 @Component
 @RabbitListener(queues = "my.queue")
 public class MyConsumer {
 ​
     @RabbitHandler
     public void handleString(String msg) {
         System.out.println("接收到字符串消息: " + msg);
     }
 ​
     @RabbitHandler
     public void handleUser(User user) {
         System.out.println("接收到用户对象: " + user.getName());
     }
 }


比较是偷走幸福的小偷