AzureCosmosFactory.cs 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. using Azure;
  2. using Azure.Cosmos;
  3. using Microsoft.Extensions.Configuration;
  4. using Microsoft.Extensions.Logging;
  5. using Microsoft.Extensions.Options;
  6. using System;
  7. using System.Collections.Concurrent;
  8. using System.Collections.Generic;
  9. using System.Linq;
  10. using System.Threading.Tasks;
  11. using TEAMModelOS.Models;
  12. using TEAMModelOS.SDK.Context.Attributes.Azure;
  13. using TEAMModelOS.SDK.Context.Configuration;
  14. using TEAMModelOS.SDK.DI.AzureCosmos.Inner;
  15. using TEAMModelOS.SDK.Helper.Common.ReflectorExtensions;
  16. using OpenXmlPowerTools;
  17. using System.Diagnostics;
  18. using System.IO;
  19. using System.Linq.Expressions;
  20. using System.Net;
  21. using System.Reflection;
  22. using System.Text;
  23. using System.Text.Json;
  24. using System.Threading;
  25. using TEAMModelOS.SDK.Context.Exception;
  26. using TEAMModelOS.SDK.DI;
  27. using Azure.Cosmos.Serialization;
  28. namespace TEAMModelOS.SDK.DI
  29. {
  /// <summary>
  /// Creates and caches <see cref="CosmosClient"/> instances and maintains an in-memory
  /// registry (<see cref="CosmosDict"/>) of the containers used by the mapped model types.
  /// </summary>
  public class AzureCosmosFactory
  {
      /// <summary>Id of the lease container that is registered when at least one model enables monitoring.</summary>
      private const string LeaseContainerName = "leases";

      private readonly IServiceProvider _services;
      private readonly IOptionsMonitor<AzureCosmosFactoryOptions> _optionsMonitor;
      private readonly ILogger _logger;

      // Clients are cached per (options name, preferred region). Keying by name alone would
      // hand back a client built for whichever region happened to be requested first.
      private ConcurrentDictionary<(string Name, string Region), CosmosClient> CosmosClients { get; } =
          new ConcurrentDictionary<(string Name, string Region), CosmosClient>();

      /// <summary>Registry of known containers, indexed by container name and by model type name.</summary>
      public AzureCosmosDict CosmosDict { get; set; } = new AzureCosmosDict();

      /// <summary>
      /// Stores the dependencies and synchronously runs <see cref="InitializeDatabase"/>.
      /// </summary>
      /// <param name="services">Service provider; must not be null.</param>
      /// <param name="optionsMonitor">Source of the named <see cref="AzureCosmosFactoryOptions"/>; must not be null.</param>
      /// <param name="logger">Logger used for diagnostics; a null logger is tolerated.</param>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="services"/> or <paramref name="optionsMonitor"/> is null.
      /// </exception>
      public AzureCosmosFactory(IServiceProvider services, IOptionsMonitor<AzureCosmosFactoryOptions> optionsMonitor, ILogger<AzureCosmosFactory> logger)
      {
          _services = services ?? throw new ArgumentNullException(nameof(services));
          _optionsMonitor = optionsMonitor ?? throw new ArgumentNullException(nameof(optionsMonitor));
          _logger = logger;

          // NOTE(review): this blocks the constructing thread on async I/O. It is kept because
          // callers rely on the registry being populated once construction returns.
          InitializeDatabase().GetAwaiter().GetResult();
      }

      /// <summary>
      /// Gets the cached <see cref="CosmosClient"/> for the given options name and region,
      /// creating it on first use.
      /// </summary>
      /// <param name="region">
      /// Preferred region, assigned to <see cref="CosmosClientOptions.ApplicationRegion"/>
      /// (for example a value from <c>Regions</c>). When set, the SDK prefers that region for
      /// operations and automatically picks a fallback geo-replicated region for high
      /// availability. When null, the SDK uses the write region as the preferred region for
      /// all operations.
      /// </param>
      /// <param name="name">Name of the <see cref="AzureCosmosFactoryOptions"/> instance that supplies the connection string.</param>
      /// <returns>The client, or null when it could not be created (the failure is logged as a warning).</returns>
      public CosmosClient GetCosmosClient(string region = null, string name = "Default")
      {
          try
          {
              return CosmosClients.GetOrAdd((name, region), key => new CosmosClient(
                  _optionsMonitor.Get(key.Name).ConnectionString,
                  new CosmosClientOptions
                  {
                      ApplicationRegion = key.Region,
                      SerializerOptions = new CosmosSerializationOptions { PropertyNamingPolicy = CosmosPropertyNamingPolicy.CamelCase }
                  }));
          }
          catch (Exception e)
          {
              // The message is passed as an argument, not as the template, so braces inside it
              // are not interpreted as logging placeholders.
              _logger?.LogWarning(e, "{Message}", e.Message);
              return null;
          }
      }

      /// <summary>
      /// Looks up the container information registered for a model type.
      /// </summary>
      /// <param name="typeName">Simple name of the model type (<c>Type.Name</c>), as registered by <see cref="InitializeDatabase"/>.</param>
      /// <returns>The registered model, or null when the type name is unknown.</returns>
      public AzureCosmosModel GetCosmosModel(string typeName)
      {
          return CosmosDict.typeCosmos.TryGetValue(typeName, out AzureCosmosModel model) ? model : null;
      }

      /// <summary>
      /// Populates <see cref="CosmosDict"/>: first with every container found in the configured
      /// databases, then with one entry per model type carrying a <see cref="CosmosDBAttribute"/>,
      /// and finally with the lease container when any model enables monitoring.
      /// </summary>
      /// <returns>A task that completes when the registry has been populated.</returns>
      /// <exception cref="BizException">A scanned type has no attribute or an empty container name.</exception>
      /// <exception cref="InvalidOperationException">The default client could not be created.</exception>
      public async Task InitializeDatabase()
      {
          AzureCosmosFactoryOptions options = _optionsMonitor.Get("Default");
          string[] databaseIds = options.Database;

          // 1. Register every container that already exists in the configured databases.
          if (databaseIds != null)
          {
              foreach (string databaseId in databaseIds)
              {
                  CosmosDatabase database = GetRequiredClient().GetDatabase(databaseId);
                  AsyncPageable<ContainerProperties> containers = database.GetContainerQueryIterator<ContainerProperties>();
                  await foreach (var properties in containers)
                  {
                      CosmosDict.nameCosmos.TryAdd(properties.Id, new AzureCosmosModel
                      {
                          container = database.GetContainer(properties.Id),
                          // Stored without the leading slash of the partition key path.
                          partitionKey = properties.PartitionKeyPath.Replace("/", ""),
                          cache = false,
                          monitor = false,
                          database = database
                      });
                  }
              }
          }

          // 2. Collect the model types to map. A scan failure is logged and treated as "no types"
          //    so that initialization stays best-effort.
          List<Type> types = null;
          try
          {
              types = ReflectorExtensions.GetAllTypeAsAttribute<CosmosDBAttribute>(options.ScanModel);
          }
          catch (Exception e)
          {
              _logger?.LogError(e, "Failed to scan model types for CosmosDBAttribute");
          }
          types = types ?? new List<Type>();

          // 3. Register (or overwrite) one entry per model type.
          bool anyMonitored = false;
          foreach (Type type in types)
          {
              string partitionKey = AzureCosmosUtil.GetPartitionKey(type);
              CosmosDBAttribute attribute = type.GetCustomAttributes<CosmosDBAttribute>(true).FirstOrDefault();
              if (attribute == null || string.IsNullOrEmpty(attribute.Name))
              {
                  throw new BizException("必须指定容器名", ResponseCode.PARAMS_ERROR);
              }

              string collectionName = attribute.Name;
              bool cache = attribute.Cache;
              bool monitor = attribute.Monitor;
              anyMonitored |= monitor;

              // Reuse the id of an already-registered container when there is one.
              string containerId = CosmosDict.nameCosmos.TryGetValue(collectionName, out AzureCosmosModel existing)
                  ? existing.container.Id
                  : collectionName;

              // NOTE(review): the container is only referenced here, never created; a missing
              // container will surface on first use.
              CosmosDatabase database = GetRequiredClient().GetDatabase(attribute.Database);
              AzureCosmosModel model = new AzureCosmosModel
              {
                  container = database.GetContainer(containerId),
                  cache = cache,
                  monitor = monitor,
                  type = type,
                  partitionKey = partitionKey,
                  database = database
              };
              CosmosDict.nameCosmos[collectionName] = model;
              // Add throws when two scanned types share the same simple name.
              CosmosDict.typeCosmos.Add(type.Name, model);
          }

          // 4. Register the lease container when any model asked for monitoring.
          if (anyMonitored && databaseIds != null)
          {
              foreach (string databaseId in databaseIds)
              {
                  CosmosDatabase database = GetRequiredClient().GetDatabase(databaseId);
                  // TryAdd keeps the first registration, so only one database's lease container
                  // is recorded.
                  // NOTE(review): "/id" keeps its leading slash, unlike the partition keys stored
                  // in step 1 - confirm what consumers of this entry expect.
                  CosmosDict.nameCosmos.TryAdd(LeaseContainerName, new AzureCosmosModel
                  {
                      container = database.GetContainer(LeaseContainerName),
                      cache = false,
                      monitor = false,
                      partitionKey = "/id",
                      database = database
                  });
              }
          }
      }

      /// <summary>
      /// Returns the default client, failing with a descriptive exception instead of a
      /// NullReferenceException when it could not be created.
      /// </summary>
      private CosmosClient GetRequiredClient()
      {
          return GetCosmosClient()
              ?? throw new InvalidOperationException("The default CosmosClient could not be created; see the logged warning for the underlying error.");
      }
  }
  179. }