123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245 |
- using Azure;
- using Azure.Cosmos;
- using DocumentFormat.OpenXml.Office2010.ExcelAc;
- using Microsoft.Azure.Cosmos;
- using Microsoft.Extensions.Configuration;
- using Microsoft.Extensions.Logging;
- using Microsoft.Extensions.Options;
- using System;
- using System.Collections.Concurrent;
- using System.Collections.Generic;
- using System.Linq;
- using System.Threading.Tasks;
- using TEAMModelOS.Models;
- using TEAMModelOS.SDK.Context.Attributes.Azure;
- using TEAMModelOS.SDK.Context.Configuration;
- using TEAMModelOS.SDK.DI.AzureCosmos.Inner;
- using TEAMModelOS.SDK.Helper.Common.ReflectorExtensions;
- using ContainerProperties = Azure.Cosmos.ContainerProperties;
- using CosmosClient = Azure.Cosmos.CosmosClient;
- using CosmosClientOptions = Azure.Cosmos.CosmosClientOptions;
- using OpenXmlPowerTools;
- using System.Diagnostics;
- using System.IO;
- using System.Linq.Expressions;
- using System.Net;
- using System.Reflection;
- using System.Text;
- using System.Text.Json;
- using System.Threading;
- using TEAMModelOS.SDK.Context.Exception;
- using TEAMModelOS.SDK.DI;
- using PartitionKey = Azure.Cosmos.PartitionKey;
namespace TEAMModelOS.SDK.DI
{
    /// <summary>
    /// Creates and caches <see cref="CosmosClient"/> instances and maintains an in-memory
    /// registry (<see cref="CosmosDict"/>) of the Cosmos DB containers used by the application.
    /// </summary>
    /// <remarks>
    /// The constructor runs <see cref="InitializeDatabase"/> and blocks until it has finished,
    /// so constructing the factory performs the container discovery/creation calls.
    /// </remarks>
    public class AzureCosmosFactory
    {
        /// <summary>Default, and lowest, throughput (RU/s) requested for any container.</summary>
        private const int DefaultThroughput = 400;

        /// <summary>Id of the container created in each configured database when any model enables monitoring.</summary>
        private const string LeaseContainerId = "leases";

        private readonly IServiceProvider _services;
        private readonly IOptionsMonitor<AzureCosmosFactoryOptions> _optionsMonitor;
        private readonly ILogger _logger;

        // One client per (options name, preferred region) pair; see GetCosmosClient.
        private ConcurrentDictionary<(string Name, string Region), CosmosClient> CosmosClients { get; } =
            new ConcurrentDictionary<(string Name, string Region), CosmosClient>();

        /// <summary>
        /// Registry of known containers: <c>nameCosmos</c> is keyed by container id and
        /// <c>typeCosmos</c> by the simple name of the model type.
        /// </summary>
        public AzureCosmosDict CosmosDict { get; set; } = new AzureCosmosDict();

        /// <summary>
        /// Builds the factory and immediately populates <see cref="CosmosDict"/>.
        /// </summary>
        /// <param name="services">Application service provider; must not be null.</param>
        /// <param name="optionsMonitor">Source of the named <see cref="AzureCosmosFactoryOptions"/>; must not be null.</param>
        /// <param name="logger">Logger used for client-creation failures; may be null.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="services"/> or <paramref name="optionsMonitor"/> is null.
        /// </exception>
        public AzureCosmosFactory(IServiceProvider services, IOptionsMonitor<AzureCosmosFactoryOptions> optionsMonitor, ILogger<AzureCosmosFactory> logger)
        {
            _services = services ?? throw new ArgumentNullException(nameof(services));
            _optionsMonitor = optionsMonitor ?? throw new ArgumentNullException(nameof(optionsMonitor));
            _logger = logger; // optional: every use goes through the null-conditional operator

            // NOTE(review): this blocks the constructing thread on async work. A constructor
            // cannot await, so changing it would require an async factory method instead.
            InitializeDatabase().GetAwaiter().GetResult();
        }

