博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Rabbitmq-springboot集成
阅读量:6967 次
发布时间:2019-06-27

本文共 5369 字,大约阅读时间需要 17 分钟。

hot3.png

1、添加依赖

springboot版本

org.springframework.boot
spring-boot-starter-parent
2.0.5.RELEASE

需要的依赖

org.springframework.boot
spring-boot-starter-web
org.springframework.boot
spring-boot-starter-amqp
org.springframework.boot
spring-boot-starter-test

 

2、生产端配置

application.yml

server:  port: 8080spring:  rabbitmq:    host: 118.89.196.99 #服务地址    port: 5672 #端口    username: suzhe #用户名    password: suzhe #密码    virtual-host: /hello #虚拟主机    publisher-confirms: true #如果要进行消息回调,则这里必须要设置为true    publisher-returns: true  #开启publisher Return机制    template:      mandatory: true #开启mandatory

通过RabbitAdmin 提供的api 来初始化交换机队列并绑定。这里通过实现InitializingBean接口 ,在该类初始化后会执行afterPropertiesSet()这个方法。

下面主要声明了一个交换机order.topic,并声明了队列point.queue和cash.queue绑定到该交换机。其中point.queue设置了死信交换机的参数,同时也可以给队列设置过期时间,这里注释掉了。

@Configurationpublic class RabbitmqConfig implements InitializingBean {    @Autowired    RabbitAdmin rabbitAdmin;    @Bean    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {        return new RabbitAdmin(connectionFactory);    }    @Override    public void afterPropertiesSet() throws Exception {        //声明一个死信交换机和一个队列 并绑定        rabbitAdmin.declareExchange(new TopicExchange("dlx.topic", true, false));        rabbitAdmin.declareQueue(new Queue("dlx.point.queue", true, false, false));        rabbitAdmin.declareBinding(new Binding("dlx.point.queue",                Binding.DestinationType.QUEUE,                "dlx.topic", "order.success", new HashMap<>()));        //声明一个交换机        rabbitAdmin.declareExchange(new TopicExchange("order.topic", true, false));        //声明一个point队列        Map
pointArgs = new HashMap<>(); //pointArgs.put("x-message-ttl" , 30000);//可以设置队列里消息的ttl的时间30s //设置死信交换机 并指定 routing_key 为order.success pointArgs.put( "x-dead-letter-exchange","dlx.topic"); pointArgs.put( "x-dead-letter-routing-key","order.success"); rabbitAdmin.declareQueue(new Queue("point.queue", true, false, false, pointArgs)); //绑定point rabbitAdmin.declareBinding(new Binding("point.queue", Binding.DestinationType.QUEUE, "order.topic", "order.success", new HashMap<>())); //声明cash队列 rabbitAdmin.declareQueue(new Queue("cash.queue", true, false, false)); //绑定cash rabbitAdmin.declareBinding(new Binding("cash.queue", Binding.DestinationType.QUEUE, "order.topic", "order.success", new HashMap<>())); }}

封装一个发送者对象

主要是RabbitTemplate 提供的api,这里发送对象时注入了两个回调。

@Component@Slf4jpublic class RabbitSender {	//自动注入RabbitTemplate模板类	@Autowired	private RabbitTemplate rabbitTemplate;  		//回调函数: confirm确认	final ConfirmCallback confirmCallback = new ConfirmCallback() {		@Override		public void confirm(CorrelationData correlationData, boolean ack, String cause) {			log.info("ack:{},CorrelationData:{}",ack,correlationData);			if(!ack){				log.error("异常处理");			}		}	};		//回调函数: return返回	final ReturnCallback returnCallback = new ReturnCallback() {		@Override		public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,				String exchange, String routingKey) {			log.info("return exchange:{},routingKey:{},replyCode:{},replyText:{}",exchange,routingKey,replyCode,replyText);		}	};	//发送消息方法调用: 构建Message消息	public void send(String exchange,String routingKey,String msg) {		rabbitTemplate.setConfirmCallback(confirmCallback);		rabbitTemplate.setReturnCallback(returnCallback);		rabbitTemplate.convertAndSend(exchange, routingKey, msg);	}	}

测试Controller

@RestControllerpublic class OrderController {    @Autowired    RabbitSender rabbitSender;    @GetMapping("/order")    public ResponseEntity order(){        String orderId = UUID.randomUUID().toString();        rabbitSender.send("order.topic","order.success",orderId);        return ResponseEntity.ok("success");    }}

启动springboot应用程序 可以看到对应的交换机和队列创建成功

ad8ee6d1aae4b217dd357feb0afa3ca8232.jpg

点击可以查看到对应交换机的绑定信息

f5a84786b091d5ed4ec96b923594e3e8148.jpg

同时也可以查看到队列也创建成功了,查看一下point.queue这个队列的参数。

19993a57d70e17b3cece65d3b040dc3f48d.jpg

 

3、消费端配置

这里开启消费端的重试机制,当消费端超过5次还没有消费成功,则做出作出nack或reject等行为. 这时候消息将进入死信队列,需要注意的是这里配置的重发是在消费端应用内处理的,不是rabbitmq重发。

spring:  rabbitmq:    host: 192.168.6.133 #服务地址    username: suzhe #用户名    password: suzhe #密码    virtual-host: /hello #虚拟主机    connection-timeout: 5000ms #连接超时参数单位为毫秒:设置为“0”代表无穷大    listener:      simple:        retry:          enabled: true #是否开启消费者重试(为false时关闭消费者重试,这时消费端代码异常会一直重复收到消息)          max-attempts: 5 #默认是3

监听队列

SpringBoot Rabbit使用手工应答机制, 当@RabbitListener修饰的方法被调用且没有抛出异常时, Spring Boot会为我们自动应答. 否则会根据设定的重试机制而作出nack或reject等行为.  point.queue这个队列的监听会抛出异常。

@Component@Slf4jpublic class Listener {    @RabbitListener(queues="point.queue")    public void point(String msg){        log.info("point.queue 接收到消息:{}",msg);        int i = 1/0;    }    @RabbitListener(queues="cash.queue")    public void cash(String msg){        log.info("cash.queue 接收到消息:{}",msg);    }}

启动消费者

生产者访问:http://localhost:8080/order  这里会发送消息到order.topic。

可以看到point.queue收到了5次消息,然后抛出了异常

968fe407171fa731c23dd98372bdb2ff36a.jpg

查看死信队列

d902980e73e70468e7e7d6fe39faa9301f1.jpg

获取消息,可以看到该条消息的信息。

d972678d2d8a34564a53cd3c76411040649.jpg

 

详细源码地址:

 

b343eda16faea01bc727de7c76a560b4b4e.jpg

转载于:https://my.oschina.net/suzheworld/blog/3004357

你可能感兴趣的文章
Hibernate总结(二)
查看>>
TSP问题
查看>>
ubuntu14.06 Lts开启ssh服务
查看>>
对象比较:Comparable 和 Comparator
查看>>
jsp中的contentType与pageEncoding的区别和作用
查看>>
java 调用启动远程shell脚本,启动spark
查看>>
Spring boot ----RestTemplate学习笔记
查看>>
[LUOGU] P3128 [USACO15DEC]最大流Max Flow
查看>>
windows2003server下能安装的MSN
查看>>
Caffe将自己的文件生成lmdb
查看>>
C# 枚举中的位运算
查看>>
Codeforces Global Round 1 晕阙记
查看>>
百度文化秘籍
查看>>
Algs4-1.3.33一个双向队列Deque-双向链表实现
查看>>
Algs4-2.2.29自然的归并排序(未解决)
查看>>
shell中数组基础语法
查看>>
P1215 母亲的牛奶
查看>>
回头再看第一次项目
查看>>
有无关键字new的区别
查看>>
Hashmap,Set,Map,List,ArrayList的区别
查看>>