AzureCosmosDBV3Repository.cs 55 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252
  1. using Microsoft.Azure.Cosmos;
  2. using Microsoft.Azure.Cosmos.Linq;
  3. using OpenXmlPowerTools;
  4. using System;
  5. using System.Collections.Concurrent;
  6. using System.Collections.Generic;
  7. using System.Diagnostics;
  8. using System.IO;
  9. using System.Linq;
  10. using System.Linq.Expressions;
  11. using System.Net;
  12. using System.Reflection;
  13. using System.Text;
  14. using System.Text.Json;
  15. using System.Threading;
  16. using System.Threading.Tasks;
  17. using TEAMModelOS.SDK.Context.Attributes.Azure;
  18. using TEAMModelOS.SDK.Context.Exception;
  19. using TEAMModelOS.SDK.Helper.Common.CollectionHelper;
  20. using TEAMModelOS.SDK.Helper.Common.JsonHelper;
  21. using TEAMModelOS.SDK.Helper.Common.LogHelper;
  22. using TEAMModelOS.SDK.Helper.Common.ReflectorExtensions;
  23. using TEAMModelOS.SDK.Module.AzureCosmosDB.Configuration;
  24. namespace TEAMModelOS.SDK.Module.AzureCosmosDBV3
  25. {
  26. public class AzureCosmosDBV3Repository : IAzureCosmosDBV3Repository
  27. {
  28. private CosmosClient CosmosClient { get; set; }
  29. public CosmosDict CosmosDict { get; set; } = new CosmosDict();
  30. /// <summary>
  31. /// 数据库名
  32. /// </summary>
  33. private string DatabaseId { get; set; }
  34. /// <summary>
  35. /// RU
  36. /// </summary>
  37. private int CollectionThroughput { get; set; }
  38. /// <summary>
  39. /// 数据库对象
  40. /// </summary>
  41. private Database database { get; set; }
  42. /// <summary>
  43. /// 分页大小
  44. /// </summary>
  45. int pageSize = 200;
  46. /// <summary>
  47. /// 缓存前缀
  48. /// </summary>
  49. private const string CacheCosmosPrefix = "cosmos:";
  50. /// <summary>
  51. /// 扫描类
  52. /// </summary>
  53. private string[] ScanModel { get; set; }
  54. /// <summary>
  55. /// 超时时间
  56. /// </summary>
  57. private const int timeoutSeconds = 86400;
  58. /// <summary>
  59. /// ttl时长 1秒
  60. /// </summary>
  61. private const int ttl = 1;
  62. /// <summary>
  63. /// 更新源通知容器
  64. /// </summary>
  65. private string leaseId = "AleaseContainer";
  66. public AzureCosmosDBV3Repository(AzureCosmosDBOptions options, CosmosSerializer cosmosSerializer)
  67. {
  68. try
  69. {
  70. if (!string.IsNullOrEmpty(options.ConnectionString))
  71. {
  72. CosmosClient = CosmosDBV3ClientSingleton.getInstance(options.ConnectionString, options.ConnectionKey, cosmosSerializer).GetCosmosDBClient();
  73. }
  74. else
  75. {
  76. throw new BizException("请设置正确的AzureCosmosDB数据库配置信息!");
  77. }
  78. DatabaseId = options.Database;
  79. CollectionThroughput = options.CollectionThroughput;
  80. ScanModel = options.ScanModel;
  81. }
  82. catch (CosmosException e)
  83. {
  84. throw new BizException(e.Message, 500, e.StackTrace);
  85. }
  86. }
  87. public AzureCosmosDBV3Repository(AzureCosmosDBOptions options)
  88. {
  89. try
  90. {
  91. if (!string.IsNullOrEmpty(options.ConnectionString))
  92. {
  93. CosmosClient = CosmosDBV3ClientSingleton.getInstance(options.ConnectionString, options.ConnectionKey, null).GetCosmosDBClient();
  94. }
  95. else
  96. {
  97. throw new BizException("请设置正确的AzureCosmosDB数据库配置信息!");
  98. }
  99. DatabaseId = options.Database;
  100. CollectionThroughput = options.CollectionThroughput;
  101. ScanModel = options.ScanModel;
  102. }
  103. catch (CosmosException e)
  104. {
  105. throw new BizException(e.Message, 500, e.StackTrace);
  106. }
  107. }
  108. /// <summary>
  109. /// 初始化CosmosDB数据库
  110. /// </summary>
  111. /// <returns></returns>
  112. public async Task<CosmosDict> InitializeDatabase() {
  113. database = await CosmosClient.CreateDatabaseIfNotExistsAsync(DatabaseId, CollectionThroughput);
  114. FeedIterator<ContainerProperties> resultSetIterator = database.GetContainerQueryIterator<ContainerProperties>();
  115. while (resultSetIterator.HasMoreResults)
  116. {
  117. foreach (ContainerProperties container in await resultSetIterator.ReadNextAsync())
  118. {
  119. CosmosDict.nameCosmos.TryAdd(container.Id, new CosmosModelInfo { container = database.GetContainer(container.Id), partitionKey= container.PartitionKeyPath.Replace("/",""), cache = false, monitor = false });
  120. }
  121. }
  122. bool isMonitor = false;
  123. //获取数据库所有的表
  124. List<Type> types = ReflectorExtensions.GetAllTypeAsAttribute<CosmosDBAttribute>(ScanModel);
  125. foreach (Type type in types)
  126. {
  127. string PartitionKey = GetPartitionKey(type);
  128. string CollectionName = "";
  129. int RU = 0;
  130. bool cache = false;
  131. bool monitor = false;
  132. IEnumerable<CosmosDBAttribute> attributes = type.GetCustomAttributes<CosmosDBAttribute>(true);
  133. if (attributes != null && !string.IsNullOrEmpty(attributes.First<CosmosDBAttribute>().Name))
  134. {
  135. CollectionName = attributes.First<CosmosDBAttribute>().Name;
  136. }
  137. else {
  138. throw new BizException("必须指定容器名",ResponseCode.PARAMS_ERROR);
  139. }
  140. if (attributes.First<CosmosDBAttribute>().Cache)
  141. {
  142. cache = attributes.First<CosmosDBAttribute>().Cache;
  143. }
  144. if (attributes.First<CosmosDBAttribute>().Monitor)
  145. {
  146. monitor = attributes.First<CosmosDBAttribute>().Monitor;
  147. if (monitor)
  148. {
  149. isMonitor = true;
  150. }
  151. }
  152. if (attributes.First<CosmosDBAttribute>().RU > 400)
  153. {
  154. RU = attributes.First<CosmosDBAttribute>().RU;
  155. }
  156. else
  157. {
  158. RU = CollectionThroughput;
  159. }
  160. //如果表存在于数据则检查RU是否变动,如果不存在则执行创建DocumentCollection
  161. if (CosmosDict.nameCosmos.TryGetValue(CollectionName, out CosmosModelInfo cosmosModelInfo))
  162. { //更新RU
  163. cosmosModelInfo.cache = cache;
  164. Container container = CosmosClient.GetDatabase(DatabaseId).GetContainer(cosmosModelInfo.container.Id);
  165. int? throughputResponse = await container.ReadThroughputAsync();
  166. if (throughputResponse < RU)
  167. {
  168. await CosmosClient.GetDatabase(DatabaseId).GetContainer(cosmosModelInfo.container.Id).ReplaceThroughputAsync(RU);
  169. }
  170. CosmosModelInfo cosmos = new CosmosModelInfo { container = container, cache = cache, monitor = monitor, type = type,partitionKey=PartitionKey };
  171. CosmosDict.nameCosmos[CollectionName] = cosmos;
  172. CosmosDict.typeCosmos.Add(type.Name, cosmos);
  173. }
  174. else {
  175. ContainerProperties containerProperties = new ContainerProperties { Id = CollectionName, DefaultTimeToLive = -1 };
  176. if (!string.IsNullOrEmpty(PartitionKey))
  177. {
  178. containerProperties.PartitionKeyPath = "/" + PartitionKey;
  179. }
  180. if (RU > CollectionThroughput)
  181. {
  182. CollectionThroughput = RU;
  183. }
  184. Container containerWithConsistentIndexing = await database.CreateContainerIfNotExistsAsync(containerProperties, throughput: CollectionThroughput);
  185. CosmosModelInfo cosmos = new CosmosModelInfo { container = containerWithConsistentIndexing, cache = cache, monitor = monitor, type = type, partitionKey = PartitionKey };
  186. CosmosDict.nameCosmos[CollectionName] = cosmos;
  187. CosmosDict.typeCosmos.Add(type.Name, cosmos);
  188. }
  189. }
  190. if (isMonitor)
  191. {
  192. ContainerProperties leaseProperties = new ContainerProperties { Id = leaseId, PartitionKeyPath = "/id", DefaultTimeToLive = -1 };
  193. Container leaseContainer = await database.CreateContainerIfNotExistsAsync(leaseProperties, throughput: CollectionThroughput);
  194. CosmosDict.nameCosmos.TryAdd(leaseId, new CosmosModelInfo { container = leaseContainer, cache = false, monitor = false, partitionKey = "/id" });
  195. }
  196. return CosmosDict;
  197. }
  198. private string GetPartitionKey<T>()
  199. {
  200. Type type = typeof(T);
  201. return GetPartitionKey(type);
  202. }
  203. private string GetPartitionKey(Type type)
  204. {
  205. PropertyInfo[] properties = type.GetProperties();
  206. List<PropertyInfo> attrProperties = new List<PropertyInfo>();
  207. foreach (PropertyInfo property in properties)
  208. {
  209. if (property.Name.Equals("PartitionKey"))
  210. {
  211. attrProperties.Add(property);
  212. break;
  213. }
  214. object[] attributes = property.GetCustomAttributes(true);
  215. foreach (object attribute in attributes) //2.通过映射,找到成员属性上关联的特性类实例,
  216. {
  217. if (attribute is PartitionKeyAttribute)
  218. {
  219. attrProperties.Add(property);
  220. }
  221. }
  222. }
  223. if (attrProperties.Count <= 0)
  224. {
  225. throw new BizException(type.Name + " has no PartitionKey !");
  226. }
  227. else
  228. {
  229. if (attrProperties.Count == 1)
  230. {
  231. return attrProperties[0].Name;
  232. }
  233. else { throw new BizException(type.Name+" PartitionKey can only be single!"); }
  234. }
  235. }
  236. private async Task<CosmosModelInfo> InitializeCollection<T>()
  237. {
  238. Type type = typeof(T);
  239. string partitionKey = GetPartitionKey<T>();
  240. CosmosDBAttribute cosmosDBAttribute = null;
  241. IEnumerable<CosmosDBAttribute> attributes = type.GetCustomAttributes<CosmosDBAttribute>(true);
  242. if (attributes != null && !string.IsNullOrEmpty(attributes.First<CosmosDBAttribute>().Name))
  243. {
  244. cosmosDBAttribute = attributes.First<CosmosDBAttribute>();
  245. }
  246. else {
  247. throw new BizException(type.Name+"未指定CosmosDB表名",ResponseCode.PARAMS_ERROR);
  248. }
  249. return await InitializeCollection(cosmosDBAttribute, type.Name, partitionKey);
  250. }
  251. private async Task<CosmosModelInfo> InitializeCollection(CosmosDBAttribute cosmosDBAttribute, string typeName, string PartitionKey)
  252. {
  253. /////内存中已经存在这个表则直接返回
  254. if (CosmosDict.typeCosmos.TryGetValue(typeName, out CosmosModelInfo cosmosModelInfo))
  255. {
  256. return cosmosModelInfo;
  257. }///如果没有则尝试默认创建
  258. else
  259. {
  260. ContainerProperties containerProperties = new ContainerProperties { Id = cosmosDBAttribute.Name };
  261. if (!string.IsNullOrEmpty(PartitionKey))
  262. {
  263. containerProperties.PartitionKeyPath = "/"+ PartitionKey;
  264. }
  265. Container containerWithConsistentIndexing = await database.CreateContainerIfNotExistsAsync(containerProperties, throughput: cosmosDBAttribute.RU);
  266. CosmosModelInfo cosmosModel = new CosmosModelInfo { container = containerWithConsistentIndexing, cache = cosmosDBAttribute.Cache,monitor= cosmosDBAttribute.Monitor };
  267. CosmosDict. nameCosmos.TryAdd(cosmosDBAttribute.Name, cosmosModel);
  268. CosmosDict.typeCosmos.TryAdd(typeName, cosmosModel);
  269. return cosmosModel;
  270. }
  271. }
  272. /// <summary>
  273. /// 按TTL删除
  274. /// </summary>
  275. /// <typeparam name="T"></typeparam>
  276. /// <param name="list"></param>
  277. /// <returns></returns>
  278. private async Task<List<T>> DeleteTTL<T>(List<T> list) where T : ID
  279. {
  280. CosmosModelInfo container = await InitializeCollection<T>();
  281. list.ForEach(x => { x.ttl = ttl; });
  282. list = await DeleteTTlALL(list);
  283. if (container.cache && RedisHelper.Instance != null)
  284. {
  285. list.ForEach(async x => {
  286. await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, x.id);
  287. });
  288. }
  289. return list;
  290. }
  291. private async Task<List<T>> DeleteTTlALL<T>(List<T> enyites) where T : ID
  292. {
  293. //await Task.Run(() => Parallel.ForEach(entities, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (item) =>
  294. //{
  295. // Task.WaitAll(Update(item));
  296. //}));
  297. int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
  298. CosmosModelInfo container = await InitializeCollection<T>();
  299. bool flag = false;
  300. if (RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
  301. {
  302. flag = true;
  303. }
  304. string partitionKey = GetPartitionKey<T>();
  305. Type type = typeof(T);
  306. Stopwatch stopwatch = Stopwatch.StartNew();
  307. for (int i = 0; i < pages; i++)
  308. {
  309. List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
  310. List<KeyValuePair<PartitionKey, Stream>> itemsToInsert = new List<KeyValuePair<PartitionKey, Stream>>();
  311. lists.ForEach(async x =>
  312. {
  313. x.pk = type.Name;
  314. //x.ttl = null;
  315. MemoryStream stream = new MemoryStream();
  316. await JsonSerializer.SerializeAsync(stream, x, new JsonSerializerOptions { IgnoreNullValues = true });
  317. object o = type.GetProperty(partitionKey).GetValue(x, null);
  318. KeyValuePair<PartitionKey, Stream> keyValue = new KeyValuePair<PartitionKey, Stream>(new PartitionKey(o.ToString()), stream);
  319. itemsToInsert.Add(keyValue);
  320. });
  321. List<Task> tasks = new List<Task>(lists.Count);
  322. itemsToInsert.ForEach(item =>
  323. {
  324. tasks.Add(container.container.UpsertItemStreamAsync(item.Value, item.Key)
  325. .ContinueWith((Task<ResponseMessage> task) =>
  326. {
  327. //using (ResponseMessage response = task.Result)
  328. //{
  329. // if (!response.IsSuccessStatusCode)
  330. // {
  331. // }
  332. //}
  333. }
  334. ));
  335. });
  336. await Task.WhenAll(tasks);
  337. if (container.cache && RedisHelper.Instance != null)
  338. {
  339. lists.ForEach(async x => {
  340. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, x.id, x);
  341. });
  342. }
  343. }
  344. if (container.cache && RedisHelper.Instance != null && !flag)
  345. {
  346. await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
  347. }
  348. stopwatch.Stop();
  349. return enyites;
  350. }
  351. public async Task<List<IdPk>> DeleteAll<T>(List<IdPk> ids) where T : ID
  352. {
  353. // string pk = GetPartitionKey<T>();
  354. CosmosModelInfo container = await InitializeCollection<T>();
  355. List<IdPk> idPks = new List<IdPk>();
  356. if (container.monitor)
  357. {
  358. List<T> list = await FindByDict<T>(new Dictionary<string, object>() { { "id", ids.Select(x => x.id).ToArray() } });
  359. list = await DeleteTTL(list);
  360. return ids;
  361. }
  362. else
  363. {
  364. int pages = (int)Math.Ceiling((double)ids.Count / pageSize);
  365. Stopwatch stopwatch = Stopwatch.StartNew();
  366. for (int i = 0; i < pages; i++)
  367. {
  368. List<IdPk> lists = ids.Skip((i) * pageSize).Take(pageSize).ToList();
  369. List<Task> tasks = new List<Task>(lists.Count);
  370. lists.ForEach(item =>
  371. {
  372. tasks.Add(container.container.DeleteItemStreamAsync(item.id, new PartitionKey(item.pk))
  373. .ContinueWith((Task<ResponseMessage> task) =>
  374. {
  375. using (ResponseMessage response = task.Result)
  376. {
  377. idPks.Add(new IdPk { id = item.id, pk = item.pk, StatusCode = response.StatusCode });
  378. // if (!response.IsSuccessStatusCode)
  379. // {
  380. // }
  381. }
  382. }
  383. ));
  384. });
  385. await Task.WhenAll(tasks);
  386. if (container.cache && RedisHelper.Instance != null)
  387. {
  388. lists.ForEach(async x => {
  389. await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, x.id);
  390. });
  391. }
  392. }
  393. stopwatch.Stop();
  394. return idPks;
  395. }
  396. }
  397. public async Task<List<IdPk>> DeleteAll<T>(Dictionary<string, object> dict) where T : ID
  398. {
  399. if (dict.Keys.Count > 0)
  400. {
  401. List<T> list = await FindByDict<T>(dict);
  402. return await DeleteAll(list);
  403. }
  404. else
  405. {
  406. throw new BizException("参数为空", 500);
  407. }
  408. }
  409. public async Task<List<IdPk>> DeleteAll<T>(List<T> enyites) where T : ID
  410. {
  411. Type type = typeof(T);
  412. string pk = GetPartitionKey<T>();
  413. CosmosModelInfo container = await InitializeCollection<T>();
  414. List<IdPk> idPks = new List<IdPk>();
  415. //log4net 日志記錄
  416. string uuidKey = Guid.NewGuid().ToString();
  417. string logkey = "\r\n【" + uuidKey + "】\r\n";
  418. LogHelper.Info(this,
  419. logkey
  420. + "删除------->>\r\n"
  421. + "表:"
  422. + type.Name+ "\r\n"
  423. + "数据:"
  424. + enyites.ToApiJson()
  425. + "\r\n"+ logkey);
  426. if (container.monitor)
  427. {
  428. enyites = await DeleteTTL(enyites);
  429. foreach (T t in enyites)
  430. {
  431. object o = type.GetProperty(pk).GetValue(t, null);
  432. idPks.Add(new IdPk { id = t.id, pk = o.ToString(), StatusCode = HttpStatusCode.NoContent });
  433. }
  434. return idPks;
  435. }
  436. else
  437. {
  438. int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
  439. Stopwatch stopwatch = Stopwatch.StartNew();
  440. for (int i = 0; i < pages; i++)
  441. {
  442. List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
  443. List<KeyValuePair<PartitionKey, string>> itemsToInsert = new List<KeyValuePair<PartitionKey, string>>();
  444. lists.ForEach(x =>
  445. {
  446. object o = type.GetProperty(pk).GetValue(x, null);
  447. KeyValuePair<PartitionKey, string> keyValue = new KeyValuePair<PartitionKey, string>(new PartitionKey(o.ToString()), x.id);
  448. itemsToInsert.Add(keyValue);
  449. });
  450. List<Task> tasks = new List<Task>(lists.Count);
  451. itemsToInsert.ForEach(item =>
  452. {
  453. tasks.Add(container.container.DeleteItemStreamAsync(item.Value, item.Key)
  454. .ContinueWith((Task<ResponseMessage> task) =>
  455. {
  456. using (ResponseMessage response = task.Result)
  457. {
  458. idPks.Add(new IdPk { id = item.Value, pk = item.Key.ToString(), StatusCode = response.StatusCode });
  459. }
  460. }
  461. ));
  462. });
  463. await Task.WhenAll(tasks); if (container.cache && RedisHelper.Instance != null)
  464. {
  465. lists.ForEach(async x => {
  466. await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, x.id);
  467. });
  468. }
  469. }
  470. stopwatch.Stop();
  471. return idPks;
  472. }
  473. }
  474. public async Task<IdPk> DeleteAsync<T>(IdPk idPk) where T : ID
  475. {
  476. return await DeleteAsync<T>(idPk.id, idPk.pk);
  477. }
  478. public async Task<IdPk> DeleteAsync<T>(string id, string pk) where T : ID
  479. {
  480. // pk = GetPartitionKey<T>();
  481. CosmosModelInfo container = await InitializeCollection<T>();
  482. if (container.monitor)
  483. {
  484. List<T> list = await FindByDict<T>(new Dictionary<string, object>() { { "id", id } });
  485. if (list.Count > 0)
  486. {
  487. await DeleteTTL<T>(list);
  488. return new IdPk { id = id, pk = pk, StatusCode = HttpStatusCode.NoContent };
  489. }
  490. else
  491. {
  492. throw new BizException("未找到ID匹配的数据,删除失败");
  493. }
  494. }
  495. else
  496. {
  497. ResponseMessage response = await container.container.DeleteItemStreamAsync(id: id, partitionKey: new PartitionKey(pk));
  498. if (container.cache && RedisHelper.Instance != null)
  499. {
  500. await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, id);
  501. }
  502. return new IdPk { id = id, pk = pk, StatusCode = response.StatusCode };
  503. }
  504. }
  505. public async Task<IdPk> DeleteAsync<T>(T entity) where T : ID
  506. {
  507. CosmosModelInfo container = await InitializeCollection<T>();
  508. string partitionKey = GetPartitionKey<T>();
  509. Type type = typeof(T);
  510. object o = type.GetProperty(partitionKey).GetValue(entity, null);
  511. if (container.monitor)
  512. {
  513. await DeleteTTL<T>(new List<T>() { entity });
  514. return new IdPk { id = entity.id, pk = o.ToString(), StatusCode = HttpStatusCode.NoContent };
  515. }
  516. else
  517. {
  518. ResponseMessage response = await container.container.DeleteItemStreamAsync(id: entity.id, partitionKey: new PartitionKey(o.ToString()));
  519. if (container.cache && RedisHelper.Instance != null)
  520. {
  521. await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, entity.id);
  522. }
  523. return new IdPk { id = entity.id, pk = partitionKey, StatusCode = response.StatusCode };
  524. }
  525. }
  526. public async Task<List<T>> FindAll<T>(List<string> propertys = null) where T : ID
  527. {
  528. CosmosModelInfo container = await InitializeCollection<T>();
  529. string pk = typeof(T).Name;
  530. StringBuilder sql;
  531. sql = SQLHelperParametric.GetSQLSelect(propertys);
  532. sql.Append(" where c.pk ='"+ pk + "'");
  533. CosmosDbQuery cosmosDbQuery = new CosmosDbQuery { QueryText = sql.ToString() };
  534. FeedIterator<T> query = container.container.GetItemQueryIterator<T>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition);
  535. return await ResultsFromFeedIterator(query);
  536. }
  537. private async Task<List<T>> ResultsFromFeedIterator<T>(FeedIterator<T> query, int? maxItemCount = null)
  538. {
  539. List<T> results = new List<T>();
  540. while (query.HasMoreResults)
  541. {
  542. foreach (T t in await query.ReadNextAsync())
  543. {
  544. results.Add(t);
  545. if (results.Count == maxItemCount)
  546. {
  547. return results;
  548. }
  549. }
  550. }
  551. return results;
  552. }
  553. private async Task<List<T>> ResultsFromFeedIterator<T>(FeedIterator<T> query, Func<List<T>, Task> batchAction, int itemsPerPage)
  554. {
  555. List<T> results = new List<T>();
  556. while (query.HasMoreResults)
  557. {
  558. if (results.Count() >= itemsPerPage)
  559. {
  560. await batchAction(results);
  561. results.Clear();
  562. }
  563. results.AddRange(await query.ReadNextAsync());
  564. }
  565. if (results.Count() > 0)
  566. {
  567. await batchAction(results);
  568. results.Clear();
  569. }
  570. return results;
  571. }
  572. public async Task<List<dynamic>> FindByDict(string CollectionName, Dictionary<string, object> dict, List<string> propertys = null)
  573. {
  574. if (CosmosDict.typeCosmos.TryGetValue(CollectionName, out CosmosModelInfo container))
  575. {
  576. //StringBuilder sql = new StringBuilder("select value(c) from c");
  577. //SQLHelper.GetSQL(dict, ref sql);
  578. //CosmosDbQuery cosmosDbQuery = new CosmosDbQuery, int itemsPerPage = -1, int?
  579. //{
  580. // QueryText = sql.ToString()
  581. //};
  582. string pk = container.type.Name;
  583. StringBuilder sql;
  584. sql = SQLHelperParametric.GetSQLSelect(propertys);
  585. CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, sql, pk);
  586. QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
  587. FeedIterator<dynamic> query = container.container.GetItemQueryIterator<dynamic>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
  588. return await ResultsFromFeedIterator(query);
  589. }
  590. else
  591. {
  592. throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
  593. }
  594. }
  595. public async Task<List<int>> FindCountByDict<T>(Dictionary<string, object> dict) {
  596. CosmosModelInfo container = await InitializeCollection<T>();
  597. string pk = typeof(T).Name;
  598. StringBuilder sql = new StringBuilder("select value count(c) from c");
  599. CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, sql, pk);
  600. if (cosmosDbQuery == null)
  601. {
  602. return new List<int> { 0 };
  603. }
  604. QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
  605. FeedIterator<int> query = container.container.GetItemQueryIterator<int>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
  606. return await ResultsFromFeedIterator(query);
  607. }
  608. public async Task<List<dynamic>> FindCountByDict(string CollectionName, Dictionary<string, object> dict)
  609. {
  610. if (CosmosDict.typeCosmos.TryGetValue(CollectionName, out CosmosModelInfo container))
  611. {
  612. string pk = container.type.Name;
  613. dict.Remove("@CURRPAGE");
  614. dict.Remove("@PAGESIZE");
  615. dict.Remove("@ASC");
  616. dict.Remove("@DESC");
  617. StringBuilder sql = new StringBuilder("select value count(c) from c");
  618. CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, sql, pk);
  619. if (cosmosDbQuery == null) {
  620. return new List<dynamic> {0 };
  621. }
  622. QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
  623. FeedIterator<dynamic> query = container.container.GetItemQueryIterator<dynamic>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
  624. return await ResultsFromFeedIterator(query);
  625. }
  626. else
  627. {
  628. throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
  629. }
  630. }
  631. public async Task<List<T>> FindByParams<T>(Dictionary<string, object> dict, List<string> propertys = null) where T : ID
  632. {
  633. return await FindByDict<T>(dict, propertys);
  634. }
  635. public async Task<List<T>> FindByDict<T>(Dictionary<string, object> dict, List<string> propertys = null) where T : ID
  636. {
  637. StringBuilder sql;
  638. sql = SQLHelperParametric.GetSQLSelect(propertys);
  639. string pk = typeof(T).Name;
  640. CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, sql, pk);
  641. QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
  642. return await ResultsFromQueryAndOptions<T>(cosmosDbQuery, queryRequestOptions);
  643. }
  644. private async Task<List<T>> ResultsFromQueryAndOptions<T>(CosmosDbQuery cosmosDbQuery, QueryRequestOptions queryOptions)
  645. {
  646. if (cosmosDbQuery == null) {
  647. return null;
  648. }
  649. CosmosModelInfo container = await InitializeCollection<T>();
  650. FeedIterator<T> query = container.container.GetItemQueryIterator<T>(
  651. queryDefinition: cosmosDbQuery.CosmosQueryDefinition,
  652. requestOptions: queryOptions);
  653. return await ResultsFromFeedIterator(query);
  654. }
  655. private int GetEffectivePageSize(int itemsPerPage, int? maxItemCount)
  656. {
  657. return itemsPerPage == -1 ? maxItemCount ?? itemsPerPage : Math.Min(maxItemCount ?? itemsPerPage, itemsPerPage);
  658. }
  659. private QueryRequestOptions GetDefaultQueryRequestOptions(int? itemsPerPage = null,
  660. int? maxBufferedItemCount = null,
  661. int? maxConcurrency = null)
  662. {
  663. QueryRequestOptions queryRequestOptions = new QueryRequestOptions
  664. {
  665. MaxItemCount = itemsPerPage == -1 ? 1000 : itemsPerPage,
  666. MaxBufferedItemCount = maxBufferedItemCount ?? 100,
  667. MaxConcurrency = maxConcurrency ?? 50
  668. };
  669. return queryRequestOptions;
  670. }
  671. private async Task<List<T>> ResultsFromQueryAndOptions<T>(CosmosDbQuery cosmosDbQuery, Func<List<T>, Task> batchAction, QueryRequestOptions queryOptions)
  672. {
  673. CosmosModelInfo container = await InitializeCollection<T>();
  674. FeedIterator<T> query = container.container.GetItemQueryIterator<T>(
  675. queryDefinition: cosmosDbQuery.CosmosQueryDefinition,
  676. requestOptions: queryOptions);
  677. return await ResultsFromFeedIterator(query, batchAction, queryOptions.MaxItemCount ?? 0);
  678. }
  679. private QueryRequestOptions GetQueryRequestOptions(int itemsPerPage)
  680. {
  681. QueryRequestOptions queryRequestOptions = new QueryRequestOptions
  682. {
  683. MaxItemCount = itemsPerPage
  684. };
  685. return queryRequestOptions;
  686. }
  687. public async Task<List<T>> FindLinq<T>(Expression<Func<T, bool>> query = null, Expression<Func<T, object>> order = null, bool isDesc = false) where T : ID
  688. {
  689. string pk = typeof(T).Name;
  690. query = query.And(x => x.pk == pk);
  691. //QueryRequestOptions queryRequestOptions = GetQueryRequestOptions(itemsPerPage);
  692. QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
  693. FeedIterator<T> feedIterator;
  694. CosmosModelInfo container = await InitializeCollection<T>();
  695. if (query == null)
  696. {
  697. if (order != null)
  698. {
  699. if (isDesc)
  700. {
  701. feedIterator = container.container
  702. .GetItemLinqQueryable<T>(requestOptions: queryRequestOptions).OrderByDescending(order)
  703. .ToFeedIterator();
  704. }
  705. else
  706. {
  707. feedIterator = container.container
  708. .GetItemLinqQueryable<T>(requestOptions: queryRequestOptions).OrderBy(order)
  709. .ToFeedIterator();
  710. }
  711. }
  712. else
  713. {
  714. feedIterator = container.container
  715. .GetItemLinqQueryable<T>(requestOptions: queryRequestOptions)
  716. .ToFeedIterator();
  717. }
  718. }
  719. else
  720. {
  721. if (order != null)
  722. {
  723. if (isDesc)
  724. {
  725. feedIterator = container.container
  726. .GetItemLinqQueryable<T>(requestOptions: queryRequestOptions)
  727. .Where(query).OrderByDescending(order)
  728. .ToFeedIterator();
  729. }
  730. else
  731. {
  732. feedIterator = container.container
  733. .GetItemLinqQueryable<T>(requestOptions: queryRequestOptions)
  734. .Where(query).OrderBy(order)
  735. .ToFeedIterator();
  736. }
  737. }
  738. else
  739. {
  740. feedIterator = container.container
  741. .GetItemLinqQueryable<T>(requestOptions: queryRequestOptions)
  742. .Where(query)
  743. .ToFeedIterator();
  744. }
  745. }
  746. return await ResultsFromFeedIterator<T>(feedIterator);
  747. }
  748. public async Task<List<T>> FindSQL<T>(string sql, Dictionary<string, object> Parameters = null) where T : ID
  749. {
  750. if (sql.Contains(".pk"))
  751. {
  752. CosmosModelInfo container = await InitializeCollection<T>();
  753. QueryRequestOptions queryOptions = GetQueryRequestOptions(GetEffectivePageSize(-1, null));
  754. if (Parameters != null)
  755. {
  756. CosmosDbQuery cosmosDbQuery = new CosmosDbQuery
  757. {
  758. QueryText = sql,
  759. Parameters = Parameters
  760. };
  761. FeedIterator<T> feedIterator = container.container
  762. .GetItemQueryIterator<T>(cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryOptions);
  763. return await ResultsFromFeedIterator(feedIterator);
  764. }
  765. else
  766. {
  767. QueryDefinition queryDefinition = new QueryDefinition(sql);
  768. return await ResultsFromFeedIterator<T>(container.container.GetItemQueryIterator<T>(queryDefinition));
  769. }
  770. }
  771. else
  772. {
  773. throw new BizException("查询参数必须设置 .pk ", ResponseCode.PARAMS_ERROR);
  774. }
  775. }
  776. public async Task<T> Save<T>(T entity) where T : ID
  777. {
  778. try
  779. {
  780. CosmosModelInfo container = await InitializeCollection<T>();
  781. entity.pk = container.type.Name;
  782. entity.ttl = null;
  783. ItemResponse<T> response = await container.container.CreateItemAsync<T>(entity);
  784. if (container.cache && RedisHelper.Instance != null)
  785. {
  786. if (!RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
  787. {
  788. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
  789. }
  790. else
  791. {
  792. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
  793. await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
  794. }
  795. }
  796. return response.Resource;
  797. }
  798. catch (Exception e)
  799. {
  800. throw new BizException(e.Message);
  801. }
  802. }
  803. public async Task<List<T>> SaveAll<T>(List<T> enyites) where T : ID
  804. {
  805. int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
  806. CosmosModelInfo container = await InitializeCollection<T>();
  807. bool flag = false;
  808. if (RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
  809. {
  810. flag = true;
  811. }
  812. string partitionKey = GetPartitionKey<T>();
  813. Type type = typeof(T);
  814. Stopwatch stopwatch = Stopwatch.StartNew();
  815. for (int i = 0; i < pages; i++)
  816. {
  817. List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
  818. List<KeyValuePair<PartitionKey, Stream>> itemsToInsert = new List<KeyValuePair<PartitionKey, Stream>>();
  819. lists.ForEach(async x =>
  820. {
  821. x.pk = type.Name;
  822. x.ttl = null;
  823. MemoryStream stream = new MemoryStream();
  824. await JsonSerializer.SerializeAsync(stream, x, new JsonSerializerOptions { IgnoreNullValues = true });
  825. object o = type.GetProperty(partitionKey).GetValue(x, null);
  826. KeyValuePair<PartitionKey, Stream> keyValue = new KeyValuePair<PartitionKey, Stream>(new PartitionKey(o.ToString()), stream);
  827. itemsToInsert.Add(keyValue);
  828. });
  829. List<Task> tasks = new List<Task>(lists.Count);
  830. itemsToInsert.ForEach(item =>
  831. {
  832. tasks.Add(container.container.CreateItemStreamAsync(item.Value, item.Key)
  833. .ContinueWith((Task<ResponseMessage> task) =>
  834. {
  835. using (ResponseMessage response = task.Result)
  836. {
  837. if (!response.IsSuccessStatusCode)
  838. {
  839. }
  840. }
  841. }
  842. ));
  843. });
  844. await Task.WhenAll(tasks);
  845. if (container.cache && RedisHelper.Instance != null)
  846. {
  847. lists.ForEach(async x => {
  848. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, x.id, x);
  849. });
  850. }
  851. }
  852. if (container.cache && RedisHelper.Instance != null && !flag)
  853. {
  854. await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
  855. }
  856. stopwatch.Stop();
  857. return enyites;
  858. }
  859. public async Task<T> SaveOrUpdate<T>(T entity) where T : ID
  860. {
  861. CosmosModelInfo container = await InitializeCollection<T>();
  862. entity.pk = container.type.Name;
  863. entity.ttl = null;
  864. ItemResponse<T> response = await container.container.UpsertItemAsync(item: entity);
  865. if (container.cache && RedisHelper.Instance != null)
  866. {
  867. if (!RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
  868. {
  869. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
  870. }
  871. else
  872. {
  873. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
  874. await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
  875. }
  876. }
  877. return response.Resource;
  878. }
  879. public async Task<List<T>> SaveOrUpdateAll<T>(List<T> enyites) where T : ID
  880. {
  881. //await Task.Run(() => Parallel.ForEach(entities, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (item) =>
  882. //{
  883. // Task.WaitAll(Update(item));
  884. //}));
  885. int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
  886. CosmosModelInfo container = await InitializeCollection<T>();
  887. bool flag = false;
  888. if (RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
  889. {
  890. flag = true;
  891. }
  892. string partitionKey = GetPartitionKey<T>();
  893. Type type = typeof(T);
  894. Stopwatch stopwatch = Stopwatch.StartNew();
  895. for (int i = 0; i < pages; i++)
  896. {
  897. List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
  898. List<KeyValuePair<PartitionKey, Stream>> itemsToInsert = new List<KeyValuePair<PartitionKey, Stream>>();
  899. lists.ForEach(async x =>
  900. {
  901. x.pk = type.Name;
  902. x.ttl = null;
  903. MemoryStream stream = new MemoryStream();
  904. await JsonSerializer.SerializeAsync(stream, x, new JsonSerializerOptions { IgnoreNullValues = true });
  905. object o = type.GetProperty(partitionKey).GetValue(x, null);
  906. KeyValuePair<PartitionKey, Stream> keyValue = new KeyValuePair<PartitionKey, Stream>(new PartitionKey(o.ToString()), stream);
  907. itemsToInsert.Add(keyValue);
  908. });
  909. List<Task> tasks = new List<Task>(lists.Count);
  910. itemsToInsert.ForEach(item =>
  911. {
  912. tasks.Add(container.container.UpsertItemStreamAsync(item.Value, item.Key)
  913. .ContinueWith((Task<ResponseMessage> task) =>
  914. {
  915. //using (ResponseMessage response = task.Result)
  916. //{
  917. // if (!response.IsSuccessStatusCode)
  918. // {
  919. // }
  920. //}
  921. }
  922. ));
  923. });
  924. await Task.WhenAll(tasks);
  925. if (container.cache && RedisHelper.Instance != null)
  926. {
  927. lists.ForEach(async x => {
  928. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, x.id, x);
  929. });
  930. }
  931. }
  932. if (container.cache && RedisHelper.Instance != null && !flag)
  933. {
  934. await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
  935. }
  936. stopwatch.Stop();
  937. return enyites;
  938. }
  939. public async Task<List<dynamic>> UpdateAll (string typeName,List<dynamic> enyites)
  940. {
  941. int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
  942. if (CosmosDict.typeCosmos.TryGetValue(typeName, out CosmosModelInfo container)) {
  943. }
  944. bool flag = false;
  945. if (RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
  946. {
  947. flag = true;
  948. }
  949. string partitionKey = container.partitionKey;
  950. Stopwatch stopwatch = Stopwatch.StartNew();
  951. for (int i = 0; i < pages; i++)
  952. {
  953. List<dynamic> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
  954. List<Item> itemsToInsert = new List<Item>();
  955. lists.ForEach(async x =>
  956. {
  957. /* x.pk = typeName;
  958. x.ttl = null;*/
  959. MemoryStream stream = new MemoryStream();
  960. await JsonSerializer.SerializeAsync(stream, x, new JsonSerializerOptions { IgnoreNullValues = true });
  961. object o = x[partitionKey];
  962. Item keyValue = new Item { id = x.id, pk = o.ToString(), stream = stream };
  963. itemsToInsert.Add(keyValue);
  964. });
  965. List<Task> tasks = new List<Task>(lists.Count);
  966. itemsToInsert.ForEach(item =>
  967. {
  968. tasks.Add(container.container.ReplaceItemStreamAsync(item.stream, item.id, new PartitionKey(item.pk))
  969. .ContinueWith((Task<ResponseMessage> task) =>
  970. {
  971. //using (ResponseMessage response = task.Result)
  972. //{
  973. // if (!response.IsSuccessStatusCode)
  974. // {
  975. // }
  976. //}
  977. }
  978. ));
  979. });
  980. await Task.WhenAll(tasks);
  981. if (container.cache && RedisHelper.Instance != null)
  982. {
  983. lists.ForEach(async x => {
  984. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, x.id, x);
  985. });
  986. }
  987. }
  988. if (container.cache && RedisHelper.Instance != null && !flag)
  989. {
  990. await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
  991. }
  992. stopwatch.Stop();
  993. return enyites;
  994. }
  995. public async Task<T> Update<T>(T entity) where T : ID
  996. {
  997. CosmosModelInfo container = await InitializeCollection<T>();
  998. string partitionKey = GetPartitionKey<T>();
  999. Type type = typeof(T);
  1000. entity.pk = type.Name;
  1001. entity.ttl = null;
  1002. object o = type.GetProperty(partitionKey).GetValue(entity, null);
  1003. ItemResponse<T> response = await container.container.ReplaceItemAsync(entity, entity.id, new PartitionKey(o.ToString()));
  1004. if (container.cache && RedisHelper.Instance != null)
  1005. {
  1006. if (!RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
  1007. {
  1008. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
  1009. }
  1010. else
  1011. {
  1012. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
  1013. await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
  1014. }
  1015. }
  1016. return response.Resource;
  1017. }
  1018. internal class Item
  1019. {
  1020. public string id { get; set; }
  1021. public string pk { get; set; }
  1022. public MemoryStream stream { get; set; }
  1023. }
  1024. public async Task<List<T>> UpdateAll<T>(List<T> enyites) where T : ID
  1025. {
  1026. //await Task.Run(() => Parallel.ForEach(entities, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (item) =>
  1027. //{
  1028. // Task.WaitAll(Update(item));
  1029. //}));
  1030. int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
  1031. CosmosModelInfo container = await InitializeCollection<T>();
  1032. bool flag = false;
  1033. if (RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
  1034. {
  1035. flag = true;
  1036. }
  1037. string partitionKey = GetPartitionKey<T>();
  1038. Type type = typeof(T);
  1039. Stopwatch stopwatch = Stopwatch.StartNew();
  1040. for (int i = 0; i < pages; i++)
  1041. {
  1042. List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
  1043. List<Item> itemsToInsert = new List<Item>();
  1044. lists.ForEach(async x =>
  1045. {
  1046. x.pk = type.Name;
  1047. x.ttl = null;
  1048. MemoryStream stream = new MemoryStream();
  1049. await JsonSerializer.SerializeAsync(stream, x, new JsonSerializerOptions { IgnoreNullValues = true });
  1050. object o = type.GetProperty(partitionKey).GetValue(x, null);
  1051. Item keyValue = new Item { id = x.id, pk = o.ToString(), stream = stream };
  1052. itemsToInsert.Add(keyValue);
  1053. });
  1054. List<Task> tasks = new List<Task>(lists.Count);
  1055. itemsToInsert.ForEach(item =>
  1056. {
  1057. tasks.Add(container.container.ReplaceItemStreamAsync(item.stream, item.id, new PartitionKey(item.pk))
  1058. .ContinueWith((Task<ResponseMessage> task) =>
  1059. {
  1060. //using (ResponseMessage response = task.Result)
  1061. //{
  1062. // if (!response.IsSuccessStatusCode)
  1063. // {
  1064. // }
  1065. //}
  1066. }
  1067. ));
  1068. });
  1069. await Task.WhenAll(tasks);
  1070. if (container.cache && RedisHelper.Instance != null)
  1071. {
  1072. lists.ForEach(async x => {
  1073. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, x.id, x);
  1074. });
  1075. }
  1076. }
  1077. if (container.cache && RedisHelper.Instance != null && !flag)
  1078. {
  1079. await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
  1080. }
  1081. stopwatch.Stop();
  1082. return enyites;
  1083. }
  1084. private async Task<List<T>> FindByIdAsSql<T>(string id) where T : ID
  1085. {
  1086. CosmosModelInfo container = await InitializeCollection<T>();
  1087. string pk = container.type.Name;
  1088. CosmosDbQuery cosmosDbQuery = new CosmosDbQuery
  1089. {
  1090. QueryText = @"SELECT *
  1091. FROM c
  1092. WHERE c.pk='" + pk + "' and c.id = @id",
  1093. Parameters = new Dictionary<string, object>
  1094. {
  1095. { "@id",id}
  1096. }
  1097. };
  1098. FeedIterator<T> feedIterator = container.container
  1099. .GetItemQueryIterator<T>(cosmosDbQuery.CosmosQueryDefinition);
  1100. return (await ResultsFromFeedIterator(feedIterator)).ToList();
  1101. }
  1102. public async Task<T> FindByIdPk<T>(string id, string pk) where T : ID
  1103. {
  1104. CosmosModelInfo container = await InitializeCollection<T>();
  1105. try {
  1106. ItemResponse<T> response = await container.container.ReadItemAsync<T>(id: id, partitionKey: new PartitionKey(pk));
  1107. return response.Resource;
  1108. }
  1109. catch (Exception ex) {
  1110. return default;
  1111. }
  1112. }
  1113. public async Task<List<T>> FindById<T>(string id, bool cache = true) where T : ID
  1114. {
  1115. CosmosModelInfo container = await InitializeCollection<T>();
  1116. if (container.cache && RedisHelper.Instance != null && cache == true)
  1117. {
  1118. return await RedisHelper.CacheShellAsync(CacheCosmosPrefix + container.container.Id, id, timeoutSeconds, () => { return FindByIdAsSql<T>(id); });
  1119. }
  1120. else
  1121. {
  1122. return await FindByIdAsSql<T>(id);
  1123. }
  1124. }
  1125. public async Task<List<T>> FindByIds<T>(List<string> ids) where T : ID
  1126. {
  1127. CosmosModelInfo container = await InitializeCollection<T>();
  1128. if (container.cache && RedisHelper.Instance != null)
  1129. {
  1130. List<T> list = new List<T>();
  1131. List<string> NotIn = new List<string>();
  1132. foreach (string id in ids)
  1133. {
  1134. if (!await RedisHelper.HExistsAsync(CacheCosmosPrefix + container.container.Id, id))
  1135. {
  1136. NotIn.Add(id);
  1137. }
  1138. else
  1139. {
  1140. list.Add(await RedisHelper.HGetAsync<T>(CacheCosmosPrefix + container.container.Id, id));
  1141. }
  1142. }
  1143. if (NotIn.IsNotEmpty())
  1144. {
  1145. List<T> noInList = await FindByDict<T>(new Dictionary<string, object> { { "id", NotIn.ToArray() } });
  1146. noInList.ForEach(x => { RedisHelper.HSet(CacheCosmosPrefix + container.container.Id, x.id, x); RedisHelper.Expire(CacheCosmosPrefix + container.container.Id, timeoutSeconds); });
  1147. list.AddRange(noInList);
  1148. }
  1149. return list;
  1150. }
  1151. else
  1152. {
  1153. return await FindByDict<T>(new Dictionary<string, object> { { "id", ids.ToArray() } });
  1154. }
  1155. }
  1156. public async Task<dynamic> FindById(string typeName, string id)
  1157. {
  1158. if (CosmosDict.typeCosmos.TryGetValue(typeName, out CosmosModelInfo container))
  1159. {
  1160. if (container.cache && RedisHelper.Instance != null)
  1161. {
  1162. return await RedisHelper.CacheShellAsync(CacheCosmosPrefix + container.container.Id, id, timeoutSeconds, () => { return FindByDict(typeName, new Dictionary<string, object> { { "id", id } }); });
  1163. }
  1164. else
  1165. {
  1166. return await FindByDict(typeName, new Dictionary<string, object> { { "id", id } });
  1167. }
  1168. }
  1169. else
  1170. {
  1171. throw new BizException("CollectionName named:" + typeName + " dose not exsit in Database!");
  1172. }
  1173. }
  1174. public async Task<List<dynamic>> FindByIds(string typeName, List<string> ids)
  1175. {
  1176. if (CosmosDict. typeCosmos.TryGetValue(typeName, out CosmosModelInfo container))
  1177. {
  1178. if (container.cache && RedisHelper.Instance != null)
  1179. {
  1180. return await RedisHelper.CacheShellAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds, () => { return FindByDict(typeName, new Dictionary<string, object> { { "id", ids.ToArray() } }); });
  1181. }
  1182. else
  1183. {
  1184. return await FindByDict(typeName, new Dictionary<string, object> { { "id", ids.ToArray() } });
  1185. }
  1186. }
  1187. else
  1188. {
  1189. throw new BizException("CollectionName named:" + typeName + " dose not exsit in Database!");
  1190. }
  1191. }
  1192. }
  1193. }