        /// <summary>
        /// Returns the cached <see cref="CosmosClient"/> for the given options name and region,
        /// creating it on first use. The cache is a <see cref="ConcurrentDictionary{TKey,TValue}"/>.
        /// </summary>
        /// <param name="region">
        /// Can be set using Regions.{region}. When specified, the SDK prefers that region for
        /// operations and automatically picks fallback geo-replicated regions for high availability.
        /// When not specified, the SDK uses the write region as the preferred region for all operations.
        /// </param>
        /// <param name="name">Name of the <see cref="AzureCosmosFactoryOptions"/> instance that holds the connection string.</param>
        /// <returns>The client, or <c>null</c> when it could not be created (the failure is logged as a warning).</returns>
        public CosmosClient GetCosmosClient(string region = null, string name = "Default")
        {
            try
            {
                // The region is part of the key: keying on the name alone would hand back a
                // client bound to whichever region happened to be requested first.
                return CosmosClients.GetOrAdd(
                    (name, region),
                    _ => new CosmosClient(
                        _optionsMonitor.Get(name).CosmosConnectionString,
                        new CosmosClientOptions { ApplicationRegion = region }));
            }
            catch (Exception e)
            {
                // Log through a constant template so braces in the exception text are not
                // interpreted as format placeholders.
                _logger?.LogWarning(e, "{Message}", e.Message);
                return null;
            }
        }

        /// <summary>
        /// Looks up the registered container information for model type <typeparamref name="T"/>.
        /// </summary>
        /// <typeparam name="T">Model type carrying a <see cref="CosmosDBAttribute"/>.</typeparam>
        /// <returns>The registered model, or <c>null</c> when the type is not in the registry.</returns>
        /// <exception cref="BizException">
        /// <typeparamref name="T"/> has no <see cref="CosmosDBAttribute"/> or the attribute has an empty name.
        /// </exception>
        public async Task<AzureCosmosModel> InitializeCollection<T>()
        {
            Type type = typeof(T);
            string partitionKey = AzureCosmosUtil.GetPartitionKey<T>();

            // FirstOrDefault, not First: a type without the attribute must reach the
            // BizException below instead of failing with InvalidOperationException.
            CosmosDBAttribute cosmosDBAttribute = type.GetCustomAttributes<CosmosDBAttribute>(true).FirstOrDefault();
            if (cosmosDBAttribute == null || string.IsNullOrEmpty(cosmosDBAttribute.Name))
            {
                throw new BizException(type.Name + "未指定CosmosDB表名", ResponseCode.PARAMS_ERROR);
            }

            return await InitializeCollection(cosmosDBAttribute, type.Name, partitionKey);
        }

        /// <summary>
        /// Looks up the registered container information by model type name.
        /// No container is created here; only the in-memory registry is consulted.
        /// </summary>
        /// <param name="cosmosDBAttribute">Attribute of the model type (currently not used by the lookup).</param>
        /// <param name="typeName">Simple name of the model type, the key into <c>typeCosmos</c>.</param>
        /// <param name="PartitionKey">Partition key of the model type (currently not used by the lookup).</param>
        /// <returns>The registered model, or <c>null</c> when <paramref name="typeName"/> is not in the registry.</returns>
        public Task<AzureCosmosModel> InitializeCollection(CosmosDBAttribute cosmosDBAttribute, string typeName, string PartitionKey)
        {
            AzureCosmosModel registered =
                CosmosDict.typeCosmos.TryGetValue(typeName, out AzureCosmosModel found) ? found : null;
            return Task.FromResult(registered);
        }

        /// <summary>
        /// Populates <see cref="CosmosDict"/>:
        /// first every container found in the databases listed under <c>Azure:Cosmos:Database</c>,
        /// then one entry per type carrying <see cref="CosmosDBAttribute"/> in the scan targets listed
        /// under <c>Azure:Cosmos:ScanModel</c> (creating missing containers and raising throughput
        /// that is below the requested value), and finally the lease containers when any model
        /// enables monitoring.
        /// </summary>
        /// <exception cref="BizException">A scanned type has no usable <see cref="CosmosDBAttribute"/> name.</exception>
        public async Task InitializeDatabase()
        {
            string[] databaseIds =
                BaseConfigModel.Configuration.GetSection("Azure:Cosmos:Database").Get<string[]>() ?? Array.Empty<string>();

            await RegisterExistingContainers(databaseIds);

            List<Type> modelTypes = ReflectorExtensions.GetAllTypeAsAttribute<CosmosDBAttribute>(
                BaseConfigModel.Configuration.GetSection("Azure:Cosmos:ScanModel").Get<string[]>());

            bool anyMonitored = false;
            foreach (Type modelType in modelTypes)
            {
                if (await RegisterModelContainer(modelType))
                {
                    anyMonitored = true;
                }
            }

            if (anyMonitored)
            {
                await EnsureLeaseContainers(databaseIds);
            }
        }

