文章内容文件目录:

rabbitmq启动命令-rabbitmq消息队列5种模式-第1张图片介绍

RabbitMQ是erlang开发设计的线程池。线程池用以程序中间的多线程合作。

rabbitmq启动命令-rabbitmq消息队列5种模式-第2张图片基本要素

信息:它由信息头和信息体构成。信息文章正文不是全透明的,而信息头由一系列可选特性构成,包含路由器键,优先,传送方式等。

上传者:信息的经营者。

互换:接受信息并将他们路由器到一个或好几个序列。默认设置网络交换机是默认设置传送数据网络交换机,名字为空 string。每一个创好的序列将全自动关联到默认设置网络交换机,而且关联的路由密钥名字与序列名字同样。

关联:根据关联将互换与序列密切相关,便于互换了解将信息路由器到哪一个序列。

序列:储存信息。该序列的特征是先进先出法。信息能够派发到一个或好几个序列。

云虚拟主机:每一个vhost实质上全是一个专业版的RabbitMQ网络服务器,有自已的序列,网络交换机,关联和管理权限体制。vhost是AMQP定义的基本,务必在衔接时特定。RabbitMQ的默认设置vhost为/。当好几个不一样的客户应用同一个RabbitMQ网络服务器给予的服务项目时,能够区划好几个VHOST,每一个人在自身的VHOST建立互换和序列。

代理商:线程池网络服务器实体线。

什么时候用MQ?

针对一些不用马上起效的实际操作,他们可能被分拆,多线程实行,并应用线程池来完成。

以普遍的订单系统为例子,客户点一下订单信息按键后的领域模型很有可能包含:扣库存量,转化成相匹配票据,发送信息通告。您能够在这个情景中应用MQ。向MQ发送信息通告多线程实行,在下订单信息的主流程(如扣库存量,转化成相匹配票据)后向MQ推送信息,那样主流程能够迅速进行,而另一个进程耗费MQ信息。

优势与劣势

缺陷:二郎不利二次开发维护保养;与卡夫卡对比,主要表现更差。生产制造和消費信息内容的货运量约为1-2万条,卡夫卡的货运量为10万条。

优势:有管理方法页面,方便使用;稳定性高;功能丰富,适用信息分布式锁,信息确定体制和各种各样信息派发体制。

互换型

Exchange依据不一样的派发对策分发送邮件。现阶段,有四种种类:立即,扇出,主题风格和文章标题。头方式依据头路由器信息。除此之外,标题文字电源开关和立即电源开关是同样的,但他们的特性要相差太多。

互换标准。

种类叙述扇出将发送至互换的任何信息路由器到关联到它的全部序列。立即路由器键= =关联键主题风格模糊匹配标题文字Exchange不依靠路由器键和关联键中间的配对标准来路由器信息,反而是依据推送的信息內容中的标题文字特性开展配对。

立即的

立即网络交换机将信息路由器到关联密匙和路由密钥彻底配对的序列。这也是彻底配对的单播方式。

rabbitmq启动命令-rabbitmq消息队列5种模式-第3张图片扇出

发送至扇出种类网络交换机的任何信息将被路由器到关联到该网络交换机的全部序列。扇出种类是分享信息的更快方法。

rabbitmq启动命令-rabbitmq消息队列5种模式-第4张图片主题风格

主题风格网络交换机应用路由密钥和关联密匙开展模糊匹配,假如配对取得成功,信息将被发送至对应的序列。路由器键和关联键是用句号“.”隔开的字符串数组,关联键中能够有两个特殊符号“*”和“#”用以模糊匹配,在其中“*”用以配对一个英语单词,而“#”用以配对好几个英语单词。

rabbitmq启动命令-rabbitmq消息队列5种模式-第5张图片射门

标题文字依据推送的信息內容中的标题文字特性转换路由器。关联序列和互换时特定一组键值对;当一条信息被发送至Exchange时,RabbitMQ将获得信息的头(也是键值对的方式),并较为键值对是不是与Queue与Exchange关联时选定的键值对彻底配对。假如彻底配对,信息将被路由器到序列;不然,它将不容易被路由器到序列。

信息遗失

