回忆2012初秋

.NetCore3.1及以上Server-Sent Events(SSE)轻量级主动推送和Redis发布订阅

电脑版发表于:2024/12/27 16:41

1.前言

服务端推送,也称为消息推送或通知推送,是一种允许应用服务器主动将信息发送到客户端的能力,为客户端提供了实时的信息更新和通知,增强了用户体验。

服务端推送的背景与需求主要基于以下几个诉求:

实时通知:在很多情况下,用户期望实时接收到应用的通知,如记录、新订单、新支付

节省资源:如果没有服务端推送,客户端需要通过轮询的方式来获取新信息,会造成客户端、服务端的资源损耗。通过服务端推送,客户端只需要在收到通知时做出响应,大大减少了资源的消耗。

常见推送场景有:一些实时数据,订单状态、称重数据等


一、解决方案:

1、传统实时处理方案:

轮询:这是一种较为传统的方式,客户端会定时地向服务端发送请求,询问是否有新数据。服务端只需要检查数据状态,然后将结果返回给客户端。轮询的优点是实现简单,兼容性好;缺点是可能产生较大的延迟,且对服务端资源消耗较高。

长轮询(Long Polling):轮询的改进版。客户端向服务器发送请求,服务器收到请求后,如果有新的数据,立即返回给客户端;如果没有新数据,服务器会等待一定时间(比如30秒超时时间),在这段时间内,如果有新数据,就返回给客户端,否则返回空数据。客户端处理完服务器返回的响应后,再次发起新的请求,如此反复。长轮询相较于传统的轮询方式减少了请求次数,但仍然存在一定的延迟。


2、HTML5 标准引入的实时处理方案:

WebSocket:一种双向通信协议,同时支持服务端和客户端之间的实时交互。WebSocket 是基于 TCP 的长连接,和HTTP 协议相比,它能实现轻量级的、低延迟的数据传输,非常适合实时通信场景,主要用于交互性强的双向通信。

SSE:SSE(Server-Sent Events)是一种基于 HTTP 协议的推送技术。服务端可以使用 SSE 来向客户端推送数据,但客户端不能通过SSE向服务端发送数据。相较于 WebSocket,SSE 更简单、更轻量级,但只能实现单向通信。

两者的主要区别:
1.通信区别
    Server-Sent Events: 单向通信由服务端指向客户端
    WebSocket:             双工通信,可以交换数据
2.协议区别
    Server-Sent Events: HTTP
    WebSocket:             WebSocket
3.自动重连
    Server-Sent Events: 支持
    WebSocket:             不支持需要轮询实现
4.响应文本信息:
    Server-Sent Events: 文本格式
    WebSocket:             文本格式需要二进制编译或者自定义json文本也是可以
5.浏览器支持:
 Server-Sent Events:大部分支持,但在Internet Explorer及早期的Edge浏览器中并不被支持
 WebSocket:            主流浏览器(包括移动端)的支持较好 

二、前期准备
    由于我们是需要SSE和Redis发布与订阅实现数据的主动推送和接受前端入参实现定向订阅操作。
    我们在编写程序的时候需要使用到Redis组件:我这里推荐FreeRedis

三、代码编写
    高质量的代码往往写的很朴素且很沉默(我们前端就用HTML5,后端就使用控制台和Api了)

下面进入示例了:
1.前端代码

<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>SSE Client</title>
</head>
<body>
    <h1>Receive: <span id="sse"></span></h1>
    <script>
        const numberElement = document.getElementById("sse");
        //在这里Code是可以去自定义获取链接的和普通的Query一样,是为了定向订阅Redis的发布数据
        const source = new EventSource('http://localhost:8080/GetSSEMessage?code=12345');

        source.onmessage = (event) => {
            numberElement.innerText = event.data;
        };

        source.onerror = (error) => {
            console.error("SSE error:", error);
        };
    </script>
</body>
</html>


2.后端Redis发布消息(记得在Nuget上拉取FreeSql)

class Program
{

    static void Main()
    {

        for (int i = 0; i < 10000; i++)
        {
            RedisClient cli = new RedisClient("127.0.0.1:6379,password=,defaultDatabase=2");

            //发布消息
            cli.Publish("12345", "ok");

            Console.WriteLine($"发送消息成功,等待接收{DateTime.Now:yyyy-MM-dd HH:mm:ss}");
            Thread.Sleep(2000); //10s一次
        }

    }



}

上面就是Redis发布的代码,都说了朴实无华

3.后端服务代码:
1.WeatherForecastController.cs

using FreeRedis;
using Masuit.Tools;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using SSEDemo.Dto;
using System.Runtime.CompilerServices;

namespace SSEDemo.Controllers
{
    [ApiController]
    [Route("[controller]")]
    public class WeatherForecastController : ControllerBase
    {
        private static readonly string[] Summaries = new[]
        {
            "Freezing", "Bracing", "Chilly", "Cool", "Mild", "Warm", "Balmy", "Hot", "Sweltering", "Scorching"
        };

