暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

自定义WebSocket中间件完整版

码农游乐场 2021-09-08
1351

关于websocket,这里也不做过多的介绍,借用一个图,就知道了。还不了解的同志可以去网上查下其他的资料。。。

前面我们介绍了如何自定义中间件,那么今天我们实战一把,废话不多说,撸代码。

  • 项目结构

分为三个项目,分别是中间件项目、后台服务、前端测试。

WebSocketMiddleware.csproj WebSocket中间件

  • WebSocketConnectionManager.cs

  • WebSocketExtensions.cs

  • WebSocketHandler.cs

  • WebSocketManagerMiddleware.cs

CustomMiddleware.Test.csproj 后台API服务

Controller

...

Service

  • WebSocketService.cs

  • Startup.cs

  • Program.cs

Custom.WebApp.csproj 前端MVC项目

wwwroot

css js lib

Controller

  • HomeController.cs

Views

Home

  • Index.html

  • Startup.cs

  • Program.cs

自定义中间件

新建一个 .NET Standard的类库项目


  • 新建WebSocketManagerMiddleware.cs

public class WebSocketManagerMiddleware
{
private readonly RequestDelegate _next;
private WebSocketHandler _webSocketHandler { get; set; }

public WebSocketManagerMiddleware(RequestDelegate next,
WebSocketHandler webSocketHandler
)

{
_next = next;
_webSocketHandler = webSocketHandler;
}

public async Task Invoke(HttpContext context)
{
//自定义逻辑
}
}

是不是很熟悉,这个和上一篇那个demo很像,WebSocketHandler是一个抽象类,提供给应用程序实现自己的一些自定义逻辑处理。

  • 完善Invoke方法中的逻辑

public async Task Invoke(HttpContext context)
{
if (!context.WebSockets.IsWebSocketRequest)
return;
string key = context.Request.Query["Key"];

if(string.IsNullOrEmpty(key))
{
return;
}
//接受WebSocket
var socket = await context.WebSockets.AcceptWebSocketAsync();

//建立连接之后,对socket进行管理起来
_webSocketHandler.OnConnected(socket, key);

await Receive(socket, async (result, buffer) =>
{
if (result.MessageType == WebSocketMessageType.Text)
{
await _webSocketHandler.ReceiveAsync(socket, result, buffer);
return;
}
else if (result.MessageType == WebSocketMessageType.Close)
{
await _webSocketHandler.OnDisconnected(socket);
return;
}
});
}

/// <summary>
/// 接收webSocket信息
/// </summary>
/// <param name="socket"></param>
/// <param name="handleMessage"></param>
/// <returns></returns>
private async Task Receive(WebSocket socket, Action<WebSocketReceiveResult, byte[]> handleMessage)
{
try
{
var buffer = new byte[1024 * 4];

while (socket.State == WebSocketState.Open)
{
var result = await socket.ReceiveAsync(buffer: new ArraySegment<byte>(buffer),cancellationToken: CancellationToken.None);

handleMessage(result, buffer);
}
}
catch (Exception ex)
{
}
}

  • WebSocketHandler抽象类

    public abstract class WebSocketHandler
{
public WebSocketConnectionManager WebSocketConnectionManager { get; set; }
private readonly ILogger<WebSocketHandler> _logger;
public WebSocketHandler(ILogger<WebSocketHandler> logger, WebSocketConnectionManager webSocketConnectionManager)
{
_logger = logger;
WebSocketConnectionManager = webSocketConnectionManager;
}

public virtual void OnConnected(WebSocket socket,string key)
{
_logger.LogInformation($"{key} Socket 连接上了...");
WebSocketConnectionManager.AddSocket(socket, key);
}

public virtual async Task OnDisconnected(WebSocket socket)
{
var key=WebSocketConnectionManager.GetId(socket);
_logger.LogInformation($"{key} Socket 断开了!!!");
await WebSocketConnectionManager.RemoveSocket(key);
}

/// <summary>
/// 回复某个人
/// </summary>
/// <param name="socket"></param>
/// <param name="message"></param>
/// <returns></returns>
public async Task SendMessageAsync(WebSocket socket, string message)
{
if (socket.State != WebSocketState.Open)
return;
var bytes = Encoding.UTF8.GetBytes(message);
await socket.SendAsync(buffer: new ArraySegment<byte>(array: bytes, offset: 0, count: bytes.Length), messageType: WebSocketMessageType.Text, endOfMessage: true, cancellationToken: CancellationToken.None);
}

/// <summary>
/// 给某个人发送消息
/// </summary>
/// <param name="socketId"></param>
/// <param name="message"></param>
/// <returns></returns>
public async Task SendMessageAsync(string socketId, string message)
{
try
{
await SendMessageAsync(WebSocketConnectionManager.GetSocketById(socketId), message);
}
catch (Exception)
{

}
}

/// <summary>
/// 发送给所有人
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public async Task SendMessageToAllAsync(string message)
{
foreach (var pair in WebSocketConnectionManager.GetAll())
{
if (pair.Value.State == WebSocketState.Open)
await SendMessageAsync(pair.Value, message);
}
}
/// <summary>
/// 获取一些连接
/// </summary>
/// <param name="keys"></param>
/// <returns></returns>
public IEnumerable<WebSocket> GetSomeWebsocket(string[] keys)
{
foreach (var key in keys)
{
yield return WebSocketConnectionManager.GetWebSocket(key);
}
}

/// <summary>
/// 给一堆人发消息
/// </summary>
/// <param name="webSockets"></param>
/// <param name="message"></param>
/// <returns></returns>
public async Task SendMessageToSome(WebSocket[] webSockets, string message)
{
webSockets.ToList().ForEach(async a => { await SendMessageAsync(a, message); });
}
/// <summary>
/// 抽象方法,供应用程序实现
/// </summary>
/// <param name="socket"></param>
/// <param name="result"></param>
/// <param name="buffer"></param>
/// <returns></returns>
public abstract Task ReceiveAsync(WebSocket socket, WebSocketReceiveResult result, byte[] buffer);
}

