如需转载,请根据 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 许可,附上本文作者及链接。
本文作者: 执笔成念
作者昵称: zbcn
本文链接: https://1363653611.github.io/zbcn.github.io/2021/02/11/MQ_04rabbitmq%E4%BD%BF%E7%94%A8%E6%95%99%E7%A8%8B/
rabbitmq 使用教程
安装
略
管理界面
- 地址:http://localhost:15672/
- 用户名/密码:guest/guest
- 管理界面
管理界面功能
- 手动创建虚拟host
- 创建用户
- 分配权限
- 创建交换机
- 创建队列
- 等等…
其他:
- 查看队列消息
- 消费效率
- 推送效率
- 。。。
发送方回调机制
在创建 RabbitTemplate
bean 时,我们用到了两个callBack 方法:
1 | |
2 | public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){ |
3 | RabbitTemplate rabbitTemplate = new RabbitTemplate(); |
4 | rabbitTemplate.setConnectionFactory(connectionFactory); |
5 | //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数 |
6 | rabbitTemplate.setMandatory(true); |
7 | |
8 | rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { |
9 | |
10 | public void confirm(CorrelationData correlationData, boolean ack, String cause) { |
11 | System.out.println("ConfirmCallback: "+"相关数据:"+correlationData); |
12 | System.out.println("ConfirmCallback: "+"确认情况:"+ack); |
13 | System.out.println("ConfirmCallback: "+"原因:"+cause); |
14 | } |
15 | }); |
16 | |
17 | rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { |
18 | |
19 | public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { |
20 | System.out.println("ReturnCallback: "+"消息:"+message); |
21 | System.out.println("ReturnCallback: "+"回应码:"+replyCode); |
22 | System.out.println("ReturnCallback: "+"回应信息:"+replyText); |
23 | System.out.println("ReturnCallback: "+"交换机:"+exchange); |
24 | System.out.println("ReturnCallback: "+"路由键:"+routingKey); |
25 | } |
26 | }); |
27 | |
28 | return rabbitTemplate; |
29 | } |
一个叫 ConfirmCallback ,一个叫 RetrunCallback;
这两种回调函数都是在什么情况会触发呢?
- ①消息推送到server,但是在server里找不到交换机 —
ConfirmCallback 回调函数
- ②消息推送到server,找到交换机了,但是没找到队列 — 触发的是 ConfirmCallback和RetrunCallback两个回调函数。
- ③消息推送到sever,交换机和队列啥都没找到 —–触发的是 ConfirmCallback 回调函数。
- ④消息推送成功 — 触发的是 ConfirmCallback 回调函数。
消费者接收到消息的消息确认机制
和生产者的消息确认机制不同,因为消息接收本来就是在监听消息,符合条件的消息就会消费下来。
自动确认
这也是默认的消息确认情况。 AcknowledgeMode.NONE
RabbitMQ成功将消息发出(即将消息成功写入TCP Socket)中立即认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递。
所以这种情况如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。
一般这种情况我们都是使用try catch捕捉异常后,打印日志用于追踪数据,这样找出对应数据再做后续处理。
根据情况确认
略
手动确认
我们配置接收消息确认机制时,多数选择的模式。
消费者收到消息后,手动调用 basic.ack/basic.nack/basic.reject
后,RabbitMQ收到这些消息后,才认为本次投递成功。
basic.ack用于肯定确认
basic.nack用于否定确认(注意:这是AMQP 0-9-1的RabbitMQ扩展)
basic.reject用于否定确认,但与basic.nack相比有一个限制:一次只能拒绝单条消息
消费者端以上的3个方法都表示消息已经被正确投递,但是basic.ack表示消息已经被正确处理。
而basic.nack,basic.reject表示没有被正确处理:
reject
reject, 经常用于重入队列的场景。
channel.basicReject(deliveryTag, true);
拒绝消费当前消息.
如果第二参数传入true,就是将数据重新丢回队列里,那么下次还会消费这消息。
设置false,就是告诉服务器,我已经知道这条消息数据了,因为一些原因拒绝它,而且服务器也把这个消息丢掉就行。 下次不想再消费这条消息了。
使用拒绝后重新入列这个确认模式要谨慎,因为一般都是出现异常的时候,catch异常再拒绝入列,选择是否重入列。
但是如果使用不当会导致一些每次都被你重入列的消息一直消费-入列-消费-入列这样循环,会导致消息积压。
nack
nack,相当于设置不消费某条消息。
channel.basicNack(deliveryTag, false, true);
- 第一个参数依然是当前消息到的数据的唯一id;
- 第二个参数是指是否针对多条消息;如果是true,也就是说一次性针对当前通道的消息的tagID小于当前这条消息的,都拒绝确认。
- 第三个参数是指是否重新入列,也就是指不确认的消息是否重新丢回到队列里面去。
同样使用不确认后重新入列这个确认模式要谨慎,因为这里也可能因为考虑不周出现消息一直被重新丢回去的情况,导致积压。
消息手动确认实例
新建MessageListenerConfig.java上添加代码相关的配置代码:
1 | |
2 | import com.elegant.rabbitmqconsumer.receiver.MyAckReceiver; |
3 | import org.springframework.amqp.core.AcknowledgeMode; |
4 | import org.springframework.amqp.core.Queue; |
5 | import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; |
6 | import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; |
7 | import org.springframework.beans.factory.annotation.Autowired; |
8 | import org.springframework.context.annotation.Bean; |
9 | import org.springframework.context.annotation.Configuration; |
10 |
|
11 | public class MessageListenerConfig { |
12 | |
13 | |
14 | private CachingConnectionFactory connectionFactory; |
15 | |
16 | private MyAckReceiver myAckReceiver;//消息接收处理类 |
17 | |
18 | |
19 | public SimpleMessageListenerContainer simpleMessageListenerContainer() { |
20 | SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); |
21 | container.setConcurrentConsumers(1); |
22 | container.setMaxConcurrentConsumers(1); |
23 | container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默认是自动确认,这里改为手动确认消息 |
24 | //设置一个队列 |
25 | container.setQueueNames("TestDirectQueue"); |
26 | //如果同时设置多个如下: 前提是队列都是必须已经创建存在的 |
27 | // container.setQueueNames("TestDirectQueue","TestDirectQueue2","TestDirectQueue3"); |
28 | |
29 | |
30 | //另一种设置队列的方法,如果使用这种情况,那么要设置多个,就使用addQueues |
31 | //container.setQueues(new Queue("TestDirectQueue",true)); |
32 | //container.addQueues(new Queue("TestDirectQueue2",true)); |
33 | //container.addQueues(new Queue("TestDirectQueue3",true)); |
34 | container.setMessageListener(myAckReceiver); |
35 | |
36 | return container; |
37 | } |
38 | |
39 | |
40 | } |
对应的手动确认消息监听类,MyAckReceiver.java(手动确认模式需要实现 ChannelAwareMessageListener)
1 | import com.rabbitmq.client.Channel; |
2 | import org.springframework.amqp.core.Message; |
3 | import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; |
4 | import org.springframework.stereotype.Component; |
5 | import java.util.HashMap; |
6 | import java.util.Map; |
7 | |
8 | public class MyAckReceiver implements ChannelAwareMessageListener { |
9 | |
10 | |
11 | public void onMessage(Message message, Channel channel) throws Exception { |
12 | long deliveryTag = message.getMessageProperties().getDeliveryTag(); |
13 | try { |
14 | //因为传递消息的时候用的map传递,所以将Map从Message内取出需要做些处理 |
15 | String msg = message.toString(); |
16 | String[] msgArray = msg.split("'");//可以点进Message里面看源码,单引号直接的数据就是我们的map消息数据 |
17 | Map<String, String> msgMap = mapStringToMap(msgArray[1].trim(),3); |
18 | String messageId=msgMap.get("messageId"); |
19 | String messageData=msgMap.get("messageData"); |
20 | String createTime=msgMap.get("createTime"); |
21 | System.out.println(" MyAckReceiver messageId:"+messageId+" messageData:"+messageData+" createTime:"+createTime); |
22 | System.out.println("消费的主题消息来自:"+message.getMessageProperties().getConsumerQueue()); |
23 | channel.basicAck(deliveryTag, true); //第二个参数,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息 |
24 | // channel.basicReject(deliveryTag, true);//第二个参数,true会重新放回队列,所以需要自己根据业务逻辑判断什么时候使用拒绝 |
25 | } catch (Exception e) { |
26 | channel.basicReject(deliveryTag, false); |
27 | e.printStackTrace(); |
28 | } |
29 | } |
30 | |
31 | //{key=value,key=value,key=value} 格式转换成map |
32 | private Map<String, String> mapStringToMap(String str,int entryNum ) { |
33 | str = str.substring(1, str.length() - 1); |
34 | String[] strs = str.split(",",entryNum); |
35 | Map<String, String> map = new HashMap<String, String>(); |
36 | for (String string : strs) { |
37 | String key = string.split("=")[0].trim(); |
38 | String value = string.split("=")[1]; |
39 | map.put(key, value); |
40 | } |
41 | return map; |
42 | } |
43 | } |
这时,先调用接口/sendDirectMessage, 给直连交换机xxxDirectExchange 的队列xxxDirectQueue 推送一条消息,