一、添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
二、增加配置
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: admin
    password: admin
    listener:
      simple:
        concurrency: 10 #最小并发消费者数
        max-concurrency: 10 #最大并发消费者数
        prefetch: 1 #每个消费者的预取消息数
        default-requeue-rejected: true #被拒绝的消息是否默认重新入队
        auto-startup: true #是否随应用自动启动监听容器
    template:
      retry:
        enabled: true #开启发送重试
        initial-interval: 1000 #首次重试间隔(毫秒)
        max-attempts: 3 #最大尝试次数
        max-interval: 10000 #最大重试间隔(毫秒)
        multiplier: 1.0 #重试间隔乘数
三、三种模式
1、Direct 交换机(exchange)模式
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that declares the queue used by the basic send/receive example.
 */
@Configuration
public class MQConfig {

    /** Name of the queue declared by {@link #queue()}. */
    public static final String QUEUE_NAME = "queue";

    /**
     * Declares the example queue as a Spring bean.
     *
     * @return a queue named {@link #QUEUE_NAME} created with the durable flag set
     */
    @Bean
    public Queue queue() {
        // Queue(name, durable): the second argument controls whether the queue is persisted.
        final boolean durable = true;
        return new Queue(QUEUE_NAME, durable);
    }
}
创建消息发送者
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
 * Message producer for the basic example; publishes through the injected {@link AmqpTemplate}.
 */
@Service
public class MQSender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Sends a message using {@link MQConfig#QUEUE_NAME} as the routing key and echoes it to stdout.
     *
     * @param msg the payload handed to the template for conversion and delivery
     */
    public void send(String msg) {
        final String routingKey = MQConfig.QUEUE_NAME;
        amqpTemplate.convertAndSend(routingKey, msg);

        final String logLine = "send message:" + msg;
        System.out.println(logLine);
    }
}
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
 * Message consumer for the basic example.
 */
@Service
public class MQReceiver {

    /**
     * Listener bound to {@link MQConfig#QUEUE_NAME}; prints each received payload to stdout.
     *
     * @param message the message body delivered to this listener
     */
    @RabbitListener(queues = MQConfig.QUEUE_NAME)
    public void receiver(String message) {
        final String logLine = "receiveMessage:" + message;
        System.out.println(logLine);
    }
}
2、Topic 交换机(exchange)模式
/**
 * Topic mode: declares the first queue used by the topic-exchange example.
 *
 * @return a queue named {@code topic.queue1} created with the durable flag set
 */
@Bean
public Queue topicQueue1() {
    // Queue(name, durable): first argument is the queue name, second is the durable flag.
    final String queueName = "topic.queue1";
    final boolean durable = true;
    return new Queue(queueName, durable);
}
/**
 * Declares the second queue used by the topic-exchange example.
 *
 * @return a queue named {@code topic.queue2} created with the durable flag set
 */
@Bean
public Queue topicQueue2() {
    // Queue(name, durable): first argument is the queue name, second is the durable flag.
    final String queueName = "topic.queue2";
    final boolean durable = true;
    return new Queue(queueName, durable);
}
/**
 * Declares the topic exchange that both topic queues are bound to.
 *
 * @return a {@link TopicExchange} named {@code topicExchange}
 */
@Bean
public TopicExchange topicExchange() {
    final String exchangeName = "topicExchange";
    return new TopicExchange(exchangeName);
}
/**
 * Binds {@code topic.queue1} to the topic exchange with the literal routing key {@code topic.key1}.
 *
 * @return the binding between {@link #topicQueue1()} and {@link #topicExchange()}
 */
@Bean
public Binding topicBinding() {
    final Queue queue = topicQueue1();
    final TopicExchange exchange = topicExchange();
    return BindingBuilder
            .bind(queue)
            .to(exchange)
            .with("topic.key1");
}
/**
 * Binds {@code topic.queue2} to the topic exchange with the wildcard pattern {@code topic.#}.
 *
 * @return the binding between {@link #topicQueue2()} and {@link #topicExchange()}
 */
@Bean
public Binding topicBinding2() {
    final Queue queue = topicQueue2();
    final TopicExchange exchange = topicExchange();
    // "#" is the topic wildcard for zero or more dot-separated words.
    return BindingBuilder
            .bind(queue)
            .to(exchange)
            .with("topic.#");
}
/**
 * 流程说明:我们先创建了两个 queue,分别命名为 topic.queue1 和 topic.queue2,然后再创建一个交换机,命名为 topicExchange,最后将两个 queue 和交换机绑定,同时指定了匹配规则;"#" 可以匹配零个或多个以 "." 分隔的单词
 */
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
 * Message producer for the topic-exchange example.
 */