信息遗失情景:从经营者到rabbtmq网络服务器的信息遗失,储存在rabbtmq网络服务器中的信息遗失,及其从rabbtmq网络服务器到顾客的信息遗失。

信息遗失能够从经营者确定体制,顾客人力确定信息和持续性三个层面来处理。

经营者确定体制。

经营者将讯息发送至序列,而且不可以保证推送的信息取得成功抵达网络服务器。

解决方案:

事务管理体制。在一条信息推送以后会使推送端堵塞,等候RabbitMQ的回复,以后才可以再次推送下一条信息。特性差。打开经营者确定体制,只需信息取得成功发送至网络交换机以后,RabbitMQ便会推送一个ack给经营者(即便信息沒有Queue接受,也会推送ack)。假如信息沒有取得成功发送至网络交换机,便会推送一条nack信息,提醒推送不成功。

在回弹力中,确定方式由上传者-确定基本参数:

spring: rabbitmq: #打开 confirm 确定体制 publisher-confirms: true

在生产制造端给予回调函数方式。网络服务器确定一条或好几条信息后,经营者会回调函数此方式,并依据实际的結果对信息开展事后解决,如再发,日志等。

// 信息是不是取得成功发送至Exchangefinal RabbitTemplate.ConfirmCallback confirmCallback = (CorrelationData correlationData, boolean ack, String cause) -> { log.info("correlationData: " correlationData); log.info("ack: " ack); if(!ack) { log.info("错误处理...."); } };rabbitTemplate.setConfirmCallback(confirmCallback);

路由器没法达到的电子邮件。

经营者确定体制仅保证信息恰当抵达网络交换机,无法从网络交换机路由器到序列的信息将被丢掉,进而造成信息遗失。

针对不能路由器的信息,有2种处理方法:回到信息体制和备份数据转换。

回到信息体制。

Return信息体制给予了一个调用函数ReturnCallback,仅有当信息没法从网络交换机路由器到序列时才会被回调函数。务必设定为true才可以接听具备不能抵达路由器的信息。

spring: rabbitmq: #开启ReturnCallback务必设定mandatory=true, 不然Exchange沒有寻找Queue便会丢掉掉信息, 而不容易开启ReturnCallback template.mandatory: true

根据ReturnCallback监听没法达到的信息。

final RabbitTemplate.ReturnCallback returnCallback = (Message message, int replyCode, String replyText, String exchange, String routingKey) -> log.info("return exchange: " exchange ", routingKey: " routingKey ", replyCode: " replyCode ", replyText: " replyText);rabbitTemplate.setReturnCallback(returnCallback);

当信息没法从网络交换机路由器到序列时,它将回到互换:,路由密钥:mail,replycode: 312,replytext: no _ route。

预留电源开关

预留网络交换机预留网络交换机是一种一般网络交换机。当您向相匹配的网络交换机推送信息时,假如与序列不配对,它会全自动迁移到备份数据网络交换机相匹配的序列中,那样信息就不容易遗失。

顾客指南信息内容确定。

有可能顾客接到信息后还不等他解决MQ服务项目就下来了,造成信息遗失。由于正常状况下,太阳龙宝宝应用全自动确定,一旦顾客接到信息,他将通告MQ网络服务器信息已被解决,MQ将删掉该信息。

解决方案:顾客被配置为手动式确定信息。在解决完逻辑性以后,顾客向代理商回应一个ack,说明信息早已被取得成功消費,能够从代理商中删掉。当messenger消費不成功时,回应代理商nack,依据配备决策是加回去或是从代理商中清除,或是进到死信队列。只需没得到客服的确定,代理商便会一直保存信息,但不容易再次查看或分发送给别的顾客。

为顾客设定手动式确定:

#设定消費端手动式 ackspring.rabbitmq.listener.simple.acknowledge-mode=manual

解决完信息后,手动式确定:

@RabbitListener(queues = RabbitMqConfig.MAIL_QUEUE) public void onMessage(Message message, Channel channel) throws IOException { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } long deliveryTag = message.getMessageProperties().getDeliveryTag(); //手工制作ack;第二个主要参数是multiple,设定为true,表明deliveryTag系列号以前(包含本身)的信息都早已接到,设成false则表明接到一条信息 channel.basicAck(deliveryTag, true); System.out.println("mail listener receive: " new String(message.getBody())); }

