AzureCosmosDBV3Repository.cs 48 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100
  1. using Microsoft.Azure.Cosmos;
  2. using Microsoft.Azure.Cosmos.Linq;
  3. using OpenXmlPowerTools;
  4. using System;
  5. using System.Collections.Concurrent;
  6. using System.Collections.Generic;
  7. using System.Diagnostics;
  8. using System.IO;
  9. using System.Linq;
  10. using System.Linq.Expressions;
  11. using System.Net;
  12. using System.Reflection;
  13. using System.Text;
  14. using System.Text.Json;
  15. using System.Threading;
  16. using System.Threading.Tasks;
  17. using TEAMModelOS.SDK.Context.Attributes.Azure;
  18. using TEAMModelOS.SDK.Context.Exception;
  19. using TEAMModelOS.SDK.Helper.Common.CollectionHelper;
  20. using TEAMModelOS.SDK.Helper.Common.ReflectorExtensions;
  21. using TEAMModelOS.SDK.Module.AzureCosmosDB.Configuration;
  22. namespace TEAMModelOS.SDK.Module.AzureCosmosDBV3
  23. {
  24. public class AzureCosmosDBV3Repository : IAzureCosmosDBV3Repository
  25. {
  26. private CosmosClient CosmosClient { get; set; }
  27. public CosmosDict CosmosDict { get; set; } = new CosmosDict();
  28. /// <summary>
  29. /// 数据库名
  30. /// </summary>
  31. private string DatabaseId { get; set; }
  32. /// <summary>
  33. /// RU
  34. /// </summary>
  35. private int CollectionThroughput { get; set; }
  36. /// <summary>
  37. /// 数据库对象
  38. /// </summary>
  39. private Database database { get; set; }
  40. /// <summary>
  41. /// 分页大小
  42. /// </summary>
  43. int pageSize = 200;
  44. /// <summary>
  45. /// 缓存前缀
  46. /// </summary>
  47. private const string CacheCosmosPrefix = "cosmos:";
  48. /// <summary>
  49. /// 扫描类
  50. /// </summary>
  51. private string[] ScanModel { get; set; }
  52. /// <summary>
  53. /// 超时时间
  54. /// </summary>
  55. private const int timeoutSeconds = 86400;
  56. /// <summary>
  57. /// ttl时长 1秒
  58. /// </summary>
  59. private const int ttl = 1;
  60. /// <summary>
  61. /// 更新源通知容器
  62. /// </summary>
  63. private string leaseId = "AleaseContainer";
  64. public AzureCosmosDBV3Repository(AzureCosmosDBOptions options, CosmosSerializer cosmosSerializer)
  65. {
  66. try
  67. {
  68. if (!string.IsNullOrEmpty(options.ConnectionString))
  69. {
  70. CosmosClient = CosmosDBV3ClientSingleton.getInstance(options.ConnectionString, options.ConnectionKey, cosmosSerializer).GetCosmosDBClient();
  71. }
  72. else
  73. {
  74. throw new BizException("请设置正确的AzureCosmosDB数据库配置信息!");
  75. }
  76. DatabaseId = options.Database;
  77. CollectionThroughput = options.CollectionThroughput;
  78. ScanModel = options.ScanModel;
  79. }
  80. catch (CosmosException e)
  81. {
  82. throw new BizException(e.Message, 500, e.StackTrace);
  83. }
  84. }
  85. public AzureCosmosDBV3Repository(AzureCosmosDBOptions options)
  86. {
  87. try
  88. {
  89. if (!string.IsNullOrEmpty(options.ConnectionString))
  90. {
  91. CosmosClient = CosmosDBV3ClientSingleton.getInstance(options.ConnectionString, options.ConnectionKey, null).GetCosmosDBClient();
  92. }
  93. else
  94. {
  95. throw new BizException("请设置正确的AzureCosmosDB数据库配置信息!");
  96. }
  97. DatabaseId = options.Database;
  98. CollectionThroughput = options.CollectionThroughput;
  99. ScanModel = options.ScanModel;
  100. }
  101. catch (CosmosException e)
  102. {
  103. throw new BizException(e.Message, 500, e.StackTrace);
  104. }
  105. }
  106. /// <summary>
  107. /// 初始化CosmosDB数据库
  108. /// </summary>
  109. /// <returns></returns>
  110. public async Task<CosmosDict> InitializeDatabase() {
  111. database = await CosmosClient.CreateDatabaseIfNotExistsAsync(DatabaseId, CollectionThroughput);
  112. FeedIterator<ContainerProperties> resultSetIterator = database.GetContainerQueryIterator<ContainerProperties>();
  113. while (resultSetIterator.HasMoreResults)
  114. {
  115. foreach (ContainerProperties container in await resultSetIterator.ReadNextAsync())
  116. {
  117. CosmosDict.nameCosmos.TryAdd(container.Id, new CosmosModelInfo { container = database.GetContainer(container.Id), cache = false, monitor = false });
  118. }
  119. }
  120. bool isMonitor = false;
  121. //获取数据库所有的表
  122. List<Type> types = ReflectorExtensions.GetAllTypeAsAttribute<CosmosDBAttribute>(ScanModel);
  123. foreach (Type type in types)
  124. {
  125. string PartitionKey = GetPartitionKey(type);
  126. string CollectionName = "";
  127. int RU = 0;
  128. bool cache = false;
  129. bool monitor = false;
  130. IEnumerable<CosmosDBAttribute> attributes = type.GetCustomAttributes<CosmosDBAttribute>(true);
  131. if (attributes != null && !string.IsNullOrEmpty(attributes.First<CosmosDBAttribute>().Name))
  132. {
  133. CollectionName = attributes.First<CosmosDBAttribute>().Name;
  134. }
  135. else {
  136. throw new BizException("必须指定容器名",ResponseCode.PARAMS_ERROR);
  137. }
  138. if (attributes.First<CosmosDBAttribute>().Cache)
  139. {
  140. cache = attributes.First<CosmosDBAttribute>().Cache;
  141. }
  142. if (attributes.First<CosmosDBAttribute>().Monitor)
  143. {
  144. monitor = attributes.First<CosmosDBAttribute>().Monitor;
  145. if (monitor)
  146. {
  147. isMonitor = true;
  148. }
  149. }
  150. if (attributes.First<CosmosDBAttribute>().RU > 400)
  151. {
  152. RU = attributes.First<CosmosDBAttribute>().RU;
  153. }
  154. else
  155. {
  156. RU = CollectionThroughput;
  157. }
  158. //如果表存在于数据则检查RU是否变动,如果不存在则执行创建DocumentCollection
  159. if (CosmosDict.nameCosmos.TryGetValue(CollectionName, out CosmosModelInfo cosmosModelInfo))
  160. { //更新RU
  161. cosmosModelInfo.cache = cache;
  162. Container container = CosmosClient.GetDatabase(DatabaseId).GetContainer(cosmosModelInfo.container.Id);
  163. int? throughputResponse = await container.ReadThroughputAsync();
  164. if (throughputResponse < RU)
  165. {
  166. await CosmosClient.GetDatabase(DatabaseId).GetContainer(cosmosModelInfo.container.Id).ReplaceThroughputAsync(RU);
  167. }
  168. CosmosModelInfo cosmos = new CosmosModelInfo { container = container, cache = cache, monitor = monitor, type = type };
  169. CosmosDict.nameCosmos[CollectionName] = cosmos;
  170. CosmosDict.typeCosmos.Add(type.Name, cosmos);
  171. }
  172. else {
  173. ContainerProperties containerProperties = new ContainerProperties { Id = CollectionName, DefaultTimeToLive = -1 };
  174. if (!string.IsNullOrEmpty(PartitionKey))
  175. {
  176. containerProperties.PartitionKeyPath = "/" + PartitionKey;
  177. }
  178. if (RU > CollectionThroughput)
  179. {
  180. CollectionThroughput = RU;
  181. }
  182. Container containerWithConsistentIndexing = await database.CreateContainerIfNotExistsAsync(containerProperties, throughput: CollectionThroughput);
  183. CosmosModelInfo cosmos = new CosmosModelInfo { container = containerWithConsistentIndexing, cache = cache, monitor = monitor, type = type };
  184. CosmosDict.nameCosmos[CollectionName] = cosmos;
  185. CosmosDict.typeCosmos.Add(type.Name, cosmos);
  186. }
  187. }
  188. if (isMonitor)
  189. {
  190. ContainerProperties leaseProperties = new ContainerProperties { Id = leaseId, PartitionKeyPath = "/id", DefaultTimeToLive = -1 };
  191. Container leaseContainer = await database.CreateContainerIfNotExistsAsync(leaseProperties, throughput: CollectionThroughput);
  192. CosmosDict.nameCosmos.TryAdd(leaseId, new CosmosModelInfo { container = leaseContainer, cache = false, monitor = false });
  193. }
  194. return CosmosDict;
  195. }
  196. private string GetPartitionKey<T>()
  197. {
  198. Type type = typeof(T);
  199. return GetPartitionKey(type);
  200. }
  201. private string GetPartitionKey(Type type)
  202. {
  203. PropertyInfo[] properties = type.GetProperties();
  204. List<PropertyInfo> attrProperties = new List<PropertyInfo>();
  205. foreach (PropertyInfo property in properties)
  206. {
  207. if (property.Name.Equals("PartitionKey"))
  208. {
  209. attrProperties.Add(property);
  210. break;
  211. }
  212. object[] attributes = property.GetCustomAttributes(true);
  213. foreach (object attribute in attributes) //2.通过映射,找到成员属性上关联的特性类实例,
  214. {
  215. if (attribute is PartitionKeyAttribute)
  216. {
  217. attrProperties.Add(property);
  218. }
  219. }
  220. }
  221. if (attrProperties.Count <= 0)
  222. {
  223. throw new BizException(type.Name + " has no PartitionKey !");
  224. }
  225. else
  226. {
  227. if (attrProperties.Count == 1)
  228. {
  229. return attrProperties[0].Name;
  230. }
  231. else { throw new BizException("PartitionKey can only be single!"); }
  232. }
  233. }
  234. private async Task<CosmosModelInfo> InitializeCollection<T>()
  235. {
  236. Type type = typeof(T);
  237. string partitionKey = GetPartitionKey<T>();
  238. CosmosDBAttribute cosmosDBAttribute = null;
  239. IEnumerable<CosmosDBAttribute> attributes = type.GetCustomAttributes<CosmosDBAttribute>(true);
  240. if (attributes != null && !string.IsNullOrEmpty(attributes.First<CosmosDBAttribute>().Name))
  241. {
  242. cosmosDBAttribute = attributes.First<CosmosDBAttribute>();
  243. }
  244. else {
  245. throw new BizException(type.Name+"未指定CosmosDB表名",ResponseCode.PARAMS_ERROR);
  246. }
  247. return await InitializeCollection(cosmosDBAttribute, type.Name, partitionKey);
  248. }
  249. private async Task<CosmosModelInfo> InitializeCollection(CosmosDBAttribute cosmosDBAttribute, string typeName, string PartitionKey)
  250. {
  251. /////内存中已经存在这个表则直接返回
  252. if (CosmosDict.typeCosmos.TryGetValue(typeName, out CosmosModelInfo cosmosModelInfo))
  253. {
  254. return cosmosModelInfo;
  255. }///如果没有则尝试默认创建
  256. else
  257. {
  258. ContainerProperties containerProperties = new ContainerProperties { Id = cosmosDBAttribute.Name };
  259. if (!string.IsNullOrEmpty(PartitionKey))
  260. {
  261. containerProperties.PartitionKeyPath = "/"+ PartitionKey;
  262. }
  263. Container containerWithConsistentIndexing = await database.CreateContainerIfNotExistsAsync(containerProperties, throughput: cosmosDBAttribute.RU);
  264. CosmosModelInfo cosmosModel = new CosmosModelInfo { container = containerWithConsistentIndexing, cache = cosmosDBAttribute.Cache,monitor= cosmosDBAttribute.Monitor };
  265. CosmosDict. nameCosmos.TryAdd(cosmosDBAttribute.Name, cosmosModel);
  266. CosmosDict.typeCosmos.TryAdd(typeName, cosmosModel);
  267. return cosmosModel;
  268. }
  269. }
  270. /// <summary>
  271. /// 按TTL删除
  272. /// </summary>
  273. /// <typeparam name="T"></typeparam>
  274. /// <param name="list"></param>
  275. /// <returns></returns>
  276. private async Task<List<T>> DeleteTTL<T>(List<T> list) where T : ID
  277. {
  278. CosmosModelInfo container = await InitializeCollection<T>();
  279. list.ForEach(x => { x.ttl = ttl; });
  280. list = await SaveOrUpdateAll(list);
  281. if (container.cache && RedisHelper.Instance != null)
  282. {
  283. list.ForEach(async x => {
  284. await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, x.id);
  285. });
  286. }
  287. return list;
  288. }
  289. public async Task<List<IdPk>> DeleteAll<T>(List<IdPk> ids) where T : ID
  290. {
  291. // string pk = GetPartitionKey<T>();
  292. CosmosModelInfo container = await InitializeCollection<T>();
  293. List<IdPk> idPks = new List<IdPk>();
  294. if (container.monitor)
  295. {
  296. List<T> list = await FindByDict<T>(new Dictionary<string, object>() { { "id", ids.Select(x => x.id).ToArray() } });
  297. list = await DeleteTTL(list);
  298. return ids;
  299. }
  300. else
  301. {
  302. int pages = (int)Math.Ceiling((double)ids.Count / pageSize);
  303. Stopwatch stopwatch = Stopwatch.StartNew();
  304. for (int i = 0; i < pages; i++)
  305. {
  306. List<IdPk> lists = ids.Skip((i) * pageSize).Take(pageSize).ToList();
  307. List<Task> tasks = new List<Task>(lists.Count);
  308. lists.ForEach(item =>
  309. {
  310. tasks.Add(container.container.DeleteItemStreamAsync(item.id, new PartitionKey(item.pk))
  311. .ContinueWith((Task<ResponseMessage> task) =>
  312. {
  313. using (ResponseMessage response = task.Result)
  314. {
  315. idPks.Add(new IdPk { id = item.id, pk = item.pk, StatusCode = response.StatusCode });
  316. // if (!response.IsSuccessStatusCode)
  317. // {
  318. // }
  319. }
  320. }
  321. ));
  322. });
  323. await Task.WhenAll(tasks);
  324. if (container.cache && RedisHelper.Instance != null)
  325. {
  326. lists.ForEach(async x => {
  327. await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, x.id);
  328. });
  329. }
  330. }
  331. stopwatch.Stop();
  332. return idPks;
  333. }
  334. }
  335. public async Task<List<IdPk>> DeleteAll<T>(Dictionary<string, object> dict) where T : ID
  336. {
  337. if (dict.Keys.Count > 0)
  338. {
  339. List<T> list = await FindByDict<T>(dict);
  340. return await DeleteAll(list);
  341. }
  342. else
  343. {
  344. throw new BizException("参数为空", 500);
  345. }
  346. }
  347. public async Task<List<IdPk>> DeleteAll<T>(List<T> enyites) where T : ID
  348. {
  349. Type type = typeof(T);
  350. string pk = GetPartitionKey<T>();
  351. CosmosModelInfo container = await InitializeCollection<T>();
  352. List<IdPk> idPks = new List<IdPk>();
  353. if (container.monitor)
  354. {
  355. enyites = await DeleteTTL(enyites);
  356. foreach (T t in enyites)
  357. {
  358. object o = type.GetProperty(pk).GetValue(t, null);
  359. idPks.Add(new IdPk { id = t.id, pk = o.ToString(), StatusCode = HttpStatusCode.NoContent });
  360. }
  361. return idPks;
  362. }
  363. else
  364. {
  365. int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
  366. Stopwatch stopwatch = Stopwatch.StartNew();
  367. for (int i = 0; i < pages; i++)
  368. {
  369. List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
  370. List<KeyValuePair<PartitionKey, string>> itemsToInsert = new List<KeyValuePair<PartitionKey, string>>();
  371. lists.ForEach(x =>
  372. {
  373. object o = type.GetProperty(pk).GetValue(x, null);
  374. KeyValuePair<PartitionKey, string> keyValue = new KeyValuePair<PartitionKey, string>(new PartitionKey(o.ToString()), x.id);
  375. itemsToInsert.Add(keyValue);
  376. });
  377. List<Task> tasks = new List<Task>(lists.Count);
  378. itemsToInsert.ForEach(item =>
  379. {
  380. tasks.Add(container.container.DeleteItemStreamAsync(item.Value, item.Key)
  381. .ContinueWith((Task<ResponseMessage> task) =>
  382. {
  383. using (ResponseMessage response = task.Result)
  384. {
  385. idPks.Add(new IdPk { id = item.Value, pk = item.Key.ToString(), StatusCode = response.StatusCode });
  386. }
  387. }
  388. ));
  389. });
  390. await Task.WhenAll(tasks); if (container.cache && RedisHelper.Instance != null)
  391. {
  392. lists.ForEach(async x => {
  393. await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, x.id);
  394. });
  395. }
  396. }
  397. stopwatch.Stop();
  398. return idPks;
  399. }
  400. }
  401. public async Task<IdPk> DeleteAsync<T>(IdPk idPk) where T : ID
  402. {
  403. return await DeleteAsync<T>(idPk.id, idPk.pk);
  404. }
  405. public async Task<IdPk> DeleteAsync<T>(string id, string pk) where T : ID
  406. {
  407. // pk = GetPartitionKey<T>();
  408. CosmosModelInfo container = await InitializeCollection<T>();
  409. if (container.monitor)
  410. {
  411. List<T> list = await FindByDict<T>(new Dictionary<string, object>() { { "id", id } });
  412. if (list.Count > 0)
  413. {
  414. await DeleteTTL<T>(list);
  415. return new IdPk { id = id, pk = pk, StatusCode = HttpStatusCode.NoContent };
  416. }
  417. else
  418. {
  419. throw new BizException("未找到ID匹配的数据,删除失败");
  420. }
  421. }
  422. else
  423. {
  424. ResponseMessage response = await container.container.DeleteItemStreamAsync(id: id, partitionKey: new PartitionKey(pk));
  425. if (container.cache && RedisHelper.Instance != null)
  426. {
  427. await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, id);
  428. }
  429. return new IdPk { id = id, pk = pk, StatusCode = response.StatusCode };
  430. }
  431. }
  432. public async Task<IdPk> DeleteAsync<T>(T entity) where T : ID
  433. {
  434. CosmosModelInfo container = await InitializeCollection<T>();
  435. string partitionKey = GetPartitionKey<T>();
  436. Type type = typeof(T);
  437. object o = type.GetProperty(partitionKey).GetValue(entity, null);
  438. if (container.monitor)
  439. {
  440. await DeleteTTL<T>(new List<T>() { entity });
  441. return new IdPk { id = entity.id, pk = o.ToString(), StatusCode = HttpStatusCode.NoContent };
  442. }
  443. else
  444. {
  445. ResponseMessage response = await container.container.DeleteItemStreamAsync(id: entity.id, partitionKey: new PartitionKey(o.ToString()));
  446. if (container.cache && RedisHelper.Instance != null)
  447. {
  448. await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, entity.id);
  449. }
  450. return new IdPk { id = entity.id, pk = partitionKey, StatusCode = response.StatusCode };
  451. }
  452. }
  453. public async Task<List<T>> FindAll<T>(List<string> propertys = null) where T : ID
  454. {
  455. CosmosModelInfo container = await InitializeCollection<T>();
  456. string pk = typeof(T).Name;
  457. StringBuilder sql;
  458. sql = SQLHelperParametric.GetSQLSelect(propertys);
  459. sql.Append(" where c.pk = " + pk);
  460. CosmosDbQuery cosmosDbQuery = new CosmosDbQuery { QueryText = sql.ToString() };
  461. FeedIterator<T> query = container.container.GetItemQueryIterator<T>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition);
  462. return await ResultsFromFeedIterator(query);
  463. }
  464. private async Task<List<T>> ResultsFromFeedIterator<T>(FeedIterator<T> query, int? maxItemCount = null)
  465. {
  466. List<T> results = new List<T>();
  467. while (query.HasMoreResults)
  468. {
  469. foreach (T t in await query.ReadNextAsync())
  470. {
  471. results.Add(t);
  472. if (results.Count == maxItemCount)
  473. {
  474. return results;
  475. }
  476. }
  477. }
  478. return results;
  479. }
  480. private async Task<List<T>> ResultsFromFeedIterator<T>(FeedIterator<T> query, Func<List<T>, Task> batchAction, int itemsPerPage)
  481. {
  482. List<T> results = new List<T>();
  483. while (query.HasMoreResults)
  484. {
  485. if (results.Count() >= itemsPerPage)
  486. {
  487. await batchAction(results);
  488. results.Clear();
  489. }
  490. results.AddRange(await query.ReadNextAsync());
  491. }
  492. if (results.Count() > 0)
  493. {
  494. await batchAction(results);
  495. results.Clear();
  496. }
  497. return results;
  498. }
  499. public async Task<List<dynamic>> FindByDict(string CollectionName, Dictionary<string, object> dict, List<string> propertys = null)
  500. {
  501. if (CosmosDict.typeCosmos.TryGetValue(CollectionName, out CosmosModelInfo container))
  502. {
  503. //StringBuilder sql = new StringBuilder("select value(c) from c");
  504. //SQLHelper.GetSQL(dict, ref sql);
  505. //CosmosDbQuery cosmosDbQuery = new CosmosDbQuery, int itemsPerPage = -1, int?
  506. //{
  507. // QueryText = sql.ToString()
  508. //};
  509. string pk = container.type.Name;
  510. StringBuilder sql;
  511. sql = SQLHelperParametric.GetSQLSelect(propertys);
  512. CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, sql, pk);
  513. QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
  514. FeedIterator<dynamic> query = container.container.GetItemQueryIterator<dynamic>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
  515. return await ResultsFromFeedIterator(query);
  516. }
  517. else
  518. {
  519. throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
  520. }
  521. }
  522. public async Task<List<int>> FindCountByDict<T>(Dictionary<string, object> dict) {
  523. CosmosModelInfo container = await InitializeCollection<T>();
  524. string pk = typeof(T).Name;
  525. StringBuilder sql = new StringBuilder("select value count(c) from c");
  526. CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, sql, pk);
  527. QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
  528. FeedIterator<int> query = container.container.GetItemQueryIterator<int>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
  529. return await ResultsFromFeedIterator(query);
  530. }
  531. public async Task<List<dynamic>> FindCountByDict(string CollectionName, Dictionary<string, object> dict)
  532. {
  533. if (CosmosDict.typeCosmos.TryGetValue(CollectionName, out CosmosModelInfo container))
  534. {
  535. string pk = container.type.Name;
  536. dict.Remove("@CURRPAGE");
  537. dict.Remove("@PAGESIZE");
  538. dict.Remove("@ASC");
  539. dict.Remove("@DESC");
  540. StringBuilder sql = new StringBuilder("select value count(c) from c");
  541. CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, sql, pk);
  542. QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
  543. FeedIterator<dynamic> query = container.container.GetItemQueryIterator<dynamic>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
  544. return await ResultsFromFeedIterator(query);
  545. }
  546. else
  547. {
  548. throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
  549. }
  550. }
  551. public async Task<List<T>> FindByParams<T>(Dictionary<string, object> dict, List<string> propertys = null) where T : ID
  552. {
  553. return await FindByDict<T>(dict, propertys);
  554. }
  555. public async Task<List<T>> FindByDict<T>(Dictionary<string, object> dict, List<string> propertys = null) where T : ID
  556. {
  557. StringBuilder sql;
  558. sql = SQLHelperParametric.GetSQLSelect(propertys);
  559. string pk = typeof(T).Name;
  560. CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, sql, pk);
  561. QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
  562. return await ResultsFromQueryAndOptions<T>(cosmosDbQuery, queryRequestOptions);
  563. }
  564. private async Task<List<T>> ResultsFromQueryAndOptions<T>(CosmosDbQuery cosmosDbQuery, QueryRequestOptions queryOptions)
  565. {
  566. CosmosModelInfo container = await InitializeCollection<T>();
  567. FeedIterator<T> query = container.container.GetItemQueryIterator<T>(
  568. queryDefinition: cosmosDbQuery.CosmosQueryDefinition,
  569. requestOptions: queryOptions);
  570. return await ResultsFromFeedIterator(query);
  571. }
  572. private int GetEffectivePageSize(int itemsPerPage, int? maxItemCount)
  573. {
  574. return itemsPerPage == -1 ? maxItemCount ?? itemsPerPage : Math.Min(maxItemCount ?? itemsPerPage, itemsPerPage);
  575. }
  576. private QueryRequestOptions GetDefaultQueryRequestOptions(int? itemsPerPage = null,
  577. int? maxBufferedItemCount = null,
  578. int? maxConcurrency = null)
  579. {
  580. QueryRequestOptions queryRequestOptions = new QueryRequestOptions
  581. {
  582. MaxItemCount = itemsPerPage == -1 ? 1000 : itemsPerPage,
  583. MaxBufferedItemCount = maxBufferedItemCount ?? 100,
  584. MaxConcurrency = maxConcurrency ?? 50
  585. };
  586. return queryRequestOptions;
  587. }
  588. private async Task<List<T>> ResultsFromQueryAndOptions<T>(CosmosDbQuery cosmosDbQuery, Func<List<T>, Task> batchAction, QueryRequestOptions queryOptions)
  589. {
  590. CosmosModelInfo container = await InitializeCollection<T>();
  591. FeedIterator<T> query = container.container.GetItemQueryIterator<T>(
  592. queryDefinition: cosmosDbQuery.CosmosQueryDefinition,
  593. requestOptions: queryOptions);
  594. return await ResultsFromFeedIterator(query, batchAction, queryOptions.MaxItemCount ?? 0);
  595. }
  596. private QueryRequestOptions GetQueryRequestOptions(int itemsPerPage)
  597. {
  598. QueryRequestOptions queryRequestOptions = new QueryRequestOptions
  599. {
  600. MaxItemCount = itemsPerPage
  601. };
  602. return queryRequestOptions;
  603. }
  604. public async Task<List<T>> FindLinq<T>(Expression<Func<T, bool>> query = null, Expression<Func<T, object>> order = null, bool isDesc = false) where T : ID
  605. {
  606. string pk = typeof(T).Name;
  607. query = query.And(x => x.pk == pk);
  608. //QueryRequestOptions queryRequestOptions = GetQueryRequestOptions(itemsPerPage);
  609. QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
  610. FeedIterator<T> feedIterator;
  611. CosmosModelInfo container = await InitializeCollection<T>();
  612. if (query == null)
  613. {
  614. if (order != null)
  615. {
  616. if (isDesc)
  617. {
  618. feedIterator = container.container
  619. .GetItemLinqQueryable<T>(requestOptions: queryRequestOptions).OrderByDescending(order)
  620. .ToFeedIterator();
  621. }
  622. else
  623. {
  624. feedIterator = container.container
  625. .GetItemLinqQueryable<T>(requestOptions: queryRequestOptions).OrderBy(order)
  626. .ToFeedIterator();
  627. }
  628. }
  629. else
  630. {
  631. feedIterator = container.container
  632. .GetItemLinqQueryable<T>(requestOptions: queryRequestOptions)
  633. .ToFeedIterator();
  634. }
  635. }
  636. else
  637. {
  638. if (order != null)
  639. {
  640. if (isDesc)
  641. {
  642. feedIterator = container.container
  643. .GetItemLinqQueryable<T>(requestOptions: queryRequestOptions)
  644. .Where(query).OrderByDescending(order)
  645. .ToFeedIterator();
  646. }
  647. else
  648. {
  649. feedIterator = container.container
  650. .GetItemLinqQueryable<T>(requestOptions: queryRequestOptions)
  651. .Where(query).OrderBy(order)
  652. .ToFeedIterator();
  653. }
  654. }
  655. else
  656. {
  657. feedIterator = container.container
  658. .GetItemLinqQueryable<T>(requestOptions: queryRequestOptions)
  659. .Where(query)
  660. .ToFeedIterator();
  661. }
  662. }
  663. return await ResultsFromFeedIterator<T>(feedIterator);
  664. }
  665. public async Task<List<T>> FindSQL<T>(string sql, Dictionary<string, object> Parameters = null) where T : ID
  666. {
  667. if (sql.Contains(".pk"))
  668. {
  669. CosmosModelInfo container = await InitializeCollection<T>();
  670. QueryRequestOptions queryOptions = GetQueryRequestOptions(GetEffectivePageSize(-1, null));
  671. if (Parameters != null)
  672. {
  673. CosmosDbQuery cosmosDbQuery = new CosmosDbQuery
  674. {
  675. QueryText = sql,
  676. Parameters = Parameters
  677. };
  678. FeedIterator<T> feedIterator = container.container
  679. .GetItemQueryIterator<T>(cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryOptions);
  680. return await ResultsFromFeedIterator(feedIterator);
  681. }
  682. else
  683. {
  684. QueryDefinition queryDefinition = new QueryDefinition(sql);
  685. return await ResultsFromFeedIterator<T>(container.container.GetItemQueryIterator<T>(queryDefinition));
  686. }
  687. }
  688. else
  689. {
  690. throw new BizException("查询参数必须设置 .pk ", ResponseCode.PARAMS_ERROR);
  691. }
  692. }
  693. public async Task<T> Save<T>(T entity) where T : ID
  694. {
  695. try
  696. {
  697. CosmosModelInfo container = await InitializeCollection<T>();
  698. entity.pk = container.type.Name;
  699. ItemResponse<T> response = await container.container.CreateItemAsync<T>(entity);
  700. if (container.cache && RedisHelper.Instance != null)
  701. {
  702. if (!RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
  703. {
  704. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
  705. }
  706. else
  707. {
  708. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
  709. await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
  710. }
  711. }
  712. return response.Resource;
  713. }
  714. catch (Exception e)
  715. {
  716. throw new BizException(e.Message);
  717. }
  718. }
  719. public async Task<List<T>> SaveAll<T>(List<T> enyites) where T : ID
  720. {
  721. int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
  722. CosmosModelInfo container = await InitializeCollection<T>();
  723. bool flag = false;
  724. if (RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
  725. {
  726. flag = true;
  727. }
  728. string partitionKey = GetPartitionKey<T>();
  729. Type type = typeof(T);
  730. Stopwatch stopwatch = Stopwatch.StartNew();
  731. for (int i = 0; i < pages; i++)
  732. {
  733. List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
  734. List<KeyValuePair<PartitionKey, Stream>> itemsToInsert = new List<KeyValuePair<PartitionKey, Stream>>();
  735. lists.ForEach(async x =>
  736. {
  737. x.pk = type.Name;
  738. MemoryStream stream = new MemoryStream();
  739. await JsonSerializer.SerializeAsync(stream, x, new JsonSerializerOptions { IgnoreNullValues = true });
  740. object o = type.GetProperty(partitionKey).GetValue(x, null);
  741. KeyValuePair<PartitionKey, Stream> keyValue = new KeyValuePair<PartitionKey, Stream>(new PartitionKey(o.ToString()), stream);
  742. itemsToInsert.Add(keyValue);
  743. });
  744. List<Task> tasks = new List<Task>(lists.Count);
  745. itemsToInsert.ForEach(item =>
  746. {
  747. tasks.Add(container.container.CreateItemStreamAsync(item.Value, item.Key)
  748. .ContinueWith((Task<ResponseMessage> task) =>
  749. {
  750. using (ResponseMessage response = task.Result)
  751. {
  752. if (!response.IsSuccessStatusCode)
  753. {
  754. }
  755. }
  756. }
  757. ));
  758. });
  759. await Task.WhenAll(tasks);
  760. if (container.cache && RedisHelper.Instance != null)
  761. {
  762. lists.ForEach(async x => {
  763. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, x.id, x);
  764. });
  765. }
  766. }
  767. if (container.cache && RedisHelper.Instance != null && !flag)
  768. {
  769. await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
  770. }
  771. stopwatch.Stop();
  772. return enyites;
  773. }
  774. public async Task<T> SaveOrUpdate<T>(T entity) where T : ID
  775. {
  776. CosmosModelInfo container = await InitializeCollection<T>();
  777. entity.pk = container.type.Name;
  778. ItemResponse<T> response = await container.container.UpsertItemAsync(item: entity);
  779. if (container.cache && RedisHelper.Instance != null)
  780. {
  781. if (!RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
  782. {
  783. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
  784. }
  785. else
  786. {
  787. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
  788. await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
  789. }
  790. }
  791. return response.Resource;
  792. }
  793. public async Task<List<T>> SaveOrUpdateAll<T>(List<T> enyites) where T : ID
  794. {
  795. //await Task.Run(() => Parallel.ForEach(entities, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (item) =>
  796. //{
  797. // Task.WaitAll(Update(item));
  798. //}));
  799. int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
  800. CosmosModelInfo container = await InitializeCollection<T>();
  801. bool flag = false;
  802. if (RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
  803. {
  804. flag = true;
  805. }
  806. string partitionKey = GetPartitionKey<T>();
  807. Type type = typeof(T);
  808. Stopwatch stopwatch = Stopwatch.StartNew();
  809. for (int i = 0; i < pages; i++)
  810. {
  811. List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
  812. List<KeyValuePair<PartitionKey, Stream>> itemsToInsert = new List<KeyValuePair<PartitionKey, Stream>>();
  813. lists.ForEach(async x =>
  814. {
  815. x.pk = type.Name;
  816. MemoryStream stream = new MemoryStream();
  817. await JsonSerializer.SerializeAsync(stream, x, new JsonSerializerOptions { IgnoreNullValues = true });
  818. object o = type.GetProperty(partitionKey).GetValue(x, null);
  819. KeyValuePair<PartitionKey, Stream> keyValue = new KeyValuePair<PartitionKey, Stream>(new PartitionKey(o.ToString()), stream);
  820. itemsToInsert.Add(keyValue);
  821. });
  822. List<Task> tasks = new List<Task>(lists.Count);
  823. itemsToInsert.ForEach(item =>
  824. {
  825. tasks.Add(container.container.UpsertItemStreamAsync(item.Value, item.Key)
  826. .ContinueWith((Task<ResponseMessage> task) =>
  827. {
  828. //using (ResponseMessage response = task.Result)
  829. //{
  830. // if (!response.IsSuccessStatusCode)
  831. // {
  832. // }
  833. //}
  834. }
  835. ));
  836. });
  837. await Task.WhenAll(tasks);
  838. if (container.cache && RedisHelper.Instance != null)
  839. {
  840. lists.ForEach(async x => {
  841. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, x.id, x);
  842. });
  843. }
  844. }
  845. if (container.cache && RedisHelper.Instance != null && !flag)
  846. {
  847. await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
  848. }
  849. stopwatch.Stop();
  850. return enyites;
  851. }
  852. public async Task<T> Update<T>(T entity) where T : ID
  853. {
  854. CosmosModelInfo container = await InitializeCollection<T>();
  855. string partitionKey = GetPartitionKey<T>();
  856. Type type = typeof(T);
  857. entity.pk = type.Name;
  858. object o = type.GetProperty(partitionKey).GetValue(entity, null);
  859. ItemResponse<T> response = await container.container.ReplaceItemAsync(entity, entity.id, new PartitionKey(o.ToString()));
  860. if (container.cache && RedisHelper.Instance != null)
  861. {
  862. if (!RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
  863. {
  864. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
  865. }
  866. else
  867. {
  868. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
  869. await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
  870. }
  871. }
  872. return response.Resource;
  873. }
  874. internal class Item
  875. {
  876. public string id { get; set; }
  877. public string pk { get; set; }
  878. public MemoryStream stream { get; set; }
  879. }
  880. public async Task<List<T>> UpdateAll<T>(List<T> enyites) where T : ID
  881. {
  882. //await Task.Run(() => Parallel.ForEach(entities, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (item) =>
  883. //{
  884. // Task.WaitAll(Update(item));
  885. //}));
  886. int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
  887. CosmosModelInfo container = await InitializeCollection<T>();
  888. bool flag = false;
  889. if (RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
  890. {
  891. flag = true;
  892. }
  893. string partitionKey = GetPartitionKey<T>();
  894. Type type = typeof(T);
  895. Stopwatch stopwatch = Stopwatch.StartNew();
  896. for (int i = 0; i < pages; i++)
  897. {
  898. List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
  899. List<Item> itemsToInsert = new List<Item>();
  900. lists.ForEach(async x =>
  901. {
  902. x.pk = type.Name;
  903. MemoryStream stream = new MemoryStream();
  904. await JsonSerializer.SerializeAsync(stream, x, new JsonSerializerOptions { IgnoreNullValues = true });
  905. object o = type.GetProperty(partitionKey).GetValue(x, null);
  906. Item keyValue = new Item { id = x.id, pk = o.ToString(), stream = stream };
  907. itemsToInsert.Add(keyValue);
  908. });
  909. List<Task> tasks = new List<Task>(lists.Count);
  910. itemsToInsert.ForEach(item =>
  911. {
  912. tasks.Add(container.container.ReplaceItemStreamAsync(item.stream, item.id, new PartitionKey(item.pk))
  913. .ContinueWith((Task<ResponseMessage> task) =>
  914. {
  915. //using (ResponseMessage response = task.Result)
  916. //{
  917. // if (!response.IsSuccessStatusCode)
  918. // {
  919. // }
  920. //}
  921. }
  922. ));
  923. });
  924. await Task.WhenAll(tasks);
  925. if (container.cache && RedisHelper.Instance != null)
  926. {
  927. lists.ForEach(async x => {
  928. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, x.id, x);
  929. });
  930. }
  931. }
  932. if (container.cache && RedisHelper.Instance != null && !flag)
  933. {
  934. await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
  935. }
  936. stopwatch.Stop();
  937. return enyites;
  938. }
  939. private async Task<List<T>> FindByIdAsSql<T>(string id) where T : ID
  940. {
  941. CosmosModelInfo container = await InitializeCollection<T>();
  942. string pk = container.type.Name;
  943. CosmosDbQuery cosmosDbQuery = new CosmosDbQuery
  944. {
  945. QueryText = @"SELECT *
  946. FROM c
  947. WHERE c.pk='" + pk + "' and c.id = @id",
  948. Parameters = new Dictionary<string, object>
  949. {
  950. { "@id",id}
  951. }
  952. };
  953. FeedIterator<T> feedIterator = container.container
  954. .GetItemQueryIterator<T>(cosmosDbQuery.CosmosQueryDefinition);
  955. return (await ResultsFromFeedIterator(feedIterator)).ToList();
  956. }
  957. public async Task<T> FindByIdPk<T>(string id, string pk) where T : ID
  958. {
  959. CosmosModelInfo container = await InitializeCollection<T>();
  960. ItemResponse<T> response = await container.container.ReadItemAsync<T>(id: id, partitionKey: new PartitionKey(pk));
  961. return response.Resource;
  962. }
  963. public async Task<List<T>> FindById<T>(string id, bool cache = true) where T : ID
  964. {
  965. CosmosModelInfo container = await InitializeCollection<T>();
  966. if (container.cache && RedisHelper.Instance != null && cache == true)
  967. {
  968. return await RedisHelper.CacheShellAsync(CacheCosmosPrefix + container.container.Id, id, timeoutSeconds, () => { return FindByIdAsSql<T>(id); });
  969. }
  970. else
  971. {
  972. return await FindByIdAsSql<T>(id);
  973. }
  974. }
  975. public async Task<List<T>> FindByIds<T>(List<string> ids) where T : ID
  976. {
  977. CosmosModelInfo container = await InitializeCollection<T>();
  978. if (container.cache && RedisHelper.Instance != null)
  979. {
  980. List<T> list = new List<T>();
  981. List<string> NotIn = new List<string>();
  982. foreach (string id in ids)
  983. {
  984. if (!await RedisHelper.HExistsAsync(CacheCosmosPrefix + container.container.Id, id))
  985. {
  986. NotIn.Add(id);
  987. }
  988. else
  989. {
  990. list.Add(await RedisHelper.HGetAsync<T>(CacheCosmosPrefix + container.container.Id, id));
  991. }
  992. }
  993. if (NotIn.IsNotEmpty())
  994. {
  995. List<T> noInList = await FindByDict<T>(new Dictionary<string, object> { { "id", NotIn.ToArray() } });
  996. noInList.ForEach(x => { RedisHelper.HSet(CacheCosmosPrefix + container.container.Id, x.id, x); RedisHelper.Expire(CacheCosmosPrefix + container.container.Id, timeoutSeconds); });
  997. list.AddRange(noInList);
  998. }
  999. return list;
  1000. }
  1001. else
  1002. {
  1003. return await FindByDict<T>(new Dictionary<string, object> { { "id", ids.ToArray() } });
  1004. }
  1005. }
  1006. public async Task<dynamic> FindById(string typeName, string id)
  1007. {
  1008. if (CosmosDict.typeCosmos.TryGetValue(typeName, out CosmosModelInfo container))
  1009. {
  1010. if (container.cache && RedisHelper.Instance != null)
  1011. {
  1012. return await RedisHelper.CacheShellAsync(CacheCosmosPrefix + container.container.Id, id, timeoutSeconds, () => { return FindByDict(typeName, new Dictionary<string, object> { { "id", id } }); });
  1013. }
  1014. else
  1015. {
  1016. return await FindByDict(typeName, new Dictionary<string, object> { { "id", id } });
  1017. }
  1018. }
  1019. else
  1020. {
  1021. throw new BizException("CollectionName named:" + typeName + " dose not exsit in Database!");
  1022. }
  1023. }
  1024. public async Task<List<dynamic>> FindByIds(string typeName, List<string> ids)
  1025. {
  1026. if (CosmosDict. typeCosmos.TryGetValue(typeName, out CosmosModelInfo container))
  1027. {
  1028. if (container.cache && RedisHelper.Instance != null)
  1029. {
  1030. return await RedisHelper.CacheShellAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds, () => { return FindByDict(typeName, new Dictionary<string, object> { { "id", ids.ToArray() } }); });
  1031. }
  1032. else
  1033. {
  1034. return await FindByDict(typeName, new Dictionary<string, object> { { "id", ids.ToArray() } });
  1035. }
  1036. }
  1037. else
  1038. {
  1039. throw new BizException("CollectionName named:" + typeName + " dose not exsit in Database!");
  1040. }
  1041. }
  1042. }
  1043. }