SignalRScreenServerHub.cs 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310
  1. using Microsoft.AspNetCore.SignalR;
  2. using Microsoft.Extensions.Primitives;
  3. using TEAMModelOS.SDK.DI;
  4. using TEAMModelOS.SDK.Extension;
  5. using TEAMModelOS.SDK;
  6. using System.Web;
  7. using System.Text;
  8. using StackExchange.Redis;
  9. namespace HTEX.Complex.Services
  10. {
  11. public class SignalRScreenServerHub : Hub<IClient>
  12. {
  13. private readonly ILogger<SignalRScreenServerHub> _logger;
  14. private readonly AzureRedisFactory _azureRedis;
  15. private readonly AzureStorageFactory _azureStorage;
  16. private readonly DingDing _dingDing;
  17. public SignalRScreenServerHub(AzureRedisFactory azureRedis, ILogger<SignalRScreenServerHub> logger ,AzureStorageFactory azureStorage, DingDing dingDing)
  18. {
  19. _logger = logger;
  20. _azureRedis = azureRedis;
  21. _azureStorage = azureStorage;
  22. _dingDing = dingDing;
  23. }
  24. /// <summary>
  25. /// 客户连接成功时触发
  26. /// </summary>
  27. /// <returns></returns>
  28. public override async Task OnConnectedAsync()
  29. {
  30. var connid = Context.ConnectionId;
  31. var httpContext = Context.GetHttpContext();
  32. if (httpContext != null)
  33. {
  34. //wss://www.winteach.cn/signalr/notify?grant_type=wechat_qrcode&scene=0a75aca57536490ba00fe62e27bb8f6c&id=U2MNiCFNPPuVcw2gUI_gRA
  35. //wss://www.winteach.cn/signalr/notify?grant_type=bookjs_api&clientid={clientid}&id=客户端自动生成的
  36. httpContext.Request.Query.TryGetValue("grant_type", out StringValues grant_type);
  37. httpContext.Request.Query.TryGetValue("clientid", out StringValues clientid);
  38. httpContext.Request.Query.TryGetValue("device", out StringValues _device);
  39. await Groups.AddToGroupAsync(connid, grant_type!);
  40. if (!clientid.Equals(StringValues.Empty) && !grant_type.Equals(StringValues.Empty)) {
  41. ///连接配置,并且使用钉钉 通知。
  42. ///
  43. var client = new SignalRClient
  44. {
  45. connid = connid,
  46. grant_type = grant_type,
  47. clientid= clientid
  48. };
  49. await _azureRedis.GetRedisClient(8).HashSetAsync($"SignalRClient:connects", connid, client.ToJsonString());
  50. ClientDevice device = HttpUtility.UrlDecode(_device, Encoding.Unicode).ToObject<ClientDevice>();
  51. switch (true)
  52. {
  53. case bool when grant_type.Equals(ScreenConstant.grant_type):
  54. ScreenClient screenClient ;
  55. var value = await _azureRedis.GetRedisClient(8).HashGetAsync($"ScreenApi:clients", client.clientid);
  56. if (value!=default && value.HasValue)
  57. {
  58. screenClient = value.ToString().ToObject<ScreenClient>();
  59. // 这里不强制设置free ,因为如果是重连,可能正在执行任务,需要等待执行完成
  60. //先检查状态是否是在忙碌,在时间戳范围里,如果不在时间戳范围,强制free。
  61. if (!screenClient.status!.Equals(ScreenConstant.idle) && screenClient.last_time + screenClient.timeout+ screenClient.delay + ScreenConstant.time_excess < DateTimeOffset.UtcNow.ToUnixTimeMilliseconds())
  62. {
  63. screenClient.status = ScreenConstant.idle;
  64. }
  65. }
  66. else
  67. {
  68. screenClient = new ScreenClient
  69. {
  70. status = ScreenConstant.idle,
  71. };
  72. }
  73. screenClient.connid=connid;
  74. screenClient.grant_type = grant_type;
  75. screenClient.clientid = clientid;
  76. screenClient.os = device.os;
  77. screenClient.port = device.port;
  78. screenClient.name = device.name;
  79. screenClient.region = device.region;
  80. screenClient.remote = device.remote;
  81. screenClient.networks = device.networks;
  82. screenClient.screenUrl = device.screenUrl;
  83. screenClient.delay = device.delay;
  84. screenClient.timeout = device.timeout;
  85. screenClient.cpu = device.cpu;
  86. screenClient.ram = device.ram;
  87. screenClient.last_time= DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
  88. //连接成功,发送消息给客户端。
  89. await SendConnection(connid, new ConnectionMessage
  90. {
  91. connid=connid,
  92. clientid = clientid,
  93. status = screenClient.status,
  94. grant_type = grant_type,
  95. message_type= MessageType.conn_success,
  96. content = $"连接成功"
  97. });
  98. _logger.LogInformation($"客户端连接成功=>{screenClient.name},{screenClient.region},{clientid}:\n{screenClient.ToJsonString()}");
  99. if (screenClient.status!.Equals(ScreenConstant.idle)) {
  100. _logger.LogInformation($"客户端当前空闲=>{screenClient.name},{screenClient.region},{clientid},分发任务......");
  101. //连接成功,马上分发任务。
  102. var task = await TaskService.SentTask(_azureRedis,_azureStorage);
  103. if (task.genQueue!=null && task.genRedis!=null && !string.IsNullOrWhiteSpace(task.genQueue.cntName))
  104. {
  105. screenClient.status = ScreenConstant.busy;
  106. screenClient.last_time = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
  107. await SendMessage(connid, new ScreenProcessMessage
  108. {
  109. connid = connid,
  110. clientid = clientid,
  111. status = ScreenConstant.busy,
  112. grant_type = grant_type,
  113. message_type= MessageType.task_send_success,
  114. content =$"{task.genQueue.ToJsonString()}",//从Redis中获取任务信息
  115. });
  116. }
  117. else {
  118. _logger.LogInformation($"客户端当前空闲=>{screenClient.name},{screenClient.region},{clientid},暂无任务可领取的任务......");
  119. if (task.genRedis!=null)
  120. {
  121. string msgError = $"分发任务异常原因=>{screenClient.name},{screenClient.region},{clientid}:{task.msg}\ngenQueue:{task.genQueue?.ToJsonString()}\ngenRedis:{task.genRedis?.ToJsonString()}";
  122. _logger.LogInformation(msgError);
  123. await _dingDing.SendBotMsg(msgError, GroupNames.成都开发測試群組);
  124. }
  125. else {
  126. _logger.LogInformation($"分发任务异常原因=>{screenClient.name},{screenClient.region},{clientid}:{task.msg}\n");
  127. }
  128. await SendMessage(connid, new ScreenProcessMessage
  129. {
  130. connid = connid,
  131. clientid = clientid,
  132. status = ScreenConstant.idle,
  133. grant_type = grant_type,
  134. message_type= MessageType.task_send_error,
  135. content = task.msg
  136. });
  137. }
  138. }
  139. await _azureRedis.GetRedisClient(8).HashSetAsync($"ScreenApi:clients", client.clientid, screenClient.ToJsonString());
  140. break;
  141. }
  142. }
  143. else
  144. {
  145. await SendConnection(connid, new ConnectionMessage
  146. {
  147. clientid = string.Empty,
  148. status =ScreenConstant.error,
  149. grant_type = grant_type,
  150. message_type= MessageType.conn_error,
  151. content = "客户端配置错误",
  152. connid = connid,
  153. });
  154. }
  155. }
  156. }
  157. public override async Task OnDisconnectedAsync(Exception? exception)
  158. {
  159. var connid = Context.ConnectionId;
  160. var redisData = await _azureRedis.GetRedisClient(8).HashGetAsync($"SignalRClient:connects", connid);
  161. _logger.LogInformation($"客户端断开连接=>{connid} ");
  162. ///连接配置,并且使用钉钉 离线通知。
  163. if (!redisData.IsNullOrEmpty)
  164. {
  165. var client = redisData.ToString().ToObject<SignalRClient>();
  166. await _azureRedis.GetRedisClient(8).HashDeleteAsync($"SignalRClient:connects", connid);
  167. if (client != null)
  168. {
  169. await Groups.RemoveFromGroupAsync(connid, client.grant_type!);
  170. var value = await _azureRedis.GetRedisClient(8).HashGetAsync($"ScreenApi:clients", client.clientid);
  171. if (value!=default && value.HasValue)
  172. {
  173. ScreenClient screenClient = value.ToString().ToObject<ScreenClient>() ;
  174. _logger.LogInformation($"客户端断开连接=>{connid},{screenClient.name},{screenClient.region},{screenClient.clientid} ");
  175. long now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
  176. // 判断是否过期
  177. if (screenClient.status!.Equals(ScreenConstant.busy ) && screenClient.last_time+screenClient.timeout+screenClient.delay+ ScreenConstant.time_excess <=now)
  178. {
  179. screenClient.status=ScreenConstant.offline;
  180. screenClient.connid= string.Empty;
  181. await _azureRedis.GetRedisClient(8).HashSetAsync($"ScreenApi:clients", client.clientid, screenClient.ToJsonString());
  182. }
  183. }
  184. }
  185. }
  186. }
  187. public async Task ReceiveMessage(ScreenProcessMessage message)
  188. {
  189. ////接收消息
  190. //如果是超时,放回队列。
  191. ///分发新任务。
  192. long nowNew = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
  193. var connid = Context.ConnectionId;
  194. PDFGenQueue? task = message.content?.ToObject<PDFGenQueue>();
  195. ScreenClient screenClient = null;
  196. try
  197. { //释放客户端的忙碌状态。
  198. var redisData = await _azureRedis.GetRedisClient(8).HashGetAsync($"SignalRClient:connects", connid);
  199. var client = redisData.ToString().ToObject<SignalRClient>();
  200. await SendConnection(connid, new ConnectionMessage
  201. {
  202. connid=connid,
  203. clientid = client.clientid,
  204. status = ScreenConstant.idle,
  205. grant_type = client.grant_type,
  206. message_type= MessageType.conn_success,
  207. content = $"客户端空闲,等待任务分发......"
  208. });
  209. var value = await _azureRedis.GetRedisClient(8).HashGetAsync($"ScreenApi:clients", client.clientid);
  210. if (value!=default && value.HasValue)
  211. {
  212. screenClient = value.ToString().ToObject<ScreenClient>();
  213. screenClient.status=ScreenConstant.idle;
  214. screenClient.last_time=nowNew;
  215. screenClient.taskComplete++;
  216. await _azureRedis.GetRedisClient(8).HashSetAsync($"ScreenApi:clients", client.clientid,screenClient.ToJsonString());
  217. _logger.LogInformation($"客户端空闲,等待任务分发......=>{connid},{screenClient.name},{screenClient.region},{screenClient.clientid} ");
  218. }
  219. }
  220. catch (Exception ex)
  221. {
  222. _logger.LogError($"客户端状态重置异常,......=>{connid},{ex.Message},{ex.StackTrace}");
  223. }
  224. if (task!=null)
  225. {
  226. RedisValue redisValue = await _azureRedis.GetRedisClient(8).HashGetAsync($"PDFGen:{task.sessionId}", task.id);
  227. if (redisValue!=default)
  228. {
  229. var genRedis = redisValue.ToString().ToObject<PDFGenRedis>();
  230. genRedis.cost=nowNew-(genRedis.join+genRedis.wait);//拿到分发任务的时间,因为 等待时长=分发时的时间戳-任务生成的时间戳(join)。
  231. genRedis.status=message.result;
  232. genRedis.msg=message.msg;
  233. await _azureRedis.GetRedisClient(8).HashSetAsync($"PDFGen:{task.sessionId}", task.id, genRedis.ToJsonString());
  234. }
  235. ///如果是超时,放回队列。
  236. if (message.result==4)
  237. {
  238. await _azureRedis.GetRedisClient(8).ListLeftPushAsync($"PDFGen:Queue", task.ToJsonString());
  239. }
  240. }
  241. if (screenClient!=null && screenClient.status!.Equals(ScreenConstant.idle))
  242. {
  243. var taskData = await TaskService.SentTask(_azureRedis, _azureStorage);
  244. if (taskData.genQueue!=null && taskData.genRedis!=null && !string.IsNullOrWhiteSpace(taskData.genQueue.cntName))
  245. {
  246. screenClient.status = ScreenConstant.busy;
  247. screenClient.last_time = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
  248. await SendMessage(connid, new ScreenProcessMessage
  249. {
  250. connid = connid,
  251. clientid =screenClient. clientid,
  252. status = ScreenConstant.busy,
  253. grant_type = screenClient.grant_type,
  254. message_type= MessageType.task_send_success,
  255. content =$"{taskData.genQueue.ToJsonString()}",//从Redis中获取任务信息
  256. });
  257. }
  258. else
  259. {
  260. _logger.LogInformation($"客户端当前空闲=>{screenClient.name},{screenClient.region},{screenClient.clientid},暂无任务可领取的任务......");
  261. if (taskData.genRedis!=null)
  262. {
  263. string msgError = $"分发任务异常原因=>{screenClient.name},{screenClient.region},{screenClient.clientid}:{taskData.msg}\ngenQueue:{taskData.genQueue?.ToJsonString()}\ngenRedis:{taskData.genRedis?.ToJsonString()}";
  264. _logger.LogInformation(msgError);
  265. await _dingDing.SendBotMsg(msgError, GroupNames.成都开发測試群組);
  266. }
  267. else
  268. {
  269. _logger.LogInformation($"分发任务异常原因=>{screenClient.name},{screenClient.region},{screenClient.clientid}:{taskData.msg}\n");
  270. }
  271. await SendMessage(connid, new ScreenProcessMessage
  272. {
  273. connid = connid,
  274. clientid = screenClient.clientid,
  275. status = ScreenConstant.idle,
  276. grant_type =screenClient. grant_type,
  277. message_type= MessageType.task_send_error,
  278. content = taskData.msg
  279. });
  280. }
  281. await _azureRedis.GetRedisClient(8).HashSetAsync($"ScreenApi:clients", screenClient.clientid, screenClient.ToJsonString());
  282. }
  283. }
  284. public async Task SendConnection(string connectionId, MessageBody msg)
  285. {
  286. await Clients.Client(connectionId).ReceiveConnection(msg);
  287. }
  288. public async Task SendMessage(string connectionId, MessageBody msg)
  289. {
  290. await Clients.Client(connectionId).ReceiveMessage(msg);
  291. }
  292. public async Task SendDisConnection(string connectionId, MessageBody msg)
  293. {
  294. await Clients.Client(connectionId).ReceiveDisConnection(msg);
  295. }
  296. }
  297. }