.NET6中使用RabbitMQ详尽指南
电脑版发表于:2025/1/2 11:52
解释:RabbitMQ 是一个流行的开源消息队列系统,广泛用于实现异步通信、解耦组件、负载均衡等场景。在本篇博客中,我们将详细介绍如何在 .NET 6 中使用 RabbitMQ,包括生产者和消费者的实现,以及如何通过依赖注入来管理它们。
一、创建 .NET 6 应用
首先,确保你已经安装了 .NET 6 SDK。可以使用命令行工具创建一个新的 .NET 控制台应用:
dotnet new console -n RabbitMQDemo cd RabbitMQDemo接着,你需要安装 RabbitMQ 的客户端库,通过 NuGet 包管理器来安装 RabbitMQ.Client:
dotnet add package RabbitMQ.Client; //这里我们选择 6.4.0 版本 dotnet add package Masuit.Tools.Core; //这个为一个二、配置 RabbitMQ
接下来,我们需要定义连接 RabbitMQ 所需的配置选项。通常我们会将这些配置选项存储在一个类中。以下是配置类(RabbitMQServiceOptions.cs)的实现
/// <summary>
/// RabbitMQ服务配置
/// </summary>
public class RabbitMQServiceOptions
{
/// <summary>
/// 服务地址
/// </summary>
public string Host { get; set; }
/// <summary>
/// 端口
/// </summary>
public int Port { get; set; }
/// <summary>
/// 用户名
/// </summary>
public string UserName { get; set; }
/// <summary>
/// 密码
/// </summary>
public string Password { get; set; }
}三、实现 RabbitMQ 连接工厂为确保我们能够高效且安全地建立 RabbitMQ 连接,我们将创建一个名为 RabbitMQContext 的连接工厂类
/// <summary>
/// RabbitMQ连接工厂
/// </summary>
public class RabbitMQContext
{
private static ConnectionFactory? factory;
private static readonly object lockObj = new();
/// <summary>
/// 获取单个RabbitMQ连接
/// </summary>
/// <returns></returns>
public static IConnection GetConnection(string hostName, int port, string userName, string password)
{
if (factory == null)
{
lock (lockObj)
{
factory ??= new ConnectionFactory
{
HostName = hostName,
Port = port,
UserName = userName,
Password = password
};
}
}
return factory.CreateConnection();
}
}四、实现 RabbitMQ 生产者接下来我们来实现一个 RabbitMQ 生产者,用于发送消息到队列。创建一个名为 RabbitMQProducer 的类:
/// <summary>
/// RabbitMQ 客户端,用于发送消息到 RabbitMQ 队列或交换机。
/// </summary>
public class RabbitMQProducer
{
private readonly ILogger<RabbitMQProducer> _logger;
private readonly RabbitMQServiceOptions _options;
/// <summary>
/// 初始化 RabbitMQ 客户端。
/// </summary>
/// <param name="logger">日志记录器。</param>
/// <param name="options">RabbitMQ 连接配置。</param>
public RabbitMQProducer(ILogger<RabbitMQProducer> logger, RabbitMQServiceOptions options)
{
_logger = logger;
_options = options;
}
/****
* RabbitMQ 交换机类型说明:
* 1. Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。
* 例如,如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发。
* 2. Fanout Exchange – 不处理路由键。只需将队列绑定到交换机上,发送到交换机的消息会被转发到所有绑定的队列。
* 类似于广播,所有绑定的队列都会收到消息。
* 3. Topic Exchange – 将路由键和某模式进行匹配。队列需要绑定到一个模式上。
* 符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。
* 例如,“audit.#”能够匹配到“audit.irs.corporate”,但“audit.*”只会匹配到“audit.irs”。
****/
/// <summary>
/// 发布消息(工作队列模式,适用于多消费者负载均衡)。
/// </summary>
/// <param name="queueName">队列名称。</param>
/// <param name="message">消息内容。</param>
public void WorkQueueSendMessage(string queueName, string message)
{
using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
using var channel = connection.CreateModel();
// 声明队列(如果不存在则创建),并设置为持久化
channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
// 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
// 发送消息到指定队列
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: string.Empty, routingKey: queueName, basicProperties: properties, body: body);
}
/// <summary>
/// 推送消息(简单模式,适用于单生产者和单消费者)。
/// </summary>
/// <param name="queueName">队列名称。</param>
/// <param name="message">消息内容。</param>
public void SimpleSendMessage(string queueName, string message)
{
using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
using var channel = connection.CreateModel();
// 声明队列(如果不存在则创建),并设置为持久化
channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
// 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
// 发送消息到指定队列
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: string.Empty, routingKey: queueName, mandatory: false, basicProperties: properties, body: body);
}
/// <summary>
/// 发布消息(Fanout Exchange 模式,适用于广播消息到所有绑定队列)。
/// </summary>
/// <param name="queueName">队列名称。</param>
/// <param name="message">消息内容。</param>
public void FanoutSendMessage(string queueName, string message)
{
using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
using var channel = connection.CreateModel();
// 创建 Fanout 类型的交换机
var exchangeName = $"{queueName}_fanout_exchange";
channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout);
// 声明队列(如果不存在则创建),并设置为持久化
channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);
// 将队列绑定到交换机,routingKey 无需指定
channel.QueueBind(queueName, exchangeName, routingKey: string.Empty);
// 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
// 发送消息到交换机,所有绑定队列都会收到消息
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchangeName, routingKey: string.Empty, properties, body: body);
}
/// <summary>
/// 发布消息(Fanout Exchange 模式,适用于广播消息到所有绑定队列)。
/// </summary>
/// <param name="exchangeName">交换机名称。</param>
/// <param name="queueName">队列名称。</param>
/// <param name="message">消息内容。</param>
public void FanoutSendMessage(string exchangeName, string queueName, string message)
{
using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
using var channel = connection.CreateModel();
// 创建 Fanout 类型的交换机
channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout);
// 声明队列(如果不存在则创建),并设置为持久化
channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);
// 将队列绑定到交换机,routingKey 无需指定
channel.QueueBind(queueName, exchangeName, routingKey: string.Empty);
// 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
// 发送消息到交换机,所有绑定队列都会收到消息
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchangeName, routingKey: string.Empty, properties, body: body);
}
/// <summary>
/// 发布消息(Direct Exchange 模式,适用于路由键完全匹配的消息分发)。
/// </summary>
/// <param name="queueName">队列名称。</param>
/// <param name="message">消息内容。</param>
public void DirectSendMessage(string queueName, string message)
{
using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
using var channel = connection.CreateModel();
// 创建 Direct 类型的交换机
var exchangeName = $"{queueName}_direct_exchange";
channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);
// 声明队列(如果不存在则创建),并设置为持久化
channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);
// 将队列绑定到交换机,并指定路由键
var routingKey = $"{queueName}";
channel.QueueBind(queueName, exchangeName, routingKey);
// 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
// 发送消息到交换机,只有路由键完全匹配的队列才会收到消息
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchangeName, routingKey, properties, body: body);
}
/// <summary>
/// 发布消息(Direct Exchange 模式,适用于路由键完全匹配的消息分发)。
/// </summary>
/// <param name="exchangeName">交换机名称。</param>
/// <param name="queueName">队列名称。</param>
/// <param name="routingKey">路由键。</param>
/// <param name="message">消息内容。</param>
public void DirectSendMessage(string exchangeName, string queueName, string routingKey, string message)
{
using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
using var channel = connection.CreateModel();
// 创建 Direct 类型的交换机
channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);
// 声明队列(如果不存在则创建),并设置为持久化
channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);
// 将队列绑定到交换机,并指定路由键
channel.QueueBind(queueName, exchangeName, routingKey);
// 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
// 发送消息到交换机,只有路由键完全匹配的队列才会收到消息
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchangeName, routingKey, properties, body: body);
}
/// <summary>
/// 发布消息(Topic Exchange 模式,适用于路由键模式匹配的消息分发)。
/// </summary>
/// <param name="queueName">队列名称。</param>
/// <param name="routingKey">路由键。</param>
/// <param name="message">消息内容。</param>
/// <param name="bindingKeys">绑定规则(可选)。</param>
public void TopicSendMessage(string queueName, string routingKey, string message, IEnumerable<string>? bindingKeys = null)
{
using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
using var channel = connection.CreateModel();
// 创建 Topic 类型的交换机
var exchangeName = $"{queueName}_topic_exchange";
channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);
// 声明队列(如果不存在则创建),并设置为持久化
channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);
// 将队列绑定到交换机,并指定绑定规则
if (!bindingKeys.IsNullOrEmpty())
{
foreach (string bindingKey in bindingKeys)
{
channel.QueueBind(queueName, exchangeName, bindingKey);
}
}
// 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
// 发送消息到交换机,只有路由键与绑定规则匹配的队列才会收到消息
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchangeName, routingKey, properties, body: body);
}
/// <summary>
/// 发布消息(Topic Exchange 模式,适用于路由键模式匹配的消息分发)。
/// </summary>
/// <param name="exchangeName">交换机名称。</param>
/// <param name="queueName">队列名称。</param>
/// <param name="routingKey">路由键。</param>
/// <param name="message">消息内容。</param>
/// <param name="bindingKeys">绑定规则(可选)。</param>
public void TopicSendMessage(string exchangeName, string queueName, string routingKey, string message, IEnumerable<string>? bindingKeys = null)
{
using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
using var channel = connection.CreateModel();
// 创建 Topic 类型的交换机
channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);
// 声明队列(如果不存在则创建),并设置为持久化
channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);
// 将队列绑定到交换机,并指定绑定规则
if (!bindingKeys.IsNullOrEmpty())
{
foreach (string bindingKey in bindingKeys)
{
channel.QueueBind(queueName, exchangeName, bindingKey);
}
}
// 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
// 发送消息到交换机,只有路由键与绑定规则匹配的队列才会收到消息
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchangeName, routingKey, properties, body: body);
}
}五、实现 RabbitMQ 消费者接下来我们来实现一个 RabbitMQ 消费者,用于发送消息到队列。创建一个名为 RabbitMQConsumer 的类:
/// <summary>
/// RabbitMQ 消费者,用于从 RabbitMQ 队列或交换机中消费消息。
/// </summary>
public class RabbitMQConsumer
{
private readonly ILogger<RabbitMQConsumer> _logger;
private readonly RabbitMQServiceOptions _options;
/// <summary>
/// 初始化 RabbitMQ 消费者。
/// </summary>
/// <param name="logger">日志记录器。</param>
/// <param name="options">RabbitMQ 连接配置。</param>
public RabbitMQConsumer(ILogger<RabbitMQConsumer> logger, RabbitMQServiceOptions options)
{
_logger = logger;
_options = options;
}
/// <summary>
/// 简单消费者(简单模式,适用于单生产者和单消费者)。
/// </summary>
/// <param name="queueName">队列名称。</param>
/// <param name="handler">消息处理逻辑。</param>
public void SimpleConsumer(string queueName, Action<string> handler)
{
var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
var channel = connection.CreateModel();
// 声明队列(如果不存在则创建),并设置为持久化
channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
// 创建消费者
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
// 处理消息
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
handler(message);
};
// 开始消费,自动确认消息
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
}
/// <summary>
/// 消费者(工作队列模式,适用于多消费者负载均衡)。
/// </summary>
/// <param name="queueName">队列名称。</param>
/// <param name="handler">消息处理逻辑。</param>
public void WorkConsumer(string queueName, Func<string, bool> handler)
{
var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
var channel = connection.CreateModel();
// 声明队列(如果不存在则创建),并设置为持久化
channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);
// 设置限流,避免消费者一次性接收过多消息
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
// 创建消费者
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
// 处理消息
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
var result = handler(message);
// 如果消息处理成功,手动确认消息
if (result)
{
channel.BasicAck(ea.DeliveryTag, false);
}
};
// 开始消费,手动确认消息
channel.BasicConsume(queueName, autoAck: false, consumer: consumer);
}
/// <summary>
/// 消费者(发布/订阅模式,适用于广播消息到所有绑定队列)。
/// </summary>
/// <param name="exchangeName">交换机名称。</param>
/// <param name="queueName">队列名称。</param>
/// <param name="handler">消息处理逻辑。</param>
public void PubSubConsumer(string exchangeName, string queueName, Action<string> handler)
{
var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
var channel = connection.CreateModel();
// 声明 Fanout 类型的交换机
channel.ExchangeDeclare(exchange: exchangeName, type: "fanout");
// 声明队列(如果不存在则创建),并设置为持久化
var queueNameResult = channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
// 将队列绑定到交换机
channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: "");
// 创建消费者
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
// 处理消息
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
handler(message);
// 手动确认消息
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
// 开始消费,自动确认消息
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
}
/// <summary>
/// 消费者(路由模式,适用于路由键完全匹配的消息分发)。
/// </summary>
/// <param name="exchangeName">交换机名称。</param>
/// <param name="queueName">队列名称。</param>
/// <param name="routingKey">路由键。</param>
/// <param name="handler">消息处理逻辑。</param>
public void RoutingConsumer(string exchangeName, string queueName, string routingKey, Action<string> handler)
{
var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
var channel = connection.CreateModel();
// 声明 Direct 类型的交换机
channel.ExchangeDeclare(exchange: exchangeName, type: "direct");
// 声明队列(如果不存在则创建),并设置为持久化
var queueNameResult = channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
// 将队列绑定到交换机,并指定路由键
channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingKey);
// 创建消费者
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
// 处理消息
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
handler(message);
// 手动确认消息
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
// 开始消费,自动确认消息
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
}
/// <summary>
/// 消费者(主题模式,适用于路由键模式匹配的消息分发)。
/// </summary>
/// <param name="exchangeName">交换机名称。</param>
/// <param name="queueName">队列名称。</param>
/// <param name="routingKey">路由键。</param>
/// <param name="handler">消息处理逻辑。</param>
public void TopicConsumer(string exchangeName, string queueName, string routingKey, Action<string> handler)
{
var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
var channel = connection.CreateModel();
// 声明 Topic 类型的交换机
channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Topic);
// 声明队列(如果不存在则创建),并设置为持久化
var queueNameResult = channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
// 将队列绑定到交换机,并指定路由键
channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingKey);
// 创建消费者
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
// 处理消息
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
handler(message);
// 手动确认消息
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
// 开始消费,手动确认消息
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
}
}六、整合到.Net6的注入依赖:在 .NET 应用中,通常我们会使用依赖注入来管理服务的生命周期。以下是一个扩展类,用于将 RabbitMQ 的生产者和消费者注册到服务集合中:
/// <summary>
/// RabbitMQ 服务集合扩展类,用于将 RabbitMQ 客户端和监听器添加到依赖注入容器。
/// </summary>
public static class RabbitMQServiceCollectionExtensions
{
/// <summary>
/// 添加 RabbitMQ 服务到服务集合。
/// </summary>
/// <param name="services">服务集合。</param>
/// <returns>服务集合。</returns>
/// <exception cref="ArgumentNullException">当配置选项无效时抛出。</exception>
public static IServiceCollection AddRabbmitMQ(this IServiceCollection services)
{
// 从容器中获取配置的 RabbitMQServiceOptions
var serviceProvider = services.BuildServiceProvider();
var serviceOptions = serviceProvider.GetRequiredService<IOptions<RabbitMQServiceOptions>>().Value;
// 验证服务选项是否有效
if (serviceOptions == null || serviceOptions.Host.IsNullOrEmpty() || serviceOptions.Port < 0)
{
throw new ArgumentNullException(nameof(serviceOptions), "RabbitMQ service options must be provided with valid settings.");
}
// 注册 RabbitMQ 客户端作为单例服务
services.AddSingleton(sp => new RabbitMQProducer(
sp.GetRequiredService<ILogger<RabbitMQProducer>>(),
serviceOptions));
// 注册 RabbitMQ消费端作为单例服务
services.AddSingleton(sp => new RabbitMQConsumer(
sp.GetRequiredService<ILogger<RabbitMQConsumer>>(),
serviceOptions));
return services;
}
}七、注入 RabbitMQ到程序(Program.cs):var builder = WebApplication.CreateBuilder(args);
builder.Services.AddControllers();
// 加载 RabbitMQ 配置
builder.Services.Configure<RabbitMQServiceOptions>(builder.Configuration.GetSection("RabbitMQ"));
// 添加 RabbitMQ 客户端和监听器
builder.Services.AddRabbmitMQ();
var app = builder.Build();
// Configure the HTTP request pipeline.
app.UseAuthorization();
app.MapControllers();
app.Run();八、生产者的Demo:
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;
class Program
{
static void Main(string[] args)
{
// 配置 RabbitMQ 连接选项
var options = new RabbitMQServiceOptions
{
Host = "localhost",
Port = 5672,
UserName = "guest",
Password = "guest"
};
// 创建日志记录器(这里使用控制台日志)
using var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
var logger = loggerFactory.CreateLogger<RabbitMQProducer>();
// 创建 RabbitMQ 生产者
var producer = new RabbitMQProducer(logger, options);
// 发送工作队列消息
producer.WorkQueueSendMessage("work_queue", "Hello Work Queue");
// 发送简单队列消息
producer.SimpleSendMessage("simple_queue", "Hello Simple Queue");
// 发送 Fanout 交换机消息(自动生成交换机名称)
producer.FanoutSendMessage("fanout_queue", "Hello Fanout Queue");
// 发送 Fanout 交换机消息(指定交换机名称)
producer.FanoutSendMessage("my_fanout_exchange", "fanout_queue", "Hello Fanout Queue");
// 发送 Direct 交换机消息(自动生成交换机名称)
producer.DirectSendMessage("direct_queue", "Hello Direct Queue");
// 发送 Direct 交换机消息(指定交换机名称和路由键)
producer.DirectSendMessage("my_direct_exchange", "direct_queue", "direct_key", "Hello Direct Queue");
// 发送 Topic 交换机消息(自动生成交换机名称)
producer.TopicSendMessage("topic_queue", "topic.key", "Hello Topic Queue", new List<string> { "topic.*" });
// 发送 Topic 交换机消息(指定交换机名称和路由键)
producer.TopicSendMessage("my_topic_exchange", "topic_queue", "topic.key", "Hello Topic Queue", new List<string> { "topic.*" });
Console.WriteLine("消息发送完成!");
}
}通过以上示例代码,你可以轻松地在 .NET 6 中使用 RabbitMQ 实现多种消息发送模式。每种模式都有其特定的应用场景,例如:
工作队列模式:适用于多消费者负载均衡。简单模式:适用于单生产者和单消费者。
Fanout 交换机模式:适用于广播消息到所有绑定队列。
Direct 交换机模式:适用于路由键完全匹配的消息分发。
Topic 交换机模式:适用于路由键模式匹配的消息分发。
九、消费者Demo:
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using System;
using System.Text;
class Program
{
static void Main(string[] args)
{
// 配置 RabbitMQ 连接选项
var options = new RabbitMQServiceOptions
{
Host = "localhost",
Port = 5672,
UserName = "guest",
Password = "guest"
};
// 创建日志记录器(这里使用控制台日志)
using var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
var logger = loggerFactory.CreateLogger<RabbitMQConsumer>();
// 创建 RabbitMQ 消费者
var consumer = new RabbitMQConsumer(logger, options);
// 简单消费者
consumer.SimpleConsumer("simple_queue", message =>
{
Console.WriteLine($"接收到简单队列消息: {message}");
});
// 工作队列消费者
consumer.WorkConsumer("work_queue", message =>
{
Console.WriteLine($"接收到工作队列消息: {message}");
return true; // 处理成功,手动确认消息
});
// 发布/订阅消费者
consumer.PubSubConsumer("my_fanout_exchange", "fanout_queue", message =>
{
Console.WriteLine($"接收到发布/订阅消息: {message}");
});
// 路由消费者
consumer.RoutingConsumer("my_direct_exchange", "direct_queue", "direct_key", message =>
{
Console.WriteLine($"接收到路由消息: {message}");
});
// 主题消费者
consumer.TopicConsumer("my_topic_exchange", "topic_queue", "topic.key", message =>
{
Console.WriteLine($"接收到主题消息: {message}");
});
Console.WriteLine("消费者已启动,等待接收消息...");
Console.ReadLine(); // 保持程序运行
}
}通过以上示例代码,你可以轻松地在 .NET 6 中使用 RabbitMQ 实现多种消息消费模式。每种模式都有其特定的应用场景,例如:
简单消费者:适用于单生产者和单消费者。工作队列消费者:适用于多消费者负载均衡。
发布/订阅消费者:适用于广播消息到所有绑定队列。
路由消费者:适用于路由键完全匹配的消息分发。
主题消费者:适用于路由键模式匹配的消息分发。
END
