Dapr .NetCore 订阅与发布(上) 电脑版发表于:2021/10/17 18:15 ![](https://img.tnblog.net/arcimg/hb/896fd38e95b346f9a0d98c54b135bb94.jpg) >#Dapr .NetCore 订阅与发布 [TOC] 介绍 ------------ tn2>Pub/Sub 是分布式系统中的一种常见模式,具有许多想要利用解耦异步消息传递的服务。使用 Pub/Sub,您可以启用事件使用者与事件生产者分离的场景。 Dapr 提供了一个具有 At-Least-Once 保证的可扩展 Pub/Sub 系统,允许开发者发布和订阅主题。Dapr 为 pub/sub 提供组件,使运营商能够使用他们喜欢的基础设施,例如 Redis Streams、Kafka 等。 工作原理 ------------ ![](https://img.tnblog.net/arcimg/hb/397c312f4fd0411099547cea8d2fb163.png) tn2>Dapr 发布&订阅构建基块提供了一个与平台无关的 API 框架来发送和接收消息。 服务将消息发布到指定主题, 业务服务订阅主题以使用消息。 服务在 Dapr sidecar 上调用 pub/sub API。 然后,sidecar 调用预定义 Dapr pub/sub 组件。 任何编程平台都可以使用 Dapr 本机 API 通过 HTTP 或 gRPC 调用构建基块。 若要发布消息,请进行以下 API 调用: ```bash http://localhost:<dapr-port>/v1.0/publish/<pub-sub-name>/<topic> ``` tn2>上述调用中有几个特定于 Dapr 的 URL 段: `<dapr-port>` 提供 Dapr sidecar 侦听的端口号。 `<pub-sub-name>` 提供所选 Dapr pub/sub 组件的名称。 `<topic>` 提供消息发布到的主题的名称。 设置 Pub/Sub 组件 ------------ tn2>Redis Streams 在运行时默认安装在本地机器上`dapr init`。 通过`%UserProfile%\.dapr\components\pubsub.yaml`在 Windows 或`~/.dapr/components/pubsub.yaml`Linux/MacOS上打开您的组件文件进行验证: ```yaml apiVersion: dapr.io/v1alpha1 kind: Component metadata: name: pubsub spec: type: pubsub.redis version: v1 metadata: - name: redisHost value: localhost:6379 - name: redisPassword value: "" ``` 订阅主题 ------------ tn2>Dapr 支持两种订阅主题的方法: ——声明性地,订阅是在外部文件中定义的。 ——以编程方式,其中订阅在用户代码中定义。 >### 声明式订阅 tn2>在`.dapr/components`目录下,创建`subscription.yaml`来自定义资源定义 (CRD) 订阅主题。 ```yaml apiVersion: dapr.io/v1alpha1 kind: Subscription metadata: name: mydepositevent-subscription spec: topic: deposit route: /TopicSub pubsubname: pubsub scopes: - topicserver ``` tn2>`deposit`是`pubsub`组件的事件订阅者。 `route` 告诉 Dapr 将所有主题消息发送到应用程序中的 `/TopicSub` 端点。 `scopes` 为 Id是 `topicserver`启用订阅 >### 代码定义 tn2>创建InvokeMethodServer API 项目,配置与依赖如下。 ```xml <ItemGroup> <PackageReference Include="Dapr.AspNetCore" Version="1.4.0" /> <PackageReference Include="Newtonsoft.Json" Version="13.0.1" /> </ItemGroup> ``` ```bash services.AddControllers().AddDapr(); ``` tn2>在`InvokeMethodServer`项目中创建`TopicSubController`控制器和对应的订阅方法 ```csharp [ApiController] [Route("[controller]")] public class TopicSubController: ControllerBase { private readonly ILogger<TopicSubController> _logger; public TopicSubController(ILogger<TopicSubController> logger) { _logger = logger; } [Topic("pubsub", "deposit")] [HttpPost("deposit")] public ActionResult<Account> Deposit(Transaction transaction) { _logger.LogInformation($"Enter deposit {transaction.Id} {transaction.Amount}"); var result = new Account() { Id = transaction.Id, Balance = transaction.Amount }; return result; } [Topic("pubsub", "more")] [HttpPost("more")] public ActionResult<Account> More(Transaction transaction) { _logger.LogInformation($"Enter more {transaction.Id} {transaction.Amount}"); var result = new Account() { Id = transaction.Id, Balance = transaction.Amount }; return result; } } /// <summary> /// 接收的类型 /// </summary> public class Transaction { public string Id { get; set; } public decimal Amount { get; set; } } /// <summary> /// 返回的类型 /// </summary> public class Account { public string Id { get; set; } public decimal Balance { get; set; } } ``` tn2>需要在Startup的Configure中开启重复读取Body才能读取到数据 ```csharp app.Use((context, next) => { context.Request.EnableBuffering(); return next(); }); ``` tn2>`UseCloudEvents`在请求处理管道中注册云事件中间件。 `MapSubscribeHandler`注册一个端点,Dapr运行时将调用该端点来注册发布/订阅主题。除非使用pub/sub,否则不需要这样做。 >### 运行 ```bash dapr run --app-id topicserver --app-port 5002 --dapr-http-port 3500 -- dotnet run ``` ![](https://img.tnblog.net/arcimg/hb/c869054a8ae64865beb9ef444c1d054e.png) tn2>这里虽然报错单并不影响运行与工作 创建客户端控制器(PublishController) ------------ tn2>添加PublishController发布代码 ```csharp [ApiController] [Route("[controller]")] public class PublishController: ControllerBase { private readonly ILogger<PublishController> _logger; private readonly DaprClient _client; public PublishController(ILogger<PublishController> logger,DaprClient client ) { _logger = logger; _client = client; } private static readonly string pubsubName = "pubsub"; private static readonly string topicname1 = "deposit"; private static readonly string topicname2 = "more"; [HttpGet] public async Task<string> Get() { var eventData = new { Id = "17", Amount = 10m }; var metadata = new Dictionary<string, string>() { { "ttlInSeconds","10" } }; await _client.PublishEventAsync(pubsubName, topicname1, eventData,metadata); _logger.LogInformation("Published deposit event!"); return "ok"; } [HttpGet("more")] public async Task<string> GetMore() { var eventData = new { Id = "17", Amount = 10m }; var metadata = new Dictionary<string, string>() { { "ttlInSeconds","10" } }; await _client.PublishEventAsync(pubsubName, topicname2, eventData,metadata); _logger.LogInformation("Published more event!"); return "ok"; } } ``` tn2>写了两个发布事件,并将消息存在时间设置为10秒。 >### 运行与测试 ```bash dapr run --app-id myclient --app-port 5001 --dapr-http-port 3501 -- dotnet run ``` ![](https://img.tnblog.net/arcimg/hb/ca858cc98f354a6cbb64f162b7dc0cd5.png) tn2>可以使用Dapr的cli进行测试 ```bash dapr publish --publish-app-id topicserver --pubsub pubsub --topic deposit --data '{"Id":"17","Amount":10}' dapr publish --publish-app-id topicserver --pubsub pubsub --topic more --data '{"Id":"17","Amount":10}' ``` tn2>也可以用http请求的方式 ```bash curl http://localhost:5001/Publish/ curl http://localhost:5001/Publish/more ``` ![](https://img.tnblog.net/arcimg/hb/19ee9282acc3485db4a685a0d70f4677.png) ![](https://img.tnblog.net/arcimg/hb/058b051bdc8b41e5a6070a1204c31ebd.png) ![](https://img.tnblog.net/arcimg/hb/1bf14370c9c84d0789e6f1ff07c5b890.png)