        /// <summary>Registers every container that already exists in the given databases under its container id.</summary>
        private async Task RegisterExistingContainers(string[] databaseIds)
        {
            foreach (string databaseId in databaseIds)
            {
                CosmosDatabase database = GetCosmosClient().GetDatabase(databaseId);
                AsyncPageable<ContainerProperties> containers = database.GetContainerQueryIterator<ContainerProperties>();
                await foreach (ContainerProperties properties in containers)
                {
                    // NOTE(review): the registry is keyed by container id alone, so when two
                    // databases hold a container with the same id only the first one is kept.
                    CosmosDict.nameCosmos.TryAdd(properties.Id, new AzureCosmosModel
                    {
                        container = database.GetContainer(properties.Id),
                        partitionKey = properties.PartitionKeyPath.Replace("/", ""),
                        cache = false,
                        monitor = false,
                        database = database
                    });
                }
            }
        }

        /// <summary>
        /// Registers the container described by the <see cref="CosmosDBAttribute"/> on <paramref name="type"/>,
        /// creating it when it is not yet in the registry. Returns whether the model enables monitoring.
        /// </summary>
        private async Task<bool> RegisterModelContainer(Type type)
        {
            string partitionKey = AzureCosmosUtil.GetPartitionKey(type);

            CosmosDBAttribute attribute = type.GetCustomAttributes<CosmosDBAttribute>(true).FirstOrDefault();
            if (attribute == null || string.IsNullOrEmpty(attribute.Name))
            {
                throw new BizException("必须指定容器名", ResponseCode.PARAMS_ERROR);
            }

            string containerId = attribute.Name;

            // Computed per model: a high-RU model must not raise the default applied to
            // the models (and lease containers) processed after it.
            int throughput = attribute.RU > DefaultThroughput ? attribute.RU : DefaultThroughput;

            CosmosDatabase database = GetCosmosClient().GetDatabase(attribute.Database);
            CosmosContainer container;

            if (CosmosDict.nameCosmos.TryGetValue(containerId, out AzureCosmosModel existing))
            {
                // Already known: only raise the throughput, never lower it. A null reading
                // compares as false and leaves the throughput untouched.
                container = database.GetContainer(existing.container.Id);
                int? currentThroughput = await container.ReadThroughputAsync();
                if (currentThroughput < throughput)
                {
                    await container.ReplaceThroughputAsync(throughput);
                }
            }
            else
            {
                ContainerProperties properties = new ContainerProperties { Id = containerId, DefaultTimeToLive = -1 };
                if (!string.IsNullOrEmpty(partitionKey))
                {
                    properties.PartitionKeyPath = "/" + partitionKey;
                }
                container = await database.CreateContainerIfNotExistsAsync(properties, throughput: throughput);
            }

            AzureCosmosModel model = new AzureCosmosModel
            {
                container = container,
                cache = attribute.Cache,
                monitor = attribute.Monitor,
                type = type,
                partitionKey = partitionKey,
                database = database
            };

            // Indexer assignment (not Add) so a repeated type name or a second run of
            // InitializeDatabase replaces the entry instead of throwing.
            CosmosDict.nameCosmos[containerId] = model;
            CosmosDict.typeCosmos[type.Name] = model;

            return attribute.Monitor;
        }

        /// <summary>Creates the lease container in each given database if it is missing and registers it.</summary>
        private async Task EnsureLeaseContainers(string[] databaseIds)
        {
            foreach (string databaseId in databaseIds)
            {
                CosmosDatabase database = GetCosmosClient().GetDatabase(databaseId);
                ContainerProperties leaseProperties = new ContainerProperties
                {
                    Id = LeaseContainerId,
                    PartitionKeyPath = "/id",
                    DefaultTimeToLive = -1
                };
                CosmosContainer leaseContainer =
                    await database.CreateContainerIfNotExistsAsync(leaseProperties, throughput: DefaultThroughput);

                // The partition key is stored without the leading slash, matching the entries
                // produced by the existing-container scan.
                CosmosDict.nameCosmos.TryAdd(LeaseContainerId, new AzureCosmosModel
                {
                    container = leaseContainer,
                    cache = false,
                    monitor = false,
                    partitionKey = "id",
                    database = database
                });
            }
        }
    }
}
|