SignalRScreenServerHub.cs 17 KB


  1. using Microsoft.AspNetCore.SignalR;
  2. using Microsoft.Extensions.Primitives;
  3. using TEAMModelOS.SDK.DI;
  4. using TEAMModelOS.SDK.Extension;
  5. using TEAMModelOS.SDK;
  6. using System.Web;
  7. using System.Text;
  8. using StackExchange.Redis;
  9. namespace HTEX.Complex.Services
  10. {
  11. public class SignalRScreenServerHub : Hub<IClient>
  12. {
  13. private readonly ILogger<SignalRScreenServerHub> _logger;
  14. private readonly AzureRedisFactory _azureRedis;
  15. private readonly AzureStorageFactory _azureStorage;
  16. private readonly DingDing _dingDing;
  17. public SignalRScreenServerHub(AzureRedisFactory azureRedis, ILogger<SignalRScreenServerHub> logger ,AzureStorageFactory azureStorage, DingDing dingDing)
  18. {
  19. _logger = logger;
  20. _azureRedis = azureRedis;
  21. _azureStorage = azureStorage;
  22. _dingDing = dingDing;
  23. }
  24. /// <summary>
  25. /// 客户连接成功时触发
  26. /// </summary>
  27. /// <returns></returns>
  28. public override async Task OnConnectedAsync()
  29. {
  30. var connid = Context.ConnectionId;
  31. var httpContext = Context.GetHttpContext();
  32. if (httpContext != null)
  33. {
  34. //wss://www.winteach.cn/signalr/notify?grant_type=wechat_qrcode&scene=0a75aca57536490ba00fe62e27bb8f6c&id=U2MNiCFNPPuVcw2gUI_gRA
  35. //wss://www.winteach.cn/signalr/notify?grant_type=bookjs_api&clientid={clientid}&id=客户端自动生成的
  36. httpContext.Request.Query.TryGetValue("grant_type", out StringValues grant_type);
  37. httpContext.Request.Query.TryGetValue("clientid", out StringValues clientid);
  38. httpContext.Request.Query.TryGetValue("device", out StringValues _device);
  39. await Groups.AddToGroupAsync(connid, grant_type!);
  40. if (!clientid.Equals(StringValues.Empty) && !grant_type.Equals(StringValues.Empty)) {
  41. ///连接配置,并且使用钉钉 通知。
  42. ///
  43. var client = new SignalRClient
  44. {
  45. connid = connid,
  46. grant_type = grant_type,
  47. clientid= clientid
  48. };
  49. await _azureRedis.GetRedisClient(8).HashSetAsync($"SignalRClient:connects", connid, client.ToJsonString());
  50. ClientDevice device = HttpUtility.UrlDecode(_device, Encoding.Unicode).ToObject<ClientDevice>();
  51. switch (true)
  52. {
  53. case bool when grant_type.Equals(ScreenConstant.grant_type):
  54. ScreenClient screenClient ;
  55. var value = await _azureRedis.GetRedisClient(8).HashGetAsync($"ScreenApi:clients", client.clientid);
  56. if (value!=default && value.HasValue)
  57. {
  58. screenClient = value.ToString().ToObject<ScreenClient>();
  59. // 这里不强制设置free ,因为如果是重连,可能正在执行任务,需要等待执行完成
  60. //先检查状态是否是在忙碌,在时间戳范围里,如果不在时间戳范围,强制free。
  61. if (!screenClient.status!.Equals(ScreenConstant.idle) && screenClient.last_time + screenClient.timeout+ screenClient.delay + ScreenConstant.time_excess < DateTimeOffset.UtcNow.ToUnixTimeMilliseconds())
  62. {
  63. screenClient.status = ScreenConstant.idle;
  64. }
  65. }
  66. else
  67. {
  68. screenClient = new ScreenClient
  69. {
  70. status = ScreenConstant.idle,
  71. };
  72. }
  73. screenClient.connid=connid;
  74. screenClient.grant_type = grant_type;
  75. screenClient.clientid = clientid;
  76. screenClient.os = device.os;
  77. screenClient.port = device.port;
  78. screenClient.name = device.name;
  79. screenClient.region = device.region;
  80. screenClient.remote = device.remote;
  81. screenClient.networks = device.networks;
  82. screenClient.screenUrl = device.screenUrl;
  83. screenClient.delay = device.delay;
  84. screenClient.timeout = device.timeout;
  85. screenClient.last_time= DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
  86. //连接成功,发送消息给客户端。
  87. await SendConnection(connid, new ConnectionMessage
  88. {
  89. connid=connid,
  90. clientid = clientid,
  91. status = screenClient.status,
  92. grant_type = grant_type,
  93. message_type= MessageType.conn_success,
  94. content = $"连接成功"
  95. });
  96. _logger.LogInformation($"客户端连接成功=>{screenClient.name},{screenClient.region},{clientid}:\n{screenClient.ToJsonString()}");
  97. if (screenClient.status!.Equals(ScreenConstant.idle)) {
  98. _logger.LogInformation($"客户端当前空闲=>{screenClient.name},{screenClient.region},{clientid},分发任务......");
  99. //连接成功,马上分发任务。
  100. var task = await TaskService.SentTask(_azureRedis,_azureStorage);
  101. if (task.genQueue!=null && task.genRedis!=null && !string.IsNullOrWhiteSpace(task.genQueue.cntName))
  102. {
  103. screenClient.status = ScreenConstant.busy;
  104. screenClient.last_time = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
  105. await SendMessage(connid, new ScreenProcessMessage
  106. {
  107. connid = connid,
  108. clientid = clientid,
  109. status = ScreenConstant.busy,
  110. grant_type = grant_type,
  111. message_type= MessageType.task_send_success,
  112. content =$"{task.genQueue.ToJsonString()}",//从Redis中获取任务信息
  113. });
  114. }
  115. else {
  116. _logger.LogInformation($"客户端当前空闲=>{screenClient.name},{screenClient.region},{clientid},暂无任务可领取的任务......");
  117. if (task.genRedis!=null)
  118. {
  119. string msgError = $"分发任务异常原因=>{screenClient.name},{screenClient.region},{clientid}:{task.msg}\ngenQueue:{task.genQueue?.ToJsonString()}\ngenRedis:{task.genRedis?.ToJsonString()}";
  120. _logger.LogInformation(msgError);
  121. await _dingDing.SendBotMsg(msgError, GroupNames.成都开发測試群組);
  122. }
  123. else {
  124. _logger.LogInformation($"分发任务异常原因=>{screenClient.name},{screenClient.region},{clientid}:{task.msg}\n");
  125. }
  126. await SendMessage(connid, new ScreenProcessMessage
  127. {
  128. connid = connid,
  129. clientid = clientid,
  130. status = ScreenConstant.idle,
  131. grant_type = grant_type,
  132. message_type= MessageType.task_send_error,
  133. content = task.msg
  134. });
  135. }
  136. }
  137. await _azureRedis.GetRedisClient(8).HashSetAsync($"ScreenApi:clients", client.clientid, screenClient.ToJsonString());
  138. break;
  139. }
  140. }
  141. else
  142. {
  143. await SendConnection(connid, new ConnectionMessage
  144. {
  145. clientid = string.Empty,
  146. status =ScreenConstant.error,
  147. grant_type = grant_type,
  148. message_type= MessageType.conn_error,
  149. content = "客户端配置错误",
  150. connid = connid,
  151. });
  152. }
  153. }
  154. }
  155. public override async Task OnDisconnectedAsync(Exception? exception)
  156. {
  157. var connid = Context.ConnectionId;
  158. var redisData = await _azureRedis.GetRedisClient(8).HashGetAsync($"SignalRClient:connects", connid);
  159. _logger.LogInformation($"客户端断开连接=>{connid} ");
  160. ///连接配置,并且使用钉钉 离线通知。
  161. if (!redisData.IsNullOrEmpty)
  162. {
  163. var client = redisData.ToString().ToObject<SignalRClient>();
  164. await _azureRedis.GetRedisClient(8).HashDeleteAsync($"SignalRClient:connects", connid);
  165. if (client != null)
  166. {
  167. await Groups.RemoveFromGroupAsync(connid, client.grant_type!);
  168. var value = await _azureRedis.GetRedisClient(8).HashGetAsync($"ScreenApi:clients", client.clientid);
  169. if (value!=default && value.HasValue)
  170. {
  171. ScreenClient screenClient = value.ToString().ToObject<ScreenClient>() ;
  172. _logger.LogInformation($"客户端断开连接=>{connid},{screenClient.name},{screenClient.region},{screenClient.clientid} ");
  173. long now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
  174. // 判断是否过期
  175. if (screenClient.status!.Equals(ScreenConstant.busy ) && screenClient.last_time+screenClient.timeout+screenClient.delay+ ScreenConstant.time_excess <=now)
  176. {
  177. screenClient.status=ScreenConstant.offline;
  178. screenClient.connid= string.Empty;
  179. await _azureRedis.GetRedisClient(8).HashSetAsync($"ScreenApi:clients", client.clientid, screenClient.ToJsonString());
  180. }
  181. }
  182. }
  183. }
  184. }
  185. public async Task ReceiveMessage(ScreenProcessMessage message)
  186. {
  187. ////接收消息
  188. //如果是超时,放回队列。
  189. ///分发新任务。
  190. long nowNew = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
  191. var connid = Context.ConnectionId;
  192. PDFGenQueue? task = message.content?.ToObject<PDFGenQueue>();
  193. ScreenClient screenClient = null;
  194. try
  195. { //释放客户端的忙碌状态。
  196. var redisData = await _azureRedis.GetRedisClient(8).HashGetAsync($"SignalRClient:connects", connid);
  197. var client = redisData.ToString().ToObject<SignalRClient>();
  198. await SendConnection(connid, new ConnectionMessage
  199. {
  200. connid=connid,
  201. clientid = client.clientid,
  202. status = ScreenConstant.idle,
  203. grant_type = client.grant_type,
  204. message_type= MessageType.conn_success,
  205. content = $"客户端空闲,等待任务分发......"
  206. });
  207. var value = await _azureRedis.GetRedisClient(8).HashGetAsync($"ScreenApi:clients", client.clientid);
  208. if (value!=default && value.HasValue)
  209. {
  210. screenClient = value.ToString().ToObject<ScreenClient>();
  211. screenClient.status=ScreenConstant.idle;
  212. screenClient.last_time=nowNew;
  213. screenClient.taskComplete++;
  214. await _azureRedis.GetRedisClient(8).HashSetAsync($"ScreenApi:clients", client.clientid,screenClient.ToJsonString());
  215. _logger.LogInformation($"客户端空闲,等待任务分发......=>{connid},{screenClient.name},{screenClient.region},{screenClient.clientid} ");
  216. }
  217. }
  218. catch (Exception ex)
  219. {
  220. _logger.LogError($"客户端状态重置异常,......=>{connid},{ex.Message},{ex.StackTrace}");
  221. }
  222. if (task!=null)
  223. {
  224. RedisValue redisValue = await _azureRedis.GetRedisClient(8).HashGetAsync($"PDFGen:{task.sessionId}", task.id);
  225. if (redisValue!=default)
  226. {
  227. var genRedis = redisValue.ToString().ToObject<PDFGenRedis>();
  228. genRedis.cost=nowNew-(genRedis.join+genRedis.wait);//拿到分发任务的时间,因为 等待时长=分发时的时间戳-任务生成的时间戳(join)。
  229. genRedis.status=message.result;
  230. genRedis.msg=message.msg;
  231. await _azureRedis.GetRedisClient(8).HashSetAsync($"PDFGen:{task.sessionId}", task.id, genRedis.ToJsonString());
  232. }
  233. ///如果是超时,放回队列。
  234. if (message.result==4)
  235. {
  236. await _azureRedis.GetRedisClient(8).ListLeftPushAsync($"PDFGen:Queue", task.ToJsonString());
  237. }
  238. }
  239. if (screenClient!=null && screenClient.status!.Equals(ScreenConstant.idle))
  240. {
  241. var taskData = await TaskService.SentTask(_azureRedis, _azureStorage);
  242. if (taskData.genQueue!=null && taskData.genRedis!=null && !string.IsNullOrWhiteSpace(taskData.genQueue.cntName))
  243. {
  244. screenClient.status = ScreenConstant.busy;
  245. screenClient.last_time = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
  246. await SendMessage(connid, new ScreenProcessMessage
  247. {
  248. connid = connid,
  249. clientid =screenClient. clientid,
  250. status = ScreenConstant.busy,
  251. grant_type = screenClient.grant_type,
  252. message_type= MessageType.task_send_success,
  253. content =$"{taskData.genQueue.ToJsonString()}",//从Redis中获取任务信息
  254. });
  255. }
  256. else
  257. {
  258. _logger.LogInformation($"客户端当前空闲=>{screenClient.name},{screenClient.region},{screenClient.clientid},暂无任务可领取的任务......");
  259. if (taskData.genRedis!=null)
  260. {
  261. string msgError = $"分发任务异常原因=>{screenClient.name},{screenClient.region},{screenClient.clientid}:{taskData.msg}\ngenQueue:{taskData.genQueue?.ToJsonString()}\ngenRedis:{taskData.genRedis?.ToJsonString()}";
  262. _logger.LogInformation(msgError);
  263. await _dingDing.SendBotMsg(msgError, GroupNames.成都开发測試群組);
  264. }
  265. else
  266. {
  267. _logger.LogInformation($"分发任务异常原因=>{screenClient.name},{screenClient.region},{screenClient.clientid}:{taskData.msg}\n");
  268. }
  269. await SendMessage(connid, new ScreenProcessMessage
  270. {
  271. connid = connid,
  272. clientid = screenClient.clientid,
  273. status = ScreenConstant.idle,
  274. grant_type =screenClient. grant_type,
  275. message_type= MessageType.task_send_error,
  276. content = taskData.msg
  277. });
  278. }
  279. await _azureRedis.GetRedisClient(8).HashSetAsync($"ScreenApi:clients", screenClient.clientid, screenClient.ToJsonString());
  280. }
  281. }
  282. public async Task SendConnection(string connectionId, MessageBody msg)
  283. {
  284. await Clients.Client(connectionId).ReceiveConnection(msg);
  285. }
  286. public async Task SendMessage(string connectionId, MessageBody msg)
  287. {
  288. await Clients.Client(connectionId).ReceiveMessage(msg);
  289. }
  290. public async Task SendDisConnection(string connectionId, MessageBody msg)
  291. {
  292. await Clients.Client(connectionId).ReceiveDisConnection(msg);
  293. }
  294. }
  295. }