这里面包含了各种发送消息的方法供应用程序调用,以及两个虚方法OnConnected,OnDisconnected;应用程序可以根据自己的需求来重写这两个方法。还有一个ReceiveAsync的抽象方法,这是必须由应用程序实现的。至于虚方法和抽象方法的一些知识,可以去查一下资料。

  • 然后新建管理类WebSocketConnectionManager

    public class WebSocketConnectionManager
{
private ConcurrentDictionary<string, WebSocket> _sockets = new ConcurrentDictionary<string, WebSocket>();

public int GetCount()
{
return _sockets.Count;
}

public WebSocket GetSocketById(string id)
{
return _sockets.FirstOrDefault(p => p.Key == id).Value;
}

public ConcurrentDictionary<string, WebSocket> GetAll()
{
return _sockets;
}
public WebSocket GetWebSocket(string key)
{
WebSocket _socket;
_sockets.TryGetValue(key, out _socket);
return _socket;

}

public string GetId(WebSocket socket)
{
return _sockets.FirstOrDefault(p => p.Value == socket).Key;
}
public void AddSocket(WebSocket socket, string key)
{
if (GetWebSocket(key) != null)
{
_sockets.TryRemove(key, out WebSocket destoryWebsocket);
}
_sockets.TryAdd(key, socket);
}

public async Task RemoveSocket(string id)
{
try
{
WebSocket socket;

_sockets.TryRemove(id, out socket);
await socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, null, CancellationToken.None);

}
catch (Exception)
{

}

}

public async Task CloseSocket(WebSocket socket)
{
await socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, null, CancellationToken.None);
}

private string CreateConnectionId()
{
return Guid.NewGuid().ToString();
}
}

用WebSocketConnectionManager类来管理socket连接,用一个字典来存所有的websocket连接,ConcurrentDictionary是一个线程安全的字典,这样不需要加锁。其实也没啥特别的功能,就是对sokcet连接的管理,增删改查。

  • 最后新建一个扩展类

public static class WebSocketExtensions
{
public static IApplicationBuilder MapWebSocketManager(this IApplicationBuilder app, PathString path, WebSocketHandler handler)
{
return app.Map(path, (_app) => _app.UseMiddleware<WebSocketManagerMiddleware>(handler));
}

public static IServiceCollection AddWebSocketManager(this IServiceCollection services)
{
services.AddTransient<WebSocketConnectionManager>();

foreach (var type in Assembly.GetEntryAssembly().ExportedTypes)
{
if (type.GetTypeInfo().BaseType == typeof(WebSocketHandler))
{
services.AddSingleton(type);
}
}
return services;
}
}

这样一个webSocket中间件就完成了。。。

为了更贴近实际,我做了一个前后端分离的假设,新建一个纯后端WEB API项目

新建webapi服务

这个服务除了提供webapi,主要是作为websocket的宿主服务。

  • 新建CustomMiddleware.Test项目

  • 添加WebSocketMiddleware引用

  • 修改ConfigureServices方法,注入Manager服务

 public void ConfigureServices(IServiceCollection services)
{
services.AddWebSocketManager();
services.AddControllers();
}

  • 修改Configure方法,启用Websocket

public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IServiceProvider serviceProvider)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}

//websocket中间件
var webSocketOptions = new WebSocketOptions()
{
KeepAliveInterval = TimeSpan.FromSeconds(120),
ReceiveBufferSize = 4 * 1024
};

app.UseWebSockets(webSocketOptions);
app.MapWebSocketManager("/ws", serviceProvider.GetService<WebSocketService>());

