AzureCosmosExtensions.cs 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798
  1. using Microsoft.Azure.Cosmos.Table;
  2. using Microsoft.Azure.Cosmos.Table.Queryable;
  3. using System;
  4. using System.Collections;
  5. using System.Collections.Generic;
  6. using System.Text;
  7. using System.Threading;
  8. using System.Threading.Tasks;
  9. using System.Linq;
  10. using Azure;
  11. using TEAMModelOS.SDK.DI.AzureCosmos.Inner;
  12. using System.IO;
  13. using TEAMModelOS.SDK.DI;
  14. using System.Diagnostics;
  15. using Azure.Cosmos;
  16. using System.Text.Json;
  17. using System.Net;
  18. using TEAMModelOS.SDK.Context.Exception;
  19. using System.Linq.Expressions;
  20. using TEAMModelOS.SDK.Helper.Common.CollectionHelper;
  21. using TEAMModelOS.SDK.Helper.Common.LogHelper;
  22. using TEAMModelOS.SDK.Helper.Common.JsonHelper;
  23. namespace TEAMModelOS.SDK.DI
  24. {
  25. public static class AzureCosmosExtensions
  26. {
  /// <summary>
  /// Prefix placed in front of a container id to build the Redis hash key used for cached items.
  /// </summary>
  private const string CacheCosmosPrefix = "cosmos:";

  /// <summary>
  /// TTL value, in seconds, stamped onto entities by DeleteTTL.
  /// </summary>
  private const int ttl = 1;

  /// <summary>
  /// Number of entities handled per batch by the bulk operations.
  /// </summary>
  private const int pageSize = 200;

  /// <summary>
  /// Expiry, in seconds, passed to Redis when a cache key is given a time-to-live (24 hours).
  /// </summary>
  private const int timeoutSeconds = 24 * 60 * 60;
  /// <summary>
  /// Reads the request charge reported in the <c>x-ms-request-charge</c> header of a response.
  /// </summary>
  /// <param name="response">Response whose headers are inspected; may be null.</param>
  /// <returns>
  /// The charge rounded up to a whole number, or 0 when the response is null, the header is
  /// missing, or its value cannot be read as a number that fits in an <c>int</c>.
  /// </returns>
  public static int RU(this Response response)
  {
      try
      {
          if (response == null || !response.Headers.TryGetValue("x-ms-request-charge", out var value))
          {
              return 0;
          }

          // The header may carry a fractional value (e.g. "2.83"); an integer-only parse rejects it,
          // so parse as a culture-invariant double instead.
          string text = Convert.ToString(value);
          if (!double.TryParse(
                  text,
                  System.Globalization.NumberStyles.Float,
                  System.Globalization.CultureInfo.InvariantCulture,
                  out double charge))
          {
              return 0;
          }

          // Round up so that a non-zero charge is never reported as 0.
          double rounded = Math.Ceiling(charge);
          if (double.IsNaN(rounded) || rounded < int.MinValue || rounded > int.MaxValue)
          {
              return 0;
          }
          return (int)rounded;
      }
      catch (Exception)
      {
          // Deliberate best-effort: reading the charge must never fail the calling operation.
          return 0;
      }
  }
  /// <summary>
  /// Reads a single item of type <typeparamref name="T"/> by id and partition key value.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory used to resolve the container for <typeparamref name="T"/>.</param>
  /// <param name="id">Item id.</param>
  /// <param name="pk">Partition key value of the item.</param>
  /// <returns>The value carried by the read response.</returns>
  public static async Task<T> FindByIdPk<T>(this AzureCosmosFactory azureCosmosFactory, string id, string pk) where T : ID
  {
      AzureCosmosModel model = await azureCosmosFactory.InitializeCollection<T>();
      PartitionKey partition = new PartitionKey(pk);
      ItemResponse<T> found = await model.container.ReadItemAsync<T>(id: id, partitionKey: partition);
      return found.Value;
  }
  /// <summary>
  /// Loads the items with the given ids, serving them from the Redis cache when the container
  /// is cached and falling back to a query for the ids that are not cached.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory used to resolve the container for <typeparamref name="T"/>.</param>
  /// <param name="ids">Ids to look up; null or empty yields an empty list.</param>
  /// <returns>Cached hits (in input order) followed by the items loaded from the query. Never null.</returns>
  public static async Task<List<T>> FindByIds<T>(this AzureCosmosFactory azureCosmosFactory, List<string> ids) where T : ID
  {
      AzureCosmosModel container = await azureCosmosFactory.InitializeCollection<T>();
      if (ids == null || ids.Count == 0)
      {
          return new List<T>();
      }

      if (!(container.cache && RedisHelper.Instance != null))
      {
          List<T> queried = await FindByDict<T>(azureCosmosFactory, new Dictionary<string, object> { { "id", ids.ToArray() } });
          // FindByDict yields null when no query could be built; keep both branches returning a list.
          return queried ?? new List<T>();
      }

      string cacheKey = CacheCosmosPrefix + container.container.Id;
      List<T> list = new List<T>(ids.Count);
      List<string> NotIn = new List<string>();
      foreach (string id in ids)
      {
          T cached = default(T);
          if (await RedisHelper.HExistsAsync(cacheKey, id))
          {
              cached = await RedisHelper.HGetAsync<T>(cacheKey, id);
          }

          // The field can disappear between the existence check and the read; treat that as a miss
          // instead of adding a null entry to the result.
          if (cached == null)
          {
              NotIn.Add(id);
          }
          else
          {
              list.Add(cached);
          }
      }

      if (NotIn.Count > 0)
      {
          List<T> noInList = await FindByDict<T>(azureCosmosFactory, new Dictionary<string, object> { { "id", NotIn.ToArray() } });
          if (noInList != null && noInList.Count > 0)
          {
              foreach (T item in noInList)
              {
                  await RedisHelper.HSetAsync(cacheKey, item.id, item);
              }
              // One expiry after all writes leaves the key in the same state as re-arming it per item.
              await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
              list.AddRange(noInList);
          }
      }
      return list;
  }
  /// <summary>
  /// Creates a new item for <paramref name="entity"/> and, when the container is cached,
  /// stores it in the container's Redis hash.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory used to resolve the container for <typeparamref name="T"/>.</param>
  /// <param name="entity">Entity to create; its <c>pk</c> is set to the container type name and its <c>ttl</c> is cleared.</param>
  /// <returns>The value carried by the create response.</returns>
  public static async Task<T> Save<T>(this AzureCosmosFactory azureCosmosFactory, T entity) where T : ID
  {
      AzureCosmosModel container = await azureCosmosFactory.InitializeCollection<T>();
      entity.pk = container.type.Name;
      entity.ttl = null;
      ItemResponse<T> response = await container.container.CreateItemAsync<T>(entity);
      if (container.cache && RedisHelper.Instance != null)
      {
          string cacheKey = CacheCosmosPrefix + container.container.Id;
          await RedisHelper.HSetAsync(cacheKey, entity.id, entity);
          // Always arm the expiry: previously a hash created by this write got no TTL at all,
          // while an existing hash had its TTL refreshed. Refreshing is kept; the gap is closed.
          await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
      }
      return response.Value;
  }
  /// <summary>
  /// Creates the given entities in batches of <c>pageSize</c>, issuing the create calls of a batch
  /// concurrently, and mirrors each batch into the Redis cache when the container is cached.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory used to resolve the container for <typeparamref name="T"/>.</param>
  /// <param name="enyites">Entities to create; each gets <c>pk</c> set to the type name and <c>ttl</c> cleared.</param>
  /// <returns>The same list instance that was passed in.</returns>
  public static async Task<List<T>> SaveAll<T>(this AzureCosmosFactory azureCosmosFactory, List<T> enyites) where T : ID
  {
      AzureCosmosModel container = await azureCosmosFactory.InitializeCollection<T>();
      bool useCache = container.cache && RedisHelper.Instance != null;
      string cacheKey = CacheCosmosPrefix + container.container.Id;
      // Only probe Redis when the cache is actually in use (same guard as the writes below).
      bool cacheKeyExisted = useCache && RedisHelper.Exists(cacheKey);

      Type type = typeof(T);
      var partitionKeyProperty = type.GetProperty(AzureCosmosUtil.GetPartitionKey<T>());
      JsonSerializerOptions serializerOptions = new JsonSerializerOptions { IgnoreNullValues = true };

      for (int offset = 0; offset < enyites.Count; offset += pageSize)
      {
          List<T> page = enyites.GetRange(offset, Math.Min(pageSize, enyites.Count - offset));

          // Serialize the whole batch first (awaited, so a failure surfaces here and every payload
          // is present before any request is sent).
          List<KeyValuePair<PartitionKey, Stream>> itemsToInsert = new List<KeyValuePair<PartitionKey, Stream>>(page.Count);
          foreach (T entity in page)
          {
              entity.pk = type.Name;
              entity.ttl = null;
              MemoryStream stream = new MemoryStream();
              await JsonSerializer.SerializeAsync(stream, entity, serializerOptions);
              object pkValue = partitionKeyProperty.GetValue(entity, null);
              itemsToInsert.Add(new KeyValuePair<PartitionKey, Stream>(new PartitionKey(pkValue.ToString()), stream));
          }

          List<Task> tasks = new List<Task>(itemsToInsert.Count);
          foreach (KeyValuePair<PartitionKey, Stream> item in itemsToInsert)
          {
              tasks.Add(container.container.CreateItemStreamAsync(item.Value, item.Key)
                  .ContinueWith((Task<Response> task) =>
                  {
                      // Reading Result rethrows a failed call; the response is released either way.
                      using (Response response = task.Result)
                      {
                      }
                  }));
          }
          await Task.WhenAll(tasks);

          if (useCache)
          {
              // Awaited so the hash exists before the expiry below is applied.
              foreach (T entity in page)
              {
                  await RedisHelper.HSetAsync(cacheKey, entity.id, entity);
              }
          }
      }

      // The expiry is set only when this call created the cache key.
      if (useCache && !cacheKeyExisted)
      {
          await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
      }
      return enyites;
  }
  /// <summary>
  /// Upserts <paramref name="entity"/> and, when the container is cached, stores it in the
  /// container's Redis hash.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory used to resolve the container for <typeparamref name="T"/>.</param>
  /// <param name="entity">Entity to upsert; its <c>pk</c> is set to the container type name and its <c>ttl</c> is cleared.</param>
  /// <returns>The value carried by the upsert response.</returns>
  public static async Task<T> SaveOrUpdate<T>(this AzureCosmosFactory azureCosmosFactory, T entity) where T : ID
  {
      AzureCosmosModel container = await azureCosmosFactory.InitializeCollection<T>();
      entity.pk = container.type.Name;
      entity.ttl = null;
      ItemResponse<T> response = await container.container.UpsertItemAsync(item: entity);
      if (container.cache && RedisHelper.Instance != null)
      {
          string cacheKey = CacheCosmosPrefix + container.container.Id;
          await RedisHelper.HSetAsync(cacheKey, entity.id, entity);
          // Always arm the expiry: previously a hash created by this write got no TTL at all,
          // while an existing hash had its TTL refreshed. Refreshing is kept; the gap is closed.
          await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
      }
      return response.Value;
  }
  /// <summary>
  /// Upserts the given entities in batches of <c>pageSize</c>, issuing the upserts of a batch
  /// concurrently, and mirrors each batch into the Redis cache when the container is cached.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory used to resolve the container for <typeparamref name="T"/>.</param>
  /// <param name="enyites">Entities to upsert; each gets <c>pk</c> set to the type name and <c>ttl</c> cleared.</param>
  /// <returns>The same list instance that was passed in.</returns>
  public static async Task<List<T>> SaveOrUpdateAll<T>(this AzureCosmosFactory azureCosmosFactory, List<T> enyites) where T : ID
  {
      AzureCosmosModel container = await azureCosmosFactory.InitializeCollection<T>();
      bool useCache = container.cache && RedisHelper.Instance != null;
      string cacheKey = CacheCosmosPrefix + container.container.Id;
      // Only probe Redis when the cache is actually in use (same guard as the writes below).
      bool cacheKeyExisted = useCache && RedisHelper.Exists(cacheKey);

      Type type = typeof(T);
      var partitionKeyProperty = type.GetProperty(AzureCosmosUtil.GetPartitionKey<T>());
      JsonSerializerOptions serializerOptions = new JsonSerializerOptions { IgnoreNullValues = true };

      for (int offset = 0; offset < enyites.Count; offset += pageSize)
      {
          List<T> page = enyites.GetRange(offset, Math.Min(pageSize, enyites.Count - offset));

          // Serialize the whole batch first (awaited, so a failure surfaces here and every payload
          // is present before any request is sent).
          List<KeyValuePair<PartitionKey, Stream>> itemsToInsert = new List<KeyValuePair<PartitionKey, Stream>>(page.Count);
          foreach (T entity in page)
          {
              entity.pk = type.Name;
              entity.ttl = null;
              MemoryStream stream = new MemoryStream();
              await JsonSerializer.SerializeAsync(stream, entity, serializerOptions);
              object pkValue = partitionKeyProperty.GetValue(entity, null);
              itemsToInsert.Add(new KeyValuePair<PartitionKey, Stream>(new PartitionKey(pkValue.ToString()), stream));
          }

          List<Task> tasks = new List<Task>(itemsToInsert.Count);
          foreach (KeyValuePair<PartitionKey, Stream> item in itemsToInsert)
          {
              tasks.Add(container.container.UpsertItemStreamAsync(item.Value, item.Key)
                  .ContinueWith((Task<Response> task) =>
                  {
                      // As before, a failed upsert is not propagated from here; a completed call
                      // now has its response released instead of being left undisposed.
                      // NOTE(review): confirm whether failures should be surfaced to the caller.
                      if (task.Status == TaskStatus.RanToCompletion)
                      {
                          task.Result.Dispose();
                      }
                  }));
          }
          await Task.WhenAll(tasks);

          if (useCache)
          {
              // Awaited so the hash exists before the expiry below is applied.
              foreach (T entity in page)
              {
                  await RedisHelper.HSetAsync(cacheKey, entity.id, entity);
              }
          }
      }

      // The expiry is set only when this call created the cache key.
      if (useCache && !cacheKeyExisted)
      {
          await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
      }
      return enyites;
  }
  /// <summary>
  /// Replaces the given dynamically-typed entities in the container registered under
  /// <paramref name="typeName"/>, in batches of <c>pageSize</c>, and mirrors each batch into the
  /// Redis cache when the container is cached.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory whose container registry is searched.</param>
  /// <param name="typeName">Key of the container in the factory's type registry.</param>
  /// <param name="enyites">Entities to replace; each must expose <c>id</c> and be indexable by the container's partition key name.</param>
  /// <returns>The same list instance that was passed in.</returns>
  /// <exception cref="BizException">No container is registered under <paramref name="typeName"/>.</exception>
  public static async Task<List<dynamic>> UpdateAll(this AzureCosmosFactory azureCosmosFactory, string typeName, List<dynamic> enyites)
  {
      if (!azureCosmosFactory.CosmosDict.typeCosmos.TryGetValue(typeName, out AzureCosmosModel container))
      {
          // Same failure the name-based query methods report, instead of a null dereference below.
          throw new BizException("CollectionName named:" + typeName + " dose not exsit in Database!");
      }

      bool useCache = container.cache && RedisHelper.Instance != null;
      string cacheKey = CacheCosmosPrefix + container.container.Id;
      // Only probe Redis when the cache is actually in use (same guard as the writes below).
      bool cacheKeyExisted = useCache && RedisHelper.Exists(cacheKey);

      string partitionKey = container.partitionKey;
      JsonSerializerOptions serializerOptions = new JsonSerializerOptions { IgnoreNullValues = true };

      for (int offset = 0; offset < enyites.Count; offset += pageSize)
      {
          List<dynamic> page = enyites.GetRange(offset, Math.Min(pageSize, enyites.Count - offset));

          // Serialize the whole batch first (awaited, so a failure surfaces here and every payload
          // is present before any request is sent).
          List<Item> itemsToInsert = new List<Item>(page.Count);
          foreach (dynamic entity in page)
          {
              MemoryStream stream = new MemoryStream();
              await JsonSerializer.SerializeAsync(stream, entity, serializerOptions);
              object pkValue = entity[partitionKey];
              itemsToInsert.Add(new Item { id = entity.id, pk = pkValue.ToString(), stream = stream });
          }

          List<Task> tasks = new List<Task>(itemsToInsert.Count);
          foreach (Item item in itemsToInsert)
          {
              tasks.Add(container.container.ReplaceItemStreamAsync(item.stream, item.id, new PartitionKey(item.pk))
                  .ContinueWith((Task<Response> task) =>
                  {
                      // As before, a failed replace is not propagated from here; a completed call
                      // now has its response released instead of being left undisposed.
                      // NOTE(review): confirm whether failures should be surfaced to the caller.
                      if (task.Status == TaskStatus.RanToCompletion)
                      {
                          task.Result.Dispose();
                      }
                  }));
          }
          await Task.WhenAll(tasks);

          if (useCache)
          {
              // Awaited so the hash exists before the expiry below is applied.
              foreach (dynamic entity in page)
              {
                  await RedisHelper.HSetAsync(cacheKey, entity.id, entity);
              }
          }
      }

      // The expiry is set only when this call created the cache key.
      if (useCache && !cacheKeyExisted)
      {
          await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
      }
      return enyites;
  }
  /// <summary>
  /// Runs a <c>select value count(c)</c> query, filtered by <paramref name="dict"/>, against the
  /// container of <typeparamref name="T"/>.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory used to resolve the container for <typeparamref name="T"/>.</param>
  /// <param name="dict">Filter conditions handed to <c>SQLHelper.GetSQL</c>.</param>
  /// <returns>The query results, or a single 0 when no query could be built.</returns>
  public static async Task<List<int>> FindCountByDict<T>(this AzureCosmosFactory azureCosmosFactory, Dictionary<string, object> dict)
  {
      AzureCosmosModel model = await azureCosmosFactory.InitializeCollection<T>();
      StringBuilder countSql = new StringBuilder("select value count(c) from c");
      AzureCosmosQuery countQuery = SQLHelper.GetSQL(dict, countSql, typeof(T).Name);
      if (countQuery != null)
      {
          int effectivePageSize = GetEffectivePageSize(-1, null);
          QueryRequestOptions options = GetDefaultQueryRequestOptions(itemsPerPage: effectivePageSize);
          AsyncPageable<int> counts = model.container.GetItemQueryIterator<int>(
              queryDefinition: countQuery.CosmosQueryDefinition,
              requestOptions: options);
          return await ResultsFromFeedIterator(counts);
      }

      List<int> none = new List<int>();
      none.Add(0);
      return none;
  }
  /// <summary>
  /// Runs a <c>select value count(c)</c> query, filtered by <paramref name="dict"/>, against the
  /// container registered under <paramref name="CollectionName"/>. Paging and ordering entries
  /// (<c>@CURRPAGE</c>, <c>@PAGESIZE</c>, <c>@ASC</c>, <c>@DESC</c>) are ignored for the count.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory whose container registry is searched.</param>
  /// <param name="CollectionName">Key of the container in the factory's type registry.</param>
  /// <param name="dict">Filter conditions; this dictionary is not modified.</param>
  /// <returns>The query results, or a single 0 when no query could be built.</returns>
  /// <exception cref="BizException">No container is registered under <paramref name="CollectionName"/>.</exception>
  public static async Task<List<dynamic>> FindCountByDict(this AzureCosmosFactory azureCosmosFactory, string CollectionName, Dictionary<string, object> dict)
  {
      if (azureCosmosFactory.CosmosDict.typeCosmos.TryGetValue(CollectionName, out AzureCosmosModel container))
      {
          string pk = container.type.Name;

          // Strip the paging/ordering entries from a copy, so the caller can keep using its
          // dictionary (e.g. for a paged query) after asking for the count.
          Dictionary<string, object> filters = new Dictionary<string, object>(dict, dict.Comparer);
          filters.Remove("@CURRPAGE");
          filters.Remove("@PAGESIZE");
          filters.Remove("@ASC");
          filters.Remove("@DESC");

          StringBuilder sql = new StringBuilder("select value count(c) from c");
          AzureCosmosQuery cosmosDbQuery = SQLHelper.GetSQL(filters, sql, pk);
          if (cosmosDbQuery == null)
          {
              return new List<dynamic> { 0 };
          }
          QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
          AsyncPageable<dynamic> query = container.container.GetItemQueryIterator<dynamic>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
          return await ResultsFromFeedIterator(query);
      }
      else
      {
          throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
      }
  }
  /// <summary>
  /// Replaces the stored item for <paramref name="entity"/> and, when the container is cached,
  /// stores it in the container's Redis hash.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory used to resolve the container for <typeparamref name="T"/>.</param>
  /// <param name="entity">Entity to replace; its <c>pk</c> is set to the type name and its <c>ttl</c> is cleared.</param>
  /// <returns>The value carried by the replace response.</returns>
  public static async Task<T> Update<T>(this AzureCosmosFactory azureCosmosFactory, T entity) where T : ID
  {
      AzureCosmosModel container = await azureCosmosFactory.InitializeCollection<T>();
      string partitionKey = AzureCosmosUtil.GetPartitionKey<T>();
      Type type = typeof(T);
      entity.pk = type.Name;
      entity.ttl = null;
      // The partition key value is read from the property named by GetPartitionKey<T>().
      object o = type.GetProperty(partitionKey).GetValue(entity, null);
      ItemResponse<T> response = await container.container.ReplaceItemAsync(entity, entity.id, new PartitionKey(o.ToString()));
      if (container.cache && RedisHelper.Instance != null)
      {
          string cacheKey = CacheCosmosPrefix + container.container.Id;
          await RedisHelper.HSetAsync(cacheKey, entity.id, entity);
          // Always arm the expiry: previously a hash created by this write got no TTL at all,
          // while an existing hash had its TTL refreshed. Refreshing is kept; the gap is closed.
          await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
      }
      return response.Value;
  }
  /// <summary>
  /// One pending replace call in a bulk update: the target item's id, its partition key value,
  /// and the serialized payload.
  /// </summary>
  internal class Item
  {
      /// <summary>Id of the item to replace.</summary>
      public string id { get; set; }

      /// <summary>Partition key value of the item, as a string.</summary>
      public string pk { get; set; }

      /// <summary>Stream holding the serialized entity.</summary>
      public MemoryStream stream { get; set; }
  }
  /// <summary>
  /// Replaces the given entities in batches of <c>pageSize</c>, issuing the replace calls of a
  /// batch concurrently, and mirrors each batch into the Redis cache when the container is cached.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory used to resolve the container for <typeparamref name="T"/>.</param>
  /// <param name="enyites">Entities to replace; each gets <c>pk</c> set to the type name and <c>ttl</c> cleared.</param>
  /// <returns>The same list instance that was passed in.</returns>
  public static async Task<List<T>> UpdateAll<T>(this AzureCosmosFactory azureCosmosFactory, List<T> enyites) where T : ID
  {
      AzureCosmosModel container = await azureCosmosFactory.InitializeCollection<T>();
      bool useCache = container.cache && RedisHelper.Instance != null;
      string cacheKey = CacheCosmosPrefix + container.container.Id;
      // Only probe Redis when the cache is actually in use (same guard as the writes below).
      bool cacheKeyExisted = useCache && RedisHelper.Exists(cacheKey);

      Type type = typeof(T);
      var partitionKeyProperty = type.GetProperty(AzureCosmosUtil.GetPartitionKey<T>());
      JsonSerializerOptions serializerOptions = new JsonSerializerOptions { IgnoreNullValues = true };

      for (int offset = 0; offset < enyites.Count; offset += pageSize)
      {
          List<T> page = enyites.GetRange(offset, Math.Min(pageSize, enyites.Count - offset));

          // Serialize the whole batch first (awaited, so a failure surfaces here and every payload
          // is present before any request is sent).
          List<Item> itemsToInsert = new List<Item>(page.Count);
          foreach (T entity in page)
          {
              entity.pk = type.Name;
              entity.ttl = null;
              MemoryStream stream = new MemoryStream();
              await JsonSerializer.SerializeAsync(stream, entity, serializerOptions);
              object pkValue = partitionKeyProperty.GetValue(entity, null);
              itemsToInsert.Add(new Item { id = entity.id, pk = pkValue.ToString(), stream = stream });
          }

          List<Task> tasks = new List<Task>(itemsToInsert.Count);
          foreach (Item item in itemsToInsert)
          {
              tasks.Add(container.container.ReplaceItemStreamAsync(item.stream, item.id, new PartitionKey(item.pk))
                  .ContinueWith((Task<Response> task) =>
                  {
                      // As before, a failed replace is not propagated from here; a completed call
                      // now has its response released instead of being left undisposed.
                      // NOTE(review): confirm whether failures should be surfaced to the caller.
                      if (task.Status == TaskStatus.RanToCompletion)
                      {
                          task.Result.Dispose();
                      }
                  }));
          }
          await Task.WhenAll(tasks);

          if (useCache)
          {
              // Awaited so the hash exists before the expiry below is applied.
              foreach (T entity in page)
              {
                  await RedisHelper.HSetAsync(cacheKey, entity.id, entity);
              }
          }
      }

      // The expiry is set only when this call created the cache key.
      if (useCache && !cacheKeyExisted)
      {
          await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
      }
      return enyites;
  }
  /// <summary>
  /// Queries the container of <typeparamref name="T"/> with the conditions in <paramref name="dict"/>.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory used to resolve the container for <typeparamref name="T"/>.</param>
  /// <param name="dict">Filter conditions handed to <c>SQLHelper.GetSQL</c>.</param>
  /// <param name="propertys">Optional property list handed to <c>SQLHelper.GetSQLSelect</c> to build the select clause.</param>
  /// <returns>The matching items, or null when <c>SQLHelper.GetSQL</c> produces no query.</returns>
  public static async Task<List<T>> FindByDict<T>(this AzureCosmosFactory azureCosmosFactory, Dictionary<string, object> dict, List<string> propertys = null) where T : ID
  {
      StringBuilder selectClause = SQLHelper.GetSQLSelect(propertys);
      AzureCosmosQuery builtQuery = SQLHelper.GetSQL(dict, selectClause, typeof(T).Name);
      int effectivePageSize = GetEffectivePageSize(-1, null);
      QueryRequestOptions options = GetDefaultQueryRequestOptions(itemsPerPage: effectivePageSize);
      List<T> rows = await azureCosmosFactory.ResultsFromQueryAndOptions<T>(builtQuery, options);
      return rows;
  }
  /// <summary>
  /// Queries the container registered under <paramref name="CollectionName"/> with the conditions
  /// in <paramref name="dict"/> and returns the rows as dynamic values.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory whose container registry is searched.</param>
  /// <param name="CollectionName">Key of the container in the factory's type registry.</param>
  /// <param name="dict">Filter conditions handed to <c>SQLHelper.GetSQL</c>.</param>
  /// <param name="propertys">Optional property list handed to <c>SQLHelper.GetSQLSelect</c> to build the select clause.</param>
  /// <returns>The matching rows; an empty list when no query could be built.</returns>
  /// <exception cref="BizException">No container is registered under <paramref name="CollectionName"/>.</exception>
  public static async Task<List<dynamic>> FindByDict(this AzureCosmosFactory azureCosmosFactory, string CollectionName, Dictionary<string, object> dict, List<string> propertys = null)
  {
      if (azureCosmosFactory.CosmosDict.typeCosmos.TryGetValue(CollectionName, out AzureCosmosModel container))
      {
          string pk = container.type.Name;
          StringBuilder sql = SQLHelper.GetSQLSelect(propertys);
          AzureCosmosQuery cosmosDbQuery = SQLHelper.GetSQL(dict, sql, pk);
          if (cosmosDbQuery == null)
          {
              // Other methods in this class guard against a null query from GetSQL; without the
              // same guard the dereference below failed with a NullReferenceException.
              return new List<dynamic>();
          }
          QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
          AsyncPageable<dynamic> query = container.container.GetItemQueryIterator<dynamic>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
          return await ResultsFromFeedIterator(query);
      }
      else
      {
          throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
      }
  }
  /// <summary>
  /// Deletes the items identified by <paramref name="ids"/>. For a monitored container the items
  /// are looked up by id and removed through <c>DeleteTTL</c>; otherwise they are deleted directly
  /// in batches of <c>pageSize</c> and evicted from the Redis cache when the container is cached.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory used to resolve the container for <typeparamref name="T"/>.</param>
  /// <param name="ids">Id / partition key pairs of the items to delete.</param>
  /// <returns>
  /// For a monitored container, <paramref name="ids"/> itself; otherwise one entry per input pair,
  /// in input order, carrying the status of its delete response.
  /// </returns>
  public static async Task<List<IdPk>> DeleteAll<T>(this AzureCosmosFactory azureCosmosFactory, List<IdPk> ids) where T : ID
  {
      AzureCosmosModel container = await azureCosmosFactory.InitializeCollection<T>();
      if (container.monitor)
      {
          List<T> list = await azureCosmosFactory.FindByDict<T>(new Dictionary<string, object>() { { "id", ids.Select(x => x.id).ToArray() } });
          // FindByDict yields null when no query could be built; nothing to delete in that case.
          if (list != null)
          {
              await azureCosmosFactory.DeleteTTL(list);
          }
          return ids;
      }

      bool useCache = container.cache && RedisHelper.Instance != null;
      string cacheKey = CacheCosmosPrefix + container.container.Id;
      List<IdPk> idPks = new List<IdPk>(ids.Count);
      for (int offset = 0; offset < ids.Count; offset += pageSize)
      {
          List<IdPk> page = ids.GetRange(offset, Math.Min(pageSize, ids.Count - offset));
          List<Task<IdPk>> tasks = new List<Task<IdPk>>(page.Count);
          foreach (IdPk item in page)
          {
              tasks.Add(container.container.DeleteItemStreamAsync(item.id, new PartitionKey(item.pk))
                  .ContinueWith((Task<Response> task) =>
                  {
                      // Each continuation returns its own result; appending to a shared List from
                      // concurrently running continuations is not safe.
                      using (Response response = task.Result)
                      {
                          return new IdPk { id = item.id, pk = item.pk, Status = response.Status };
                      }
                  }));
          }
          idPks.AddRange(await Task.WhenAll(tasks));

          if (useCache)
          {
              // Awaited so eviction has finished (and any failure is observed) before returning.
              foreach (IdPk item in page)
              {
                  await RedisHelper.HDelAsync(cacheKey, item.id);
              }
          }
      }
      return idPks;
  }
  /// <summary>
  /// Deletes every item of <typeparamref name="T"/> that matches the conditions in <paramref name="dict"/>.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory used to resolve the container for <typeparamref name="T"/>.</param>
  /// <param name="dict">Filter conditions; must contain at least one entry.</param>
  /// <returns>The delete results for the matched items; empty when no query could be built.</returns>
  /// <exception cref="BizException"><paramref name="dict"/> is null or has no entries.</exception>
  public static async Task<List<IdPk>> DeleteAll<T>(this AzureCosmosFactory azureCosmosFactory, Dictionary<string, object> dict) where T : ID
  {
      // A missing dictionary is treated like an empty one: refuse to delete without conditions.
      if (dict == null || dict.Keys.Count == 0)
      {
          throw new BizException("参数为空", 500);
      }

      List<T> list = await azureCosmosFactory.FindByDict<T>(dict);
      if (list == null)
      {
          // FindByDict yields null when no query could be built; there is nothing to delete.
          return new List<IdPk>();
      }
      return await azureCosmosFactory.DeleteAll(list);
  }
  /// <summary>
  /// Deletes the given entities and logs the request. For a monitored container the entities are
  /// removed through <c>DeleteTTL</c>; otherwise they are deleted directly in batches of
  /// <c>pageSize</c> and evicted from the Redis cache when the container is cached.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory used to resolve the container for <typeparamref name="T"/>.</param>
  /// <param name="enyites">Entities to delete.</param>
  /// <returns>
  /// One entry per entity with its id and partition key value: <c>StatusCode</c> NoContent for a
  /// monitored container, otherwise <c>Status</c> taken from the delete response.
  /// </returns>
  public static async Task<List<IdPk>> DeleteAll<T>(this AzureCosmosFactory azureCosmosFactory, List<T> enyites) where T : ID
  {
      Type type = typeof(T);
      string pk = AzureCosmosUtil.GetPartitionKey<T>();
      AzureCosmosModel container = await azureCosmosFactory.InitializeCollection<T>();
      List<IdPk> idPks = new List<IdPk>();

      // Log the delete request, bracketed by a unique key so the entry can be located.
      string uuidKey = Guid.NewGuid().ToString();
      string logkey = "\r\n【" + uuidKey + "】\r\n";
      LogHelper.Info(default(object),
          logkey
          + "删除------->>\r\n"
          + "表:"
          + type.Name + "\r\n"
          + "数据:"
          + enyites.ToApiJson()
          + "\r\n" + logkey);

      var partitionKeyProperty = type.GetProperty(pk);
      if (container.monitor)
      {
          enyites = await azureCosmosFactory.DeleteTTL(enyites);
          foreach (T t in enyites)
          {
              object o = partitionKeyProperty.GetValue(t, null);
              idPks.Add(new IdPk { id = t.id, pk = o.ToString(), StatusCode = HttpStatusCode.NoContent });
          }
          return idPks;
      }

      bool useCache = container.cache && RedisHelper.Instance != null;
      string cacheKey = CacheCosmosPrefix + container.container.Id;
      for (int offset = 0; offset < enyites.Count; offset += pageSize)
      {
          List<T> page = enyites.GetRange(offset, Math.Min(pageSize, enyites.Count - offset));
          List<Task<IdPk>> tasks = new List<Task<IdPk>>(page.Count);
          foreach (T entity in page)
          {
              string id = entity.id;
              // Keep the raw partition key value so the result reports the same form as the
              // monitored branch, rather than the string form of the PartitionKey object.
              string pkValue = partitionKeyProperty.GetValue(entity, null).ToString();
              tasks.Add(container.container.DeleteItemStreamAsync(id, new PartitionKey(pkValue))
                  .ContinueWith((Task<Response> task) =>
                  {
                      // Each continuation returns its own result; appending to a shared List from
                      // concurrently running continuations is not safe.
                      using (Response response = task.Result)
                      {
                          return new IdPk { id = id, pk = pkValue, Status = response.Status };
                      }
                  }));
          }
          idPks.AddRange(await Task.WhenAll(tasks));

          if (useCache)
          {
              // Awaited so eviction has finished (and any failure is observed) before returning.
              foreach (T entity in page)
              {
                  await RedisHelper.HDelAsync(cacheKey, entity.id);
              }
          }
      }
      return idPks;
  }
  /// <summary>
  /// Deletes the item identified by <paramref name="idPk"/> by delegating to the
  /// id / partition key overload.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory used to resolve the container for <typeparamref name="T"/>.</param>
  /// <param name="idPk">Id and partition key value of the item to delete.</param>
  /// <returns>The result produced by the id / partition key overload.</returns>
  public static async Task<IdPk> DeleteAsync<T>(this AzureCosmosFactory azureCosmosFactory, IdPk idPk) where T : ID
  {
      string targetId = idPk.id;
      string targetPk = idPk.pk;
      IdPk outcome = await azureCosmosFactory.DeleteAsync<T>(targetId, targetPk);
      return outcome;
  }
  /// <summary>
  /// Deletes one item by id and partition key value. For a monitored container the item is looked
  /// up by id and removed through <c>DeleteTTL</c>; otherwise it is deleted directly and evicted
  /// from the Redis cache when the container is cached.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory used to resolve the container for <typeparamref name="T"/>.</param>
  /// <param name="id">Id of the item to delete.</param>
  /// <param name="pk">Partition key value of the item.</param>
  /// <returns>
  /// The id and partition key with <c>StatusCode</c> NoContent for a monitored container,
  /// otherwise with <c>Status</c> taken from the delete response.
  /// </returns>
  /// <exception cref="BizException">The container is monitored and no item with that id was found.</exception>
  public static async Task<IdPk> DeleteAsync<T>(this AzureCosmosFactory azureCosmosFactory, string id, string pk) where T : ID
  {
      AzureCosmosModel container = await azureCosmosFactory.InitializeCollection<T>();
      if (container.monitor)
      {
          List<T> list = await FindByDict<T>(azureCosmosFactory, new Dictionary<string, object>() { { "id", id } });
          // FindByDict yields null when no query could be built; report that as "not found"
          // instead of failing on the null list.
          if (list == null || list.Count == 0)
          {
              throw new BizException("未找到ID匹配的数据,删除失败");
          }
          await DeleteTTL<T>(azureCosmosFactory, list);
          return new IdPk { id = id, pk = pk, StatusCode = HttpStatusCode.NoContent };
      }

      Response response = await container.container.DeleteItemStreamAsync(id: id, partitionKey: new PartitionKey(pk));
      if (container.cache && RedisHelper.Instance != null)
      {
          await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, id);
      }
      return new IdPk { id = id, pk = pk, Status = response.Status };
  }
  /// <summary>
  /// Deletes the item for <paramref name="entity"/>. For a monitored container the entity is
  /// removed through <c>DeleteTTL</c>; otherwise it is deleted directly and evicted from the Redis
  /// cache when the container is cached.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory used to resolve the container for <typeparamref name="T"/>.</param>
  /// <param name="entity">Entity to delete; its partition key value is read via reflection.</param>
  /// <returns>
  /// The entity's id and partition key value, with <c>StatusCode</c> NoContent for a monitored
  /// container, otherwise with <c>Status</c> taken from the delete response.
  /// </returns>
  public static async Task<IdPk> DeleteAsync<T>(this AzureCosmosFactory azureCosmosFactory, T entity) where T : ID
  {
      AzureCosmosModel container = await azureCosmosFactory.InitializeCollection<T>();
      string partitionKey = AzureCosmosUtil.GetPartitionKey<T>();
      Type type = typeof(T);
      // partitionKey is the property NAME; pkValue is that property's value on this entity.
      string pkValue = type.GetProperty(partitionKey).GetValue(entity, null).ToString();
      if (container.monitor)
      {
          await DeleteTTL<T>(azureCosmosFactory, new List<T>() { entity });
          return new IdPk { id = entity.id, pk = pkValue, StatusCode = HttpStatusCode.NoContent };
      }

      Response response = await container.container.DeleteItemStreamAsync(id: entity.id, partitionKey: new PartitionKey(pkValue));
      if (container.cache && RedisHelper.Instance != null)
      {
          await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, entity.id);
      }
      // Report the partition key value (as the monitored branch does), not the property name.
      return new IdPk { id = entity.id, pk = pkValue, Status = response.Status };
  }
  /// <summary>
  /// Runs a caller-supplied SQL query against the container of <typeparamref name="T"/>.
  /// The query text must reference <c>.pk</c>.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory used to resolve the container for <typeparamref name="T"/>.</param>
  /// <param name="sql">Query text; must contain <c>.pk</c>.</param>
  /// <param name="Parameters">Optional query parameters; when null the text is used as-is.</param>
  /// <returns>All items produced by the query.</returns>
  /// <exception cref="BizException"><paramref name="sql"/> is null or does not contain <c>.pk</c>.</exception>
  public static async Task<List<T>> FindSQL<T>(this AzureCosmosFactory azureCosmosFactory, string sql, Dictionary<string, object> Parameters = null) where T : ID
  {
      // A null query text cannot contain ".pk"; report it as the same parameter error.
      if (sql == null || !sql.Contains(".pk"))
      {
          throw new BizException("查询参数必须设置 .pk ", ResponseCode.PARAMS_ERROR);
      }

      AzureCosmosModel container = await azureCosmosFactory.InitializeCollection<T>();
      QueryRequestOptions queryOptions = GetQueryRequestOptions(GetEffectivePageSize(-1, null));
      AsyncPageable<T> feedIterator;
      if (Parameters != null)
      {
          AzureCosmosQuery cosmosDbQuery = new AzureCosmosQuery
          {
              QueryText = sql,
              Parameters = Parameters
          };
          feedIterator = container.container
              .GetItemQueryIterator<T>(cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryOptions);
      }
      else
      {
          // Apply the same request options as the parameterized branch; they were previously
          // computed but not passed here.
          QueryDefinition queryDefinition = new QueryDefinition(sql);
          feedIterator = container.container
              .GetItemQueryIterator<T>(queryDefinition, requestOptions: queryOptions);
      }
      return await ResultsFromFeedIterator(feedIterator);
  }
  /// <summary>
  /// Deletes by TTL: stamps every entity with the class-level <c>ttl</c> value, upserts the
  /// entities through <c>DeleteTTlALL</c>, then removes them from the Redis cache when the
  /// container is cached.
  /// </summary>
  /// <typeparam name="T">Entity type.</typeparam>
  /// <param name="azureCosmosFactory">Factory used to resolve the container for <typeparamref name="T"/>.</param>
  /// <param name="list">Entities to delete; their <c>ttl</c> is overwritten.</param>
  /// <returns>The list returned by <c>DeleteTTlALL</c>.</returns>
  private static async Task<List<T>> DeleteTTL<T>(this AzureCosmosFactory azureCosmosFactory ,List<T> list) where T : ID
  {
      AzureCosmosModel container = await azureCosmosFactory.InitializeCollection<T>();
      foreach (T entity in list)
      {
          entity.ttl = ttl;
      }
      list = await DeleteTTlALL(azureCosmosFactory, list);
      if (container.cache && RedisHelper.Instance != null)
      {
          string cacheKey = CacheCosmosPrefix + container.container.Id;
          // Awaited so the entries are gone from the cache (and any failure is observed) before
          // this method returns.
          foreach (T entity in list)
          {
              await RedisHelper.HDelAsync(cacheKey, entity.id);
          }
      }
      return list;
  }
  /// <summary>
  /// Upserts the given entities in batches of <c>pageSize</c> without touching their <c>ttl</c>
  /// (the caller sets it), and mirrors each batch into the Redis cache when the container is cached.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory used to resolve the container for <typeparamref name="T"/>.</param>
  /// <param name="enyites">Entities to upsert; each gets <c>pk</c> set to the type name.</param>
  /// <returns>The same list instance that was passed in.</returns>
  private static async Task<List<T>> DeleteTTlALL<T>(this AzureCosmosFactory azureCosmosFactory, List<T> enyites) where T : ID
  {
      AzureCosmosModel container = await azureCosmosFactory.InitializeCollection<T>();
      bool useCache = container.cache && RedisHelper.Instance != null;
      string cacheKey = CacheCosmosPrefix + container.container.Id;
      // Only probe Redis when the cache is actually in use (same guard as the writes below).
      bool cacheKeyExisted = useCache && RedisHelper.Exists(cacheKey);

      Type type = typeof(T);
      var partitionKeyProperty = type.GetProperty(AzureCosmosUtil.GetPartitionKey<T>());
      JsonSerializerOptions serializerOptions = new JsonSerializerOptions { IgnoreNullValues = true };

      for (int offset = 0; offset < enyites.Count; offset += pageSize)
      {
          List<T> page = enyites.GetRange(offset, Math.Min(pageSize, enyites.Count - offset));

          // Serialize the whole batch first (awaited, so a failure surfaces here and every payload
          // is present before any request is sent).
          List<KeyValuePair<PartitionKey, Stream>> itemsToInsert = new List<KeyValuePair<PartitionKey, Stream>>(page.Count);
          foreach (T entity in page)
          {
              entity.pk = type.Name;
              MemoryStream stream = new MemoryStream();
              await JsonSerializer.SerializeAsync(stream, entity, serializerOptions);
              object pkValue = partitionKeyProperty.GetValue(entity, null);
              itemsToInsert.Add(new KeyValuePair<PartitionKey, Stream>(new PartitionKey(pkValue.ToString()), stream));
          }

          List<Task> tasks = new List<Task>(itemsToInsert.Count);
          foreach (KeyValuePair<PartitionKey, Stream> item in itemsToInsert)
          {
              tasks.Add(container.container.UpsertItemStreamAsync(item.Value, item.Key)
                  .ContinueWith((Task<Response> task) =>
                  {
                      // As before, a failed upsert is not propagated from here; a completed call
                      // now has its response released instead of being left undisposed.
                      // NOTE(review): confirm whether failures should be surfaced to the caller.
                      if (task.Status == TaskStatus.RanToCompletion)
                      {
                          task.Result.Dispose();
                      }
                  }));
          }
          await Task.WhenAll(tasks);

          if (useCache)
          {
              // Awaited so these writes are complete before the caller goes on to evict the
              // same ids from the cache.
              foreach (T entity in page)
              {
                  await RedisHelper.HSetAsync(cacheKey, entity.id, entity);
              }
          }
      }

      // The expiry is set only when this call created the cache key.
      if (useCache && !cacheKeyExisted)
      {
          await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
      }
      return enyites;
  }
  /// <summary>
  /// Builds query request options, filling in defaults for values that are not supplied.
  /// </summary>
  /// <param name="itemsPerPage">Page size; -1 is replaced by 1000, any other value (including null) is used as given.</param>
  /// <param name="maxBufferedItemCount">Buffered item limit; 100 when null.</param>
  /// <param name="maxConcurrency">Concurrency limit; 50 when null.</param>
  /// <returns>A new options instance with the three values set.</returns>
  private static QueryRequestOptions GetDefaultQueryRequestOptions(int? itemsPerPage = null, int? maxBufferedItemCount = null, int? maxConcurrency = null)
  {
      int? pageLimit = itemsPerPage;
      if (itemsPerPage == -1)
      {
          pageLimit = 1000;
      }

      QueryRequestOptions options = new QueryRequestOptions();
      options.MaxItemCount = pageLimit;
      options.MaxBufferedItemCount = maxBufferedItemCount.HasValue ? maxBufferedItemCount.Value : 100;
      options.MaxConcurrency = maxConcurrency.HasValue ? maxConcurrency.Value : 50;
      return options;
  }
  /// <summary>
  /// Combines a requested page size with an optional upper limit.
  /// </summary>
  /// <param name="itemsPerPage">Requested page size; -1 means "no explicit request".</param>
  /// <param name="maxItemCount">Optional limit; ignored when null.</param>
  /// <returns>
  /// When <paramref name="itemsPerPage"/> is -1: the limit, or -1 if there is none.
  /// Otherwise: the smaller of the limit (if any) and <paramref name="itemsPerPage"/>.
  /// </returns>
  private static int GetEffectivePageSize(int itemsPerPage, int? maxItemCount)
  {
      int limit = maxItemCount.HasValue ? maxItemCount.Value : itemsPerPage;
      if (itemsPerPage == -1)
      {
          return limit;
      }
      return Math.Min(limit, itemsPerPage);
  }
  /// <summary>
  /// Executes <paramref name="cosmosDbQuery"/> against the container of <typeparamref name="T"/>
  /// and collects every result.
  /// </summary>
  /// <param name="azureCosmosFactory">Factory used to resolve the container for <typeparamref name="T"/>.</param>
  /// <param name="cosmosDbQuery">Query to run; may be null.</param>
  /// <param name="queryOptions">Request options passed to the query.</param>
  /// <returns>The collected items, or null when <paramref name="cosmosDbQuery"/> is null.</returns>
  private static async Task<List<T>> ResultsFromQueryAndOptions<T>(this AzureCosmosFactory azureCosmosFactory, AzureCosmosQuery cosmosDbQuery, QueryRequestOptions queryOptions)
  {
      if (cosmosDbQuery != null)
      {
          AzureCosmosModel model = await azureCosmosFactory.InitializeCollection<T>();
          AsyncPageable<T> pages = model.container.GetItemQueryIterator<T>(
              queryDefinition: cosmosDbQuery.CosmosQueryDefinition,
              requestOptions: queryOptions);
          List<T> rows = await ResultsFromFeedIterator(pages);
          return rows;
      }

      // No query means no container access at all; callers receive null.
      return null;
  }
  /// <summary>
  /// Enumerates an async query result into a list, optionally stopping once a given number of
  /// items has been collected.
  /// </summary>
  /// <param name="query">Async sequence of query results.</param>
  /// <param name="maxItemCount">Stop after this many items; null collects everything.</param>
  /// <returns>The collected items (empty when the query yields nothing).</returns>
  private static async Task<List<T>> ResultsFromFeedIterator<T>(AsyncPageable<T> query, int? maxItemCount = null)
  {
      List<T> collected = new List<T>();
      await foreach (T item in query)
      {
          collected.Add(item);
          // The check runs after the add, so the count is at least 1 whenever it is compared.
          if (maxItemCount.HasValue && collected.Count == maxItemCount.Value)
          {
              break;
          }
      }
      return collected;
  }
  /// <summary>
  /// Builds query request options that carry only a page size.
  /// </summary>
  /// <param name="itemsPerPage">Value assigned to <c>MaxItemCount</c>.</param>
  /// <returns>A new options instance with <c>MaxItemCount</c> set.</returns>
  private static QueryRequestOptions GetQueryRequestOptions(int itemsPerPage)
  {
      QueryRequestOptions options = new QueryRequestOptions();
      options.MaxItemCount = itemsPerPage;
      return options;
  }
  774. }
  775. }