|
@@ -25,23 +25,7 @@ using TEAMModelOS.SDK.Models;
|
|
|
namespace TEAMModelOS.SDK.DI
|
|
|
{
|
|
|
public static class AzureCosmosExtensions
|
|
|
- {
|
|
|
- /// <summary>
|
|
|
- /// 缓存前缀
|
|
|
- /// </summary>
|
|
|
- private const string CacheCosmosPrefix = "cosmos:";
|
|
|
- /// <summary>
|
|
|
- /// ttl时长 1秒
|
|
|
- /// </summary>
|
|
|
- private const int ttl = 1;
|
|
|
- /// <summary>
|
|
|
- /// 分页大小
|
|
|
- /// </summary>
|
|
|
- private const int pageSize = 200;
|
|
|
- /// <summary>
|
|
|
- /// 超时时间
|
|
|
- /// </summary>
|
|
|
- private const int timeoutSeconds = 86400;
|
|
|
+ {
|
|
|
public static double RU(this Response response)
|
|
|
{
|
|
|
try
|
|
@@ -50,11 +34,12 @@ namespace TEAMModelOS.SDK.DI
|
|
|
var ru = Convert.ToDouble(value);
|
|
|
return ru;
|
|
|
}
|
|
|
- catch (Exception)
|
|
|
+ catch
|
|
|
{
|
|
|
return 0;
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
public static string GetContinuationToken(this Response response)
|
|
|
{
|
|
|
try
|
|
@@ -62,768 +47,29 @@ namespace TEAMModelOS.SDK.DI
|
|
|
response.Headers.TryGetValue("x-ms-continuation", out var value);
|
|
|
return value;
|
|
|
}
|
|
|
- catch (Exception)
|
|
|
+ catch
|
|
|
{
|
|
|
return null;
|
|
|
}
|
|
|
}
|
|
|
- public static async Task<T> FindByIdPk<T>(this AzureCosmosFactory azureCosmosFactory, string id, string pk) where T : CosmosEntity
|
|
|
- {
|
|
|
- AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(typeof(T).Name);
|
|
|
- try
|
|
|
- {
|
|
|
- ItemResponse<T> response = await container.container.ReadItemAsync<T>(id: id, partitionKey: new PartitionKey(pk));
|
|
|
- return response.Value;
|
|
|
- }
|
|
|
- catch (Exception){
|
|
|
- return default;
|
|
|
- }
|
|
|
- }
|
|
|
- public static async Task<List<T>> FindByIds<T>(this AzureCosmosFactory azureCosmosFactory, List<string> ids) where T : CosmosEntity
|
|
|
- {
|
|
|
- AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(typeof(T).Name);
|
|
|
- if (container.cache && RedisHelper.Instance != null)
|
|
|
- {
|
|
|
- List<T> list = new List<T>();
|
|
|
- List<string> NotIn = new List<string>();
|
|
|
- foreach (string id in ids)
|
|
|
- {
|
|
|
- if (!await RedisHelper.HExistsAsync(CacheCosmosPrefix + container.container.Id, id))
|
|
|
- {
|
|
|
- NotIn.Add(id);
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- list.Add(await RedisHelper.HGetAsync<T>(CacheCosmosPrefix + container.container.Id, id));
|
|
|
- }
|
|
|
- }
|
|
|
- if (NotIn.IsNotEmpty())
|
|
|
- {
|
|
|
- List<T> noInList = await FindByDict<T>(azureCosmosFactory, new Dictionary<string, object> { { "id", NotIn.ToArray() } });
|
|
|
- noInList.ForEach(x => { RedisHelper.HSet(CacheCosmosPrefix + container.container.Id, x.id, x); RedisHelper.Expire(CacheCosmosPrefix + container.container.Id, timeoutSeconds); });
|
|
|
- list.AddRange(noInList);
|
|
|
- }
|
|
|
- return list;
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- return await FindByDict<T>(azureCosmosFactory, new Dictionary<string, object> { { "id", ids.ToArray() } });
|
|
|
- }
|
|
|
- }
|
|
|
- public static async Task<T> Save<T>(this AzureCosmosFactory azureCosmosFactory, T entity) where T : CosmosEntity
|
|
|
- {
|
|
|
-
|
|
|
- AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(typeof(T).Name);
|
|
|
- entity.pk = container.type.Name;
|
|
|
- ItemResponse<T> response = await container.container.CreateItemAsync<T>(entity);
|
|
|
- if (container.cache && RedisHelper.Instance != null)
|
|
|
- {
|
|
|
- if (!RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
|
|
|
- {
|
|
|
- await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
|
|
|
- await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
|
|
|
- }
|
|
|
- }
|
|
|
- return response.Value;
|
|
|
- }
|
|
|
- public static async Task<List<T>> SaveAll<T>(this AzureCosmosFactory azureCosmosFactory, List<T> enyites) where T : CosmosEntity
|
|
|
- {
|
|
|
- Type type = typeof(T);
|
|
|
- int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
|
|
|
- AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(type.Name);
|
|
|
- bool flag = false;
|
|
|
- if (RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
|
|
|
- {
|
|
|
- flag = true;
|
|
|
- }
|
|
|
- string partitionKey =AzureCosmosUtil.GetPartitionKey (type);
|
|
|
-
|
|
|
- Stopwatch stopwatch = Stopwatch.StartNew();
|
|
|
- for (int i = 0; i < pages; i++)
|
|
|
- {
|
|
|
- List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
|
|
|
- List<KeyValuePair<PartitionKey, Stream>> itemsToInsert = new List<KeyValuePair<PartitionKey, Stream>>();
|
|
|
- lists.ForEach(async x =>
|
|
|
- {
|
|
|
- x.pk = type.Name;
|
|
|
- MemoryStream stream = new MemoryStream();
|
|
|
- await JsonSerializer.SerializeAsync(stream, x, new JsonSerializerOptions { IgnoreNullValues = true });
|
|
|
- object o = type.GetProperty(partitionKey).GetValue(x, null);
|
|
|
- KeyValuePair<PartitionKey, Stream> keyValue = new KeyValuePair<PartitionKey, Stream>(new PartitionKey(o.ToString()), stream);
|
|
|
- itemsToInsert.Add(keyValue);
|
|
|
- });
|
|
|
- List<Task> tasks = new List<Task>(lists.Count);
|
|
|
- itemsToInsert.ForEach(item =>
|
|
|
- {
|
|
|
- tasks.Add(container.container.CreateItemStreamAsync(item.Value, item.Key)
|
|
|
- .ContinueWith((Task<Response> task) =>
|
|
|
- {
|
|
|
- using (Response response = task.Result)
|
|
|
- {
|
|
|
- //if (!response.IsSuccessStatusCode)
|
|
|
- //{
|
|
|
- //}
|
|
|
- }
|
|
|
- }
|
|
|
- ));
|
|
|
- });
|
|
|
- await Task.WhenAll(tasks);
|
|
|
- if (container.cache && RedisHelper.Instance != null)
|
|
|
- {
|
|
|
- lists.ForEach(async x => {
|
|
|
- await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, x.id, x);
|
|
|
- });
|
|
|
- }
|
|
|
- }
|
|
|
- if (container.cache && RedisHelper.Instance != null && !flag)
|
|
|
- {
|
|
|
- await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
|
|
|
- }
|
|
|
- stopwatch.Stop();
|
|
|
- return enyites;
|
|
|
- }
|
|
|
- public static async Task<T> SaveOrUpdate<T>(this AzureCosmosFactory azureCosmosFactory, T entity) where T : CosmosEntity
|
|
|
- {
|
|
|
- AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(typeof(T).Name);
|
|
|
- entity.pk = container.type.Name;
|
|
|
- ItemResponse<T> response = await container.container.UpsertItemAsync(item: entity);
|
|
|
- if (container.cache && RedisHelper.Instance != null)
|
|
|
- {
|
|
|
- if (!RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
|
|
|
- {
|
|
|
- await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
|
|
|
- await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
|
|
|
- }
|
|
|
- }
|
|
|
- return response.Value;
|
|
|
- }
|
|
|
- public static async Task<List<T>> SaveOrUpdateAll<T>(this AzureCosmosFactory azureCosmosFactory, List<T> enyites) where T : CosmosEntity
|
|
|
- {
|
|
|
- Type type = typeof(T);
|
|
|
- int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
|
|
|
- AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(type.Name);
|
|
|
- bool flag = false;
|
|
|
- if (RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
|
|
|
- {
|
|
|
- flag = true;
|
|
|
- }
|
|
|
-
|
|
|
- string partitionKey = AzureCosmosUtil.GetPartitionKey(type);
|
|
|
-
|
|
|
- Stopwatch stopwatch = Stopwatch.StartNew();
|
|
|
- for (int i = 0; i < pages; i++)
|
|
|
- {
|
|
|
- List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
|
|
|
- List<KeyValuePair<PartitionKey, Stream>> itemsToInsert = new List<KeyValuePair<PartitionKey, Stream>>();
|
|
|
- lists.ForEach(async x =>
|
|
|
- {
|
|
|
- x.pk = type.Name;
|
|
|
- MemoryStream stream = new MemoryStream();
|
|
|
- await JsonSerializer.SerializeAsync(stream, x, new JsonSerializerOptions { IgnoreNullValues = true });
|
|
|
- object o = type.GetProperty(partitionKey).GetValue(x, null);
|
|
|
- KeyValuePair<PartitionKey, Stream> keyValue = new KeyValuePair<PartitionKey, Stream>(new PartitionKey(o.ToString()), stream);
|
|
|
- itemsToInsert.Add(keyValue);
|
|
|
- });
|
|
|
- List<Task> tasks = new List<Task>(lists.Count);
|
|
|
- itemsToInsert.ForEach(item =>
|
|
|
- {
|
|
|
- tasks.Add(container.container.UpsertItemStreamAsync(item.Value, item.Key)
|
|
|
- .ContinueWith((Task<Response> task) =>
|
|
|
- {
|
|
|
- }
|
|
|
- ));
|
|
|
- });
|
|
|
- await Task.WhenAll(tasks);
|
|
|
- if (container.cache && RedisHelper.Instance != null)
|
|
|
- {
|
|
|
- lists.ForEach(async x => {
|
|
|
- await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, x.id, x);
|
|
|
- });
|
|
|
- }
|
|
|
- }
|
|
|
- if (container.cache && RedisHelper.Instance != null && !flag)
|
|
|
- {
|
|
|
- await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
|
|
|
- }
|
|
|
- stopwatch.Stop();
|
|
|
- return enyites;
|
|
|
- }
|
|
|
- public static async Task<List<int>> FindCountByDict<T>(this AzureCosmosFactory azureCosmosFactory, Dictionary<string, object> dict)
|
|
|
- {
|
|
|
- Type type = typeof(T);
|
|
|
- AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(type.Name);
|
|
|
- string pk = type.Name;
|
|
|
- StringBuilder sql = new StringBuilder("select value count(c) from c");
|
|
|
- AzureCosmosQuery cosmosDbQuery = SQLHelper.GetSQL(dict, sql, pk);
|
|
|
- if (cosmosDbQuery == null)
|
|
|
- {
|
|
|
- return new List<int> { 0 };
|
|
|
- }
|
|
|
- QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
|
|
|
- AsyncPageable<int> query = container.container.GetItemQueryIterator<int>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
|
|
|
- return await ResultsFromFeedIterator(query);
|
|
|
- }
|
|
|
- public static async Task<List<int>> FindCountByDict<T>(this AzureCosmosFactory azureCosmosFactory,JsonElement json)
|
|
|
- {
|
|
|
- Type type = typeof(T);
|
|
|
- AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(type.Name);
|
|
|
- string pk = type.Name;
|
|
|
- StringBuilder sql = new StringBuilder("select value count(c) from c");
|
|
|
- AzureCosmosQuery cosmosDbQuery = SQLHelper.GetSQL(json, sql, pk);
|
|
|
- if (cosmosDbQuery == null)
|
|
|
- {
|
|
|
- return new List<int> { 0 };
|
|
|
- }
|
|
|
- QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
|
|
|
- AsyncPageable<int> query = container.container.GetItemQueryIterator<int>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
|
|
|
- return await ResultsFromFeedIterator(query);
|
|
|
- }
|
|
|
- public static async Task<List<dynamic>> FindCountByDict(this AzureCosmosFactory azureCosmosFactory, string CollectionName, Dictionary<string, object> dict)
|
|
|
- {
|
|
|
- if (azureCosmosFactory.CosmosDict.typeCosmos.TryGetValue(CollectionName, out AzureCosmosModel container))
|
|
|
- {
|
|
|
- string pk = container.type.Name;
|
|
|
- dict.Remove("@CURRPAGE");
|
|
|
- dict.Remove("@PAGESIZE");
|
|
|
- dict.Remove("@ASC");
|
|
|
- dict.Remove("@DESC");
|
|
|
- StringBuilder sql = new StringBuilder("select value count(c) from c");
|
|
|
- AzureCosmosQuery cosmosDbQuery = SQLHelper.GetSQL(dict, sql, pk);
|
|
|
- if (cosmosDbQuery == null)
|
|
|
- {
|
|
|
- return new List<dynamic> { 0 };
|
|
|
- }
|
|
|
- QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
|
|
|
- AsyncPageable<dynamic> query = container.container.GetItemQueryIterator<dynamic>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
|
|
|
- return await ResultsFromFeedIterator(query);
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
|
|
|
- }
|
|
|
- }
|
|
|
- public static async Task<List<dynamic>> FindCountByDict(this AzureCosmosFactory azureCosmosFactory, string CollectionName,JsonElement json)
|
|
|
- {
|
|
|
- if (azureCosmosFactory.CosmosDict.typeCosmos.TryGetValue(CollectionName, out AzureCosmosModel container))
|
|
|
- {
|
|
|
- Dictionary<string, object> dict = new Dictionary<string, object>();
|
|
|
- var emobj = json.EnumerateObject();
|
|
|
- while (emobj.MoveNext())
|
|
|
- {
|
|
|
- if (emobj.Current.Name != "@CURRPAGE"||
|
|
|
- emobj.Current.Name != "@PAGESIZE" ||
|
|
|
- emobj.Current.Name != "@ASC" ||
|
|
|
- emobj.Current.Name != "@DESC") {
|
|
|
- dict[emobj.Current.Name] = emobj.Current.Value;
|
|
|
- }
|
|
|
- }
|
|
|
- string pk = container.type.Name;
|
|
|
- StringBuilder sql = new StringBuilder("select value count(c) from c");
|
|
|
- AzureCosmosQuery cosmosDbQuery = SQLHelper.GetSQL(dict, sql, pk);
|
|
|
- if (cosmosDbQuery == null)
|
|
|
- {
|
|
|
- return new List<dynamic> { 0 };
|
|
|
- }
|
|
|
- QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
|
|
|
- AsyncPageable<dynamic> query = container.container.GetItemQueryIterator<dynamic>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
|
|
|
- return await ResultsFromFeedIterator(query);
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
|
|
|
- }
|
|
|
- }
|
|
|
- public static async Task<T> Update<T>(this AzureCosmosFactory azureCosmosFactory, T entity) where T : CosmosEntity
|
|
|
- {
|
|
|
- Type type = typeof(T);
|
|
|
- AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(type.Name);
|
|
|
- string partitionKey =AzureCosmosUtil.GetPartitionKey (type);
|
|
|
- entity.pk = type.Name;
|
|
|
- object o = type.GetProperty(partitionKey).GetValue(entity, null);
|
|
|
- ItemResponse<T> response = await container.container.ReplaceItemAsync(entity, entity.id, new PartitionKey(o.ToString()));
|
|
|
- if (container.cache && RedisHelper.Instance != null)
|
|
|
- {
|
|
|
- if (!RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
|
|
|
- {
|
|
|
- await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
|
|
|
- await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
|
|
|
- }
|
|
|
- }
|
|
|
- return response.Value;
|
|
|
- }
|
|
|
- internal class Item
|
|
|
- {
|
|
|
- public string id { get; set; }
|
|
|
- public string pk { get; set; }
|
|
|
- public MemoryStream stream { get; set; }
|
|
|
- }
|
|
|
- public static async Task<List<T>> UpdateAll<T>(this AzureCosmosFactory azureCosmosFactory, List<T> enyites) where T : CosmosEntity
|
|
|
- {
|
|
|
- //await Task.Run(() => Parallel.ForEach(entities, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (item) =>
|
|
|
- //{
|
|
|
- // Task.WaitAll(Update(item));
|
|
|
- //}));
|
|
|
- Type type = typeof(T);
|
|
|
- int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
|
|
|
- AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(type.Name);
|
|
|
- bool flag = false;
|
|
|
- if (RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
|
|
|
- {
|
|
|
- flag = true;
|
|
|
- }
|
|
|
- string partitionKey =AzureCosmosUtil.GetPartitionKey (type);
|
|
|
-
|
|
|
- Stopwatch stopwatch = Stopwatch.StartNew();
|
|
|
- for (int i = 0; i < pages; i++)
|
|
|
- {
|
|
|
- List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
|
|
|
- List<Item> itemsToInsert = new List<Item>();
|
|
|
- lists.ForEach(async x =>
|
|
|
- {
|
|
|
- x.pk = type.Name;
|
|
|
- MemoryStream stream = new MemoryStream();
|
|
|
- await JsonSerializer.SerializeAsync(stream, x, new JsonSerializerOptions { IgnoreNullValues = true });
|
|
|
- object o = type.GetProperty(partitionKey).GetValue(x, null);
|
|
|
- Item keyValue = new Item { id = x.id, pk = o.ToString(), stream = stream };
|
|
|
- itemsToInsert.Add(keyValue);
|
|
|
- });
|
|
|
- List<Task> tasks = new List<Task>(lists.Count);
|
|
|
- itemsToInsert.ForEach(item =>
|
|
|
- {
|
|
|
- tasks.Add(container.container.ReplaceItemStreamAsync(item.stream, item.id, new PartitionKey(item.pk))
|
|
|
- .ContinueWith((Task<Response> task) =>
|
|
|
- {
|
|
|
- //using (ResponseMessage response = task.Result)
|
|
|
- //{
|
|
|
- // if (!response.IsSuccessStatusCode)
|
|
|
- // {
|
|
|
- // }
|
|
|
- //}
|
|
|
- }
|
|
|
- ));
|
|
|
- });
|
|
|
- await Task.WhenAll(tasks);
|
|
|
- if (container.cache && RedisHelper.Instance != null)
|
|
|
- {
|
|
|
- lists.ForEach(async x => {
|
|
|
- await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, x.id, x);
|
|
|
- });
|
|
|
- }
|
|
|
- }
|
|
|
- if (container.cache && RedisHelper.Instance != null && !flag)
|
|
|
- {
|
|
|
- await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
|
|
|
- }
|
|
|
- stopwatch.Stop();
|
|
|
- return enyites;
|
|
|
- }
|
|
|
- public static async Task<List<T>> FindByDict<T>(this AzureCosmosFactory azureCosmosFactory, Dictionary<string, object> dict, List<string> propertys = null) where T : CosmosEntity
|
|
|
- {
|
|
|
- StringBuilder sql;
|
|
|
- sql = SQLHelper.GetSQLSelect(propertys);
|
|
|
- string pk = typeof(T).Name;
|
|
|
- AzureCosmosQuery cosmosDbQuery = SQLHelper.GetSQL(dict, sql, pk);
|
|
|
- QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
|
|
|
- return await ResultsFromQueryAndOptions<T>(azureCosmosFactory,cosmosDbQuery, queryRequestOptions);
|
|
|
- }
|
|
|
- public static async Task<List<T>> FindByDict<T>(this AzureCosmosFactory azureCosmosFactory, JsonElement jsonElement, List<string> propertys = null) where T : CosmosEntity
|
|
|
- {
|
|
|
- StringBuilder sql;
|
|
|
- sql = SQLHelper.GetSQLSelect(propertys);
|
|
|
- string pk = typeof(T).Name;
|
|
|
- //Dictionary<string, object> dict = new Dictionary<string, object>();
|
|
|
- /* var emobj = jsonElement.EnumerateObject();
|
|
|
- while (emobj.MoveNext())
|
|
|
- {
|
|
|
- dict[emobj.Current.Name] = emobj.Current.Value;
|
|
|
- }
|
|
|
- //处理code
|
|
|
- if (dict.TryGetValue("code", out object _))
|
|
|
- {
|
|
|
- dict.Remove("code");
|
|
|
- }*/
|
|
|
- AzureCosmosQuery cosmosDbQuery = SQLHelper.GetSQL(jsonElement, sql, pk);
|
|
|
- QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
|
|
|
- return await ResultsFromQueryAndOptions<T>(azureCosmosFactory, cosmosDbQuery, queryRequestOptions);
|
|
|
- }
|
|
|
- public static async Task<List<dynamic>> FindByDict(this AzureCosmosFactory azureCosmosFactory, string CollectionName, JsonElement json, List<string> propertys = null)
|
|
|
- {
|
|
|
- if (azureCosmosFactory.CosmosDict.typeCosmos.TryGetValue(CollectionName, out AzureCosmosModel container))
|
|
|
- {
|
|
|
-
|
|
|
- string pk = container.type.Name;
|
|
|
- StringBuilder sql;
|
|
|
- sql = SQLHelper.GetSQLSelect(propertys);
|
|
|
- AzureCosmosQuery cosmosDbQuery = SQLHelper.GetSQL(json, sql, pk);
|
|
|
- QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
|
|
|
- AsyncPageable<dynamic> query = container.container.GetItemQueryIterator<dynamic>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
|
|
|
- return await ResultsFromFeedIterator(query);
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
|
|
|
- }
|
|
|
- }
|
|
|
- public static async Task<List<dynamic>> FindByDict(this AzureCosmosFactory azureCosmosFactory, string CollectionName, Dictionary<string, object> dict, List<string> propertys = null)
|
|
|
- {
|
|
|
- if (azureCosmosFactory.CosmosDict.typeCosmos.TryGetValue(CollectionName, out AzureCosmosModel container))
|
|
|
- {
|
|
|
-
|
|
|
- string pk = container.type.Name;
|
|
|
- StringBuilder sql;
|
|
|
- sql = SQLHelper.GetSQLSelect(propertys);
|
|
|
- AzureCosmosQuery cosmosDbQuery = SQLHelper.GetSQL(dict, sql, pk);
|
|
|
- QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
|
|
|
- AsyncPageable<dynamic> query = container.container.GetItemQueryIterator<dynamic>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
|
|
|
- return await ResultsFromFeedIterator(query);
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
|
|
|
- }
|
|
|
- }
|
|
|
- public static async Task<List<IdPk>> DeleteAll<T>(this AzureCosmosFactory azureCosmosFactory, List<IdPk> ids) where T : CosmosEntity
|
|
|
- {
|
|
|
- AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(typeof(T).Name);
|
|
|
- List<IdPk> idPks = new List<IdPk>();
|
|
|
- if (container.monitor)
|
|
|
- {
|
|
|
- List<T> list = await azureCosmosFactory.FindByDict<T>(new Dictionary<string, object>() { { "id", ids.Select(x => x.id).ToArray() } });
|
|
|
- list = await azureCosmosFactory. DeleteTTL(list);
|
|
|
- return ids;
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- int pages = (int)Math.Ceiling((double)ids.Count / pageSize);
|
|
|
- Stopwatch stopwatch = Stopwatch.StartNew();
|
|
|
- for (int i = 0; i < pages; i++)
|
|
|
- {
|
|
|
- List<IdPk> lists = ids.Skip((i) * pageSize).Take(pageSize).ToList();
|
|
|
- List<Task> tasks = new List<Task>(lists.Count);
|
|
|
- lists.ForEach(item =>
|
|
|
- {
|
|
|
- tasks.Add(container.container.DeleteItemStreamAsync(item.id, new PartitionKey(item.pk))
|
|
|
- .ContinueWith((Task<Response> task) =>
|
|
|
- {
|
|
|
- using (Response response = task.Result)
|
|
|
- {
|
|
|
- idPks.Add(new IdPk { id = item.id, pk = item.pk, Status = response.Status });
|
|
|
- // if (!response.IsSuccessStatusCode)
|
|
|
- // {
|
|
|
- // }
|
|
|
- }
|
|
|
- }
|
|
|
- ));
|
|
|
- });
|
|
|
- await Task.WhenAll(tasks);
|
|
|
- if (container.cache && RedisHelper.Instance != null)
|
|
|
- {
|
|
|
- lists.ForEach(async x => {
|
|
|
- await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, x.id);
|
|
|
- });
|
|
|
- }
|
|
|
- }
|
|
|
- stopwatch.Stop();
|
|
|
- return idPks;
|
|
|
- }
|
|
|
- }
|
|
|
- public static async Task<List<IdPk>> DeleteAll<T>(this AzureCosmosFactory azureCosmosFactory, Dictionary<string, object> dict) where T : CosmosEntity
|
|
|
- {
|
|
|
- if (dict.Keys.Count > 0)
|
|
|
- {
|
|
|
- List<T> list = await azureCosmosFactory.FindByDict<T>(dict);
|
|
|
-
|
|
|
- return await azureCosmosFactory.DeleteAll(list);
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- throw new BizException("参数为空", 500);
|
|
|
- }
|
|
|
|
|
|
- }
|
|
|
- public static async Task<List<IdPk>> DeleteAll<T>(this AzureCosmosFactory azureCosmosFactory, JsonElement dict) where T : CosmosEntity
|
|
|
+ public static async Task<int> GetCount(this CosmosContainer container, string partitionkey, string queryWhere = "WHERE 1=1")
|
|
|
{
|
|
|
- List<T> list = await azureCosmosFactory.FindByDict<T>(dict);
|
|
|
- return await azureCosmosFactory.DeleteAll(list);
|
|
|
- }
|
|
|
- public static async Task<List<IdPk>> DeleteAll<T>(this AzureCosmosFactory azureCosmosFactory, List<T> enyites) where T : CosmosEntity
|
|
|
- {
|
|
|
- Type type = typeof(T);
|
|
|
- AzureCosmosModel container = azureCosmosFactory.GetCosmosModel (type.Name);
|
|
|
- List<IdPk> idPks = new List<IdPk>();
|
|
|
-
|
|
|
- string pk = AzureCosmosUtil.GetPartitionKey (type);
|
|
|
- if (container.monitor)
|
|
|
+ int totalCount = 0;
|
|
|
+ await foreach (var item in container.GetItemQueryStreamIterator(
|
|
|
+ queryText: $"SELECT VALUE COUNT(1) From c {queryWhere}",
|
|
|
+ requestOptions: new QueryRequestOptions() { PartitionKey = new PartitionKey(partitionkey), MaxItemCount = -1 }))
|
|
|
{
|
|
|
- enyites = await azureCosmosFactory.DeleteTTL(enyites);
|
|
|
- foreach (T t in enyites)
|
|
|
+ using var json = await JsonDocument.ParseAsync(item.ContentStream);
|
|
|
+ if (json.RootElement.TryGetProperty("_count", out JsonElement count) && count.GetUInt16() > 0)
|
|
|
{
|
|
|
- object o = type.GetProperty(pk).GetValue(t, null);
|
|
|
- idPks.Add(new IdPk { id = t.id, pk = o.ToString(), StatusCode = HttpStatusCode.NoContent });
|
|
|
- }
|
|
|
- return idPks;
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
|
|
|
- Stopwatch stopwatch = Stopwatch.StartNew();
|
|
|
- for (int i = 0; i < pages; i++)
|
|
|
- {
|
|
|
- List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
|
|
|
- List<KeyValuePair<PartitionKey, string>> itemsToInsert = new List<KeyValuePair<PartitionKey, string>>();
|
|
|
- lists.ForEach(x =>
|
|
|
- {
|
|
|
- object o = type.GetProperty(pk).GetValue(x, null);
|
|
|
- KeyValuePair<PartitionKey, string> keyValue = new KeyValuePair<PartitionKey, string>(new PartitionKey(o.ToString()), x.id);
|
|
|
- itemsToInsert.Add(keyValue);
|
|
|
- });
|
|
|
-
|
|
|
- List<Task> tasks = new List<Task>(lists.Count);
|
|
|
- itemsToInsert.ForEach(item =>
|
|
|
- {
|
|
|
- tasks.Add(container.container.DeleteItemStreamAsync(item.Value, item.Key)
|
|
|
- .ContinueWith((Task<Response> task) =>
|
|
|
- {
|
|
|
- using (Response response = task.Result)
|
|
|
- {
|
|
|
-
|
|
|
- idPks.Add(new IdPk { id = item.Value, pk = item.Key.ToString(), Status = response.Status });
|
|
|
-
|
|
|
- }
|
|
|
- }
|
|
|
- ));
|
|
|
- });
|
|
|
- await Task.WhenAll(tasks); if (container.cache && RedisHelper.Instance != null)
|
|
|
+ foreach (var obj in json.RootElement.GetProperty("Documents").EnumerateArray())
|
|
|
{
|
|
|
- lists.ForEach(async x => {
|
|
|
- await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, x.id);
|
|
|
- });
|
|
|
+ totalCount = obj.GetInt32();
|
|
|
}
|
|
|
}
|
|
|
- stopwatch.Stop();
|
|
|
- return idPks;
|
|
|
- }
|
|
|
- }
|
|
|
- public static async Task<IdPk> DeleteAsync<T>(this AzureCosmosFactory azureCosmosFactory, IdPk idPk) where T : CosmosEntity
|
|
|
- {
|
|
|
- return await DeleteAsync<T>(azureCosmosFactory, idPk.id, idPk.pk);
|
|
|
- }
|
|
|
- public static async Task<IdPk> DeleteAsync<T>(this AzureCosmosFactory azureCosmosFactory, string id, string pk) where T : CosmosEntity
|
|
|
- {
|
|
|
-
|
|
|
- // pk =AzureCosmosUtil.GetPartitionKey<T>();
|
|
|
- AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(typeof(T).Name);
|
|
|
- if (container.monitor)
|
|
|
- {
|
|
|
- List<T> list = await FindByDict<T>(azureCosmosFactory,new Dictionary<string, object>() { { "id", id } });
|
|
|
- if (list.Count > 0)
|
|
|
- {
|
|
|
- await DeleteTTL<T>(azureCosmosFactory, list);
|
|
|
- return new IdPk { id = id, pk = pk, StatusCode = HttpStatusCode.NoContent };
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- throw new BizException("未找到ID匹配的数据,删除失败");
|
|
|
- }
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- Response response = await container.container.DeleteItemStreamAsync(id: id, partitionKey: new PartitionKey(pk));
|
|
|
- if (container.cache && RedisHelper.Instance != null)
|
|
|
- {
|
|
|
- await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, id);
|
|
|
- }
|
|
|
- return new IdPk { id = id, pk = pk, Status = response.Status };
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
- public static async Task<IdPk> DeleteAsync<T>(this AzureCosmosFactory azureCosmosFactory, T entity) where T : CosmosEntity
|
|
|
- {
|
|
|
- Type type = typeof(T);
|
|
|
- AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(type.Name);
|
|
|
- string partitionKey = AzureCosmosUtil.GetPartitionKey (type);
|
|
|
-
|
|
|
- object o = type.GetProperty(partitionKey).GetValue(entity, null);
|
|
|
- if (container.monitor)
|
|
|
- {
|
|
|
- await DeleteTTL<T>(azureCosmosFactory, new List<T>() { entity });
|
|
|
- return new IdPk { id = entity.id, pk = o.ToString(), StatusCode = HttpStatusCode.NoContent };
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- Response response = await container.container.DeleteItemStreamAsync(id: entity.id, partitionKey: new PartitionKey(o.ToString()));
|
|
|
- if (container.cache && RedisHelper.Instance != null)
|
|
|
- {
|
|
|
- await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, entity.id);
|
|
|
- }
|
|
|
- return new IdPk { id = entity.id, pk = partitionKey, Status = response.Status };
|
|
|
- }
|
|
|
- }
|
|
|
- public static async Task<List<T>> FindSQL<T>(this AzureCosmosFactory azureCosmosFactory, string sql, Dictionary<string, object> Parameters = null) where T : CosmosEntity
|
|
|
- {
|
|
|
- if (sql.Contains(".pk"))
|
|
|
- {
|
|
|
- AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(typeof(T).Name);
|
|
|
- QueryRequestOptions queryOptions = GetQueryRequestOptions(GetEffectivePageSize(-1, null));
|
|
|
- if (Parameters != null)
|
|
|
- {
|
|
|
- AzureCosmosQuery cosmosDbQuery = new AzureCosmosQuery
|
|
|
- {
|
|
|
- QueryText = sql,
|
|
|
- Parameters = Parameters
|
|
|
- };
|
|
|
- AsyncPageable<T> feedIterator = container.container
|
|
|
- .GetItemQueryIterator<T>(cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryOptions);
|
|
|
- return await ResultsFromFeedIterator(feedIterator);
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- QueryDefinition queryDefinition = new QueryDefinition(sql);
|
|
|
- return await ResultsFromFeedIterator<T>(container.container.GetItemQueryIterator<T>(queryDefinition));
|
|
|
- }
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- throw new BizException("查询参数必须设置 .pk ", ResponseCode.PARAMS_ERROR);
|
|
|
- }
|
|
|
- }
|
|
|
- /// <summary>
|
|
|
- /// 偌TTL刉壺
|
|
|
- /// </summary>
|
|
|
- /// <typeparam name="T"></typeparam>
|
|
|
- /// <param name="list"></param>
|
|
|
- /// <returns></returns>
|
|
|
- private static async Task<List<T>> DeleteTTL<T>(this AzureCosmosFactory azureCosmosFactory ,List<T> list) where T : CosmosEntity
|
|
|
- {
|
|
|
- AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(typeof(T).Name);
|
|
|
- list.ForEach(x => { x.ttl = ttl; });
|
|
|
- list = await DeleteTTlALL(azureCosmosFactory,list);
|
|
|
- if (container.cache && RedisHelper.Instance != null)
|
|
|
- {
|
|
|
- list.ForEach(async x => {
|
|
|
- await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, x.id);
|
|
|
- });
|
|
|
- }
|
|
|
- return list;
|
|
|
- }
|
|
|
- private static async Task<List<T>> DeleteTTlALL<T>(this AzureCosmosFactory azureCosmosFactory, List<T> enyites) where T : CosmosEntity
|
|
|
- {
|
|
|
- Type type = typeof(T);
|
|
|
- int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
|
|
|
- AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(type.Name);
|
|
|
- bool flag = false;
|
|
|
- if (RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
|
|
|
- {
|
|
|
- flag = true;
|
|
|
- }
|
|
|
- string partitionKey = AzureCosmosUtil.GetPartitionKey (type);
|
|
|
-
|
|
|
- Stopwatch stopwatch = Stopwatch.StartNew();
|
|
|
- for (int i = 0; i < pages; i++)
|
|
|
- {
|
|
|
- List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
|
|
|
- List<KeyValuePair<PartitionKey, Stream>> itemsToInsert = new List<KeyValuePair<PartitionKey, Stream>>();
|
|
|
- lists.ForEach(async x =>
|
|
|
- {
|
|
|
- x.pk = type.Name;
|
|
|
- //x.ttl = null;
|
|
|
- MemoryStream stream = new MemoryStream();
|
|
|
- await JsonSerializer.SerializeAsync(stream, x, new JsonSerializerOptions { IgnoreNullValues = true });
|
|
|
- object o = type.GetProperty(partitionKey).GetValue(x, null);
|
|
|
- KeyValuePair<PartitionKey, Stream> keyValue = new KeyValuePair<PartitionKey, Stream>(new PartitionKey(o.ToString()), stream);
|
|
|
- itemsToInsert.Add(keyValue);
|
|
|
- });
|
|
|
- List<Task> tasks = new List<Task>(lists.Count);
|
|
|
- itemsToInsert.ForEach(item =>
|
|
|
- {
|
|
|
- tasks.Add(container.container.UpsertItemStreamAsync(item.Value, item.Key)
|
|
|
- .ContinueWith((Task<Response> task) =>
|
|
|
- {
|
|
|
- //using (ResponseMessage response = task.Result)
|
|
|
- //{
|
|
|
- // if (!response.IsSuccessStatusCode)
|
|
|
- // {
|
|
|
- // }
|
|
|
- //}
|
|
|
- }
|
|
|
- ));
|
|
|
- });
|
|
|
- await Task.WhenAll(tasks);
|
|
|
- if (container.cache && RedisHelper.Instance != null)
|
|
|
- {
|
|
|
- lists.ForEach(async x => {
|
|
|
- await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, x.id, x);
|
|
|
- });
|
|
|
- }
|
|
|
- }
|
|
|
- if (container.cache && RedisHelper.Instance != null && !flag)
|
|
|
- {
|
|
|
- await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
|
|
|
}
|
|
|
- stopwatch.Stop();
|
|
|
- return enyites;
|
|
|
- }
|
|
|
- private static QueryRequestOptions GetDefaultQueryRequestOptions(int? itemsPerPage = null, int? maxBufferedItemCount = null, int? maxConcurrency = null)
|
|
|
- {
|
|
|
- QueryRequestOptions queryRequestOptions = new QueryRequestOptions
|
|
|
- {
|
|
|
- MaxItemCount = itemsPerPage == -1 ? 1000 : itemsPerPage,
|
|
|
- MaxBufferedItemCount = maxBufferedItemCount ?? 100,
|
|
|
- MaxConcurrency = maxConcurrency ?? 50
|
|
|
- };
|
|
|
- return queryRequestOptions;
|
|
|
- }
|
|
|
- private static int GetEffectivePageSize(int itemsPerPage, int? maxItemCount)
|
|
|
- {
|
|
|
- return itemsPerPage == -1 ? maxItemCount ?? itemsPerPage : Math.Min(maxItemCount ?? itemsPerPage, itemsPerPage);
|
|
|
- }
|
|
|
- private static async Task<List<T>> ResultsFromQueryAndOptions<T>(this AzureCosmosFactory azureCosmosFactory, AzureCosmosQuery cosmosDbQuery, QueryRequestOptions queryOptions)
|
|
|
- {
|
|
|
- if (cosmosDbQuery == null)
|
|
|
- {
|
|
|
- return null;
|
|
|
- }
|
|
|
- AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(typeof(T).Name);
|
|
|
- AsyncPageable<T> query = container.container.GetItemQueryIterator<T>(
|
|
|
- queryDefinition: cosmosDbQuery.CosmosQueryDefinition,
|
|
|
- requestOptions: queryOptions);
|
|
|
-
|
|
|
- return await ResultsFromFeedIterator(query);
|
|
|
- }
|
|
|
- private static async Task<List<T>> ResultsFromFeedIterator<T>(AsyncPageable<T> query, int? maxItemCount = null)
|
|
|
- {
|
|
|
- List<T> results = new List<T>();
|
|
|
-
|
|
|
- await foreach (T t in query)
|
|
|
- {
|
|
|
- results.Add(t);
|
|
|
- if (results.Count == maxItemCount)
|
|
|
- {
|
|
|
- return results;
|
|
|
- }
|
|
|
- }
|
|
|
- return results;
|
|
|
- }
|
|
|
- private static QueryRequestOptions GetQueryRequestOptions(int itemsPerPage)
|
|
|
- {
|
|
|
- QueryRequestOptions queryRequestOptions = new QueryRequestOptions
|
|
|
- {
|
|
|
- MaxItemCount = itemsPerPage
|
|
|
- };
|
|
|
-
|
|
|
- return queryRequestOptions;
|
|
|
+ return totalCount;
|
|
|
}
|
|
|
}
|
|
|
}
|