.netcore3.1 RabbitMq 简单运用与相关方法的介绍 电脑版发表于:2021/1/18 17:24 ![](https://img.tnblog.net/arcimg/hb/585b0f1ffa7f4c2095baa20c175b32a0.png) >#.netcore3.1 RabbitMq 简单运用与相关方法的介绍 [TOC] tn>在这里我将使用简单的生产--->加入队列--->消费,做一个简单的流程。一个应用它既可以是消息的生产者,也可以是消息的消费者。这里生产与消费我们将分开进行! 准备 ------------ tn>在这之前我先创建一个管理员账号`bob` ![](https://img.tnblog.net/arcimg/hb/a284d641e9b84da79012b86453825c68.png) 构建生产项目 ------------ >### 创建生产项目 ```bash dotnet new console --name Send mv Send/Program.cs Send/Send.cs cd Send # 添加相关依赖包 dotnet add package RabbitMQ.Client dotnet restore ``` ![](https://img.tnblog.net/arcimg/hb/d0aaeef3a0d7482fa4fc1f156fd08a3f.png) >### 添加内容 tn>添加如下代码。里面的详细的方法待会再讲。 ```csharp using System.Text; using System; using RabbitMQ.Client; namespace Send { class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "127.0.0.1", UserName = "bob", Password = "bob" }; // 创建一个链接 using (var connection = factory.CreateConnection()) { // 创建一个通道 using (var channel = connection.CreateModel()) { // 在这里还应该声明一个交换机 // 声明一个队列 channel.QueueDeclare( queue:"mytestqueue", durable: false, exclusive: false, autoDelete: false, arguments: null); // 创建一个消息 string message = "Hello World"; // 编码一个消息 var body = Encoding.UTF8.GetBytes(message); // 发布一个消息 channel.BasicPublish( exchange:string.Empty, routingKey: "mytestqueue", basicProperties: null, body: body ); System.Console.WriteLine("Sent Message:{0}",message); Console.ReadLine(); } } } } } ``` >### 运行项目 ![](https://img.tnblog.net/arcimg/hb/f1656567aef249068e7f261471007b6c.png) ![](https://img.tnblog.net/arcimg/hb/070110c7517e4ca8bd68e3273c62a180.png) tn>我们发现队列现在已经有一条数据了 构建消费项目 ------------ >### 创建消费项目 ![](https://img.tnblog.net/arcimg/hb/9592e91398b145458a173058e5c44172.png) ```bash dotnet new console --name Receive mv Receive/Program.cs Receive/Receive.cs cd ../Receive dotnet add package RabbitMQ.Client dotnet restore ``` >### 添加内容 ```csharp using System.Text; using System; using RabbitMQ.Client; using System.Buffers.Text; namespace Receive { class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "127.0.0.1", UserName = "bob", Password = "bob" }; // 创建一个链接 using (var connection = factory.CreateConnection()) { // 创建一个通道 using (var channel = connection.CreateModel()) { // 在这里还应该声明一个交换机 // 获取一个消息 var result = channel.BasicGet("mytestqueue",true); var msg = Encoding.UTF8.GetString(result.Body.ToArray()); Console.WriteLine($"获取到一条消息:{msg}"); } } Console.ReadLine(); } } } ``` >### 测试运行 ![](https://img.tnblog.net/arcimg/hb/6216cc2009a84770a65813b4481ac30b.png) ![](https://img.tnblog.net/arcimg/hb/b94f3b5b1f104515a71334d7f0647822.png) 连接到RabbitMQ ------------ tn>在应用程序可以使用RabbitMQ之前,它必须打开与RabbitMQ节点的连接。然后,该连接将用于执行所有后续操作。连接本来应该是长期的。为每个操作打开一个新连接,其实是非常低效的,官方也不推荐。要打开.net客户端与RabbitMQ的连接,我们首先需要创建`ConnectionFactory`的实例并将其配置主机名,虚拟主机,凭据,TLS设置以及所需要的其他参数。 然后调用`factory.CreateConnection()`方法打开一个连接。再服务器日志中可以观察到成功和失败的客户端连接事件。 | 参数 | 描述 | | ------------ | ------------ | | Username | "guest" 最好不要用"guest" | | Password | "guest" | | Virtual host | "/" | | Hostname | "localhost" | | Port | 5672是默认 | | Uri | 可以通过简单的方式描述上面的内容 | | Uri值 | Username | Password | Host | Port | Vhost | | ------------ | ------------ | ------------ | ------------ | ------------ | ------------ | | amqp://user:pass@host:10000/vhost | "user" | "pass" | "host" | 10000 | "vhost" | | amqp://user%61:%61pass@ho%61st:10000/v%2fhost | "usera" | "apass" | "hoast" | 10000 | "v/host" | | amqp:// | | | | | | | amqp://:@/ | "" | "" | | | "" | | amqp://user@ | "user" | | | | | | amqp://user:pass@ | "user" | "pass" | | | | | amqp://host | | | "host" | | | | amqp://:10000 | | | | 10000 | | | amqp:///vhost | | | | | "vhost" | | amqp://host/ | | | "host" | | | | amqp://host/%2f | | | "host" | | "/" | | amqp://[::1] | | | "[::1]" (i.e. the IPv6 address ::1) | | | tn>如果不声明交换机,将把队列默认安排到默认的交换机中去 ```csharp channel.QueueDeclare( queue:"mytestqueue", durable: false, exclusive: false, autoDelete: false, arguments: null); ``` tn>声明一个交换机和一个队列,并将它们绑定在一起 ```csharp // 创建交换机 channel.ExchangeDeclare("MyExchangeName",ExchangeType.Direct); channel.QueueDeclare( queue:"mytestqueue", durable: false, exclusive: false, autoDelete: false, arguments: null); // 绑定在一起 channel.QueueBind("mytestqueue","MyExchangeName","routingKey",null); ``` 被动声明 ------------ tn>根据名称找到队列与交换机实例是否存在,如果存在,则没有任何操作,且返回一个实例。如果不存在,则操作失败,并抛出异常。这就是`QueueDeclarePassive`与`ExchangeDeclarePassive`的作用。 ```csharp var response = channel.QueueDeclarePassive("queue-name"); // 获取消息数量 var GetMessageCount = response.MessageCount; // 获取消费者数量 var GetConsumerCount = response.ConsumerCount; ``` 删除实体和清理消息 ------------ ```csharp // 可以显示删除队列和交换机 channel.QueueDelete("queue-name",false,false); // 仅当队列为空时才可以删除它 channel.QueueDelete("queue-name",false,true); // 如果不使用(没有任何使用者才删除) channel.QueueDelete("queue-name",true,false); // 可以清除队列(删除其所有消息) channel.QueuePurge("queue-name"); ``` 发布消息 ------------ tn>如果我们想把消息发布到交换机里面,我们可以这样写 ```csharp string message = "Hello World"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish( exchange:string.Empty, routingKey: "mytestqueue", basicProperties: null, body: body ); ``` tn>我们还可以重载的变体指定强制性标志或指定消息属性。 ```csharp IBasicProperties props = channel.CreateBasicProperties(); props.ContentType = "text/plain"; props.DeliveryMode = 2; // 发布一个消息 channel.BasicPublish( exchange:string.Empty, routingKey: "mytestqueue", basicProperties: props, body: body ); ``` tn>还可以添加更多信息到标头等等,请参考`IBasicProperties`的参数