AzureCosmosExtensions.cs 40 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845
  1. using Microsoft.Azure.Cosmos.Table;
  2. using Microsoft.Azure.Cosmos.Table.Queryable;
  3. using System;
  4. using System.Collections;
  5. using System.Collections.Generic;
  6. using System.Text;
  7. using System.Threading;
  8. using System.Threading.Tasks;
  9. using System.Linq;
  10. using Azure;
  11. using TEAMModelOS.SDK.DI.AzureCosmos.Inner;
  12. using System.IO;
  13. using TEAMModelOS.SDK.DI;
  14. using System.Diagnostics;
  15. using Azure.Cosmos;
  16. using System.Text.Json;
  17. using System.Net;
  18. using TEAMModelOS.SDK.Context.Exception;
  19. using System.Linq.Expressions;
  20. using TEAMModelOS.SDK.Helper.Common.CollectionHelper;
  21. using TEAMModelOS.SDK.Helper.Common.LogHelper;
  22. using TEAMModelOS.SDK.Helper.Common.JsonHelper;
  23. using TEAMModelOS.SDK.Extension;
  24. namespace TEAMModelOS.SDK.DI
  25. {
  26. public static class AzureCosmosExtensions
  27. {
  28. /// <summary>
  29. /// 缓存前缀
  30. /// </summary>
  31. private const string CacheCosmosPrefix = "cosmos:";
  32. /// <summary>
  33. /// ttl时长 1秒
  34. /// </summary>
  35. private const int ttl = 1;
  36. /// <summary>
  37. /// 分页大小
  38. /// </summary>
  39. private const int pageSize = 200;
  40. /// <summary>
  41. /// 超时时间
  42. /// </summary>
  43. private const int timeoutSeconds = 86400;
  44. public static int RU(this Response response)
  45. {
  46. try
  47. {
  48. response.Headers.TryGetValue("x-ms-request-charge", out var value);
  49. var ru = Convert.ToInt32(value);
  50. return ru;
  51. }
  52. catch (Exception)
  53. {
  54. return 0;
  55. }
  56. }
  57. public static string GetContinuationToken(this Response response)
  58. {
  59. try
  60. {
  61. response.Headers.TryGetValue("x-ms-continuation", out var value);
  62. return value;
  63. }
  64. catch (Exception)
  65. {
  66. return null;
  67. }
  68. }
  69. public static async Task<T> FindByIdPk<T>(this AzureCosmosFactory azureCosmosFactory, string id, string pk) where T : ID
  70. {
  71. AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(typeof(T).Name);
  72. try
  73. {
  74. ItemResponse<T> response = await container.container.ReadItemAsync<T>(id: id, partitionKey: new PartitionKey(pk));
  75. return response.Value;
  76. }
  77. catch (Exception x){
  78. return default(T);
  79. }
  80. }
  81. public static async Task<List<T>> FindByIds<T>(this AzureCosmosFactory azureCosmosFactory, List<string> ids) where T : ID
  82. {
  83. AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(typeof(T).Name);
  84. if (container.cache && RedisHelper.Instance != null)
  85. {
  86. List<T> list = new List<T>();
  87. List<string> NotIn = new List<string>();
  88. foreach (string id in ids)
  89. {
  90. if (!await RedisHelper.HExistsAsync(CacheCosmosPrefix + container.container.Id, id))
  91. {
  92. NotIn.Add(id);
  93. }
  94. else
  95. {
  96. list.Add(await RedisHelper.HGetAsync<T>(CacheCosmosPrefix + container.container.Id, id));
  97. }
  98. }
  99. if (NotIn.IsNotEmpty())
  100. {
  101. List<T> noInList = await FindByDict<T>(azureCosmosFactory, new Dictionary<string, object> { { "id", NotIn.ToArray() } });
  102. noInList.ForEach(x => { RedisHelper.HSet(CacheCosmosPrefix + container.container.Id, x.id, x); RedisHelper.Expire(CacheCosmosPrefix + container.container.Id, timeoutSeconds); });
  103. list.AddRange(noInList);
  104. }
  105. return list;
  106. }
  107. else
  108. {
  109. return await FindByDict<T>(azureCosmosFactory, new Dictionary<string, object> { { "id", ids.ToArray() } });
  110. }
  111. }
  112. public static async Task<T> Save<T>(this AzureCosmosFactory azureCosmosFactory, T entity) where T : ID
  113. {
  114. AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(typeof(T).Name);
  115. entity.pk = container.type.Name;
  116. entity.ttl = -1;
  117. ItemResponse<T> response = await container.container.CreateItemAsync<T>(entity);
  118. if (container.cache && RedisHelper.Instance != null)
  119. {
  120. if (!RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
  121. {
  122. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
  123. }
  124. else
  125. {
  126. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
  127. await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
  128. }
  129. }
  130. return response.Value;
  131. }
  132. public static async Task<List<T>> SaveAll<T>(this AzureCosmosFactory azureCosmosFactory, List<T> enyites) where T : ID
  133. {
  134. Type type = typeof(T);
  135. int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
  136. AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(type.Name);
  137. bool flag = false;
  138. if (RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
  139. {
  140. flag = true;
  141. }
  142. string partitionKey =AzureCosmosUtil.GetPartitionKey (type);
  143. Stopwatch stopwatch = Stopwatch.StartNew();
  144. for (int i = 0; i < pages; i++)
  145. {
  146. List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
  147. List<KeyValuePair<PartitionKey, Stream>> itemsToInsert = new List<KeyValuePair<PartitionKey, Stream>>();
  148. lists.ForEach(async x =>
  149. {
  150. x.pk = type.Name;
  151. x.ttl = -1;
  152. MemoryStream stream = new MemoryStream();
  153. await JsonSerializer.SerializeAsync(stream, x, new JsonSerializerOptions { IgnoreNullValues = true });
  154. object o = type.GetProperty(partitionKey).GetValue(x, null);
  155. KeyValuePair<PartitionKey, Stream> keyValue = new KeyValuePair<PartitionKey, Stream>(new PartitionKey(o.ToString()), stream);
  156. itemsToInsert.Add(keyValue);
  157. });
  158. List<Task> tasks = new List<Task>(lists.Count);
  159. itemsToInsert.ForEach(item =>
  160. {
  161. tasks.Add(container.container.CreateItemStreamAsync(item.Value, item.Key)
  162. .ContinueWith((Task<Response> task) =>
  163. {
  164. using (Response response = task.Result)
  165. {
  166. //if (!response.IsSuccessStatusCode)
  167. //{
  168. //}
  169. }
  170. }
  171. ));
  172. });
  173. await Task.WhenAll(tasks);
  174. if (container.cache && RedisHelper.Instance != null)
  175. {
  176. lists.ForEach(async x => {
  177. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, x.id, x);
  178. });
  179. }
  180. }
  181. if (container.cache && RedisHelper.Instance != null && !flag)
  182. {
  183. await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
  184. }
  185. stopwatch.Stop();
  186. return enyites;
  187. }
  188. public static async Task<T> SaveOrUpdate<T>(this AzureCosmosFactory azureCosmosFactory, T entity) where T : ID
  189. {
  190. AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(typeof(T).Name);
  191. entity.pk = container.type.Name;
  192. entity.ttl = -1;
  193. ItemResponse<T> response = await container.container.UpsertItemAsync(item: entity);
  194. if (container.cache && RedisHelper.Instance != null)
  195. {
  196. if (!RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
  197. {
  198. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
  199. }
  200. else
  201. {
  202. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
  203. await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
  204. }
  205. }
  206. return response.Value;
  207. }
  208. public static async Task<List<T>> SaveOrUpdateAll<T>(this AzureCosmosFactory azureCosmosFactory, List<T> enyites) where T : ID
  209. {
  210. Type type = typeof(T);
  211. int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
  212. AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(type.Name);
  213. bool flag = false;
  214. if (RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
  215. {
  216. flag = true;
  217. }
  218. string partitionKey = AzureCosmosUtil.GetPartitionKey(type);
  219. Stopwatch stopwatch = Stopwatch.StartNew();
  220. for (int i = 0; i < pages; i++)
  221. {
  222. List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
  223. List<KeyValuePair<PartitionKey, Stream>> itemsToInsert = new List<KeyValuePair<PartitionKey, Stream>>();
  224. lists.ForEach(async x =>
  225. {
  226. x.pk = type.Name;
  227. x.ttl = -1;
  228. MemoryStream stream = new MemoryStream();
  229. await JsonSerializer.SerializeAsync(stream, x, new JsonSerializerOptions { IgnoreNullValues = true });
  230. object o = type.GetProperty(partitionKey).GetValue(x, null);
  231. KeyValuePair<PartitionKey, Stream> keyValue = new KeyValuePair<PartitionKey, Stream>(new PartitionKey(o.ToString()), stream);
  232. itemsToInsert.Add(keyValue);
  233. });
  234. List<Task> tasks = new List<Task>(lists.Count);
  235. itemsToInsert.ForEach(item =>
  236. {
  237. tasks.Add(container.container.UpsertItemStreamAsync(item.Value, item.Key)
  238. .ContinueWith((Task<Response> task) =>
  239. {
  240. }
  241. ));
  242. });
  243. await Task.WhenAll(tasks);
  244. if (container.cache && RedisHelper.Instance != null)
  245. {
  246. lists.ForEach(async x => {
  247. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, x.id, x);
  248. });
  249. }
  250. }
  251. if (container.cache && RedisHelper.Instance != null && !flag)
  252. {
  253. await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
  254. }
  255. stopwatch.Stop();
  256. return enyites;
  257. }
  258. public static async Task<List<int>> FindCountByDict<T>(this AzureCosmosFactory azureCosmosFactory, Dictionary<string, object> dict)
  259. {
  260. Type type = typeof(T);
  261. AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(type.Name);
  262. string pk = type.Name;
  263. StringBuilder sql = new StringBuilder("select value count(c) from c");
  264. AzureCosmosQuery cosmosDbQuery = SQLHelper.GetSQL(dict, sql, pk);
  265. if (cosmosDbQuery == null)
  266. {
  267. return new List<int> { 0 };
  268. }
  269. QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
  270. AsyncPageable<int> query = container.container.GetItemQueryIterator<int>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
  271. return await ResultsFromFeedIterator(query);
  272. }
  273. public static async Task<List<int>> FindCountByDict<T>(this AzureCosmosFactory azureCosmosFactory,JsonElement json)
  274. {
  275. Type type = typeof(T);
  276. AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(type.Name);
  277. string pk = type.Name;
  278. StringBuilder sql = new StringBuilder("select value count(c) from c");
  279. AzureCosmosQuery cosmosDbQuery = SQLHelper.GetSQL(json, sql, pk);
  280. if (cosmosDbQuery == null)
  281. {
  282. return new List<int> { 0 };
  283. }
  284. QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
  285. AsyncPageable<int> query = container.container.GetItemQueryIterator<int>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
  286. return await ResultsFromFeedIterator(query);
  287. }
  288. public static async Task<List<dynamic>> FindCountByDict(this AzureCosmosFactory azureCosmosFactory, string CollectionName, Dictionary<string, object> dict)
  289. {
  290. if (azureCosmosFactory.CosmosDict.typeCosmos.TryGetValue(CollectionName, out AzureCosmosModel container))
  291. {
  292. string pk = container.type.Name;
  293. dict.Remove("@CURRPAGE");
  294. dict.Remove("@PAGESIZE");
  295. dict.Remove("@ASC");
  296. dict.Remove("@DESC");
  297. StringBuilder sql = new StringBuilder("select value count(c) from c");
  298. AzureCosmosQuery cosmosDbQuery = SQLHelper.GetSQL(dict, sql, pk);
  299. if (cosmosDbQuery == null)
  300. {
  301. return new List<dynamic> { 0 };
  302. }
  303. QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
  304. AsyncPageable<dynamic> query = container.container.GetItemQueryIterator<dynamic>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
  305. return await ResultsFromFeedIterator(query);
  306. }
  307. else
  308. {
  309. throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
  310. }
  311. }
  312. public static async Task<List<dynamic>> FindCountByDict(this AzureCosmosFactory azureCosmosFactory, string CollectionName,JsonElement json)
  313. {
  314. if (azureCosmosFactory.CosmosDict.typeCosmos.TryGetValue(CollectionName, out AzureCosmosModel container))
  315. {
  316. Dictionary<string, object> dict = new Dictionary<string, object>();
  317. var emobj = json.EnumerateObject();
  318. while (emobj.MoveNext())
  319. {
  320. if (emobj.Current.Name != "@CURRPAGE"||
  321. emobj.Current.Name != "@PAGESIZE" ||
  322. emobj.Current.Name != "@ASC" ||
  323. emobj.Current.Name != "@DESC") {
  324. dict[emobj.Current.Name] = emobj.Current.Value;
  325. }
  326. }
  327. string pk = container.type.Name;
  328. StringBuilder sql = new StringBuilder("select value count(c) from c");
  329. AzureCosmosQuery cosmosDbQuery = SQLHelper.GetSQL(dict, sql, pk);
  330. if (cosmosDbQuery == null)
  331. {
  332. return new List<dynamic> { 0 };
  333. }
  334. QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
  335. AsyncPageable<dynamic> query = container.container.GetItemQueryIterator<dynamic>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
  336. return await ResultsFromFeedIterator(query);
  337. }
  338. else
  339. {
  340. throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
  341. }
  342. }
  343. public static async Task<T> Update<T>(this AzureCosmosFactory azureCosmosFactory, T entity) where T : ID
  344. {
  345. Type type = typeof(T);
  346. AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(type.Name);
  347. string partitionKey =AzureCosmosUtil.GetPartitionKey (type);
  348. entity.pk = type.Name;
  349. entity.ttl = -1;
  350. object o = type.GetProperty(partitionKey).GetValue(entity, null);
  351. ItemResponse<T> response = await container.container.ReplaceItemAsync(entity, entity.id, new PartitionKey(o.ToString()));
  352. if (container.cache && RedisHelper.Instance != null)
  353. {
  354. if (!RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
  355. {
  356. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
  357. }
  358. else
  359. {
  360. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
  361. await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
  362. }
  363. }
  364. return response.Value;
  365. }
  366. internal class Item
  367. {
  368. public string id { get; set; }
  369. public string pk { get; set; }
  370. public MemoryStream stream { get; set; }
  371. }
  372. public static async Task<List<T>> UpdateAll<T>(this AzureCosmosFactory azureCosmosFactory, List<T> enyites) where T : ID
  373. {
  374. //await Task.Run(() => Parallel.ForEach(entities, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (item) =>
  375. //{
  376. // Task.WaitAll(Update(item));
  377. //}));
  378. Type type = typeof(T);
  379. int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
  380. AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(type.Name);
  381. bool flag = false;
  382. if (RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
  383. {
  384. flag = true;
  385. }
  386. string partitionKey =AzureCosmosUtil.GetPartitionKey (type);
  387. Stopwatch stopwatch = Stopwatch.StartNew();
  388. for (int i = 0; i < pages; i++)
  389. {
  390. List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
  391. List<Item> itemsToInsert = new List<Item>();
  392. lists.ForEach(async x =>
  393. {
  394. x.pk = type.Name;
  395. x.ttl = -1;
  396. MemoryStream stream = new MemoryStream();
  397. await JsonSerializer.SerializeAsync(stream, x, new JsonSerializerOptions { IgnoreNullValues = true });
  398. object o = type.GetProperty(partitionKey).GetValue(x, null);
  399. Item keyValue = new Item { id = x.id, pk = o.ToString(), stream = stream };
  400. itemsToInsert.Add(keyValue);
  401. });
  402. List<Task> tasks = new List<Task>(lists.Count);
  403. itemsToInsert.ForEach(item =>
  404. {
  405. tasks.Add(container.container.ReplaceItemStreamAsync(item.stream, item.id, new PartitionKey(item.pk))
  406. .ContinueWith((Task<Response> task) =>
  407. {
  408. //using (ResponseMessage response = task.Result)
  409. //{
  410. // if (!response.IsSuccessStatusCode)
  411. // {
  412. // }
  413. //}
  414. }
  415. ));
  416. });
  417. await Task.WhenAll(tasks);
  418. if (container.cache && RedisHelper.Instance != null)
  419. {
  420. lists.ForEach(async x => {
  421. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, x.id, x);
  422. });
  423. }
  424. }
  425. if (container.cache && RedisHelper.Instance != null && !flag)
  426. {
  427. await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
  428. }
  429. stopwatch.Stop();
  430. return enyites;
  431. }
  432. public static async Task<List<T>> FindByDict<T>(this AzureCosmosFactory azureCosmosFactory, Dictionary<string, object> dict, List<string> propertys = null) where T : ID
  433. {
  434. StringBuilder sql;
  435. sql = SQLHelper.GetSQLSelect(propertys);
  436. string pk = typeof(T).Name;
  437. AzureCosmosQuery cosmosDbQuery = SQLHelper.GetSQL(dict, sql, pk);
  438. QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
  439. return await ResultsFromQueryAndOptions<T>(azureCosmosFactory,cosmosDbQuery, queryRequestOptions);
  440. }
  441. public static async Task<List<T>> FindByDict<T>(this AzureCosmosFactory azureCosmosFactory, JsonElement jsonElement, List<string> propertys = null) where T : ID
  442. {
  443. StringBuilder sql;
  444. sql = SQLHelper.GetSQLSelect(propertys);
  445. string pk = typeof(T).Name;
  446. //Dictionary<string, object> dict = new Dictionary<string, object>();
  447. /* var emobj = jsonElement.EnumerateObject();
  448. while (emobj.MoveNext())
  449. {
  450. dict[emobj.Current.Name] = emobj.Current.Value;
  451. }
  452. //处理code
  453. if (dict.TryGetValue("code", out object _))
  454. {
  455. dict.Remove("code");
  456. }*/
  457. AzureCosmosQuery cosmosDbQuery = SQLHelper.GetSQL(jsonElement, sql, pk);
  458. QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
  459. return await ResultsFromQueryAndOptions<T>(azureCosmosFactory, cosmosDbQuery, queryRequestOptions);
  460. }
  461. public static async Task<List<dynamic>> FindByDict(this AzureCosmosFactory azureCosmosFactory, string CollectionName, JsonElement json, List<string> propertys = null)
  462. {
  463. if (azureCosmosFactory.CosmosDict.typeCosmos.TryGetValue(CollectionName, out AzureCosmosModel container))
  464. {
  465. string pk = container.type.Name;
  466. StringBuilder sql;
  467. sql = SQLHelper.GetSQLSelect(propertys);
  468. AzureCosmosQuery cosmosDbQuery = SQLHelper.GetSQL(json, sql, pk);
  469. QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
  470. AsyncPageable<dynamic> query = container.container.GetItemQueryIterator<dynamic>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
  471. return await ResultsFromFeedIterator(query);
  472. }
  473. else
  474. {
  475. throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
  476. }
  477. }
  478. public static async Task<List<dynamic>> FindByDict(this AzureCosmosFactory azureCosmosFactory, string CollectionName, Dictionary<string, object> dict, List<string> propertys = null)
  479. {
  480. if (azureCosmosFactory.CosmosDict.typeCosmos.TryGetValue(CollectionName, out AzureCosmosModel container))
  481. {
  482. string pk = container.type.Name;
  483. StringBuilder sql;
  484. sql = SQLHelper.GetSQLSelect(propertys);
  485. AzureCosmosQuery cosmosDbQuery = SQLHelper.GetSQL(dict, sql, pk);
  486. QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
  487. AsyncPageable<dynamic> query = container.container.GetItemQueryIterator<dynamic>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
  488. return await ResultsFromFeedIterator(query);
  489. }
  490. else
  491. {
  492. throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
  493. }
  494. }
  495. public static async Task<List<IdPk>> DeleteAll<T>(this AzureCosmosFactory azureCosmosFactory, List<IdPk> ids) where T : ID
  496. {
  497. AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(typeof(T).Name);
  498. List<IdPk> idPks = new List<IdPk>();
  499. if (container.monitor)
  500. {
  501. List<T> list = await azureCosmosFactory.FindByDict<T>(new Dictionary<string, object>() { { "id", ids.Select(x => x.id).ToArray() } });
  502. list = await azureCosmosFactory. DeleteTTL(list);
  503. return ids;
  504. }
  505. else
  506. {
  507. int pages = (int)Math.Ceiling((double)ids.Count / pageSize);
  508. Stopwatch stopwatch = Stopwatch.StartNew();
  509. for (int i = 0; i < pages; i++)
  510. {
  511. List<IdPk> lists = ids.Skip((i) * pageSize).Take(pageSize).ToList();
  512. List<Task> tasks = new List<Task>(lists.Count);
  513. lists.ForEach(item =>
  514. {
  515. tasks.Add(container.container.DeleteItemStreamAsync(item.id, new PartitionKey(item.pk))
  516. .ContinueWith((Task<Response> task) =>
  517. {
  518. using (Response response = task.Result)
  519. {
  520. idPks.Add(new IdPk { id = item.id, pk = item.pk, Status = response.Status });
  521. // if (!response.IsSuccessStatusCode)
  522. // {
  523. // }
  524. }
  525. }
  526. ));
  527. });
  528. await Task.WhenAll(tasks);
  529. if (container.cache && RedisHelper.Instance != null)
  530. {
  531. lists.ForEach(async x => {
  532. await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, x.id);
  533. });
  534. }
  535. }
  536. stopwatch.Stop();
  537. return idPks;
  538. }
  539. }
  540. public static async Task<List<IdPk>> DeleteAll<T>(this AzureCosmosFactory azureCosmosFactory, Dictionary<string, object> dict) where T : ID
  541. {
  542. if (dict.Keys.Count > 0)
  543. {
  544. List<T> list = await azureCosmosFactory.FindByDict<T>(dict);
  545. return await azureCosmosFactory.DeleteAll(list);
  546. }
  547. else
  548. {
  549. throw new BizException("参数为空", 500);
  550. }
  551. }
  552. public static async Task<List<IdPk>> DeleteAll<T>(this AzureCosmosFactory azureCosmosFactory, JsonElement dict) where T : ID
  553. {
  554. List<T> list = await azureCosmosFactory.FindByDict<T>(dict);
  555. return await azureCosmosFactory.DeleteAll(list);
  556. }
  557. public static async Task<List<IdPk>> DeleteAll<T>(this AzureCosmosFactory azureCosmosFactory, List<T> enyites) where T : ID
  558. {
  559. Type type = typeof(T);
  560. AzureCosmosModel container = azureCosmosFactory.GetCosmosModel (type.Name);
  561. List<IdPk> idPks = new List<IdPk>();
  562. //log4net 日志記錄
  563. string uuidKey = Guid.NewGuid().ToString();
  564. string logkey = "\r\n【" + uuidKey + "】\r\n";
  565. LogHelper.Info(new object(),
  566. logkey
  567. + "删除------->>\r\n"
  568. + "表:"
  569. + type.Name + "\r\n"
  570. + "数据:"
  571. + enyites.ToJsonString()
  572. + "\r\n" + logkey);
  573. string pk = AzureCosmosUtil.GetPartitionKey (type);
  574. if (container.monitor)
  575. {
  576. enyites = await azureCosmosFactory.DeleteTTL(enyites);
  577. foreach (T t in enyites)
  578. {
  579. object o = type.GetProperty(pk).GetValue(t, null);
  580. idPks.Add(new IdPk { id = t.id, pk = o.ToString(), StatusCode = HttpStatusCode.NoContent });
  581. }
  582. return idPks;
  583. }
  584. else
  585. {
  586. int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
  587. Stopwatch stopwatch = Stopwatch.StartNew();
  588. for (int i = 0; i < pages; i++)
  589. {
  590. List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
  591. List<KeyValuePair<PartitionKey, string>> itemsToInsert = new List<KeyValuePair<PartitionKey, string>>();
  592. lists.ForEach(x =>
  593. {
  594. object o = type.GetProperty(pk).GetValue(x, null);
  595. KeyValuePair<PartitionKey, string> keyValue = new KeyValuePair<PartitionKey, string>(new PartitionKey(o.ToString()), x.id);
  596. itemsToInsert.Add(keyValue);
  597. });
  598. List<Task> tasks = new List<Task>(lists.Count);
  599. itemsToInsert.ForEach(item =>
  600. {
  601. tasks.Add(container.container.DeleteItemStreamAsync(item.Value, item.Key)
  602. .ContinueWith((Task<Response> task) =>
  603. {
  604. using (Response response = task.Result)
  605. {
  606. idPks.Add(new IdPk { id = item.Value, pk = item.Key.ToString(), Status = response.Status });
  607. }
  608. }
  609. ));
  610. });
  611. await Task.WhenAll(tasks); if (container.cache && RedisHelper.Instance != null)
  612. {
  613. lists.ForEach(async x => {
  614. await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, x.id);
  615. });
  616. }
  617. }
  618. stopwatch.Stop();
  619. return idPks;
  620. }
  621. }
  622. public static async Task<IdPk> DeleteAsync<T>(this AzureCosmosFactory azureCosmosFactory, IdPk idPk) where T : ID
  623. {
  624. return await DeleteAsync<T>(azureCosmosFactory, idPk.id, idPk.pk);
  625. }
  626. public static async Task<IdPk> DeleteAsync<T>(this AzureCosmosFactory azureCosmosFactory, string id, string pk) where T : ID
  627. {
  628. // pk =AzureCosmosUtil.GetPartitionKey<T>();
  629. AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(typeof(T).Name);
  630. if (container.monitor)
  631. {
  632. List<T> list = await FindByDict<T>(azureCosmosFactory,new Dictionary<string, object>() { { "id", id } });
  633. if (list.Count > 0)
  634. {
  635. await DeleteTTL<T>(azureCosmosFactory, list);
  636. return new IdPk { id = id, pk = pk, StatusCode = HttpStatusCode.NoContent };
  637. }
  638. else
  639. {
  640. throw new BizException("未找到ID匹配的数据,删除失败");
  641. }
  642. }
  643. else
  644. {
  645. Response response = await container.container.DeleteItemStreamAsync(id: id, partitionKey: new PartitionKey(pk));
  646. if (container.cache && RedisHelper.Instance != null)
  647. {
  648. await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, id);
  649. }
  650. return new IdPk { id = id, pk = pk, Status = response.Status };
  651. }
  652. }
  653. public static async Task<IdPk> DeleteAsync<T>(this AzureCosmosFactory azureCosmosFactory, T entity) where T : ID
  654. {
  655. Type type = typeof(T);
  656. AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(type.Name);
  657. string partitionKey = AzureCosmosUtil.GetPartitionKey (type);
  658. object o = type.GetProperty(partitionKey).GetValue(entity, null);
  659. if (container.monitor)
  660. {
  661. await DeleteTTL<T>(azureCosmosFactory, new List<T>() { entity });
  662. return new IdPk { id = entity.id, pk = o.ToString(), StatusCode = HttpStatusCode.NoContent };
  663. }
  664. else
  665. {
  666. Response response = await container.container.DeleteItemStreamAsync(id: entity.id, partitionKey: new PartitionKey(o.ToString()));
  667. if (container.cache && RedisHelper.Instance != null)
  668. {
  669. await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, entity.id);
  670. }
  671. return new IdPk { id = entity.id, pk = partitionKey, Status = response.Status };
  672. }
  673. }
  674. public static async Task<List<T>> FindSQL<T>(this AzureCosmosFactory azureCosmosFactory, string sql, Dictionary<string, object> Parameters = null) where T : ID
  675. {
  676. if (sql.Contains(".pk"))
  677. {
  678. AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(typeof(T).Name);
  679. QueryRequestOptions queryOptions = GetQueryRequestOptions(GetEffectivePageSize(-1, null));
  680. if (Parameters != null)
  681. {
  682. AzureCosmosQuery cosmosDbQuery = new AzureCosmosQuery
  683. {
  684. QueryText = sql,
  685. Parameters = Parameters
  686. };
  687. AsyncPageable<T> feedIterator = container.container
  688. .GetItemQueryIterator<T>(cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryOptions);
  689. return await ResultsFromFeedIterator(feedIterator);
  690. }
  691. else
  692. {
  693. QueryDefinition queryDefinition = new QueryDefinition(sql);
  694. return await ResultsFromFeedIterator<T>(container.container.GetItemQueryIterator<T>(queryDefinition));
  695. }
  696. }
  697. else
  698. {
  699. throw new BizException("查询参数必须设置 .pk ", ResponseCode.PARAMS_ERROR);
  700. }
  701. }
  702. /// <summary>
  703. /// 偌TTL刉壺
  704. /// </summary>
  705. /// <typeparam name="T"></typeparam>
  706. /// <param name="list"></param>
  707. /// <returns></returns>
  708. private static async Task<List<T>> DeleteTTL<T>(this AzureCosmosFactory azureCosmosFactory ,List<T> list) where T : ID
  709. {
  710. AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(typeof(T).Name);
  711. list.ForEach(x => { x.ttl = ttl; });
  712. list = await DeleteTTlALL(azureCosmosFactory,list);
  713. if (container.cache && RedisHelper.Instance != null)
  714. {
  715. list.ForEach(async x => {
  716. await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, x.id);
  717. });
  718. }
  719. return list;
  720. }
  721. private static async Task<List<T>> DeleteTTlALL<T>(this AzureCosmosFactory azureCosmosFactory, List<T> enyites) where T : ID
  722. {
  723. Type type = typeof(T);
  724. int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
  725. AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(type.Name);
  726. bool flag = false;
  727. if (RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
  728. {
  729. flag = true;
  730. }
  731. string partitionKey = AzureCosmosUtil.GetPartitionKey (type);
  732. Stopwatch stopwatch = Stopwatch.StartNew();
  733. for (int i = 0; i < pages; i++)
  734. {
  735. List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
  736. List<KeyValuePair<PartitionKey, Stream>> itemsToInsert = new List<KeyValuePair<PartitionKey, Stream>>();
  737. lists.ForEach(async x =>
  738. {
  739. x.pk = type.Name;
  740. //x.ttl = null;
  741. MemoryStream stream = new MemoryStream();
  742. await JsonSerializer.SerializeAsync(stream, x, new JsonSerializerOptions { IgnoreNullValues = true });
  743. object o = type.GetProperty(partitionKey).GetValue(x, null);
  744. KeyValuePair<PartitionKey, Stream> keyValue = new KeyValuePair<PartitionKey, Stream>(new PartitionKey(o.ToString()), stream);
  745. itemsToInsert.Add(keyValue);
  746. });
  747. List<Task> tasks = new List<Task>(lists.Count);
  748. itemsToInsert.ForEach(item =>
  749. {
  750. tasks.Add(container.container.UpsertItemStreamAsync(item.Value, item.Key)
  751. .ContinueWith((Task<Response> task) =>
  752. {
  753. //using (ResponseMessage response = task.Result)
  754. //{
  755. // if (!response.IsSuccessStatusCode)
  756. // {
  757. // }
  758. //}
  759. }
  760. ));
  761. });
  762. await Task.WhenAll(tasks);
  763. if (container.cache && RedisHelper.Instance != null)
  764. {
  765. lists.ForEach(async x => {
  766. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, x.id, x);
  767. });
  768. }
  769. }
  770. if (container.cache && RedisHelper.Instance != null && !flag)
  771. {
  772. await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
  773. }
  774. stopwatch.Stop();
  775. return enyites;
  776. }
  777. private static QueryRequestOptions GetDefaultQueryRequestOptions(int? itemsPerPage = null, int? maxBufferedItemCount = null, int? maxConcurrency = null)
  778. {
  779. QueryRequestOptions queryRequestOptions = new QueryRequestOptions
  780. {
  781. MaxItemCount = itemsPerPage == -1 ? 1000 : itemsPerPage,
  782. MaxBufferedItemCount = maxBufferedItemCount ?? 100,
  783. MaxConcurrency = maxConcurrency ?? 50
  784. };
  785. return queryRequestOptions;
  786. }
  787. private static int GetEffectivePageSize(int itemsPerPage, int? maxItemCount)
  788. {
  789. return itemsPerPage == -1 ? maxItemCount ?? itemsPerPage : Math.Min(maxItemCount ?? itemsPerPage, itemsPerPage);
  790. }
  791. private static async Task<List<T>> ResultsFromQueryAndOptions<T>(this AzureCosmosFactory azureCosmosFactory, AzureCosmosQuery cosmosDbQuery, QueryRequestOptions queryOptions)
  792. {
  793. if (cosmosDbQuery == null)
  794. {
  795. return null;
  796. }
  797. AzureCosmosModel container = azureCosmosFactory.GetCosmosModel(typeof(T).Name);
  798. AsyncPageable<T> query = container.container.GetItemQueryIterator<T>(
  799. queryDefinition: cosmosDbQuery.CosmosQueryDefinition,
  800. requestOptions: queryOptions);
  801. return await ResultsFromFeedIterator(query);
  802. }
  803. private static async Task<List<T>> ResultsFromFeedIterator<T>(AsyncPageable<T> query, int? maxItemCount = null)
  804. {
  805. List<T> results = new List<T>();
  806. await foreach (T t in query)
  807. {
  808. results.Add(t);
  809. if (results.Count == maxItemCount)
  810. {
  811. return results;
  812. }
  813. }
  814. return results;
  815. }
  816. private static QueryRequestOptions GetQueryRequestOptions(int itemsPerPage)
  817. {
  818. QueryRequestOptions queryRequestOptions = new QueryRequestOptions
  819. {
  820. MaxItemCount = itemsPerPage
  821. };
  822. return queryRequestOptions;
  823. }
  824. }
  825. }