当然,SpringBoot 也支持消费者重试机制,坑爹的是,不能使用 try..catch 处理异常,一旦使用了,重试机制就失效了,不过我提供了支持重试+重试失败之后消息自动到死信队列中的方案。
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
publisher-confirm-type: correlated # 消息发送到交换机会触发回调方法
publisher-returns: true # 是否开启生产者消息回退功能,队列中的消息不可路由回退到生产者
listener:
simple:
retry:
enabled: true # 开启消费者重试
max-attempts: 5 # 最大重试次数,默认为 3 次
initial-interval: 2000 # 消息的间隔时间 默认为 1 秒
acknowledge-mode: manual # 手动确认
default-requeue-rejected: false # 重试次数超过上面的设置之后是否丢失
package com.github.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author 许大仙
* @version 1.0
* @since 2022-05-27 10:49:11
*/
@Configuration
public class ConfirmConfig {
/**
* 交换机名称
*/
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
/**
* 死信交换机名称
*/
public static final String DEAD_EXCHANGE_NAME = "dead.exchange";
/**
* 队列的名称
*/
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
/**
* 死信队列
*/
public static final String DEAD_QUEUE_NAME = "dead.queue";
/**
* routing_key
*/
public static final String CONFIRM_ROUTING_KEY = "confirm";
/**
* dead routing key
*/
public static final String DEAD_ROUTING_KEY = "dead";
/**
* 配置交换机
*/
@Bean
public DirectExchange confirmExchange() {
return new DirectExchange(CONFIRM_EXCHANGE_NAME);
}
/**
* 配置交换机
*/
@Bean
public DirectExchange deadExchange() {
return new DirectExchange(DEAD_EXCHANGE_NAME);
}
/**
* 配置队列
*/
@Bean
public Queue confirmQueue() {
return QueueBuilder.durable(CONFIRM_QUEUE_NAME)
.deadLetterExchange(DEAD_EXCHANGE_NAME)
.deadLetterRoutingKey(DEAD_ROUTING_KEY)
.build();
}
/**
* 配置队列
*/
@Bean
public Queue deadQueue() {
return QueueBuilder.durable(DEAD_QUEUE_NAME)
.build();
}
/**
* 配置绑定关系
*/
@Bean
public Binding confirmBinding() {
return BindingBuilder.bind(confirmQueue()).to(confirmExchange()).with(CONFIRM_ROUTING_KEY);
}
/**
* 配置绑定关系
*/
@Bean
public Binding deadBinding() {
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(DEAD_ROUTING_KEY);
}
}
package com.github.web;
import com.github.config.ConfirmConfig;
import lombok.RequiredArgsConstructor;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;
/**
* @author 许大仙
* @version 1.0
* @since 2022-05-27 13:37:39
*/
@Slf4j
@Value
@RestController
@RequiredArgsConstructor
@RequestMapping(value = "/confirm")
public class ConfirmController {
RabbitTemplate rabbitTemplate;
@GetMapping(value = "/sendMsg/{msg}")
public String sendMsg(@PathVariable String msg) {
log.info("发送的消息是:{}", msg);
String id = "1";
CorrelationData correlationData = new CorrelationData(id);
// 发送消息
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY, msg.getBytes(StandardCharsets.UTF_8), correlationData);
return "发送消息";
}
}
package com.github.listener;
import com.github.config.ConfirmConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
* @author 许大仙
* @version 1.0
* @since 2022-05-25 14:20:14
*/
@Slf4j
@Component
public class RabbitmqListener {
// 重试次数
private int retryCount = 0;
/**
* 如果是重试机制,就不能使用 try-catch 捕获,否则重试机制将失效。
* 如果是自动 ack ,多次重试还是失败,消息就会自动确认,消息就会丢失;
* 如果是手动 ack ,多次重试还是失败,也无法 nack ,就会一直处于 unacked 状态,导致消息积压。
*
* @param message
* @param channel
* @param tag
* @throws IOException
*/
@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
public void receive(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
try {
// 处理主要业务
log.info("接收到的消息是:{}", msg);
int i = 10 / 0;
// 确认消息
channel.basicAck(tag, false);
} catch (Exception e) {
retryCount++;
log.info("重试次数:{}", retryCount);
// 处理业务失败,还要进行其他操作,比如记录失败原因
log.info("记录失败原因 ====>");
// 抛出异常,触发重试机制
throw e;
} finally {
// 重试次数达到限制
if (retryCount == 5) {
log.info("消息异常,nack 消息到死信队列");
channel.basicNack(tag, false, false);
}
}
}
@RabbitListener(queues = ConfirmConfig.DEAD_QUEUE_NAME)
public void receiveDead(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
log.info("死信队列接收到的消息是:{}", msg);
channel.basicAck(tag, false);
}
}