消息队列(消息消费)
电脑版发表于:2022/2/5 22:58
前言
消息队列中几个重要角色
交换机
负责把消息放在队列当中
队列
存放消息
路由键
交换机通过路由键去找到对应的路由(多个队列通过路由键来关联)
消息消费的两种模式
1:主动模式
主动从队列中去拉取消息。
优点:消费灵活,可以随时消费
2:被动模式
一旦有消息进来,马上触发消费
优点:即时性好
消息消费(以事件的方式消费)
//创建链接工厂类 ConnectionFactory connectionFactory = new ConnectionFactory() { HostName ="localHost",UserName ="guest",Password = "guest",Port= 5672 }; services.AddSingleton<ConnectionFactory>(connectionFactory);
public IActionResult Xiaofei() { //创建链接 using (var conn = factory.CreateConnection()) { //打开一个通道 using (var channel = conn.CreateModel()) { //绑定一个通道 EventingBasicConsumer eventingBasicConsumer = new EventingBasicConsumer(channel); eventingBasicConsumer.Received += EventingBasicConsumer_Received; //使用事件的方式(队列名称,是否立马应答,绑定事件)(如果不想消息被重复读取将 false改为true) channel.BasicConsume("maoyuan", false, eventingBasicConsumer); } } return View(); } /// <summary> /// 消息消费事件 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void EventingBasicConsumer_Received(object sender, BasicDeliverEventArgs e) { var body = e.Body.ToArray(); var message = Encoding.UTF8.GetString(body); }
消息消费(一条一条的读取)
public IActionResult Xiaofei() { //创建链接 using (var conn = factory.CreateConnection()) { //打开一个通道 using (var channel = conn.CreateModel()) { //一行一行的获取(队列名,是否自动删除) var result = channel.BasicGet("maoyuan", false); string message= Encoding.UTF8.GetString(result.Body.ToArray()); } } return View(); }
如何遍历呢
List<string> list = new List<string>(); //获取消息总条数 uint messagecount = channel.MessageCount("maoyuan"); BasicGetResult result; //一行一行的获取(队列名,是否自动删除) for (int i = 0; i < messagecount; i++) { result = channel.BasicGet("maoyuan", false); list.Add(Encoding.UTF8.GetString(result.Body.ToArray())); }
消息队列答应模式
自动确认:
获取消息后自动删除,更方便
手动确认:
获取消息后用户自动调用是否确认收到消息,然后在删除消息,可控性更强