ServiceBusSub.cs 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. using Azure.Messaging.ServiceBus;
  2. using Microsoft.Extensions.Hosting;
  3. using StackExchange.Redis;
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Net.Http;
  8. using System.ServiceModel.Channels;
  9. using System.Text;
  10. using System.Text.Json;
  11. using System.Threading;
  12. using System.Threading.Tasks;
  13. using TEAMModelOS.SDK.DI;
  14. using TEAMModelOS.SDK.Extension;
  15. using TEAMModelOS.SDK.Models;
  16. using TEAMModelOS.SDK.Services;
  17. using static TEAMModelOS.FunctionV4.IESHttpTrigger;
  18. namespace TEAMModelOS.FunctionV4
  19. {
  20. public class BlobRootServiceBusSub : BackgroundService, IDisposable
  21. {
  22. private readonly AzureStorageFactory _azureStorage;
  23. private readonly AzureRedisFactory _azureRedis;
  24. private readonly ServiceBusReceiver _receiver;
  25. private readonly DingDing _dingDing;
  26. private const string Channel = "BlobRoot";
  27. public static SpinWait spinWait = new SpinWait(); // 构造SpinWait实例
  28. public BlobRootServiceBusSub(DingDing dingDing, AzureRedisFactory azureRedisFactory, AzureStorageFactory azureStorage, AzureServiceBusFactory azureService)
  29. {
  30. _receiver = azureService.GetServiceBusClient().CreateReceiver(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), Channel, new ServiceBusReceiverOptions { ReceiveMode = ServiceBusReceiveMode.PeekLock });
  31. _azureStorage = azureStorage;
  32. _azureRedis = azureRedisFactory;
  33. _dingDing = dingDing;
  34. }
  35. protected async override Task ExecuteAsync(CancellationToken stoppingToken)
  36. {
  37. while (true)
  38. {
  39. //30分钟。进入死信
  40. IReadOnlyList<ServiceBusReceivedMessage> receivedMessages = await _receiver.ReceiveMessagesAsync(maxMessages: 1,maxWaitTime:new TimeSpan(0,30,0));
  41. foreach (ServiceBusReceivedMessage receivedMessage in receivedMessages)
  42. {
  43. try
  44. {
  45. string body = receivedMessage.Body.ToString();
  46. var jsonMsg = body.ToObject<BlobRefreshMessage>();
  47. if ( !string.IsNullOrWhiteSpace($"{jsonMsg.root}") && !string.IsNullOrWhiteSpace($"{jsonMsg.name}"))
  48. {
  49. string lockKey = $"Blob:Lock:{jsonMsg.name}:{jsonMsg.root}";
  50. bool exist = await _azureRedis.GetRedisClient(8).KeyExistsAsync(lockKey);
  51. if (exist) {
  52. string[] uls = System.Web.HttpUtility.UrlDecode($"{jsonMsg.root}", Encoding.UTF8).Split("/");
  53. string u = !string.IsNullOrEmpty(uls[0]) ? uls[0] : uls[1];
  54. string name = $"{jsonMsg.name}";
  55. await RefreshBlob(name, u);
  56. await _azureRedis.GetRedisClient(8).KeyDeleteAsync(lockKey);
  57. }
  58. }
  59. await _receiver.CompleteMessageAsync(receivedMessage);
  60. }
  61. catch
  62. {
  63. //失败则放回队列死信中
  64. await _receiver.DeadLetterMessageAsync(receivedMessage);
  65. }
  66. }
  67. //cpu空转一次
  68. spinWait.SpinOnce();
  69. }
  70. }
  71. private async Task RefreshBlob(string name, string u)
  72. {
  73. long statr = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
  74. var client = _azureStorage.GetBlobContainerClient(name);
  75. var size = await client.GetBlobsSize(u);
  76. await _azureRedis.GetRedisClient(8).SortedSetRemoveAsync($"Blob:Catalog:{name}", u);
  77. await _azureRedis.GetRedisClient(8).SortedSetIncrementAsync($"Blob:Catalog:{name}", u, size.HasValue ? size.Value : 0);
  78. var scores = await _azureRedis.GetRedisClient(8).SortedSetRangeByRankWithScoresAsync($"Blob:Catalog:{name}");
  79. double blobsize = 0;
  80. if (scores != default && scores != null)
  81. {
  82. foreach (var score in scores)
  83. {
  84. blobsize = blobsize + score.Score;
  85. }
  86. }
  87. if (blobsize > 0) {
  88. await _azureRedis.GetRedisClient(8).HashSetAsync($"Blob:Record", new RedisValue(name), new RedisValue($"{blobsize}"));
  89. }
  90. long end = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
  91. long dis = (end - statr) / 1000;
  92. long timeout = 60 * 5;
  93. if (dis > timeout)
  94. {
  95. await _dingDing.SendBotMsg($"ServiceBus,RefreshBlob:空间计算已经超过{timeout}秒\n容器名:{name}\n文件夹:{u}\n计算时长:{dis}\n文件夹大小:{blobsize}", GroupNames.醍摩豆服務運維群組);
  96. }
  97. //await _dingDing.SendBotMsg($"ServiceBus,RefreshBlob:\n容器名:{name}\n文件夹:{u}\n计算时长:{dis}\n文件夹大小:{blobsize}", GroupNames.醍摩豆服務運維群組);
  98. }
  99. }
  100. }