123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798 |
- using Microsoft.Azure.Cosmos.Table;
- using Microsoft.Azure.Cosmos.Table.Queryable;
- using System;
- using System.Collections;
- using System.Collections.Generic;
- using System.Text;
- using System.Threading;
- using System.Threading.Tasks;
- using System.Linq;
- using Azure;
- using TEAMModelOS.SDK.DI.AzureCosmos.Inner;
- using System.IO;
- using TEAMModelOS.SDK.DI;
- using System.Diagnostics;
- using Azure.Cosmos;
- using System.Text.Json;
- using System.Net;
- using TEAMModelOS.SDK.Context.Exception;
- using System.Linq.Expressions;
- using TEAMModelOS.SDK.Helper.Common.CollectionHelper;
- using TEAMModelOS.SDK.Helper.Common.LogHelper;
- using TEAMModelOS.SDK.Helper.Common.JsonHelper;
- namespace TEAMModelOS.SDK.DI
- {
- public static class AzureCosmosExtensions
- {
- /// <summary>
- /// 缓存前缀
- /// </summary>
- private const string CacheCosmosPrefix = "cosmos:";
- /// <summary>
- /// ttl时长 1秒
- /// </summary>
- private const int ttl = 1;
- /// <summary>
- /// 分页大小
- /// </summary>
- private const int pageSize = 200;
- /// <summary>
- /// 超时时间
- /// </summary>
- private const int timeoutSeconds = 86400;
- public static int RU(this Response response)
- {
- try
- {
- response.Headers.TryGetValue("x-ms-request-charge", out var value);
- var ru = Convert.ToInt32(value);
- return ru;
- }
- catch (Exception)
- {
- return 0;
- }
- }
- public static async Task<T> FindByIdPk<T>(this AzureCosmosFactory azureCosmosFactory, string id, string pk) where T : ID
- {
- AzureCosmosModel container = await azureCosmosFactory.InitializeCollection<T>();
- ItemResponse<T> response = await container.container.ReadItemAsync<T>(id: id, partitionKey: new PartitionKey(pk));
- return response.Value;
- }
- public static async Task<List<T>> FindByIds<T>(this AzureCosmosFactory azureCosmosFactory, List<string> ids) where T : ID
- {
- AzureCosmosModel container = await azureCosmosFactory.InitializeCollection<T>();
- if (container.cache && RedisHelper.Instance != null)
- {
- List<T> list = new List<T>();
- List<string> NotIn = new List<string>();
- foreach (string id in ids)
- {
- if (!await RedisHelper.HExistsAsync(CacheCosmosPrefix + container.container.Id, id))
- {
- NotIn.Add(id);
- }
- else
- {
- list.Add(await RedisHelper.HGetAsync<T>(CacheCosmosPrefix + container.container.Id, id));
- }
- }
- if (NotIn.IsNotEmpty())
- {
- List<T> noInList = await FindByDict<T>(azureCosmosFactory, new Dictionary<string, object> { { "id", NotIn.ToArray() } });
- noInList.ForEach(x => { RedisHelper.HSet(CacheCosmosPrefix + container.container.Id, x.id, x); RedisHelper.Expire(CacheCosmosPrefix + container.container.Id, timeoutSeconds); });
- list.AddRange(noInList);
- }
- return list;
- }
- else
- {
- return await FindByDict<T>(azureCosmosFactory, new Dictionary<string, object> { { "id", ids.ToArray() } });
- }
- }
- public static async Task<T> Save<T>(this AzureCosmosFactory azureCosmosFactory, T entity) where T : ID
- {
-
- AzureCosmosModel container = await azureCosmosFactory. InitializeCollection<T>();
- entity.pk = container.type.Name;
- entity.ttl = null;
- ItemResponse<T> response = await container.container.CreateItemAsync<T>(entity);
- if (container.cache && RedisHelper.Instance != null)
- {
- if (!RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
- {
- await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
- }
- else
- {
- await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
- await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
- }
- }
- return response.Value;
- }
- public static async Task<List<T>> SaveAll<T>(this AzureCosmosFactory azureCosmosFactory, List<T> enyites) where T : ID
- {
- int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
- AzureCosmosModel container = await azureCosmosFactory. InitializeCollection<T>();
- bool flag = false;
- if (RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
- {
- flag = true;
- }
- string partitionKey =AzureCosmosUtil.GetPartitionKey<T>();
- Type type = typeof(T);
- Stopwatch stopwatch = Stopwatch.StartNew();
- for (int i = 0; i < pages; i++)
- {
- List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
- List<KeyValuePair<PartitionKey, Stream>> itemsToInsert = new List<KeyValuePair<PartitionKey, Stream>>();
- lists.ForEach(async x =>
- {
- x.pk = type.Name;
- x.ttl = null;
- MemoryStream stream = new MemoryStream();
- await JsonSerializer.SerializeAsync(stream, x, new JsonSerializerOptions { IgnoreNullValues = true });
- object o = type.GetProperty(partitionKey).GetValue(x, null);
- KeyValuePair<PartitionKey, Stream> keyValue = new KeyValuePair<PartitionKey, Stream>(new PartitionKey(o.ToString()), stream);
- itemsToInsert.Add(keyValue);
- });
- List<Task> tasks = new List<Task>(lists.Count);
- itemsToInsert.ForEach(item =>
- {
- tasks.Add(container.container.CreateItemStreamAsync(item.Value, item.Key)
- .ContinueWith((Task<Response> task) =>
- {
- using (Response response = task.Result)
- {
- //if (!response.IsSuccessStatusCode)
- //{
- //}
- }
- }
- ));
- });
- await Task.WhenAll(tasks);
- if (container.cache && RedisHelper.Instance != null)
- {
- lists.ForEach(async x => {
- await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, x.id, x);
- });
- }
- }
- if (container.cache && RedisHelper.Instance != null && !flag)
- {
- await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
- }
- stopwatch.Stop();
- return enyites;
- }
- public static async Task<T> SaveOrUpdate<T>(this AzureCosmosFactory azureCosmosFactory, T entity) where T : ID
- {
- AzureCosmosModel container = await azureCosmosFactory. InitializeCollection<T>();
- entity.pk = container.type.Name;
- entity.ttl = null;
- ItemResponse<T> response = await container.container.UpsertItemAsync(item: entity);
- if (container.cache && RedisHelper.Instance != null)
- {
- if (!RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
- {
- await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
- }
- else
- {
- await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
- await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
- }
- }
- return response.Value;
- }
- public static async Task<List<T>> SaveOrUpdateAll<T>(this AzureCosmosFactory azureCosmosFactory, List<T> enyites) where T : ID
- {
- int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
- AzureCosmosModel container = await azureCosmosFactory. InitializeCollection<T>();
- bool flag = false;
- if (RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
- {
- flag = true;
- }
- string partitionKey =AzureCosmosUtil.GetPartitionKey<T>();
- Type type = typeof(T);
- Stopwatch stopwatch = Stopwatch.StartNew();
- for (int i = 0; i < pages; i++)
- {
- List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
- List<KeyValuePair<PartitionKey, Stream>> itemsToInsert = new List<KeyValuePair<PartitionKey, Stream>>();
- lists.ForEach(async x =>
- {
- x.pk = type.Name;
- x.ttl = null;
- MemoryStream stream = new MemoryStream();
- await JsonSerializer.SerializeAsync(stream, x, new JsonSerializerOptions { IgnoreNullValues = true });
- object o = type.GetProperty(partitionKey).GetValue(x, null);
- KeyValuePair<PartitionKey, Stream> keyValue = new KeyValuePair<PartitionKey, Stream>(new PartitionKey(o.ToString()), stream);
- itemsToInsert.Add(keyValue);
- });
- List<Task> tasks = new List<Task>(lists.Count);
- itemsToInsert.ForEach(item =>
- {
- tasks.Add(container.container.UpsertItemStreamAsync(item.Value, item.Key)
- .ContinueWith((Task<Response> task) =>
- {
- }
- ));
- });
- await Task.WhenAll(tasks);
- if (container.cache && RedisHelper.Instance != null)
- {
- lists.ForEach(async x => {
- await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, x.id, x);
- });
- }
- }
- if (container.cache && RedisHelper.Instance != null && !flag)
- {
- await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
- }
- stopwatch.Stop();
- return enyites;
- }
- public static async Task<List<dynamic>> UpdateAll(this AzureCosmosFactory azureCosmosFactory, string typeName, List<dynamic> enyites)
- {
- int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
- if (azureCosmosFactory. CosmosDict.typeCosmos.TryGetValue(typeName, out AzureCosmosModel container))
- {
- }
- bool flag = false;
- if (RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
- {
- flag = true;
- }
- string partitionKey = container.partitionKey;
- Stopwatch stopwatch = Stopwatch.StartNew();
- for (int i = 0; i < pages; i++)
- {
- List<dynamic> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
- List<Item> itemsToInsert = new List<Item>();
- lists.ForEach(async x =>
- {
- /* x.pk = typeName;
- x.ttl = null;*/
- MemoryStream stream = new MemoryStream();
- await JsonSerializer.SerializeAsync(stream, x, new JsonSerializerOptions { IgnoreNullValues = true });
- object o = x[partitionKey];
- Item keyValue = new Item { id = x.id, pk = o.ToString(), stream = stream };
- itemsToInsert.Add(keyValue);
- });
- List<Task> tasks = new List<Task>(lists.Count);
- itemsToInsert.ForEach(item =>
- {
- tasks.Add(container.container.ReplaceItemStreamAsync(item.stream, item.id, new PartitionKey(item.pk))
- .ContinueWith((Task<Response> task) =>
- {
- //using (ResponseMessage response = task.Result)
- //{
- // if (!response.IsSuccessStatusCode)
- // {
- // }
- //}
- }
- ));
- });
- await Task.WhenAll(tasks);
- if (container.cache && RedisHelper.Instance != null)
- {
- lists.ForEach(async x => {
- await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, x.id, x);
- });
- }
- }
- if (container.cache && RedisHelper.Instance != null && !flag)
- {
- await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
- }
- stopwatch.Stop();
- return enyites;
- }
- public static async Task<List<int>> FindCountByDict<T>(this AzureCosmosFactory azureCosmosFactory, Dictionary<string, object> dict)
- {
- AzureCosmosModel container = await azureCosmosFactory.InitializeCollection<T>();
- string pk = typeof(T).Name;
- StringBuilder sql = new StringBuilder("select value count(c) from c");
- AzureCosmosQuery cosmosDbQuery = SQLHelper.GetSQL(dict, sql, pk);
- if (cosmosDbQuery == null)
- {
- return new List<int> { 0 };
- }
- QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
- AsyncPageable<int> query = container.container.GetItemQueryIterator<int>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
- return await ResultsFromFeedIterator(query);
- }
- public static async Task<List<dynamic>> FindCountByDict(this AzureCosmosFactory azureCosmosFactory, string CollectionName, Dictionary<string, object> dict)
- {
- if (azureCosmosFactory.CosmosDict.typeCosmos.TryGetValue(CollectionName, out AzureCosmosModel container))
- {
- string pk = container.type.Name;
- dict.Remove("@CURRPAGE");
- dict.Remove("@PAGESIZE");
- dict.Remove("@ASC");
- dict.Remove("@DESC");
- StringBuilder sql = new StringBuilder("select value count(c) from c");
- AzureCosmosQuery cosmosDbQuery = SQLHelper.GetSQL(dict, sql, pk);
- if (cosmosDbQuery == null)
- {
- return new List<dynamic> { 0 };
- }
- QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
- AsyncPageable<dynamic> query = container.container.GetItemQueryIterator<dynamic>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
- return await ResultsFromFeedIterator(query);
- }
- else
- {
- throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
- }
- }
- public static async Task<T> Update<T>(this AzureCosmosFactory azureCosmosFactory, T entity) where T : ID
- {
- AzureCosmosModel container = await azureCosmosFactory. InitializeCollection<T>();
- string partitionKey =AzureCosmosUtil.GetPartitionKey<T>();
- Type type = typeof(T);
- entity.pk = type.Name;
- entity.ttl = null;
- object o = type.GetProperty(partitionKey).GetValue(entity, null);
- ItemResponse<T> response = await container.container.ReplaceItemAsync(entity, entity.id, new PartitionKey(o.ToString()));
- if (container.cache && RedisHelper.Instance != null)
- {
- if (!RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
- {
- await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
- }
- else
- {
- await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
- await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
- }
- }
- return response.Value;
- }
- internal class Item
- {
- public string id { get; set; }
- public string pk { get; set; }
- public MemoryStream stream { get; set; }
- }
- public static async Task<List<T>> UpdateAll<T>(this AzureCosmosFactory azureCosmosFactory, List<T> enyites) where T : ID
- {
- //await Task.Run(() => Parallel.ForEach(entities, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (item) =>
- //{
- // Task.WaitAll(Update(item));
- //}));
- int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
- AzureCosmosModel container = await azureCosmosFactory. InitializeCollection<T>();
- bool flag = false;
- if (RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
- {
- flag = true;
- }
- string partitionKey =AzureCosmosUtil.GetPartitionKey<T>();
- Type type = typeof(T);
- Stopwatch stopwatch = Stopwatch.StartNew();
- for (int i = 0; i < pages; i++)
- {
- List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
- List<Item> itemsToInsert = new List<Item>();
- lists.ForEach(async x =>
- {
- x.pk = type.Name;
- x.ttl = null;
- MemoryStream stream = new MemoryStream();
- await JsonSerializer.SerializeAsync(stream, x, new JsonSerializerOptions { IgnoreNullValues = true });
- object o = type.GetProperty(partitionKey).GetValue(x, null);
- Item keyValue = new Item { id = x.id, pk = o.ToString(), stream = stream };
- itemsToInsert.Add(keyValue);
- });
- List<Task> tasks = new List<Task>(lists.Count);
- itemsToInsert.ForEach(item =>
- {
- tasks.Add(container.container.ReplaceItemStreamAsync(item.stream, item.id, new PartitionKey(item.pk))
- .ContinueWith((Task<Response> task) =>
- {
- //using (ResponseMessage response = task.Result)
- //{
- // if (!response.IsSuccessStatusCode)
- // {
- // }
- //}
- }
- ));
- });
- await Task.WhenAll(tasks);
- if (container.cache && RedisHelper.Instance != null)
- {
- lists.ForEach(async x => {
- await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, x.id, x);
- });
- }
- }
- if (container.cache && RedisHelper.Instance != null && !flag)
- {
- await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
- }
- stopwatch.Stop();
- return enyites;
- }
- public static async Task<List<T>> FindByDict<T>(this AzureCosmosFactory azureCosmosFactory, Dictionary<string, object> dict, List<string> propertys = null) where T : ID
- {
- StringBuilder sql;
- sql = SQLHelper.GetSQLSelect(propertys);
- string pk = typeof(T).Name;
- AzureCosmosQuery cosmosDbQuery = SQLHelper.GetSQL(dict, sql, pk);
- QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
- return await ResultsFromQueryAndOptions<T>(azureCosmosFactory,cosmosDbQuery, queryRequestOptions);
- }
- public static async Task<List<dynamic>> FindByDict(this AzureCosmosFactory azureCosmosFactory, string CollectionName, Dictionary<string, object> dict, List<string> propertys = null)
- {
- if (azureCosmosFactory.CosmosDict.typeCosmos.TryGetValue(CollectionName, out AzureCosmosModel container))
- {
- string pk = container.type.Name;
- StringBuilder sql;
- sql = SQLHelper.GetSQLSelect(propertys);
- AzureCosmosQuery cosmosDbQuery = SQLHelper.GetSQL(dict, sql, pk);
- QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
- AsyncPageable<dynamic> query = container.container.GetItemQueryIterator<dynamic>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
- return await ResultsFromFeedIterator(query);
- }
- else
- {
- throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
- }
- }
- public static async Task<List<IdPk>> DeleteAll<T>(this AzureCosmosFactory azureCosmosFactory, List<IdPk> ids) where T : ID
- {
- // string pk = GetPartitionKey<T>();
- AzureCosmosModel container = await azureCosmosFactory.InitializeCollection<T>();
- List<IdPk> idPks = new List<IdPk>();
- if (container.monitor)
- {
- List<T> list = await azureCosmosFactory.FindByDict<T>(new Dictionary<string, object>() { { "id", ids.Select(x => x.id).ToArray() } });
- list = await azureCosmosFactory. DeleteTTL(list);
- return ids;
- }
- else
- {
- int pages = (int)Math.Ceiling((double)ids.Count / pageSize);
- Stopwatch stopwatch = Stopwatch.StartNew();
- for (int i = 0; i < pages; i++)
- {
- List<IdPk> lists = ids.Skip((i) * pageSize).Take(pageSize).ToList();
- List<Task> tasks = new List<Task>(lists.Count);
- lists.ForEach(item =>
- {
- tasks.Add(container.container.DeleteItemStreamAsync(item.id, new PartitionKey(item.pk))
- .ContinueWith((Task<Response> task) =>
- {
- using (Response response = task.Result)
- {
- idPks.Add(new IdPk { id = item.id, pk = item.pk, Status = response.Status });
- // if (!response.IsSuccessStatusCode)
- // {
- // }
- }
- }
- ));
- });
- await Task.WhenAll(tasks);
- if (container.cache && RedisHelper.Instance != null)
- {
- lists.ForEach(async x => {
- await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, x.id);
- });
- }
- }
- stopwatch.Stop();
- return idPks;
- }
- }
- public static async Task<List<IdPk>> DeleteAll<T>(this AzureCosmosFactory azureCosmosFactory, Dictionary<string, object> dict) where T : ID
- {
- if (dict.Keys.Count > 0)
- {
- List<T> list = await azureCosmosFactory.FindByDict<T>(dict);
- return await azureCosmosFactory.DeleteAll(list);
- }
- else
- {
- throw new BizException("参数为空", 500);
- }
- }
- public static async Task<List<IdPk>> DeleteAll<T>(this AzureCosmosFactory azureCosmosFactory, List<T> enyites) where T : ID
- {
- Type type = typeof(T);
- string pk =AzureCosmosUtil. GetPartitionKey<T>();
- AzureCosmosModel container = await azureCosmosFactory.InitializeCollection<T>();
- List<IdPk> idPks = new List<IdPk>();
- //log4net 日志記錄
- string uuidKey = Guid.NewGuid().ToString();
- string logkey = "\r\n【" + uuidKey + "】\r\n";
- LogHelper.Info(default(object),
- logkey
- + "删除------->>\r\n"
- + "表:"
- + type.Name + "\r\n"
- + "数据:"
- + enyites.ToApiJson()
- + "\r\n" + logkey);
- if (container.monitor)
- {
- enyites = await azureCosmosFactory.DeleteTTL(enyites);
- foreach (T t in enyites)
- {
- object o = type.GetProperty(pk).GetValue(t, null);
- idPks.Add(new IdPk { id = t.id, pk = o.ToString(), StatusCode = HttpStatusCode.NoContent });
- }
- return idPks;
- }
- else
- {
- int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
- Stopwatch stopwatch = Stopwatch.StartNew();
- for (int i = 0; i < pages; i++)
- {
- List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
- List<KeyValuePair<PartitionKey, string>> itemsToInsert = new List<KeyValuePair<PartitionKey, string>>();
- lists.ForEach(x =>
- {
- object o = type.GetProperty(pk).GetValue(x, null);
- KeyValuePair<PartitionKey, string> keyValue = new KeyValuePair<PartitionKey, string>(new PartitionKey(o.ToString()), x.id);
- itemsToInsert.Add(keyValue);
- });
- List<Task> tasks = new List<Task>(lists.Count);
- itemsToInsert.ForEach(item =>
- {
- tasks.Add(container.container.DeleteItemStreamAsync(item.Value, item.Key)
- .ContinueWith((Task<Response> task) =>
- {
- using (Response response = task.Result)
- {
- idPks.Add(new IdPk { id = item.Value, pk = item.Key.ToString(), Status = response.Status });
- }
- }
- ));
- });
- await Task.WhenAll(tasks); if (container.cache && RedisHelper.Instance != null)
- {
- lists.ForEach(async x => {
- await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, x.id);
- });
- }
- }
- stopwatch.Stop();
- return idPks;
- }
- }
- public static async Task<IdPk> DeleteAsync<T>(this AzureCosmosFactory azureCosmosFactory, IdPk idPk) where T : ID
- {
- return await DeleteAsync<T>(azureCosmosFactory, idPk.id, idPk.pk);
- }
- public static async Task<IdPk> DeleteAsync<T>(this AzureCosmosFactory azureCosmosFactory, string id, string pk) where T : ID
- {
- // pk =AzureCosmosUtil.GetPartitionKey<T>();
- AzureCosmosModel container = await azureCosmosFactory.InitializeCollection<T>();
- if (container.monitor)
- {
- List<T> list = await FindByDict<T>(azureCosmosFactory,new Dictionary<string, object>() { { "id", id } });
- if (list.Count > 0)
- {
- await DeleteTTL<T>(azureCosmosFactory, list);
- return new IdPk { id = id, pk = pk, StatusCode = HttpStatusCode.NoContent };
- }
- else
- {
- throw new BizException("未找到ID匹配的数据,删除失败");
- }
- }
- else
- {
- Response response = await container.container.DeleteItemStreamAsync(id: id, partitionKey: new PartitionKey(pk));
- if (container.cache && RedisHelper.Instance != null)
- {
- await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, id);
- }
- return new IdPk { id = id, pk = pk, Status = response.Status };
- }
- }
- public static async Task<IdPk> DeleteAsync<T>(this AzureCosmosFactory azureCosmosFactory, T entity) where T : ID
- {
- AzureCosmosModel container = await azureCosmosFactory.InitializeCollection<T>();
- string partitionKey = AzureCosmosUtil.GetPartitionKey<T>();
- Type type = typeof(T);
- object o = type.GetProperty(partitionKey).GetValue(entity, null);
- if (container.monitor)
- {
- await DeleteTTL<T>(azureCosmosFactory, new List<T>() { entity });
- return new IdPk { id = entity.id, pk = o.ToString(), StatusCode = HttpStatusCode.NoContent };
- }
- else
- {
- Response response = await container.container.DeleteItemStreamAsync(id: entity.id, partitionKey: new PartitionKey(o.ToString()));
- if (container.cache && RedisHelper.Instance != null)
- {
- await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, entity.id);
- }
- return new IdPk { id = entity.id, pk = partitionKey, Status = response.Status };
- }
- }
- public static async Task<List<T>> FindSQL<T>(this AzureCosmosFactory azureCosmosFactory, string sql, Dictionary<string, object> Parameters = null) where T : ID
- {
- if (sql.Contains(".pk"))
- {
- AzureCosmosModel container = await azureCosmosFactory.InitializeCollection<T>();
- QueryRequestOptions queryOptions = GetQueryRequestOptions(GetEffectivePageSize(-1, null));
- if (Parameters != null)
- {
- AzureCosmosQuery cosmosDbQuery = new AzureCosmosQuery
- {
- QueryText = sql,
- Parameters = Parameters
- };
- AsyncPageable<T> feedIterator = container.container
- .GetItemQueryIterator<T>(cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryOptions);
- return await ResultsFromFeedIterator(feedIterator);
- }
- else
- {
- QueryDefinition queryDefinition = new QueryDefinition(sql);
- return await ResultsFromFeedIterator<T>(container.container.GetItemQueryIterator<T>(queryDefinition));
- }
- }
- else
- {
- throw new BizException("查询参数必须设置 .pk ", ResponseCode.PARAMS_ERROR);
- }
- }
- /// <summary>
- /// 按TTL删除
- /// </summary>
- /// <typeparam name="T"></typeparam>
- /// <param name="list"></param>
- /// <returns></returns>
- private static async Task<List<T>> DeleteTTL<T>(this AzureCosmosFactory azureCosmosFactory ,List<T> list) where T : ID
- {
- AzureCosmosModel container = await azureCosmosFactory.InitializeCollection<T>();
- list.ForEach(x => { x.ttl = ttl; });
- list = await DeleteTTlALL(azureCosmosFactory,list);
- if (container.cache && RedisHelper.Instance != null)
- {
- list.ForEach(async x => {
- await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, x.id);
- });
- }
- return list;
- }
- private static async Task<List<T>> DeleteTTlALL<T>(this AzureCosmosFactory azureCosmosFactory, List<T> enyites) where T : ID
- {
- int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
- AzureCosmosModel container = await azureCosmosFactory.InitializeCollection<T>();
- bool flag = false;
- if (RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
- {
- flag = true;
- }
- string partitionKey = AzureCosmosUtil.GetPartitionKey<T>();
- Type type = typeof(T);
- Stopwatch stopwatch = Stopwatch.StartNew();
- for (int i = 0; i < pages; i++)
- {
- List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
- List<KeyValuePair<PartitionKey, Stream>> itemsToInsert = new List<KeyValuePair<PartitionKey, Stream>>();
- lists.ForEach(async x =>
- {
- x.pk = type.Name;
- //x.ttl = null;
- MemoryStream stream = new MemoryStream();
- await JsonSerializer.SerializeAsync(stream, x, new JsonSerializerOptions { IgnoreNullValues = true });
- object o = type.GetProperty(partitionKey).GetValue(x, null);
- KeyValuePair<PartitionKey, Stream> keyValue = new KeyValuePair<PartitionKey, Stream>(new PartitionKey(o.ToString()), stream);
- itemsToInsert.Add(keyValue);
- });
- List<Task> tasks = new List<Task>(lists.Count);
- itemsToInsert.ForEach(item =>
- {
- tasks.Add(container.container.UpsertItemStreamAsync(item.Value, item.Key)
- .ContinueWith((Task<Response> task) =>
- {
- //using (ResponseMessage response = task.Result)
- //{
- // if (!response.IsSuccessStatusCode)
- // {
- // }
- //}
- }
- ));
- });
- await Task.WhenAll(tasks);
- if (container.cache && RedisHelper.Instance != null)
- {
- lists.ForEach(async x => {
- await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, x.id, x);
- });
- }
- }
- if (container.cache && RedisHelper.Instance != null && !flag)
- {
- await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
- }
- stopwatch.Stop();
- return enyites;
- }
- private static QueryRequestOptions GetDefaultQueryRequestOptions(int? itemsPerPage = null, int? maxBufferedItemCount = null, int? maxConcurrency = null)
- {
- QueryRequestOptions queryRequestOptions = new QueryRequestOptions
- {
- MaxItemCount = itemsPerPage == -1 ? 1000 : itemsPerPage,
- MaxBufferedItemCount = maxBufferedItemCount ?? 100,
- MaxConcurrency = maxConcurrency ?? 50
- };
- return queryRequestOptions;
- }
- private static int GetEffectivePageSize(int itemsPerPage, int? maxItemCount)
- {
- return itemsPerPage == -1 ? maxItemCount ?? itemsPerPage : Math.Min(maxItemCount ?? itemsPerPage, itemsPerPage);
- }
- private static async Task<List<T>> ResultsFromQueryAndOptions<T>(this AzureCosmosFactory azureCosmosFactory, AzureCosmosQuery cosmosDbQuery, QueryRequestOptions queryOptions)
- {
- if (cosmosDbQuery == null)
- {
- return null;
- }
- AzureCosmosModel container = await azureCosmosFactory. InitializeCollection<T>();
- AsyncPageable<T> query = container.container.GetItemQueryIterator<T>(
- queryDefinition: cosmosDbQuery.CosmosQueryDefinition,
- requestOptions: queryOptions);
- return await ResultsFromFeedIterator(query);
- }
- private static async Task<List<T>> ResultsFromFeedIterator<T>(AsyncPageable<T> query, int? maxItemCount = null)
- {
- List<T> results = new List<T>();
-
- await foreach (T t in query)
- {
- results.Add(t);
- if (results.Count == maxItemCount)
- {
- return results;
- }
- }
- return results;
- }
- private static QueryRequestOptions GetQueryRequestOptions(int itemsPerPage)
- {
- QueryRequestOptions queryRequestOptions = new QueryRequestOptions
- {
- MaxItemCount = itemsPerPage
- };
- return queryRequestOptions;
- }
- }
- }
|