@Service
public class MQSender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Publishes two messages to the {@code topicExchange} exchange and echoes the input to stdout.
     *
     * <p>The first message is {@code msg + "1"} with routing key {@code topic.key1}; the second is
     * {@code msg + "2"} with routing key {@code topic.key2}.
     *
     * @param msg the base payload
     */
    public void sendTopic(String msg) {
        // convertAndSend arguments: exchange name, routing key, message payload.
        final String exchange = "topicExchange";
        final String firstPayload = msg + "1";
        final String secondPayload = msg + "2";

        amqpTemplate.convertAndSend(exchange, "topic.key1", firstPayload);
        amqpTemplate.convertAndSend(exchange, "topic.key2", secondPayload);

        System.out.println("send message:" + msg);
    }
}
/**
 * 我们在绑定交换机与 queue 时指定了匹配规则:"topic.key1" 只能匹配路由键 "topic.key1","topic.#" 可以匹配全部以 "topic." 开头的路由键;这样,第一条消息会同时被 topic.queue1 和 topic.queue2 匹配,而第二条只能被 topic.queue2 匹配到
 */
/**
 * Listener for the first topic queue; prints each received payload to stdout.
 *
 * @param message the message body delivered to this listener
 */
// NOTE(review): MQConfig.TOPIC_QUEUE_NAME1 is not declared in the visible MQConfig snippet;
// presumably it equals "topic.queue1" - confirm.
@RabbitListener(queues = MQConfig.TOPIC_QUEUE_NAME1)
public void receiverTopic1(String message) {
    final String logLine = "receive topic queue1 message:" + message;
    System.out.println(logLine);
}
/**
 * Listener for the second topic queue; prints each received payload to stdout.
 *
 * @param message the message body delivered to this listener
 */
// NOTE(review): MQConfig.TOPIC_QUEUE_NAME2 is not declared in the visible MQConfig snippet;
// presumably it equals "topic.queue2" - confirm.
@RabbitListener(queues = MQConfig.TOPIC_QUEUE_NAME2)
public void receiverTopic2(String message) {
    final String logLine = "receive topic queue2 message:" + message;
    System.out.println(logLine);
}
3、Fanout 交换机(exchange)模式
/**
 * Fanout mode: declares the fanout exchange used by the broadcast example.
 *
 * @return a {@link FanoutExchange} named by the {@code FANOUT_EXCHANGE} constant
 */
@Bean
public FanoutExchange fanoutExchange() {
    // NOTE(review): FANOUT_EXCHANGE is declared outside this snippet; presumably it is
    // "fanoutExchange", the name sendFanout publishes to - confirm.
    final String exchangeName = FANOUT_EXCHANGE;
    return new FanoutExchange(exchangeName);
}
/**
 * Binds the queue from {@code topicQueue1()} to the fanout exchange; no routing key is supplied.
 *
 * @return the binding between {@code topicQueue1()} and {@code fanoutExchange()}
 */
@Bean
public Binding fanoutBinding1() {
    final Queue queue = topicQueue1();
    final FanoutExchange exchange = fanoutExchange();
    return BindingBuilder
            .bind(queue)
            .to(exchange);
}
/**
 * Binds the queue from {@code topicQueue2()} to the fanout exchange; no routing key is supplied.
 *
 * @return the binding between {@code topicQueue2()} and {@code fanoutExchange()}
 */
@Bean
public Binding fanoutBinding2() {
    final Queue queue = topicQueue2();
    final FanoutExchange exchange = fanoutExchange();
    return BindingBuilder
            .bind(queue)
            .to(exchange);
}
/**
 * Publishes a message to the {@code fanoutExchange} exchange with an empty routing key and
 * echoes it to stdout.
 *
 * @param msg the payload to broadcast
 */
public void sendFanout(String msg) {
    // convertAndSend arguments: exchange name, routing key (left empty here), message payload.
    final String exchange = "fanoutExchange";
    final String routingKey = "";
    amqpTemplate.convertAndSend(exchange, routingKey, msg);

    final String logLine = "send fanout message:" + msg;
    System.out.println(logLine);
}
/**
 * Listener for the first queue in the fanout example; prints each received payload to stdout.
 *
 * @param message the message body delivered to this listener
 */
// NOTE(review): MQConfig.TOPIC_QUEUE_NAME1 is not declared in the visible MQConfig snippet;
// presumably it equals "topic.queue1" - confirm.
@RabbitListener(queues = MQConfig.TOPIC_QUEUE_NAME1)
public void receiverTopic1(String message) {
    final String logLine = "receive queue1 message:" + message;
    System.out.println(logLine);
}
/**
 * Listener for the second queue in the fanout example; prints each received payload to stdout.
 *
 * @param message the message body delivered to this listener
 */
// NOTE(review): MQConfig.TOPIC_QUEUE_NAME2 is not declared in the visible MQConfig snippet;
// presumably it equals "topic.queue2" - confirm.
@RabbitListener(queues = MQConfig.TOPIC_QUEUE_NAME2)
public void receiverTopic2(String message) {
    final String logLine = "receive queue2 message:" + message;
    System.out.println(logLine);
}