1、添加依赖
springboot版本
org.springframework.boot spring-boot-starter-parent 2.0.5.RELEASE
需要的依赖
org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-amqp org.springframework.boot spring-boot-starter-test
2、生产端配置
application.yml
server: port: 8080spring: rabbitmq: host: 118.89.196.99 #服务地址 port: 5672 #端口 username: suzhe #用户名 password: suzhe #密码 virtual-host: /hello #虚拟主机 publisher-confirms: true #如果要进行消息回调,则这里必须要设置为true publisher-returns: true #开启publisher Return机制 template: mandatory: true #开启mandatory
通过RabbitAdmin 提供的api 来初始化交换机队列并绑定。这里通过实现InitializingBean接口 ,在该类初始化后会执行afterPropertiesSet()这个方法。
下面主要声明了一个交换机order.topic,并声明了队列point.queue和cash.queue绑定到该交换机。其中point.queue设置了死信交换机的参数,同时也可以给队列设置过期时间,这里注释掉了。
@Configurationpublic class RabbitmqConfig implements InitializingBean { @Autowired RabbitAdmin rabbitAdmin; @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { return new RabbitAdmin(connectionFactory); } @Override public void afterPropertiesSet() throws Exception { //声明一个死信交换机和一个队列 并绑定 rabbitAdmin.declareExchange(new TopicExchange("dlx.topic", true, false)); rabbitAdmin.declareQueue(new Queue("dlx.point.queue", true, false, false)); rabbitAdmin.declareBinding(new Binding("dlx.point.queue", Binding.DestinationType.QUEUE, "dlx.topic", "order.success", new HashMap<>())); //声明一个交换机 rabbitAdmin.declareExchange(new TopicExchange("order.topic", true, false)); //声明一个point队列 MappointArgs = new HashMap<>(); //pointArgs.put("x-message-ttl" , 30000);//可以设置队列里消息的ttl的时间30s //设置死信交换机 并指定 routing_key 为order.success pointArgs.put( "x-dead-letter-exchange","dlx.topic"); pointArgs.put( "x-dead-letter-routing-key","order.success"); rabbitAdmin.declareQueue(new Queue("point.queue", true, false, false, pointArgs)); //绑定point rabbitAdmin.declareBinding(new Binding("point.queue", Binding.DestinationType.QUEUE, "order.topic", "order.success", new HashMap<>())); //声明cash队列 rabbitAdmin.declareQueue(new Queue("cash.queue", true, false, false)); //绑定cash rabbitAdmin.declareBinding(new Binding("cash.queue", Binding.DestinationType.QUEUE, "order.topic", "order.success", new HashMap<>())); }}
封装一个发送者对象
主要是RabbitTemplate 提供的api,这里发送对象时注入了两个回调。
@Component@Slf4jpublic class RabbitSender { //自动注入RabbitTemplate模板类 @Autowired private RabbitTemplate rabbitTemplate; //回调函数: confirm确认 final ConfirmCallback confirmCallback = new ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("ack:{},CorrelationData:{}",ack,correlationData); if(!ack){ log.error("异常处理"); } } }; //回调函数: return返回 final ReturnCallback returnCallback = new ReturnCallback() { @Override public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("return exchange:{},routingKey:{},replyCode:{},replyText:{}",exchange,routingKey,replyCode,replyText); } }; //发送消息方法调用: 构建Message消息 public void send(String exchange,String routingKey,String msg) { rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.setReturnCallback(returnCallback); rabbitTemplate.convertAndSend(exchange, routingKey, msg); } }
测试Controller
@RestControllerpublic class OrderController { @Autowired RabbitSender rabbitSender; @GetMapping("/order") public ResponseEntity order(){ String orderId = UUID.randomUUID().toString(); rabbitSender.send("order.topic","order.success",orderId); return ResponseEntity.ok("success"); }}
启动springboot应用程序 可以看到对应的交换机和队列创建成功
点击可以查看到对应交换机的绑定信息
同时也可以查看到队列也创建成功了,查看一下point.queue这个队列的参数。
3、消费端配置
这里开启消费端的重试机制,当消费端超过5次还没有消费成功,则做出作出nack或reject等行为. 这时候消息将进入死信队列,需要注意的是这里配置的重发是在消费端应用内处理的,不是rabbitmq重发。
spring: rabbitmq: host: 192.168.6.133 #服务地址 username: suzhe #用户名 password: suzhe #密码 virtual-host: /hello #虚拟主机 connection-timeout: 5000ms #连接超时参数单位为毫秒:设置为“0”代表无穷大 listener: simple: retry: enabled: true #是否开启消费者重试(为false时关闭消费者重试,这时消费端代码异常会一直重复收到消息) max-attempts: 5 #默认是3
监听队列
SpringBoot Rabbit使用手工应答机制, 当@RabbitListener修饰的方法被调用且没有抛出异常时, Spring Boot会为我们自动应答. 否则会根据设定的重试机制而作出nack或reject等行为. point.queue这个队列的监听会抛出异常。
@Component@Slf4jpublic class Listener { @RabbitListener(queues="point.queue") public void point(String msg){ log.info("point.queue 接收到消息:{}",msg); int i = 1/0; } @RabbitListener(queues="cash.queue") public void cash(String msg){ log.info("cash.queue 接收到消息:{}",msg); }}
启动消费者
生产者访问:http://localhost:8080/order 这里会发送消息到order.topic。可以看到point.queue收到了5次消息,然后抛出了异常
查看死信队列
获取消息,可以看到该条消息的信息。
详细源码地址: