.netcore3.1 RabbitMq RPC 电脑版发表于:2021/1/28 16:28 ![](https://img.tnblog.net/arcimg/hb/585b0f1ffa7f4c2095baa20c175b32a0.png) >#.netcore3.1 RabbitMq RPC [TOC] tn>如果我们需要在远程计算机上运行功能并等待结果怎么办?这种模式通常称为`远程过程调用`或`RPC`。我们举一个简单的例子,如下图所示: ![](https://img.tnblog.net/arcimg/hb/4c27f4d72512448eacf25787df04f171.png) tn>这个就图片讲得就很清楚了,我也不用再讲了。<br/> 但这就引发了一个新问题,在获取结果队列中收到响应后,尚不清楚响应属于哪个请求。<br/> 那就是使用`CorrelationId`属性的时候 。我们将为每个请求将其设置为唯一值。稍后,当我们在获取结果队列中收到消息时,我们将查看该属性,并基于此属性将响应与请求进行匹配。如果看到未知的`CorrelationId`值,则可以安全地丢弃该消息-它不属于我们的请求。 RPC服务器端 ------------ ```csharp var factory = new ConnectionFactory() { HostName = "47.98.187.188", UserName = "bob", Password = "bob" }; // 创建一个链接 using (var connection = factory.CreateConnection()) { // 创建一个通道 using (var channel = connection.CreateModel()) { // 声明离职队列 channel.QueueDeclare( queue: "leave_rpc_queue", durable: true, exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(0, 1, false); var consumer = new EventingBasicConsumer(channel); channel.BasicConsume(queue: "leave_rpc_queue", autoAck: false, consumer: consumer); // 处理事件 consumer.Received += (model, ea) => { string response = null; // 获取消息 var body = ea.Body.ToArray(); var props = ea.BasicProperties; var replyProps = channel.CreateBasicProperties(); // 获取客户端的唯一ID replyProps.CorrelationId = props.CorrelationId; try { // 人事做离职处理 var message = Encoding.UTF8.GetString(body); Console.WriteLine("HR Handle:{0}", message); response = "The exit formalities are complete"; } catch (Exception e) { Console.WriteLine("Have Error:" + e.Message); response = ""; } finally { var responseBytes = Encoding.UTF8.GetBytes(response); channel.BasicPublish(string.Empty, routingKey: props.ReplyTo, basicProperties: replyProps, body: responseBytes); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); } }; Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } ``` ![](https://img.tnblog.net/arcimg/hb/fb5ec1b066ab42e48cf5bdb692307b3e.png) 客户端 ------------ >### 创建Rpc帮助类 ```csharp public class RpcClient { private readonly IConnection connection; private readonly IModel channel; private readonly string replyQueueName; private readonly EventingBasicConsumer consumer; // 关于BlockingCollection可以参考这篇:https://www.cnblogs.com/imxiangzi/articles/2801106.html private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>(); private readonly IBasicProperties props; public RpcClient() { var factory = new ConnectionFactory() { HostName = "47.98.187.188", UserName = "bob", Password = "bob" }; var connection = factory.CreateConnection(); channel = connection.CreateModel(); // 创建临时队列 replyQueueName = channel.QueueDeclare().QueueName; consumer = new EventingBasicConsumer(channel); props = channel.CreateBasicProperties(); // 创建 ID var correlationId = Guid.NewGuid().ToString(); props.CorrelationId = correlationId; // 回调的队列 props.ReplyTo = replyQueueName; // 处理离职结果 consumer.Received += Consumer_Received; } // 处理离职结果的方法 private void Consumer_Received(object sender, BasicDeliverEventArgs ea) { var body = ea.Body.ToArray(); var response = Encoding.UTF8.GetString(body); if (ea.BasicProperties.CorrelationId == props.CorrelationId) { // jiang respQueue.Add(response); } } /// <summary> /// 发送消息处理 /// </summary> /// <param name="message"></param> /// <returns></returns> public string Call(string message) { // 创建离职消息 var messageBytes = Encoding.UTF8.GetBytes(message); // 发送离职请求到人事 channel.BasicPublish( exchange: string.Empty, routingKey: "leave_rpc_queue", basicProperties: props, body: messageBytes); // 绑定临时队列与处理 channel.BasicConsume( consumer: consumer, queue: replyQueueName, autoAck: true ); // 从BlockingCollection中移除一个项。 return respQueue.Take(); } public void Close() { connection.Close(); } } ``` >### Main代码 ```csharp var rpcClient = new RpcClient(); string result = rpcClient.Call("I want to leave"); Console.WriteLine($"Result Handle: {result}"); Console.ReadLine(); ``` >### 运行结果 ![](https://img.tnblog.net/arcimg/hb/78ebddc58fbf46de942ccf8edc21dfe5.png) 有关RPC的声明 ------------ >尽管RPC是计算中非常普遍的模式,但它经常受到批评。当程序员不知道函数调用是本地的还是缓慢的RPC时,就会出现问题。这样的混乱会导致系统变幻莫测,并给调试增加了不必要的复杂性。滥用RPC可能会导致无法维护的意大利面条代码,而不是简化软件。 牢记这一点,请考虑以下建议: - 确保明显的是哪个函数调用是本地的,哪个是远程的。 - 记录您的系统。明确组件之间的依赖关系。 - 处理错误案例。RPC服务器长时间关闭后,客户端应如何反应? 如有疑问,请避免使用RPC。如果可以的话,应该使用异步管道-代替类似RPC的阻塞,将结果异步推送到下一个计算阶段。