app.UseRouting();

app.UseAuthorization();

app.UseEndpoints(endpoints =>
{
endpoints.MapControllers();
});
}

  • WebSocketService.cs,继承并实现了WebSocketHandler抽象类。

public class WebSocketService : WebSocketHandler
{
private readonly ILogger<WebSocketService> _logger;

public WebSocketService(ILogger<WebSocketService> logger, WebSocketConnectionManager webSocketConnectionManager) : base(logger, webSocketConnectionManager)
{
_logger = logger;
}

public override async Task ReceiveAsync(WebSocket socket, WebSocketReceiveResult result, byte[] buffer)
{
var ret = Encoding.UTF8.GetString(buffer,0,result.Count);

_logger.LogInformation($"rececive message {ret}.");

await SendMessageAsync(socket, ret);
}
}

我这里很简单的,当服务端接收到消息之后,自动返回给客户,已验证是否成功。

新建MVC项目

  • 新建MVC Web应用程序

  • 修改Index.cshtml

@{
ViewData["Title"] = "Home Page";
}
<form>
<div class="form-group">
<label for="url">WebSocket Url: </label>
<input type="text" class="form-control" id="url" aria-describedby="urlHelp" value="ws://localhost:5000/ws?key=test" >
<small id="urlHelp" class="form-text text-muted">WebSocket Url.</small>
</div>
<div class="form-group">
<label for="message">Message: </label>
<input type="text" class="form-control" id="message" aria-describedby="messageHelp" value="Hello world!" >
<small id="messageHelp" class="form-text text-muted">Please enter the Message you want to send.</small>
</div>
<button type="button" class="btn btn-success" id="connect">Connect</button>
<button type="button" class="btn btn-primary" id="send">Send</button>

<div class="form-group">
<label for="receiveMessage">Receive Message:</label>
<textarea class="form-control" id="receiveMessage" rows="10"></textarea>
</div>
</form>

@section Scripts
{
<script type="text/javascript">
$.extend({
socketWeb: function (opt) {
if ("WebSocket" in window) {
var setting = $.extend({
url: '',
opens: {},
messages: {},
closes: {},
err: {}
}, opt);
var ws = new WebSocket(setting.url);
ws.onopen = setting.opens;
ws.onmessage = setting.messages;
ws.onclose = setting.closes;
ws.onerror = setting.err;
return ws;
} else {
alert("Your browser does not support WebSocket!");
}

}
});

function receiveData(data) {
var msg = "Res: "+data;
var o = $("#receiveMessage").val();//旧数据,追加
if (o == "") {
$("#receiveMessage").val(msg)
}
else {
$("#receiveMessage").val(o+"\n" + msg)
}
}
//随机数
function randomNum(minNum, maxNum) {
switch (arguments.length) {
case 1:
return parseInt(Math.random() * minNum + 1, 10);

case 2:
return parseInt(Math.random() * (maxNum - minNum) + minNum, 10);
default:
return 0;
}
}

//websocket对象
var ws;
$(function () {

//连接
$("#connect").on("click", function () {
var wsurl = $("#url").val();
var reqId = randomNum(1000, 10000);

ws = $.socketWeb({
url: wsurl,
opens: function (ev) {
if (ws.readyState == 1) {
$("#connect").html("Connected");
$("#connect").attr("disabled", "disabled");
}
},
messages: function (ev) {
receiveData(ev.data);
},
closes: function (ev) {
console.log("Connect is closed.");
},
err: function (ev) {
console.log("Connect Error.");
}
});
})

//发送动作
$("#send").on("click", function () {
try {
var msg = $("#message").val();
if (ws !== undefined && ws.readyState == 1) {
ws.send(msg);
}
else {
alert("Please establish the webSocket connection first!!!")
}
} catch (e) {
console.log(e);
}
})

//心跳
setInterval(function () {
try {
var heart = '{ "reqType": "heartbreak" }';
if (ws !== undefined && ws.readyState == 1) {
ws.send(heart);
}
} catch (e) {
console.log(e);
}
}, 30000);

})

</script>
}

是不是很简单,可以测试了。

测试

  • 运行webapi服务 

  • 运行MVC应用服务 点击【Connect】,返回输入message,点击【Send】。

  •  除了手动发送信息之外,我还做了个心跳检查的机制,客户端每隔10s会自动发送信息给服务端。

  •  我们从上图看出,此时服务端确实也接收到了信息,大功告成了。

其他的中间件就类似的做法。。。

如果觉得还不错的话,请动动手指点个赞,感谢各位小伙伴支持!!!

源代码下载:https://gitee.com/zhanwei103/CustomMiddleware.git 

上一篇《ASP.NET Core中间件&自定义中间件

文章转载自码农游乐场,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论