123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310 |
- using Microsoft.AspNetCore.SignalR;
- using Microsoft.Extensions.Primitives;
- using TEAMModelOS.SDK.DI;
- using TEAMModelOS.SDK.Extension;
- using TEAMModelOS.SDK;
- using System.Web;
- using System.Text;
- using StackExchange.Redis;
- namespace HTEX.Complex.Services
- {
/// <summary>
/// SignalR hub that "screen" worker clients connect to.
/// The hub records every configured connection in Redis, keeps one
/// <c>ScreenClient</c> state record per <c>clientid</c>, and hands a task from
/// <c>TaskService.SentTask</c> to a client whenever that client is idle
/// (right after connecting, and after each reported result).
/// </summary>
public class SignalRScreenServerHub : Hub<IClient>
{
    /// <summary>Index of the Redis database that holds all state used by this hub.</summary>
    private const int RedisDb = 8;

    /// <summary>Hash: connection id -> serialized <c>SignalRClient</c>.</summary>
    private const string ConnectionsKey = "SignalRClient:connects";

    /// <summary>Hash: client id -> serialized <c>ScreenClient</c>.</summary>
    private const string ScreenClientsKey = "ScreenApi:clients";

    /// <summary>List that timed-out tasks are pushed back onto.</summary>
    private const string TaskQueueKey = "PDFGen:Queue";

    /// <summary>Value of <c>message.result</c> that the original code treats as "timed out".</summary>
    private const int TimeoutResult = 4;

    private readonly ILogger<SignalRScreenServerHub> _logger;
    private readonly AzureRedisFactory _azureRedis;
    private readonly AzureStorageFactory _azureStorage;
    private readonly DingDing _dingDing;

    /// <summary>Creates the hub with its injected Redis, storage, logging and DingDing services.</summary>
    public SignalRScreenServerHub(AzureRedisFactory azureRedis, ILogger<SignalRScreenServerHub> logger, AzureStorageFactory azureStorage, DingDing dingDing)
    {
        _logger = logger;
        _azureRedis = azureRedis;
        _azureStorage = azureStorage;
        _dingDing = dingDing;
    }

    /// <summary>
    /// Runs when a client connects. Reads <c>grant_type</c>, <c>clientid</c> and
    /// <c>device</c> from the query string, records the connection, and - for the
    /// screen grant type - registers/refreshes the client's state and immediately
    /// tries to dispatch a task if the client is idle.
    /// A connection without <c>grant_type</c>/<c>clientid</c>, or a screen client
    /// without a usable <c>device</c> payload, receives a <c>conn_error</c> message.
    /// </summary>
    /// <returns>A task that completes when the handshake has been processed.</returns>
    public override async Task OnConnectedAsync()
    {
        var connid = Context.ConnectionId;
        var httpContext = Context.GetHttpContext();
        if (httpContext == null)
        {
            return;
        }

        // Example endpoints:
        // wss://www.winteach.cn/signalr/notify?grant_type=wechat_qrcode&scene=0a75aca57536490ba00fe62e27bb8f6c&id=U2MNiCFNPPuVcw2gUI_gRA
        // wss://www.winteach.cn/signalr/notify?grant_type=bookjs_api&clientid={clientid}&id=<generated by the client>
        httpContext.Request.Query.TryGetValue("grant_type", out StringValues grant_type);
        httpContext.Request.Query.TryGetValue("clientid", out StringValues clientid);
        httpContext.Request.Query.TryGetValue("device", out StringValues _device);

        // A missing grant_type converts to a null string; passing that to
        // AddToGroupAsync throws and would prevent the error reply below.
        string? groupName = grant_type;
        if (groupName != null)
        {
            await Groups.AddToGroupAsync(connid, groupName);
        }

        if (clientid.Equals(StringValues.Empty) || grant_type.Equals(StringValues.Empty))
        {
            await SendConnection(connid, new ConnectionMessage
            {
                clientid = string.Empty,
                status = ScreenConstant.error,
                grant_type = grant_type,
                message_type = MessageType.conn_error,
                content = "客户端配置错误",
                connid = connid,
            });
            return;
        }

        // Record the connection so that the disconnect/receive handlers can map
        // the connection id back to its client id and grant type.
        var client = new SignalRClient
        {
            connid = connid,
            grant_type = grant_type,
            clientid = clientid
        };
        var redis = _azureRedis.GetRedisClient(RedisDb);
        await redis.HashSetAsync(ConnectionsKey, connid, client.ToJsonString());

        if (!grant_type.Equals(ScreenConstant.grant_type))
        {
            // Only the screen grant type has registration logic in this hub.
            return;
        }

        if (ParseDevice(_device) is not ClientDevice device)
        {
            await SendConnection(connid, new ConnectionMessage
            {
                connid = connid,
                clientid = clientid,
                status = ScreenConstant.error,
                grant_type = grant_type,
                message_type = MessageType.conn_error,
                content = "客户端配置错误"
            });
            return;
        }

        long now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
        RedisValue stored = await redis.HashGetAsync(ScreenClientsKey, client.clientid);
        ScreenClient? screenClient = stored.HasValue ? stored.ToString().ToObject<ScreenClient>() : null;
        if (screenClient == null)
        {
            screenClient = new ScreenClient
            {
                status = ScreenConstant.idle,
            };
        }
        else if (!screenClient.status!.Equals(ScreenConstant.idle)
            && screenClient.last_time + screenClient.timeout + screenClient.delay + ScreenConstant.time_excess < now)
        {
            // The status is not forced to idle on every (re)connect: a reconnecting
            // client may still be executing a task. It is reset only when the
            // previous task's time window has already elapsed.
            screenClient.status = ScreenConstant.idle;
        }

        screenClient.connid = connid;
        screenClient.grant_type = grant_type;
        screenClient.clientid = clientid;
        screenClient.os = device.os;
        screenClient.port = device.port;
        screenClient.name = device.name;
        screenClient.region = device.region;
        screenClient.remote = device.remote;
        screenClient.networks = device.networks;
        screenClient.screenUrl = device.screenUrl;
        screenClient.delay = device.delay;
        screenClient.timeout = device.timeout;
        screenClient.cpu = device.cpu;
        screenClient.ram = device.ram;
        screenClient.last_time = now;

        // Tell the client that the connection succeeded.
        await SendConnection(connid, new ConnectionMessage
        {
            connid = connid,
            clientid = clientid,
            status = screenClient.status,
            grant_type = grant_type,
            message_type = MessageType.conn_success,
            content = $"连接成功"
        });
        _logger.LogInformation($"客户端连接成功=>{screenClient.name},{screenClient.region},{clientid}:\n{screenClient.ToJsonString()}");

        if (screenClient.status!.Equals(ScreenConstant.idle))
        {
            // Connected and idle: dispatch a task right away.
            _logger.LogInformation($"客户端当前空闲=>{screenClient.name},{screenClient.region},{clientid},分发任务......");
            await DispatchTaskAsync(connid, screenClient);
        }

        await redis.HashSetAsync(ScreenClientsKey, client.clientid, screenClient.ToJsonString());
    }

    /// <summary>
    /// Runs when a client disconnects. Removes the connection record and, when
    /// the client was busy and its task window has already elapsed, marks the
    /// stored <c>ScreenClient</c> as offline.
    /// </summary>
    /// <param name="exception">The error that closed the connection, if any; it is attached to the log entry.</param>
    /// <returns>A task that completes when the cleanup has finished.</returns>
    public override async Task OnDisconnectedAsync(Exception? exception)
    {
        var connid = Context.ConnectionId;
        var redis = _azureRedis.GetRedisClient(RedisDb);
        var redisData = await redis.HashGetAsync(ConnectionsKey, connid);
        _logger.LogInformation(exception, $"客户端断开连接=>{connid} ");

        if (redisData.IsNullOrEmpty)
        {
            // The connection was never registered (e.g. it failed validation).
            return;
        }

        var client = redisData.ToString().ToObject<SignalRClient>();
        await redis.HashDeleteAsync(ConnectionsKey, connid);
        if (client == null)
        {
            return;
        }

        await Groups.RemoveFromGroupAsync(connid, client.grant_type!);
        var value = await redis.HashGetAsync(ScreenClientsKey, client.clientid);
        if (!value.HasValue)
        {
            return;
        }

        ScreenClient screenClient = value.ToString().ToObject<ScreenClient>();
        _logger.LogInformation($"客户端断开连接=>{connid},{screenClient.name},{screenClient.region},{screenClient.clientid} ");
        long now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();

        // Only a busy client whose task window has expired is marked offline.
        // NOTE(review): an idle client that disconnects keeps status "idle" and
        // its old connid in Redis - confirm that this is intended.
        if (screenClient.status!.Equals(ScreenConstant.busy)
            && screenClient.last_time + screenClient.timeout + screenClient.delay + ScreenConstant.time_excess <= now)
        {
            screenClient.status = ScreenConstant.offline;
            screenClient.connid = string.Empty;
            await redis.HashSetAsync(ScreenClientsKey, client.clientid, screenClient.ToJsonString());
        }
    }

    /// <summary>
    /// Invoked by a client to report the outcome of a task. Releases the client
    /// back to idle, stores the result on the task's Redis record, re-queues the
    /// task when the result is a timeout, and then tries to dispatch the next task.
    /// </summary>
    /// <param name="message">Result message; <c>content</c> is expected to hold the serialized task.</param>
    /// <returns>A task that completes when the report has been processed.</returns>
    public async Task ReceiveMessage(ScreenProcessMessage message)
    {
        long nowNew = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
        var connid = Context.ConnectionId;
        var redis = _azureRedis.GetRedisClient(RedisDb);

        // An unparsable payload must not stop the client from being released,
        // otherwise it would stay "busy" until its time window expires.
        PDFGenQueue? task = null;
        try
        {
            task = message.content?.ToObject<PDFGenQueue>();
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, $"客户端任务消息解析异常,......=>{connid}");
        }

        ScreenClient? screenClient = null;
        try
        {
            // Release the client's busy state.
            var redisData = await redis.HashGetAsync(ConnectionsKey, connid);
            var client = redisData.ToString().ToObject<SignalRClient>();
            await SendConnection(connid, new ConnectionMessage
            {
                connid = connid,
                clientid = client.clientid,
                status = ScreenConstant.idle,
                grant_type = client.grant_type,
                message_type = MessageType.conn_success,
                content = $"客户端空闲,等待任务分发......"
            });
            var value = await redis.HashGetAsync(ScreenClientsKey, client.clientid);
            if (value.HasValue)
            {
                screenClient = value.ToString().ToObject<ScreenClient>();
                screenClient.status = ScreenConstant.idle;
                screenClient.last_time = nowNew;
                screenClient.taskComplete++;
                await redis.HashSetAsync(ScreenClientsKey, client.clientid, screenClient.ToJsonString());
                _logger.LogInformation($"客户端空闲,等待任务分发......=>{connid},{screenClient.name},{screenClient.region},{screenClient.clientid} ");
            }
        }
        catch (Exception ex)
        {
            // Best effort: a failure here is logged and the task result below is still recorded.
            _logger.LogError(ex, $"客户端状态重置异常,......=>{connid},{ex.Message},{ex.StackTrace}");
        }

        if (task != null)
        {
            string taskHashKey = $"PDFGen:{task.sessionId}";
            RedisValue redisValue = await redis.HashGetAsync(taskHashKey, task.id);
            if (redisValue.HasValue)
            {
                var genRedis = redisValue.ToString().ToObject<PDFGenRedis>();
                if (genRedis != null)
                {
                    // join + wait is the dispatch timestamp, because
                    // wait = dispatch timestamp - task creation timestamp (join).
                    genRedis.cost = nowNew - (genRedis.join + genRedis.wait);
                    genRedis.status = message.result;
                    genRedis.msg = message.msg;
                    await redis.HashSetAsync(taskHashKey, task.id, genRedis.ToJsonString());
                }
            }

            // A timed-out task goes back onto the queue.
            if (message.result == TimeoutResult)
            {
                await redis.ListLeftPushAsync(TaskQueueKey, task.ToJsonString());
            }
        }

        if (screenClient != null && screenClient.status!.Equals(ScreenConstant.idle))
        {
            await DispatchTaskAsync(connid, screenClient);
            await redis.HashSetAsync(ScreenClientsKey, screenClient.clientid, screenClient.ToJsonString());
        }
    }

    /// <summary>
    /// Sends <paramref name="msg"/> to one connection through the client's <c>ReceiveConnection</c> callback.
    /// </summary>
    /// <remarks>
    /// NOTE(review): public hub methods can be invoked by connected clients, so any
    /// client can use this to push a message to an arbitrary connection id -
    /// consider making it non-public if no client relies on it.
    /// </remarks>
    public async Task SendConnection(string connectionId, MessageBody msg)
    {
        await Clients.Client(connectionId).ReceiveConnection(msg);
    }

    /// <summary>
    /// Sends <paramref name="msg"/> to one connection through the client's <c>ReceiveMessage</c> callback.
    /// </summary>
    /// <remarks>NOTE(review): client-invocable because it is public; see <see cref="SendConnection"/>.</remarks>
    public async Task SendMessage(string connectionId, MessageBody msg)
    {
        await Clients.Client(connectionId).ReceiveMessage(msg);
    }

    /// <summary>
    /// Sends <paramref name="msg"/> to one connection through the client's <c>ReceiveDisConnection</c> callback.
    /// </summary>
    /// <remarks>NOTE(review): client-invocable because it is public; see <see cref="SendConnection"/>.</remarks>
    public async Task SendDisConnection(string connectionId, MessageBody msg)
    {
        await Clients.Client(connectionId).ReceiveDisConnection(msg);
    }

    /// <summary>
    /// Decodes and deserializes the <c>device</c> query value.
    /// Returns <c>null</c> when the value is missing, blank, or cannot be deserialized.
    /// </summary>
    private ClientDevice? ParseDevice(StringValues rawDevice)
    {
        string? encoded = rawDevice;
        // Encoding.Unicode is kept from the original implementation.
        // NOTE(review): presumably matches how the client encodes the payload - confirm.
        string? decoded = HttpUtility.UrlDecode(encoded, Encoding.Unicode);
        if (string.IsNullOrWhiteSpace(decoded))
        {
            return null;
        }

        try
        {
            return decoded.ToObject<ClientDevice>();
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, $"客户端设备信息解析失败=>{Context.ConnectionId}");
            return null;
        }
    }

    /// <summary>
    /// Requests a task from <c>TaskService.SentTask</c> and sends it to the client.
    /// On success the in-memory <paramref name="screenClient"/> is switched to busy;
    /// otherwise the reason is logged (and reported to DingDing when a task record
    /// exists) and the client receives a <c>task_send_error</c> message.
    /// The caller is responsible for persisting <paramref name="screenClient"/>.
    /// </summary>
    private async Task DispatchTaskAsync(string connid, ScreenClient screenClient)
    {
        var task = await TaskService.SentTask(_azureRedis, _azureStorage);
        if (task.genQueue != null && task.genRedis != null && !string.IsNullOrWhiteSpace(task.genQueue.cntName))
        {
            screenClient.status = ScreenConstant.busy;
            screenClient.last_time = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
            await SendMessage(connid, new ScreenProcessMessage
            {
                connid = connid,
                clientid = screenClient.clientid,
                status = ScreenConstant.busy,
                grant_type = screenClient.grant_type,
                message_type = MessageType.task_send_success,
                content = $"{task.genQueue.ToJsonString()}", // task information taken from Redis
            });
            return;
        }

        _logger.LogInformation($"客户端当前空闲=>{screenClient.name},{screenClient.region},{screenClient.clientid},暂无任务可领取的任务......");
        if (task.genRedis != null)
        {
            // A task record exists but could not be dispatched: alert the team.
            string msgError = $"分发任务异常原因=>{screenClient.name},{screenClient.region},{screenClient.clientid}:{task.msg}\ngenQueue:{task.genQueue?.ToJsonString()}\ngenRedis:{task.genRedis?.ToJsonString()}";
            _logger.LogInformation(msgError);
            await _dingDing.SendBotMsg(msgError, GroupNames.成都开发測試群組);
        }
        else
        {
            _logger.LogInformation($"分发任务异常原因=>{screenClient.name},{screenClient.region},{screenClient.clientid}:{task.msg}\n");
        }

        await SendMessage(connid, new ScreenProcessMessage
        {
            connid = connid,
            clientid = screenClient.clientid,
            status = ScreenConstant.idle,
            grant_type = screenClient.grant_type,
            message_type = MessageType.task_send_error,
            content = task.msg
        });
    }
}
- }
|