AzureCosmosDBV3Repository.cs 47 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086
  1. using Microsoft.Azure.Cosmos;
  2. using Microsoft.Azure.Cosmos.Linq;
  3. using System;
  4. using System.Collections.Concurrent;
  5. using System.Collections.Generic;
  6. using System.Diagnostics;
  7. using System.IO;
  8. using System.Linq;
  9. using System.Linq.Expressions;
  10. using System.Net;
  11. using System.Reflection;
  12. using System.Text;
  13. using System.Text.Json;
  14. using System.Threading;
  15. using System.Threading.Tasks;
  16. using TEAMModelOS.SDK.Context.Attributes.Azure;
  17. using TEAMModelOS.SDK.Context.Exception;
  18. using TEAMModelOS.SDK.Helper.Common.CollectionHelper;
  19. using TEAMModelOS.SDK.Helper.Common.ReflectorExtensions;
  20. using TEAMModelOS.SDK.Module.AzureCosmosDB.Configuration;
  21. namespace TEAMModelOS.SDK.Module.AzureCosmosDBV3
  22. {
  23. public class CosmosModelInfo
  24. {
  25. public Container container { get; set; }
  26. public bool cache { get; set; }
  27. public bool monitor { get; set; } = false;
  28. public Type type { get; set; }
  29. }
  30. public class AzureCosmosDBV3Repository : IAzureCosmosDBV3Repository
  31. {
  32. private CosmosClient CosmosClient { get; set; }
  33. /// <summary>
  34. /// 线程安全的dict类型
  35. /// </summary>
  36. private Dictionary<string, CosmosModelInfo> DocumentCollectionDict { get; set; } = new Dictionary<string, CosmosModelInfo>();
  37. private string DatabaseId { get; set; }
  38. private int CollectionThroughput { get; set; }
  39. private Database database { get; set; }
  40. int pageSize = 200;
  41. private const string CacheCosmosPrefix = "cosmos:";
  42. private string[] ScanModel { get; set; }
  43. private const int timeoutSeconds = 86400;
  44. private const int ttl = 1;
  45. private string leaseId = "AleaseContainer";
  46. public AzureCosmosDBV3Repository(AzureCosmosDBOptions options, CosmosSerializer cosmosSerializer)
  47. {
  48. try
  49. {
  50. if (!string.IsNullOrEmpty(options.ConnectionString))
  51. {
  52. CosmosClient = CosmosDBV3ClientSingleton.getInstance(options.ConnectionString, options.ConnectionKey, cosmosSerializer).GetCosmosDBClient();
  53. }
  54. else
  55. {
  56. throw new BizException("请设置正确的AzureCosmosDB数据库配置信息!");
  57. }
  58. DatabaseId = options.Database;
  59. CollectionThroughput = options.CollectionThroughput;
  60. ScanModel = options.ScanModel;
  61. // InitializeDatabase().GetAwaiter().GetResult();
  62. }
  63. catch (CosmosException e)
  64. {
  65. // Dispose(true);
  66. throw new BizException(e.Message, 500, e.StackTrace);
  67. }
  68. }
  69. public AzureCosmosDBV3Repository(AzureCosmosDBOptions options)
  70. {
  71. try
  72. {
  73. if (!string.IsNullOrEmpty(options.ConnectionString))
  74. {
  75. CosmosClient = CosmosDBV3ClientSingleton.getInstance(options.ConnectionString, options.ConnectionKey, null).GetCosmosDBClient();
  76. }
  77. else
  78. {
  79. throw new BizException("请设置正确的AzureCosmosDB数据库配置信息!");
  80. }
  81. DatabaseId = options.Database;
  82. CollectionThroughput = options.CollectionThroughput;
  83. ScanModel = options.ScanModel;
  84. // InitializeDatabase().GetAwaiter().GetResult();
  85. }
  86. catch (CosmosException e)
  87. {
  88. // Dispose(true);
  89. throw new BizException(e.Message, 500, e.StackTrace);
  90. }
  91. }
  92. public async Task<Dictionary<string, CosmosModelInfo>> InitializeDatabase()
  93. {
  94. try
  95. {
  96. database = await CosmosClient.CreateDatabaseIfNotExistsAsync(DatabaseId, CollectionThroughput);
  97. FeedIterator<ContainerProperties> resultSetIterator = database.GetContainerQueryIterator<ContainerProperties>();
  98. while (resultSetIterator.HasMoreResults)
  99. {
  100. foreach (ContainerProperties container in await resultSetIterator.ReadNextAsync())
  101. {
  102. DocumentCollectionDict.TryAdd(container.Id, new CosmosModelInfo { container = database.GetContainer(container.Id), cache = false, monitor = false });
  103. }
  104. }
  105. bool isMonitor = false;
  106. //获取数据库所有的表
  107. List<Type> types = ReflectorExtensions.GetAllTypeAsAttribute<CosmosDBAttribute>(ScanModel);
  108. foreach (Type type in types)
  109. {
  110. string PartitionKey = GetPartitionKey(type);
  111. string CollectionName = "";
  112. int RU = 0;
  113. bool cache = false;
  114. bool monitor = false;
  115. IEnumerable<CosmosDBAttribute> attributes = type.GetCustomAttributes<CosmosDBAttribute>(true);
  116. if (!string.IsNullOrEmpty(attributes.First<CosmosDBAttribute>().Name))
  117. {
  118. CollectionName = attributes.First<CosmosDBAttribute>().Name;
  119. }
  120. else
  121. {
  122. CollectionName = type.Name;
  123. }
  124. if (attributes.First<CosmosDBAttribute>().Cache)
  125. {
  126. cache = attributes.First<CosmosDBAttribute>().Cache;
  127. }
  128. if (attributes.First<CosmosDBAttribute>().Monitor)
  129. {
  130. monitor = attributes.First<CosmosDBAttribute>().Monitor;
  131. if (monitor)
  132. {
  133. isMonitor = true;
  134. }
  135. }
  136. //else
  137. //{
  138. // cache = false;
  139. //}
  140. if (attributes.First<CosmosDBAttribute>().RU > 400)
  141. {
  142. RU = attributes.First<CosmosDBAttribute>().RU;
  143. }
  144. else
  145. {
  146. RU = CollectionThroughput;
  147. }
  148. //如果表存在于数据则检查RU是否变动,如果不存在则执行创建DocumentCollection
  149. if (DocumentCollectionDict.TryGetValue(CollectionName, out CosmosModelInfo cosmosModelInfo))
  150. { //更新RU
  151. cosmosModelInfo.cache = cache;
  152. Container container = CosmosClient.GetDatabase(DatabaseId).GetContainer(cosmosModelInfo.container.Id);
  153. int? throughputResponse = await container.ReadThroughputAsync();
  154. if (throughputResponse < RU)
  155. {
  156. await CosmosClient.GetDatabase(DatabaseId).GetContainer(cosmosModelInfo.container.Id).ReplaceThroughputAsync(RU);
  157. }
  158. DocumentCollectionDict[CollectionName] = new CosmosModelInfo { container = container, cache = cache, monitor = monitor, type = type };
  159. }
  160. else
  161. {
  162. ContainerProperties containerProperties = new ContainerProperties { Id = CollectionName ,DefaultTimeToLive=-1 };
  163. if (!string.IsNullOrEmpty(PartitionKey))
  164. {
  165. containerProperties.PartitionKeyPath = "/" + PartitionKey;
  166. }
  167. if (RU > CollectionThroughput)
  168. {
  169. CollectionThroughput = RU;
  170. }
  171. Container containerWithConsistentIndexing = await database.CreateContainerIfNotExistsAsync(containerProperties, throughput: CollectionThroughput);
  172. DocumentCollectionDict.TryAdd(CollectionName, new CosmosModelInfo { container = containerWithConsistentIndexing, cache = cache, monitor = monitor, type = type });
  173. }
  174. }
  175. if (isMonitor)
  176. {
  177. ContainerProperties leaseProperties = new ContainerProperties { Id = leaseId, PartitionKeyPath = "/id", DefaultTimeToLive = -1 };
  178. Container leaseContainer = await database.CreateContainerIfNotExistsAsync(leaseProperties, throughput: CollectionThroughput);
  179. DocumentCollectionDict.TryAdd(leaseId, new CosmosModelInfo { container = leaseContainer, cache = false, monitor = false });
  180. }
  181. return DocumentCollectionDict;
  182. }
  183. catch (CosmosException e)
  184. {
  185. throw new BizException(e.Message, 500, e.StackTrace);
  186. }
  187. }
  188. private string GetPartitionKey<T>()
  189. {
  190. Type type = typeof(T);
  191. return GetPartitionKey(type);
  192. }
  193. private string GetPartitionKey(Type type)
  194. {
  195. PropertyInfo[] properties = type.GetProperties();
  196. List<PropertyInfo> attrProperties = new List<PropertyInfo>();
  197. foreach (PropertyInfo property in properties)
  198. {
  199. if (property.Name.Equals("PartitionKey"))
  200. {
  201. attrProperties.Add(property);
  202. break;
  203. }
  204. object[] attributes = property.GetCustomAttributes(true);
  205. foreach (object attribute in attributes) //2.通过映射,找到成员属性上关联的特性类实例,
  206. {
  207. if (attribute is PartitionKeyAttribute)
  208. {
  209. attrProperties.Add(property);
  210. }
  211. }
  212. }
  213. if (attrProperties.Count <= 0)
  214. {
  215. throw new BizException(type.Name + "has no PartitionKey !");
  216. }
  217. else
  218. {
  219. if (attrProperties.Count == 1)
  220. {
  221. return attrProperties[0].Name;
  222. }
  223. else { throw new BizException("PartitionKey can only be single!"); }
  224. }
  225. }
  226. private async Task<CosmosModelInfo> InitializeCollection<T>()
  227. {
  228. Type type = typeof(T);
  229. string partitionKey = GetPartitionKey<T>();
  230. string CollectionName;
  231. IEnumerable<CosmosDBAttribute> attributes = type.GetCustomAttributes<CosmosDBAttribute>(true);
  232. if (!string.IsNullOrEmpty(attributes.First<CosmosDBAttribute>().Name))
  233. {
  234. CollectionName = attributes.First<CosmosDBAttribute>().Name;
  235. }
  236. else
  237. {
  238. CollectionName = type.Name;
  239. }
  240. return await InitializeCollection(CollectionName, partitionKey);
  241. }
  242. private async Task<CosmosModelInfo> InitializeCollection(string CollectionName, string PartitionKey)
  243. {
  244. /////内存中已经存在这个表则直接返回
  245. if (DocumentCollectionDict.TryGetValue(CollectionName, out CosmosModelInfo cosmosModelInfo))
  246. {
  247. return cosmosModelInfo;
  248. }///如果没有则尝试默认创建
  249. else
  250. {
  251. ContainerProperties containerProperties = new ContainerProperties { Id = CollectionName };
  252. if (!string.IsNullOrEmpty(PartitionKey))
  253. {
  254. containerProperties.PartitionKeyPath = "/" + PartitionKey;
  255. }
  256. Container containerWithConsistentIndexing = await database.CreateContainerIfNotExistsAsync(containerProperties, throughput: CollectionThroughput);
  257. CosmosModelInfo cosmosModel = new CosmosModelInfo { container = containerWithConsistentIndexing, cache = false };
  258. DocumentCollectionDict.TryAdd(CollectionName, cosmosModel);
  259. return cosmosModel;
  260. }
  261. }
  262. /// <summary>
  263. /// 按TTL删除
  264. /// </summary>
  265. /// <typeparam name="T"></typeparam>
  266. /// <param name="list"></param>
  267. /// <returns></returns>
  268. private async Task<List<T>> DeleteTTL<T>(List<T> list)where T:ID {
  269. CosmosModelInfo container = await InitializeCollection<T>();
  270. list.ForEach(x=> { x.ttl = ttl; });
  271. list= await SaveOrUpdateAll(list);
  272. if (container.cache && RedisHelper.Instance != null)
  273. {
  274. list.ForEach(async x => {
  275. await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, x.id);
  276. });
  277. }
  278. return list;
  279. }
  280. public async Task<List<IdPk>> DeleteAll<T>(List<IdPk> ids) where T : ID
  281. {
  282. CosmosModelInfo container = await InitializeCollection<T>();
  283. List<IdPk> idPks = new List<IdPk>();
  284. if (container.monitor)
  285. {
  286. List<T> list= await FindByDict<T>(new Dictionary<string, object>() { { "id", ids.Select(x => x.id).ToArray() } });
  287. list= await DeleteTTL(list);
  288. return ids;
  289. }
  290. else {
  291. int pages = (int)Math.Ceiling((double)ids.Count / pageSize);
  292. Stopwatch stopwatch = Stopwatch.StartNew();
  293. for (int i = 0; i < pages; i++)
  294. {
  295. List<IdPk> lists = ids.Skip((i) * pageSize).Take(pageSize).ToList();
  296. List<Task> tasks = new List<Task>(lists.Count);
  297. lists.ForEach(item =>
  298. {
  299. tasks.Add(container.container.DeleteItemStreamAsync(item.id, new PartitionKey(item.pk))
  300. .ContinueWith((Task<ResponseMessage> task) =>
  301. {
  302. using (ResponseMessage response = task.Result)
  303. {
  304. idPks.Add(new IdPk { id = item.id, pk = item.pk.ToString(), StatusCode = response.StatusCode });
  305. // if (!response.IsSuccessStatusCode)
  306. // {
  307. // }
  308. }
  309. }
  310. ));
  311. });
  312. await Task.WhenAll(tasks);
  313. if (container.cache && RedisHelper.Instance != null)
  314. {
  315. lists.ForEach(async x => {
  316. await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, x.id);
  317. });
  318. }
  319. }
  320. stopwatch.Stop();
  321. return idPks;
  322. }
  323. }
  324. public async Task<List<IdPk>> DeleteAll<T>(Dictionary<string, object> dict) where T : ID
  325. {
  326. if (dict.Keys.Count > 0)
  327. {
  328. List<T> list = await FindByDict<T>(dict);
  329. return await DeleteAll(list);
  330. }
  331. else {
  332. throw new BizException("参数为空", 500);
  333. }
  334. }
  335. public async Task<List<IdPk>> DeleteAll<T>(List<T> enyites) where T : ID
  336. {
  337. Type type = typeof(T);
  338. string pk = GetPartitionKey<T>();
  339. CosmosModelInfo container = await InitializeCollection<T>();
  340. List<IdPk> idPks = new List<IdPk>();
  341. if (container.monitor)
  342. {
  343. enyites = await DeleteTTL(enyites);
  344. foreach (T t in enyites) {
  345. object o = type.GetProperty(pk).GetValue(t, null);
  346. idPks.Add(new IdPk { id = t.id, pk =o.ToString(), StatusCode = HttpStatusCode .NoContent});
  347. }
  348. return idPks;
  349. }
  350. else {
  351. int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
  352. Stopwatch stopwatch = Stopwatch.StartNew();
  353. for (int i = 0; i < pages; i++)
  354. {
  355. List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
  356. List<KeyValuePair<PartitionKey, string>> itemsToInsert = new List<KeyValuePair<PartitionKey, string>>();
  357. lists.ForEach(x =>
  358. {
  359. object o = type.GetProperty(pk).GetValue(x, null);
  360. KeyValuePair<PartitionKey, string> keyValue = new KeyValuePair<PartitionKey, string>(new PartitionKey(o.ToString()), x.id);
  361. itemsToInsert.Add(keyValue);
  362. });
  363. List<Task> tasks = new List<Task>(lists.Count);
  364. itemsToInsert.ForEach(item =>
  365. {
  366. tasks.Add(container.container.DeleteItemStreamAsync(item.Value, item.Key)
  367. .ContinueWith((Task<ResponseMessage> task) =>
  368. {
  369. using (ResponseMessage response = task.Result)
  370. {
  371. idPks.Add(new IdPk { id = item.Value, pk = item.Key.ToString(), StatusCode = response.StatusCode });
  372. }
  373. }
  374. ));
  375. });
  376. await Task.WhenAll(tasks); if (container.cache && RedisHelper.Instance != null)
  377. {
  378. lists.ForEach(async x => {
  379. await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, x.id);
  380. });
  381. }
  382. }
  383. stopwatch.Stop();
  384. return idPks;
  385. }
  386. }
  387. public async Task<IdPk> DeleteAsync<T>(IdPk idPk) where T : ID
  388. {
  389. return await DeleteAsync<T>(idPk.id, idPk.pk);
  390. }
  391. public async Task<IdPk> DeleteAsync<T>(string id, string pk) where T : ID
  392. {
  393. CosmosModelInfo container = await InitializeCollection<T>();
  394. if (container.monitor)
  395. {
  396. List<T> list= await FindByDict<T>(new Dictionary<string, object>() { { "id", id } });
  397. if (list.Count > 0)
  398. {
  399. await DeleteTTL<T>(list);
  400. return new IdPk { id = id, pk = pk, StatusCode = HttpStatusCode.NoContent };
  401. }
  402. else {
  403. throw new BizException("未找到ID匹配的数据,删除失败");
  404. }
  405. }
  406. else {
  407. ResponseMessage response = await container.container.DeleteItemStreamAsync(id: id, partitionKey: new PartitionKey(pk));
  408. if (container.cache && RedisHelper.Instance != null)
  409. {
  410. await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, id);
  411. }
  412. return new IdPk { id = id, pk = pk, StatusCode = response.StatusCode };
  413. }
  414. }
  415. public async Task<IdPk> DeleteAsync<T>(T entity) where T : ID
  416. {
  417. CosmosModelInfo container = await InitializeCollection<T>();
  418. string partitionKey = GetPartitionKey<T>();
  419. Type type = typeof(T);
  420. object o = type.GetProperty(partitionKey).GetValue(entity, null);
  421. if (container.monitor)
  422. {
  423. await DeleteTTL<T>(new List<T>() { entity});
  424. return new IdPk { id = entity.id, pk = o.ToString(), StatusCode = HttpStatusCode.NoContent };
  425. }
  426. else {
  427. ResponseMessage response = await container.container.DeleteItemStreamAsync(id: entity.id, partitionKey: new PartitionKey(o.ToString()));
  428. if (container.cache && RedisHelper.Instance != null)
  429. {
  430. await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, entity.id);
  431. }
  432. return new IdPk { id = entity.id, pk = o.ToString(), StatusCode = response.StatusCode };
  433. }
  434. }
  435. //public async Task<T> DeleteAsync<T>(string id) where T : ID
  436. //{
  437. // Container container = await InitializeCollection<T>();
  438. // ItemResponse<T> response = await container.DeleteItemAsync<T>(id: id, partitionKey: new PartitionKey(GetPartitionKey<T>()));
  439. // return response.Resource;
  440. //}
  441. public async Task<List<T>> FindAll<T>(List<string> propertys = null) where T : ID
  442. {
  443. CosmosModelInfo container = await InitializeCollection<T>();
  444. StringBuilder sql;
  445. sql = SQLHelperParametric.GetSQLSelect(propertys);
  446. CosmosDbQuery cosmosDbQuery = new CosmosDbQuery { QueryText = sql.ToString() };
  447. FeedIterator<T> query = container.container.GetItemQueryIterator<T>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition);
  448. return await ResultsFromFeedIterator(query);
  449. }
  450. private async Task<List<T>> ResultsFromFeedIterator<T>(FeedIterator<T> query, int? maxItemCount = null)
  451. {
  452. List<T> results = new List<T>();
  453. while (query.HasMoreResults)
  454. {
  455. foreach (T t in await query.ReadNextAsync())
  456. {
  457. results.Add(t);
  458. if (results.Count == maxItemCount)
  459. {
  460. return results;
  461. }
  462. }
  463. }
  464. return results;
  465. }
  466. private async Task<List<T>> ResultsFromFeedIterator<T>(FeedIterator<T> query, Func<List<T>, Task> batchAction, int itemsPerPage)
  467. {
  468. List<T> results = new List<T>();
  469. while (query.HasMoreResults)
  470. {
  471. if (results.Count() >= itemsPerPage)
  472. {
  473. await batchAction(results);
  474. results.Clear();
  475. }
  476. results.AddRange(await query.ReadNextAsync());
  477. }
  478. if (results.Count() > 0)
  479. {
  480. await batchAction(results);
  481. results.Clear();
  482. }
  483. return results;
  484. }
  485. public async Task<List<dynamic>> FindByDict(string CollectionName, Dictionary<string, object> dict, string partitionKey = null, List<string> propertys = null)
  486. {
  487. if (DocumentCollectionDict.TryGetValue(CollectionName, out CosmosModelInfo container))
  488. {
  489. //StringBuilder sql = new StringBuilder("select value(c) from c");
  490. //SQLHelper.GetSQL(dict, ref sql);
  491. //CosmosDbQuery cosmosDbQuery = new CosmosDbQuery, int itemsPerPage = -1, int?
  492. //{
  493. // QueryText = sql.ToString()
  494. //};
  495. StringBuilder sql;
  496. sql = SQLHelperParametric.GetSQLSelect(propertys);
  497. CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, sql);
  498. QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
  499. FeedIterator<dynamic> query = container.container.GetItemQueryIterator<dynamic>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
  500. return await ResultsFromFeedIterator(query);
  501. }
  502. else
  503. {
  504. throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
  505. }
  506. }
  507. public async Task<List<dynamic>> FindCountByDict(string CollectionName, Dictionary<string, object> dict, string partitionKey = null)
  508. {
  509. if (DocumentCollectionDict.TryGetValue(CollectionName, out CosmosModelInfo container))
  510. {
  511. dict.Remove("@CURRPAGE");
  512. dict.Remove("@PAGESIZE");
  513. dict.Remove("@ASC");
  514. dict.Remove("@DESC");
  515. StringBuilder sql = new StringBuilder("select value count(c) from c");
  516. CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, sql);
  517. QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
  518. FeedIterator<dynamic> query = container.container.GetItemQueryIterator<dynamic>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
  519. return await ResultsFromFeedIterator(query);
  520. }
  521. else
  522. {
  523. throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
  524. }
  525. }
  526. public async Task<List<T>> FindByParams<T>(Dictionary<string, object> dict, string partitionKey = null, List<string> propertys = null) where T : ID
  527. {
  528. return await FindByDict<T>(dict, partitionKey, propertys);
  529. }
  530. public async Task<List<T>> FindByDict<T>(Dictionary<string, object> dict, string partitionKey = null, List<string> propertys = null) where T : ID
  531. {
  532. StringBuilder sql;
  533. sql = SQLHelperParametric.GetSQLSelect(propertys);
  534. CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, sql);
  535. QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
  536. return await ResultsFromQueryAndOptions<T>(cosmosDbQuery, queryRequestOptions);
  537. }
  538. private async Task<List<T>> ResultsFromQueryAndOptions<T>(CosmosDbQuery cosmosDbQuery, QueryRequestOptions queryOptions)
  539. {
  540. CosmosModelInfo container = await InitializeCollection<T>();
  541. FeedIterator<T> query = container.container.GetItemQueryIterator<T>(
  542. queryDefinition: cosmosDbQuery.CosmosQueryDefinition,
  543. requestOptions: queryOptions);
  544. return await ResultsFromFeedIterator(query);
  545. }
  546. private int GetEffectivePageSize(int itemsPerPage, int? maxItemCount)
  547. {
  548. return itemsPerPage == -1 ? maxItemCount ?? itemsPerPage : Math.Min(maxItemCount ?? itemsPerPage, itemsPerPage);
  549. }
  550. private QueryRequestOptions GetDefaultQueryRequestOptions(int? itemsPerPage = null,
  551. int? maxBufferedItemCount = null,
  552. int? maxConcurrency = null)
  553. {
  554. QueryRequestOptions queryRequestOptions = new QueryRequestOptions
  555. {
  556. MaxItemCount = itemsPerPage == -1 ? 1000 : itemsPerPage,
  557. MaxBufferedItemCount = maxBufferedItemCount ?? 100,
  558. MaxConcurrency = maxConcurrency ?? 50
  559. };
  560. return queryRequestOptions;
  561. }
  562. private async Task<List<T>> ResultsFromQueryAndOptions<T>(CosmosDbQuery cosmosDbQuery, Func<List<T>, Task> batchAction, QueryRequestOptions queryOptions)
  563. {
  564. CosmosModelInfo container = await InitializeCollection<T>();
  565. FeedIterator<T> query = container.container.GetItemQueryIterator<T>(
  566. queryDefinition: cosmosDbQuery.CosmosQueryDefinition,
  567. requestOptions: queryOptions);
  568. return await ResultsFromFeedIterator(query, batchAction, queryOptions.MaxItemCount ?? 0);
  569. }
  570. private QueryRequestOptions GetQueryRequestOptions(int itemsPerPage)
  571. {
  572. QueryRequestOptions queryRequestOptions = new QueryRequestOptions
  573. {
  574. MaxItemCount = itemsPerPage
  575. };
  576. return queryRequestOptions;
  577. }
  578. public async Task<List<T>> FindLinq<T>(Expression<Func<T, bool>> query = null, Expression<Func<T, object>> order = null, bool isDesc = false) where T : ID
  579. {
  580. //QueryRequestOptions queryRequestOptions = GetQueryRequestOptions(itemsPerPage);
  581. QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
  582. FeedIterator<T> feedIterator;
  583. CosmosModelInfo container = await InitializeCollection<T>();
  584. if (query == null)
  585. {
  586. if (order != null)
  587. {
  588. if (isDesc)
  589. {
  590. feedIterator = container.container
  591. .GetItemLinqQueryable<T>(requestOptions: queryRequestOptions).OrderByDescending(order)
  592. .ToFeedIterator();
  593. }
  594. else
  595. {
  596. feedIterator = container.container
  597. .GetItemLinqQueryable<T>(requestOptions: queryRequestOptions).OrderBy(order)
  598. .ToFeedIterator();
  599. }
  600. }
  601. else
  602. {
  603. feedIterator = container.container
  604. .GetItemLinqQueryable<T>(requestOptions: queryRequestOptions)
  605. .ToFeedIterator();
  606. }
  607. }
  608. else
  609. {
  610. if (order != null)
  611. {
  612. if (isDesc)
  613. {
  614. feedIterator = container.container
  615. .GetItemLinqQueryable<T>(requestOptions: queryRequestOptions)
  616. .Where(query).OrderByDescending(order)
  617. .ToFeedIterator();
  618. }
  619. else
  620. {
  621. feedIterator = container.container
  622. .GetItemLinqQueryable<T>(requestOptions: queryRequestOptions)
  623. .Where(query).OrderBy(order)
  624. .ToFeedIterator();
  625. }
  626. }
  627. else
  628. {
  629. feedIterator = container.container
  630. .GetItemLinqQueryable<T>(requestOptions: queryRequestOptions)
  631. .Where(query)
  632. .ToFeedIterator();
  633. }
  634. }
  635. return await ResultsFromFeedIterator<T>(feedIterator);
  636. }
  637. public async Task<List<T>> FindSQL<T>(string sql, Dictionary<string, object> Parameters = null) where T : ID
  638. {
  639. CosmosModelInfo container = await InitializeCollection<T>();
  640. QueryRequestOptions queryOptions = GetQueryRequestOptions(GetEffectivePageSize(-1, null));
  641. if (Parameters != null)
  642. {
  643. CosmosDbQuery cosmosDbQuery = new CosmosDbQuery
  644. {
  645. QueryText = sql,
  646. Parameters = Parameters
  647. };
  648. FeedIterator<T> feedIterator = container.container
  649. .GetItemQueryIterator<T>(cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryOptions);
  650. return await ResultsFromFeedIterator(feedIterator);
  651. }
  652. else
  653. {
  654. QueryDefinition queryDefinition = new QueryDefinition(sql);
  655. return await ResultsFromFeedIterator<T>(container.container.GetItemQueryIterator<T>(queryDefinition));
  656. }
  657. }
  658. public async Task<T> Save<T>(T entity) where T : ID
  659. {
  660. try
  661. {
  662. CosmosModelInfo container = await InitializeCollection<T>();
  663. ItemResponse<T> response = await container.container.CreateItemAsync<T>(entity);
  664. if (container.cache && RedisHelper.Instance != null)
  665. {
  666. if (!RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
  667. {
  668. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
  669. }
  670. else
  671. {
  672. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
  673. await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
  674. }
  675. }
  676. return response.Resource;
  677. }
  678. catch (Exception e)
  679. {
  680. throw new BizException(e.Message);
  681. }
  682. }
  683. public async Task<List<T>> SaveAll<T>(List<T> enyites) where T : ID
  684. {
  685. int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
  686. CosmosModelInfo container = await InitializeCollection<T>();
  687. bool flag = false;
  688. if (RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
  689. {
  690. flag = true;
  691. }
  692. string pk = GetPartitionKey<T>();
  693. Type type = typeof(T);
  694. Stopwatch stopwatch = Stopwatch.StartNew();
  695. for (int i = 0; i < pages; i++)
  696. {
  697. List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
  698. List<KeyValuePair<PartitionKey, Stream>> itemsToInsert = new List<KeyValuePair<PartitionKey, Stream>>();
  699. lists.ForEach(async x =>
  700. {
  701. MemoryStream stream = new MemoryStream();
  702. await JsonSerializer.SerializeAsync(stream, x, new JsonSerializerOptions { IgnoreNullValues = true });
  703. object o = type.GetProperty(pk).GetValue(x, null);
  704. KeyValuePair<PartitionKey, Stream> keyValue = new KeyValuePair<PartitionKey, Stream>(new PartitionKey(o.ToString()), stream);
  705. itemsToInsert.Add(keyValue);
  706. });
  707. List<Task> tasks = new List<Task>(lists.Count);
  708. itemsToInsert.ForEach(item =>
  709. {
  710. tasks.Add(container.container.CreateItemStreamAsync(item.Value, item.Key)
  711. .ContinueWith((Task<ResponseMessage> task) =>
  712. {
  713. using (ResponseMessage response = task.Result)
  714. {
  715. if (!response.IsSuccessStatusCode)
  716. {
  717. }
  718. }
  719. }
  720. ));
  721. });
  722. await Task.WhenAll(tasks);
  723. if (container.cache && RedisHelper.Instance != null)
  724. {
  725. lists.ForEach(async x => {
  726. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, x.id, x);
  727. });
  728. }
  729. }
  730. if (container.cache && RedisHelper.Instance != null && !flag)
  731. {
  732. await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
  733. }
  734. stopwatch.Stop();
  735. return enyites;
  736. }
  737. public async Task<T> SaveOrUpdate<T>(T entity) where T : ID
  738. {
  739. CosmosModelInfo container = await InitializeCollection<T>();
  740. ItemResponse<T> response = await container.container.UpsertItemAsync(item: entity);
  741. if (container.cache && RedisHelper.Instance != null)
  742. {
  743. if (!RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
  744. {
  745. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
  746. }
  747. else
  748. {
  749. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
  750. await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
  751. }
  752. }
  753. return response.Resource;
  754. }
  755. public async Task<List<T>> SaveOrUpdateAll<T>(List<T> enyites) where T : ID
  756. {
  757. //await Task.Run(() => Parallel.ForEach(entities, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (item) =>
  758. //{
  759. // Task.WaitAll(Update(item));
  760. //}));
  761. int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
  762. CosmosModelInfo container = await InitializeCollection<T>();
  763. bool flag = false;
  764. if (RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
  765. {
  766. flag = true;
  767. }
  768. string pk = GetPartitionKey<T>();
  769. Type type = typeof(T);
  770. Stopwatch stopwatch = Stopwatch.StartNew();
  771. for (int i = 0; i < pages; i++)
  772. {
  773. List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
  774. List<KeyValuePair<PartitionKey, Stream>> itemsToInsert = new List<KeyValuePair<PartitionKey, Stream>>();
  775. lists.ForEach(async x =>
  776. {
  777. MemoryStream stream = new MemoryStream();
  778. await JsonSerializer.SerializeAsync(stream, x, new JsonSerializerOptions { IgnoreNullValues = true });
  779. object o = type.GetProperty(pk).GetValue(x, null);
  780. KeyValuePair<PartitionKey, Stream> keyValue = new KeyValuePair<PartitionKey, Stream>(new PartitionKey(o.ToString()), stream);
  781. itemsToInsert.Add(keyValue);
  782. });
  783. List<Task> tasks = new List<Task>(lists.Count);
  784. itemsToInsert.ForEach(item =>
  785. {
  786. tasks.Add(container.container.UpsertItemStreamAsync(item.Value, item.Key)
  787. .ContinueWith((Task<ResponseMessage> task) =>
  788. {
  789. //using (ResponseMessage response = task.Result)
  790. //{
  791. // if (!response.IsSuccessStatusCode)
  792. // {
  793. // }
  794. //}
  795. }
  796. ));
  797. });
  798. await Task.WhenAll(tasks);
  799. if (container.cache && RedisHelper.Instance != null)
  800. {
  801. lists.ForEach(async x => {
  802. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, x.id, x);
  803. });
  804. }
  805. }
  806. if (container.cache && RedisHelper.Instance != null && !flag)
  807. {
  808. await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
  809. }
  810. stopwatch.Stop();
  811. return enyites;
  812. }
  813. public async Task<T> Update<T>(T entity) where T : ID
  814. {
  815. CosmosModelInfo container = await InitializeCollection<T>();
  816. string pk = GetPartitionKey<T>();
  817. object o = typeof(T).GetProperty(pk).GetValue(entity, null);
  818. ItemResponse<T> response = await container.container.ReplaceItemAsync(entity, entity.id, new PartitionKey(o.ToString()));
  819. if (container.cache && RedisHelper.Instance != null)
  820. {
  821. if (!RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
  822. {
  823. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
  824. }
  825. else
  826. {
  827. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
  828. await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
  829. }
  830. }
  831. return response.Resource;
  832. }
  833. internal class Item
  834. {
  835. public string id { get; set; }
  836. public string pk { get; set; }
  837. public MemoryStream stream { get; set; }
  838. }
  839. public async Task<List<T>> UpdateAll<T>(List<T> enyites) where T : ID
  840. {
  841. //await Task.Run(() => Parallel.ForEach(entities, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (item) =>
  842. //{
  843. // Task.WaitAll(Update(item));
  844. //}));
  845. int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
  846. CosmosModelInfo container = await InitializeCollection<T>();
  847. bool flag = false;
  848. if (RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
  849. {
  850. flag = true;
  851. }
  852. string pk = GetPartitionKey<T>();
  853. Type type = typeof(T);
  854. Stopwatch stopwatch = Stopwatch.StartNew();
  855. for (int i = 0; i < pages; i++)
  856. {
  857. List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
  858. List<Item> itemsToInsert = new List<Item>();
  859. lists.ForEach(async x =>
  860. {
  861. MemoryStream stream = new MemoryStream();
  862. await JsonSerializer.SerializeAsync(stream, x, new JsonSerializerOptions { IgnoreNullValues = true });
  863. object o = type.GetProperty(pk).GetValue(x, null);
  864. Item keyValue = new Item { id = x.id, pk = o.ToString(), stream = stream };
  865. itemsToInsert.Add(keyValue);
  866. });
  867. List<Task> tasks = new List<Task>(lists.Count);
  868. itemsToInsert.ForEach(item =>
  869. {
  870. tasks.Add(container.container.ReplaceItemStreamAsync(item.stream, item.id, new PartitionKey(item.pk))
  871. .ContinueWith((Task<ResponseMessage> task) =>
  872. {
  873. //using (ResponseMessage response = task.Result)
  874. //{
  875. // if (!response.IsSuccessStatusCode)
  876. // {
  877. // }
  878. //}
  879. }
  880. ));
  881. });
  882. await Task.WhenAll(tasks);
  883. if (container.cache && RedisHelper.Instance != null)
  884. {
  885. lists.ForEach(async x => {
  886. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, x.id, x);
  887. });
  888. }
  889. }
  890. if (container.cache && RedisHelper.Instance != null && !flag)
  891. {
  892. await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
  893. }
  894. stopwatch.Stop();
  895. return enyites;
  896. }
  897. //public void Dispose()
  898. //{
  899. // Dispose(true);
  900. //}
  901. //protected virtual void Dispose(bool disposing)
  902. //{
  903. // if (disposing)
  904. // {
  905. // CosmosClient?.Dispose();
  906. // }
  907. //}
  908. private async Task<T> FindByIdAsSql<T>(string id) where T : ID
  909. {
  910. CosmosModelInfo container = await InitializeCollection<T>();
  911. CosmosDbQuery cosmosDbQuery = new CosmosDbQuery
  912. {
  913. QueryText = @"SELECT *
  914. FROM c
  915. WHERE c.id = @id",
  916. Parameters = new Dictionary<string, object>
  917. {
  918. { "@id",id}
  919. }
  920. };
  921. FeedIterator<T> feedIterator = container.container
  922. .GetItemQueryIterator<T>(cosmosDbQuery.CosmosQueryDefinition);
  923. return (await ResultsFromFeedIterator(feedIterator)).SingleOrDefault();
  924. }
  925. public async Task<T> FindByIdPk<T>(string id, string pk) where T : ID
  926. {
  927. CosmosModelInfo container = await InitializeCollection<T>();
  928. ItemResponse<T> response = await container.container.ReadItemAsync<T>(id: id, partitionKey: new PartitionKey(pk));
  929. return response.Resource;
  930. }
  931. public async Task<T> FindById<T>(string id, bool cache = true) where T : ID
  932. {
  933. CosmosModelInfo container = await InitializeCollection<T>();
  934. if (container.cache && RedisHelper.Instance != null && cache==true)
  935. {
  936. return await RedisHelper.CacheShellAsync(CacheCosmosPrefix + container.container.Id, id, timeoutSeconds, () => { return FindByIdAsSql<T>(id); });
  937. }
  938. else
  939. {
  940. return await FindByIdAsSql<T>(id);
  941. }
  942. }
  943. public async Task<List<T>> FindByIds<T>(List<string> ids) where T : ID
  944. {
  945. CosmosModelInfo container = await InitializeCollection<T>();
  946. if (container.cache && RedisHelper.Instance != null)
  947. {
  948. List<T> list = new List<T>();
  949. List<string> NotIn = new List<string>();
  950. foreach (string id in ids)
  951. {
  952. if (!await RedisHelper.HExistsAsync(CacheCosmosPrefix + container.container.Id, id))
  953. {
  954. NotIn.Add(id);
  955. }
  956. else
  957. {
  958. list.Add(await RedisHelper.HGetAsync<T>(CacheCosmosPrefix + container.container.Id, id));
  959. }
  960. }
  961. if (NotIn.IsNotEmpty())
  962. {
  963. List<T> noInList = await FindByDict<T>(new Dictionary<string, object> { { "id", NotIn.ToArray() } });
  964. noInList.ForEach(x => { RedisHelper.HSet(CacheCosmosPrefix + container.container.Id, x.id, x); RedisHelper.Expire(CacheCosmosPrefix + container.container.Id, timeoutSeconds); });
  965. list.AddRange(noInList);
  966. }
  967. return list;
  968. }
  969. else
  970. {
  971. return await FindByDict<T>(new Dictionary<string, object> { { "id", ids.ToArray() } });
  972. }
  973. }
  974. public async Task<dynamic> FindById(string CollectionName, string id)
  975. {
  976. if (DocumentCollectionDict.TryGetValue(CollectionName, out CosmosModelInfo container))
  977. {
  978. if (container.cache && RedisHelper.Instance != null)
  979. {
  980. return await RedisHelper.CacheShellAsync(CacheCosmosPrefix + container.container.Id, id, timeoutSeconds, () => { return FindByDict(CollectionName, new Dictionary<string, object> { { "id", id } }); });
  981. }
  982. else
  983. {
  984. return await FindByDict(CollectionName, new Dictionary<string, object> { { "id", id } });
  985. }
  986. }
  987. else
  988. {
  989. throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
  990. }
  991. }
  992. public async Task<List<dynamic>> FindByIds(string CollectionName, List<string> ids)
  993. {
  994. if (DocumentCollectionDict.TryGetValue(CollectionName, out CosmosModelInfo container))
  995. {
  996. if (container.cache && RedisHelper.Instance != null)
  997. {
  998. return await RedisHelper.CacheShellAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds, () => { return FindByDict(CollectionName, new Dictionary<string, object> { { "id", ids.ToArray() } }); });
  999. }
  1000. else
  1001. {
  1002. return await FindByDict(CollectionName, new Dictionary<string, object> { { "id", ids.ToArray() } });
  1003. }
  1004. }
  1005. else
  1006. {
  1007. throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
  1008. }
  1009. }
  1010. }
  1011. }