RabbitMq集群镜像同步与应用 电脑版发表于:2021/2/9 15:50 ![](https://img.tnblog.net/arcimg/hb/585b0f1ffa7f4c2095baa20c175b32a0.png) >#RabbitMq集群镜像同步与应用 [TOC] Queue存放问题 ------------ tn>队列在集群中存储有两种模式。第一种是`default`模式,它就是消息队列存储再哪儿就在哪儿。第二种是`mirror`模式,通过镜像的方式将`master`的消息队列映射到各个`slave`中去,消息在每个节点都有,实现高可用的方式。 启动镜像 ------------ tn>队列已通过`policy`启用了镜像。`policy`可以随时更改;创建一个非镜像队列是有效的,然后在以后再进行镜像(反之亦然)。非镜像队列与不具有任何镜像的镜像队列之间是有区别的-前者缺乏额外的镜像基础结构,可能会提供更高的吞吐量。 >### 设置为ha-all政策 tn>我们做一个简单的测试简单的为`test`开头的队列把参数`ha-mode`设置为`all`模式,该模式可以将队列镜像到所有的`rabbitmq`集群中。针对不同的系统,多少会有一些差异,我会列一张表在下面。重点我们放在WebUI的操作上进行。 | 系统 | 操作 | | ------------ | ------------ | | rabbitmqctl | `rabbitmqctl set_policy ha-all "^test" '{"ha-mode":"all"}'` | | rabbitmqctl (Windows) | `rabbitmqctl.bat set_policy ha-all "^test" "{""ha-mode"":""all""}"` | | HTTP API | `PUT /api/policies/%2f/ha-all {"pattern":"^test", "definition":{"ha-mode":"all"}}` | ![](https://img.tnblog.net/arcimg/hb/1f9e559a692a4fbd81039ea41d1b4772.png) ![](https://img.tnblog.net/arcimg/hb/157a35f96e8b4c2ead3217453985ad0b.png) tn>接着我就在这`queue`页面上面创建一个`test1`队列。 ![](https://img.tnblog.net/arcimg/hb/36fa4ca14a7445be93f6ca40f314c36f.png) tn>我们发现这里有个`+2`表示已经有两台子机器已经开启了镜像。 ![](https://img.tnblog.net/arcimg/hb/bfcad72759dc4e2e94ec2ed6db6d1ff5.png) >### 问题1 tn>但我们会产生一个问题,当一台机子发生了故障挂掉了,但又有新的消息发布到队列中来,然后那台机子又启动起来后是否会同步消息?我们来测试一下,先通过`./rabbitmqctl stop_app`关闭一台机器。 ![](https://img.tnblog.net/arcimg/hb/005e4d13733647b7bc6b790019e3071b.png) ![](https://img.tnblog.net/arcimg/hb/f11b760dcc8d48a38c6bde91687987f6.png) tn>然后我们可以看见这里只有一个`+1`了。 ![](https://img.tnblog.net/arcimg/hb/47be0a94ea624298a39441122b18f6c3.png) tn>接着我们在我们的队列中添加一条信息 ![](https://img.tnblog.net/arcimg/hb/5783c676b21d4fe09766356863b5629e.png) tn>然后将再次启动slave节点,我们就会发现它的数据并没有及时更新,是落后的。在图表中也出现了红色的`+1`。 ![](https://img.tnblog.net/arcimg/hb/c481991adc3d4b4e97a32b3dd8ce4d09.png) ![](https://img.tnblog.net/arcimg/hb/d219d28eb40740e5b875f54d21341036.png) ![](https://img.tnblog.net/arcimg/hb/7e2a32abfe7b4aa58cb3326e23af3b45.png) >### 自动同步策略 tn>我们可以通过参数`ha-sync-mode: automatic`设置为自动同步。我们先删除原来的策略`policy`再创建新的`policy`并把这个参数带上,随后立即生效。 ![](https://img.tnblog.net/arcimg/hb/911ddc2e25ad471c8fa0e92ea2a9e1ec.png) ![](https://img.tnblog.net/arcimg/hb/2a69cd8ca6fb432db6e51beb3a4264b8.png) ![](https://img.tnblog.net/arcimg/hb/e00124a2ea7d42bf9300d1d2b8af92e4.png) >### 指定随机机器同步策略 tn>在不同的系统上随机策略也有所不同,我也为大家列一张表。这里随机两台机器做自动同步队列与消息。 | 系统 | 操作 | | ------------ | ------------ | | rabbitmqctl | `rabbitmqctl set_policy ha-two "^test" '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'` | | rabbitmqctl (Windows) | `rabbitmqctl.bat set_policy ha-two "^test" ^"{""ha-mode"":""exactly"",""ha-params"":2,""ha-sync-mode"":""automatic""}"` | > HTTP API ```bash PUT /api/policies/%2f/ha-two { "pattern":"^test", "definition": { "ha-mode":"exactly", "ha-params":2, "ha-sync-mode":"automatic" } } ``` tn>同样我们先删除`policy`政策,在创建一个组的策略,并随机两台机器做自动同步队列,交换机与消息。(添加好后立即生效,我们在UI中也发现`+2`变`+1`了) ![](https://img.tnblog.net/arcimg/hb/aad8a7affb04418ca4a685c547cdb62f.png) ![](https://img.tnblog.net/arcimg/hb/bccd233a83b748cc87fe549ee85466dc.png) >### 指定节点同步策略 tn>我们通过node参数还可以指定集群中的任意节点,当我们指定`rabbit@rabbitmqmaster`与`rabbit@rabbitmqslave1`可以按照如下进行操作 | 系统 | 操作 | | ------------ | ------------ | | rabbitmqctl | `rabbitmqctl set_policy ha-nodes "^nodes\." '{"ha-mode":"nodes","ha-params":["rabbit@rabbitmqmaster", "rabbit@rabbitmqslave1"]}'` | | rabbitmqctl (Windows) | `rabbitmqctl set_policy ha-nodes "^nodes\." "{""ha-mode"":""nodes"",""ha-params"":[""rabbit@rabbitmqmaster"", ""rabbit@rabbitmqslave1""]}"` | ![](https://img.tnblog.net/arcimg/hb/8b0feb5b445f4baebcc4cbed266ac289.png) ![](https://img.tnblog.net/arcimg/hb/d967b1da10af4e95b72cf5f02a8264ca.png) .netcore重试连接集群 ------------ tn>我们通过发布消息的方式将发布到队列中去,但在这之前我们需要了解两个连接对象里面的参数。 | 参数 | 描述 | | ------------ | ------------ | | `AutomaticRecoveryEnabled` | 如果连接断开,底层会尝试去重连rabbitmq集群... | | `TopologyRecoveryEnabled` | 当`connection`连接成功了,里面的`exchange`,`queue`,`binding`都要恢复原样。。 | tn>创建代码,并连接到这些节点中去。 ```csharp var factory = new ConnectionFactory() { UserName = "bob", Password = "bob", AutomaticRecoveryEnabled = true, TopologyRecoveryEnabled = true }; // 创建一个链接 using (var connection = factory.CreateConnection(new string[] { "10.211.55.4", "10.211.55.5", "10.211.55.6" })) { // 创建一个通道 using (var channel = connection.CreateModel()) { var properties = channel.CreateBasicProperties(); // 我们创建15条消息 for (var i = 0; i < int.MaxValue; i++) { try { channel.BasicPublish( exchange: string.Empty, routingKey: "test1", basicProperties: properties, body: Encoding.UTF8.GetBytes($"新消息{i}") ); System.Console.WriteLine($"Publish: 新消息{i}"); } catch (Exception ex) { System.Console.WriteLine($"[ERROR]:{ex.Message}"); } finally { Thread.Sleep(3000); } } Console.WriteLine("Finish"); Console.ReadLine(); } } ``` ![](https://img.tnblog.net/arcimg/hb/39dfd6f5c1c44e46994fb8a9cfbc506a.png) tn>接着我们来集群中查看它连上哪台机子上了。我们发现它连上了`rabbit@rabbitmqslave2`这台机子 ![](https://img.tnblog.net/arcimg/hb/9beba0edae7d4b80bcbdf37331ff7af6.png) tn>当我们让`rabbit@rabbitmqslave2`挂掉会发生什么事。 ![](https://img.tnblog.net/arcimg/hb/f6edde3720c74c4a9ca07ddcdc039627.png) ![](https://img.tnblog.net/arcimg/hb/89030eef8e1c4e0ab88f440bbb881b1b.png) ![](https://img.tnblog.net/arcimg/hb/1016bbbbaa1c4c5baa2e4db29f0535bb.png) tn>我们发现虽然它挂掉连但我们的客户端仍然好好的。今天就讲这么多,我们休息一下。