关于websocket,这里也不做过多的介绍,借用一个图,就知道了。还不了解的同志可以去网上查下其他的资料。。。

前面我们介绍了如何自定义中间件,那么今天我们实战一把,废话不多说,撸代码。
项目结构
分为三个项目,分别是中间件项目、后台服务、前端测试。
WebSocketMiddleware.csproj WebSocket中间件
WebSocketConnectionManager.cs
WebSocketExtensions.cs
WebSocketHandler.cs
WebSocketManagerMiddleware.cs
CustomMiddleware.Test.csproj 后台API服务
Controller
...
Service
WebSocketService.cs
Startup.cs
Program.cs
Custom.WebApp.csproj 前端MVC项目
wwwroot
css js lib
Controller
HomeController.cs
Views
Home
Index.html
Startup.cs
Program.cs
自定义中间件
新建一个 .NET Standard的类库项目

新建WebSocketManagerMiddleware.cs
public class WebSocketManagerMiddleware
{
private readonly RequestDelegate _next;
private WebSocketHandler _webSocketHandler { get; set; }
public WebSocketManagerMiddleware(RequestDelegate next,
WebSocketHandler webSocketHandler)
{
_next = next;
_webSocketHandler = webSocketHandler;
}
public async Task Invoke(HttpContext context)
{
//自定义逻辑
}
}
是不是很熟悉,这个和上一篇那个demo很像,WebSocketHandler是一个抽象类,提供给应用程序实现自己的一些自定义逻辑处理。
完善Invoke方法中的逻辑
public async Task Invoke(HttpContext context)
{
if (!context.WebSockets.IsWebSocketRequest)
return;
string key = context.Request.Query["Key"];
if(string.IsNullOrEmpty(key))
{
return;
}
//接受WebSocket
var socket = await context.WebSockets.AcceptWebSocketAsync();
//建立连接之后,对socket进行管理起来
_webSocketHandler.OnConnected(socket, key);
await Receive(socket, async (result, buffer) =>
{
if (result.MessageType == WebSocketMessageType.Text)
{
await _webSocketHandler.ReceiveAsync(socket, result, buffer);
return;
}
else if (result.MessageType == WebSocketMessageType.Close)
{
await _webSocketHandler.OnDisconnected(socket);
return;
}
});
}
/// <summary>
/// 接收webSocket信息
/// </summary>
/// <param name="socket"></param>
/// <param name="handleMessage"></param>
/// <returns></returns>
private async Task Receive(WebSocket socket, Action<WebSocketReceiveResult, byte[]> handleMessage)
{
try
{
var buffer = new byte[1024 * 4];
while (socket.State == WebSocketState.Open)
{
var result = await socket.ReceiveAsync(buffer: new ArraySegment<byte>(buffer),cancellationToken: CancellationToken.None);
handleMessage(result, buffer);
}
}
catch (Exception ex)
{
}
}
WebSocketHandler抽象类
public abstract class WebSocketHandler
{
public WebSocketConnectionManager WebSocketConnectionManager { get; set; }
private readonly ILogger<WebSocketHandler> _logger;
public WebSocketHandler(ILogger<WebSocketHandler> logger, WebSocketConnectionManager webSocketConnectionManager)
{
_logger = logger;
WebSocketConnectionManager = webSocketConnectionManager;
}
public virtual void OnConnected(WebSocket socket,string key)
{
_logger.LogInformation($"{key} Socket 连接上了...");
WebSocketConnectionManager.AddSocket(socket, key);
}
public virtual async Task OnDisconnected(WebSocket socket)
{
var key=WebSocketConnectionManager.GetId(socket);
_logger.LogInformation($"{key} Socket 断开了!!!");
await WebSocketConnectionManager.RemoveSocket(key);
}
/// <summary>
/// 回复某个人
/// </summary>
/// <param name="socket"></param>
/// <param name="message"></param>
/// <returns></returns>
public async Task SendMessageAsync(WebSocket socket, string message)
{
if (socket.State != WebSocketState.Open)
return;
var bytes = Encoding.UTF8.GetBytes(message);
await socket.SendAsync(buffer: new ArraySegment<byte>(array: bytes, offset: 0, count: bytes.Length), messageType: WebSocketMessageType.Text, endOfMessage: true, cancellationToken: CancellationToken.None);
}
/// <summary>
/// 给某个人发送消息
/// </summary>
/// <param name="socketId"></param>
/// <param name="message"></param>
/// <returns></returns>
public async Task SendMessageAsync(string socketId, string message)
{
try
{
await SendMessageAsync(WebSocketConnectionManager.GetSocketById(socketId), message);
}
catch (Exception)
{
}
}
/// <summary>
/// 发送给所有人
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public async Task SendMessageToAllAsync(string message)
{
foreach (var pair in WebSocketConnectionManager.GetAll())
{
if (pair.Value.State == WebSocketState.Open)
await SendMessageAsync(pair.Value, message);
}
}
/// <summary>
/// 获取一些连接
/// </summary>
/// <param name="keys"></param>
/// <returns></returns>
public IEnumerable<WebSocket> GetSomeWebsocket(string[] keys)
{
foreach (var key in keys)
{
yield return WebSocketConnectionManager.GetWebSocket(key);
}
}
/// <summary>
/// 给一堆人发消息
/// </summary>
/// <param name="webSockets"></param>
/// <param name="message"></param>
/// <returns></returns>
public async Task SendMessageToSome(WebSocket[] webSockets, string message)
{
webSockets.ToList().ForEach(async a => { await SendMessageAsync(a, message); });
}
/// <summary>
/// 抽象方法,供应用程序实现
/// </summary>
/// <param name="socket"></param>
/// <param name="result"></param>
/// <param name="buffer"></param>
/// <returns></returns>
public abstract Task ReceiveAsync(WebSocket socket, WebSocketReceiveResult result, byte[] buffer);
}
这里面包含了各种发送消息的方法供应用程序调用,以及两个虚方法OnConnected,OnDisconnected;应用程序可以根据自己的需求来重写这两个方法。还有一个ReceiveAsync的抽象方法,这是必须由应用程序实现的。至于虚方法和抽象方法的一些知识,可以去查一下资料。
然后新建管理类WebSocketConnectionManager
public class WebSocketConnectionManager
{
private ConcurrentDictionary<string, WebSocket> _sockets = new ConcurrentDictionary<string, WebSocket>();
public int GetCount()
{
return _sockets.Count;
}
public WebSocket GetSocketById(string id)
{
return _sockets.FirstOrDefault(p => p.Key == id).Value;
}
public ConcurrentDictionary<string, WebSocket> GetAll()
{
return _sockets;
}
public WebSocket GetWebSocket(string key)
{
WebSocket _socket;
_sockets.TryGetValue(key, out _socket);
return _socket;
}
public string GetId(WebSocket socket)
{
return _sockets.FirstOrDefault(p => p.Value == socket).Key;
}
public void AddSocket(WebSocket socket, string key)
{
if (GetWebSocket(key) != null)
{
_sockets.TryRemove(key, out WebSocket destoryWebsocket);
}
_sockets.TryAdd(key, socket);
}
public async Task RemoveSocket(string id)
{
try
{
WebSocket socket;
_sockets.TryRemove(id, out socket);
await socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, null, CancellationToken.None);
}
catch (Exception)
{
}
}
public async Task CloseSocket(WebSocket socket)
{
await socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, null, CancellationToken.None);
}
private string CreateConnectionId()
{
return Guid.NewGuid().ToString();
}
}
用WebSocketConnectionManager类来管理socket连接,用一个字典来存所有的websocket连接,ConcurrentDictionary是一个线程安全的字典,这样不需要加锁。其实也没啥特别的功能,就是对sokcet连接的管理,增删改查。
最后新建一个扩展类
public static class WebSocketExtensions
{
public static IApplicationBuilder MapWebSocketManager(this IApplicationBuilder app, PathString path, WebSocketHandler handler)
{
return app.Map(path, (_app) => _app.UseMiddleware<WebSocketManagerMiddleware>(handler));
}
public static IServiceCollection AddWebSocketManager(this IServiceCollection services)
{
services.AddTransient<WebSocketConnectionManager>();
foreach (var type in Assembly.GetEntryAssembly().ExportedTypes)
{
if (type.GetTypeInfo().BaseType == typeof(WebSocketHandler))
{
services.AddSingleton(type);
}
}
return services;
}
}
这样一个webSocket中间件就完成了。。。
为了更贴近实际,我做了一个前后端分离的假设,新建一个纯后端WEB API项目
新建webapi服务
这个服务除了提供webapi,主要是作为websocket的宿主服务。
新建CustomMiddleware.Test项目
添加WebSocketMiddleware引用
修改ConfigureServices方法,注入Manager服务
public void ConfigureServices(IServiceCollection services)
{
services.AddWebSocketManager();
services.AddControllers();
}
修改Configure方法,启用Websocket
public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IServiceProvider serviceProvider)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
//websocket中间件
var webSocketOptions = new WebSocketOptions()
{
KeepAliveInterval = TimeSpan.FromSeconds(120),
ReceiveBufferSize = 4 * 1024
};
app.UseWebSockets(webSocketOptions);
app.MapWebSocketManager("/ws", serviceProvider.GetService<WebSocketService>());
app.UseRouting();
app.UseAuthorization();
app.UseEndpoints(endpoints =>
{
endpoints.MapControllers();
});
}
WebSocketService.cs,继承并实现了WebSocketHandler抽象类。
public class WebSocketService : WebSocketHandler
{
private readonly ILogger<WebSocketService> _logger;
public WebSocketService(ILogger<WebSocketService> logger, WebSocketConnectionManager webSocketConnectionManager) : base(logger, webSocketConnectionManager)
{
_logger = logger;
}
public override async Task ReceiveAsync(WebSocket socket, WebSocketReceiveResult result, byte[] buffer)
{
var ret = Encoding.UTF8.GetString(buffer,0,result.Count);
_logger.LogInformation($"rececive message {ret}.");
await SendMessageAsync(socket, ret);
}
}
我这里很简单的,当服务端接收到消息之后,自动返回给客户,已验证是否成功。
新建MVC项目
新建MVC Web应用程序
修改Index.cshtml
@{
ViewData["Title"] = "Home Page";
}
<form>
<div class="form-group">
<label for="url">WebSocket Url: </label>
<input type="text" class="form-control" id="url" aria-describedby="urlHelp" value="ws://localhost:5000/ws?key=test" >
<small id="urlHelp" class="form-text text-muted">WebSocket Url.</small>
</div>
<div class="form-group">
<label for="message">Message: </label>
<input type="text" class="form-control" id="message" aria-describedby="messageHelp" value="Hello world!" >
<small id="messageHelp" class="form-text text-muted">Please enter the Message you want to send.</small>
</div>
<button type="button" class="btn btn-success" id="connect">Connect</button>
<button type="button" class="btn btn-primary" id="send">Send</button>
<div class="form-group">
<label for="receiveMessage">Receive Message:</label>
<textarea class="form-control" id="receiveMessage" rows="10"></textarea>
</div>
</form>
@section Scripts
{
<script type="text/javascript">
$.extend({
socketWeb: function (opt) {
if ("WebSocket" in window) {
var setting = $.extend({
url: '',
opens: {},
messages: {},
closes: {},
err: {}
}, opt);
var ws = new WebSocket(setting.url);
ws.onopen = setting.opens;
ws.onmessage = setting.messages;
ws.onclose = setting.closes;
ws.onerror = setting.err;
return ws;
} else {
alert("Your browser does not support WebSocket!");
}
}
});
function receiveData(data) {
var msg = "Res: "+data;
var o = $("#receiveMessage").val();//旧数据,追加
if (o == "") {
$("#receiveMessage").val(msg)
}
else {
$("#receiveMessage").val(o+"\n" + msg)
}
}
//随机数
function randomNum(minNum, maxNum) {
switch (arguments.length) {
case 1:
return parseInt(Math.random() * minNum + 1, 10);
case 2:
return parseInt(Math.random() * (maxNum - minNum) + minNum, 10);
default:
return 0;
}
}
//websocket对象
var ws;
$(function () {
//连接
$("#connect").on("click", function () {
var wsurl = $("#url").val();
var reqId = randomNum(1000, 10000);
ws = $.socketWeb({
url: wsurl,
opens: function (ev) {
if (ws.readyState == 1) {
$("#connect").html("Connected");
$("#connect").attr("disabled", "disabled");
}
},
messages: function (ev) {
receiveData(ev.data);
},
closes: function (ev) {
console.log("Connect is closed.");
},
err: function (ev) {
console.log("Connect Error.");
}
});
})
//发送动作
$("#send").on("click", function () {
try {
var msg = $("#message").val();
if (ws !== undefined && ws.readyState == 1) {
ws.send(msg);
}
else {
alert("Please establish the webSocket connection first!!!")
}
} catch (e) {
console.log(e);
}
})
//心跳
setInterval(function () {
try {
var heart = '{ "reqType": "heartbreak" }';
if (ws !== undefined && ws.readyState == 1) {
ws.send(heart);
}
} catch (e) {
console.log(e);
}
}, 30000);
})
</script>
}
是不是很简单,可以测试了。
测试
运行webapi服务

运行MVC应用服务 点击【Connect】,返回输入message,点击【Send】。

除了手动发送信息之外,我还做了个心跳检查的机制,客户端每隔10s会自动发送信息给服务端。

我们从上图看出,此时服务端确实也接收到了信息,大功告成了。
其他的中间件就类似的做法。。。
如果觉得还不错的话,请动动手指点个赞,感谢各位小伙伴支持!!!
源代码下载:https://gitee.com/zhanwei103/CustomMiddleware.git




