AzureCosmosExtensions.cs 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821
  1. using Microsoft.Azure.Cosmos.Table;
  2. using Microsoft.Azure.Cosmos.Table.Queryable;
  3. using System;
  4. using System.Collections;
  5. using System.Collections.Generic;
  6. using System.Text;
  7. using System.Threading;
  8. using System.Threading.Tasks;
  9. using System.Linq;
  10. using Azure;
  11. using TEAMModelOS.SDK.DI.AzureCosmos.Inner;
  12. using System.IO;
  13. using TEAMModelOS.SDK.DI;
  14. using System.Diagnostics;
  15. using Azure.Cosmos;
  16. using System.Text.Json;
  17. using System.Net;
  18. using TEAMModelOS.SDK.Context.Exception;
  19. using System.Linq.Expressions;
  20. using TEAMModelOS.SDK.Helper.Common.CollectionHelper;
  21. using TEAMModelOS.SDK.Helper.Common.LogHelper;
  22. using TEAMModelOS.SDK.Helper.Common.JsonHelper;
  23. namespace TEAMModelOS.SDK.DI
  24. {
  25. public static class AzureCosmosExtensions
  26. {
  27. /// <summary>
  28. /// Key prefix for the Redis hashes that cache container items (combined with the container id).
  29. /// </summary>
  30. private const string CacheCosmosPrefix = "cosmos:";
  31. /// <summary>
  32. /// TTL value stamped on entities by DeleteTTL (1 second, per the original comment).
  33. /// </summary>
  34. private const int ttl = 1;
  35. /// <summary>
  36. /// Page size: number of entities handled per batch by the bulk operations.
  37. /// </summary>
  38. private const int pageSize = 200;
  39. /// <summary>
  40. /// Timeout value passed to the Redis Expire calls for the cache hashes.
  41. /// </summary>
  42. private const int timeoutSeconds = 86400;
  /// <summary>
  /// Reads the request charge reported in the <c>x-ms-request-charge</c> response header.
  /// </summary>
  /// <param name="response">Response whose headers are inspected.</param>
  /// <returns>
  /// The charge rounded to the nearest integer, or 0 when the header is missing,
  /// cannot be parsed, or any exception occurs while reading it.
  /// </returns>
  public static int RU(this Response response)
  {
      try
      {
          if (!response.Headers.TryGetValue("x-ms-request-charge", out var value))
          {
              return 0;
          }
          // The charge may be fractional (e.g. "2.83"); Convert.ToInt32(string) throws on such
          // input, which previously made this method return 0. Parse as a number first,
          // culture-independently, then round.
          if (!double.TryParse(
                  value,
                  System.Globalization.NumberStyles.Float,
                  System.Globalization.CultureInfo.InvariantCulture,
                  out double charge))
          {
              return 0;
          }
          return Convert.ToInt32(charge);
      }
      catch (Exception)
      {
          // Best effort: a null response or an out-of-range value is reported as 0.
          return 0;
      }
  }
  /// <summary>
  /// Reads a single item by id and partition key from the container registered for <typeparamref name="T"/>.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory used to resolve the container model.</param>
  /// <param name="id">Item id.</param>
  /// <param name="pk">Partition key value.</param>
  /// <returns>The item, or <c>default(T)</c> when the read throws for any reason.</returns>
  public static async Task<T> FindByIdPk<T>(this AzureCosmosFactory azureCosmosFactory, string id, string pk) where T : ID
  {
      AzureCosmosModel model = azureCosmosFactory.GetCosmosModel(typeof(T).Name);
      try
      {
          PartitionKey key = new PartitionKey(pk);
          ItemResponse<T> item = await model.container.ReadItemAsync<T>(id: id, partitionKey: key);
          return item.Value;
      }
      catch (Exception)
      {
          // Every failure of the read is reported to the caller as "no value".
          return default(T);
      }
  }
  /// <summary>
  /// Loads the entities with the given ids, serving them from the Redis hash when caching is
  /// enabled and querying the container for the ones that are not cached.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory used to resolve the container model.</param>
  /// <param name="ids">Ids to look up.</param>
  /// <returns>Cached entities first, followed by the entities fetched from the container.</returns>
  public static async Task<List<T>> FindByIds<T>(this AzureCosmosFactory azureCosmosFactory, List<string> ids) where T : ID
  {
      AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(typeof(T).Name);
      if (!container.cache || RedisHelper.Instance == null)
      {
          return await FindByDict<T>(azureCosmosFactory, new Dictionary<string, object> { { "id", ids.ToArray() } });
      }

      string cacheKey = CacheCosmosPrefix + container.container.Id;
      List<T> found = new List<T>();
      List<string> missing = new List<string>();
      foreach (string id in ids)
      {
          if (await RedisHelper.HExistsAsync(cacheKey, id))
          {
              found.Add(await RedisHelper.HGetAsync<T>(cacheKey, id));
          }
          else
          {
              missing.Add(id);
          }
      }

      if (missing.Count > 0)
      {
          List<T> loaded = await FindByDict<T>(azureCosmosFactory, new Dictionary<string, object> { { "id", missing.ToArray() } });
          // Use the async Redis calls instead of blocking inside an async method.
          foreach (T item in loaded)
          {
              await RedisHelper.HSetAsync(cacheKey, item.id, item);
          }
          // One expiry refresh is enough; it was previously re-issued for every item.
          if (loaded.Count > 0)
          {
              await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
          }
          found.AddRange(loaded);
      }
      return found;
  }
  /// <summary>
  /// Creates the entity in the container registered for <typeparamref name="T"/> and, when
  /// caching is enabled, stores it in the container's Redis hash.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory used to resolve the container model.</param>
  /// <param name="entity">Entity to create; its <c>pk</c> and <c>ttl</c> (-1) are overwritten.</param>
  /// <returns>The value returned by the create call.</returns>
  public static async Task<T> Save<T>(this AzureCosmosFactory azureCosmosFactory, T entity) where T : ID
  {
      AzureCosmosModel model = azureCosmosFactory.GetCosmosModel(typeof(T).Name);
      entity.pk = model.type.Name;
      entity.ttl = -1;
      ItemResponse<T> created = await model.container.CreateItemAsync<T>(entity);

      bool cacheEnabled = model.cache && RedisHelper.Instance != null;
      if (cacheEnabled)
      {
          string cacheKey = CacheCosmosPrefix + model.container.Id;
          // NOTE(review): the expiry is refreshed only when the hash already existed before
          // this write; the bulk methods do the opposite - confirm which one is intended.
          bool hashExisted = RedisHelper.Exists(cacheKey);
          await RedisHelper.HSetAsync(cacheKey, entity.id, entity);
          if (hashExisted)
          {
              await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
          }
      }
      return created.Value;
  }
  /// <summary>
  /// Creates the given entities in the container registered for <typeparamref name="T"/>,
  /// <c>pageSize</c> at a time, and mirrors each page into the Redis hash when caching is enabled.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory used to resolve the container model.</param>
  /// <param name="enyites">Entities to create; each one has <c>pk</c> and <c>ttl</c> (-1) overwritten.</param>
  /// <returns>The list instance that was passed in.</returns>
  /// <remarks>
  /// All awaited work (serialization, create requests, cache writes) is now observed by this
  /// method. A failing request surfaces as the exception thrown by the client call itself
  /// (it was previously wrapped in an AggregateException by the continuation).
  /// </remarks>
  public static async Task<List<T>> SaveAll<T>(this AzureCosmosFactory azureCosmosFactory, List<T> enyites) where T : ID
  {
      Type type = typeof(T);
      AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(type.Name);
      string cacheKey = CacheCosmosPrefix + container.container.Id;
      bool cacheEnabled = container.cache && RedisHelper.Instance != null;
      // Only ask Redis whether the hash exists when the cache is actually in use.
      bool hashExisted = cacheEnabled && RedisHelper.Exists(cacheKey);
      var pkProperty = type.GetProperty(AzureCosmosUtil.GetPartitionKey(type));
      JsonSerializerOptions jsonOptions = new JsonSerializerOptions { IgnoreNullValues = true };

      // Serializes one entity and sends the create request for it.
      async Task CreateAsync(T entity)
      {
          entity.pk = type.Name;
          entity.ttl = -1;
          MemoryStream stream = new MemoryStream();
          await JsonSerializer.SerializeAsync(stream, entity, jsonOptions);
          PartitionKey key = new PartitionKey(pkProperty.GetValue(entity, null).ToString());
          using (Response response = await container.container.CreateItemStreamAsync(stream, key))
          {
              // Only completion matters here; the response is released immediately.
          }
      }

      for (int offset = 0; offset < enyites.Count; offset += pageSize)
      {
          List<T> page = enyites.GetRange(offset, Math.Min(pageSize, enyites.Count - offset));
          await Task.WhenAll(page.Select(x => CreateAsync(x)));
          if (cacheEnabled)
          {
              await Task.WhenAll(page.Select(x => RedisHelper.HSetAsync(cacheKey, x.id, x)));
          }
      }

      if (cacheEnabled && !hashExisted)
      {
          await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
      }
      return enyites;
  }
  /// <summary>
  /// Upserts the entity into the container registered for <typeparamref name="T"/> and, when
  /// caching is enabled, stores it in the container's Redis hash.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory used to resolve the container model.</param>
  /// <param name="entity">Entity to upsert; its <c>pk</c> and <c>ttl</c> (-1) are overwritten.</param>
  /// <returns>The value returned by the upsert call.</returns>
  public static async Task<T> SaveOrUpdate<T>(this AzureCosmosFactory azureCosmosFactory, T entity) where T : ID
  {
      AzureCosmosModel model = azureCosmosFactory.GetCosmosModel(typeof(T).Name);
      entity.pk = model.type.Name;
      entity.ttl = -1;
      ItemResponse<T> upserted = await model.container.UpsertItemAsync(item: entity);

      bool cacheEnabled = model.cache && RedisHelper.Instance != null;
      if (cacheEnabled)
      {
          string cacheKey = CacheCosmosPrefix + model.container.Id;
          // The expiry is refreshed only when the hash already existed before this write.
          bool hashExisted = RedisHelper.Exists(cacheKey);
          await RedisHelper.HSetAsync(cacheKey, entity.id, entity);
          if (hashExisted)
          {
              await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
          }
      }
      return upserted.Value;
  }
  /// <summary>
  /// Upserts the given entities into the container registered for <typeparamref name="T"/>,
  /// <c>pageSize</c> at a time, and mirrors each page into the Redis hash when caching is enabled.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory used to resolve the container model.</param>
  /// <param name="enyites">Entities to upsert; each one has <c>pk</c> and <c>ttl</c> (-1) overwritten.</param>
  /// <returns>The list instance that was passed in.</returns>
  /// <remarks>
  /// A failing upsert request does not abort the batch (as before), but it is now logged and
  /// the response of every completed request is disposed.
  /// </remarks>
  public static async Task<List<T>> SaveOrUpdateAll<T>(this AzureCosmosFactory azureCosmosFactory, List<T> enyites) where T : ID
  {
      Type type = typeof(T);
      AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(type.Name);
      string cacheKey = CacheCosmosPrefix + container.container.Id;
      bool cacheEnabled = container.cache && RedisHelper.Instance != null;
      // Only ask Redis whether the hash exists when the cache is actually in use.
      bool hashExisted = cacheEnabled && RedisHelper.Exists(cacheKey);
      var pkProperty = type.GetProperty(AzureCosmosUtil.GetPartitionKey(type));
      JsonSerializerOptions jsonOptions = new JsonSerializerOptions { IgnoreNullValues = true };

      // Serializes one entity and sends the upsert request for it.
      async Task UpsertAsync(T entity)
      {
          entity.pk = type.Name;
          entity.ttl = -1;
          MemoryStream stream = new MemoryStream();
          await JsonSerializer.SerializeAsync(stream, entity, jsonOptions);
          PartitionKey key = new PartitionKey(pkProperty.GetValue(entity, null).ToString());
          try
          {
              using (Response response = await container.container.UpsertItemStreamAsync(stream, key))
              {
                  // Only completion matters here; the response is released immediately.
              }
          }
          catch (Exception ex)
          {
              // The original continuation ignored request failures; keep that best-effort
              // contract but leave a trace instead of dropping the error silently.
              LogHelper.Info(new object(), "SaveOrUpdateAll: upsert failed for id " + entity.id + ": " + ex.Message);
          }
      }

      for (int offset = 0; offset < enyites.Count; offset += pageSize)
      {
          List<T> page = enyites.GetRange(offset, Math.Min(pageSize, enyites.Count - offset));
          await Task.WhenAll(page.Select(x => UpsertAsync(x)));
          if (cacheEnabled)
          {
              await Task.WhenAll(page.Select(x => RedisHelper.HSetAsync(cacheKey, x.id, x)));
          }
      }

      if (cacheEnabled && !hashExisted)
      {
          await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
      }
      return enyites;
  }
  /// <summary>
  /// Runs a <c>select value count(c)</c> query, filtered by <paramref name="dict"/>, against the
  /// container registered for <typeparamref name="T"/>.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory used to resolve the container model.</param>
  /// <param name="dict">Filter conditions handed to <c>SQLHelper.GetSQL</c>.</param>
  /// <returns>The query results, or a list holding a single 0 when no query could be built.</returns>
  public static async Task<List<int>> FindCountByDict<T>(this AzureCosmosFactory azureCosmosFactory, Dictionary<string, object> dict)
  {
      string typeName = typeof(T).Name;
      AzureCosmosModel model = azureCosmosFactory.GetCosmosModel(typeName);
      StringBuilder countSql = new StringBuilder("select value count(c) from c");
      AzureCosmosQuery countQuery = SQLHelper.GetSQL(dict, countSql, typeName);
      if (countQuery == null)
      {
          return new List<int> { 0 };
      }

      QueryRequestOptions options = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
      AsyncPageable<int> pages = model.container.GetItemQueryIterator<int>(
          queryDefinition: countQuery.CosmosQueryDefinition,
          requestOptions: options);
      return await ResultsFromFeedIterator(pages);
  }
  /// <summary>
  /// Runs a <c>select value count(c)</c> query, filtered by <paramref name="json"/>, against the
  /// container registered for <typeparamref name="T"/>.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory used to resolve the container model.</param>
  /// <param name="json">Filter conditions handed to <c>SQLHelper.GetSQL</c>.</param>
  /// <returns>The query results, or a list holding a single 0 when no query could be built.</returns>
  public static async Task<List<int>> FindCountByDict<T>(this AzureCosmosFactory azureCosmosFactory,JsonElement json)
  {
      string typeName = typeof(T).Name;
      AzureCosmosModel model = azureCosmosFactory.GetCosmosModel(typeName);
      StringBuilder countSql = new StringBuilder("select value count(c) from c");
      AzureCosmosQuery countQuery = SQLHelper.GetSQL(json, countSql, typeName);
      if (countQuery == null)
      {
          return new List<int> { 0 };
      }

      QueryRequestOptions options = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
      AsyncPageable<int> pages = model.container.GetItemQueryIterator<int>(
          queryDefinition: countQuery.CosmosQueryDefinition,
          requestOptions: options);
      return await ResultsFromFeedIterator(pages);
  }
  /// <summary>
  /// Runs a <c>select value count(c)</c> query against the container registered under
  /// <paramref name="CollectionName"/>, ignoring the paging/sorting keys in <paramref name="dict"/>.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory holding the registered container models.</param>
  /// <param name="CollectionName">Name under which the container model is registered.</param>
  /// <param name="dict">Filter conditions. The dictionary is no longer modified by this call.</param>
  /// <returns>The query results, or a list holding a single 0 when no query could be built.</returns>
  /// <exception cref="BizException">No container is registered under <paramref name="CollectionName"/>.</exception>
  public static async Task<List<dynamic>> FindCountByDict(this AzureCosmosFactory azureCosmosFactory, string CollectionName, Dictionary<string, object> dict)
  {
      if (!azureCosmosFactory.CosmosDict.typeCosmos.TryGetValue(CollectionName, out AzureCosmosModel container))
      {
          throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
      }

      // Work on a copy: stripping the paging keys from the caller's dictionary would silently
      // drop paging from any later query the caller builds from the same instance.
      Dictionary<string, object> filter = new Dictionary<string, object>(dict, dict.Comparer);
      filter.Remove("@CURRPAGE");
      filter.Remove("@PAGESIZE");
      filter.Remove("@ASC");
      filter.Remove("@DESC");

      string pk = container.type.Name;
      StringBuilder sql = new StringBuilder("select value count(c) from c");
      AzureCosmosQuery cosmosDbQuery = SQLHelper.GetSQL(filter, sql, pk);
      if (cosmosDbQuery == null)
      {
          return new List<dynamic> { 0 };
      }

      QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
      AsyncPageable<dynamic> query = container.container.GetItemQueryIterator<dynamic>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
      return await ResultsFromFeedIterator(query);
  }
  /// <summary>
  /// Runs a <c>select value count(c)</c> query against the container registered under
  /// <paramref name="CollectionName"/>, using the properties of <paramref name="json"/> as
  /// filter conditions and skipping the paging/sorting keys.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory holding the registered container models.</param>
  /// <param name="CollectionName">Name under which the container model is registered.</param>
  /// <param name="json">JSON object whose properties are the filter conditions.</param>
  /// <returns>The query results, or a list holding a single 0 when no query could be built.</returns>
  /// <exception cref="BizException">No container is registered under <paramref name="CollectionName"/>.</exception>
  public static async Task<List<dynamic>> FindCountByDict(this AzureCosmosFactory azureCosmosFactory, string CollectionName,JsonElement json)
  {
      if (!azureCosmosFactory.CosmosDict.typeCosmos.TryGetValue(CollectionName, out AzureCosmosModel container))
      {
          throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
      }

      Dictionary<string, object> dict = new Dictionary<string, object>();
      foreach (JsonProperty property in json.EnumerateObject())
      {
          // The original test chained "!=" with "||", which is always true, so the paging
          // keys were never filtered out. A key is skipped when it equals any of them.
          bool isPagingKey = property.Name == "@CURRPAGE"
              || property.Name == "@PAGESIZE"
              || property.Name == "@ASC"
              || property.Name == "@DESC";
          if (!isPagingKey)
          {
              dict[property.Name] = property.Value;
          }
      }

      string pk = container.type.Name;
      StringBuilder sql = new StringBuilder("select value count(c) from c");
      AzureCosmosQuery cosmosDbQuery = SQLHelper.GetSQL(dict, sql, pk);
      if (cosmosDbQuery == null)
      {
          return new List<dynamic> { 0 };
      }

      QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
      AsyncPageable<dynamic> query = container.container.GetItemQueryIterator<dynamic>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
      return await ResultsFromFeedIterator(query);
  }
  /// <summary>
  /// Replaces the stored item with <paramref name="entity"/> in the container registered for
  /// <typeparamref name="T"/> and, when caching is enabled, stores it in the Redis hash.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory used to resolve the container model.</param>
  /// <param name="entity">Entity to write; its <c>pk</c> and <c>ttl</c> (-1) are overwritten.</param>
  /// <returns>The value returned by the replace call.</returns>
  public static async Task<T> Update<T>(this AzureCosmosFactory azureCosmosFactory, T entity) where T : ID
  {
      Type entityType = typeof(T);
      AzureCosmosModel model = azureCosmosFactory.GetCosmosModel(entityType.Name);
      string pkPropertyName = AzureCosmosUtil.GetPartitionKey(entityType);
      entity.pk = entityType.Name;
      entity.ttl = -1;
      // Read the partition key value only after pk/ttl have been stamped on the entity.
      object pkValue = entityType.GetProperty(pkPropertyName).GetValue(entity, null);
      PartitionKey key = new PartitionKey(pkValue.ToString());
      ItemResponse<T> replaced = await model.container.ReplaceItemAsync(entity, entity.id, key);

      bool cacheEnabled = model.cache && RedisHelper.Instance != null;
      if (cacheEnabled)
      {
          string cacheKey = CacheCosmosPrefix + model.container.Id;
          // The expiry is refreshed only when the hash already existed before this write.
          bool hashExisted = RedisHelper.Exists(cacheKey);
          await RedisHelper.HSetAsync(cacheKey, entity.id, entity);
          if (hashExisted)
          {
              await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
          }
      }
      return replaced.Value;
  }
  353. internal class Item // Holder for one entity's id, partition-key value and serialized stream.
  354. {
  355. public string id { get; set; } // Item id.
  356. public string pk { get; set; } // Partition key value in string form.
  357. public MemoryStream stream { get; set; } // Stream holding the serialized entity.
  358. }
  /// <summary>
  /// Replaces the given entities in the container registered for <typeparamref name="T"/>,
  /// <c>pageSize</c> at a time, and mirrors each page into the Redis hash when caching is enabled.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory used to resolve the container model.</param>
  /// <param name="enyites">Entities to replace; each one has <c>pk</c> and <c>ttl</c> (-1) overwritten.</param>
  /// <returns>The list instance that was passed in.</returns>
  /// <remarks>
  /// A failing replace request does not abort the batch (as before), but it is now logged and
  /// the response of every completed request is disposed.
  /// </remarks>
  public static async Task<List<T>> UpdateAll<T>(this AzureCosmosFactory azureCosmosFactory, List<T> enyites) where T : ID
  {
      Type type = typeof(T);
      AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(type.Name);
      string cacheKey = CacheCosmosPrefix + container.container.Id;
      bool cacheEnabled = container.cache && RedisHelper.Instance != null;
      // Only ask Redis whether the hash exists when the cache is actually in use.
      bool hashExisted = cacheEnabled && RedisHelper.Exists(cacheKey);
      var pkProperty = type.GetProperty(AzureCosmosUtil.GetPartitionKey(type));
      JsonSerializerOptions jsonOptions = new JsonSerializerOptions { IgnoreNullValues = true };

      // Serializes one entity and sends the replace request for it.
      async Task ReplaceAsync(T entity)
      {
          entity.pk = type.Name;
          entity.ttl = -1;
          MemoryStream stream = new MemoryStream();
          await JsonSerializer.SerializeAsync(stream, entity, jsonOptions);
          PartitionKey key = new PartitionKey(pkProperty.GetValue(entity, null).ToString());
          try
          {
              using (Response response = await container.container.ReplaceItemStreamAsync(stream, entity.id, key))
              {
                  // Only completion matters here; the response is released immediately.
              }
          }
          catch (Exception ex)
          {
              // The original continuation ignored request failures; keep that best-effort
              // contract but leave a trace instead of dropping the error silently.
              LogHelper.Info(new object(), "UpdateAll: replace failed for id " + entity.id + ": " + ex.Message);
          }
      }

      for (int offset = 0; offset < enyites.Count; offset += pageSize)
      {
          List<T> page = enyites.GetRange(offset, Math.Min(pageSize, enyites.Count - offset));
          await Task.WhenAll(page.Select(x => ReplaceAsync(x)));
          if (cacheEnabled)
          {
              await Task.WhenAll(page.Select(x => RedisHelper.HSetAsync(cacheKey, x.id, x)));
          }
      }

      if (cacheEnabled && !hashExisted)
      {
          await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
      }
      return enyites;
  }
  /// <summary>
  /// Queries entities of type <typeparamref name="T"/> using the conditions in <paramref name="dict"/>.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory passed through to the query execution helper.</param>
  /// <param name="dict">Filter conditions handed to <c>SQLHelper.GetSQL</c>.</param>
  /// <param name="propertys">Optional property list handed to <c>SQLHelper.GetSQLSelect</c>.</param>
  /// <returns>The entities produced by the query.</returns>
  public static async Task<List<T>> FindByDict<T>(this AzureCosmosFactory azureCosmosFactory, Dictionary<string, object> dict, List<string> propertys = null) where T : ID
  {
      StringBuilder selectClause = SQLHelper.GetSQLSelect(propertys);
      AzureCosmosQuery query = SQLHelper.GetSQL(dict, selectClause, typeof(T).Name);
      int itemsPerPage = GetEffectivePageSize(-1, null);
      QueryRequestOptions options = GetDefaultQueryRequestOptions(itemsPerPage: itemsPerPage);
      List<T> results = await ResultsFromQueryAndOptions<T>(azureCosmosFactory, query, options);
      return results;
  }
  /// <summary>
  /// Queries entities of type <typeparamref name="T"/> using the conditions in <paramref name="jsonElement"/>.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory passed through to the query execution helper.</param>
  /// <param name="jsonElement">Filter conditions handed to <c>SQLHelper.GetSQL</c>.</param>
  /// <param name="propertys">Optional property list handed to <c>SQLHelper.GetSQLSelect</c>.</param>
  /// <returns>The entities produced by the query.</returns>
  public static async Task<List<T>> FindByDict<T>(this AzureCosmosFactory azureCosmosFactory, JsonElement jsonElement, List<string> propertys = null) where T : ID
  {
      StringBuilder selectClause = SQLHelper.GetSQLSelect(propertys);
      AzureCosmosQuery query = SQLHelper.GetSQL(jsonElement, selectClause, typeof(T).Name);
      int itemsPerPage = GetEffectivePageSize(-1, null);
      QueryRequestOptions options = GetDefaultQueryRequestOptions(itemsPerPage: itemsPerPage);
      List<T> results = await ResultsFromQueryAndOptions<T>(azureCosmosFactory, query, options);
      return results;
  }
  /// <summary>
  /// Queries the container registered under <paramref name="CollectionName"/> using the
  /// conditions in <paramref name="json"/>.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory holding the registered container models.</param>
  /// <param name="CollectionName">Name under which the container model is registered.</param>
  /// <param name="json">Filter conditions handed to <c>SQLHelper.GetSQL</c>.</param>
  /// <param name="propertys">Optional property list handed to <c>SQLHelper.GetSQLSelect</c>.</param>
  /// <returns>The items produced by the query.</returns>
  /// <exception cref="BizException">No container is registered under <paramref name="CollectionName"/>.</exception>
  public static async Task<List<dynamic>> FindByDict(this AzureCosmosFactory azureCosmosFactory, string CollectionName, JsonElement json, List<string> propertys = null)
  {
      bool registered = azureCosmosFactory.CosmosDict.typeCosmos.TryGetValue(CollectionName, out AzureCosmosModel model);
      if (!registered)
      {
          throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
      }

      string typeName = model.type.Name;
      StringBuilder selectClause = SQLHelper.GetSQLSelect(propertys);
      AzureCosmosQuery query = SQLHelper.GetSQL(json, selectClause, typeName);
      QueryRequestOptions options = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
      AsyncPageable<dynamic> pages = model.container.GetItemQueryIterator<dynamic>(
          queryDefinition: query.CosmosQueryDefinition,
          requestOptions: options);
      return await ResultsFromFeedIterator(pages);
  }
  /// <summary>
  /// Queries the container registered under <paramref name="CollectionName"/> using the
  /// conditions in <paramref name="dict"/>.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory holding the registered container models.</param>
  /// <param name="CollectionName">Name under which the container model is registered.</param>
  /// <param name="dict">Filter conditions handed to <c>SQLHelper.GetSQL</c>.</param>
  /// <param name="propertys">Optional property list handed to <c>SQLHelper.GetSQLSelect</c>.</param>
  /// <returns>The items produced by the query.</returns>
  /// <exception cref="BizException">No container is registered under <paramref name="CollectionName"/>.</exception>
  public static async Task<List<dynamic>> FindByDict(this AzureCosmosFactory azureCosmosFactory, string CollectionName, Dictionary<string, object> dict, List<string> propertys = null)
  {
      bool registered = azureCosmosFactory.CosmosDict.typeCosmos.TryGetValue(CollectionName, out AzureCosmosModel model);
      if (!registered)
      {
          throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
      }

      string typeName = model.type.Name;
      StringBuilder selectClause = SQLHelper.GetSQLSelect(propertys);
      AzureCosmosQuery query = SQLHelper.GetSQL(dict, selectClause, typeName);
      QueryRequestOptions options = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
      AsyncPageable<dynamic> pages = model.container.GetItemQueryIterator<dynamic>(
          queryDefinition: query.CosmosQueryDefinition,
          requestOptions: options);
      return await ResultsFromFeedIterator(pages);
  }
  /// <summary>
  /// Deletes the items identified by <paramref name="ids"/> from the container registered for
  /// <typeparamref name="T"/>.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory used to resolve the container model.</param>
  /// <param name="ids">Id / partition-key pairs of the items to delete.</param>
  /// <returns>
  /// When the container is monitored: <paramref name="ids"/> itself, after the matching
  /// entities were handed to <c>DeleteTTL</c>. Otherwise: one entry per requested item carrying
  /// the status of its delete response, in input order.
  /// </returns>
  public static async Task<List<IdPk>> DeleteAll<T>(this AzureCosmosFactory azureCosmosFactory, List<IdPk> ids) where T : ID
  {
      AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(typeof(T).Name);
      if (container.monitor)
      {
          List<T> list = await azureCosmosFactory.FindByDict<T>(new Dictionary<string, object>() { { "id", ids.Select(x => x.id).ToArray() } });
          await azureCosmosFactory.DeleteTTL(list);
          return ids;
      }

      bool cacheEnabled = container.cache && RedisHelper.Instance != null;
      string cacheKey = CacheCosmosPrefix + container.container.Id;

      // Deletes one item and reports the response status for it.
      async Task<IdPk> DeleteOneAsync(IdPk item)
      {
          using (Response response = await container.container.DeleteItemStreamAsync(item.id, new PartitionKey(item.pk)))
          {
              return new IdPk { id = item.id, pk = item.pk, Status = response.Status };
          }
      }

      List<IdPk> idPks = new List<IdPk>(ids.Count);
      for (int offset = 0; offset < ids.Count; offset += pageSize)
      {
          List<IdPk> page = ids.GetRange(offset, Math.Min(pageSize, ids.Count - offset));
          // Results are gathered from the tasks after they all finished; the previous code
          // appended to a shared List from concurrently running continuations.
          IdPk[] deleted = await Task.WhenAll(page.Select(x => DeleteOneAsync(x)));
          idPks.AddRange(deleted);
          if (cacheEnabled)
          {
              // Awaited so cache evictions complete (and fail) within this call.
              await Task.WhenAll(page.Select(x => RedisHelper.HDelAsync(cacheKey, x.id)));
          }
      }
      return idPks;
  }
  /// <summary>
  /// Deletes every entity of type <typeparamref name="T"/> that matches the conditions in
  /// <paramref name="dict"/>.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory used for the lookup and the delete.</param>
  /// <param name="dict">Filter conditions; must contain at least one key.</param>
  /// <returns>The result of deleting the matched entities.</returns>
  /// <exception cref="BizException"><paramref name="dict"/> has no keys.</exception>
  public static async Task<List<IdPk>> DeleteAll<T>(this AzureCosmosFactory azureCosmosFactory, Dictionary<string, object> dict) where T : ID
  {
      // Refuse an empty filter before running the lookup.
      if (!(dict.Keys.Count > 0))
      {
          throw new BizException("参数为空", 500);
      }

      List<T> matches = await azureCosmosFactory.FindByDict<T>(dict);
      List<IdPk> deleted = await azureCosmosFactory.DeleteAll(matches);
      return deleted;
  }
  /// <summary>
  /// Deletes every entity of type <typeparamref name="T"/> that matches the conditions in
  /// <paramref name="dict"/>.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory used for the lookup and the delete.</param>
  /// <param name="dict">Filter conditions as a JSON element.</param>
  /// <returns>The result of deleting the matched entities.</returns>
  public static async Task<List<IdPk>> DeleteAll<T>(this AzureCosmosFactory azureCosmosFactory, JsonElement dict) where T : ID
  {
      List<T> matches = await azureCosmosFactory.FindByDict<T>(dict);
      List<IdPk> deleted = await azureCosmosFactory.DeleteAll(matches);
      return deleted;
  }
  /// <summary>
  /// Deletes the given entities from the container registered for <typeparamref name="T"/>,
  /// after writing the entities to the log.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory used to resolve the container model.</param>
  /// <param name="enyites">Entities to delete.</param>
  /// <returns>
  /// One entry per entity with its id and partition key value. When the container is
  /// monitored the entities go through <c>DeleteTTL</c> and every entry reports
  /// <c>NoContent</c>; otherwise each entry carries the status of its delete response.
  /// </returns>
  public static async Task<List<IdPk>> DeleteAll<T>(this AzureCosmosFactory azureCosmosFactory, List<T> enyites) where T : ID
  {
      Type type = typeof(T);
      AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(type.Name);
      // Log record of what is about to be deleted.
      string uuidKey = Guid.NewGuid().ToString();
      string logkey = "\r\n【" + uuidKey + "】\r\n";
      LogHelper.Info(new object(),
          logkey
          + "删除------->>\r\n"
          + "表:"
          + type.Name + "\r\n"
          + "数据:"
          + enyites.ToApiJson()
          + "\r\n" + logkey);
      var pkProperty = type.GetProperty(AzureCosmosUtil.GetPartitionKey(type));

      if (container.monitor)
      {
          List<IdPk> marked = new List<IdPk>();
          enyites = await azureCosmosFactory.DeleteTTL(enyites);
          foreach (T t in enyites)
          {
              object o = pkProperty.GetValue(t, null);
              marked.Add(new IdPk { id = t.id, pk = o.ToString(), StatusCode = HttpStatusCode.NoContent });
          }
          return marked;
      }

      bool cacheEnabled = container.cache && RedisHelper.Instance != null;
      string cacheKey = CacheCosmosPrefix + container.container.Id;

      // Deletes one entity and reports the response status for it.
      async Task<IdPk> DeleteOneAsync(T entity)
      {
          // Report the raw partition key value, as the monitored branch does. The previous
          // code returned PartitionKey.ToString(), which presumably is the SDK's serialized
          // form rather than the plain value - TODO confirm against the SDK in use.
          string pkValue = pkProperty.GetValue(entity, null).ToString();
          using (Response response = await container.container.DeleteItemStreamAsync(entity.id, new PartitionKey(pkValue)))
          {
              return new IdPk { id = entity.id, pk = pkValue, Status = response.Status };
          }
      }

      List<IdPk> idPks = new List<IdPk>(enyites.Count);
      for (int offset = 0; offset < enyites.Count; offset += pageSize)
      {
          List<T> page = enyites.GetRange(offset, Math.Min(pageSize, enyites.Count - offset));
          // Results are gathered from the tasks after they all finished; the previous code
          // appended to a shared List from concurrently running continuations.
          IdPk[] deleted = await Task.WhenAll(page.Select(x => DeleteOneAsync(x)));
          idPks.AddRange(deleted);
          if (cacheEnabled)
          {
              // Awaited so cache evictions complete (and fail) within this call.
              await Task.WhenAll(page.Select(x => RedisHelper.HDelAsync(cacheKey, x.id)));
          }
      }
      return idPks;
  }
  /// <summary>
  /// Deletes the item identified by <paramref name="idPk"/> by forwarding its id and partition
  /// key to the (id, pk) overload.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory used to resolve the container model.</param>
  /// <param name="idPk">Id / partition-key pair of the item to delete.</param>
  /// <returns>The result produced by the (id, pk) overload.</returns>
  public static async Task<IdPk> DeleteAsync<T>(this AzureCosmosFactory azureCosmosFactory, IdPk idPk) where T : ID
  {
      string id = idPk.id;
      string pk = idPk.pk;
      IdPk outcome = await DeleteAsync<T>(azureCosmosFactory, id, pk);
      return outcome;
  }
  /// <summary>
  /// Deletes one item of type <typeparamref name="T"/> by id and partition key.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory used to resolve the container model.</param>
  /// <param name="id">Item id.</param>
  /// <param name="pk">Partition key value.</param>
  /// <returns>
  /// For monitored containers an entry reporting <c>NoContent</c> after the entity went through
  /// <c>DeleteTTL</c>; otherwise an entry carrying the status of the delete response.
  /// </returns>
  /// <exception cref="BizException">The container is monitored and no entity with that id was found.</exception>
  public static async Task<IdPk> DeleteAsync<T>(this AzureCosmosFactory azureCosmosFactory, string id, string pk) where T : ID
  {
      AzureCosmosModel model = azureCosmosFactory.GetCosmosModel(typeof(T).Name);

      if (!model.monitor)
      {
          // Direct delete, followed by eviction from the cache hash when caching is on.
          Response response = await model.container.DeleteItemStreamAsync(id: id, partitionKey: new PartitionKey(pk));
          if (model.cache && RedisHelper.Instance != null)
          {
              await RedisHelper.HDelAsync(CacheCosmosPrefix + model.container.Id, id);
          }
          return new IdPk { id = id, pk = pk, Status = response.Status };
      }

      // Monitored container: look the entity up and route it through DeleteTTL.
      List<T> matches = await FindByDict<T>(azureCosmosFactory, new Dictionary<string, object>() { { "id", id } });
      if (!(matches.Count > 0))
      {
          throw new BizException("未找到ID匹配的数据,删除失败");
      }
      await DeleteTTL<T>(azureCosmosFactory, matches);
      return new IdPk { id = id, pk = pk, StatusCode = HttpStatusCode.NoContent };
  }
  /// <summary>
  /// Deletes <paramref name="entity"/> from the container registered for <typeparamref name="T"/>.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory used to resolve the container model.</param>
  /// <param name="entity">Entity to delete; its partition key value is read via reflection.</param>
  /// <returns>
  /// An entry with the entity's id and partition key value. For monitored containers it
  /// reports <c>NoContent</c> after the entity went through <c>DeleteTTL</c>; otherwise it
  /// carries the status of the delete response.
  /// </returns>
  public static async Task<IdPk> DeleteAsync<T>(this AzureCosmosFactory azureCosmosFactory, T entity) where T : ID
  {
      Type type = typeof(T);
      AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(type.Name);
      string partitionKey = AzureCosmosUtil.GetPartitionKey(type);
      string pkValue = type.GetProperty(partitionKey).GetValue(entity, null).ToString();

      if (container.monitor)
      {
          await DeleteTTL<T>(azureCosmosFactory, new List<T>() { entity });
          return new IdPk { id = entity.id, pk = pkValue, StatusCode = HttpStatusCode.NoContent };
      }

      Response response = await container.container.DeleteItemStreamAsync(id: entity.id, partitionKey: new PartitionKey(pkValue));
      if (container.cache && RedisHelper.Instance != null)
      {
          await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, entity.id);
      }
      // Return the partition key VALUE; the previous code returned the name of the
      // partition key property here, unlike the monitored branch.
      return new IdPk { id = entity.id, pk = pkValue, Status = response.Status };
  }
  /// <summary>
  /// Runs a caller-supplied SQL query against the container registered for <typeparamref name="T"/>.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory used to resolve the container model.</param>
  /// <param name="sql">Query text; it must contain the substring <c>.pk</c>.</param>
  /// <param name="Parameters">Optional query parameters.</param>
  /// <returns>The entities produced by the query.</returns>
  /// <exception cref="BizException"><paramref name="sql"/> does not contain <c>.pk</c>.</exception>
  public static async Task<List<T>> FindSQL<T>(this AzureCosmosFactory azureCosmosFactory, string sql, Dictionary<string, object> Parameters = null) where T : ID
  {
      if (!sql.Contains(".pk"))
      {
          throw new BizException("查询参数必须设置 .pk ", ResponseCode.PARAMS_ERROR);
      }

      AzureCosmosModel model = azureCosmosFactory.GetCosmosModel(typeof(T).Name);
      QueryRequestOptions options = GetQueryRequestOptions(GetEffectivePageSize(-1, null));

      if (Parameters == null)
      {
          // Without parameters the raw text is used and no request options are passed.
          QueryDefinition plainQuery = new QueryDefinition(sql);
          return await ResultsFromFeedIterator<T>(model.container.GetItemQueryIterator<T>(plainQuery));
      }

      AzureCosmosQuery parameterized = new AzureCosmosQuery
      {
          QueryText = sql,
          Parameters = Parameters
      };
      AsyncPageable<T> pages = model.container
          .GetItemQueryIterator<T>(parameterized.CosmosQueryDefinition, requestOptions: options);
      return await ResultsFromFeedIterator(pages);
  }
  /// <summary>
  /// Deletes entities via TTL: stamps the <c>ttl</c> constant on each entity, hands the list
  /// to <c>DeleteTTlALL</c>, and then removes the entities from the Redis hash when caching
  /// is enabled.
  /// </summary>
  /// <typeparam name="T">Entity type; its name selects the container model.</typeparam>
  /// <param name="azureCosmosFactory">Factory used to resolve the container model.</param>
  /// <param name="list">Entities to delete; their <c>ttl</c> is overwritten.</param>
  /// <returns>The list returned by <c>DeleteTTlALL</c>.</returns>
  private static async Task<List<T>> DeleteTTL<T>(this AzureCosmosFactory azureCosmosFactory ,List<T> list) where T : ID
  {
      AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(typeof(T).Name);
      list.ForEach(x => { x.ttl = ttl; });
      list = await DeleteTTlALL(azureCosmosFactory, list);
      if (container.cache && RedisHelper.Instance != null)
      {
          string cacheKey = CacheCosmosPrefix + container.container.Id;
          // Await the evictions: the previous async lambda inside List.ForEach was
          // fire-and-forget, so this method could return before the cache was cleaned and
          // any Redis error was thrown where no caller could observe it.
          await Task.WhenAll(list.Select(x => RedisHelper.HDelAsync(cacheKey, x.id)));
      }
      return list;
  }
  /// <summary>
  /// Upserts the given entities into the Cosmos container mapped to <typeparamref name="T"/>
  /// in batches of <c>pageSize</c>, and mirrors each batch into the container's Redis hash
  /// when caching is enabled. Despite the name, this method itself only upserts; the
  /// entities are expected to already carry the desired <c>ttl</c>.
  /// </summary>
  /// <typeparam name="T">Entity type; its type name selects the container and is written to each entity's <c>pk</c>.</typeparam>
  /// <param name="azureCosmosFactory">Factory used to resolve the container model.</param>
  /// <param name="enyites">Entities to upsert. Each element's <c>pk</c> is overwritten with the type name.</param>
  /// <returns>The same list instance that was passed in.</returns>
  private static async Task<List<T>> DeleteTTlALL<T>(this AzureCosmosFactory azureCosmosFactory, List<T> enyites) where T : ID
  {
      Type type = typeof(T);
      AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(type.Name);
      string cacheKey = CacheCosmosPrefix + container.container.Id;

      // Only touch Redis when caching is on and a client exists; the existence probe
      // used to run unconditionally, unlike every other Redis call in this method.
      bool cacheEnabled = container.cache && RedisHelper.Instance != null;
      // Remember whether the hash already existed so an expiry is only set on a fresh key.
      bool cacheKeyExisted = cacheEnabled && RedisHelper.Exists(cacheKey);

      string partitionKey = AzureCosmosUtil.GetPartitionKey(type);
      // Loop-invariant work hoisted out of the per-item path.
      var partitionProperty = type.GetProperty(partitionKey);
      JsonSerializerOptions serializerOptions = new JsonSerializerOptions { IgnoreNullValues = true };

      int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
      for (int i = 0; i < pages; i++)
      {
          List<T> batch = enyites.Skip(i * pageSize).Take(pageSize).ToList();

          // Serialize sequentially with a real await. An async lambda in List.ForEach is
          // async void, so the list could be read before every item had been added.
          List<KeyValuePair<PartitionKey, Stream>> itemsToInsert = new List<KeyValuePair<PartitionKey, Stream>>(batch.Count);
          foreach (T x in batch)
          {
              x.pk = type.Name;
              MemoryStream stream = new MemoryStream();
              await JsonSerializer.SerializeAsync(stream, x, serializerOptions);
              object o = partitionProperty.GetValue(x, null);
              itemsToInsert.Add(new KeyValuePair<PartitionKey, Stream>(new PartitionKey(o.ToString()), stream));
          }

          List<Task> tasks = new List<Task>(itemsToInsert.Count);
          foreach (KeyValuePair<PartitionKey, Stream> item in itemsToInsert)
          {
              // NOTE(review): the continuation ignores the response, so a failed upsert is
              // not surfaced to the caller. Kept as-is; confirm this best-effort behavior is intended.
              tasks.Add(container.container.UpsertItemStreamAsync(item.Value, item.Key)
                  .ContinueWith((Task<Response> task) =>
                  {
                  }));
          }
          await Task.WhenAll(tasks);

          if (cacheEnabled)
          {
              // Await the cache writes so the hash exists before the expiry below is applied;
              // an expiry issued against a key that does not exist yet has no effect.
              await Task.WhenAll(batch.Select(x => RedisHelper.HSetAsync(cacheKey, x.id, x)));
          }
      }

      if (cacheEnabled && !cacheKeyExisted)
      {
          await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
      }
      return enyites;
  }
  /// <summary>
  /// Builds query request options, substituting defaults for values that are not supplied.
  /// </summary>
  /// <param name="itemsPerPage">Page size; -1 is replaced by 1000, any other value (including null) is passed through.</param>
  /// <param name="maxBufferedItemCount">Buffered item count; 100 is used when null.</param>
  /// <param name="maxConcurrency">Concurrency limit; 50 is used when null.</param>
  /// <returns>A new <c>QueryRequestOptions</c> carrying the resolved values.</returns>
  private static QueryRequestOptions GetDefaultQueryRequestOptions(int? itemsPerPage = null, int? maxBufferedItemCount = null, int? maxConcurrency = null)
  {
      int? resolvedItemCount = itemsPerPage;
      if (itemsPerPage == -1)
      {
          resolvedItemCount = 1000;
      }

      QueryRequestOptions options = new QueryRequestOptions();
      options.MaxItemCount = resolvedItemCount;
      options.MaxBufferedItemCount = maxBufferedItemCount.HasValue ? maxBufferedItemCount.Value : 100;
      options.MaxConcurrency = maxConcurrency.HasValue ? maxConcurrency.Value : 50;
      return options;
  }
  /// <summary>
  /// Resolves the page size to use for a query.
  /// </summary>
  /// <param name="itemsPerPage">Requested page size; -1 means "no explicit page size".</param>
  /// <param name="maxItemCount">Optional cap; when null, <paramref name="itemsPerPage"/> stands in for it.</param>
  /// <returns>
  /// The cap (or <paramref name="itemsPerPage"/> when there is none) if <paramref name="itemsPerPage"/> is -1;
  /// otherwise the smaller of the cap and <paramref name="itemsPerPage"/>.
  /// </returns>
  private static int GetEffectivePageSize(int itemsPerPage, int? maxItemCount)
  {
      int limit = maxItemCount.HasValue ? maxItemCount.Value : itemsPerPage;
      if (itemsPerPage == -1)
      {
          return limit;
      }
      return limit < itemsPerPage ? limit : itemsPerPage;
  }
  /// <summary>
  /// Executes a prepared query against the container mapped to <typeparamref name="T"/>
  /// and collects all results.
  /// </summary>
  /// <typeparam name="T">Item type; its type name is used to look up the container model.</typeparam>
  /// <param name="azureCosmosFactory">Factory used to resolve the container model.</param>
  /// <param name="cosmosDbQuery">Query wrapper whose <c>CosmosQueryDefinition</c> is executed.</param>
  /// <param name="queryOptions">Request options forwarded to the query iterator.</param>
  /// <returns>The collected items, or null when <paramref name="cosmosDbQuery"/> is null.</returns>
  private static async Task<List<T>> ResultsFromQueryAndOptions<T>(this AzureCosmosFactory azureCosmosFactory, AzureCosmosQuery cosmosDbQuery, QueryRequestOptions queryOptions)
  {
      if (cosmosDbQuery != null)
      {
          AzureCosmosModel model = azureCosmosFactory.GetCosmosModel(typeof(T).Name);
          AsyncPageable<T> pages = model.container.GetItemQueryIterator<T>(
              queryDefinition: cosmosDbQuery.CosmosQueryDefinition,
              requestOptions: queryOptions);
          return await ResultsFromFeedIterator<T>(pages);
      }

      // No query supplied: callers receive null rather than an empty list.
      return null;
  }
  /// <summary>
  /// Drains an async query result into a list, optionally stopping once a given number
  /// of items has been collected.
  /// </summary>
  /// <typeparam name="T">Item type.</typeparam>
  /// <param name="query">Async sequence of items to enumerate.</param>
  /// <param name="maxItemCount">
  /// When set, enumeration stops as soon as the number of collected items equals this value;
  /// when null, the whole sequence is read.
  /// </param>
  /// <returns>The collected items in enumeration order.</returns>
  private static async Task<List<T>> ResultsFromFeedIterator<T>(AsyncPageable<T> query, int? maxItemCount = null)
  {
      List<T> collected = new List<T>();
      await foreach (T item in query)
      {
          collected.Add(item);
          bool limitReached = maxItemCount.HasValue && collected.Count == maxItemCount.Value;
          if (limitReached)
          {
              break;
          }
      }
      return collected;
  }
  /// <summary>
  /// Creates query request options with only the page size set.
  /// </summary>
  /// <param name="itemsPerPage">Value assigned to <c>MaxItemCount</c>.</param>
  /// <returns>A new <c>QueryRequestOptions</c> instance.</returns>
  private static QueryRequestOptions GetQueryRequestOptions(int itemsPerPage)
  {
      QueryRequestOptions options = new QueryRequestOptions();
      options.MaxItemCount = itemsPerPage;
      return options;
  }
  800. }
  801. }