当信息消費不成功时,顾客回应代理商nack。假如使用人将requeue设定为false,代理商将删掉信息或在nack后进到死信队列,不然信息将加回去序列。

坚持不懈

假如RabbitMQ服务项目因出现异常而重启,信息可能遗失。RabbitMQ给予了一种长久体制,将运行内存中的信息长久储存到电脑硬盘上。即便RabbitMQ重启,信息也不会遗失。

信息持续性必须达到下述标准:

信息设定分布式锁。公布信息前,设定递送方式delivery mode为2,表明信息必须分布式锁。Queue设定分布式锁。交换机设置分布式锁。

当向网络交换机公布信息时,在向经营者推送回应以前,小兔子会将信息载入持续性日志。一旦信息从序列中被消費并被确定,RabbitMQ将从持续性日志中删掉该信息。在消費信息以前,假如RabbitMQ重新启动,网络服务器会全自动复建网络交换机和序列,并将长久日志中的信息载入到合适的序列或网络交换机中,以保证信息不容易遗失。

镜像系统序列

当MQ不成功时,它将造成服务项目不能用。引进RabbitMQ的镜像系统序列体制,将序列镜像系统到群集中的别的连接点。假如群集中的一个时间发生常见故障,它能够自行转换到镜像系统中的另一个连接点,以保证服务项目的易用性。

一般,每一个镜像系统序列包括相匹配于不一样连接点的一个主序列和好几个从序列。发送至镜像系统序列的任何信息一直立即发送至主序列和全部从序列。除开公布以外的全部姿势都将只发给关键设备,随后关键设备将指令实行的結果广播节目给从机器设备。镜像系统序列中的耗费实际操作事实上是在主网络服务器上实行的。

反复消費

信息反复有两个缘故:1。生产过程中的数据反复。消費全过程中的数据反复。

经营者向MQ推送信息时,MQ确定时存有网络不稳定,经营者沒有接到确定。这时,经营者将再次推送信息,造成MQ接受到反复的信息。

消費完成后,确定MQ时有网络不稳定,MQ未接到确定。为了更好地保证信息不容易遗失,MQ将再次向顾客传送以前的信息。这时候,顾客收到了两根完全一致的信息内容。由于反复的信息是由互联网缘故导致的,因此没法防止。

解决方法:在推送信息时,让每条信息带上一个全局性唯一的ID,在消費信息以前分辨信息是不是早已被消費,以确保信息消費逻辑性的幂等性。实际消費步骤是:

顾客获得到信息后先依据id去查看redis/db是不是存有该信息假如不会有,则一切正常消費,消費结束后载入redis/db假如存有,则证实信息被消費过,立即丢掉

客户侧过流保护

当RabbitMQ网络服务器库存积压很多信息时,序列中的信息会涌进顾客,这也许会造成顾客服务器崩溃。在这样的情形下,必须限制消费端电流量。

Spring RabbitMQ给予的主要参数预取能够设定单独要求解决的信息总数。假如顾客与此同时解决的信息做到最高值,顾客将阻拦而且不容易消費最新动态,直至有信息确定。

在局端打开过流保护:

#在单独要求中解决的信息数量,unack的最高总数spring.rabbitmq.listener.simple.prefetch=2

该设备RabbitMQ还给予了2个主要参数,预取尺寸和全局性。Spring RabbitMQ沒有这两个主要参数。

//一条信息尺寸限定,0意味着不限定//global:限定过流保护作用是channel等级的或是consumer等级。当设定为false,consumer等级,过流保护作用起效,设定为true没了过流保护作用,由于channel等级并未完成。void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

死信队列

储存不成功信息的序列。

信息消費不成功的缘故:

信息被拒绝而且信息沒有再次入队(requeue=false)信息请求超时未消費做到较大序列长短

设定死信队列的互换和序列,随后关联:

@Bean public DirectExchange dlxExchange() { return new DirectExchange(RabbitMqConfig.DLX_EXCHANGE); } @Bean public Queue dlxQueue() { return new Queue(RabbitMqConfig.DLX_QUEUE, true); } @Bean public Binding bindingDeadExchange(Queue dlxQueue, DirectExchange deadExchange) { return BindingBuilder.bind(dlxQueue).to(deadExchange).with(RabbitMqConfig.DLX_QUEUE); }

