AzureCosmosFactory.cs 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. using Azure;
  2. using Azure.Cosmos;
  3. using DocumentFormat.OpenXml.Office2010.ExcelAc;
  4. using Microsoft.Azure.Cosmos;
  5. using Microsoft.Extensions.Configuration;
  6. using Microsoft.Extensions.Logging;
  7. using Microsoft.Extensions.Options;
  8. using System;
  9. using System.Collections.Concurrent;
  10. using System.Collections.Generic;
  11. using System.Linq;
  12. using System.Threading.Tasks;
  13. using TEAMModelOS.Models;
  14. using TEAMModelOS.SDK.Context.Attributes.Azure;
  15. using TEAMModelOS.SDK.Context.Configuration;
  16. using TEAMModelOS.SDK.DI.AzureCosmos.Inner;
  17. using TEAMModelOS.SDK.Helper.Common.ReflectorExtensions;
  18. using ContainerProperties = Azure.Cosmos.ContainerProperties;
  19. using CosmosClient = Azure.Cosmos.CosmosClient;
  20. using CosmosClientOptions = Azure.Cosmos.CosmosClientOptions;
  21. using OpenXmlPowerTools;
  22. using System.Diagnostics;
  23. using System.IO;
  24. using System.Linq.Expressions;
  25. using System.Net;
  26. using System.Reflection;
  27. using System.Text;
  28. using System.Text.Json;
  29. using System.Threading;
  30. using TEAMModelOS.SDK.Context.Exception;
  31. using TEAMModelOS.SDK.DI;
  32. using PartitionKey = Azure.Cosmos.PartitionKey;
  33. namespace TEAMModelOS.SDK.DI
  34. {
  /// <summary>
  /// Creates and caches <see cref="CosmosClient"/> instances and keeps the
  /// container metadata registry (<see cref="CosmosDict"/>) for every model type
  /// decorated with <see cref="CosmosDBAttribute"/>.
  /// </summary>
  public class AzureCosmosFactory
  {
      private readonly IServiceProvider _services;
      private readonly IOptionsMonitor<AzureCosmosFactoryOptions> _optionsMonitor;
      private readonly ILogger _logger;

      /// <summary>Client cache; see <see cref="GetCosmosClient"/> for the key format.</summary>
      private ConcurrentDictionary<string, CosmosClient> CosmosClients { get; } = new ConcurrentDictionary<string, CosmosClient>();

      /// <summary>
      /// Registry of known containers, indexed by container id (<c>nameCosmos</c>)
      /// and by model type name (<c>typeCosmos</c>).
      /// </summary>
      public AzureCosmosDict CosmosDict { get; set; } = new AzureCosmosDict();

      /// <summary>
      /// Stores the dependencies and runs <see cref="InitializeDatabase"/> to completion
      /// before returning.
      /// </summary>
      /// <param name="services">Application service provider; must not be null.</param>
      /// <param name="optionsMonitor">Source of the named connection options; must not be null.</param>
      /// <param name="logger">Logger used for client-creation warnings; may be null.</param>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="services"/> or <paramref name="optionsMonitor"/> is null.
      /// </exception>
      public AzureCosmosFactory(IServiceProvider services, IOptionsMonitor<AzureCosmosFactoryOptions> optionsMonitor, ILogger<AzureCosmosFactory> logger)
      {
          _services = services ?? throw new ArgumentNullException(nameof(services));
          _optionsMonitor = optionsMonitor ?? throw new ArgumentNullException(nameof(optionsMonitor));
          _logger = logger;

          // NOTE(review): this blocks the constructing thread until initialization
          // finishes. Kept because callers rely on a fully populated CosmosDict as
          // soon as the factory is constructed.
          InitializeDatabase().GetAwaiter().GetResult();
      }

      /// <summary>
      /// Returns the cached <see cref="CosmosClient"/> for the given options name and
      /// region, creating it on first use. Safe to call from multiple threads.
      /// </summary>
      /// <param name="region">
      /// Preferred region, assigned to <c>CosmosClientOptions.ApplicationRegion</c>
      /// (can be set with <c>Regions.{Region}</c>). When specified, the SDK prefers that
      /// region for operations and automatically picks a fallback geo-replicated region
      /// for high availability. When not specified, the SDK uses the write region as the
      /// preferred region for all operations.
      /// </param>
      /// <param name="name">Name of the options instance that holds the connection string.</param>
      /// <returns>
      /// The client, or <c>null</c> when it could not be created (the failure is logged
      /// as a warning).
      /// </returns>
      public CosmosClient GetCosmosClient(string region = null, string name = "Default")
      {
          try
          {
              if (name == null)
              {
                  throw new ArgumentNullException(nameof(name));
              }

              // Cache per (name, region). Keying on the name alone would return the
              // first client created for that name and silently ignore the region
              // requested by later callers.
              string cacheKey = region == null ? name : $"{name}|{region}";

              // The factory delegate may run more than once under contention; only
              // one resulting client is stored in the dictionary.
              return CosmosClients.GetOrAdd(
                  cacheKey,
                  _ => new CosmosClient(
                      _optionsMonitor.Get(name).CosmosConnectionString,
                      new CosmosClientOptions { ApplicationRegion = region }));
          }
          catch (Exception e)
          {
              // Constant template: the exception text may contain braces, which must
              // not be interpreted as logging placeholders.
              _logger?.LogWarning(e, "{Message}", e.Message);
              return null;
          }
      }

      /// <summary>
      /// Looks up the container metadata registered for a model type.
      /// </summary>
      /// <param name="typeName">Simple type name (<c>Type.Name</c>) of the model.</param>
      /// <returns>The registered metadata, or <c>null</c> when the type is unknown.</returns>
      public AzureCosmosModel GetCosmosModel(string typeName)
      {
          return CosmosDict.typeCosmos.TryGetValue(typeName, out AzureCosmosModel model) ? model : null;
      }

      /// <summary>
      /// Populates <see cref="CosmosDict"/>:
      /// 1. registers every container that already exists in the configured databases;
      /// 2. for every type carrying <see cref="CosmosDBAttribute"/>, binds it to its
      ///    container, creating the container when it is not registered yet;
      /// 3. when at least one type is monitored, ensures a "leases" container exists
      ///    in every configured database.
      /// </summary>
      /// <returns>A task that completes when the registry is populated.</returns>
      /// <exception cref="BizException">A model type does not declare a container name.</exception>
      public async Task InitializeDatabase()
      {
          const string leaseContainerId = "leases";
          string[] databaseIds = BaseConfigModel.Configuration.GetSection("Azure:Cosmos:Database").Get<string[]>();
          bool anyMonitored = false;

          // Step 1: discover the containers that already exist.
          if (databaseIds != null)
          {
              foreach (string databaseId in databaseIds)
              {
                  CosmosDatabase existingDatabase = GetCosmosClient().GetDatabase(databaseId);
                  AsyncPageable<ContainerProperties> existingContainers = existingDatabase.GetContainerQueryIterator<ContainerProperties>();
                  await foreach (ContainerProperties properties in existingContainers)
                  {
                      CosmosDict.nameCosmos.TryAdd(properties.Id, new AzureCosmosModel
                      {
                          container = existingDatabase.GetContainer(properties.Id),
                          // Stored without the leading slash of the partition key path.
                          partitionKey = properties.PartitionKeyPath.Replace("/", ""),
                          cache = false,
                          monitor = false,
                          database = existingDatabase
                      });
                  }
              }
          }

          // Step 2: bind every attributed model type to its container.
          List<Type> modelTypes = ReflectorExtensions.GetAllTypeAsAttribute<CosmosDBAttribute>(
              BaseConfigModel.Configuration.GetSection("Azure:Cosmos:ScanModel").Get<string[]>());
          foreach (Type modelType in modelTypes)
          {
              string partitionKey = AzureCosmosUtil.GetPartitionKey(modelType);

              // Read the attribute once; a missing attribute is reported the same way
              // as a missing container name.
              CosmosDBAttribute attribute = modelType.GetCustomAttributes<CosmosDBAttribute>(true).FirstOrDefault();
              if (attribute == null || string.IsNullOrEmpty(attribute.Name))
              {
                  throw new BizException("必须指定容器名", ResponseCode.PARAMS_ERROR);
              }

              string containerId = attribute.Name;
              bool cache = attribute.Cache;
              bool monitor = attribute.Monitor;
              anyMonitored |= monitor;

              CosmosDatabase database = GetCosmosClient().GetDatabase(attribute.Database);
              CosmosContainer container;
              if (CosmosDict.nameCosmos.TryGetValue(containerId, out AzureCosmosModel registered))
              {
                  // Container already exists: just obtain a reference to it.
                  container = database.GetContainer(registered.container.Id);
              }
              else
              {
                  // Container is unknown: create it (no-op when it exists after all).
                  ContainerProperties containerProperties = new ContainerProperties { Id = containerId, DefaultTimeToLive = -1 };
                  if (!string.IsNullOrEmpty(partitionKey))
                  {
                      containerProperties.PartitionKeyPath = "/" + partitionKey;
                  }
                  container = await database.CreateContainerIfNotExistsAsync(containerProperties);
              }

              // Both branches record the database, so the reference captured in
              // step 1 is not lost when the entry is replaced.
              AzureCosmosModel model = new AzureCosmosModel
              {
                  container = container,
                  cache = cache,
                  monitor = monitor,
                  type = modelType,
                  partitionKey = partitionKey,
                  database = database
              };
              CosmosDict.nameCosmos[containerId] = model;
              CosmosDict.typeCosmos.Add(modelType.Name, model);
          }

          // Step 3: monitored containers need a lease container.
          if (anyMonitored && databaseIds != null)
          {
              foreach (string databaseId in databaseIds)
              {
                  CosmosDatabase leaseDatabase = GetCosmosClient().GetDatabase(databaseId);
                  ContainerProperties leaseProperties = new ContainerProperties { Id = leaseContainerId, PartitionKeyPath = "/id", DefaultTimeToLive = -1 };
                  CosmosContainer leaseContainer = await leaseDatabase.CreateContainerIfNotExistsAsync(leaseProperties);

                  // partitionKey is stored without the leading slash, matching the
                  // entries registered in steps 1 and 2.
                  CosmosDict.nameCosmos.TryAdd(leaseContainerId, new AzureCosmosModel
                  {
                      container = leaseContainer,
                      cache = false,
                      monitor = false,
                      partitionKey = "id",
                      database = leaseDatabase
                  });
              }
          }
      }
  }
  197. }