using Azure; using Azure.Cosmos; using DocumentFormat.OpenXml.Office2010.ExcelAc; using Microsoft.Azure.Cosmos; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using TEAMModelOS.Models; using TEAMModelOS.SDK.Context.Attributes.Azure; using TEAMModelOS.SDK.Context.Configuration; using TEAMModelOS.SDK.DI.AzureCosmos.Inner; using TEAMModelOS.SDK.Helper.Common.ReflectorExtensions; using ContainerProperties = Azure.Cosmos.ContainerProperties; using CosmosClient = Azure.Cosmos.CosmosClient; using CosmosClientOptions = Azure.Cosmos.CosmosClientOptions; using OpenXmlPowerTools; using System.Diagnostics; using System.IO; using System.Linq.Expressions; using System.Net; using System.Reflection; using System.Text; using System.Text.Json; using System.Threading; using TEAMModelOS.SDK.Context.Exception; using TEAMModelOS.SDK.DI; using PartitionKey = Azure.Cosmos.PartitionKey; namespace TEAMModelOS.SDK.DI { public class AzureCosmosFactory { private readonly IServiceProvider _services; private readonly IOptionsMonitor _optionsMonitor; private readonly ILogger _logger; //private Option _option; private ConcurrentDictionary CosmosClients { get; } = new ConcurrentDictionary(); public AzureCosmosDict CosmosDict { get; set; } = new AzureCosmosDict(); // private CosmosDatabase database { get; set; } public AzureCosmosFactory(IServiceProvider services, IOptionsMonitor optionsMonitor, ILogger logger) { if (services == null) throw new ArgumentNullException(nameof(services)); if (optionsMonitor == null) throw new ArgumentNullException(nameof(optionsMonitor)); _services = services; _optionsMonitor = optionsMonitor; _logger = logger; InitializeDatabase().GetAwaiter().GetResult(); } /// /// 取得CosmosClient,支持安全執行緒 /// /// /// 可以使用Regions.{區域}設置,指定此屬性後,SDK會首選該區域來執行操作。此外,SDK會自動選擇後備的地理複製區域以實現高可用性。如果未指定此屬性,則SDK會將寫區域用作所有操作的首選區域 /// public CosmosClient GetCosmosClient(string region = null, string name = "Default") { try { var cm = CosmosClients.GetOrAdd(name, x => new CosmosClient(_optionsMonitor.Get(name).CosmosConnectionString, new CosmosClientOptions() { ApplicationRegion = region })); return cm; } catch (Exception e) { _logger?.LogWarning(e, e.Message); return null; } } /// /// 获取容器信息 /// /// /// /// /// public AzureCosmosModel GetCosmosModel( string typeName ) { /////内存中已经存在这个表则直接返回 if (CosmosDict.typeCosmos.TryGetValue(typeName, out AzureCosmosModel AzureCosmosModel)) { return AzureCosmosModel; } else { return null; } } /// /// 初始化数据配置信息 /// /// public async Task InitializeDatabase() { // int CollectionThroughput = 400; string[] DatabaseIds = BaseConfigModel.Configuration.GetSection("Azure:Cosmos:Database").Get(); bool isMonitor = false; string leases = "leases"; if (DatabaseIds != null) { foreach (string DatabaseId in DatabaseIds) { CosmosDatabase databaseDef = GetCosmosClient().GetDatabase(DatabaseId); AsyncPageable resultSetIterator = databaseDef.GetContainerQueryIterator(); await foreach (var container in resultSetIterator) { CosmosDict.nameCosmos.TryAdd(container.Id, new AzureCosmosModel { container = databaseDef.GetContainer(container.Id), partitionKey = container.PartitionKeyPath.Replace("/", ""), cache = false, monitor = false, database = databaseDef }); } } } //获取数据库所有的表 List types = ReflectorExtensions.GetAllTypeAsAttribute(BaseConfigModel.Configuration.GetSection("Azure:Cosmos:ScanModel").Get() ); foreach (Type type in types) { string PartitionKey = AzureCosmosUtil.GetPartitionKey(type); string CollectionName = ""; // int RU = 0; bool cache = false; bool monitor = false; IEnumerable attributes = type.GetCustomAttributes(true); if (attributes != null && !string.IsNullOrEmpty(attributes.First().Name)) { CollectionName = attributes.First().Name; } else { throw new BizException("必须指定容器名", ResponseCode.PARAMS_ERROR); } if (attributes.First().Cache) { cache = attributes.First().Cache; } if (attributes.First().Monitor) { monitor = attributes.First().Monitor; if (monitor) { isMonitor = true; } } //if (attributes.First().RU > 400) //{ // RU = attributes.First().RU; //} //else //{ // RU = CollectionThroughput; //} //如果表存在于数据则检查RU是否变动,如果不存在则执行创建DocumentCollection if (CosmosDict.nameCosmos.TryGetValue(CollectionName, out AzureCosmosModel AzureCosmosModel)) { //更新RU AzureCosmosModel.cache = cache; CosmosContainer container = GetCosmosClient().GetDatabase(attributes.First().Database).GetContainer(AzureCosmosModel.container.Id); //int? throughputResponse = await container.ReadThroughputAsync(); //if (throughputResponse < RU) //{ // await GetCosmosClient().GetDatabase(attributes.First().Database).GetContainer(AzureCosmosModel.container.Id).ReplaceThroughputAsync(RU); //} AzureCosmosModel cosmos = new AzureCosmosModel { container = container, cache = cache, monitor = monitor, type = type, partitionKey = PartitionKey }; CosmosDict.nameCosmos[CollectionName] = cosmos; CosmosDict.typeCosmos.Add(type.Name, cosmos); } else { ContainerProperties containerProperties = new ContainerProperties { Id = CollectionName, DefaultTimeToLive = -1 }; if (!string.IsNullOrEmpty(PartitionKey)) { containerProperties.PartitionKeyPath = "/" + PartitionKey; } //if (RU > CollectionThroughput) //{ // CollectionThroughput = RU; //} CosmosDatabase database = GetCosmosClient().GetDatabase(attributes.First().Database); CosmosContainer containerWithConsistentIndexing =await database.CreateContainerIfNotExistsAsync(containerProperties); AzureCosmosModel cosmos = new AzureCosmosModel { container = containerWithConsistentIndexing, cache = cache, monitor = monitor, type = type, partitionKey = PartitionKey,database=database }; CosmosDict.nameCosmos[CollectionName] = cosmos; CosmosDict.typeCosmos.Add(type.Name, cosmos); } } if (isMonitor) { if (DatabaseIds != null) { foreach (string DatabaseId in DatabaseIds) { CosmosDatabase database = GetCosmosClient().GetDatabase(DatabaseId); ContainerProperties leaseProperties = new ContainerProperties { Id = leases, PartitionKeyPath = "/id", DefaultTimeToLive = -1 }; CosmosContainer leaseContainer = await database.CreateContainerIfNotExistsAsync(leaseProperties); CosmosDict.nameCosmos.TryAdd(leases, new AzureCosmosModel { container = leaseContainer, cache = false, monitor = false, partitionKey = "/id", database = database }); } } } } } }