SignalRScreenServerHub.cs 19 KB


  1. using Microsoft.AspNetCore.SignalR;
  2. using Microsoft.Extensions.Primitives;
  3. using TEAMModelOS.SDK.DI;
  4. using TEAMModelOS.SDK.Extension;
  5. using TEAMModelOS.SDK;
  6. using System.Web;
  7. using System.Text;
  8. using StackExchange.Redis;
  9. using Microsoft.Extensions.Logging;
  10. using TEAMModelOS.SDK.Models;
  11. using Google.Protobuf.WellKnownTypes;
  12. using Microsoft.Extensions.DependencyInjection;
  13. using TEAMModelOS.SDK.DI.Device;
  14. namespace HTEX.Complex.Services
  15. {
  /// <summary>
  /// SignalR hub that screen clients connect to. It records every connection in Redis,
  /// keeps the stored <see cref="ScreenClient"/> state of each client current, and hands a
  /// task obtained from <c>TaskService.SentTask</c> to a client whenever that client is idle.
  /// </summary>
  public class SignalRScreenServerHub : Hub<IClient>
  {
      /// <summary>Redis database index used for every key this hub reads or writes.</summary>
      private const int RedisDb = 8;

      /// <summary>Value of <c>message.result</c> that is treated as a timeout; such tasks are pushed back onto the queue.</summary>
      private const int TimeoutResult = 4;

      private readonly ILogger<SignalRScreenServerHub> _logger;
      private readonly AzureRedisFactory _azureRedis;
      private readonly AzureStorageFactory _azureStorage;
      private readonly DingDing _dingDing;
      private readonly CoreDevice _device;

      /// <summary>Creates the hub with its injected collaborators.</summary>
      public SignalRScreenServerHub(AzureRedisFactory azureRedis, ILogger<SignalRScreenServerHub> logger, AzureStorageFactory azureStorage, DingDing dingDing, CoreDevice device)
      {
          _logger = logger;
          _azureRedis = azureRedis;
          _azureStorage = azureStorage;
          _dingDing = dingDing;
          _device = device;
      }

      /// <summary>
      /// Runs when a client connects. Stores the connection in Redis and, for clients whose
      /// <c>grant_type</c> equals <c>ScreenConstant.grant_type</c>, refreshes the stored client
      /// state, confirms the connection and immediately tries to hand out a task.
      /// </summary>
      /// <returns>A task that completes when the connection has been processed.</returns>
      public override async Task OnConnectedAsync()
      {
          var serverDevice = await _device.GetCoreDevice();
          var connid = Context.ConnectionId;
          var httpContext = Context.GetHttpContext();
          if (httpContext == null)
          {
              return;
          }

          // Example endpoints:
          // wss://www.winteach.cn/signalr/notify?grant_type=wechat_qrcode&scene=0a75aca57536490ba00fe62e27bb8f6c&id=U2MNiCFNPPuVcw2gUI_gRA
          // wss://www.winteach.cn/signalr/notify?grant_type=bookjs_api&clientid={clientid}&id=<generated by the client>
          var query = httpContext.Request.Query;
          query.TryGetValue("grant_type", out StringValues grantType);
          query.TryGetValue("clientid", out StringValues clientId);
          // Deliberately not named _device, so the CoreDevice field is not shadowed.
          query.TryGetValue("device", out StringValues deviceParam);

          bool hasGrantType = !grantType.Equals(StringValues.Empty);
          bool hasClientId = !clientId.Equals(StringValues.Empty);

          // An absent grant_type converts to a null group name, which AddToGroupAsync rejects;
          // skip the group so the configuration-error reply below can still be delivered.
          if (hasGrantType)
          {
              await Groups.AddToGroupAsync(connid, grantType!);
          }

          if (!hasClientId || !hasGrantType)
          {
              await SendConfigurationErrorAsync(connid, grantType);
              return;
          }

          // Remember which client owns this connection so later hub calls can resolve it.
          var client = new SignalRClient
          {
              connid = connid,
              grant_type = grantType,
              clientid = clientId,
              serverid = serverDevice.deviceId,
          };
          await _azureRedis.GetRedisClient(RedisDb).HashSetAsync($"SignalRClient:connects:{serverDevice.deviceId}", connid, client.ToJsonString());

          // The client describes itself in the URL-encoded JSON "device" parameter.
          string? deviceJson = HttpUtility.UrlDecode(deviceParam, Encoding.Unicode);
          ScreenClient? device = string.IsNullOrWhiteSpace(deviceJson) ? null : deviceJson.ToObject<ScreenClient>();

          if (!grantType.Equals(ScreenConstant.grant_type))
          {
              return;
          }

          if (device == null)
          {
              // Without a device description the client state cannot be populated.
              await SendConfigurationErrorAsync(connid, grantType);
              return;
          }

          var clientsKey = $"ScreenApi:clients:{serverDevice.deviceId}";
          ScreenClient screenClient;
          var stored = await _azureRedis.GetRedisClient(RedisDb).HashGetAsync(clientsKey, client.clientid);
          if (stored.HasValue)
          {
              screenClient = stored.ToString().ToObject<ScreenClient>();
              // Do not force the state to idle: on a reconnect the client may still be running a task.
              // Only when the recorded work window has already elapsed is the client reset to idle.
              long now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
              if (!screenClient.status!.Equals(ScreenConstant.idle)
                  && screenClient.last_time + screenClient.timeout + screenClient.delay + ScreenConstant.time_excess < now)
              {
                  screenClient.status = ScreenConstant.idle;
              }
          }
          else
          {
              screenClient = new ScreenClient
              {
                  status = ScreenConstant.idle,
              };
          }

          screenClient.connid = connid;
          screenClient.grant_type = grantType;
          screenClient.clientid = clientId;
          ApplyDeviceInfo(screenClient, device);
          screenClient.last_time = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();

          // Tell the client that the connection was accepted.
          await SendConnection(connid, new ConnectionMessage
          {
              connid = connid,
              clientid = clientId,
              status = screenClient.status,
              grant_type = grantType,
              message_type = MessageType.conn_success,
              content = $"连接成功"
          });
          _logger.LogInformation($"客户端连接成功=>{screenClient.name},{screenClient.region},{clientId}:\n{screenClient.ToJsonString()}");

          if (screenClient.status!.Equals(ScreenConstant.idle))
          {
              _logger.LogInformation($"客户端当前空闲=>{screenClient.name},{screenClient.region},{clientId},分发任务......");
              // The client is idle, so try to hand out a task right away.
              var task = await TaskService.SentTask(_azureRedis, _azureStorage, _logger, serverDevice);
              if (task.genQueue != null && task.genRedis != null && !string.IsNullOrWhiteSpace(task.genQueue.cntName))
              {
                  await SendTaskAsync(connid, screenClient, task.genQueue.ToJsonString());
              }
              else
              {
                  string? alertDetail = task.genRedis != null
                      ? $"genQueue:{task.genQueue?.ToJsonString()}\ngenRedis:{task.genRedis?.ToJsonString()}"
                      : null;
                  await ReportDispatchFailureAsync(connid, screenClient, task.msg, alertDetail);
              }
          }

          await _azureRedis.GetRedisClient(RedisDb).HashSetAsync(clientsKey, client.clientid, screenClient.ToJsonString());
      }

      /// <summary>
      /// Runs when a client disconnects. Removes the connection record from Redis and marks the
      /// stored client as offline, unless it is busy and its work window has not yet elapsed.
      /// </summary>
      /// <param name="exception">The error that caused the disconnect, if any; not used here.</param>
      /// <returns>A task that completes when the disconnect has been processed.</returns>
      public override async Task OnDisconnectedAsync(Exception? exception)
      {
          var serverDevice = await _device.GetCoreDevice();
          var connid = Context.ConnectionId;
          var connectsKey = $"SignalRClient:connects:{serverDevice.deviceId}";
          var redisData = await _azureRedis.GetRedisClient(RedisDb).HashGetAsync(connectsKey, connid);
          _logger.LogInformation($"客户端断开连接=>{connid} ");
          if (redisData.IsNullOrEmpty)
          {
              return;
          }

          var client = redisData.ToString().ToObject<SignalRClient>();
          await _azureRedis.GetRedisClient(RedisDb).HashDeleteAsync(connectsKey, connid);
          if (client == null)
          {
              return;
          }

          await Groups.RemoveFromGroupAsync(connid, client.grant_type!);
          var clientsKey = $"ScreenApi:clients:{serverDevice.deviceId}";
          var stored = await _azureRedis.GetRedisClient(RedisDb).HashGetAsync(clientsKey, client.clientid);
          if (!stored.HasValue)
          {
              return;
          }

          ScreenClient screenClient = stored.ToString().ToObject<ScreenClient>();
          _logger.LogInformation($"客户端断开连接=>{connid},{screenClient.name},{screenClient.region},{screenClient.clientid} ");
          long now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();

          // A busy client keeps its state until its work window has elapsed; every other client goes offline at once.
          // NOTE(review): the stored connid is not compared with this connection, so a late disconnect of an old
          // connection could mark a client offline after it has already reconnected - confirm whether that matters.
          bool busy = screenClient.status!.Equals(ScreenConstant.busy);
          if (!busy || screenClient.last_time + screenClient.timeout + screenClient.delay + ScreenConstant.time_excess <= now)
          {
              screenClient.status = ScreenConstant.offline;
              screenClient.connid = string.Empty;
          }

          await _azureRedis.GetRedisClient(RedisDb).HashSetAsync(clientsKey, client.clientid, screenClient.ToJsonString());
      }

      /// <summary>
      /// Hub method through which a client reports on a task. Resets the client to idle, records the
      /// reported result for the task, re-queues the task when the result is a timeout, and then
      /// tries to hand the client a new task.
      /// </summary>
      /// <param name="message">The client's report; <c>content</c> is parsed as a <see cref="PDFGenQueue"/>.</param>
      /// <returns>A task that completes when the report has been processed.</returns>
      public async Task ReceiveMessage(ScreenProcessMessage message)
      {
          var serverDevice = await _device.GetCoreDevice();
          long nowNew = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
          var connid = Context.ConnectionId;
          var clientsKey = $"ScreenApi:clients:{serverDevice.deviceId}";
          PDFGenQueue? task = message.content?.ToObject<PDFGenQueue>();
          ScreenClient? screenClient = null;
          try
          {
              // Release the client's busy state. Best effort: any failure is logged and processing continues.
              var redisData = await _azureRedis.GetRedisClient(RedisDb).HashGetAsync($"SignalRClient:connects:{serverDevice.deviceId}", connid);
              var client = redisData.ToString().ToObject<SignalRClient>();
              await SendConnection(connid, new ConnectionMessage
              {
                  connid = connid,
                  clientid = client.clientid,
                  status = ScreenConstant.idle,
                  grant_type = client.grant_type,
                  message_type = MessageType.conn_success,
                  content = $"客户端空闲,等待任务分发......"
              });
              var stored = await _azureRedis.GetRedisClient(RedisDb).HashGetAsync(clientsKey, client.clientid);
              if (stored.HasValue)
              {
                  screenClient = stored.ToString().ToObject<ScreenClient>();
                  screenClient.status = ScreenConstant.idle;
                  screenClient.last_time = nowNew;
                  screenClient.taskComplete++;
                  await _azureRedis.GetRedisClient(RedisDb).HashSetAsync(clientsKey, client.clientid, screenClient.ToJsonString());
                  _logger.LogInformation($"客户端空闲,等待任务分发......=>{connid},{screenClient.name},{screenClient.region},{screenClient.clientid} ");
              }
          }
          catch (Exception ex)
          {
              _logger.LogError($"客户端状态重置异常,......=>{connid},{ex.Message},{ex.StackTrace}");
          }

          if (task != null)
          {
              var genKey = $"PDFGen:{task.sessionId}";
              RedisValue redisValue = await _azureRedis.GetRedisClient(RedisDb).HashGetAsync(genKey, task.id);
              if (redisValue.HasValue)
              {
                  var genRedis = redisValue.ToString().ToObject<PDFGenRedis>();
                  if (genRedis != null)
                  {
                      // join + wait is the moment the task was handed out (wait = hand-out time - join time),
                      // so the cost is the time elapsed since then.
                      genRedis.cost = nowNew - (genRedis.join + genRedis.wait);
                      genRedis.status = message.result;
                      genRedis.msg = message.msg;
                      await _azureRedis.GetRedisClient(RedisDb).HashSetAsync(genKey, task.id, genRedis.ToJsonString());
                  }
              }

              // A timed-out task goes back onto the queue.
              if (message.result == TimeoutResult)
              {
                  await _azureRedis.GetRedisClient(RedisDb).ListLeftPushAsync($"PDFGen:Queue:{serverDevice.deviceId}", task.ToJsonString());
              }
          }

          if (screenClient != null && screenClient.status!.Equals(ScreenConstant.idle))
          {
              var taskData = await TaskService.SentTask(_azureRedis, _azureStorage, _logger, serverDevice);
              if (taskData.genQueue != null && taskData.genRedis != null && !string.IsNullOrWhiteSpace(taskData.genQueue.cntName))
              {
                  await SendTaskAsync(connid, screenClient, taskData.genQueue.ToJsonString());
              }
              else
              {
                  string? alertDetail = taskData.genRedis != null
                      ? $"genQueue:{taskData.genQueue?.ToJsonString()}\ngenRedis:{taskData.genRedis?.ToJsonString()}"
                      : null;
                  await ReportDispatchFailureAsync(connid, screenClient, taskData.msg, alertDetail);
              }

              await _azureRedis.GetRedisClient(RedisDb).HashSetAsync(clientsKey, screenClient.clientid, screenClient.ToJsonString());
          }
      }

      /// <summary>Sends <paramref name="msg"/> to one connection through the client's <c>ReceiveConnection</c> callback.</summary>
      /// <remarks>NOTE(review): public hub methods can be invoked by connected clients - confirm this one is meant to be exposed.</remarks>
      public async Task SendConnection(string connectionId, MessageBody msg)
      {
          await Clients.Client(connectionId).ReceiveConnection(msg);
      }

      /// <summary>Sends <paramref name="msg"/> to one connection through the client's <c>ReceiveMessage</c> callback.</summary>
      /// <remarks>NOTE(review): public hub methods can be invoked by connected clients - confirm this one is meant to be exposed.</remarks>
      public async Task SendMessage(string connectionId, MessageBody msg)
      {
          await Clients.Client(connectionId).ReceiveMessage(msg);
      }

      /// <summary>Sends <paramref name="msg"/> to one connection through the client's <c>ReceiveDisConnection</c> callback.</summary>
      /// <remarks>NOTE(review): public hub methods can be invoked by connected clients - confirm this one is meant to be exposed.</remarks>
      public async Task SendDisConnection(string connectionId, MessageBody msg)
      {
          await Clients.Client(connectionId).ReceiveDisConnection(msg);
      }

      /// <summary>Tells a connection that its connection parameters are unusable.</summary>
      private Task SendConfigurationErrorAsync(string connid, string? grantType)
      {
          return SendConnection(connid, new ConnectionMessage
          {
              clientid = string.Empty,
              status = ScreenConstant.error,
              grant_type = grantType,
              message_type = MessageType.conn_error,
              content = "客户端配置错误",
              connid = connid,
          });
      }

      /// <summary>Copies the self-reported device description onto the stored client state.</summary>
      private static void ApplyDeviceInfo(ScreenClient target, ScreenClient reported)
      {
          target.os = reported.os;
          target.port = reported.port;
          target.name = reported.name;
          target.region = reported.region;
          target.remote = reported.remote;
          target.networks = reported.networks;
          target.screenUrl = reported.screenUrl;
          target.delay = reported.delay;
          target.timeout = reported.timeout;
          target.cpu = reported.cpu;
          target.ram = reported.ram;
          target.deviceId = reported.deviceId;
      }

      /// <summary>Marks the client busy and sends it the serialized task; the caller persists the client state.</summary>
      private async Task SendTaskAsync(string connid, ScreenClient screenClient, string queueJson)
      {
          screenClient.status = ScreenConstant.busy;
          screenClient.last_time = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
          await SendMessage(connid, new ScreenProcessMessage
          {
              connid = connid,
              clientid = screenClient.clientid,
              status = ScreenConstant.busy,
              grant_type = screenClient.grant_type,
              message_type = MessageType.task_send_success,
              content = queueJson,
          });
          _logger.LogInformation($"分发任务:{queueJson}");
      }

      /// <summary>
      /// Logs why no task could be handed out and tells the client so. When
      /// <paramref name="alertDetail"/> is supplied, the combined text is also sent to the DingDing group.
      /// </summary>
      private async Task ReportDispatchFailureAsync(string connid, ScreenClient screenClient, string? msg, string? alertDetail)
      {
          string reason = $"分发任务异常原因=>{screenClient.name},{screenClient.region},{screenClient.clientid}:{msg}\n";
          if (alertDetail != null)
          {
              string msgError = reason + alertDetail;
              _logger.LogInformation(msgError);
              await _dingDing.SendBotMsg(msgError, GroupNames.成都开发測試群組);
          }
          else
          {
              _logger.LogInformation(reason);
          }

          await SendMessage(connid, new ScreenProcessMessage
          {
              connid = connid,
              clientid = screenClient.clientid,
              status = ScreenConstant.idle,
              grant_type = screenClient.grant_type,
              message_type = MessageType.task_send_error,
              content = msg
          });
      }
  }
  319. }