using Microsoft.Azure.Cosmos; using Microsoft.Azure.Cosmos.Linq; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.IO; using System.Linq; using System.Linq.Expressions; using System.Net; using System.Reflection; using System.Text; using System.Text.Json; using System.Threading.Tasks; using TEAMModelOS.SDK.Context.Attributes.Azure; using TEAMModelOS.SDK.Context.Exception; using TEAMModelOS.SDK.Helper.Common.CollectionHelper; using TEAMModelOS.SDK.Helper.Common.ReflectorExtensions; using TEAMModelOS.SDK.Module.AzureCosmosDB.Configuration; namespace TEAMModelOS.SDK.Module.AzureCosmosDBV3 { internal class CosmosModelInfo { public Container container{ get; set; } public bool cache { get; set; } } public class AzureCosmosDBV3Repository : IAzureCosmosDBV3Repository { private CosmosClient CosmosClient { get; set; } /// /// 线程安全的dict类型 /// private Dictionary DocumentCollectionDict { get; set; } = new Dictionary(); private string DatabaseId { get; set; } private int CollectionThroughput { get; set; } private Database database { get; set; } int pageSize = 200; private const string CacheCosmosPrefix = "cosmos:"; private string[] ScanModel { get; set; } private const int timeoutSeconds = 86400; public AzureCosmosDBV3Repository(AzureCosmosDBOptions options, CosmosSerializer cosmosSerializer) { try { if (!string.IsNullOrEmpty(options.ConnectionString)) { CosmosClient = CosmosDBV3ClientSingleton.getInstance(options.ConnectionString, options.ConnectionKey, cosmosSerializer).GetCosmosDBClient(); } else { throw new BizException("请设置正确的AzureCosmosDB数据库配置信息!"); } DatabaseId = options.Database; CollectionThroughput = options.CollectionThroughput; ScanModel = options.ScanModel; // InitializeDatabase().GetAwaiter().GetResult(); } catch (CosmosException e) { // Dispose(true); throw new BizException(e.Message, 500, e.StackTrace); } } public AzureCosmosDBV3Repository(AzureCosmosDBOptions options) { try { if (!string.IsNullOrEmpty(options.ConnectionString)) { CosmosClient = CosmosDBV3ClientSingleton.getInstance(options.ConnectionString, options.ConnectionKey, null).GetCosmosDBClient(); } else { throw new BizException("请设置正确的AzureCosmosDB数据库配置信息!"); } DatabaseId = options.Database; CollectionThroughput = options.CollectionThroughput; ScanModel = options.ScanModel; // InitializeDatabase().GetAwaiter().GetResult(); } catch (CosmosException e) { // Dispose(true); throw new BizException(e.Message, 500, e.StackTrace); } } public async Task InitializeDatabase() { try { database = await CosmosClient.CreateDatabaseIfNotExistsAsync(DatabaseId, CollectionThroughput); FeedIterator resultSetIterator = database.GetContainerQueryIterator(); while (resultSetIterator.HasMoreResults) { foreach (ContainerProperties container in await resultSetIterator.ReadNextAsync()) { DocumentCollectionDict.TryAdd(container.Id, new CosmosModelInfo { container= database.GetContainer(container.Id) ,cache=false}); } } //获取数据库所有的表 List types = ReflectorExtensions.GetAllTypeAsAttribute(ScanModel); foreach (Type type in types) { string PartitionKey = GetPartitionKey(type); string CollectionName = ""; int RU = 0; bool cache = false; IEnumerable attributes = type.GetCustomAttributes(true); if (!string.IsNullOrEmpty(attributes.First().Name)) { CollectionName = attributes.First().Name; } else { CollectionName = type.Name; } if ( attributes.First().Cache) { cache = attributes.First().Cache; } //else //{ // cache = false; //} if (attributes.First().RU > 400) { RU = attributes.First().RU; } else { RU = CollectionThroughput; } //如果表存在于数据则检查RU是否变动,如果不存在则执行创建DocumentCollection if (DocumentCollectionDict.TryGetValue(CollectionName, out CosmosModelInfo cosmosModelInfo)) { //更新RU cosmosModelInfo.cache = cache; int? throughputResponse = await CosmosClient.GetDatabase(DatabaseId).GetContainer(cosmosModelInfo.container.Id).ReadThroughputAsync(); if (throughputResponse < RU) { await CosmosClient.GetDatabase(DatabaseId).GetContainer(cosmosModelInfo.container.Id).ReplaceThroughputAsync(RU); } } else { ContainerProperties containerProperties = new ContainerProperties { Id = CollectionName }; if (!string.IsNullOrEmpty(PartitionKey)) { containerProperties.PartitionKeyPath = "/" + PartitionKey; } if (RU > CollectionThroughput) { CollectionThroughput = RU; } Container containerWithConsistentIndexing = await database.CreateContainerIfNotExistsAsync(containerProperties, throughput: CollectionThroughput); DocumentCollectionDict.TryAdd(CollectionName, new CosmosModelInfo { container= containerWithConsistentIndexing ,cache=cache}); } } } catch (CosmosException e) { throw new BizException(e.Message, 500, e.StackTrace); } } private string GetPartitionKey() { Type type = typeof(T); return GetPartitionKey(type); } private string GetPartitionKey(Type type) { PropertyInfo[] properties = type.GetProperties(); List attrProperties = new List(); foreach (PropertyInfo property in properties) { if (property.Name.Equals("PartitionKey")) { attrProperties.Add(property); break; } object[] attributes = property.GetCustomAttributes(true); foreach (object attribute in attributes) //2.通过映射,找到成员属性上关联的特性类实例, { if (attribute is PartitionKeyAttribute) { attrProperties.Add(property); } } } if (attrProperties.Count <= 0) { throw new BizException(type.Name + "has no PartitionKey !"); } else { if (attrProperties.Count == 1) { return attrProperties[0].Name; } else { throw new BizException("PartitionKey can only be single!"); } } } private async Task InitializeCollection() { Type type = typeof(T); string partitionKey = GetPartitionKey(); string CollectionName; IEnumerable attributes = type.GetCustomAttributes(true); if (!string.IsNullOrEmpty(attributes.First().Name)) { CollectionName = attributes.First().Name; } else { CollectionName = type.Name; } return await InitializeCollection(CollectionName, partitionKey); } private async Task InitializeCollection(string CollectionName, string PartitionKey) { /////内存中已经存在这个表则直接返回 if (DocumentCollectionDict.TryGetValue(CollectionName, out CosmosModelInfo cosmosModelInfo)) { return cosmosModelInfo; }///如果没有则尝试默认创建 else { ContainerProperties containerProperties = new ContainerProperties { Id = CollectionName }; if (!string.IsNullOrEmpty(PartitionKey)) { containerProperties.PartitionKeyPath = "/" + PartitionKey; } Container containerWithConsistentIndexing = await database.CreateContainerIfNotExistsAsync(containerProperties, throughput: CollectionThroughput); CosmosModelInfo cosmosModel = new CosmosModelInfo { container = containerWithConsistentIndexing, cache = false }; DocumentCollectionDict.TryAdd(CollectionName, cosmosModel); return cosmosModel; } } public async Task> DeleteAll(List> ids) where T : ID { CosmosModelInfo container = await InitializeCollection(); //string partitionKey = GetPartitionKey(); //await Task.Run(() => Parallel.ForEach(ids, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (item) => //{ // Task.WaitAll(DeleteAsync(item.Value, item.Key)); //})); List idPks = new List(); int pages = (int)Math.Ceiling((double)ids.Count / pageSize); Stopwatch stopwatch = Stopwatch.StartNew(); for (int i = 0; i < pages; i++) { List> lists = ids.Skip((i) * pageSize).Take(pageSize).ToList(); List tasks = new List(lists.Count); lists.ForEach(item => { tasks.Add(container.container.DeleteItemStreamAsync(item.Value, new PartitionKey(item.Key)) .ContinueWith((Task task) => { using (ResponseMessage response = task.Result) { idPks.Add(new IdPk { id = item.Value, pk = item.Key.ToString(), StatusCode = response.StatusCode }); // if (!response.IsSuccessStatusCode) // { // } } } )); }); await Task.WhenAll(tasks); if (container.cache && RedisHelper.Instance != null) { lists.ForEach(async x => { await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id,x.Value ); }); } } stopwatch.Stop(); return idPks; } public async Task> DeleteAll(Dictionary dict) where T : ID { List list= await FindByDict(dict); return await DeleteAll(list); } public async Task> DeleteAll(List enyites) where T : ID { List idPks = new List(); CosmosModelInfo container = await InitializeCollection(); string pk = GetPartitionKey(); Type type = typeof(T); int pages = (int)Math.Ceiling((double)enyites.Count / pageSize); Stopwatch stopwatch = Stopwatch.StartNew(); for (int i = 0; i < pages; i++) { List lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList(); List> itemsToInsert = new List>(); lists.ForEach(x => { object o = type.GetProperty(pk).GetValue(x, null); KeyValuePair keyValue = new KeyValuePair(new PartitionKey(o.ToString()), x.id); itemsToInsert.Add(keyValue); }); List tasks = new List(lists.Count); itemsToInsert.ForEach(item => { tasks.Add(container.container.DeleteItemStreamAsync(item.Value, item.Key) .ContinueWith((Task task) => { using (ResponseMessage response = task.Result) { idPks.Add(new IdPk { id = item.Value, pk = item.Key.ToString(), StatusCode = response.StatusCode }); } } )); }); await Task.WhenAll(tasks); if (container.cache && RedisHelper.Instance != null) { lists.ForEach(async x => { await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, x.id); }); } } stopwatch.Stop(); return idPks; } public async Task DeleteAsync(IdPk idPk) where T : ID { return await DeleteAsync(idPk.id, idPk.pk); } public async Task DeleteAsync(string id, string pk) where T : ID { CosmosModelInfo container = await InitializeCollection(); ResponseMessage response = await container.container.DeleteItemStreamAsync(id: id, partitionKey: new PartitionKey(pk)); if (container.cache && RedisHelper.Instance != null) { await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, id); } return new IdPk { id =id, pk = pk, StatusCode = response.StatusCode }; } public async Task DeleteAsync(T entity) where T : ID { CosmosModelInfo container = await InitializeCollection(); string partitionKey = GetPartitionKey(); Type type = typeof(T); object o = type.GetProperty(partitionKey).GetValue(entity, null); ResponseMessage response = await container.container.DeleteItemStreamAsync (id: entity.id, partitionKey: new PartitionKey(o.ToString())); if (container.cache && RedisHelper.Instance != null) { await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, entity.id); } return new IdPk { id = entity.id, pk = o.ToString(), StatusCode = response.StatusCode }; } //public async Task DeleteAsync(string id) where T : ID //{ // Container container = await InitializeCollection(); // ItemResponse response = await container.DeleteItemAsync(id: id, partitionKey: new PartitionKey(GetPartitionKey())); // return response.Resource; //} public async Task> FindAll(List propertys = null) where T : ID { CosmosModelInfo container = await InitializeCollection(); StringBuilder sql; sql = SQLHelperParametric.GetSQLSelect(propertys); CosmosDbQuery cosmosDbQuery = new CosmosDbQuery {QueryText = sql.ToString() }; FeedIterator query = container.container.GetItemQueryIterator(queryDefinition: cosmosDbQuery.CosmosQueryDefinition); return await ResultsFromFeedIterator(query); } private async Task> ResultsFromFeedIterator(FeedIterator query, int? maxItemCount = null) { List results = new List(); while (query.HasMoreResults) { foreach (T t in await query.ReadNextAsync()) { results.Add(t); if (results.Count == maxItemCount) { return results; } } } return results; } private async Task> ResultsFromFeedIterator(FeedIterator query, Func, Task> batchAction, int itemsPerPage) { List results = new List(); while (query.HasMoreResults) { if (results.Count() >= itemsPerPage) { await batchAction(results); results.Clear(); } results.AddRange(await query.ReadNextAsync()); } if (results.Count() > 0) { await batchAction(results); results.Clear(); } return results; } public async Task> FindByDict(string CollectionName, Dictionary dict, int itemsPerPage = -1, int? maxItemCount = null, string partitionKey = null, List propertys = null) { if (DocumentCollectionDict.TryGetValue(CollectionName, out CosmosModelInfo container)) { //StringBuilder sql = new StringBuilder("select value(c) from c"); //SQLHelper.GetSQL(dict, ref sql); //CosmosDbQuery cosmosDbQuery = new CosmosDbQuery //{ // QueryText = sql.ToString() //}; StringBuilder sql; sql = SQLHelperParametric.GetSQLSelect(propertys); CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, sql); QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(itemsPerPage, maxItemCount)); FeedIterator query = container.container.GetItemQueryIterator(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions); return await ResultsFromFeedIterator(query, maxItemCount); } else { throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!"); } } public async Task> FindCountByDict(string CollectionName, Dictionary dict, int itemsPerPage = -1, int? maxItemCount = null, string partitionKey = null) { if (DocumentCollectionDict.TryGetValue(CollectionName, out CosmosModelInfo container)) { dict.Remove("@CURRPAGE"); dict.Remove("@PAGESIZE"); dict.Remove("@ASC"); dict.Remove("@DESC"); StringBuilder sql = new StringBuilder("select value count(c) from c"); CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, sql); QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(itemsPerPage, maxItemCount)); FeedIterator query = container.container.GetItemQueryIterator(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions); return await ResultsFromFeedIterator(query, maxItemCount); } else { throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!"); } } public async Task> FindByParams(Dictionary dict, int itemsPerPage = -1, int? maxItemCount = null, string partitionKey = null, List propertys = null) where T : ID { return await FindByDict(dict, itemsPerPage, maxItemCount, partitionKey, propertys); } public async Task> FindByDict(Dictionary dict, int itemsPerPage = -1, int? maxItemCount = null, string partitionKey = null,List propertys = null) where T : ID { StringBuilder sql; sql = SQLHelperParametric.GetSQLSelect(propertys); CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, sql); QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(itemsPerPage, maxItemCount)); return await ResultsFromQueryAndOptions(cosmosDbQuery, queryRequestOptions); } private async Task> ResultsFromQueryAndOptions(CosmosDbQuery cosmosDbQuery, QueryRequestOptions queryOptions, int? maxItemCount = null) { CosmosModelInfo container = await InitializeCollection(); FeedIterator query = container.container.GetItemQueryIterator( queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryOptions); return await ResultsFromFeedIterator(query, maxItemCount); } private int GetEffectivePageSize(int itemsPerPage, int? maxItemCount) { return itemsPerPage == -1 ? maxItemCount ?? itemsPerPage : Math.Min(maxItemCount ?? itemsPerPage, itemsPerPage); } private QueryRequestOptions GetDefaultQueryRequestOptions(int? itemsPerPage = null, int? maxBufferedItemCount = null, int? maxConcurrency = null) { QueryRequestOptions queryRequestOptions = new QueryRequestOptions { MaxItemCount = itemsPerPage == -1 ? 1000 : itemsPerPage, MaxBufferedItemCount = maxBufferedItemCount ?? 100, MaxConcurrency = maxConcurrency ?? 50 }; return queryRequestOptions; } private async Task> ResultsFromQueryAndOptions(CosmosDbQuery cosmosDbQuery, Func, Task> batchAction, QueryRequestOptions queryOptions) { CosmosModelInfo container = await InitializeCollection(); FeedIterator query = container.container.GetItemQueryIterator( queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryOptions); return await ResultsFromFeedIterator(query, batchAction, queryOptions.MaxItemCount ?? 0); } private QueryRequestOptions GetQueryRequestOptions(int itemsPerPage) { QueryRequestOptions queryRequestOptions = new QueryRequestOptions { MaxItemCount = itemsPerPage }; return queryRequestOptions; } public async Task> FindLinq(Expression> query = null, Expression> order = null, bool isDesc = false, int itemsPerPage = -1, int? maxItemCount = null) where T : ID { //QueryRequestOptions queryRequestOptions = GetQueryRequestOptions(itemsPerPage); QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(itemsPerPage, maxItemCount)); FeedIterator feedIterator; CosmosModelInfo container = await InitializeCollection(); if (query == null) { if (order != null) { if (isDesc) { feedIterator = container.container .GetItemLinqQueryable(requestOptions: queryRequestOptions).OrderByDescending(order) .ToFeedIterator(); } else { feedIterator = container.container .GetItemLinqQueryable(requestOptions: queryRequestOptions).OrderBy(order) .ToFeedIterator(); } } else { feedIterator = container.container .GetItemLinqQueryable(requestOptions: queryRequestOptions) .ToFeedIterator(); } } else { if (order != null) { if (isDesc) { feedIterator = container.container .GetItemLinqQueryable(requestOptions: queryRequestOptions) .Where(query).OrderByDescending(order) .ToFeedIterator(); } else { feedIterator = container.container .GetItemLinqQueryable(requestOptions: queryRequestOptions) .Where(query).OrderBy(order) .ToFeedIterator(); } } else { feedIterator = container.container .GetItemLinqQueryable(requestOptions: queryRequestOptions) .Where(query) .ToFeedIterator(); } } return await ResultsFromFeedIterator(feedIterator); } public async Task> FindSQL(string sql, Dictionary Parameters = null, int itemsPerPage = -1, int? maxItemCount = null) where T : ID { CosmosModelInfo container = await InitializeCollection(); QueryRequestOptions queryOptions = GetQueryRequestOptions(GetEffectivePageSize(itemsPerPage, maxItemCount)); if (Parameters != null) { CosmosDbQuery cosmosDbQuery = new CosmosDbQuery { QueryText = sql, Parameters = Parameters }; FeedIterator feedIterator = container.container .GetItemQueryIterator(cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryOptions); return await ResultsFromFeedIterator(feedIterator); } else { QueryDefinition queryDefinition = new QueryDefinition(sql); return await ResultsFromFeedIterator(container.container.GetItemQueryIterator(queryDefinition)); } } public async Task Save(T entity) where T : ID { try { CosmosModelInfo container = await InitializeCollection(); ItemResponse response = await container.container.CreateItemAsync(entity); if (container.cache && RedisHelper.Instance!=null) { if (!RedisHelper.Exists(CacheCosmosPrefix + container.container.Id)) { await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity); } else { await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity); await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds); } } return response.Resource; } catch (Exception e) { throw new BizException(e.Message); } } public async Task> SaveAll(List enyites) where T : ID { int pages = (int)Math.Ceiling((double)enyites.Count / pageSize); CosmosModelInfo container = await InitializeCollection(); bool flag = false; if (RedisHelper.Exists(CacheCosmosPrefix + container.container.Id)) { flag = true; } string pk = GetPartitionKey(); Type type = typeof(T); Stopwatch stopwatch = Stopwatch.StartNew(); for (int i = 0; i < pages; i++) { List lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList(); List> itemsToInsert = new List>(); lists.ForEach(async x => { MemoryStream stream = new MemoryStream(); await JsonSerializer.SerializeAsync(stream, x ,new JsonSerializerOptions { IgnoreNullValues=true}); object o = type.GetProperty(pk).GetValue(x, null); KeyValuePair keyValue = new KeyValuePair(new PartitionKey(o.ToString()), stream); itemsToInsert.Add(keyValue); }); List tasks = new List(lists.Count); itemsToInsert.ForEach(item => { tasks.Add(container.container.CreateItemStreamAsync(item.Value, item.Key) .ContinueWith((Task task) => { using (ResponseMessage response = task.Result) { if (!response.IsSuccessStatusCode) { } } } )); }); await Task.WhenAll(tasks); if (container.cache && RedisHelper.Instance != null) { lists.ForEach(async x => { await RedisHelper.HSetAsync(CacheCosmosPrefix+container.container.Id, x.id, x); }); } } if (container.cache && RedisHelper.Instance != null&&!flag) { await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds); } stopwatch.Stop(); return enyites; } public async Task SaveOrUpdate(T entity) where T : ID { CosmosModelInfo container = await InitializeCollection(); ItemResponse response = await container.container.UpsertItemAsync(item: entity); if (container.cache && RedisHelper.Instance != null) { if (!RedisHelper.Exists(CacheCosmosPrefix + container.container.Id)) { await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity); } else { await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity); await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds); } } return response.Resource; } public async Task> SaveOrUpdateAll(List enyites) where T : ID { //await Task.Run(() => Parallel.ForEach(entities, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (item) => //{ // Task.WaitAll(Update(item)); //})); int pages = (int)Math.Ceiling((double)enyites.Count / pageSize); CosmosModelInfo container = await InitializeCollection(); bool flag = false; if (RedisHelper.Exists(CacheCosmosPrefix + container.container.Id)) { flag = true; } string pk = GetPartitionKey(); Type type = typeof(T); Stopwatch stopwatch = Stopwatch.StartNew(); for (int i = 0; i < pages; i++) { List lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList(); List> itemsToInsert = new List>(); lists.ForEach(async x => { MemoryStream stream = new MemoryStream(); await JsonSerializer.SerializeAsync(stream, x, new JsonSerializerOptions { IgnoreNullValues = true }); object o = type.GetProperty(pk).GetValue(x, null); KeyValuePair keyValue = new KeyValuePair(new PartitionKey(o.ToString()), stream); itemsToInsert.Add(keyValue); }); List tasks = new List(lists.Count); itemsToInsert.ForEach(item => { tasks.Add(container.container.UpsertItemStreamAsync(item.Value, item.Key) .ContinueWith((Task task) => { //using (ResponseMessage response = task.Result) //{ // if (!response.IsSuccessStatusCode) // { // } //} } )); }); await Task.WhenAll(tasks); if (container.cache && RedisHelper.Instance != null) { lists.ForEach(async x => { await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, x.id, x); }); } } if (container.cache && RedisHelper.Instance != null&&!flag) { await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds); } stopwatch.Stop(); return enyites; } public async Task Update(T entity) where T : ID { CosmosModelInfo container = await InitializeCollection(); string pk = GetPartitionKey(); object o = typeof(T).GetProperty(pk).GetValue(entity, null); ItemResponse response = await container.container.ReplaceItemAsync(entity, entity.id, new PartitionKey(o.ToString())); if (container.cache && RedisHelper.Instance != null) { if (!RedisHelper.Exists(CacheCosmosPrefix + container.container.Id)) { await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity); } else { await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity); await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds); } } return response.Resource; } internal class Item { public string id { get; set; } public string pk { get; set; } public MemoryStream stream { get; set; } } public async Task> UpdateAll(List enyites) where T : ID { //await Task.Run(() => Parallel.ForEach(entities, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (item) => //{ // Task.WaitAll(Update(item)); //})); int pages = (int)Math.Ceiling((double)enyites.Count / pageSize); CosmosModelInfo container = await InitializeCollection(); bool flag = false; if (RedisHelper.Exists(CacheCosmosPrefix + container.container.Id)) { flag = true; } string pk = GetPartitionKey(); Type type = typeof(T); Stopwatch stopwatch = Stopwatch.StartNew(); for (int i = 0; i < pages; i++) { List lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList(); List itemsToInsert = new List(); lists.ForEach(async x => { MemoryStream stream = new MemoryStream(); await JsonSerializer.SerializeAsync(stream, x, new JsonSerializerOptions { IgnoreNullValues = true }); object o = type.GetProperty(pk).GetValue(x, null); Item keyValue = new Item { id = x.id, pk = o.ToString(), stream = stream }; itemsToInsert.Add(keyValue); }); List tasks = new List(lists.Count); itemsToInsert.ForEach(item => { tasks.Add(container.container.ReplaceItemStreamAsync(item.stream, item.id, new PartitionKey(item.pk)) .ContinueWith((Task task) => { //using (ResponseMessage response = task.Result) //{ // if (!response.IsSuccessStatusCode) // { // } //} } )); }); await Task.WhenAll(tasks); if (container.cache && RedisHelper.Instance != null) { lists.ForEach(async x => { await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, x.id, x); }); } } if (container.cache && RedisHelper.Instance != null&&!flag) { await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds); } stopwatch.Stop(); return enyites; } //public void Dispose() //{ // Dispose(true); //} //protected virtual void Dispose(bool disposing) //{ // if (disposing) // { // CosmosClient?.Dispose(); // } //} private async Task FindByIdAsSql(string id) where T : ID { CosmosModelInfo container = await InitializeCollection(); CosmosDbQuery cosmosDbQuery = new CosmosDbQuery { QueryText = @"SELECT * FROM c WHERE c.id = @id", Parameters = new Dictionary { { "@id",id} } }; FeedIterator feedIterator = container.container .GetItemQueryIterator(cosmosDbQuery.CosmosQueryDefinition); return (await ResultsFromFeedIterator(feedIterator)).SingleOrDefault(); } public async Task FindByIdPk(string id, string pk) where T : ID { CosmosModelInfo container = await InitializeCollection(); ItemResponse response = await container.container.ReadItemAsync(id: id, partitionKey: new PartitionKey(pk)); return response.Resource; } public async Task FindById(string id) where T : ID { CosmosModelInfo container = await InitializeCollection(); if (container.cache && RedisHelper.Instance != null) { return await RedisHelper.CacheShellAsync(CacheCosmosPrefix + container.container.Id , id , timeoutSeconds, () => { return FindByIdAsSql(id); }) ; } else { return await FindByIdAsSql(id); } } public async Task> FindByIds(List ids) where T : ID { CosmosModelInfo container = await InitializeCollection(); if (container.cache && RedisHelper.Instance != null) { List list = new List(); List NotIn = new List(); foreach (string id in ids) { if (!await RedisHelper.HExistsAsync(CacheCosmosPrefix + container.container.Id, id)) { NotIn.Add(id); } else { list.Add(await RedisHelper.HGetAsync(CacheCosmosPrefix + container.container.Id, id)); } } if (NotIn.IsNotEmpty()) { List noInList = await FindByDict(new Dictionary { { "id", NotIn.ToArray() } }); noInList.ForEach(x => { RedisHelper.HSet(CacheCosmosPrefix + container.container.Id, x.id, x); RedisHelper.Expire(CacheCosmosPrefix + container.container.Id, timeoutSeconds); }); list.AddRange(noInList); } return list; } else { return await FindByDict(new Dictionary { { "id", ids.ToArray() } }); } } public async Task FindById(string CollectionName, string id) { if (DocumentCollectionDict.TryGetValue(CollectionName, out CosmosModelInfo container)) { if (container.cache && RedisHelper.Instance != null) { return await RedisHelper.CacheShellAsync(CacheCosmosPrefix + container.container.Id,id, timeoutSeconds, () => { return FindByDict(CollectionName, new Dictionary { { "id", id } }); }); } else { return await FindByDict(CollectionName, new Dictionary { { "id", id } }); } } else { throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!"); } } public async Task> FindByIds(string CollectionName, List ids) { if (DocumentCollectionDict.TryGetValue(CollectionName, out CosmosModelInfo container)) { if (container.cache && RedisHelper.Instance != null) { return await RedisHelper.CacheShellAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds, () => { return FindByDict(CollectionName, new Dictionary { { "id", ids.ToArray() } }); }); } else { return await FindByDict(CollectionName, new Dictionary { { "id", ids.ToArray() } }); } } else { throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!"); } } } }