123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104 |
- using Azure.Messaging.ServiceBus;
- using Microsoft.Extensions.Hosting;
- using StackExchange.Redis;
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Net.Http;
- using System.ServiceModel.Channels;
- using System.Text;
- using System.Text.Json;
- using System.Threading;
- using System.Threading.Tasks;
- using TEAMModelOS.SDK.DI;
- using TEAMModelOS.SDK.Extension;
- using TEAMModelOS.SDK.Models;
- using TEAMModelOS.SDK.Services;
- using static TEAMModelOS.FunctionV4.IESHttpTrigger;
- namespace TEAMModelOS.FunctionV4
- {
- public class BlobRootServiceBusSub : BackgroundService, IDisposable
- {
- private readonly AzureStorageFactory _azureStorage;
- private readonly AzureRedisFactory _azureRedis;
- private readonly ServiceBusReceiver _receiver;
- private readonly DingDing _dingDing;
- private const string Channel = "BlobRoot";
- public static SpinWait spinWait = new SpinWait(); // 构造SpinWait实例
- public BlobRootServiceBusSub(DingDing dingDing, AzureRedisFactory azureRedisFactory, AzureStorageFactory azureStorage, AzureServiceBusFactory azureService)
- {
- _receiver = azureService.GetServiceBusClient().CreateReceiver(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), Channel, new ServiceBusReceiverOptions { ReceiveMode = ServiceBusReceiveMode.PeekLock });
- _azureStorage = azureStorage;
- _azureRedis = azureRedisFactory;
- _dingDing = dingDing;
- }
- protected async override Task ExecuteAsync(CancellationToken stoppingToken)
- {
- while (true)
- {
- //30分钟。进入死信
- IReadOnlyList<ServiceBusReceivedMessage> receivedMessages = await _receiver.ReceiveMessagesAsync(maxMessages: 1,maxWaitTime:new TimeSpan(0,30,0));
- foreach (ServiceBusReceivedMessage receivedMessage in receivedMessages)
- {
- try
- {
- string body = receivedMessage.Body.ToString();
- var jsonMsg = body.ToObject<BlobRefreshMessage>();
- if ( !string.IsNullOrWhiteSpace($"{jsonMsg.root}") && !string.IsNullOrWhiteSpace($"{jsonMsg.name}"))
- {
- string lockKey = $"Blob:Lock:{jsonMsg.name}:{jsonMsg.root}";
- bool exist = await _azureRedis.GetRedisClient(8).KeyExistsAsync(lockKey);
- if (exist) {
- string[] uls = System.Web.HttpUtility.UrlDecode($"{jsonMsg.root}", Encoding.UTF8).Split("/");
- string u = !string.IsNullOrEmpty(uls[0]) ? uls[0] : uls[1];
- string name = $"{jsonMsg.name}";
- await RefreshBlob(name, u);
- await _azureRedis.GetRedisClient(8).KeyDeleteAsync(lockKey);
- }
- }
- await _receiver.CompleteMessageAsync(receivedMessage);
- }
- catch
- {
- //失败则放回队列死信中
- await _receiver.DeadLetterMessageAsync(receivedMessage);
- }
- }
- //cpu空转一次
- spinWait.SpinOnce();
- }
- }
-
-
- private async Task RefreshBlob(string name, string u)
- {
- long statr = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
- var client = _azureStorage.GetBlobContainerClient(name);
- var size = await client.GetBlobsSize(u);
- await _azureRedis.GetRedisClient(8).SortedSetRemoveAsync($"Blob:Catalog:{name}", u);
- await _azureRedis.GetRedisClient(8).SortedSetIncrementAsync($"Blob:Catalog:{name}", u, size.HasValue ? size.Value : 0);
- var scores = await _azureRedis.GetRedisClient(8).SortedSetRangeByRankWithScoresAsync($"Blob:Catalog:{name}");
- double blobsize = 0;
- if (scores != default && scores != null)
- {
- foreach (var score in scores)
- {
- blobsize = blobsize + score.Score;
- }
- }
- if (blobsize > 0) {
- await _azureRedis.GetRedisClient(8).HashSetAsync($"Blob:Record", new RedisValue(name), new RedisValue($"{blobsize}"));
- }
- long end = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
- long dis = (end - statr) / 1000;
- long timeout = 60 * 5;
- if (dis > timeout)
- {
- await _dingDing.SendBotMsg($"ServiceBus,RefreshBlob:空间计算已经超过{timeout}秒\n容器名:{name}\n文件夹:{u}\n计算时长:{dis}\n文件夹大小:{blobsize}", GroupNames.醍摩豆服務運維群組);
- }
- //await _dingDing.SendBotMsg($"ServiceBus,RefreshBlob:\n容器名:{name}\n文件夹:{u}\n计算时长:{dis}\n文件夹大小:{blobsize}", GroupNames.醍摩豆服務運維群組);
- }
- }
- }
|