AzureCosmosFactory.cs 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. using Azure;
  2. using Azure.Cosmos;
  3. using Microsoft.Extensions.Configuration;
  4. using Microsoft.Extensions.Logging;
  5. using Microsoft.Extensions.Options;
  6. using System;
  7. using System.Collections.Concurrent;
  8. using System.Collections.Generic;
  9. using System.Linq;
  10. using System.Threading.Tasks;
  11. using TEAMModelOS.Models;
  12. using TEAMModelOS.SDK.Context.Attributes.Azure;
  13. using TEAMModelOS.SDK.Context.Configuration;
  14. using TEAMModelOS.SDK.DI.AzureCosmos.Inner;
  15. using TEAMModelOS.SDK.Helper.Common.ReflectorExtensions;
  16. using OpenXmlPowerTools;
  17. using System.Diagnostics;
  18. using System.IO;
  19. using System.Linq.Expressions;
  20. using System.Net;
  21. using System.Reflection;
  22. using System.Text;
  23. using System.Text.Json;
  24. using System.Threading;
  25. using TEAMModelOS.SDK.Context.Exception;
  26. using TEAMModelOS.SDK.DI;
  27. namespace TEAMModelOS.SDK.DI
  28. {
  29. public class AzureCosmosFactory
  30. {
  31. private readonly IServiceProvider _services;
  32. private readonly IOptionsMonitor<AzureCosmosFactoryOptions> _optionsMonitor;
  33. private readonly ILogger _logger;
  34. //private Option _option;
  35. private ConcurrentDictionary<string, CosmosClient> CosmosClients { get; } = new ConcurrentDictionary<string, CosmosClient>();
  36. public AzureCosmosDict CosmosDict { get; set; } = new AzureCosmosDict();
  37. // private CosmosDatabase database { get; set; }
  38. public AzureCosmosFactory(IServiceProvider services, IOptionsMonitor<AzureCosmosFactoryOptions> optionsMonitor, ILogger<AzureCosmosFactory> logger)
  39. {
  40. if (services == null) throw new ArgumentNullException(nameof(services));
  41. if (optionsMonitor == null) throw new ArgumentNullException(nameof(optionsMonitor));
  42. _services = services;
  43. _optionsMonitor = optionsMonitor;
  44. _logger = logger;
  45. InitializeDatabase().GetAwaiter().GetResult();
  46. }
  47. /// <summary>
  48. /// 取得CosmosClient,支持安全執行緒
  49. /// </summary>
  50. /// <param name="name"></param>
  51. /// <param name="region">可以使用Regions.{區域}設置,指定此屬性後,SDK會首選該區域來執行操作。此外,SDK會自動選擇後備的地理複製區域以實現高可用性。如果未指定此屬性,則SDK會將寫區域用作所有操作的首選區域</param>
  52. /// <returns></returns>
  53. public CosmosClient GetCosmosClient(string region = null, string name = "Default")
  54. {
  55. try
  56. {
  57. var cm = CosmosClients.GetOrAdd(name, x => new CosmosClient(_optionsMonitor.Get(name).ConnectionString, new CosmosClientOptions() { ApplicationRegion = region }));
  58. return cm;
  59. }
  60. catch (Exception e)
  61. {
  62. _logger?.LogWarning(e, e.Message);
  63. return null;
  64. }
  65. }
  66. /// <summary>
  67. /// 获取容器信息
  68. /// </summary>
  69. /// <param name="cosmosDBAttribute"></param>
  70. /// <param name="typeName"></param>
  71. /// <param name="PartitionKey"></param>
  72. /// <returns></returns>
  73. public AzureCosmosModel GetCosmosModel( string typeName )
  74. {
  75. /////内存中已经存在这个表则直接返回
  76. if (CosmosDict.typeCosmos.TryGetValue(typeName, out AzureCosmosModel AzureCosmosModel))
  77. {
  78. return AzureCosmosModel;
  79. }
  80. else {
  81. return null;
  82. }
  83. }
  84. /// <summary>
  85. /// 初始化数据配置信息
  86. /// </summary>
  87. /// <returns></returns>
  88. public async Task InitializeDatabase()
  89. {
  90. // string[] DatabaseIds = BaseConfigModel.Configuration.GetSection("Azure:Cosmos:Database").Get<string[]>();
  91. string[] DatabaseIds =_optionsMonitor.Get("Default").Database;
  92. bool isMonitor = false;
  93. string leases = "leases";
  94. if (DatabaseIds != null)
  95. {
  96. foreach (string DatabaseId in DatabaseIds)
  97. {
  98. CosmosDatabase databaseDef = GetCosmosClient().GetDatabase(DatabaseId);
  99. AsyncPageable<ContainerProperties> resultSetIterator = databaseDef.GetContainerQueryIterator<ContainerProperties>();
  100. await foreach (var container in resultSetIterator)
  101. {
  102. CosmosDict.nameCosmos.TryAdd(container.Id, new AzureCosmosModel { container = databaseDef.GetContainer(container.Id), partitionKey = container.PartitionKeyPath.Replace("/", ""), cache = false, monitor = false, database = databaseDef });
  103. }
  104. }
  105. }
  106. //获取数据库所有的表
  107. // List<Type> types = ReflectorExtensions.GetAllTypeAsAttribute<CosmosDBAttribute>(BaseConfigModel.Configuration.GetSection("Azure:Cosmos:ScanModel").Get<string[]>() );
  108. List<Type> types = new List<Type>() ;
  109. try {
  110. types = ReflectorExtensions.GetAllTypeAsAttribute<CosmosDBAttribute>(_optionsMonitor.Get("Default").ScanModel);
  111. } catch(Exception e) {
  112. Console.WriteLine(e.StackTrace);
  113. }
  114. foreach (Type type in types)
  115. {
  116. string PartitionKey = AzureCosmosUtil.GetPartitionKey(type);
  117. string CollectionName = "";
  118. bool cache = false;
  119. bool monitor = false;
  120. IEnumerable<CosmosDBAttribute> attributes = type.GetCustomAttributes<CosmosDBAttribute>(true);
  121. if (attributes != null && !string.IsNullOrEmpty(attributes.First<CosmosDBAttribute>().Name))
  122. {
  123. CollectionName = attributes.First<CosmosDBAttribute>().Name;
  124. }
  125. else
  126. {
  127. throw new BizException("必须指定容器名", ResponseCode.PARAMS_ERROR);
  128. }
  129. if (attributes.First<CosmosDBAttribute>().Cache)
  130. {
  131. cache = attributes.First<CosmosDBAttribute>().Cache;
  132. }
  133. if (attributes.First<CosmosDBAttribute>().Monitor)
  134. {
  135. monitor = attributes.First<CosmosDBAttribute>().Monitor;
  136. if (monitor)
  137. {
  138. isMonitor = true;
  139. }
  140. }
  141. //如果表存在于数据则检查RU是否变动,如果不存在则执行创建DocumentCollection
  142. if (CosmosDict.nameCosmos.TryGetValue(CollectionName, out AzureCosmosModel AzureCosmosModel))
  143. { //更新RU
  144. AzureCosmosModel.cache = cache;
  145. CosmosContainer container = GetCosmosClient().GetDatabase(attributes.First().Database).GetContainer(AzureCosmosModel.container.Id);
  146. AzureCosmosModel cosmos = new AzureCosmosModel { container = container, cache = cache, monitor = monitor, type = type, partitionKey = PartitionKey };
  147. CosmosDict.nameCosmos[CollectionName] = cosmos;
  148. CosmosDict.typeCosmos.Add(type.Name, cosmos);
  149. }
  150. else
  151. {
  152. ContainerProperties containerProperties = new ContainerProperties { Id = CollectionName, DefaultTimeToLive = -1 };
  153. if (!string.IsNullOrEmpty(PartitionKey))
  154. {
  155. containerProperties.PartitionKeyPath = "/" + PartitionKey;
  156. }
  157. CosmosDatabase database = GetCosmosClient().GetDatabase(attributes.First().Database);
  158. CosmosContainer containerWithConsistentIndexing = database.GetContainer(CollectionName);
  159. AzureCosmosModel cosmos = new AzureCosmosModel { container = containerWithConsistentIndexing, cache = cache, monitor = monitor, type = type, partitionKey = PartitionKey,database=database };
  160. CosmosDict.nameCosmos[CollectionName] = cosmos;
  161. CosmosDict.typeCosmos.Add(type.Name, cosmos);
  162. }
  163. }
  164. if (isMonitor)
  165. {
  166. if (DatabaseIds != null)
  167. {
  168. foreach (string DatabaseId in DatabaseIds)
  169. {
  170. CosmosDatabase database = GetCosmosClient().GetDatabase(DatabaseId);
  171. CosmosContainer leaseContainer = database.GetContainer(leases);
  172. CosmosDict.nameCosmos.TryAdd(leases, new AzureCosmosModel { container = leaseContainer, cache = false, monitor = false, partitionKey = "/id", database = database });
  173. }
  174. }
  175. }
  176. }
  177. }
  178. }