using Microsoft.AspNetCore.SignalR; using Microsoft.Extensions.Primitives; using TEAMModelOS.SDK.DI; using TEAMModelOS.SDK.Extension; using TEAMModelOS.SDK; using System.Web; using System.Text; using StackExchange.Redis; using Microsoft.Extensions.Logging; using TEAMModelOS.SDK.Models; using Google.Protobuf.WellKnownTypes; using Microsoft.Extensions.DependencyInjection; using TEAMModelOS.SDK.DI.Device; namespace HTEX.Complex.Services { public class SignalRScreenServerHub : Hub { private readonly ILogger _logger; private readonly AzureRedisFactory _azureRedis; private readonly AzureStorageFactory _azureStorage; private readonly DingDing _dingDing; private readonly CoreDevice _device; public SignalRScreenServerHub(AzureRedisFactory azureRedis, ILogger logger, AzureStorageFactory azureStorage, DingDing dingDing, CoreDevice device) { _logger = logger; _azureRedis = azureRedis; _azureStorage = azureStorage; _dingDing = dingDing; _device=device; } /// /// 客户连接成功时触发 /// /// public override async Task OnConnectedAsync() { var serverDevice = await _device.GetCoreDevice(); var connid = Context.ConnectionId; var httpContext = Context.GetHttpContext(); if (httpContext != null) { //wss://www.winteach.cn/signalr/notify?grant_type=wechat_qrcode&scene=0a75aca57536490ba00fe62e27bb8f6c&id=U2MNiCFNPPuVcw2gUI_gRA //wss://www.winteach.cn/signalr/notify?grant_type=bookjs_api&clientid={clientid}&id=客户端自动生成的 httpContext.Request.Query.TryGetValue("grant_type", out StringValues grant_type); httpContext.Request.Query.TryGetValue("clientid", out StringValues clientid); httpContext.Request.Query.TryGetValue("device", out StringValues _device); await Groups.AddToGroupAsync(connid, grant_type!); if (!clientid.Equals(StringValues.Empty) && !grant_type.Equals(StringValues.Empty)) { ///连接配置,并且使用钉钉 通知。 /// var client = new SignalRClient { connid = connid, grant_type = grant_type, clientid= clientid, serverid=serverDevice.deviceId, }; await _azureRedis.GetRedisClient(8).HashSetAsync($"SignalRClient:connects:{serverDevice.deviceId}", connid, client.ToJsonString()); ScreenClient device = HttpUtility.UrlDecode(_device, Encoding.Unicode).ToObject(); switch (true) { case bool when grant_type.Equals(ScreenConstant.grant_type): ScreenClient screenClient ; var value = await _azureRedis.GetRedisClient(8).HashGetAsync($"ScreenApi:clients:{serverDevice.deviceId}", client.clientid); if (value!=default && value.HasValue) { screenClient = value.ToString().ToObject(); // 这里不强制设置free ,因为如果是重连,可能正在执行任务,需要等待执行完成 //先检查状态是否是在忙碌,在时间戳范围里,如果不在时间戳范围,强制free。 if (!screenClient.status!.Equals(ScreenConstant.idle) && screenClient.last_time + screenClient.timeout+ screenClient.delay + ScreenConstant.time_excess < DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()) { screenClient.status = ScreenConstant.idle; } } else { screenClient = new ScreenClient { status = ScreenConstant.idle, }; } screenClient.connid=connid; screenClient.grant_type = grant_type; screenClient.clientid = clientid; screenClient.os = device.os; screenClient.port = device.port; screenClient.name = device.name; screenClient.region = device.region; screenClient.remote = device.remote; screenClient.networks = device.networks; screenClient.screenUrl = device.screenUrl; screenClient.delay = device.delay; screenClient.timeout = device.timeout; screenClient.cpu = device.cpu; screenClient.ram = device.ram; screenClient.last_time= DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); screenClient.deviceId = device.deviceId; //连接成功,发送消息给客户端。 await SendConnection(connid, new ConnectionMessage { connid=connid, clientid = clientid, status = screenClient.status, grant_type = grant_type, message_type= MessageType.conn_success, content = $"连接成功" }); _logger.LogInformation($"客户端连接成功=>{screenClient.name},{screenClient.region},{clientid}:\n{screenClient.ToJsonString()}"); if (screenClient.status!.Equals(ScreenConstant.idle)) { _logger.LogInformation($"客户端当前空闲=>{screenClient.name},{screenClient.region},{clientid},分发任务......"); //连接成功,马上分发任务。 var task = await TaskService.SentTask(_azureRedis,_azureStorage, _logger, serverDevice); if (task.genQueue!=null && task.genRedis!=null && !string.IsNullOrWhiteSpace(task.genQueue.cntName)) { screenClient.status = ScreenConstant.busy; screenClient.last_time = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); await SendMessage(connid, new ScreenProcessMessage { connid = connid, clientid = clientid, status = ScreenConstant.busy, grant_type = grant_type, message_type= MessageType.task_send_success, content =$"{task.genQueue.ToJsonString()}",//从Redis中获取任务信息 }); _logger.LogInformation($"分发任务:{task.genQueue.ToJsonString()}"); } else { // _logger.LogInformation($"客户端当前空闲=>{screenClient.name},{screenClient.region},{clientid},暂无任务可领取的任务......"); if (task.genRedis!=null) { string msgError = $"分发任务异常原因=>{screenClient.name},{screenClient.region},{clientid}:{task.msg}\ngenQueue:{task.genQueue?.ToJsonString()}\ngenRedis:{task.genRedis?.ToJsonString()}"; _logger.LogInformation(msgError); await _dingDing.SendBotMsg(msgError, GroupNames.成都开发測試群組); } else { _logger.LogInformation($"分发任务异常原因=>{screenClient.name},{screenClient.region},{clientid}:{task.msg}\n"); } await SendMessage(connid, new ScreenProcessMessage { connid = connid, clientid = clientid, status = ScreenConstant.idle, grant_type = grant_type, message_type= MessageType.task_send_error, content = task.msg }); } } await _azureRedis.GetRedisClient(8).HashSetAsync($"ScreenApi:clients:{serverDevice.deviceId}", client.clientid, screenClient.ToJsonString()); break; } } else { await SendConnection(connid, new ConnectionMessage { clientid = string.Empty, status =ScreenConstant.error, grant_type = grant_type, message_type= MessageType.conn_error, content = "客户端配置错误", connid = connid, }); } } } public override async Task OnDisconnectedAsync(Exception? exception) { var serverDevice= await _device.GetCoreDevice(); var connid = Context.ConnectionId; var redisData = await _azureRedis.GetRedisClient(8).HashGetAsync($"SignalRClient:connects:{serverDevice.deviceId}", connid); _logger.LogInformation($"客户端断开连接=>{connid} "); ///连接配置,并且使用钉钉 离线通知。 if (!redisData.IsNullOrEmpty) { var client = redisData.ToString().ToObject(); await _azureRedis.GetRedisClient(8).HashDeleteAsync($"SignalRClient:connects:{serverDevice.deviceId}", connid); if (client != null) { await Groups.RemoveFromGroupAsync(connid, client.grant_type!); var value = await _azureRedis.GetRedisClient(8).HashGetAsync($"ScreenApi:clients:{serverDevice.deviceId}", client.clientid); if (value!=default && value.HasValue) { ScreenClient screenClient = value.ToString().ToObject() ; _logger.LogInformation($"客户端断开连接=>{connid},{screenClient.name},{screenClient.region},{screenClient.clientid} "); long now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); // 判断是否过期 if (screenClient.status!.Equals(ScreenConstant.busy)) { //超时了 if (screenClient.last_time+screenClient.timeout+screenClient.delay+ ScreenConstant.time_excess <=now) { screenClient.status=ScreenConstant.offline; screenClient.connid= string.Empty; } } else { screenClient.status=ScreenConstant.offline; screenClient.connid= string.Empty; } await _azureRedis.GetRedisClient(8).HashSetAsync($"ScreenApi:clients:{serverDevice.deviceId}", client.clientid, screenClient.ToJsonString()); } } } } public async Task ReceiveMessage(ScreenProcessMessage message) { ////接收消息 //如果是超时,放回队列。 ///分发新任务。 var serverDevice = await _device.GetCoreDevice(); long nowNew = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); var connid = Context.ConnectionId; PDFGenQueue? task = message.content?.ToObject(); ScreenClient screenClient = null; try { //释放客户端的忙碌状态。 var redisData = await _azureRedis.GetRedisClient(8).HashGetAsync($"SignalRClient:connects:{serverDevice.deviceId}", connid); var client = redisData.ToString().ToObject(); await SendConnection(connid, new ConnectionMessage { connid=connid, clientid = client.clientid, status = ScreenConstant.idle, grant_type = client.grant_type, message_type= MessageType.conn_success, content = $"客户端空闲,等待任务分发......" }); var value = await _azureRedis.GetRedisClient(8).HashGetAsync($"ScreenApi:clients:{serverDevice.deviceId}", client.clientid); if (value!=default && value.HasValue) { screenClient = value.ToString().ToObject(); screenClient.status=ScreenConstant.idle; screenClient.last_time=nowNew; screenClient.taskComplete++; await _azureRedis.GetRedisClient(8).HashSetAsync($"ScreenApi:clients:{serverDevice.deviceId}", client.clientid,screenClient.ToJsonString()); _logger.LogInformation($"客户端空闲,等待任务分发......=>{connid},{screenClient.name},{screenClient.region},{screenClient.clientid} "); } } catch (Exception ex) { _logger.LogError($"客户端状态重置异常,......=>{connid},{ex.Message},{ex.StackTrace}"); } if (task!=null) { RedisValue redisValue = await _azureRedis.GetRedisClient(8).HashGetAsync($"PDFGen:{task.sessionId}", task.id); if (redisValue!=default) { var genRedis = redisValue.ToString().ToObject(); genRedis.cost=nowNew-(genRedis.join+genRedis.wait);//拿到分发任务的时间,因为 等待时长=分发时的时间戳-任务生成的时间戳(join)。 genRedis.status=message.result; genRedis.msg=message.msg; await _azureRedis.GetRedisClient(8).HashSetAsync($"PDFGen:{task.sessionId}", task.id, genRedis.ToJsonString()); } ///如果是超时,放回队列。 if (message.result==4) { await _azureRedis.GetRedisClient(8).ListLeftPushAsync($"PDFGen:Queue:{serverDevice.deviceId}", task.ToJsonString()); } } if (screenClient!=null && screenClient.status!.Equals(ScreenConstant.idle)) { var taskData = await TaskService.SentTask(_azureRedis, _azureStorage, _logger, serverDevice); if (taskData.genQueue!=null && taskData.genRedis!=null && !string.IsNullOrWhiteSpace(taskData.genQueue.cntName)) { screenClient.status = ScreenConstant.busy; screenClient.last_time = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); await SendMessage(connid, new ScreenProcessMessage { connid = connid, clientid =screenClient. clientid, status = ScreenConstant.busy, grant_type = screenClient.grant_type, message_type= MessageType.task_send_success, content =$"{taskData.genQueue.ToJsonString()}",//从Redis中获取任务信息 }); _logger.LogInformation($"分发任务:{taskData.genQueue.ToJsonString()}"); } else { //_logger.LogInformation($"客户端当前空闲=>{screenClient.name},{screenClient.region},{screenClient.clientid},暂无任务可领取的任务......"); if (taskData.genRedis!=null) { string msgError = $"分发任务异常原因=>{screenClient.name},{screenClient.region},{screenClient.clientid}:{taskData.msg}\ngenQueue:{taskData.genQueue?.ToJsonString()}\ngenRedis:{taskData.genRedis?.ToJsonString()}"; _logger.LogInformation(msgError); await _dingDing.SendBotMsg(msgError, GroupNames.成都开发測試群組); } else { _logger.LogInformation($"分发任务异常原因=>{screenClient.name},{screenClient.region},{screenClient.clientid}:{taskData.msg}\n"); } await SendMessage(connid, new ScreenProcessMessage { connid = connid, clientid = screenClient.clientid, status = ScreenConstant.idle, grant_type =screenClient. grant_type, message_type= MessageType.task_send_error, content = taskData.msg }); } await _azureRedis.GetRedisClient(8).HashSetAsync($"ScreenApi:clients:{serverDevice.deviceId}", screenClient.clientid, screenClient.ToJsonString()); } } public async Task SendConnection(string connectionId, MessageBody msg) { await Clients.Client(connectionId).ReceiveConnection(msg); } public async Task SendMessage(string connectionId, MessageBody msg) { await Clients.Client(connectionId).ReceiveMessage(msg); } public async Task SendDisConnection(string connectionId, MessageBody msg) { await Clients.Client(connectionId).ReceiveDisConnection(msg); } } }