SignalRScreenServerHub.cs 17 KB


  1. using Grpc.Core;
  2. using Microsoft.AspNetCore.SignalR;
  3. using Microsoft.Azure.Cosmos.Linq;
  4. using Microsoft.Extensions.Primitives;
  5. using TEAMModelOS.SDK.DI;
  6. using TEAMModelOS.SDK.Extension;
  7. using TEAMModelOS.SDK;
  8. using System.Web;
  9. using System.Text;
  10. using StackExchange.Redis;
  11. using Azure.Storage.Blobs.Models;
  12. using Azure.Storage.Sas;
  13. namespace HTEX.Complex.Services
  14. {
  /// <summary>
  /// SignalR hub for screen (PDF generation) worker clients.
  /// Registers each connection in Redis, tracks the worker status
  /// (idle / busy / offline) and hands out tasks obtained from
  /// <c>GenPDFService.SentTask</c> whenever a worker is idle.
  /// </summary>
  public class SignalRScreenServerHub : Hub<IClient>
  {
      /// <summary>Redis database index used for all hub bookkeeping.</summary>
      private const int RedisDbIndex = 8;

      /// <summary>Hash: connection id -> serialized <c>SignalRClient</c>.</summary>
      private const string ConnectsKey = "SignalRClient:connects";

      /// <summary>Hash: client id -> serialized <c>ScreenClient</c>.</summary>
      private const string ScreenClientsKey = "ScreenApi:clients";

      /// <summary>List holding the pending PDF generation tasks.</summary>
      private const string PdfQueueKey = "PDFGen:Queue";

      /// <summary>Value of <c>ScreenProcessMessage.result</c> that this hub treats as "timed out, re-queue".</summary>
      private const int TimeoutResult = 4;

      private readonly ILogger<SignalRScreenServerHub> _logger;
      private readonly AzureRedisFactory _azureRedis;
      private readonly AzureStorageFactory _azureStorage;
      private readonly DingDing _dingDing;

      public SignalRScreenServerHub(AzureRedisFactory azureRedis, ILogger<SignalRScreenServerHub> logger, AzureStorageFactory azureStorage, DingDing dingDing)
      {
          _logger = logger;
          _azureRedis = azureRedis;
          _azureStorage = azureStorage;
          _dingDing = dingDing;
      }

      /// <summary>
      /// Runs when a client connection is established.
      /// Reads <c>grant_type</c>, <c>clientid</c> and <c>device</c> from the query string,
      /// records the connection in Redis and, for screen clients, restores or creates the
      /// <c>ScreenClient</c> state, acknowledges the connection and dispatches a task when idle.
      /// A <c>conn_error</c> message is sent when the required query values are missing or invalid.
      /// </summary>
      public override async Task OnConnectedAsync()
      {
          var connid = Context.ConnectionId;
          var httpContext = Context.GetHttpContext();
          if (httpContext == null)
          {
              return;
          }

          // Example URLs:
          // wss://www.winteach.cn/signalr/notify?grant_type=wechat_qrcode&scene=0a75aca57536490ba00fe62e27bb8f6c&id=U2MNiCFNPPuVcw2gUI_gRA
          // wss://www.winteach.cn/signalr/notify?grant_type=bookjs_api&clientid={clientid}&id=<generated by the client>
          var query = httpContext.Request.Query;
          query.TryGetValue("grant_type", out StringValues grant_type);
          query.TryGetValue("clientid", out StringValues clientid);
          query.TryGetValue("device", out StringValues _device);

          // Implicit conversion yields null when the value is absent.
          string? grantType = grant_type;
          string? clientId = clientid;

          bool hasGrantType = !StringValues.IsNullOrEmpty(grant_type);
          if (hasGrantType)
          {
              // Only join a group when there is a name; a null group name makes AddToGroupAsync throw.
              await Groups.AddToGroupAsync(connid, grantType!);
          }

          if (!hasGrantType || StringValues.IsNullOrEmpty(clientid))
          {
              await SendConfigErrorAsync(connid, grantType);
              return;
          }

          var redis = _azureRedis.GetRedisClient(RedisDbIndex);

          // Remember the connection so later callbacks can map the connection id back to the client.
          var client = new SignalRClient
          {
              connid = connid,
              grant_type = grantType,
              clientid = clientId
          };
          await redis.HashSetAsync(ConnectsKey, connid, client.ToJsonString());

          if (!grant_type.Equals(ScreenConstant.grant_type))
          {
              // Only screen clients need device/state handling.
              return;
          }

          ClientDevice? device = ParseDevice(connid, _device);
          if (device == null)
          {
              await SendConfigErrorAsync(connid, grantType);
              return;
          }

          long now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
          ScreenClient? screenClient = null;
          var stored = await redis.HashGetAsync(ScreenClientsKey, client.clientid);
          if (!stored.IsNullOrEmpty)
          {
              screenClient = stored.ToString().ToObject<ScreenClient>();
          }

          if (screenClient == null)
          {
              screenClient = new ScreenClient
              {
                  status = ScreenConstant.idle,
              };
          }
          else if (!IsIdle(screenClient) && IsExpired(screenClient, now))
          {
              // Do not force idle on every reconnect: the client may still be running a task.
              // Only reset when the recorded task window has elapsed.
              screenClient.status = ScreenConstant.idle;
          }

          screenClient.connid = connid;
          screenClient.grant_type = grantType;
          screenClient.clientid = clientId;
          screenClient.os = device.os;
          screenClient.port = device.port;
          screenClient.name = device.name;
          screenClient.region = device.region;
          screenClient.remote = device.remote;
          screenClient.networks = device.networks;
          screenClient.screenUrl = device.screenUrl;
          screenClient.delay = device.delay;
          screenClient.timeout = device.timeout;
          screenClient.last_time = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();

          // Tell the client that the connection succeeded.
          await SendConnection(connid, new ConnectionMessage
          {
              connid = connid,
              clientid = clientId,
              status = screenClient.status,
              grant_type = grantType,
              message_type = MessageType.conn_success,
              content = $"连接成功"
          });
          _logger.LogInformation($"客户端连接成功=>{screenClient.name},{screenClient.region},{clientid}:\n{screenClient.ToJsonString()}");

          if (IsIdle(screenClient))
          {
              _logger.LogInformation($"客户端当前空闲=>{screenClient.name},{screenClient.region},{clientid},分发任务......");
              // Connected and idle: hand out a task right away.
              await DispatchTaskAsync(connid, screenClient);
          }

          await redis.HashSetAsync(ScreenClientsKey, client.clientid, screenClient.ToJsonString());
      }

      /// <summary>
      /// Runs when a client connection closes.
      /// Removes the connection record from Redis, leaves the group, and marks the screen
      /// client offline when it was busy and its task window has already elapsed.
      /// </summary>
      /// <param name="exception">The error that caused the disconnect, if any; it is logged.</param>
      public override async Task OnDisconnectedAsync(Exception? exception)
      {
          var connid = Context.ConnectionId;
          var redis = _azureRedis.GetRedisClient(RedisDbIndex);
          var redisData = await redis.HashGetAsync(ConnectsKey, connid);
          _logger.LogInformation(exception, $"客户端断开连接=>{connid} ");

          if (redisData.IsNullOrEmpty)
          {
              return;
          }

          var client = redisData.ToString().ToObject<SignalRClient>();
          await redis.HashDeleteAsync(ConnectsKey, connid);
          if (client == null)
          {
              return;
          }

          if (!string.IsNullOrEmpty(client.grant_type))
          {
              await Groups.RemoveFromGroupAsync(connid, client.grant_type);
          }

          var stored = await redis.HashGetAsync(ScreenClientsKey, client.clientid);
          if (stored.IsNullOrEmpty)
          {
              return;
          }

          ScreenClient? screenClient = stored.ToString().ToObject<ScreenClient>();
          if (screenClient == null)
          {
              return;
          }

          _logger.LogInformation($"客户端断开连接=>{connid},{screenClient.name},{screenClient.region},{screenClient.clientid} ");
          long now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();

          // Only a busy client whose task window has elapsed is marked offline here.
          // NOTE(review): an idle client that disconnects keeps status "idle" and its old connid
          // in Redis - confirm whether that is intended or whether it should also go offline.
          if (object.Equals(screenClient.status, ScreenConstant.busy) && IsExpired(screenClient, now))
          {
              screenClient.status = ScreenConstant.offline;
              screenClient.connid = string.Empty;
              await redis.HashSetAsync(ScreenClientsKey, client.clientid, screenClient.ToJsonString());
          }
      }

      /// <summary>
      /// Hub method invoked by a client to report the outcome of a task.
      /// Resets the client to idle, stores the task result, puts a timed-out task back on
      /// the queue and then tries to dispatch the next task to the same client.
      /// </summary>
      /// <param name="message">Result message; <c>content</c> carries the serialized task.</param>
      public async Task ReceiveMessage(ScreenProcessMessage message)
      {
          ArgumentNullException.ThrowIfNull(message);

          long nowNew = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
          var connid = Context.ConnectionId;
          var redis = _azureRedis.GetRedisClient(RedisDbIndex);
          PDFGenQueue? task = message.content?.ToObject<PDFGenQueue>();
          ScreenClient? screenClient = null;

          try
          {
              // Release the busy state of the reporting client.
              var redisData = await redis.HashGetAsync(ConnectsKey, connid);
              var client = redisData.IsNullOrEmpty ? null : redisData.ToString().ToObject<SignalRClient>();
              if (client == null)
              {
                  _logger.LogWarning($"客户端状态重置异常,......=>{connid},connection is not registered");
              }
              else
              {
                  await SendConnection(connid, new ConnectionMessage
                  {
                      connid = connid,
                      clientid = client.clientid,
                      status = ScreenConstant.idle,
                      grant_type = client.grant_type,
                      message_type = MessageType.conn_success,
                      content = $"客户端空闲,等待任务分发......"
                  });

                  var value = await redis.HashGetAsync(ScreenClientsKey, client.clientid);
                  if (!value.IsNullOrEmpty)
                  {
                      screenClient = value.ToString().ToObject<ScreenClient>();
                  }

                  if (screenClient != null)
                  {
                      screenClient.status = ScreenConstant.idle;
                      screenClient.last_time = nowNew;
                      screenClient.taskComplete++;
                      await redis.HashSetAsync(ScreenClientsKey, client.clientid, screenClient.ToJsonString());
                      _logger.LogInformation($"客户端空闲,等待任务分发......=>{connid},{screenClient.name},{screenClient.region},{screenClient.clientid} ");
                  }
              }
          }
          catch (Exception ex)
          {
              // Best effort: a failed status reset must not prevent the task result from being stored.
              _logger.LogError(ex, $"客户端状态重置异常,......=>{connid},{ex.Message},{ex.StackTrace}");
          }

          if (task != null)
          {
              var taskKey = $"PDFGen:{task.sessionId}";
              RedisValue redisValue = await redis.HashGetAsync(taskKey, task.id);
              if (!redisValue.IsNullOrEmpty)
              {
                  var genRedis = redisValue.ToString().ToObject<PDFGenRedis>();
                  if (genRedis != null)
                  {
                      // cost = now - dispatch time, where dispatch time = join + wait
                      // (wait = dispatch timestamp - task creation timestamp).
                      genRedis.cost = nowNew - (genRedis.join + genRedis.wait);
                      genRedis.status = message.result;
                      genRedis.msg = message.msg;
                      await redis.HashSetAsync(taskKey, task.id, genRedis.ToJsonString());
                  }
              }

              // A timed-out task goes back onto the queue.
              if (message.result == TimeoutResult)
              {
                  await redis.ListLeftPushAsync(PdfQueueKey, task.ToJsonString());
              }
          }

          if (screenClient != null && IsIdle(screenClient))
          {
              await DispatchTaskAsync(connid, screenClient);
              await redis.HashSetAsync(ScreenClientsKey, screenClient.clientid, screenClient.ToJsonString());
          }
      }

      // NOTE(review): public hub methods can be invoked by connected clients; confirm that
      // exposing the three Send* methods below to clients is intended.

      /// <summary>Sends a connection-status message to one connection.</summary>
      public async Task SendConnection(string connectionId, MessageBody msg)
      {
          await Clients.Client(connectionId).ReceiveConnection(msg);
      }

      /// <summary>Sends a task/process message to one connection.</summary>
      public async Task SendMessage(string connectionId, MessageBody msg)
      {
          await Clients.Client(connectionId).ReceiveMessage(msg);
      }

      /// <summary>Sends a disconnect message to one connection.</summary>
      public async Task SendDisConnection(string connectionId, MessageBody msg)
      {
          await Clients.Client(connectionId).ReceiveDisConnection(msg);
      }

      /// <summary>Returns true when the stored status equals <c>ScreenConstant.idle</c>; a null status is not idle.</summary>
      private static bool IsIdle(ScreenClient screenClient)
      {
          return object.Equals(screenClient.status, ScreenConstant.idle);
      }

      /// <summary>Returns true when last_time + timeout + delay + the configured excess is not after <paramref name="now"/>.</summary>
      private static bool IsExpired(ScreenClient screenClient, long now)
      {
          return screenClient.last_time + screenClient.timeout + screenClient.delay + ScreenConstant.time_excess <= now;
      }

      /// <summary>Decodes the <c>device</c> query value; returns null when it is missing or cannot be parsed.</summary>
      private ClientDevice? ParseDevice(string connid, StringValues rawDevice)
      {
          if (StringValues.IsNullOrEmpty(rawDevice))
          {
              return null;
          }

          try
          {
              return HttpUtility.UrlDecode(rawDevice, Encoding.Unicode)?.ToObject<ClientDevice>();
          }
          catch (Exception ex)
          {
              _logger.LogError(ex, $"客户端设备信息解析失败=>{connid}");
              return null;
          }
      }

      /// <summary>Tells the client that its connection parameters are invalid.</summary>
      private Task SendConfigErrorAsync(string connid, string? grantType)
      {
          return SendConnection(connid, new ConnectionMessage
          {
              clientid = string.Empty,
              status = ScreenConstant.error,
              grant_type = grantType,
              message_type = MessageType.conn_error,
              content = "客户端配置错误",
              connid = connid,
          });
      }

      /// <summary>
      /// Asks <c>GenPDFService.SentTask</c> for a task and sends it to the client.
      /// On success the client is set to busy (in memory only; the caller persists it).
      /// Otherwise the reason is logged, reported via DingDing when a Redis task record
      /// was returned, and a <c>task_send_error</c> message is sent to the client.
      /// </summary>
      private async Task DispatchTaskAsync(string connid, ScreenClient screenClient)
      {
          var task = await GenPDFService.SentTask(_azureRedis, _azureStorage);
          if (task.genQueue != null && task.genRedis != null && !string.IsNullOrWhiteSpace(task.genQueue.cntName))
          {
              screenClient.status = ScreenConstant.busy;
              screenClient.last_time = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
              await SendMessage(connid, new ScreenProcessMessage
              {
                  connid = connid,
                  clientid = screenClient.clientid,
                  status = ScreenConstant.busy,
                  grant_type = screenClient.grant_type,
                  message_type = MessageType.task_send_success,
                  content = $"{task.genQueue.ToJsonString()}", // task information
              });
              return;
          }

          _logger.LogInformation($"客户端当前空闲=>{screenClient.name},{screenClient.region},{screenClient.clientid},暂无任务可领取的任务......");
          if (task.genRedis != null)
          {
              string msgError = $"分发任务异常原因=>{screenClient.name},{screenClient.region},{screenClient.clientid}:{task.msg}\ngenQueue:{task.genQueue?.ToJsonString()}\ngenRedis:{task.genRedis?.ToJsonString()}";
              _logger.LogInformation(msgError);
              await _dingDing.SendBotMsg(msgError, GroupNames.成都开发測試群組);
          }
          else
          {
              _logger.LogInformation($"分发任务异常原因=>{screenClient.name},{screenClient.region},{screenClient.clientid}:{task.msg}\n");
          }

          await SendMessage(connid, new ScreenProcessMessage
          {
              connid = connid,
              clientid = screenClient.clientid,
              status = ScreenConstant.idle,
              grant_type = screenClient.grant_type,
              message_type = MessageType.task_send_error,
              content = task.msg
          });
      }
  }
  299. }