        private readonly ILogger<WeatherForecastController> _logger;

        private readonly IRedisClient _freeRedis;

        public WeatherForecastController(ILogger<WeatherForecastController> logger, IRedisClient freeRedis)
        {
            _logger = logger;
            _freeRedis = freeRedis;
        }

        /// <summary>
        /// 获取天气预报
        /// </summary>
        /// <returns></returns>
        [HttpGet]
        public IEnumerable<WeatherForecast> Get()
        {
            return Enumerable.Range(1, 5).Select(index => new WeatherForecast
            {
                Date = DateTime.Now.AddDays(index),
                TemperatureC = Random.Shared.Next(-20, 55),
                Summary = Summaries[Random.Shared.Next(Summaries.Length)]
            })
            .ToArray();
        }



        /// <summary>
        /// Server-Sent Event Message
        /// </summary>
        /// <param name="code">消息标识</param>
        /// <returns></returns>
        [HttpGet("GetSSEMessage")]
        [AllowAnonymous]
        public async IAsyncEnumerable<string> GetSSEMessage([FromQuery] string code, [EnumeratorCancellation] CancellationToken cancellationToken)
        {
            // 设置响应头
            Response.Headers["Content-Type"] = "text/event-stream";
            Response.Headers["Cache-Control"] = "no-cache";
            Response.Headers["Connection"] = "keep-alive";

            if (string.IsNullOrEmpty(code))
            {
                var data = CreateMessage(string.Empty, "请传入有效标识!", false);
                yield return $"data: {data.ToJsonString()}\n\n"; // 返回错误信息
                yield break; // 结束方法
            }


            IDisposable subscription = null;
            try
            {
                // 创建异步流用于心跳
                var heartbeatTask = SendHeartbeatAsync(code, cancellationToken);
          
                // 创建频道订阅
                subscription = _freeRedis.Subscribe(code, async (channel, message) =>
                {
                    var model = CreateMessage(code, string.Empty, true);
                    await WriteResponse(model.ToJsonString(), cancellationToken);
                });
                // 等待直到取消请求
                await heartbeatTask; // 等待心跳任务完成
            }
            finally
            {
                subscription?.Dispose();
            }
        }

        /// <summary>
        /// 发送心跳包
        /// </summary>
        private async Task SendHeartbeatAsync(string code, CancellationToken ct)
        {
            while (!ct.IsCancellationRequested)
            {
                await Task.Delay(5000, ct); // 每5秒发送一次心跳
                var model = CreateMessage(code, string.Empty, true);
                model.IsHeartbeat = true;
                await WriteResponse(model.ToJsonString(), ct); // 心跳消息
            }
        }

        /// <summary>
        /// 输出数据
        /// </summary>
        /// <param name="jsonData"></param>
        /// <param name="ct"></param>
        /// <returns></returns>
        private async Task WriteResponse(string jsonData, CancellationToken ct)
        {
            if (ct.IsCancellationRequested) return;

            await Response.WriteAsync($"data: {jsonData}\n\n", ct);
            await Response.Body.FlushAsync(ct);
        }

        /// <summary>
        /// 消息输出
        /// </summary>
        /// <param name="vehicleCode"></param>
        /// <param name="message"></param>
        /// <param name="isSuccess"></param>
        /// <returns></returns>
        private MessageOutDto CreateMessage(string code, string message, bool isSuccess)
        {
            return new MessageOutDto
            {
                Code = code,
                IsSuccess = isSuccess,
                Message = message,
            };
        } 
    }
}

2.MessageOutDto.cs

namespace SSEDemo.Dto
{
    public class MessageOutDto
    {
        /// <summary>
        /// 是否成功
        /// </summary>
        public bool IsSuccess { get; set; }

        /// <summary>
        /// 消息
        /// </summary>
        public string Message { get; set; }

        /// <summary>
        /// 标记
        /// </summary>
        public string Code { get; set; }

        /// <summary>
        /// 是否心跳包
        /// </summary>
        public bool IsHeartbeat { get; set; } = false;
    }
}

3.Program.cs

using FreeRedis;

var builder = WebApplication.CreateBuilder(args);


builder.Services.AddControllers();

// 注册 FreeRedis 客户端
builder.Services.AddSingleton<IRedisClient>(sp =>
{
    var redisConnectionString = "127.0.0.1:6379,password=,defaultDatabase=2";
    return new RedisClient(redisConnectionString);
});

var app = builder.Build();

// Configure the HTTP request pipeline.

app.UseAuthorization();



app.MapControllers();

app.Run();


轻量级SSE配合Redis的订阅发布,实现后端主动推送到前端的代码就完成了

关于TNBLOG
TNBLOG,技术分享。技术交流:群号677373950
ICP备案 :渝ICP备18016597号-1
App store Android
精彩评论
{{item.replyName}}
{{item.content}}
{{item.time}}
{{subpj.replyName}}
@{{subpj.beReplyName}}{{subpj.content}}
{{subpj.time}}
猜你喜欢