向一般序列加上2个主要参数,将一般序列关联到死信队列。当信息耗费不成功时,信息将被路由器到死信队列。

@Bean public Queue sendSmsQueue() { Map arguments = new HashMap(2); // 关联该序列到私聊网络交换机 arguments.put("x-dead-letter-exchange", RabbitMqConfig.DLX_EXCHANGE); arguments.put("x-dead-letter-routing-key", RabbitMqConfig.DLX_QUEUE); return new Queue(RabbitMqConfig.MAIL_QUEUE, true, false, false, arguments); }

详细的经营者编码:

@Component@Slf4jpublic class MQProducer { @Autowired RabbitTemplate rabbitTemplate; @Autowired RandomUtil randomUtil; @Autowired UserService userService; final RabbitTemplate.ConfirmCallback confirmCallback = (CorrelationData correlationData, boolean ack, String cause) -> { log.info("correlationData: " correlationData); log.info("ack: " ack); if(!ack) { log.info("错误处理...."); } }; final RabbitTemplate.ReturnCallback returnCallback = (Message message, int replyCode, String replyText, String exchange, String routingKey) -> log.info("return exchange: " exchange ", routingKey: " routingKey ", replyCode: " replyCode ", replyText: " replyText); public void sendMail(String mail) { //好像进程不安全 范畴100000 - 999999 Integer random = randomUtil.nextInt(100000, 999999); Map map = new HashMap(2); String code = random.toString(); map.put("mail", mail); map.put("code", code); MessageProperties mp = new MessageProperties(); //在生产过程中这儿无需Message,反而是应用 fastJson 等专用工具将目标变换为 json 文件格式推送 Message msg = new Message("tyson".getBytes(), mp); msg.getMessageProperties().setExpiration("3000"); //假如消費端要设定为手工制作 ACK ,那麼生产制造端推送信息的过程中一定推送 correlationData ,而且全局性唯一,用于唯一标志信息。 CorrelationData correlationData = new CorrelationData("1234567890" new Date()); rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.setReturnCallback(returnCallback); rabbitTemplate.convertAndSend(RabbitMqConfig.MAIL_QUEUE, msg, correlationData); //存进redis userService.updateMailSendState(mail, code, MailConfig.MAIL_STATE_WAIT); }}

顾客详细编码:

@Slf4j@Componentpublic class DeadListener { @RabbitListener(queues = RabbitMqConfig.DLX_QUEUE) public void onMessage(Message message, Channel channel) throws IOException { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } long deliveryTag = message.getMessageProperties().getDeliveryTag(); //手工制作ack channel.basicAck(deliveryTag,false); System.out.println("receive--1: " new String(message.getBody())); }}

当一般序列中有死信时,RabbitMQ会全自动将信息再次公布到设定的死信网络交换机,随后路由器到死信队列。您能够接听死信队列中的信息,并相对地解决他们。

别的的

拉方式

在拉方式下,信息关键根据channel.basicGet方式获得,实例编码如下所示:

GetResponse response = channel.basicGet(QUEUE_NAME, false);System.out.println(new String(response.getBody()));channel.basicAck(response.getEnvelope().getDeliveryTag(),false);

信息期满時间。

当生产制造端推送信息时,它能够设定信息的期满時间(ms)。

Message msg = new Message("tyson".getBytes(), mp);msg.getMessageProperties().setExpiration("3000");

您还可以在建立序列时特定序列的ttl。从信息进到序列时起,超出此時间的信息将被删掉。

参照连接

RabbitMQ慈善基金会

跳羚融合了RabbitMQ。

RabbitMQ的信息持续性。

RabbitMQ邮件发送的编码。

线上拉比难题

最终给各位共享一个GitHub库房,里边有200多本經典的计算机书籍,包含C语言,C ,Java,Python,前面,数据库查询,电脑操作系统,互联网,算法设计与优化算法,深度学习,程序编写人生道路等。你能运行它,下一次立即检索。

评论(0条)

刀客源码 游客评论