|
@@ -0,0 +1,743 @@
|
|
|
+using Microsoft.Azure.Cosmos.Table;
|
|
|
+using Microsoft.Azure.Cosmos.Table.Queryable;
|
|
|
+using System;
|
|
|
+using System.Collections;
|
|
|
+using System.Collections.Generic;
|
|
|
+using System.Text;
|
|
|
+using System.Threading;
|
|
|
+using System.Threading.Tasks;
|
|
|
+using System.Linq;
|
|
|
+using Azure;
|
|
|
+using TEAMModelOS.SDK.DI.AzureCosmos.Inner;
|
|
|
+using System.IO;
|
|
|
+using TEAMModelOS.SDK.DI;
|
|
|
+using System.Diagnostics;
|
|
|
+using Azure.Cosmos;
|
|
|
+using System.Text.Json;
|
|
|
+using System.Net;
|
|
|
+using TEAMModelOS.SDK.Context.Exception;
|
|
|
+using System.Linq.Expressions;
|
|
|
+using TEAMModelOS.SDK.Helper.Common.CollectionHelper;
|
|
|
+using TEAMModelOS.SDK.Helper.Common.LogHelper;
|
|
|
+using TEAMModelOS.SDK.Helper.Common.JsonHelper;
|
|
|
+
|
|
|
+namespace TEAMModelOS.SDK.DI
|
|
|
+{
|
|
|
+ public static class AzureCosmosExtensions
|
|
|
+ {
|
|
|
+ /// <summary>
|
|
|
+ /// 缓存前缀
|
|
|
+ /// </summary>
|
|
|
+ private const string CacheCosmosPrefix = "cosmos:";
|
|
|
+ /// <summary>
|
|
|
+ /// ttl时长 1秒
|
|
|
+ /// </summary>
|
|
|
+ private const int ttl = 1;
|
|
|
+ /// <summary>
|
|
|
+ /// 分页大小
|
|
|
+ /// </summary>
|
|
|
+ private const int pageSize = 200;
|
|
|
+ /// <summary>
|
|
|
+ /// 超时时间
|
|
|
+ /// </summary>
|
|
|
+ private const int timeoutSeconds = 86400;
|
|
|
+ public static int RU(this Response response)
|
|
|
+ {
|
|
|
+ try
|
|
|
+ {
|
|
|
+ response.Headers.TryGetValue("x-ms-request-charge", out var value);
|
|
|
+ var ru = Convert.ToInt32(value);
|
|
|
+ return ru;
|
|
|
+ }
|
|
|
+ catch (Exception)
|
|
|
+ {
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ public static async Task<T> FindByIdPk<T>(this AzureCosmosFactory azureCosmosFactory, string id, string pk) where T : ID
|
|
|
+ {
|
|
|
+ AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(typeof(T).Name);
|
|
|
+ try
|
|
|
+ {
|
|
|
+ ItemResponse<T> response = await container.container.ReadItemAsync<T>(id: id, partitionKey: new PartitionKey(pk));
|
|
|
+ return response.Value;
|
|
|
+ }
|
|
|
+ catch (Exception x){
|
|
|
+ return default(T);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ public static async Task<List<T>> FindByIds<T>(this AzureCosmosFactory azureCosmosFactory, List<string> ids) where T : ID
|
|
|
+ {
|
|
|
+ AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(typeof(T).Name);
|
|
|
+ if (container.cache && RedisHelper.Instance != null)
|
|
|
+ {
|
|
|
+ List<T> list = new List<T>();
|
|
|
+ List<string> NotIn = new List<string>();
|
|
|
+ foreach (string id in ids)
|
|
|
+ {
|
|
|
+ if (!await RedisHelper.HExistsAsync(CacheCosmosPrefix + container.container.Id, id))
|
|
|
+ {
|
|
|
+ NotIn.Add(id);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ list.Add(await RedisHelper.HGetAsync<T>(CacheCosmosPrefix + container.container.Id, id));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (NotIn.IsNotEmpty())
|
|
|
+ {
|
|
|
+ List<T> noInList = await FindByDict<T>(azureCosmosFactory, new Dictionary<string, object> { { "id", NotIn.ToArray() } });
|
|
|
+ noInList.ForEach(x => { RedisHelper.HSet(CacheCosmosPrefix + container.container.Id, x.id, x); RedisHelper.Expire(CacheCosmosPrefix + container.container.Id, timeoutSeconds); });
|
|
|
+ list.AddRange(noInList);
|
|
|
+ }
|
|
|
+ return list;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ return await FindByDict<T>(azureCosmosFactory, new Dictionary<string, object> { { "id", ids.ToArray() } });
|
|
|
+ }
|
|
|
+ }
|
|
|
+ public static async Task<T> Save<T>(this AzureCosmosFactory azureCosmosFactory, T entity) where T : ID
|
|
|
+ {
|
|
|
+
|
|
|
+ AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(typeof(T).Name);
|
|
|
+ entity.pk = container.type.Name;
|
|
|
+ entity.ttl = -1;
|
|
|
+ ItemResponse<T> response = await container.container.CreateItemAsync<T>(entity);
|
|
|
+ if (container.cache && RedisHelper.Instance != null)
|
|
|
+ {
|
|
|
+ if (!RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
|
|
|
+ {
|
|
|
+ await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
|
|
|
+ await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return response.Value;
|
|
|
+ }
|
|
|
+ public static async Task<List<T>> SaveAll<T>(this AzureCosmosFactory azureCosmosFactory, List<T> enyites) where T : ID
|
|
|
+ {
|
|
|
+ Type type = typeof(T);
|
|
|
+ int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
|
|
|
+ AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(type.Name);
|
|
|
+ bool flag = false;
|
|
|
+ if (RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
|
|
|
+ {
|
|
|
+ flag = true;
|
|
|
+ }
|
|
|
+ string partitionKey =AzureCosmosUtil.GetPartitionKey (type);
|
|
|
+
|
|
|
+ Stopwatch stopwatch = Stopwatch.StartNew();
|
|
|
+ for (int i = 0; i < pages; i++)
|
|
|
+ {
|
|
|
+ List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
|
|
|
+ List<KeyValuePair<PartitionKey, Stream>> itemsToInsert = new List<KeyValuePair<PartitionKey, Stream>>();
|
|
|
+ lists.ForEach(async x =>
|
|
|
+ {
|
|
|
+ x.pk = type.Name;
|
|
|
+ x.ttl = -1;
|
|
|
+ MemoryStream stream = new MemoryStream();
|
|
|
+ await JsonSerializer.SerializeAsync(stream, x, new JsonSerializerOptions { IgnoreNullValues = true });
|
|
|
+ object o = type.GetProperty(partitionKey).GetValue(x, null);
|
|
|
+ KeyValuePair<PartitionKey, Stream> keyValue = new KeyValuePair<PartitionKey, Stream>(new PartitionKey(o.ToString()), stream);
|
|
|
+ itemsToInsert.Add(keyValue);
|
|
|
+ });
|
|
|
+ List<Task> tasks = new List<Task>(lists.Count);
|
|
|
+ itemsToInsert.ForEach(item =>
|
|
|
+ {
|
|
|
+ tasks.Add(container.container.CreateItemStreamAsync(item.Value, item.Key)
|
|
|
+ .ContinueWith((Task<Response> task) =>
|
|
|
+ {
|
|
|
+ using (Response response = task.Result)
|
|
|
+ {
|
|
|
+ //if (!response.IsSuccessStatusCode)
|
|
|
+ //{
|
|
|
+ //}
|
|
|
+ }
|
|
|
+ }
|
|
|
+ ));
|
|
|
+ });
|
|
|
+ await Task.WhenAll(tasks);
|
|
|
+ if (container.cache && RedisHelper.Instance != null)
|
|
|
+ {
|
|
|
+ lists.ForEach(async x => {
|
|
|
+ await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, x.id, x);
|
|
|
+ });
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (container.cache && RedisHelper.Instance != null && !flag)
|
|
|
+ {
|
|
|
+ await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
|
|
|
+ }
|
|
|
+ stopwatch.Stop();
|
|
|
+ return enyites;
|
|
|
+ }
|
|
|
+ public static async Task<T> SaveOrUpdate<T>(this AzureCosmosFactory azureCosmosFactory, T entity) where T : ID
|
|
|
+ {
|
|
|
+ AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(typeof(T).Name);
|
|
|
+ entity.pk = container.type.Name;
|
|
|
+ entity.ttl = -1;
|
|
|
+ ItemResponse<T> response = await container.container.UpsertItemAsync(item: entity);
|
|
|
+ if (container.cache && RedisHelper.Instance != null)
|
|
|
+ {
|
|
|
+ if (!RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
|
|
|
+ {
|
|
|
+ await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
|
|
|
+ await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return response.Value;
|
|
|
+ }
|
|
|
+ public static async Task<List<T>> SaveOrUpdateAll<T>(this AzureCosmosFactory azureCosmosFactory, List<T> enyites) where T : ID
|
|
|
+ {
|
|
|
+ Type type = typeof(T);
|
|
|
+ int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
|
|
|
+ AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(type.Name);
|
|
|
+ bool flag = false;
|
|
|
+ if (RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
|
|
|
+ {
|
|
|
+ flag = true;
|
|
|
+ }
|
|
|
+
|
|
|
+ string partitionKey = AzureCosmosUtil.GetPartitionKey(type);
|
|
|
+
|
|
|
+ Stopwatch stopwatch = Stopwatch.StartNew();
|
|
|
+ for (int i = 0; i < pages; i++)
|
|
|
+ {
|
|
|
+ List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
|
|
|
+ List<KeyValuePair<PartitionKey, Stream>> itemsToInsert = new List<KeyValuePair<PartitionKey, Stream>>();
|
|
|
+ lists.ForEach(async x =>
|
|
|
+ {
|
|
|
+ x.pk = type.Name;
|
|
|
+ x.ttl = -1;
|
|
|
+ MemoryStream stream = new MemoryStream();
|
|
|
+ await JsonSerializer.SerializeAsync(stream, x, new JsonSerializerOptions { IgnoreNullValues = true });
|
|
|
+ object o = type.GetProperty(partitionKey).GetValue(x, null);
|
|
|
+ KeyValuePair<PartitionKey, Stream> keyValue = new KeyValuePair<PartitionKey, Stream>(new PartitionKey(o.ToString()), stream);
|
|
|
+ itemsToInsert.Add(keyValue);
|
|
|
+ });
|
|
|
+ List<Task> tasks = new List<Task>(lists.Count);
|
|
|
+ itemsToInsert.ForEach(item =>
|
|
|
+ {
|
|
|
+ tasks.Add(container.container.UpsertItemStreamAsync(item.Value, item.Key)
|
|
|
+ .ContinueWith((Task<Response> task) =>
|
|
|
+ {
|
|
|
+ }
|
|
|
+ ));
|
|
|
+ });
|
|
|
+ await Task.WhenAll(tasks);
|
|
|
+ if (container.cache && RedisHelper.Instance != null)
|
|
|
+ {
|
|
|
+ lists.ForEach(async x => {
|
|
|
+ await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, x.id, x);
|
|
|
+ });
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (container.cache && RedisHelper.Instance != null && !flag)
|
|
|
+ {
|
|
|
+ await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
|
|
|
+ }
|
|
|
+ stopwatch.Stop();
|
|
|
+ return enyites;
|
|
|
+ }
|
|
|
+ public static async Task<List<int>> FindCountByDict<T>(this AzureCosmosFactory azureCosmosFactory, Dictionary<string, object> dict)
|
|
|
+ {
|
|
|
+ Type type = typeof(T);
|
|
|
+ AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(type.Name);
|
|
|
+ string pk = type.Name;
|
|
|
+ StringBuilder sql = new StringBuilder("select value count(c) from c");
|
|
|
+ AzureCosmosQuery cosmosDbQuery = SQLHelper.GetSQL(dict, sql, pk);
|
|
|
+ if (cosmosDbQuery == null)
|
|
|
+ {
|
|
|
+ return new List<int> { 0 };
|
|
|
+ }
|
|
|
+ QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
|
|
|
+ AsyncPageable<int> query = container.container.GetItemQueryIterator<int>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
|
|
|
+ return await ResultsFromFeedIterator(query);
|
|
|
+ }
|
|
|
+ public static async Task<List<dynamic>> FindCountByDict(this AzureCosmosFactory azureCosmosFactory, string CollectionName, Dictionary<string, object> dict)
|
|
|
+ {
|
|
|
+ if (azureCosmosFactory.CosmosDict.typeCosmos.TryGetValue(CollectionName, out AzureCosmosModel container))
|
|
|
+ {
|
|
|
+ string pk = container.type.Name;
|
|
|
+ dict.Remove("@CURRPAGE");
|
|
|
+ dict.Remove("@PAGESIZE");
|
|
|
+ dict.Remove("@ASC");
|
|
|
+ dict.Remove("@DESC");
|
|
|
+ StringBuilder sql = new StringBuilder("select value count(c) from c");
|
|
|
+ AzureCosmosQuery cosmosDbQuery = SQLHelper.GetSQL(dict, sql, pk);
|
|
|
+ if (cosmosDbQuery == null)
|
|
|
+ {
|
|
|
+ return new List<dynamic> { 0 };
|
|
|
+ }
|
|
|
+ QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
|
|
|
+ AsyncPageable<dynamic> query = container.container.GetItemQueryIterator<dynamic>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
|
|
|
+ return await ResultsFromFeedIterator(query);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ public static async Task<T> Update<T>(this AzureCosmosFactory azureCosmosFactory, T entity) where T : ID
|
|
|
+ {
|
|
|
+ Type type = typeof(T);
|
|
|
+ AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(type.Name);
|
|
|
+ string partitionKey =AzureCosmosUtil.GetPartitionKey (type);
|
|
|
+ entity.pk = type.Name;
|
|
|
+ entity.ttl = -1;
|
|
|
+ object o = type.GetProperty(partitionKey).GetValue(entity, null);
|
|
|
+ ItemResponse<T> response = await container.container.ReplaceItemAsync(entity, entity.id, new PartitionKey(o.ToString()));
|
|
|
+ if (container.cache && RedisHelper.Instance != null)
|
|
|
+ {
|
|
|
+ if (!RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
|
|
|
+ {
|
|
|
+ await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
|
|
|
+ await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return response.Value;
|
|
|
+ }
|
|
|
+ internal class Item
|
|
|
+ {
|
|
|
+ public string id { get; set; }
|
|
|
+ public string pk { get; set; }
|
|
|
+ public MemoryStream stream { get; set; }
|
|
|
+ }
|
|
|
+ public static async Task<List<T>> UpdateAll<T>(this AzureCosmosFactory azureCosmosFactory, List<T> enyites) where T : ID
|
|
|
+ {
|
|
|
+ //await Task.Run(() => Parallel.ForEach(entities, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (item) =>
|
|
|
+ //{
|
|
|
+ // Task.WaitAll(Update(item));
|
|
|
+ //}));
|
|
|
+ Type type = typeof(T);
|
|
|
+ int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
|
|
|
+ AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(type.Name);
|
|
|
+ bool flag = false;
|
|
|
+ if (RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
|
|
|
+ {
|
|
|
+ flag = true;
|
|
|
+ }
|
|
|
+ string partitionKey =AzureCosmosUtil.GetPartitionKey (type);
|
|
|
+
|
|
|
+ Stopwatch stopwatch = Stopwatch.StartNew();
|
|
|
+ for (int i = 0; i < pages; i++)
|
|
|
+ {
|
|
|
+ List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
|
|
|
+ List<Item> itemsToInsert = new List<Item>();
|
|
|
+ lists.ForEach(async x =>
|
|
|
+ {
|
|
|
+ x.pk = type.Name;
|
|
|
+ x.ttl = -1;
|
|
|
+ MemoryStream stream = new MemoryStream();
|
|
|
+ await JsonSerializer.SerializeAsync(stream, x, new JsonSerializerOptions { IgnoreNullValues = true });
|
|
|
+ object o = type.GetProperty(partitionKey).GetValue(x, null);
|
|
|
+ Item keyValue = new Item { id = x.id, pk = o.ToString(), stream = stream };
|
|
|
+ itemsToInsert.Add(keyValue);
|
|
|
+ });
|
|
|
+ List<Task> tasks = new List<Task>(lists.Count);
|
|
|
+ itemsToInsert.ForEach(item =>
|
|
|
+ {
|
|
|
+ tasks.Add(container.container.ReplaceItemStreamAsync(item.stream, item.id, new PartitionKey(item.pk))
|
|
|
+ .ContinueWith((Task<Response> task) =>
|
|
|
+ {
|
|
|
+ //using (ResponseMessage response = task.Result)
|
|
|
+ //{
|
|
|
+ // if (!response.IsSuccessStatusCode)
|
|
|
+ // {
|
|
|
+ // }
|
|
|
+ //}
|
|
|
+ }
|
|
|
+ ));
|
|
|
+ });
|
|
|
+ await Task.WhenAll(tasks);
|
|
|
+ if (container.cache && RedisHelper.Instance != null)
|
|
|
+ {
|
|
|
+ lists.ForEach(async x => {
|
|
|
+ await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, x.id, x);
|
|
|
+ });
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (container.cache && RedisHelper.Instance != null && !flag)
|
|
|
+ {
|
|
|
+ await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
|
|
|
+ }
|
|
|
+ stopwatch.Stop();
|
|
|
+ return enyites;
|
|
|
+ }
|
|
|
+ public static async Task<List<T>> FindByDict<T>(this AzureCosmosFactory azureCosmosFactory, Dictionary<string, object> dict, List<string> propertys = null) where T : ID
|
|
|
+ {
|
|
|
+ StringBuilder sql;
|
|
|
+ sql = SQLHelper.GetSQLSelect(propertys);
|
|
|
+ string pk = typeof(T).Name;
|
|
|
+ AzureCosmosQuery cosmosDbQuery = SQLHelper.GetSQL(dict, sql, pk);
|
|
|
+ QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
|
|
|
+ return await ResultsFromQueryAndOptions<T>(azureCosmosFactory,cosmosDbQuery, queryRequestOptions);
|
|
|
+ }
|
|
|
+ public static async Task<List<dynamic>> FindByDict(this AzureCosmosFactory azureCosmosFactory, string CollectionName, Dictionary<string, object> dict, List<string> propertys = null)
|
|
|
+ {
|
|
|
+ if (azureCosmosFactory.CosmosDict.typeCosmos.TryGetValue(CollectionName, out AzureCosmosModel container))
|
|
|
+ {
|
|
|
+
|
|
|
+ string pk = container.type.Name;
|
|
|
+ StringBuilder sql;
|
|
|
+ sql = SQLHelper.GetSQLSelect(propertys);
|
|
|
+ AzureCosmosQuery cosmosDbQuery = SQLHelper.GetSQL(dict, sql, pk);
|
|
|
+ QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
|
|
|
+ AsyncPageable<dynamic> query = container.container.GetItemQueryIterator<dynamic>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
|
|
|
+ return await ResultsFromFeedIterator(query);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ public static async Task<List<IdPk>> DeleteAll<T>(this AzureCosmosFactory azureCosmosFactory, List<IdPk> ids) where T : ID
|
|
|
+ {
|
|
|
+ AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(typeof(T).Name);
|
|
|
+ List<IdPk> idPks = new List<IdPk>();
|
|
|
+ if (container.monitor)
|
|
|
+ {
|
|
|
+ List<T> list = await azureCosmosFactory.FindByDict<T>(new Dictionary<string, object>() { { "id", ids.Select(x => x.id).ToArray() } });
|
|
|
+ list = await azureCosmosFactory. DeleteTTL(list);
|
|
|
+ return ids;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ int pages = (int)Math.Ceiling((double)ids.Count / pageSize);
|
|
|
+ Stopwatch stopwatch = Stopwatch.StartNew();
|
|
|
+ for (int i = 0; i < pages; i++)
|
|
|
+ {
|
|
|
+ List<IdPk> lists = ids.Skip((i) * pageSize).Take(pageSize).ToList();
|
|
|
+ List<Task> tasks = new List<Task>(lists.Count);
|
|
|
+ lists.ForEach(item =>
|
|
|
+ {
|
|
|
+ tasks.Add(container.container.DeleteItemStreamAsync(item.id, new PartitionKey(item.pk))
|
|
|
+ .ContinueWith((Task<Response> task) =>
|
|
|
+ {
|
|
|
+ using (Response response = task.Result)
|
|
|
+ {
|
|
|
+ idPks.Add(new IdPk { id = item.id, pk = item.pk, Status = response.Status });
|
|
|
+ // if (!response.IsSuccessStatusCode)
|
|
|
+ // {
|
|
|
+ // }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ ));
|
|
|
+ });
|
|
|
+ await Task.WhenAll(tasks);
|
|
|
+ if (container.cache && RedisHelper.Instance != null)
|
|
|
+ {
|
|
|
+ lists.ForEach(async x => {
|
|
|
+ await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, x.id);
|
|
|
+ });
|
|
|
+ }
|
|
|
+ }
|
|
|
+ stopwatch.Stop();
|
|
|
+ return idPks;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ public static async Task<List<IdPk>> DeleteAll<T>(this AzureCosmosFactory azureCosmosFactory, Dictionary<string, object> dict) where T : ID
|
|
|
+ {
|
|
|
+ if (dict.Keys.Count > 0)
|
|
|
+ {
|
|
|
+ List<T> list = await azureCosmosFactory.FindByDict<T>(dict);
|
|
|
+
|
|
|
+ return await azureCosmosFactory.DeleteAll(list);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ throw new BizException("参数为空", 500);
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ public static async Task<List<IdPk>> DeleteAll<T>(this AzureCosmosFactory azureCosmosFactory, List<T> enyites) where T : ID
|
|
|
+ {
|
|
|
+ Type type = typeof(T);
|
|
|
+ AzureCosmosModel container = azureCosmosFactory.GetCosmosModel (type.Name);
|
|
|
+ List<IdPk> idPks = new List<IdPk>();
|
|
|
+ //log4net 日志記錄
|
|
|
+ string uuidKey = Guid.NewGuid().ToString();
|
|
|
+ string logkey = "\r\n【" + uuidKey + "】\r\n";
|
|
|
+ LogHelper.Info(new object(),
|
|
|
+ logkey
|
|
|
+ + "删除------->>\r\n"
|
|
|
+ + "表:"
|
|
|
+ + type.Name + "\r\n"
|
|
|
+ + "数据:"
|
|
|
+ + enyites.ToApiJson()
|
|
|
+ + "\r\n" + logkey);
|
|
|
+ string pk = AzureCosmosUtil.GetPartitionKey (type);
|
|
|
+ if (container.monitor)
|
|
|
+ {
|
|
|
+ enyites = await azureCosmosFactory.DeleteTTL(enyites);
|
|
|
+ foreach (T t in enyites)
|
|
|
+ {
|
|
|
+ object o = type.GetProperty(pk).GetValue(t, null);
|
|
|
+ idPks.Add(new IdPk { id = t.id, pk = o.ToString(), StatusCode = HttpStatusCode.NoContent });
|
|
|
+ }
|
|
|
+ return idPks;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
|
|
|
+ Stopwatch stopwatch = Stopwatch.StartNew();
|
|
|
+ for (int i = 0; i < pages; i++)
|
|
|
+ {
|
|
|
+ List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
|
|
|
+ List<KeyValuePair<PartitionKey, string>> itemsToInsert = new List<KeyValuePair<PartitionKey, string>>();
|
|
|
+ lists.ForEach(x =>
|
|
|
+ {
|
|
|
+ object o = type.GetProperty(pk).GetValue(x, null);
|
|
|
+ KeyValuePair<PartitionKey, string> keyValue = new KeyValuePair<PartitionKey, string>(new PartitionKey(o.ToString()), x.id);
|
|
|
+ itemsToInsert.Add(keyValue);
|
|
|
+ });
|
|
|
+
|
|
|
+ List<Task> tasks = new List<Task>(lists.Count);
|
|
|
+ itemsToInsert.ForEach(item =>
|
|
|
+ {
|
|
|
+ tasks.Add(container.container.DeleteItemStreamAsync(item.Value, item.Key)
|
|
|
+ .ContinueWith((Task<Response> task) =>
|
|
|
+ {
|
|
|
+ using (Response response = task.Result)
|
|
|
+ {
|
|
|
+
|
|
|
+ idPks.Add(new IdPk { id = item.Value, pk = item.Key.ToString(), Status = response.Status });
|
|
|
+
|
|
|
+ }
|
|
|
+ }
|
|
|
+ ));
|
|
|
+ });
|
|
|
+ await Task.WhenAll(tasks); if (container.cache && RedisHelper.Instance != null)
|
|
|
+ {
|
|
|
+ lists.ForEach(async x => {
|
|
|
+ await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, x.id);
|
|
|
+ });
|
|
|
+ }
|
|
|
+ }
|
|
|
+ stopwatch.Stop();
|
|
|
+ return idPks;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ public static async Task<IdPk> DeleteAsync<T>(this AzureCosmosFactory azureCosmosFactory, IdPk idPk) where T : ID
|
|
|
+ {
|
|
|
+ return await DeleteAsync<T>(azureCosmosFactory, idPk.id, idPk.pk);
|
|
|
+ }
|
|
|
+ public static async Task<IdPk> DeleteAsync<T>(this AzureCosmosFactory azureCosmosFactory, string id, string pk) where T : ID
|
|
|
+ {
|
|
|
+
|
|
|
+ // pk =AzureCosmosUtil.GetPartitionKey<T>();
|
|
|
+ AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(typeof(T).Name);
|
|
|
+ if (container.monitor)
|
|
|
+ {
|
|
|
+ List<T> list = await FindByDict<T>(azureCosmosFactory,new Dictionary<string, object>() { { "id", id } });
|
|
|
+ if (list.Count > 0)
|
|
|
+ {
|
|
|
+ await DeleteTTL<T>(azureCosmosFactory, list);
|
|
|
+ return new IdPk { id = id, pk = pk, StatusCode = HttpStatusCode.NoContent };
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ throw new BizException("未找到ID匹配的数据,删除失败");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ Response response = await container.container.DeleteItemStreamAsync(id: id, partitionKey: new PartitionKey(pk));
|
|
|
+ if (container.cache && RedisHelper.Instance != null)
|
|
|
+ {
|
|
|
+ await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, id);
|
|
|
+ }
|
|
|
+ return new IdPk { id = id, pk = pk, Status = response.Status };
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ public static async Task<IdPk> DeleteAsync<T>(this AzureCosmosFactory azureCosmosFactory, T entity) where T : ID
|
|
|
+ {
|
|
|
+ Type type = typeof(T);
|
|
|
+ AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(type.Name);
|
|
|
+ string partitionKey = AzureCosmosUtil.GetPartitionKey (type);
|
|
|
+
|
|
|
+ object o = type.GetProperty(partitionKey).GetValue(entity, null);
|
|
|
+ if (container.monitor)
|
|
|
+ {
|
|
|
+ await DeleteTTL<T>(azureCosmosFactory, new List<T>() { entity });
|
|
|
+ return new IdPk { id = entity.id, pk = o.ToString(), StatusCode = HttpStatusCode.NoContent };
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ Response response = await container.container.DeleteItemStreamAsync(id: entity.id, partitionKey: new PartitionKey(o.ToString()));
|
|
|
+ if (container.cache && RedisHelper.Instance != null)
|
|
|
+ {
|
|
|
+ await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, entity.id);
|
|
|
+ }
|
|
|
+ return new IdPk { id = entity.id, pk = partitionKey, Status = response.Status };
|
|
|
+ }
|
|
|
+ }
|
|
|
+ public static async Task<List<T>> FindSQL<T>(this AzureCosmosFactory azureCosmosFactory, string sql, Dictionary<string, object> Parameters = null) where T : ID
|
|
|
+ {
|
|
|
+ if (sql.Contains(".pk"))
|
|
|
+ {
|
|
|
+ AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(typeof(T).Name);
|
|
|
+ QueryRequestOptions queryOptions = GetQueryRequestOptions(GetEffectivePageSize(-1, null));
|
|
|
+ if (Parameters != null)
|
|
|
+ {
|
|
|
+ AzureCosmosQuery cosmosDbQuery = new AzureCosmosQuery
|
|
|
+ {
|
|
|
+ QueryText = sql,
|
|
|
+ Parameters = Parameters
|
|
|
+ };
|
|
|
+ AsyncPageable<T> feedIterator = container.container
|
|
|
+ .GetItemQueryIterator<T>(cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryOptions);
|
|
|
+ return await ResultsFromFeedIterator(feedIterator);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ QueryDefinition queryDefinition = new QueryDefinition(sql);
|
|
|
+ return await ResultsFromFeedIterator<T>(container.container.GetItemQueryIterator<T>(queryDefinition));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ throw new BizException("查询参数必须设置 .pk ", ResponseCode.PARAMS_ERROR);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ /// <summary>
|
|
|
+ /// 偌TTL刉壺
|
|
|
+ /// </summary>
|
|
|
+ /// <typeparam name="T"></typeparam>
|
|
|
+ /// <param name="list"></param>
|
|
|
+ /// <returns></returns>
|
|
|
+ private static async Task<List<T>> DeleteTTL<T>(this AzureCosmosFactory azureCosmosFactory ,List<T> list) where T : ID
|
|
|
+ {
|
|
|
+ AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(typeof(T).Name);
|
|
|
+ list.ForEach(x => { x.ttl = ttl; });
|
|
|
+ list = await DeleteTTlALL(azureCosmosFactory,list);
|
|
|
+ if (container.cache && RedisHelper.Instance != null)
|
|
|
+ {
|
|
|
+ list.ForEach(async x => {
|
|
|
+ await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, x.id);
|
|
|
+ });
|
|
|
+ }
|
|
|
+ return list;
|
|
|
+ }
|
|
|
+ private static async Task<List<T>> DeleteTTlALL<T>(this AzureCosmosFactory azureCosmosFactory, List<T> enyites) where T : ID
|
|
|
+ {
|
|
|
+ Type type = typeof(T);
|
|
|
+ int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
|
|
|
+ AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(type.Name);
|
|
|
+ bool flag = false;
|
|
|
+ if (RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
|
|
|
+ {
|
|
|
+ flag = true;
|
|
|
+ }
|
|
|
+ string partitionKey = AzureCosmosUtil.GetPartitionKey (type);
|
|
|
+
|
|
|
+ Stopwatch stopwatch = Stopwatch.StartNew();
|
|
|
+ for (int i = 0; i < pages; i++)
|
|
|
+ {
|
|
|
+ List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
|
|
|
+ List<KeyValuePair<PartitionKey, Stream>> itemsToInsert = new List<KeyValuePair<PartitionKey, Stream>>();
|
|
|
+ lists.ForEach(async x =>
|
|
|
+ {
|
|
|
+ x.pk = type.Name;
|
|
|
+ //x.ttl = null;
|
|
|
+ MemoryStream stream = new MemoryStream();
|
|
|
+ await JsonSerializer.SerializeAsync(stream, x, new JsonSerializerOptions { IgnoreNullValues = true });
|
|
|
+ object o = type.GetProperty(partitionKey).GetValue(x, null);
|
|
|
+ KeyValuePair<PartitionKey, Stream> keyValue = new KeyValuePair<PartitionKey, Stream>(new PartitionKey(o.ToString()), stream);
|
|
|
+ itemsToInsert.Add(keyValue);
|
|
|
+ });
|
|
|
+ List<Task> tasks = new List<Task>(lists.Count);
|
|
|
+ itemsToInsert.ForEach(item =>
|
|
|
+ {
|
|
|
+ tasks.Add(container.container.UpsertItemStreamAsync(item.Value, item.Key)
|
|
|
+ .ContinueWith((Task<Response> task) =>
|
|
|
+ {
|
|
|
+ //using (ResponseMessage response = task.Result)
|
|
|
+ //{
|
|
|
+ // if (!response.IsSuccessStatusCode)
|
|
|
+ // {
|
|
|
+ // }
|
|
|
+ //}
|
|
|
+ }
|
|
|
+ ));
|
|
|
+ });
|
|
|
+ await Task.WhenAll(tasks);
|
|
|
+ if (container.cache && RedisHelper.Instance != null)
|
|
|
+ {
|
|
|
+ lists.ForEach(async x => {
|
|
|
+ await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, x.id, x);
|
|
|
+ });
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (container.cache && RedisHelper.Instance != null && !flag)
|
|
|
+ {
|
|
|
+ await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
|
|
|
+ }
|
|
|
+ stopwatch.Stop();
|
|
|
+ return enyites;
|
|
|
+ }
|
|
|
+ private static QueryRequestOptions GetDefaultQueryRequestOptions(int? itemsPerPage = null, int? maxBufferedItemCount = null, int? maxConcurrency = null)
|
|
|
+ {
|
|
|
+ QueryRequestOptions queryRequestOptions = new QueryRequestOptions
|
|
|
+ {
|
|
|
+ MaxItemCount = itemsPerPage == -1 ? 1000 : itemsPerPage,
|
|
|
+ MaxBufferedItemCount = maxBufferedItemCount ?? 100,
|
|
|
+ MaxConcurrency = maxConcurrency ?? 50
|
|
|
+ };
|
|
|
+ return queryRequestOptions;
|
|
|
+ }
|
|
|
+ private static int GetEffectivePageSize(int itemsPerPage, int? maxItemCount)
|
|
|
+ {
|
|
|
+ return itemsPerPage == -1 ? maxItemCount ?? itemsPerPage : Math.Min(maxItemCount ?? itemsPerPage, itemsPerPage);
|
|
|
+ }
|
|
|
+ private static async Task<List<T>> ResultsFromQueryAndOptions<T>(this AzureCosmosFactory azureCosmosFactory, AzureCosmosQuery cosmosDbQuery, QueryRequestOptions queryOptions)
|
|
|
+ {
|
|
|
+ if (cosmosDbQuery == null)
|
|
|
+ {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(typeof(T).Name);
|
|
|
+ AsyncPageable<T> query = container.container.GetItemQueryIterator<T>(
|
|
|
+ queryDefinition: cosmosDbQuery.CosmosQueryDefinition,
|
|
|
+ requestOptions: queryOptions);
|
|
|
+
|
|
|
+ return await ResultsFromFeedIterator(query);
|
|
|
+ }
|
|
|
+ private static async Task<List<T>> ResultsFromFeedIterator<T>(AsyncPageable<T> query, int? maxItemCount = null)
|
|
|
+ {
|
|
|
+ List<T> results = new List<T>();
|
|
|
+
|
|
|
+ await foreach (T t in query)
|
|
|
+ {
|
|
|
+ results.Add(t);
|
|
|
+ if (results.Count == maxItemCount)
|
|
|
+ {
|
|
|
+ return results;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return results;
|
|
|
+ }
|
|
|
+ private static QueryRequestOptions GetQueryRequestOptions(int itemsPerPage)
|
|
|
+ {
|
|
|
+ QueryRequestOptions queryRequestOptions = new QueryRequestOptions
|
|
|
+ {
|
|
|
+ MaxItemCount = itemsPerPage
|
|
|
+ };
|
|
|
+
|
|
|
+ return queryRequestOptions;
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|