.netcore3.1 RabbitMq Overflow,Dead Letter Exchange,Dead Letter Routing,Max priority 电脑版发表于:2021/2/1 13:41 ![](https://img.tnblog.net/arcimg/hb/585b0f1ffa7f4c2095baa20c175b32a0.png) >#.netcore3.1 RabbitMq Overflow,Dead Letter Exchange,Dead Letter Routing,Max priority [TOC] Overflow ------------ tn>从字面意思来讲就是溢出,它将结合上一个章节的`max-length`与`max-length-bytes`配合使用。它的意思是当队列所发布的消息超出`max-length`的数量时或消息的总大小超出`max-length-bytes`的大小时,所溢出的消息应该做什么处理?这个参数有两个`Value`如下图所示: | x-overflow | 描述 | | ------------ | ------------ | | `drop-head`(默认) | 表示多余的消息直接丢弃。在代码中不填写即可,否则报错! | | `reject-publish` | 表示将暂时不让它进入队列中来,当有新的消息被消费时,仍然会重新排列到队列中去 | >### 举个栗子 tn>我这里声明队列中的最大数量为`2`,将`x-overflow`设置为`reject-publish`,并发布8条消息。代码如下: ```csharp var factory = new ConnectionFactory() { HostName = "47.98.187.188", UserName = "bob", Password = "bob" }; // 创建一个链接 using (var connection = factory.CreateConnection()) { // 创建一个通道 using (var channel = connection.CreateModel()) { // 声明一个只能为2条消息的队列,并将 channel.QueueDeclare( queue: "Max_Length_Overflow_Queue", durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string,object>{ { "x-max-length",2 }, { "x-overflow","reject-publish" } }); var consumer = new EventingBasicConsumer(channel); // 绑定处理事件 channel.BasicConsume(queue: "Max_Length_Overflow_Queue", autoAck: false, consumer: consumer); // 处理事件 consumer.Received += (model, ea) => { // 获取消息 var body = ea.Body.ToArray(); var props = ea.BasicProperties; try { // 人事做离职处理 var message = Encoding.UTF8.GetString(body); Console.WriteLine("Tag:{0} Handle Message:{1}",ea.DeliveryTag, message); } catch (Exception e) { Console.WriteLine("Have Error:" + e.Message); } finally { channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: true); } }; // 发布8条消息 for (int i = 0; i < 8; i++) { var msg = Encoding.UTF8.GetBytes("Good Morning!"); channel.BasicPublish(string.Empty, "Max_Length_Overflow_Queue", null, msg); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } ``` ![](https://img.tnblog.net/arcimg/hb/11a777f7aae047fca7b6dbcb2bffedf8.png) ![](https://img.tnblog.net/arcimg/hb/f8ca6a3dfc1341feaf777833e7c8d474.png) tn>我们发现消息,并没有被丢弃而是溢出的消息等待正准备被消费的消息被消费后,依次进入到消息队列中,然后再次被消费... 死信(Dead Letter Exchange|Dead Letter Routing) ------------ >### 死信的概念 tn>队列中的消息可以`按字母顺序排列`,当发生以下任何事件时,重新发布到交换机中: | | | ------------ | | 消费者使用`basic.reject`或`basic.nack`将`requeue`参数设置为`false`来否定该消息。 | | 该消息由于每个`消息TTL`而到期 | | 邮件被删除,因为其队列超过了长度限制 | >### 死信举例 tn>我们以下面这张图做解释,当有`11`个人发起要吃饭的请求,但是高级A餐厅只能容纳`10`人吃饭,最后那一个人将会到B餐厅吃饭。我们可以将`餐厅`比作`队列`,`两家餐厅不同的地点`表示`交换机`。 ![](https://img.tnblog.net/arcimg/hb/ed6fc3aab79346658390a3667ecbfc57.png) >相关参数 | 参数 | 参数描述 | | ------------ | ------------ | | `x-dead-letter-exchange` | 消息死了后会到哪个交换机上 | | `x-dead-letter-routing-key` | 会到哪个队列上 | >### 通过Demo实现举例 tn>发布死信队列(A餐厅)。 ```csharp var factory = new ConnectionFactory() { HostName = "47.98.187.188", UserName = "bob", Password = "bob" }; // 创建一个链接 using (var connection = factory.CreateConnection()) { // 创建一个通道 using (var channel = connection.CreateModel()) { // 声明一个A餐厅的对立(位置就默认的老地方就可以了,我嫌麻烦) channel.QueueDeclare( queue: "A_Centeen_Queue", durable: false, exclusive: false, autoDelete: false, arguments: new Dictionary<string,object>{ { "x-max-length",10 }, { "x-dead-letter-exchange","B_Exchange" }, { "x-dead-letter-routing-key","B_Centeen_Queue" }, }); // 发布11个人去吃饭的消息 for (int i = 0; i < 12; i++) { var msg = Encoding.UTF8.GetBytes($"Person {i}"); channel.BasicPublish(string.Empty, "A_Centeen_Queue", null, msg); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } ``` tn>再发布B餐厅交换机与队列 ```csharp var factory = new ConnectionFactory() { HostName = "47.98.187.188", UserName = "bob", Password = "bob" }; // 创建一个链接 using (var connection = factory.CreateConnection()) { // 创建一个通道 using (var channel = connection.CreateModel()) { // 声明一个B餐厅位置的交换机 channel.ExchangeDeclare("B_Exchange", ExchangeType.Direct, true, false, null); // 声明一个B餐厅 channel.QueueDeclare( queue: "B_Centeen_Queue", durable: true, exclusive: false, autoDelete: false, arguments: null); // 绑定B餐厅 channel.QueueBind("B_Centeen_Queue", "B_Exchange", "B_Centeen_Queue", null); // 我们这里只需要获取去B餐厅的那个人就可以了 var consumer = new EventingBasicConsumer(channel); // 绑定处理事件 channel.BasicConsume(queue: "B_Centeen_Queue", autoAck: false, consumer: consumer); // B餐厅消费 consumer.Received += (model, ea) => { var body = ea.Body.ToArray(); var props = ea.BasicProperties; try { var message = Encoding.UTF8.GetString(body); Console.WriteLine("B Centeen Consumer Message: {0}", message); } catch (Exception e) { Console.WriteLine("Have Error:" + e.Message); } finally { channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); } }; Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } ``` tn>运行测试一下。 ![](https://img.tnblog.net/arcimg/hb/a91a54a7c1f249fdbacc7bc98d8759cb.png) ![](https://img.tnblog.net/arcimg/hb/9bf778362e9f4ca3853e39dea1546013.png) Max Priority(优先级) ------------ tn>简单来说`0-10`,那么`10`是最大,我们就可以按照`1-10`来设立优先级,当优先级为`10`时,我就先让它处理该消息。 | 参数 | 描述 | | ------------ | ------------ | | `x-max-priority`(默认值为`0`) | 表示声明的优先级范围,一般在`0-255`之间。但由于里面是树形接口希望越小越好。 | >### Max Priority Demo tn>示例代码如下: ```csharp var factory = new ConnectionFactory() { HostName = "47.98.187.188", UserName = "bob", Password = "bob" }; // 创建一个链接 using (var connection = factory.CreateConnection()) { // 创建一个通道 using (var channel = connection.CreateModel()) { // 声明一个Priority_Queue队列 channel.QueueDeclare( queue: "Priority_Queue", durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string,object>{ // 设置优先级 0-10 { "x-max-priority",10 }, }); // 发布Priority_Queue队列中的消息 for (int i = 0; i < 10; i++) { var msg = Encoding.UTF8.GetBytes($"Message {i} ,Priority: 1"); var cbp = channel.CreateBasicProperties(); // 优先级都为1 cbp.Priority = 1; channel.BasicPublish(string.Empty, "Priority_Queue", cbp, msg); } var cbphigh = channel.CreateBasicProperties(); cbphigh.Priority = 10; // 发布一个高优先级消息 channel.BasicPublish(string.Empty, "Priority_Queue", cbphigh, Encoding.UTF8.GetBytes($"High Priority Message,Priority: 10")); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } ``` ![](https://img.tnblog.net/arcimg/hb/ab8e63d593144206926c8109327f4295.png) ![](https://img.tnblog.net/arcimg/hb/7d57ed50ba1e4e1bbdab881639a5846a.png)