AzureCosmosFactory.cs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. using Azure;
  2. using Azure.Cosmos;
  3. using DocumentFormat.OpenXml.Office2010.ExcelAc;
  4. using Microsoft.Azure.Cosmos;
  5. using Microsoft.Extensions.Configuration;
  6. using Microsoft.Extensions.Logging;
  7. using Microsoft.Extensions.Options;
  8. using System;
  9. using System.Collections.Concurrent;
  10. using System.Collections.Generic;
  11. using System.Linq;
  12. using System.Threading.Tasks;
  13. using TEAMModelOS.Models;
  14. using TEAMModelOS.SDK.Context.Attributes.Azure;
  15. using TEAMModelOS.SDK.Context.Configuration;
  16. using TEAMModelOS.SDK.DI.AzureCosmos.Inner;
  17. using TEAMModelOS.SDK.Helper.Common.ReflectorExtensions;
  18. using ContainerProperties = Azure.Cosmos.ContainerProperties;
  19. using CosmosClient = Azure.Cosmos.CosmosClient;
  20. using CosmosClientOptions = Azure.Cosmos.CosmosClientOptions;
  21. using OpenXmlPowerTools;
  22. using System.Diagnostics;
  23. using System.IO;
  24. using System.Linq.Expressions;
  25. using System.Net;
  26. using System.Reflection;
  27. using System.Text;
  28. using System.Text.Json;
  29. using System.Threading;
  30. using TEAMModelOS.SDK.Context.Exception;
  31. using TEAMModelOS.SDK.DI;
  32. using PartitionKey = Azure.Cosmos.PartitionKey;
  33. namespace TEAMModelOS.SDK.DI
  34. {
  /// <summary>
  /// Creates and caches <see cref="CosmosClient"/> instances and maintains the in-memory
  /// registry (<see cref="CosmosDict"/>) that maps container names and model type names to
  /// their Cosmos DB containers.
  /// </summary>
  public class AzureCosmosFactory
  {
      /// <summary>Throughput (RU/s) used when a model's attribute does not declare a larger value.</summary>
      private const int DefaultThroughput = 400;

      /// <summary>Id of the lease container created when at least one model requests monitoring.</summary>
      private const string LeaseContainerId = "leases";

      private readonly IServiceProvider _services;
      private readonly IOptionsMonitor<AzureCosmosFactoryOptions> _optionsMonitor;
      private readonly ILogger _logger;

      // Keyed by (options name, preferred region): a client is bound to the region it was
      // created with, so a different region must yield a different client.
      private ConcurrentDictionary<(string Name, string Region), CosmosClient> CosmosClients { get; } =
          new ConcurrentDictionary<(string Name, string Region), CosmosClient>();

      /// <summary>Registry of known containers, indexed by container name and by model type name.</summary>
      public AzureCosmosDict CosmosDict { get; set; } = new AzureCosmosDict();

      /// <summary>
      /// Creates the factory and synchronously runs <see cref="InitializeDatabase"/>.
      /// </summary>
      /// <param name="services">Service provider; must not be null.</param>
      /// <param name="optionsMonitor">Source of named <see cref="AzureCosmosFactoryOptions"/>; must not be null.</param>
      /// <param name="logger">Logger used for warnings; may be null.</param>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="services"/> or <paramref name="optionsMonitor"/> is null.
      /// </exception>
      public AzureCosmosFactory(IServiceProvider services, IOptionsMonitor<AzureCosmosFactoryOptions> optionsMonitor, ILogger<AzureCosmosFactory> logger)
      {
          _services = services ?? throw new ArgumentNullException(nameof(services));
          _optionsMonitor = optionsMonitor ?? throw new ArgumentNullException(nameof(optionsMonitor));
          _logger = logger;

          // NOTE(review): blocking on async work inside a constructor is kept only because
          // callers rely on the registry being populated once construction returns.
          InitializeDatabase().GetAwaiter().GetResult();
      }

      /// <summary>
      /// Returns the cached <see cref="CosmosClient"/> for the given options name and region,
      /// creating it on first use. Safe to call from multiple threads.
      /// </summary>
      /// <param name="region">
      /// Preferred region (for example a <c>Regions.*</c> value) assigned to
      /// <c>CosmosClientOptions.ApplicationRegion</c>; null leaves the SDK default in place.
      /// </param>
      /// <param name="name">Name of the options instance that supplies the connection string.</param>
      /// <returns>The client, or null when it could not be created (the failure is logged as a warning).</returns>
      public CosmosClient GetCosmosClient(string region = null, string name = "Default")
      {
          try
          {
              return CosmosClients.GetOrAdd(
                  (name, region),
                  key => new CosmosClient(
                      _optionsMonitor.Get(key.Name).CosmosConnectionString,
                      new CosmosClientOptions { ApplicationRegion = key.Region }));
          }
          catch (Exception e)
          {
              // Pass the text as an argument: exception messages are not message templates.
              _logger?.LogWarning(e, "{Message}", e.Message);
              return null;
          }
      }

      /// <summary>
      /// Looks up the registered container information for model type <typeparamref name="T"/>.
      /// </summary>
      /// <typeparam name="T">Model type decorated with <see cref="CosmosDBAttribute"/>.</typeparam>
      /// <returns>The registered model, or null when the type has not been registered.</returns>
      /// <exception cref="BizException">
      /// The type has no <see cref="CosmosDBAttribute"/> or the attribute has an empty name.
      /// </exception>
      public async Task<AzureCosmosModel> InitializeCollection<T>()
      {
          Type type = typeof(T);
          string partitionKey = AzureCosmosUtil.GetPartitionKey<T>();

          // FirstOrDefault: a type without the attribute yields an empty sequence, not null.
          CosmosDBAttribute cosmosDBAttribute = type.GetCustomAttributes<CosmosDBAttribute>(true).FirstOrDefault();
          if (cosmosDBAttribute == null || string.IsNullOrEmpty(cosmosDBAttribute.Name))
          {
              throw new BizException(type.Name + "未指定CosmosDB表名", ResponseCode.PARAMS_ERROR);
          }

          return await InitializeCollection(cosmosDBAttribute, type.Name, partitionKey);
      }

      /// <summary>
      /// Looks up the registered container information by model type name. This is a pure
      /// in-memory lookup; containers are only created by <see cref="InitializeDatabase"/>.
      /// </summary>
      /// <param name="cosmosDBAttribute">Attribute of the model type (currently unused).</param>
      /// <param name="typeName">Simple name of the model type used as the registry key.</param>
      /// <param name="PartitionKey">Partition key of the model type (currently unused).</param>
      /// <returns>The registered model, or null when <paramref name="typeName"/> is unknown.</returns>
      public Task<AzureCosmosModel> InitializeCollection(CosmosDBAttribute cosmosDBAttribute, string typeName, string PartitionKey)
      {
          AzureCosmosModel result = CosmosDict.typeCosmos.TryGetValue(typeName, out AzureCosmosModel registered)
              ? registered
              : null;
          return Task.FromResult(result);
      }

      /// <summary>
      /// Populates <see cref="CosmosDict"/>: records every container that already exists in the
      /// configured databases ("Azure:Cosmos:Database"), then registers every type carrying a
      /// <see cref="CosmosDBAttribute"/> found via "Azure:Cosmos:ScanModel" (creating the container
      /// or raising its throughput as needed), and finally ensures a lease container exists when
      /// any model requests monitoring.
      /// </summary>
      /// <exception cref="BizException">A scanned type has no usable container name.</exception>
      /// <exception cref="InvalidOperationException">
      /// The default client is needed but could not be created.
      /// </exception>
      public async Task InitializeDatabase()
      {
          string[] databaseIds =
              BaseConfigModel.Configuration.GetSection("Azure:Cosmos:Database").Get<string[]>()
              ?? Array.Empty<string>();

          await RegisterExistingContainersAsync(databaseIds);

          List<Type> modelTypes = ReflectorExtensions.GetAllTypeAsAttribute<CosmosDBAttribute>(
              BaseConfigModel.Configuration.GetSection("Azure:Cosmos:ScanModel").Get<string[]>());

          bool anyMonitored = false;
          foreach (Type modelType in modelTypes)
          {
              if (await RegisterModelTypeAsync(modelType))
              {
                  anyMonitored = true;
              }
          }

          if (anyMonitored)
          {
              await EnsureLeaseContainersAsync(databaseIds);
          }
      }

      /// <summary>Returns the default client or throws a descriptive error instead of a null dereference.</summary>
      private CosmosClient GetRequiredClient()
      {
          return GetCosmosClient()
              ?? throw new InvalidOperationException(
                  "The default CosmosClient could not be created; see the preceding warning log entry.");
      }

      /// <summary>Records every container that already exists in the given databases under its container id.</summary>
      private async Task RegisterExistingContainersAsync(string[] databaseIds)
      {
          foreach (string databaseId in databaseIds)
          {
              CosmosDatabase database = GetRequiredClient().GetDatabase(databaseId);
              AsyncPageable<ContainerProperties> containers = database.GetContainerQueryIterator<ContainerProperties>();
              await foreach (ContainerProperties properties in containers)
              {
                  // TryAdd: when two databases hold a container with the same id, the first one wins.
                  CosmosDict.nameCosmos.TryAdd(properties.Id, new AzureCosmosModel
                  {
                      container = database.GetContainer(properties.Id),
                      // Stored without slashes; null-safe in case no partition key path is reported.
                      partitionKey = properties.PartitionKeyPath?.Replace("/", ""),
                      cache = false,
                      monitor = false,
                      database = database
                  });
              }
          }
      }

      /// <summary>
      /// Registers one attributed model type: raises the throughput of an already-known container
      /// if it is below the requested value, or creates the container otherwise.
      /// </summary>
      /// <returns>True when the type's attribute requests monitoring.</returns>
      private async Task<bool> RegisterModelTypeAsync(Type type)
      {
          string partitionKey = AzureCosmosUtil.GetPartitionKey(type);

          CosmosDBAttribute attribute = type.GetCustomAttributes<CosmosDBAttribute>(true).FirstOrDefault();
          if (attribute == null || string.IsNullOrEmpty(attribute.Name))
          {
              throw new BizException("必须指定容器名", ResponseCode.PARAMS_ERROR);
          }

          string containerId = attribute.Name;
          // Per-type value: an RU declared on one type must not leak into the default of the next.
          int requestedThroughput = attribute.RU > DefaultThroughput ? attribute.RU : DefaultThroughput;
          CosmosDatabase database = GetRequiredClient().GetDatabase(attribute.Database);

          CosmosContainer container;
          if (CosmosDict.nameCosmos.TryGetValue(containerId, out AzureCosmosModel existing))
          {
              container = database.GetContainer(existing.container.Id);
              // A null reading compares as false, so throughput is only ever raised, never lowered.
              int? currentThroughput = await container.ReadThroughputAsync();
              if (currentThroughput < requestedThroughput)
              {
                  await container.ReplaceThroughputAsync(requestedThroughput);
              }
          }
          else
          {
              ContainerProperties containerProperties = new ContainerProperties { Id = containerId, DefaultTimeToLive = -1 };
              if (!string.IsNullOrEmpty(partitionKey))
              {
                  containerProperties.PartitionKeyPath = "/" + partitionKey;
              }
              container = await database.CreateContainerIfNotExistsAsync(containerProperties, throughput: requestedThroughput);
          }

          AzureCosmosModel model = new AzureCosmosModel
          {
              container = container,
              cache = attribute.Cache,
              monitor = attribute.Monitor,
              type = type,
              partitionKey = partitionKey,
              database = database
          };

          if (CosmosDict.typeCosmos.TryGetValue(type.Name, out AzureCosmosModel previous) && previous.type != type)
          {
              _logger?.LogWarning(
                  "Model type name {TypeName} is registered by more than one type; {FullName} replaces the earlier entry.",
                  type.Name,
                  type.FullName);
          }

          // Indexer assignment keeps repeated initialization from failing on an existing key.
          CosmosDict.nameCosmos[containerId] = model;
          CosmosDict.typeCosmos[type.Name] = model;
          return attribute.Monitor;
      }

      /// <summary>Creates the lease container in each configured database if missing and records it.</summary>
      private async Task EnsureLeaseContainersAsync(string[] databaseIds)
      {
          foreach (string databaseId in databaseIds)
          {
              CosmosDatabase database = GetRequiredClient().GetDatabase(databaseId);
              ContainerProperties leaseProperties = new ContainerProperties
              {
                  Id = LeaseContainerId,
                  PartitionKeyPath = "/id",
                  DefaultTimeToLive = -1
              };
              CosmosContainer leaseContainer = await database.CreateContainerIfNotExistsAsync(leaseProperties, throughput: DefaultThroughput);

              // Only the first database's lease container is recorded, because the registry is keyed
              // by container id. The partition key is stored without the slash, like every other entry.
              CosmosDict.nameCosmos.TryAdd(LeaseContainerId, new AzureCosmosModel
              {
                  container = leaseContainer,
                  cache = false,
                  monitor = false,
                  partitionKey = "id",
                  database = database
              });
          }
      }
  }
  233. }