using Azure.Messaging.ServiceBus; using Microsoft.Extensions.Hosting; using StackExchange.Redis; using System; using System.Collections.Generic; using System.Linq; using System.Net.Http; using System.ServiceModel.Channels; using System.Text; using System.Text.Json; using System.Threading; using System.Threading.Tasks; using TEAMModelOS.SDK.DI; using TEAMModelOS.SDK.Extension; using TEAMModelOS.SDK.Models; using TEAMModelOS.SDK.Services; using static TEAMModelOS.FunctionV4.IESHttpTrigger; namespace TEAMModelOS.FunctionV4 { public class BlobRootServiceBusSub : BackgroundService, IDisposable { private readonly AzureStorageFactory _azureStorage; private readonly AzureRedisFactory _azureRedis; private readonly ServiceBusReceiver _receiver; private readonly DingDing _dingDing; private const string Channel = "BlobRoot"; public static SpinWait spinWait = new SpinWait(); // 构造SpinWait实例 public BlobRootServiceBusSub(DingDing dingDing, AzureRedisFactory azureRedisFactory, AzureStorageFactory azureStorage, AzureServiceBusFactory azureService) { _receiver = azureService.GetServiceBusClient().CreateReceiver(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), Channel, new ServiceBusReceiverOptions { ReceiveMode = ServiceBusReceiveMode.PeekLock }); _azureStorage = azureStorage; _azureRedis = azureRedisFactory; _dingDing = dingDing; } protected async override Task ExecuteAsync(CancellationToken stoppingToken) { while (true) { //30分钟。进入死信 IReadOnlyList receivedMessages = await _receiver.ReceiveMessagesAsync(maxMessages: 1,maxWaitTime:new TimeSpan(0,30,0)); foreach (ServiceBusReceivedMessage receivedMessage in receivedMessages) { try { string body = receivedMessage.Body.ToString(); var jsonMsg = body.ToObject(); if ( !string.IsNullOrWhiteSpace($"{jsonMsg.root}") && !string.IsNullOrWhiteSpace($"{jsonMsg.name}")) { string lockKey = $"Blob:Lock:{jsonMsg.name}:{jsonMsg.root}"; bool exist = await _azureRedis.GetRedisClient(8).KeyExistsAsync(lockKey); if (exist) { string[] uls = System.Web.HttpUtility.UrlDecode($"{jsonMsg.root}", Encoding.UTF8).Split("/"); string u = !string.IsNullOrEmpty(uls[0]) ? uls[0] : uls[1]; string name = $"{jsonMsg.name}"; await RefreshBlob(name, u); await _azureRedis.GetRedisClient(8).KeyDeleteAsync(lockKey); } } await _receiver.CompleteMessageAsync(receivedMessage); } catch { //失败则放回队列死信中 await _receiver.DeadLetterMessageAsync(receivedMessage); } } //cpu空转一次 spinWait.SpinOnce(); } } private async Task RefreshBlob(string name, string u) { long statr = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); var client = _azureStorage.GetBlobContainerClient(name); var size = await client.GetBlobsSize(u); await _azureRedis.GetRedisClient(8).SortedSetRemoveAsync($"Blob:Catalog:{name}", u); await _azureRedis.GetRedisClient(8).SortedSetIncrementAsync($"Blob:Catalog:{name}", u, size.HasValue ? size.Value : 0); var scores = await _azureRedis.GetRedisClient(8).SortedSetRangeByRankWithScoresAsync($"Blob:Catalog:{name}"); double blobsize = 0; if (scores != default && scores != null) { foreach (var score in scores) { blobsize = blobsize + score.Score; } } if (blobsize > 0) { await _azureRedis.GetRedisClient(8).HashSetAsync($"Blob:Record", new RedisValue(name), new RedisValue($"{blobsize}")); } long end = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); long dis = (end - statr) / 1000; long timeout = 60 * 5; if (dis > timeout) { await _dingDing.SendBotMsg($"ServiceBus,RefreshBlob:空间计算已经超过{timeout}秒\n容器名:{name}\n文件夹:{u}\n计算时长:{dis}\n文件夹大小:{blobsize}", GroupNames.醍摩豆服務運維群組); } //await _dingDing.SendBotMsg($"ServiceBus,RefreshBlob:\n容器名:{name}\n文件夹:{u}\n计算时长:{dis}\n文件夹大小:{blobsize}", GroupNames.醍摩豆服務運維群組); } } }