// AzureCosmosDBV3Repository.cs
  1. using Microsoft.Azure.Cosmos;
  2. using Microsoft.Azure.Cosmos.Linq;
  3. using OpenXmlPowerTools;
  4. using System;
  5. using System.Collections.Concurrent;
  6. using System.Collections.Generic;
  7. using System.Diagnostics;
  8. using System.IO;
  9. using System.Linq;
  10. using System.Linq.Expressions;
  11. using System.Net;
  12. using System.Reflection;
  13. using System.Text;
  14. using System.Text.Json;
  15. using System.Threading;
  16. using System.Threading.Tasks;
  17. using TEAMModelOS.SDK.Context.Attributes.Azure;
  18. using TEAMModelOS.SDK.Context.Exception;
  19. using TEAMModelOS.SDK.Helper.Common.CollectionHelper;
  20. using TEAMModelOS.SDK.Helper.Common.ReflectorExtensions;
  21. using TEAMModelOS.SDK.Module.AzureCosmosDB.Configuration;
  22. namespace TEAMModelOS.SDK.Module.AzureCosmosDBV3
  23. {
  24. public class AzureCosmosDBV3Repository : IAzureCosmosDBV3Repository
  25. {
  /// <summary>
  /// Cosmos DB client used for every database and container operation.
  /// </summary>
  private CosmosClient CosmosClient { get; set; }

  /// <summary>
  /// Registry of known containers, indexed by container name (nameCosmos)
  /// and by model type name (typeCosmos).
  /// </summary>
  public CosmosDict CosmosDict { get; set; } = new CosmosDict();

  /// <summary>
  /// Database name.
  /// </summary>
  private string DatabaseId { get; set; }

  /// <summary>
  /// Default throughput (RU) taken from the options.
  /// </summary>
  private int CollectionThroughput { get; set; }

  /// <summary>
  /// Database object, assigned by InitializeDatabase.
  /// </summary>
  private Database database { get; set; }

  /// <summary>
  /// Page size used when bulk operations are split into batches.
  /// </summary>
  private int pageSize = 200;

  /// <summary>
  /// Prefix of the Redis cache keys.
  /// </summary>
  private const string CacheCosmosPrefix = "cosmos:";

  /// <summary>
  /// Scanned model names/namespaces handed to the attribute scanner.
  /// </summary>
  private string[] ScanModel { get; set; }

  /// <summary>
  /// Timeout in seconds applied to cache keys.
  /// </summary>
  private const int timeoutSeconds = 86400;

  /// <summary>
  /// TTL duration: 1 second.
  /// </summary>
  private const int ttl = 1;

  /// <summary>
  /// Id of the lease container used for change notifications.
  /// </summary>
  private string leaseId = "AleaseContainer";
  /// <summary>
  /// Creates the repository with a custom serializer.
  /// </summary>
  /// <param name="options">Connection string, key, database name, default throughput and scanned models.</param>
  /// <param name="cosmosSerializer">Serializer handed to the shared Cosmos client factory.</param>
  /// <exception cref="BizException">The connection string is missing, or the client factory raised a CosmosException.</exception>
  public AzureCosmosDBV3Repository(AzureCosmosDBOptions options, CosmosSerializer cosmosSerializer)
  {
      try
      {
          bool connectionConfigured = !string.IsNullOrEmpty(options.ConnectionString);
          if (!connectionConfigured)
          {
              throw new BizException("请设置正确的AzureCosmosDB数据库配置信息!");
          }

          var clientHolder = CosmosDBV3ClientSingleton.getInstance(options.ConnectionString, options.ConnectionKey, cosmosSerializer);
          CosmosClient = clientHolder.GetCosmosDBClient();

          DatabaseId = options.Database;
          CollectionThroughput = options.CollectionThroughput;
          ScanModel = options.ScanModel;
      }
      catch (CosmosException cosmosError)
      {
          // Surface Cosmos failures as the project's business exception type.
          throw new BizException(cosmosError.Message, 500, cosmosError.StackTrace);
      }
  }
  /// <summary>
  /// Creates the repository without a custom serializer (null is passed to the client factory).
  /// </summary>
  /// <param name="options">Connection string, key, database name, default throughput and scanned models.</param>
  /// <exception cref="BizException">The connection string is missing, or the client factory raised a CosmosException.</exception>
  public AzureCosmosDBV3Repository(AzureCosmosDBOptions options)
      : this(options, cosmosSerializer: null)
  {
      // All initialization is performed by the two-argument constructor.
  }
  /// <summary>
  /// Initializes the Cosmos DB database: creates it when missing, registers every
  /// existing container, then makes sure each scanned model type marked with
  /// <see cref="CosmosDBAttribute"/> has a container and is registered in <see cref="CosmosDict"/>.
  /// A lease container is created as well when at least one model enables monitoring.
  /// </summary>
  /// <returns>The populated <see cref="CosmosDict"/> registry.</returns>
  /// <exception cref="BizException">A scanned type has no container name or no usable partition key.</exception>
  public async Task<CosmosDict> InitializeDatabase()
  {
      database = await CosmosClient.CreateDatabaseIfNotExistsAsync(DatabaseId, CollectionThroughput);

      // Register the containers that already exist in the database.
      FeedIterator<ContainerProperties> resultSetIterator = database.GetContainerQueryIterator<ContainerProperties>();
      while (resultSetIterator.HasMoreResults)
      {
          foreach (ContainerProperties existingContainer in await resultSetIterator.ReadNextAsync())
          {
              CosmosDict.nameCosmos.TryAdd(existingContainer.Id, new CosmosModelInfo { container = database.GetContainer(existingContainer.Id), cache = false, monitor = false });
          }
      }

      bool isMonitor = false;
      // Collect every model type that declares a container.
      List<Type> types = ReflectorExtensions.GetAllTypeAsAttribute<CosmosDBAttribute>(ScanModel);
      foreach (Type type in types)
      {
          string partitionKeyName = GetPartitionKey(type);

          // Read the attribute once; a missing attribute or name is reported as a BizException.
          CosmosDBAttribute attribute = type.GetCustomAttributes<CosmosDBAttribute>(true).FirstOrDefault();
          if (attribute == null || string.IsNullOrEmpty(attribute.Name))
          {
              throw new BizException("必须指定容器名",ResponseCode.PARAMS_ERROR);
          }

          string collectionName = attribute.Name;
          bool cache = attribute.Cache;
          bool monitor = attribute.Monitor;
          if (monitor)
          {
              isMonitor = true;
          }

          // An attribute RU above 400 overrides the configured default throughput.
          int ru = attribute.RU > 400 ? attribute.RU : CollectionThroughput;

          Container target;
          if (CosmosDict.nameCosmos.TryGetValue(collectionName, out CosmosModelInfo registered))
          {
              // The container exists: only raise its throughput when it is below the requested RU.
              target = CosmosClient.GetDatabase(DatabaseId).GetContainer(registered.container.Id);
              int? currentThroughput = await target.ReadThroughputAsync();
              if (currentThroughput < ru)
              {
                  await target.ReplaceThroughputAsync(ru);
              }
          }
          else
          {
              // The container is missing: create it with per-item TTL enabled (-1 = no default expiry).
              ContainerProperties containerProperties = new ContainerProperties { Id = collectionName, DefaultTimeToLive = -1 };
              if (!string.IsNullOrEmpty(partitionKeyName))
              {
                  containerProperties.PartitionKeyPath = "/" + partitionKeyName;
              }

              // Use a local value instead of overwriting CollectionThroughput, so one model's RU
              // does not become the default for every container created afterwards.
              int throughput = Math.Max(ru, CollectionThroughput);
              target = await database.CreateContainerIfNotExistsAsync(containerProperties, throughput: throughput);
          }

          CosmosModelInfo cosmos = new CosmosModelInfo { container = target, cache = cache, monitor = monitor, type = type };
          CosmosDict.nameCosmos[collectionName] = cosmos;
          // Indexer rather than Add, so running the initialization again does not throw on duplicates.
          CosmosDict.typeCosmos[type.Name] = cosmos;
      }

      if (isMonitor)
      {
          ContainerProperties leaseProperties = new ContainerProperties { Id = leaseId, PartitionKeyPath = "/id", DefaultTimeToLive = -1 };
          Container leaseContainer = await database.CreateContainerIfNotExistsAsync(leaseProperties, throughput: CollectionThroughput);
          CosmosDict.nameCosmos.TryAdd(leaseId, new CosmosModelInfo { container = leaseContainer, cache = false, monitor = false });
      }
      return CosmosDict;
  }
  /// <summary>
  /// Resolves the partition key property name of <typeparamref name="T"/>.
  /// </summary>
  /// <returns>Name of the property used as partition key.</returns>
  private string GetPartitionKey<T>() => GetPartitionKey(typeof(T));
  /// <summary>
  /// Resolves the partition key property name of <paramref name="type"/>. A property named
  /// "PartitionKey" ends the scan; otherwise properties carrying a PartitionKeyAttribute are collected.
  /// </summary>
  /// <param name="type">Model type to inspect.</param>
  /// <returns>Name of the single partition key property.</returns>
  /// <exception cref="BizException">No candidate, or more than one candidate, was found.</exception>
  private string GetPartitionKey(Type type)
  {
      List<PropertyInfo> candidates = new List<PropertyInfo>();
      foreach (PropertyInfo prop in type.GetProperties())
      {
          if (prop.Name.Equals("PartitionKey"))
          {
              // A property with the conventional name stops the scan immediately.
              candidates.Add(prop);
              break;
          }

          // One entry per PartitionKeyAttribute instance found on the property.
          int markerCount = prop.GetCustomAttributes(true).Count(marker => marker is PartitionKeyAttribute);
          candidates.AddRange(Enumerable.Repeat(prop, markerCount));
      }

      if (candidates.Count == 1)
      {
          return candidates[0].Name;
      }
      if (candidates.Count <= 0)
      {
          throw new BizException(type.Name + " has no PartitionKey !");
      }
      throw new BizException(type.Name+" PartitionKey can only be single!");
  }
  /// <summary>
  /// Returns the container info for <typeparamref name="T"/>, creating the container when it is
  /// not registered yet.
  /// </summary>
  /// <returns>The registered (or newly created) container info.</returns>
  /// <exception cref="BizException">
  /// The type has no partition key, or it has no <see cref="CosmosDBAttribute"/> with a container name.
  /// </exception>
  private async Task<CosmosModelInfo> InitializeCollection<T>()
  {
      Type type = typeof(T);
      string partitionKey = GetPartitionKey<T>();

      // FirstOrDefault instead of First: a type without the attribute must produce the
      // BizException below rather than an InvalidOperationException from LINQ.
      CosmosDBAttribute cosmosDBAttribute = type.GetCustomAttributes<CosmosDBAttribute>(true).FirstOrDefault();
      if (cosmosDBAttribute == null || string.IsNullOrEmpty(cosmosDBAttribute.Name))
      {
          throw new BizException(type.Name+"未指定CosmosDB表名",ResponseCode.PARAMS_ERROR);
      }
      return await InitializeCollection(cosmosDBAttribute, type.Name, partitionKey);
  }
  /// <summary>
  /// Returns the container info registered for <paramref name="typeName"/>, or creates the
  /// container described by <paramref name="cosmosDBAttribute"/> and registers it.
  /// </summary>
  /// <param name="cosmosDBAttribute">Container settings (name, RU, cache and monitor flags).</param>
  /// <param name="typeName">Model type name used as key in the type registry.</param>
  /// <param name="PartitionKey">Partition key property name; no path is set when it is empty.</param>
  /// <returns>The registered (or newly created) container info.</returns>
  private async Task<CosmosModelInfo> InitializeCollection(CosmosDBAttribute cosmosDBAttribute, string typeName, string PartitionKey)
  {
      // Already known in memory: return it directly.
      if (CosmosDict.typeCosmos.TryGetValue(typeName, out CosmosModelInfo cosmosModelInfo))
      {
          return cosmosModelInfo;
      }

      // Otherwise create it with the same settings InitializeDatabase uses: per-item TTL
      // enabled (DeleteTTL relies on item ttl values) and the configured default throughput
      // when the attribute does not request more than 400 RU.
      ContainerProperties containerProperties = new ContainerProperties { Id = cosmosDBAttribute.Name, DefaultTimeToLive = -1 };
      if (!string.IsNullOrEmpty(PartitionKey))
      {
          containerProperties.PartitionKeyPath = "/"+ PartitionKey;
      }
      int throughput = cosmosDBAttribute.RU > 400 ? cosmosDBAttribute.RU : CollectionThroughput;
      Container containerWithConsistentIndexing = await database.CreateContainerIfNotExistsAsync(containerProperties, throughput: throughput);

      // NOTE(review): only the type name is available here, so the `type` member of the new
      // entry stays unset; callers that read container.type for such entries should be checked.
      CosmosModelInfo cosmosModel = new CosmosModelInfo { container = containerWithConsistentIndexing, cache = cosmosDBAttribute.Cache, monitor = cosmosDBAttribute.Monitor };
      CosmosDict.nameCosmos.TryAdd(cosmosDBAttribute.Name, cosmosModel);
      CosmosDict.typeCosmos.TryAdd(typeName, cosmosModel);
      return cosmosModel;
  }
  /// <summary>
  /// Deletes by TTL: stamps every entity with the short <c>ttl</c> constant, upserts them through
  /// <c>DeleteTTlALL</c>, then evicts them from the Redis cache when caching is enabled.
  /// </summary>
  /// <typeparam name="T">Model type.</typeparam>
  /// <param name="list">Entities to expire.</param>
  /// <returns>The list returned by <c>DeleteTTlALL</c>.</returns>
  private async Task<List<T>> DeleteTTL<T>(List<T> list) where T : ID
  {
      CosmosModelInfo container = await InitializeCollection<T>();
      list.ForEach(x => { x.ttl = ttl; });
      list = await DeleteTTlALL(list);
      if (container.cache && RedisHelper.Instance != null)
      {
          // Await the evictions instead of firing async-void lambdas, so failures are
          // observed and the cache is clean when this method returns.
          string cacheKey = CacheCosmosPrefix + container.container.Id;
          await Task.WhenAll(list.Select(x => RedisHelper.HDelAsync(cacheKey, x.id)));
      }
      return list;
  }
  /// <summary>
  /// Upserts the given entities in pages of <c>pageSize</c> through the stream API and mirrors
  /// them into the Redis cache when caching is enabled. The caller is expected to have set the
  /// entities' <c>ttl</c> beforehand (presumably so that Cosmos DB expires them).
  /// </summary>
  /// <typeparam name="T">Model type.</typeparam>
  /// <param name="enyites">Entities to upsert.</param>
  /// <returns>The same list that was passed in.</returns>
  private async Task<List<T>> DeleteTTlALL<T>(List<T> enyites) where T : ID
  {
      CosmosModelInfo container = await InitializeCollection<T>();
      string cacheKey = CacheCosmosPrefix + container.container.Id;

      // Redis is only touched when caching is enabled and configured; the previous code
      // called RedisHelper.Exists without this guard.
      bool useCache = container.cache && RedisHelper.Instance != null;
      bool cacheExisted = useCache && RedisHelper.Exists(cacheKey);

      Type type = typeof(T);
      PropertyInfo partitionKeyProperty = type.GetProperty(GetPartitionKey<T>());
      JsonSerializerOptions serializerOptions = new JsonSerializerOptions { IgnoreNullValues = true };

      // Upserts one payload and releases the response; the status code is not inspected.
      async Task UpsertAsync(Stream payload, PartitionKey key)
      {
          using (ResponseMessage response = await container.container.UpsertItemStreamAsync(payload, key))
          {
          }
      }

      int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
      for (int i = 0; i < pages; i++)
      {
          List<T> page = enyites.Skip(i * pageSize).Take(pageSize).ToList();
          List<Task> tasks = new List<Task>(page.Count);
          foreach (T item in page)
          {
              item.pk = type.Name;
              MemoryStream stream = new MemoryStream();
              // Serialization is awaited in-line so every payload is complete before it is sent.
              await JsonSerializer.SerializeAsync(stream, item, serializerOptions);
              // Rewind so the payload is read from its first byte.
              stream.Position = 0;
              object partitionKeyValue = partitionKeyProperty.GetValue(item, null);
              tasks.Add(UpsertAsync(stream, new PartitionKey(partitionKeyValue.ToString())));
          }
          await Task.WhenAll(tasks);

          if (useCache)
          {
              await Task.WhenAll(page.Select(x => RedisHelper.HSetAsync(cacheKey, x.id, x)));
          }
      }

      // Apply the expiry only when this call created the cache key.
      if (useCache && !cacheExisted)
      {
          await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
      }
      return enyites;
  }
  /// <summary>
  /// Deletes the documents identified by <paramref name="ids"/>. Monitored containers are
  /// deleted through TTL; other containers are deleted directly, in pages of <c>pageSize</c>.
  /// </summary>
  /// <typeparam name="T">Model type whose container is targeted.</typeparam>
  /// <param name="ids">Id / partition key pairs to delete.</param>
  /// <returns>
  /// For monitored containers the input list; otherwise one entry per item carrying the
  /// status code of its delete request.
  /// </returns>
  public async Task<List<IdPk>> DeleteAll<T>(List<IdPk> ids) where T : ID
  {
      CosmosModelInfo container = await InitializeCollection<T>();
      if (container.monitor)
      {
          List<T> list = await FindByDict<T>(new Dictionary<string, object>() { { "id", ids.Select(x => x.id).ToArray() } });
          // FindByDict can yield null when no query could be built; nothing to expire then.
          if (list != null && list.Count > 0)
          {
              await DeleteTTL(list);
          }
          return ids;
      }

      // Deletes one document and reports its status; the response is disposed afterwards.
      async Task<IdPk> DeleteOneAsync(IdPk item)
      {
          using (ResponseMessage response = await container.container.DeleteItemStreamAsync(item.id, new PartitionKey(item.pk)))
          {
              return new IdPk { id = item.id, pk = item.pk, StatusCode = response.StatusCode };
          }
      }

      string cacheKey = CacheCosmosPrefix + container.container.Id;
      bool useCache = container.cache && RedisHelper.Instance != null;
      List<IdPk> idPks = new List<IdPk>();
      int pages = (int)Math.Ceiling((double)ids.Count / pageSize);
      for (int i = 0; i < pages; i++)
      {
          List<IdPk> page = ids.Skip(i * pageSize).Take(pageSize).ToList();

          // Results are gathered from the awaited tasks; List<T> is not safe for the
          // concurrent Add calls that task continuations would make.
          IdPk[] pageResults = await Task.WhenAll(page.Select(x => DeleteOneAsync(x)));
          idPks.AddRange(pageResults);

          if (useCache)
          {
              await Task.WhenAll(page.Select(x => RedisHelper.HDelAsync(cacheKey, x.id)));
          }
      }
      return idPks;
  }
  /// <summary>
  /// Deletes every document of <typeparamref name="T"/> matched by the given filter dictionary.
  /// </summary>
  /// <typeparam name="T">Model type.</typeparam>
  /// <param name="dict">Filter conditions; must contain at least one entry.</param>
  /// <returns>Delete result per matched document.</returns>
  /// <exception cref="BizException">The dictionary is empty.</exception>
  public async Task<List<IdPk>> DeleteAll<T>(Dictionary<string, object> dict) where T : ID
  {
      bool hasConditions = dict.Keys.Count > 0;
      if (!hasConditions)
      {
          throw new BizException("参数为空", 500);
      }

      List<T> matches = await FindByDict<T>(dict);
      return await DeleteAll(matches);
  }
  /// <summary>
  /// Deletes the given entities. Monitored containers are deleted through TTL; other
  /// containers are deleted directly, in pages of <c>pageSize</c>.
  /// </summary>
  /// <typeparam name="T">Model type.</typeparam>
  /// <param name="enyites">Entities to delete.</param>
  /// <returns>
  /// One entry per entity with its id, its partition key value and a status code
  /// (NoContent for the TTL path, the delete response status otherwise).
  /// </returns>
  public async Task<List<IdPk>> DeleteAll<T>(List<T> enyites) where T : ID
  {
      Type type = typeof(T);
      PropertyInfo partitionKeyProperty = type.GetProperty(GetPartitionKey<T>());
      CosmosModelInfo container = await InitializeCollection<T>();
      List<IdPk> idPks = new List<IdPk>();
      if (container.monitor)
      {
          enyites = await DeleteTTL(enyites);
          foreach (T t in enyites)
          {
              object o = partitionKeyProperty.GetValue(t, null);
              idPks.Add(new IdPk { id = t.id, pk = o.ToString(), StatusCode = HttpStatusCode.NoContent });
          }
          return idPks;
      }

      // Deletes one entity and reports the raw partition key value. PartitionKey.ToString()
      // is not used for the result because it renders the key in JSON form.
      async Task<IdPk> DeleteOneAsync(T entity)
      {
          string partitionKeyValue = partitionKeyProperty.GetValue(entity, null).ToString();
          using (ResponseMessage response = await container.container.DeleteItemStreamAsync(entity.id, new PartitionKey(partitionKeyValue)))
          {
              return new IdPk { id = entity.id, pk = partitionKeyValue, StatusCode = response.StatusCode };
          }
      }

      string cacheKey = CacheCosmosPrefix + container.container.Id;
      bool useCache = container.cache && RedisHelper.Instance != null;
      int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
      for (int i = 0; i < pages; i++)
      {
          List<T> page = enyites.Skip(i * pageSize).Take(pageSize).ToList();

          // Results are gathered from the awaited tasks; List<T> is not safe for the
          // concurrent Add calls that task continuations would make.
          IdPk[] pageResults = await Task.WhenAll(page.Select(x => DeleteOneAsync(x)));
          idPks.AddRange(pageResults);

          if (useCache)
          {
              await Task.WhenAll(page.Select(x => RedisHelper.HDelAsync(cacheKey, x.id)));
          }
      }
      return idPks;
  }
  /// <summary>
  /// Deletes the document identified by the id and partition key carried in <paramref name="idPk"/>.
  /// </summary>
  /// <typeparam name="T">Model type.</typeparam>
  /// <param name="idPk">Id / partition key pair.</param>
  /// <returns>The delete result.</returns>
  public async Task<IdPk> DeleteAsync<T>(IdPk idPk) where T : ID
  {
      IdPk outcome = await DeleteAsync<T>(idPk.id, idPk.pk);
      return outcome;
  }
  /// <summary>
  /// Deletes one document by id and partition key value. Monitored containers are deleted
  /// through TTL after the document is looked up by id; others are deleted directly.
  /// </summary>
  /// <typeparam name="T">Model type.</typeparam>
  /// <param name="id">Document id.</param>
  /// <param name="pk">Partition key value.</param>
  /// <returns>Id, partition key and status code of the operation.</returns>
  /// <exception cref="BizException">Monitored container and no document matches the id.</exception>
  public async Task<IdPk> DeleteAsync<T>(string id, string pk) where T : ID
  {
      CosmosModelInfo container = await InitializeCollection<T>();
      if (container.monitor)
      {
          List<T> list = await FindByDict<T>(new Dictionary<string, object>() { { "id", id } });
          // FindByDict can yield null when no query could be built; treat that as "not found".
          if (list == null || list.Count == 0)
          {
              throw new BizException("未找到ID匹配的数据,删除失败");
          }
          await DeleteTTL<T>(list);
          return new IdPk { id = id, pk = pk, StatusCode = HttpStatusCode.NoContent };
      }

      using (ResponseMessage response = await container.container.DeleteItemStreamAsync(id: id, partitionKey: new PartitionKey(pk)))
      {
          if (container.cache && RedisHelper.Instance != null)
          {
              await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, id);
          }
          return new IdPk { id = id, pk = pk, StatusCode = response.StatusCode };
      }
  }
  /// <summary>
  /// Deletes the given entity. Monitored containers are deleted through TTL; others directly.
  /// </summary>
  /// <typeparam name="T">Model type.</typeparam>
  /// <param name="entity">Entity to delete; its partition key property must hold a value.</param>
  /// <returns>Id, partition key value and status code of the operation.</returns>
  public async Task<IdPk> DeleteAsync<T>(T entity) where T : ID
  {
      CosmosModelInfo container = await InitializeCollection<T>();
      string partitionKey = GetPartitionKey<T>();
      Type type = typeof(T);
      string partitionKeyValue = type.GetProperty(partitionKey).GetValue(entity, null).ToString();
      if (container.monitor)
      {
          await DeleteTTL<T>(new List<T>() { entity });
          return new IdPk { id = entity.id, pk = partitionKeyValue, StatusCode = HttpStatusCode.NoContent };
      }

      using (ResponseMessage response = await container.container.DeleteItemStreamAsync(id: entity.id, partitionKey: new PartitionKey(partitionKeyValue)))
      {
          if (container.cache && RedisHelper.Instance != null)
          {
              await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, entity.id);
          }
          // Report the partition key VALUE, as the monitored branch does; the property name
          // was returned here before.
          return new IdPk { id = entity.id, pk = partitionKeyValue, StatusCode = response.StatusCode };
      }
  }
  /// <summary>
  /// Returns every document whose <c>pk</c> equals the type name of <typeparamref name="T"/>.
  /// </summary>
  /// <typeparam name="T">Model type.</typeparam>
  /// <param name="propertys">Optional property list handed to the SELECT builder.</param>
  /// <returns>All matching documents.</returns>
  public async Task<List<T>> FindAll<T>(List<string> propertys = null) where T : ID
  {
      CosmosModelInfo container = await InitializeCollection<T>();
      string typeName = typeof(T).Name;

      StringBuilder statement = SQLHelperParametric.GetSQLSelect(propertys);
      statement.Append($" where c.pk ='{typeName}'");

      CosmosDbQuery cosmosDbQuery = new CosmosDbQuery { QueryText = statement.ToString() };
      FeedIterator<T> iterator = container.container.GetItemQueryIterator<T>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition);
      List<T> documents = await ResultsFromFeedIterator(iterator);
      return documents;
  }
  /// <summary>
  /// Drains a feed iterator into a list, stopping early once <paramref name="maxItemCount"/>
  /// items have been collected.
  /// </summary>
  /// <param name="query">Iterator to read.</param>
  /// <param name="maxItemCount">Optional cap on the number of items; null reads everything.</param>
  /// <returns>The collected items.</returns>
  private async Task<List<T>> ResultsFromFeedIterator<T>(FeedIterator<T> query, int? maxItemCount = null)
  {
      List<T> collected = new List<T>();
      while (query.HasMoreResults)
      {
          FeedResponse<T> page = await query.ReadNextAsync();
          foreach (T item in page)
          {
              collected.Add(item);
              bool capReached = maxItemCount.HasValue && collected.Count == maxItemCount.Value;
              if (capReached)
              {
                  return collected;
              }
          }
      }
      return collected;
  }
  /// <summary>
  /// Reads a feed iterator page by page and hands the buffered items to
  /// <paramref name="batchAction"/> whenever at least <paramref name="itemsPerPage"/> items
  /// are buffered, plus once more for any remainder.
  /// </summary>
  /// <param name="query">Iterator to read.</param>
  /// <param name="batchAction">Callback invoked with each batch; the buffer is cleared after it returns.</param>
  /// <param name="itemsPerPage">Buffer size that triggers a batch.</param>
  /// <returns>The buffer list, which has been emptied after the final batch.</returns>
  private async Task<List<T>> ResultsFromFeedIterator<T>(FeedIterator<T> query, Func<List<T>, Task> batchAction, int itemsPerPage)
  {
      List<T> buffer = new List<T>();

      // Delivers the current buffer to the callback, then empties it.
      async Task FlushAsync()
      {
          await batchAction(buffer);
          buffer.Clear();
      }

      while (query.HasMoreResults)
      {
          if (buffer.Count >= itemsPerPage)
          {
              await FlushAsync();
          }
          FeedResponse<T> page = await query.ReadNextAsync();
          buffer.AddRange(page);
      }

      if (buffer.Count > 0)
      {
          await FlushAsync();
      }
      return buffer;
  }
  /// <summary>
  /// Queries the container registered under <paramref name="CollectionName"/> in the type
  /// registry, using the conditions in <paramref name="dict"/>.
  /// </summary>
  /// <param name="CollectionName">Key looked up in <c>CosmosDict.typeCosmos</c>.</param>
  /// <param name="dict">Filter conditions handed to the SQL builder.</param>
  /// <param name="propertys">Optional property list handed to the SELECT builder.</param>
  /// <returns>The matching documents; an empty list when no query could be built.</returns>
  /// <exception cref="BizException">No registry entry exists for the given name.</exception>
  public async Task<List<dynamic>> FindByDict(string CollectionName, Dictionary<string, object> dict, List<string> propertys = null)
  {
      if (!CosmosDict.typeCosmos.TryGetValue(CollectionName, out CosmosModelInfo container))
      {
          throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
      }

      string pk = container.type.Name;
      StringBuilder sql = SQLHelperParametric.GetSQLSelect(propertys);
      CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, sql, pk);
      if (cosmosDbQuery == null)
      {
          // The SQL builder can return null (the sibling query methods guard it the same way);
          // report "no results" rather than dereferencing null.
          return new List<dynamic>();
      }

      QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
      FeedIterator<dynamic> query = container.container.GetItemQueryIterator<dynamic>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
      return await ResultsFromFeedIterator(query);
  }
  /// <summary>
  /// Counts the documents of <typeparamref name="T"/> that match the given conditions.
  /// </summary>
  /// <typeparam name="T">Model type.</typeparam>
  /// <param name="dict">Filter conditions handed to the SQL builder.</param>
  /// <returns>The count values produced by the query; a single 0 when no query could be built.</returns>
  public async Task<List<int>> FindCountByDict<T>(Dictionary<string, object> dict)
  {
      CosmosModelInfo container = await InitializeCollection<T>();
      string typeName = typeof(T).Name;

      StringBuilder countSql = new StringBuilder("select value count(c) from c");
      CosmosDbQuery countQuery = SQLHelperParametric.GetSQL(dict, countSql, typeName);
      if (countQuery == null)
      {
          return new List<int> { 0 };
      }

      int effectivePageSize = GetEffectivePageSize(-1, null);
      QueryRequestOptions options = GetDefaultQueryRequestOptions(itemsPerPage: effectivePageSize);
      FeedIterator<int> iterator = container.container.GetItemQueryIterator<int>(queryDefinition: countQuery.CosmosQueryDefinition, requestOptions: options);
      return await ResultsFromFeedIterator(iterator);
  }
  /// <summary>
  /// Counts the documents in the container registered under <paramref name="CollectionName"/>
  /// that match the given conditions. Paging and sorting keys are ignored for the count.
  /// </summary>
  /// <param name="CollectionName">Key looked up in <c>CosmosDict.typeCosmos</c>.</param>
  /// <param name="dict">Filter conditions; the caller's dictionary is left untouched.</param>
  /// <returns>The count values produced by the query; a single 0 when no query could be built.</returns>
  /// <exception cref="BizException">No registry entry exists for the given name.</exception>
  public async Task<List<dynamic>> FindCountByDict(string CollectionName, Dictionary<string, object> dict)
  {
      if (!CosmosDict.typeCosmos.TryGetValue(CollectionName, out CosmosModelInfo container))
      {
          throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
      }

      string pk = container.type.Name;

      // Work on a copy: removing the paging/sorting keys from the caller's dictionary would
      // silently change a later paged query that reuses the same instance.
      Dictionary<string, object> conditions = new Dictionary<string, object>(dict, dict.Comparer);
      conditions.Remove("@CURRPAGE");
      conditions.Remove("@PAGESIZE");
      conditions.Remove("@ASC");
      conditions.Remove("@DESC");

      StringBuilder sql = new StringBuilder("select value count(c) from c");
      CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(conditions, sql, pk);
      if (cosmosDbQuery == null)
      {
          return new List<dynamic> { 0 };
      }

      QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
      FeedIterator<dynamic> query = container.container.GetItemQueryIterator<dynamic>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
      return await ResultsFromFeedIterator(query);
  }
  /// <summary>
  /// Alias of <c>FindByDict&lt;T&gt;</c>: queries documents of <typeparamref name="T"/> by conditions.
  /// </summary>
  /// <typeparam name="T">Model type.</typeparam>
  /// <param name="dict">Filter conditions.</param>
  /// <param name="propertys">Optional property list handed to the SELECT builder.</param>
  /// <returns>Whatever <c>FindByDict&lt;T&gt;</c> returns for the same arguments.</returns>
  public async Task<List<T>> FindByParams<T>(Dictionary<string, object> dict, List<string> propertys = null) where T : ID
  {
      List<T> found = await FindByDict<T>(dict, propertys);
      return found;
  }
  /// <summary>
  /// Queries documents of <typeparamref name="T"/> using the conditions in <paramref name="dict"/>.
  /// </summary>
  /// <typeparam name="T">Model type; its name is passed to the SQL builder as the pk value.</typeparam>
  /// <param name="dict">Filter conditions handed to the SQL builder.</param>
  /// <param name="propertys">Optional property list handed to the SELECT builder.</param>
  /// <returns>The matching documents, or null when no query could be built.</returns>
  public async Task<List<T>> FindByDict<T>(Dictionary<string, object> dict, List<string> propertys = null) where T : ID
  {
      StringBuilder selectClause = SQLHelperParametric.GetSQLSelect(propertys);
      string typeName = typeof(T).Name;
      CosmosDbQuery builtQuery = SQLHelperParametric.GetSQL(dict, selectClause, typeName);

      int effectivePageSize = GetEffectivePageSize(-1, null);
      QueryRequestOptions options = GetDefaultQueryRequestOptions(itemsPerPage: effectivePageSize);
      return await ResultsFromQueryAndOptions<T>(builtQuery, options);
  }
  /// <summary>
  /// Runs the given query against the container of <typeparamref name="T"/> and collects all results.
  /// </summary>
  /// <param name="cosmosDbQuery">Query to run; null short-circuits the call.</param>
  /// <param name="queryOptions">Request options for the query.</param>
  /// <returns>All results, or null when <paramref name="cosmosDbQuery"/> is null.</returns>
  private async Task<List<T>> ResultsFromQueryAndOptions<T>(CosmosDbQuery cosmosDbQuery, QueryRequestOptions queryOptions)
  {
      if (cosmosDbQuery != null)
      {
          CosmosModelInfo modelInfo = await InitializeCollection<T>();
          FeedIterator<T> iterator = modelInfo.container.GetItemQueryIterator<T>(
              queryDefinition: cosmosDbQuery.CosmosQueryDefinition,
              requestOptions: queryOptions);
          return await ResultsFromFeedIterator(iterator);
      }
      return null;
  }
  /// <summary>
  /// Combines a requested page size with an optional item cap.
  /// </summary>
  /// <param name="itemsPerPage">Requested page size; -1 means "no explicit page size".</param>
  /// <param name="maxItemCount">Optional cap on the number of items.</param>
  /// <returns>
  /// The cap (or <paramref name="itemsPerPage"/> when there is none) if the page size is -1;
  /// otherwise the smaller of the cap and the page size.
  /// </returns>
  private int GetEffectivePageSize(int itemsPerPage, int? maxItemCount)
  {
      int cap = maxItemCount.HasValue ? maxItemCount.Value : itemsPerPage;
      if (itemsPerPage == -1)
      {
          return cap;
      }
      return Math.Min(cap, itemsPerPage);
  }
  /// <summary>
  /// Builds the default query request options.
  /// </summary>
  /// <param name="itemsPerPage">Page size; -1 is replaced by 1000, null is passed through.</param>
  /// <param name="maxBufferedItemCount">Buffered item count; 100 when omitted.</param>
  /// <param name="maxConcurrency">Concurrency; 50 when omitted.</param>
  /// <returns>The configured options.</returns>
  private QueryRequestOptions GetDefaultQueryRequestOptions(int? itemsPerPage = null,
      int? maxBufferedItemCount = null,
      int? maxConcurrency = null)
  {
      int? pageLimit = itemsPerPage;
      if (itemsPerPage == -1)
      {
          pageLimit = 1000;
      }

      QueryRequestOptions options = new QueryRequestOptions();
      options.MaxItemCount = pageLimit;
      options.MaxBufferedItemCount = maxBufferedItemCount.GetValueOrDefault(100);
      options.MaxConcurrency = maxConcurrency.GetValueOrDefault(50);
      return options;
  }
  /// <summary>
  /// Runs the given query against the container of <typeparamref name="T"/> and feeds the
  /// results to <paramref name="batchAction"/> in batches sized by <c>queryOptions.MaxItemCount</c>
  /// (0 when it is not set).
  /// </summary>
  /// <param name="cosmosDbQuery">Query to run.</param>
  /// <param name="batchAction">Callback invoked for each batch.</param>
  /// <param name="queryOptions">Request options for the query.</param>
  /// <returns>The list returned by the batching reader.</returns>
  private async Task<List<T>> ResultsFromQueryAndOptions<T>(CosmosDbQuery cosmosDbQuery, Func<List<T>, Task> batchAction, QueryRequestOptions queryOptions)
  {
      CosmosModelInfo modelInfo = await InitializeCollection<T>();
      FeedIterator<T> iterator = modelInfo.container.GetItemQueryIterator<T>(
          queryDefinition: cosmosDbQuery.CosmosQueryDefinition,
          requestOptions: queryOptions);
      int batchSize = queryOptions.MaxItemCount ?? 0;
      return await ResultsFromFeedIterator(iterator, batchAction, batchSize);
  }
  /// <summary>
  /// Builds query request options that only set the page size.
  /// </summary>
  /// <param name="itemsPerPage">Value assigned to <c>MaxItemCount</c>.</param>
  /// <returns>The configured options.</returns>
  private QueryRequestOptions GetQueryRequestOptions(int itemsPerPage) =>
      new QueryRequestOptions { MaxItemCount = itemsPerPage };
  /// <summary>
  /// Queries documents of <typeparamref name="T"/> with a LINQ predicate and optional ordering.
  /// The results are always restricted to documents whose <c>pk</c> equals the type name.
  /// </summary>
  /// <typeparam name="T">Model type.</typeparam>
  /// <param name="query">Optional filter; when null only the pk restriction applies.</param>
  /// <param name="order">Optional ordering key.</param>
  /// <param name="isDesc">True to order descending; only used when <paramref name="order"/> is given.</param>
  /// <returns>The matching documents.</returns>
  public async Task<List<T>> FindLinq<T>(Expression<Func<T, bool>> query = null, Expression<Func<T, object>> order = null, bool isDesc = false) where T : ID
  {
      string pk = typeof(T).Name;
      Expression<Func<T, bool>> pkFilter = x => x.pk == pk;

      // Decide on null BEFORE combining: the previous code invoked query.And(...) first, so the
      // documented default (query = null) depended on how the And helper treats a null receiver
      // and made the later null check meaningless.
      Expression<Func<T, bool>> filter = query == null ? pkFilter : query.And(pkFilter);

      QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
      CosmosModelInfo container = await InitializeCollection<T>();

      IQueryable<T> queryable = container.container
          .GetItemLinqQueryable<T>(requestOptions: queryRequestOptions)
          .Where(filter);
      if (order != null)
      {
          queryable = isDesc ? queryable.OrderByDescending(order) : queryable.OrderBy(order);
      }

      FeedIterator<T> feedIterator = queryable.ToFeedIterator();
      return await ResultsFromFeedIterator<T>(feedIterator);
  }
  /// <summary>
  /// Executes a raw SQL query against the container of <typeparamref name="T"/>.
  /// </summary>
  /// <param name="sql">Query text; it must contain the substring <c>.pk</c>.</param>
  /// <param name="Parameters">Optional query parameters; when null the text is executed as-is.</param>
  /// <returns>All matching items, drained from the feed iterator.</returns>
  /// <exception cref="BizException">
  /// Thrown with <c>ResponseCode.PARAMS_ERROR</c> when <paramref name="sql"/> is null or does not contain <c>.pk</c>.
  /// </exception>
  public async Task<List<T>> FindSQL<T>(string sql, Dictionary<string, object> Parameters = null) where T : ID
  {
      // A null query text is reported as the same parameter error instead of a NullReferenceException.
      if (sql == null || !sql.Contains(".pk"))
      {
          throw new BizException("查询参数必须设置 .pk ", ResponseCode.PARAMS_ERROR);
      }

      CosmosModelInfo container = await InitializeCollection<T>();
      QueryRequestOptions queryOptions = GetQueryRequestOptions(GetEffectivePageSize(-1, null));

      FeedIterator<T> feedIterator;
      if (Parameters != null)
      {
          CosmosDbQuery cosmosDbQuery = new CosmosDbQuery
          {
              QueryText = sql,
              Parameters = Parameters
          };
          feedIterator = container.container
              .GetItemQueryIterator<T>(cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryOptions);
      }
      else
      {
          // Previously this branch ignored queryOptions; both branches now use the same page size.
          feedIterator = container.container
              .GetItemQueryIterator<T>(new QueryDefinition(sql), requestOptions: queryOptions);
      }

      return await ResultsFromFeedIterator(feedIterator);
  }
  /// <summary>
  /// Creates <paramref name="entity"/> in the container of <typeparamref name="T"/> and, when caching
  /// is enabled for that container, mirrors it into the Redis hash of the container.
  /// </summary>
  /// <param name="entity">Entity to create; its <c>pk</c> is set to the model type name and <c>ttl</c> to null.</param>
  /// <returns>The resource returned by the create call.</returns>
  /// <exception cref="BizException">Wraps the message of any exception raised while saving.</exception>
  public async Task<T> Save<T>(T entity) where T : ID
  {
      try
      {
          CosmosModelInfo info = await InitializeCollection<T>();
          entity.pk = info.type.Name;
          entity.ttl = null;
          ItemResponse<T> created = await info.container.CreateItemAsync<T>(entity);

          bool cacheEnabled = info.cache && RedisHelper.Instance != null;
          if (cacheEnabled)
          {
              // The expiry is only refreshed when the hash key already existed before this write.
              bool keyExisted = RedisHelper.Exists(CacheCosmosPrefix + info.container.Id);
              await RedisHelper.HSetAsync(CacheCosmosPrefix + info.container.Id, entity.id, entity);
              if (keyExisted)
              {
                  await RedisHelper.ExpireAsync(CacheCosmosPrefix + info.container.Id, timeoutSeconds);
              }
          }

          return created.Resource;
      }
      catch (Exception e)
      {
          throw new BizException(e.Message);
      }
  }
  /// <summary>
  /// Creates all <paramref name="enyites"/> in the container of <typeparamref name="T"/>, in batches of
  /// <c>pageSize</c>, and mirrors each batch into the Redis hash when caching is enabled.
  /// </summary>
  /// <param name="enyites">Entities to create; each gets <c>pk</c> set to the type name and <c>ttl</c> set to null.</param>
  /// <returns>The same list instance that was passed in.</returns>
  /// <remarks>
  /// Non-success response statuses are ignored (best effort), as before.
  /// </remarks>
  public async Task<List<T>> SaveAll<T>(List<T> enyites) where T : ID
  {
      int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
      CosmosModelInfo container = await InitializeCollection<T>();
      string cacheKey = CacheCosmosPrefix + container.container.Id;
      bool useCache = container.cache && RedisHelper.Instance != null;
      // Only touch Redis when caching is actually enabled for this container.
      bool keyExisted = useCache && RedisHelper.Exists(cacheKey);
      string partitionKey = GetPartitionKey<T>();
      Type type = typeof(T);
      var partitionProperty = type.GetProperty(partitionKey);
      // One options instance for the whole call instead of one per item.
      JsonSerializerOptions serializerOptions = new JsonSerializerOptions { IgnoreNullValues = true };

      for (int i = 0; i < pages; i++)
      {
          List<T> lists = enyites.Skip(i * pageSize).Take(pageSize).ToList();

          // Serialize with a real awaited loop. The former List.ForEach(async ...) produced
          // async-void lambdas, so the list could be read before every item had been added.
          List<KeyValuePair<PartitionKey, Stream>> itemsToInsert = new List<KeyValuePair<PartitionKey, Stream>>(lists.Count);
          foreach (T x in lists)
          {
              x.pk = type.Name;
              x.ttl = null;
              MemoryStream stream = new MemoryStream();
              await JsonSerializer.SerializeAsync(stream, x, serializerOptions);
              object o = partitionProperty.GetValue(x, null);
              itemsToInsert.Add(new KeyValuePair<PartitionKey, Stream>(new PartitionKey(o.ToString()), stream));
          }

          List<Task> tasks = new List<Task>(itemsToInsert.Count);
          foreach (KeyValuePair<PartitionKey, Stream> item in itemsToInsert)
          {
              tasks.Add(container.container.CreateItemStreamAsync(item.Value, item.Key)
                  .ContinueWith((Task<ResponseMessage> task) =>
                  {
                      // Dispose the response; a non-success status is deliberately not treated as an error.
                      using (ResponseMessage response = task.Result)
                      {
                      }
                  }));
          }
          await Task.WhenAll(tasks);

          if (useCache)
          {
              // Await every cache write so failures surface here and the expiry below runs after them.
              foreach (T x in lists)
              {
                  await RedisHelper.HSetAsync(cacheKey, x.id, x);
              }
          }
      }

      // The expiry is set only when the hash key did not exist before this call.
      if (useCache && !keyExisted)
      {
          await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
      }
      return enyites;
  }
  /// <summary>
  /// Upserts <paramref name="entity"/> into the container of <typeparamref name="T"/> and, when caching
  /// is enabled for that container, mirrors it into the Redis hash of the container.
  /// </summary>
  /// <param name="entity">Entity to upsert; its <c>pk</c> is set to the model type name and <c>ttl</c> to null.</param>
  /// <returns>The resource returned by the upsert call.</returns>
  public async Task<T> SaveOrUpdate<T>(T entity) where T : ID
  {
      CosmosModelInfo info = await InitializeCollection<T>();
      entity.pk = info.type.Name;
      entity.ttl = null;
      ItemResponse<T> upserted = await info.container.UpsertItemAsync(item: entity);

      bool cacheEnabled = info.cache && RedisHelper.Instance != null;
      if (cacheEnabled)
      {
          // The expiry is only refreshed when the hash key already existed before this write.
          bool keyExisted = RedisHelper.Exists(CacheCosmosPrefix + info.container.Id);
          await RedisHelper.HSetAsync(CacheCosmosPrefix + info.container.Id, entity.id, entity);
          if (keyExisted)
          {
              await RedisHelper.ExpireAsync(CacheCosmosPrefix + info.container.Id, timeoutSeconds);
          }
      }

      return upserted.Resource;
  }
  /// <summary>
  /// Upserts all <paramref name="enyites"/> into the container of <typeparamref name="T"/>, in batches of
  /// <c>pageSize</c>, and mirrors each batch into the Redis hash when caching is enabled.
  /// </summary>
  /// <param name="enyites">Entities to upsert; each gets <c>pk</c> set to the type name and <c>ttl</c> set to null.</param>
  /// <returns>The same list instance that was passed in.</returns>
  /// <remarks>
  /// Individual upsert failures are not reported to the caller (best effort), as before.
  /// </remarks>
  public async Task<List<T>> SaveOrUpdateAll<T>(List<T> enyites) where T : ID
  {
      int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
      CosmosModelInfo container = await InitializeCollection<T>();
      string cacheKey = CacheCosmosPrefix + container.container.Id;
      bool useCache = container.cache && RedisHelper.Instance != null;
      // Only touch Redis when caching is actually enabled for this container.
      bool keyExisted = useCache && RedisHelper.Exists(cacheKey);
      string partitionKey = GetPartitionKey<T>();
      Type type = typeof(T);
      var partitionProperty = type.GetProperty(partitionKey);
      // One options instance for the whole call instead of one per item.
      JsonSerializerOptions serializerOptions = new JsonSerializerOptions { IgnoreNullValues = true };

      for (int i = 0; i < pages; i++)
      {
          List<T> lists = enyites.Skip(i * pageSize).Take(pageSize).ToList();

          // Serialize with a real awaited loop. The former List.ForEach(async ...) produced
          // async-void lambdas, so the list could be read before every item had been added.
          List<KeyValuePair<PartitionKey, Stream>> itemsToInsert = new List<KeyValuePair<PartitionKey, Stream>>(lists.Count);
          foreach (T x in lists)
          {
              x.pk = type.Name;
              x.ttl = null;
              MemoryStream stream = new MemoryStream();
              await JsonSerializer.SerializeAsync(stream, x, serializerOptions);
              object o = partitionProperty.GetValue(x, null);
              itemsToInsert.Add(new KeyValuePair<PartitionKey, Stream>(new PartitionKey(o.ToString()), stream));
          }

          List<Task> tasks = new List<Task>(itemsToInsert.Count);
          foreach (KeyValuePair<PartitionKey, Stream> item in itemsToInsert)
          {
              tasks.Add(container.container.UpsertItemStreamAsync(item.Value, item.Key)
                  .ContinueWith((Task<ResponseMessage> task) =>
                  {
                      // Release the response (it is disposable); the status is intentionally not inspected.
                      if (task.Status == TaskStatus.RanToCompletion)
                      {
                          task.Result?.Dispose();
                      }
                  }));
          }
          await Task.WhenAll(tasks);

          if (useCache)
          {
              // Await every cache write so failures surface here and the expiry below runs after them.
              foreach (T x in lists)
              {
                  await RedisHelper.HSetAsync(cacheKey, x.id, x);
              }
          }
      }

      // The expiry is set only when the hash key did not exist before this call.
      if (useCache && !keyExisted)
      {
          await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
      }
      return enyites;
  }
  /// <summary>
  /// Replaces the stored document of <paramref name="entity"/> in the container of <typeparamref name="T"/>
  /// and, when caching is enabled for that container, mirrors it into the Redis hash of the container.
  /// </summary>
  /// <param name="entity">
  /// Entity to replace; its <c>pk</c> is set to the type name and <c>ttl</c> to null. The partition key
  /// value is read by reflection from the property named by <c>GetPartitionKey&lt;T&gt;()</c>.
  /// </param>
  /// <returns>The resource returned by the replace call.</returns>
  public async Task<T> Update<T>(T entity) where T : ID
  {
      CosmosModelInfo info = await InitializeCollection<T>();
      string partitionKeyName = GetPartitionKey<T>();
      Type entityType = typeof(T);
      entity.pk = entityType.Name;
      entity.ttl = null;

      object partitionValue = entityType.GetProperty(partitionKeyName).GetValue(entity, null);
      PartitionKey partition = new PartitionKey(partitionValue.ToString());
      ItemResponse<T> replaced = await info.container.ReplaceItemAsync(entity, entity.id, partition);

      bool cacheEnabled = info.cache && RedisHelper.Instance != null;
      if (cacheEnabled)
      {
          // The expiry is only refreshed when the hash key already existed before this write.
          bool keyExisted = RedisHelper.Exists(CacheCosmosPrefix + info.container.Id);
          await RedisHelper.HSetAsync(CacheCosmosPrefix + info.container.Id, entity.id, entity);
          if (keyExisted)
          {
              await RedisHelper.ExpireAsync(CacheCosmosPrefix + info.container.Id, timeoutSeconds);
          }
      }

      return replaced.Resource;
  }
  /// <summary>
  /// Holder for one serialized document queued for a stream-based replace:
  /// the document id, its partition key value and the serialized payload.
  /// </summary>
  internal class Item
  {
      /// <summary>Document id passed to the replace call.</summary>
      public string id { get; set; }

      /// <summary>Partition key value, as a string.</summary>
      public string pk { get; set; }

      /// <summary>Stream holding the serialized document.</summary>
      public MemoryStream stream { get; set; }
  }
  /// <summary>
  /// Replaces all <paramref name="enyites"/> in the container of <typeparamref name="T"/>, in batches of
  /// <c>pageSize</c>, and mirrors each batch into the Redis hash when caching is enabled.
  /// </summary>
  /// <param name="enyites">Entities to replace; each gets <c>pk</c> set to the type name and <c>ttl</c> set to null.</param>
  /// <returns>The same list instance that was passed in.</returns>
  /// <remarks>
  /// Individual replace failures are not reported to the caller (best effort), as before.
  /// </remarks>
  public async Task<List<T>> UpdateAll<T>(List<T> enyites) where T : ID
  {
      int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
      CosmosModelInfo container = await InitializeCollection<T>();
      string cacheKey = CacheCosmosPrefix + container.container.Id;
      bool useCache = container.cache && RedisHelper.Instance != null;
      // Only touch Redis when caching is actually enabled for this container.
      bool keyExisted = useCache && RedisHelper.Exists(cacheKey);
      string partitionKey = GetPartitionKey<T>();
      Type type = typeof(T);
      var partitionProperty = type.GetProperty(partitionKey);
      // One options instance for the whole call instead of one per item.
      JsonSerializerOptions serializerOptions = new JsonSerializerOptions { IgnoreNullValues = true };

      for (int i = 0; i < pages; i++)
      {
          List<T> lists = enyites.Skip(i * pageSize).Take(pageSize).ToList();

          // Serialize with a real awaited loop. The former List.ForEach(async ...) produced
          // async-void lambdas, so the list could be read before every item had been added.
          List<Item> itemsToInsert = new List<Item>(lists.Count);
          foreach (T x in lists)
          {
              x.pk = type.Name;
              x.ttl = null;
              MemoryStream stream = new MemoryStream();
              await JsonSerializer.SerializeAsync(stream, x, serializerOptions);
              object o = partitionProperty.GetValue(x, null);
              itemsToInsert.Add(new Item { id = x.id, pk = o.ToString(), stream = stream });
          }

          List<Task> tasks = new List<Task>(itemsToInsert.Count);
          foreach (Item item in itemsToInsert)
          {
              tasks.Add(container.container.ReplaceItemStreamAsync(item.stream, item.id, new PartitionKey(item.pk))
                  .ContinueWith((Task<ResponseMessage> task) =>
                  {
                      // Release the response (it is disposable); the status is intentionally not inspected.
                      if (task.Status == TaskStatus.RanToCompletion)
                      {
                          task.Result?.Dispose();
                      }
                  }));
          }
          await Task.WhenAll(tasks);

          if (useCache)
          {
              // Await every cache write so failures surface here and the expiry below runs after them.
              foreach (T x in lists)
              {
                  await RedisHelper.HSetAsync(cacheKey, x.id, x);
              }
          }
      }

      // The expiry is set only when the hash key did not exist before this call.
      if (useCache && !keyExisted)
      {
          await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
      }
      return enyites;
  }
  /// <summary>
  /// Looks up documents of <typeparamref name="T"/> by id with a SQL query that is also
  /// restricted to the <c>pk</c> of the model type.
  /// </summary>
  /// <param name="id">Document id to match.</param>
  /// <returns>The matching items (empty when nothing matches).</returns>
  private async Task<List<T>> FindByIdAsSql<T>(string id) where T : ID
  {
      CosmosModelInfo container = await InitializeCollection<T>();

      // Both values are bound as parameters; pk was previously concatenated into the query text.
      CosmosDbQuery cosmosDbQuery = new CosmosDbQuery
      {
          QueryText = "SELECT * FROM c WHERE c.pk = @pk and c.id = @id",
          Parameters = new Dictionary<string, object>
          {
              { "@pk", container.type.Name },
              { "@id", id }
          }
      };

      FeedIterator<T> feedIterator = container.container
          .GetItemQueryIterator<T>(cosmosDbQuery.CosmosQueryDefinition);

      // The iterator helper already yields a list, so no extra ToList() copy is needed.
      return await ResultsFromFeedIterator(feedIterator);
  }
  /// <summary>
  /// Point-reads a single document of <typeparamref name="T"/> by id and partition key value.
  /// </summary>
  /// <param name="id">Document id.</param>
  /// <param name="pk">Partition key value.</param>
  /// <returns>
  /// The document, or <c>default</c> when the read throws for any reason
  /// (every exception raised by the read is swallowed here).
  /// </returns>
  public async Task<T> FindByIdPk<T>(string id, string pk) where T : ID
  {
      CosmosModelInfo info = await InitializeCollection<T>();
      try
      {
          PartitionKey partition = new PartitionKey(pk);
          ItemResponse<T> read = await info.container.ReadItemAsync<T>(id: id, partitionKey: partition);
          return read.Resource;
      }
      catch (Exception)
      {
          // NOTE(review): this hides every failure, not only "not found" — confirm that is intended.
          return default;
      }
  }
  /// <summary>
  /// Finds documents of <typeparamref name="T"/> by id, going through the Redis cache shell
  /// when caching is enabled for the container and requested by the caller.
  /// </summary>
  /// <param name="id">Document id to look up.</param>
  /// <param name="cache">Set to false to bypass the cache and query the database directly.</param>
  /// <returns>The matching items.</returns>
  public async Task<List<T>> FindById<T>(string id, bool cache = true) where T : ID
  {
      CosmosModelInfo info = await InitializeCollection<T>();

      bool useCache = info.cache && RedisHelper.Instance != null && cache;
      if (!useCache)
      {
          return await FindByIdAsSql<T>(id);
      }

      return await RedisHelper.CacheShellAsync(CacheCosmosPrefix + info.container.Id, id, timeoutSeconds, () => FindByIdAsSql<T>(id));
  }
  /// <summary>
  /// Finds documents of <typeparamref name="T"/> for a list of ids. With caching enabled, ids found
  /// in the Redis hash are served from it; the rest are queried in one call and written back to the hash.
  /// </summary>
  /// <param name="ids">Ids to look up.</param>
  /// <returns>Cached hits (in id order) followed by the items fetched from the database.</returns>
  public async Task<List<T>> FindByIds<T>(List<string> ids) where T : ID
  {
      CosmosModelInfo container = await InitializeCollection<T>();
      if (!(container.cache && RedisHelper.Instance != null))
      {
          return await FindByDict<T>(new Dictionary<string, object> { { "id", ids.ToArray() } });
      }

      string cacheKey = CacheCosmosPrefix + container.container.Id;
      List<T> list = new List<T>();
      List<string> NotIn = new List<string>();
      foreach (string id in ids)
      {
          T cached = default;
          if (await RedisHelper.HExistsAsync(cacheKey, id))
          {
              cached = await RedisHelper.HGetAsync<T>(cacheKey, id);
          }

          // The entry can disappear between HExists and HGet; treat a null read as a miss
          // instead of putting a null element into the result.
          if (cached == null)
          {
              NotIn.Add(id);
          }
          else
          {
              list.Add(cached);
          }
      }

      if (NotIn.Count > 0)
      {
          List<T> noInList = await FindByDict<T>(new Dictionary<string, object> { { "id", NotIn.ToArray() } });

          // Use the async Redis calls (no blocking inside an async method) and set the expiry
          // once after the writes instead of once per item.
          foreach (T x in noInList)
          {
              await RedisHelper.HSetAsync(cacheKey, x.id, x);
          }
          if (noInList.Count > 0)
          {
              await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
          }

          list.AddRange(noInList);
      }
      return list;
  }
  /// <summary>
  /// Finds documents by id for a model identified by its type name, going through the Redis
  /// cache shell when caching is enabled for that model's container.
  /// </summary>
  /// <param name="typeName">Key looked up in <c>CosmosDict.typeCosmos</c>.</param>
  /// <param name="id">Document id to look up.</param>
  /// <returns>The result of the underlying dictionary-based query.</returns>
  /// <exception cref="BizException">Thrown when <paramref name="typeName"/> is not registered.</exception>
  public async Task<dynamic> FindById(string typeName, string id)
  {
      if (!CosmosDict.typeCosmos.TryGetValue(typeName, out CosmosModelInfo info))
      {
          throw new BizException("CollectionName named:" + typeName + " dose not exsit in Database!");
      }

      bool useCache = info.cache && RedisHelper.Instance != null;
      if (!useCache)
      {
          return await FindByDict(typeName, new Dictionary<string, object> { { "id", id } });
      }

      return await RedisHelper.CacheShellAsync(CacheCosmosPrefix + info.container.Id, id, timeoutSeconds, () => FindByDict(typeName, new Dictionary<string, object> { { "id", id } }));
  }
  /// <summary>
  /// Finds documents for a list of ids for a model identified by its type name.
  /// </summary>
  /// <param name="typeName">Key looked up in <c>CosmosDict.typeCosmos</c>.</param>
  /// <param name="ids">Ids to look up.</param>
  /// <returns>The items returned by the underlying dictionary-based query.</returns>
  /// <exception cref="BizException">Thrown when <paramref name="typeName"/> is not registered.</exception>
  /// <remarks>
  /// This lookup always queries the database. The previous cached path used a cache entry keyed only
  /// by the container key, without the requested ids, so calls with different id lists shared one
  /// cached result; that key is also the one the single-item methods use as a per-id hash.
  /// </remarks>
  public async Task<List<dynamic>> FindByIds(string typeName, List<string> ids)
  {
      if (!CosmosDict.typeCosmos.TryGetValue(typeName, out _))
      {
          throw new BizException("CollectionName named:" + typeName + " dose not exsit in Database!");
      }

      return await FindByDict(typeName, new Dictionary<string, object> { { "id", ids.ToArray() } });
  }
  1123. }
  1124. }