AzureCosmosDBV3Repository.cs 44 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017
  1. using Microsoft.Azure.Cosmos;
  2. using Microsoft.Azure.Cosmos.Linq;
  3. using System;
  4. using System.Collections.Concurrent;
  5. using System.Collections.Generic;
  6. using System.Diagnostics;
  7. using System.IO;
  8. using System.Linq;
  9. using System.Linq.Expressions;
  10. using System.Net;
  11. using System.Reflection;
  12. using System.Text;
  13. using System.Text.Json;
  14. using System.Threading;
  15. using System.Threading.Tasks;
  16. using TEAMModelOS.SDK.Context.Attributes.Azure;
  17. using TEAMModelOS.SDK.Context.Exception;
  18. using TEAMModelOS.SDK.Helper.Common.CollectionHelper;
  19. using TEAMModelOS.SDK.Helper.Common.ReflectorExtensions;
  20. using TEAMModelOS.SDK.Module.AzureCosmosDB.Configuration;
  21. namespace TEAMModelOS.SDK.Module.AzureCosmosDBV3
  22. {
  /// <summary>
  /// Bookkeeping record for one Cosmos DB container known to the repository.
  /// </summary>
  public class CosmosModelInfo
  {
      /// <summary>Handle used to issue requests against the container.</summary>
      public Container container { get; set; }

      /// <summary>
      /// When true, the repository mirrors writes and deletes for this container
      /// into Redis (provided a Redis instance is configured).
      /// </summary>
      public bool cache { get; set; }

      /// <summary>
      /// Monitoring flag copied from the model's CosmosDBAttribute. Defaults to false.
      /// If any model sets it, the repository also creates a lease container
      /// (presumably for change-feed processing; confirm against the consumer).
      /// </summary>
      public bool monitor { get; set; }

      /// <summary>
      /// CLR model type bound to the container. Left unset for containers that were
      /// discovered in the database without a matching model class.
      /// </summary>
      public Type type { get; set; }
  }
  29. public class AzureCosmosDBV3Repository : IAzureCosmosDBV3Repository
  30. {
  /// <summary>Cosmos DB client obtained from CosmosDBV3ClientSingleton in the constructors.</summary>
  private CosmosClient CosmosClient { get; set; }

  /// <summary>
  /// Containers known to this repository, keyed by container id.
  /// NOTE(review): this is a plain Dictionary, which is not safe for concurrent
  /// writers, even though the original comment described it as thread-safe.
  /// </summary>
  private Dictionary<string, CosmosModelInfo> DocumentCollectionDict { get; set; } = new Dictionary<string, CosmosModelInfo>();

  /// <summary>Name of the database, taken from the options.</summary>
  private string DatabaseId { get; set; }

  /// <summary>Default throughput (RU/s) used when creating the database and containers.</summary>
  private int CollectionThroughput { get; set; }

  /// <summary>Database handle; assigned by InitializeDatabase.</summary>
  private Database database { get; set; }

  /// <summary>Number of items processed per batch by the bulk save/delete methods.</summary>
  private int pageSize = 200;

  /// <summary>Prefix of the Redis hash key that caches a container's documents.</summary>
  private const string CacheCosmosPrefix = "cosmos:";

  /// <summary>
  /// Value handed to the reflection scan that looks for CosmosDBAttribute types
  /// (presumably assembly or namespace names; confirm against ReflectorExtensions).
  /// </summary>
  private string[] ScanModel { get; set; }

  /// <summary>Expiry, in seconds, applied to Redis cache keys (86400 s = 24 h).</summary>
  private const int timeoutSeconds = 86400;

  /// <summary>Id of the lease container created when any model enables monitoring.</summary>
  private string leaseId = "leaseContainer";
  /// <summary>
  /// Creates the repository with a custom serializer for the Cosmos client.
  /// The database itself is not touched here; call InitializeDatabase afterwards.
  /// </summary>
  /// <param name="options">Connection string, key, database name, default throughput and scan list.</param>
  /// <param name="cosmosSerializer">Serializer handed to the client singleton.</param>
  /// <exception cref="BizException">
  /// The connection string is null or empty, or the client factory raised a CosmosException.
  /// </exception>
  public AzureCosmosDBV3Repository(AzureCosmosDBOptions options, CosmosSerializer cosmosSerializer)
  {
      try
      {
          // Fail fast when no connection string is configured.
          if (string.IsNullOrEmpty(options.ConnectionString))
          {
              throw new BizException("请设置正确的AzureCosmosDB数据库配置信息!");
          }

          CosmosClient = CosmosDBV3ClientSingleton
              .getInstance(options.ConnectionString, options.ConnectionKey, cosmosSerializer)
              .GetCosmosDBClient();
          DatabaseId = options.Database;
          CollectionThroughput = options.CollectionThroughput;
          ScanModel = options.ScanModel;
      }
      catch (CosmosException e)
      {
          // Surface SDK failures as the project's own exception type.
          throw new BizException(e.Message, 500, e.StackTrace);
      }
  }
  /// <summary>
  /// Creates the repository without a custom serializer.
  /// Delegates to the two-argument constructor with a null serializer, which is
  /// exactly what the previously duplicated body passed to the client singleton.
  /// </summary>
  /// <param name="options">Connection string, key, database name, default throughput and scan list.</param>
  /// <exception cref="BizException">
  /// The connection string is null or empty, or the client factory raised a CosmosException.
  /// </exception>
  public AzureCosmosDBV3Repository(AzureCosmosDBOptions options)
      : this(options, null)
  {
  }
  /// <summary>
  /// Creates the database if needed, registers every container that already exists,
  /// then makes sure each model type carrying CosmosDBAttribute has a container with
  /// at least the requested throughput. A lease container is created when any model
  /// enables monitoring.
  /// </summary>
  /// <exception cref="BizException">
  /// A CosmosException was raised by the SDK, or a model has no single partition key.
  /// </exception>
  public async Task InitializeDatabase()
  {
      try
      {
          database = await CosmosClient.CreateDatabaseIfNotExistsAsync(DatabaseId, CollectionThroughput);

          // Register the containers that already exist; model details are filled in below.
          FeedIterator<ContainerProperties> existingContainers = database.GetContainerQueryIterator<ContainerProperties>();
          while (existingContainers.HasMoreResults)
          {
              foreach (ContainerProperties existing in await existingContainers.ReadNextAsync())
              {
                  DocumentCollectionDict.TryAdd(existing.Id, new CosmosModelInfo { container = database.GetContainer(existing.Id), cache = false, monitor = false });
              }
          }

          bool anyMonitored = false;

          // Every model type discovered by the attribute scan maps to one container.
          List<Type> modelTypes = ReflectorExtensions.GetAllTypeAsAttribute<CosmosDBAttribute>(ScanModel);
          foreach (Type type in modelTypes)
          {
              string partitionKey = GetPartitionKey(type);

              // Read the attribute once instead of re-enumerating it for every setting.
              CosmosDBAttribute attribute = type.GetCustomAttributes<CosmosDBAttribute>(true).First();
              string collectionName = string.IsNullOrEmpty(attribute.Name) ? type.Name : attribute.Name;
              bool cache = attribute.Cache;
              bool monitor = attribute.Monitor;
              anyMonitored |= monitor;

              // Only an RU above 400 on the attribute overrides the configured default.
              int requestedThroughput = attribute.RU > 400 ? attribute.RU : CollectionThroughput;

              if (DocumentCollectionDict.TryGetValue(collectionName, out CosmosModelInfo known))
              {
                  // Container exists: raise its throughput if it is below the requested value.
                  Container container = CosmosClient.GetDatabase(DatabaseId).GetContainer(known.container.Id);
                  int? currentThroughput = await container.ReadThroughputAsync();
                  // A null reading compares false, so such containers are left as they are.
                  if (currentThroughput < requestedThroughput)
                  {
                      await container.ReplaceThroughputAsync(requestedThroughput);
                  }
                  DocumentCollectionDict[collectionName] = new CosmosModelInfo { container = container, cache = cache, monitor = monitor, type = type };
              }
              else
              {
                  ContainerProperties containerProperties = new ContainerProperties { Id = collectionName };
                  if (!string.IsNullOrEmpty(partitionKey))
                  {
                      containerProperties.PartitionKeyPath = "/" + partitionKey;
                  }

                  // FIX: the larger of the two values used to be written back into the
                  // CollectionThroughput field, so one model's high RU leaked into every
                  // container created afterwards. Keep it local to this container.
                  int createThroughput = Math.Max(requestedThroughput, CollectionThroughput);
                  Container created = await database.CreateContainerIfNotExistsAsync(containerProperties, throughput: createThroughput);
                  DocumentCollectionDict.TryAdd(collectionName, new CosmosModelInfo { container = created, cache = cache, monitor = monitor, type = type });
              }
          }

          if (anyMonitored)
          {
              // At least one model is monitored: make sure the lease container exists.
              ContainerProperties leaseProperties = new ContainerProperties { Id = leaseId, PartitionKeyPath = "/id" };
              Container leaseContainer = await database.CreateContainerIfNotExistsAsync(leaseProperties, throughput: CollectionThroughput);
              DocumentCollectionDict.TryAdd(leaseId, new CosmosModelInfo { container = leaseContainer, cache = false, monitor = false });
          }
      }
      catch (CosmosException e)
      {
          throw new BizException(e.Message, 500, e.StackTrace);
      }
  }
  /// <summary>
  /// Exposes the repository's container registry, keyed by container id.
  /// The live dictionary is returned, not a copy.
  /// </summary>
  public Dictionary<string, CosmosModelInfo> GetCosmosModelInfo() => DocumentCollectionDict;
  /// <summary>Generic convenience wrapper: resolves the partition-key property name of <typeparamref name="T"/>.</summary>
  /// <typeparam name="T">Model type to inspect.</typeparam>
  /// <returns>Name of the property used as the partition key.</returns>
  private string GetPartitionKey<T>() => GetPartitionKey(typeof(T));
  /// <summary>
  /// Finds the partition-key property of a model type. A property literally named
  /// "PartitionKey" ends the scan as soon as it is met; otherwise properties carrying
  /// PartitionKeyAttribute are collected.
  /// </summary>
  /// <param name="type">Model type to inspect.</param>
  /// <returns>Name of the single partition-key property.</returns>
  /// <exception cref="BizException">No candidate, or more than one candidate, was found.</exception>
  private string GetPartitionKey(Type type)
  {
      List<PropertyInfo> candidates = new List<PropertyInfo>();
      foreach (PropertyInfo candidate in type.GetProperties())
      {
          if (candidate.Name.Equals("PartitionKey"))
          {
              // A property with the conventional name stops the search.
              candidates.Add(candidate);
              break;
          }

          // One entry per PartitionKeyAttribute instance found on the property.
          candidates.AddRange(
              candidate.GetCustomAttributes(true)
                  .OfType<PartitionKeyAttribute>()
                  .Select(marker => candidate));
      }

      switch (candidates.Count)
      {
          case 0:
              throw new BizException(type.Name + "has no PartitionKey !");
          case 1:
              return candidates[0].Name;
          default:
              throw new BizException("PartitionKey can only be single!");
      }
  }
  /// <summary>
  /// Resolves (creating on demand) the container that backs model type <typeparamref name="T"/>.
  /// The container name comes from CosmosDBAttribute.Name, falling back to the type name.
  /// </summary>
  /// <typeparam name="T">Model type decorated with CosmosDBAttribute.</typeparam>
  /// <returns>Registry entry for the model's container.</returns>
  private async Task<CosmosModelInfo> InitializeCollection<T>()
  {
      Type modelType = typeof(T);
      string partitionKey = GetPartitionKey<T>();
      CosmosDBAttribute attribute = modelType.GetCustomAttributes<CosmosDBAttribute>(true).First();
      string collectionName = string.IsNullOrEmpty(attribute.Name) ? modelType.Name : attribute.Name;
      return await InitializeCollection(collectionName, partitionKey);
  }
  /// <summary>
  /// Returns the registry entry for a container, creating the container with the
  /// default throughput (and caching disabled) when it is not registered yet.
  /// </summary>
  /// <param name="CollectionName">Container id.</param>
  /// <param name="PartitionKey">Partition-key property name; ignored when null or empty.</param>
  /// <returns>The existing or newly created registry entry.</returns>
  private async Task<CosmosModelInfo> InitializeCollection(string CollectionName, string PartitionKey)
  {
      // Already registered in memory: nothing to create.
      if (DocumentCollectionDict.TryGetValue(CollectionName, out CosmosModelInfo registered))
      {
          return registered;
      }

      // Unknown container: create it with default settings.
      ContainerProperties properties = new ContainerProperties { Id = CollectionName };
      if (!string.IsNullOrEmpty(PartitionKey))
      {
          properties.PartitionKeyPath = "/" + PartitionKey;
      }

      Container created = await database.CreateContainerIfNotExistsAsync(properties, throughput: CollectionThroughput);
      CosmosModelInfo entry = new CosmosModelInfo { container = created, cache = false };
      DocumentCollectionDict.TryAdd(CollectionName, entry);
      return entry;
  }
  /// <summary>
  /// Deletes the documents identified by <paramref name="ids"/> in batches of
  /// <c>pageSize</c> concurrent requests and reports the status code of each delete.
  /// When the container is cached, the matching Redis hash fields are removed too.
  /// </summary>
  /// <typeparam name="T">Model type whose container is targeted.</typeparam>
  /// <param name="ids">Id / partition-key pairs to delete.</param>
  /// <returns>One entry per input item, in input order, carrying the delete status code.</returns>
  /// <remarks>
  /// Fixes over the previous version: results were appended to a shared list from
  /// concurrently running continuations (List is not thread-safe), and the Redis
  /// deletions ran in async-void lambdas that were never awaited. Both are now awaited
  /// through Task.WhenAll, so failures propagate to the caller as the original
  /// exception rather than wrapped in an AggregateException.
  /// </remarks>
  public async Task<List<IdPk>> DeleteAll<T>(List<IdPk> ids) where T : ID
  {
      CosmosModelInfo container = await InitializeCollection<T>();
      List<IdPk> idPks = new List<IdPk>(ids.Count);
      int pages = (int)Math.Ceiling((double)ids.Count / pageSize);
      for (int i = 0; i < pages; i++)
      {
          List<IdPk> page = ids.Skip(i * pageSize).Take(pageSize).ToList();

          // Issue the whole page concurrently; WhenAll returns results in page order.
          IdPk[] outcomes = await Task.WhenAll(page.Select(async item =>
          {
              using (ResponseMessage response = await container.container.DeleteItemStreamAsync(item.id, new PartitionKey(item.pk)))
              {
                  return new IdPk { id = item.id, pk = item.pk.ToString(), StatusCode = response.StatusCode };
              }
          }));
          idPks.AddRange(outcomes);

          if (container.cache && RedisHelper.Instance != null)
          {
              string cacheKey = CacheCosmosPrefix + container.container.Id;
              await Task.WhenAll(page.Select(async item =>
              {
                  await RedisHelper.HDelAsync(cacheKey, item.id);
              }));
          }
      }
      return idPks;
  }
  /// <summary>
  /// Deletes every document of type <typeparamref name="T"/> matched by the filter dictionary.
  /// </summary>
  /// <typeparam name="T">Model type whose container is targeted.</typeparam>
  /// <param name="dict">Filter passed unchanged to FindByDict.</param>
  /// <returns>Delete status for each matched document.</returns>
  public async Task<List<IdPk>> DeleteAll<T>(Dictionary<string, object> dict) where T : ID
  {
      // Look the documents up first, then reuse the entity-based bulk delete.
      List<T> matches = await FindByDict<T>(dict);
      List<IdPk> outcome = await DeleteAll(matches);
      return outcome;
  }
  /// <summary>
  /// Deletes the given entities in batches of <c>pageSize</c> concurrent requests and
  /// reports the status code of each delete. When the container is cached, the matching
  /// Redis hash fields are removed too.
  /// </summary>
  /// <typeparam name="T">Model type whose container is targeted.</typeparam>
  /// <param name="enyites">Entities to delete; id and partition-key value are read from each.</param>
  /// <returns>One entry per entity, in input order, with its id, partition-key value and status code.</returns>
  /// <remarks>
  /// Fixes over the previous version: (1) results were appended to a shared list from
  /// concurrently running continuations; (2) Redis deletions ran in un-awaited async-void
  /// lambdas; (3) the returned <c>pk</c> was <c>PartitionKey.ToString()</c> instead of the raw
  /// value, so it disagreed with what DeleteAsync(T) returns and could not be fed back
  /// into DeleteAsync(IdPk). Failures now surface as the original exception instead of
  /// being wrapped in an AggregateException.
  /// </remarks>
  public async Task<List<IdPk>> DeleteAll<T>(List<T> enyites) where T : ID
  {
      List<IdPk> idPks = new List<IdPk>(enyites.Count);
      CosmosModelInfo container = await InitializeCollection<T>();
      PropertyInfo pkProperty = typeof(T).GetProperty(GetPartitionKey<T>());
      int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
      for (int i = 0; i < pages; i++)
      {
          List<T> page = enyites.Skip(i * pageSize).Take(pageSize).ToList();

          // Resolve id -> partition-key value for the whole page before sending anything,
          // so a missing key value fails the page up front, as it did before.
          List<KeyValuePair<string, string>> targets = page
              .Select(x => new KeyValuePair<string, string>(x.id, pkProperty.GetValue(x, null).ToString()))
              .ToList();

          // Issue the whole page concurrently; WhenAll returns results in page order.
          IdPk[] outcomes = await Task.WhenAll(targets.Select(async target =>
          {
              using (ResponseMessage response = await container.container.DeleteItemStreamAsync(target.Key, new PartitionKey(target.Value)))
              {
                  return new IdPk { id = target.Key, pk = target.Value, StatusCode = response.StatusCode };
              }
          }));
          idPks.AddRange(outcomes);

          if (container.cache && RedisHelper.Instance != null)
          {
              string cacheKey = CacheCosmosPrefix + container.container.Id;
              await Task.WhenAll(page.Select(async x =>
              {
                  await RedisHelper.HDelAsync(cacheKey, x.id);
              }));
          }
      }
      return idPks;
  }
  /// <summary>Deletes the document identified by an id / partition-key pair.</summary>
  /// <typeparam name="T">Model type whose container is targeted.</typeparam>
  /// <param name="idPk">Carrier of the document id and partition-key value.</param>
  /// <returns>The id, partition key and status code of the delete.</returns>
  public async Task<IdPk> DeleteAsync<T>(IdPk idPk) where T : ID
  {
      IdPk outcome = await DeleteAsync<T>(idPk.id, idPk.pk);
      return outcome;
  }
  /// <summary>
  /// Deletes one document by id and partition-key value and, for cached containers,
  /// removes the matching Redis hash field.
  /// </summary>
  /// <typeparam name="T">Model type whose container is targeted.</typeparam>
  /// <param name="id">Document id.</param>
  /// <param name="pk">Partition-key value of the document.</param>
  /// <returns>The id, partition key and status code of the delete.</returns>
  public async Task<IdPk> DeleteAsync<T>(string id, string pk) where T : ID
  {
      CosmosModelInfo container = await InitializeCollection<T>();

      // FIX: ResponseMessage is IDisposable and was never released.
      using (ResponseMessage response = await container.container.DeleteItemStreamAsync(id: id, partitionKey: new PartitionKey(pk)))
      {
          if (container.cache && RedisHelper.Instance != null)
          {
              await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, id);
          }
          return new IdPk { id = id, pk = pk, StatusCode = response.StatusCode };
      }
  }
  /// <summary>
  /// Deletes the document that corresponds to <paramref name="entity"/> and, for cached
  /// containers, removes the matching Redis hash field.
  /// </summary>
  /// <typeparam name="T">Model type whose container is targeted.</typeparam>
  /// <param name="entity">Entity whose id and partition-key property identify the document.</param>
  /// <returns>The id, partition-key value and status code of the delete.</returns>
  public async Task<IdPk> DeleteAsync<T>(T entity) where T : ID
  {
      CosmosModelInfo container = await InitializeCollection<T>();

      // Read the partition-key value from the entity through reflection.
      string partitionKey = GetPartitionKey<T>();
      object o = typeof(T).GetProperty(partitionKey).GetValue(entity, null);
      string pkValue = o.ToString();

      // FIX: ResponseMessage is IDisposable and was never released.
      using (ResponseMessage response = await container.container.DeleteItemStreamAsync(id: entity.id, partitionKey: new PartitionKey(pkValue)))
      {
          if (container.cache && RedisHelper.Instance != null)
          {
              await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, entity.id);
          }
          return new IdPk { id = entity.id, pk = pkValue, StatusCode = response.StatusCode };
      }
  }
  373. //public async Task<T> DeleteAsync<T>(string id) where T : ID
  374. //{
  375. // Container container = await InitializeCollection<T>();
  376. // ItemResponse<T> response = await container.DeleteItemAsync<T>(id: id, partitionKey: new PartitionKey(GetPartitionKey<T>()));
  377. // return response.Resource;
  378. //}
  /// <summary>Reads every document of type <typeparamref name="T"/> from its container.</summary>
  /// <typeparam name="T">Model type whose container is queried.</typeparam>
  /// <param name="propertys">Passed to SQLHelperParametric.GetSQLSelect to build the SELECT clause; may be null.</param>
  /// <returns>All documents returned by the query.</returns>
  public async Task<List<T>> FindAll<T>(List<string> propertys = null) where T : ID
  {
      CosmosModelInfo container = await InitializeCollection<T>();
      string queryText = SQLHelperParametric.GetSQLSelect(propertys).ToString();
      CosmosDbQuery cosmosDbQuery = new CosmosDbQuery { QueryText = queryText };
      FeedIterator<T> iterator = container.container.GetItemQueryIterator<T>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition);
      return await ResultsFromFeedIterator(iterator);
  }
  /// <summary>
  /// Drains a feed iterator into a list, stopping early once
  /// <paramref name="maxItemCount"/> items have been collected.
  /// </summary>
  /// <typeparam name="T">Item type produced by the iterator.</typeparam>
  /// <param name="query">Iterator to read from.</param>
  /// <param name="maxItemCount">Optional cap on the number of items; null means no cap.</param>
  /// <returns>The collected items.</returns>
  private async Task<List<T>> ResultsFromFeedIterator<T>(FeedIterator<T> query, int? maxItemCount = null)
  {
      List<T> collected = new List<T>();
      while (query.HasMoreResults)
      {
          FeedResponse<T> page = await query.ReadNextAsync();
          foreach (T item in page)
          {
              collected.Add(item);
              if (maxItemCount.HasValue && collected.Count == maxItemCount.Value)
              {
                  return collected;
              }
          }
      }
      return collected;
  }
  /// <summary>
  /// Streams a feed iterator through <paramref name="batchAction"/>: pages are buffered
  /// until at least <paramref name="itemsPerPage"/> items are held, then the buffer is
  /// handed to the action and emptied. Any remainder is flushed at the end.
  /// </summary>
  /// <typeparam name="T">Item type produced by the iterator.</typeparam>
  /// <param name="query">Iterator to read from.</param>
  /// <param name="batchAction">Callback invoked with each batch; it receives the shared buffer, which is cleared afterwards.</param>
  /// <param name="itemsPerPage">Buffer size that triggers a flush before the next page is read.</param>
  /// <returns>
  /// The internal buffer. It is cleared after every flush, so it is empty on normal
  /// completion; results are delivered only through <paramref name="batchAction"/>.
  /// </returns>
  private async Task<List<T>> ResultsFromFeedIterator<T>(FeedIterator<T> query, Func<List<T>, Task> batchAction, int itemsPerPage)
  {
      List<T> buffer = new List<T>();

      // Hands the current buffer to the callback and empties it.
      async Task FlushAsync()
      {
          await batchAction(buffer);
          buffer.Clear();
      }

      while (query.HasMoreResults)
      {
          if (buffer.Count >= itemsPerPage)
          {
              await FlushAsync();
          }
          buffer.AddRange(await query.ReadNextAsync());
      }

      if (buffer.Count > 0)
      {
          await FlushAsync();
      }
      return buffer;
  }
  /// <summary>
  /// Queries a container by name with a filter dictionary and returns untyped results.
  /// </summary>
  /// <param name="CollectionName">Id of a container already registered with the repository.</param>
  /// <param name="dict">Filter handed to SQLHelperParametric.GetSQL.</param>
  /// <param name="partitionKey">Not used by this method.</param>
  /// <param name="propertys">Passed to SQLHelperParametric.GetSQLSelect to build the SELECT clause; may be null.</param>
  /// <returns>All documents returned by the query.</returns>
  /// <exception cref="BizException">The container is not registered.</exception>
  public async Task<List<dynamic>> FindByDict(string CollectionName, Dictionary<string, object> dict, string partitionKey = null, List<string> propertys = null)
  {
      if (!DocumentCollectionDict.TryGetValue(CollectionName, out CosmosModelInfo container))
      {
          throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
      }

      StringBuilder select = SQLHelperParametric.GetSQLSelect(propertys);
      CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, select);
      int effectivePageSize = GetEffectivePageSize(-1, null);
      QueryRequestOptions options = GetDefaultQueryRequestOptions(itemsPerPage: effectivePageSize);
      FeedIterator<dynamic> iterator = container.container.GetItemQueryIterator<dynamic>(
          queryDefinition: cosmosDbQuery.CosmosQueryDefinition,
          requestOptions: options);
      return await ResultsFromFeedIterator(iterator);
  }
  /// <summary>
  /// Counts the documents in a named container that match a filter dictionary.
  /// The paging and sorting keys (@CURRPAGE, @PAGESIZE, @ASC, @DESC) are ignored.
  /// </summary>
  /// <param name="CollectionName">Id of a container already registered with the repository.</param>
  /// <param name="dict">Filter handed to SQLHelperParametric.GetSQL; it is no longer modified.</param>
  /// <param name="partitionKey">Not used by this method.</param>
  /// <returns>The rows produced by the "select value count(c)" query.</returns>
  /// <exception cref="BizException">The container is not registered.</exception>
  public async Task<List<dynamic>> FindCountByDict(string CollectionName, Dictionary<string, object> dict, string partitionKey = null)
  {
      if (!DocumentCollectionDict.TryGetValue(CollectionName, out CosmosModelInfo container))
      {
          throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
      }

      // FIX: the paging/sorting keys used to be removed from the caller's own dictionary,
      // silently dropping them from any later query that reused it. Strip them from a
      // private copy instead (same comparer, so key matching is unchanged).
      Dictionary<string, object> filter = new Dictionary<string, object>(dict, dict.Comparer);
      filter.Remove("@CURRPAGE");
      filter.Remove("@PAGESIZE");
      filter.Remove("@ASC");
      filter.Remove("@DESC");

      StringBuilder sql = new StringBuilder("select value count(c) from c");
      CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(filter, sql);
      QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
      FeedIterator<dynamic> query = container.container.GetItemQueryIterator<dynamic>(
          queryDefinition: cosmosDbQuery.CosmosQueryDefinition,
          requestOptions: queryRequestOptions);
      return await ResultsFromFeedIterator(query);
  }
  /// <summary>Alias of the typed FindByDict: queries <typeparamref name="T"/> with a filter dictionary.</summary>
  /// <typeparam name="T">Model type whose container is queried.</typeparam>
  /// <param name="dict">Filter dictionary.</param>
  /// <param name="partitionKey">Forwarded unchanged.</param>
  /// <param name="propertys">Forwarded unchanged.</param>
  /// <returns>All matching documents.</returns>
  public async Task<List<T>> FindByParams<T>(Dictionary<string, object> dict, string partitionKey = null, List<string> propertys = null) where T : ID
  {
      List<T> found = await FindByDict<T>(dict, partitionKey, propertys);
      return found;
  }
  /// <summary>Queries the container of <typeparamref name="T"/> with a filter dictionary.</summary>
  /// <typeparam name="T">Model type whose container is queried.</typeparam>
  /// <param name="dict">Filter handed to SQLHelperParametric.GetSQL.</param>
  /// <param name="partitionKey">Not used by this method.</param>
  /// <param name="propertys">Passed to SQLHelperParametric.GetSQLSelect to build the SELECT clause; may be null.</param>
  /// <returns>All matching documents.</returns>
  public async Task<List<T>> FindByDict<T>(Dictionary<string, object> dict, string partitionKey = null, List<string> propertys = null) where T : ID
  {
      StringBuilder select = SQLHelperParametric.GetSQLSelect(propertys);
      CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, select);
      int effectivePageSize = GetEffectivePageSize(-1, null);
      QueryRequestOptions options = GetDefaultQueryRequestOptions(itemsPerPage: effectivePageSize);
      return await ResultsFromQueryAndOptions<T>(cosmosDbQuery, options);
  }
  /// <summary>Runs a prepared query against the container of <typeparamref name="T"/> and collects all results.</summary>
  /// <typeparam name="T">Model type whose container is queried.</typeparam>
  /// <param name="cosmosDbQuery">Query wrapper supplying the query definition.</param>
  /// <param name="queryOptions">Request options for the query.</param>
  /// <returns>All documents returned by the query.</returns>
  private async Task<List<T>> ResultsFromQueryAndOptions<T>(CosmosDbQuery cosmosDbQuery, QueryRequestOptions queryOptions)
  {
      CosmosModelInfo container = await InitializeCollection<T>();
      QueryDefinition definition = cosmosDbQuery.CosmosQueryDefinition;
      FeedIterator<T> iterator = container.container.GetItemQueryIterator<T>(queryDefinition: definition, requestOptions: queryOptions);
      return await ResultsFromFeedIterator(iterator);
  }
  /// <summary>
  /// Combines a requested page size with an optional overall item cap.
  /// </summary>
  /// <param name="itemsPerPage">Requested page size; -1 means "not specified".</param>
  /// <param name="maxItemCount">Optional cap; null means no cap.</param>
  /// <returns>
  /// The cap (or -1 when there is none) if no page size was specified; otherwise the
  /// smaller of the page size and the cap.
  /// </returns>
  private int GetEffectivePageSize(int itemsPerPage, int? maxItemCount)
  {
      // With no cap, the page size itself acts as the limit.
      int limit = maxItemCount.HasValue ? maxItemCount.Value : itemsPerPage;
      if (itemsPerPage == -1)
      {
          return limit;
      }
      return Math.Min(limit, itemsPerPage);
  }
  /// <summary>Builds query request options, filling in the repository's defaults.</summary>
  /// <param name="itemsPerPage">Page size; -1 is replaced by 1000, null is passed through.</param>
  /// <param name="maxBufferedItemCount">Buffered item count; defaults to 100.</param>
  /// <param name="maxConcurrency">Concurrency setting; defaults to 50.</param>
  /// <returns>The populated options object.</returns>
  private QueryRequestOptions GetDefaultQueryRequestOptions(int? itemsPerPage = null,
      int? maxBufferedItemCount = null,
      int? maxConcurrency = null)
  {
      int? pageLimit = itemsPerPage;
      if (itemsPerPage == -1)
      {
          // -1 is the "not specified" marker; substitute the default page size.
          pageLimit = 1000;
      }

      QueryRequestOptions options = new QueryRequestOptions();
      options.MaxItemCount = pageLimit;
      options.MaxBufferedItemCount = maxBufferedItemCount ?? 100;
      options.MaxConcurrency = maxConcurrency ?? 50;
      return options;
  }
  /// <summary>
  /// Runs a prepared query against the container of <typeparamref name="T"/> and streams
  /// the results through <paramref name="batchAction"/>, using the options' MaxItemCount
  /// (0 when unset) as the batch size.
  /// </summary>
  /// <typeparam name="T">Model type whose container is queried.</typeparam>
  /// <param name="cosmosDbQuery">Query wrapper supplying the query definition.</param>
  /// <param name="batchAction">Callback invoked with each batch of results.</param>
  /// <param name="queryOptions">Request options for the query.</param>
  /// <returns>Whatever the batching ResultsFromFeedIterator overload returns.</returns>
  private async Task<List<T>> ResultsFromQueryAndOptions<T>(CosmosDbQuery cosmosDbQuery, Func<List<T>, Task> batchAction, QueryRequestOptions queryOptions)
  {
      CosmosModelInfo container = await InitializeCollection<T>();
      QueryDefinition definition = cosmosDbQuery.CosmosQueryDefinition;
      FeedIterator<T> iterator = container.container.GetItemQueryIterator<T>(queryDefinition: definition, requestOptions: queryOptions);
      int batchSize = queryOptions.MaxItemCount ?? 0;
      return await ResultsFromFeedIterator(iterator, batchAction, batchSize);
  }
  /// <summary>Builds request options that set only the page size.</summary>
  /// <param name="itemsPerPage">Value assigned to MaxItemCount.</param>
  /// <returns>The new options object.</returns>
  private QueryRequestOptions GetQueryRequestOptions(int itemsPerPage) =>
      new QueryRequestOptions { MaxItemCount = itemsPerPage };
  /// <summary>
  /// Queries the container of <typeparamref name="T"/> with an optional LINQ filter and
  /// an optional ordering.
  /// </summary>
  /// <typeparam name="T">Model type whose container is queried.</typeparam>
  /// <param name="query">Filter predicate; null selects every document.</param>
  /// <param name="order">Sort key selector; null leaves the results unordered.</param>
  /// <param name="isDesc">True to sort descending; only relevant when <paramref name="order"/> is given.</param>
  /// <returns>All matching documents.</returns>
  public async Task<List<T>> FindLinq<T>(Expression<Func<T, bool>> query = null, Expression<Func<T, object>> order = null, bool isDesc = false) where T : ID
  {
      QueryRequestOptions options = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
      CosmosModelInfo container = await InitializeCollection<T>();

      // Compose the query step by step: filter first, then ordering.
      IQueryable<T> queryable = container.container.GetItemLinqQueryable<T>(requestOptions: options);
      if (query != null)
      {
          queryable = queryable.Where(query);
      }
      if (order != null)
      {
          queryable = isDesc
              ? queryable.OrderByDescending(order)
              : queryable.OrderBy(order);
      }

      FeedIterator<T> iterator = queryable.ToFeedIterator();
      return await ResultsFromFeedIterator<T>(iterator);
  }
  /// <summary>
  /// Runs a SQL query against the container of <typeparamref name="T"/>, optionally with parameters.
  /// </summary>
  /// <typeparam name="T">Model type whose container is queried.</typeparam>
  /// <param name="sql">Query text.</param>
  /// <param name="Parameters">Query parameters; when null, the text is executed as-is without request options.</param>
  /// <returns>All documents returned by the query.</returns>
  public async Task<List<T>> FindSQL<T>(string sql, Dictionary<string, object> Parameters = null) where T : ID
  {
      CosmosModelInfo container = await InitializeCollection<T>();
      QueryRequestOptions queryOptions = GetQueryRequestOptions(GetEffectivePageSize(-1, null));

      FeedIterator<T> iterator;
      if (Parameters == null)
      {
          // Plain text query; the request options are intentionally not applied here.
          iterator = container.container.GetItemQueryIterator<T>(new QueryDefinition(sql));
      }
      else
      {
          CosmosDbQuery parameterized = new CosmosDbQuery
          {
              QueryText = sql,
              Parameters = Parameters
          };
          iterator = container.container.GetItemQueryIterator<T>(parameterized.CosmosQueryDefinition, requestOptions: queryOptions);
      }
      return await ResultsFromFeedIterator<T>(iterator);
  }
  /// <summary>
  /// Creates a new document for <paramref name="entity"/> and, for cached containers,
  /// stores it in the container's Redis hash.
  /// </summary>
  /// <typeparam name="T">Model type whose container is targeted.</typeparam>
  /// <param name="entity">Entity to insert.</param>
  /// <returns>The resource returned by the service.</returns>
  /// <exception cref="BizException">Any failure; only the original message is carried over.</exception>
  public async Task<T> Save<T>(T entity) where T : ID
  {
      try
      {
          CosmosModelInfo container = await InitializeCollection<T>();
          ItemResponse<T> response = await container.container.CreateItemAsync<T>(entity);
          if (container.cache && RedisHelper.Instance != null)
          {
              string cacheKey = CacheCosmosPrefix + container.container.Id;

              // The expiry is only (re)applied when the hash existed before this write.
              // NOTE(review): SaveAll applies it only when the hash did NOT exist; confirm which is intended.
              bool keyExisted = RedisHelper.Exists(cacheKey);
              await RedisHelper.HSetAsync(cacheKey, entity.id, entity);
              if (keyExisted)
              {
                  await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
              }
          }
          return response.Resource;
      }
      catch (Exception e)
      {
          throw new BizException(e.Message);
      }
  }
  /// <summary>
  /// Inserts the given entities in batches of <c>pageSize</c> concurrent stream requests
  /// and, for cached containers, writes them into the container's Redis hash.
  /// </summary>
  /// <typeparam name="T">Model type whose container is targeted.</typeparam>
  /// <param name="enyites">Entities to insert.</param>
  /// <returns>The input list. Per-item status codes are not reported, as before.</returns>
  /// <remarks>
  /// Fixes over the previous version:
  /// (1) items were serialized inside an async-void lambda, so the request list could be
  /// read before every item had been added; each item is now serialized and sent inside
  /// one awaited task.
  /// (2) <c>RedisHelper.Exists</c> was called even when caching was disabled or no Redis
  /// instance existed; it is now guarded like every other Redis call in this class.
  /// (3) request streams and response messages are disposed.
  /// (4) Redis writes are awaited instead of running in async-void lambdas.
  /// Request failures now surface as the original exception rather than wrapped in an
  /// AggregateException.
  /// </remarks>
  public async Task<List<T>> SaveAll<T>(List<T> enyites) where T : ID
  {
      int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
      CosmosModelInfo container = await InitializeCollection<T>();

      bool useCache = container.cache && RedisHelper.Instance != null;
      string cacheKey = CacheCosmosPrefix + container.container.Id;
      // Remember whether the hash pre-existed: the expiry is only set for a new hash.
      bool cacheKeyExisted = useCache && RedisHelper.Exists(cacheKey);

      PropertyInfo pkProperty = typeof(T).GetProperty(GetPartitionKey<T>());
      // One options instance for the whole call instead of one per item.
      JsonSerializerOptions serializerOptions = new JsonSerializerOptions { IgnoreNullValues = true };

      for (int i = 0; i < pages; i++)
      {
          List<T> page = enyites.Skip(i * pageSize).Take(pageSize).ToList();

          await Task.WhenAll(page.Select(async x =>
          {
              using (MemoryStream stream = new MemoryStream())
              {
                  await JsonSerializer.SerializeAsync(stream, x, serializerOptions);
                  object o = pkProperty.GetValue(x, null);
                  using (ResponseMessage response = await container.container.CreateItemStreamAsync(stream, new PartitionKey(o.ToString())))
                  {
                      // Non-success status codes are ignored here, as in the original implementation.
                  }
              }
          }));

          if (useCache)
          {
              await Task.WhenAll(page.Select(async x =>
              {
                  await RedisHelper.HSetAsync(cacheKey, x.id, x);
              }));
          }
      }

      if (useCache && !cacheKeyExisted)
      {
          await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
      }
      return enyites;
  }
  /// <summary>
  /// Upserts <paramref name="entity"/> and, for cached containers, stores it in the
  /// container's Redis hash.
  /// </summary>
  /// <typeparam name="T">Model type whose container is targeted.</typeparam>
  /// <param name="entity">Entity to insert or replace.</param>
  /// <returns>The resource returned by the service.</returns>
  public async Task<T> SaveOrUpdate<T>(T entity) where T : ID
  {
      CosmosModelInfo container = await InitializeCollection<T>();
      ItemResponse<T> response = await container.container.UpsertItemAsync(item: entity);
      if (container.cache && RedisHelper.Instance != null)
      {
          string cacheKey = CacheCosmosPrefix + container.container.Id;

          // The expiry is only (re)applied when the hash existed before this write.
          bool keyExisted = RedisHelper.Exists(cacheKey);
          await RedisHelper.HSetAsync(cacheKey, entity.id, entity);
          if (keyExisted)
          {
              await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
          }
      }
      return response.Resource;
  }
  /// <summary>
  /// Upserts the given entities in batches of <c>pageSize</c> concurrent stream requests
  /// and, for cached containers, writes them into the container's Redis hash.
  /// </summary>
  /// <typeparam name="T">Model type whose container is targeted.</typeparam>
  /// <param name="enyites">Entities to insert or replace.</param>
  /// <returns>The input list. Per-item status codes are not reported, as before.</returns>
  /// <remarks>
  /// Fixes over the previous version:
  /// (1) items were serialized inside an async-void lambda, so the request list could be
  /// read before every item had been added; each item is now serialized and sent inside
  /// one awaited task.
  /// (2) the empty continuation swallowed faulted upsert tasks and never disposed the
  /// responses; failures now propagate to the caller and responses are disposed.
  /// (3) <c>RedisHelper.Exists</c> was called even when caching was disabled or no Redis
  /// instance existed; it is now guarded like every other Redis call in this class.
  /// (4) Redis writes are awaited instead of running in async-void lambdas.
  /// </remarks>
  public async Task<List<T>> SaveOrUpdateAll<T>(List<T> enyites) where T : ID
  {
      int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
      CosmosModelInfo container = await InitializeCollection<T>();

      bool useCache = container.cache && RedisHelper.Instance != null;
      string cacheKey = CacheCosmosPrefix + container.container.Id;
      // Remember whether the hash pre-existed: the expiry is only set for a new hash.
      bool cacheKeyExisted = useCache && RedisHelper.Exists(cacheKey);

      PropertyInfo pkProperty = typeof(T).GetProperty(GetPartitionKey<T>());
      // One options instance for the whole call instead of one per item.
      JsonSerializerOptions serializerOptions = new JsonSerializerOptions { IgnoreNullValues = true };

      for (int i = 0; i < pages; i++)
      {
          List<T> page = enyites.Skip(i * pageSize).Take(pageSize).ToList();

          await Task.WhenAll(page.Select(async x =>
          {
              using (MemoryStream stream = new MemoryStream())
              {
                  await JsonSerializer.SerializeAsync(stream, x, serializerOptions);
                  object o = pkProperty.GetValue(x, null);
                  using (ResponseMessage response = await container.container.UpsertItemStreamAsync(stream, new PartitionKey(o.ToString())))
                  {
                      // Non-success status codes are ignored here, as in the original implementation.
                  }
              }
          }));

          if (useCache)
          {
              await Task.WhenAll(page.Select(async x =>
              {
                  await RedisHelper.HSetAsync(cacheKey, x.id, x);
              }));
          }
      }

      if (useCache && !cacheKeyExisted)
      {
          await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
      }
      return enyites;
  }
  /// <summary>
  /// Replaces the existing Cosmos item identified by <c>entity.id</c> and the entity's partition
  /// key value with <paramref name="entity"/>, and, when caching is enabled for the container and
  /// Redis is initialised, mirrors the entity into the container's Redis hash.
  /// </summary>
  /// <typeparam name="T">Entity type handled by this repository.</typeparam>
  /// <param name="entity">Entity carrying the new state.</param>
  /// <returns>The resource carried by the Cosmos replace response.</returns>
  public async Task<T> Update<T>(T entity) where T : ID
  {
      CosmosModelInfo container = await InitializeCollection<T>();
      // The partition key value is read by reflection from the property named by GetPartitionKey<T>().
      string pk = GetPartitionKey<T>();
      object partitionValue = typeof(T).GetProperty(pk).GetValue(entity, null);
      ItemResponse<T> response = await container.container.ReplaceItemAsync(entity, entity.id, new PartitionKey(partitionValue.ToString()));
      if (container.cache && RedisHelper.Instance != null)
      {
          string cacheKey = CacheCosmosPrefix + container.container.Id;
          await RedisHelper.HSetAsync(cacheKey, entity.id, entity);
          // Always (re)apply the TTL after the write. Previously a hash that was created by this
          // very write never received an expiry; only already-existing hashes had their TTL set.
          await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
      }
      return response.Resource;
  }
  /// <summary>
  /// Plain carrier for one pending stream write: the document id, the partition key value
  /// (already converted to a string) and the serialized payload.
  /// </summary>
  internal class Item
  {
      /// <summary>Document id of the item to write.</summary>
      public string id { get; set; }

      /// <summary>Partition key value of the item, as a string.</summary>
      public string pk { get; set; }

      /// <summary>Stream holding the serialized item.</summary>
      public MemoryStream stream { get; set; }
  }
  /// <summary>
  /// Replaces every entity in <paramref name="enyites"/> in the Cosmos container mapped to
  /// <typeparamref name="T"/>, processing the list in batches of <c>pageSize</c> items whose
  /// writes are issued concurrently, and mirrors successfully replaced entities into the
  /// container's Redis hash when caching is enabled.
  /// </summary>
  /// <remarks>
  /// A non-success status returned by Cosmos for an individual item does not throw; that item is
  /// simply not written to the cache. Exceptions thrown by the SDK call itself are no longer
  /// swallowed and surface to the caller. The hash TTL is applied only when the hash did not
  /// exist before this call.
  /// </remarks>
  /// <typeparam name="T">Entity type handled by this repository.</typeparam>
  /// <param name="enyites">Entities carrying the new state.</param>
  /// <returns>The same list instance that was passed in.</returns>
  public async Task<List<T>> UpdateAll<T>(List<T> enyites) where T : ID
  {
      int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
      CosmosModelInfo container = await InitializeCollection<T>();

      // Redis is only touched when caching is enabled for this container and Redis is initialised;
      // the existence probe used to run unconditionally.
      bool useCache = container.cache && RedisHelper.Instance != null;
      string cacheKey = CacheCosmosPrefix + container.container.Id;
      bool keyExisted = useCache && RedisHelper.Exists(cacheKey);

      // Loop-invariant work hoisted out of the per-item path.
      var partitionKeyProperty = typeof(T).GetProperty(GetPartitionKey<T>());
      JsonSerializerOptions serializerOptions = new JsonSerializerOptions { IgnoreNullValues = true };

      // Serialises one entity, replaces it as a stream and reports whether Cosmos accepted it.
      async Task<bool> ReplaceOneAsync(T entity)
      {
          using (MemoryStream payload = new MemoryStream())
          {
              await JsonSerializer.SerializeAsync(payload, entity, serializerOptions);
              object partitionValue = partitionKeyProperty.GetValue(entity, null);
              using (ResponseMessage response = await container.container.ReplaceItemStreamAsync(payload, entity.id, new PartitionKey(partitionValue.ToString())))
              {
                  return response.IsSuccessStatusCode;
              }
          }
      }

      for (int page = 0; page < pages; page++)
      {
          List<T> batch = enyites.Skip(page * pageSize).Take(pageSize).ToList();

          // Every write of the batch is started, then awaited together. The previous
          // List.ForEach(async ...) produced async-void lambdas that nothing awaited.
          bool[] persisted = await Task.WhenAll(batch.Select(entity => ReplaceOneAsync(entity)).ToList());

          if (useCache)
          {
              List<Task> cacheWrites = new List<Task>(batch.Count);
              for (int index = 0; index < batch.Count; index++)
              {
                  // Do not cache an entity that Cosmos rejected.
                  if (persisted[index])
                  {
                      cacheWrites.Add(RedisHelper.HSetAsync(cacheKey, batch[index].id, batch[index]));
                  }
              }
              // Awaited so the hash exists before the TTL is applied below.
              await Task.WhenAll(cacheWrites);
          }
      }

      if (useCache && !keyExisted)
      {
          await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
      }
      return enyites;
  }
  832. //public void Dispose()
  833. //{
  834. // Dispose(true);
  835. //}
  836. //protected virtual void Dispose(bool disposing)
  837. //{
  838. // if (disposing)
  839. // {
  840. // CosmosClient?.Dispose();
  841. // }
  842. //}
  /// <summary>
  /// Looks up a single document by id with a parameterised SQL query against the container
  /// mapped to <typeparamref name="T"/>.
  /// </summary>
  /// <typeparam name="T">Entity type handled by this repository.</typeparam>
  /// <param name="id">Value bound to the <c>@id</c> query parameter.</param>
  /// <returns>
  /// The only match, or the default value of <typeparamref name="T"/> when nothing matches.
  /// <c>SingleOrDefault</c> throws when the query yields more than one document.
  /// </returns>
  private async Task<T> FindByIdAsSql<T>(string id) where T : ID
  {
      CosmosModelInfo info = await InitializeCollection<T>();

      CosmosDbQuery query = new CosmosDbQuery
      {
          QueryText = @"SELECT *
FROM c
WHERE c.id = @id",
          Parameters = new Dictionary<string, object>
          {
              { "@id", id }
          }
      };

      FeedIterator<T> iterator = info.container.GetItemQueryIterator<T>(query.CosmosQueryDefinition);
      var matches = await ResultsFromFeedIterator(iterator);
      return matches.SingleOrDefault();
  }
  /// <summary>
  /// Reads one item from the container mapped to <typeparamref name="T"/> by id and partition key
  /// value. The cache is not consulted.
  /// </summary>
  /// <typeparam name="T">Entity type handled by this repository.</typeparam>
  /// <param name="id">Document id.</param>
  /// <param name="pk">Partition key value of the document.</param>
  /// <returns>The resource carried by the Cosmos read response.</returns>
  public async Task<T> FindByIdPk<T>(string id, string pk) where T : ID
  {
      CosmosModelInfo info = await InitializeCollection<T>();
      PartitionKey partitionKey = new PartitionKey(pk);
      ItemResponse<T> item = await info.container.ReadItemAsync<T>(id, partitionKey);
      return item.Resource;
  }
  /// <summary>
  /// Finds an entity by id. When caching is enabled for the container and Redis is initialised,
  /// the lookup goes through <c>RedisHelper.CacheShellAsync</c> on the container's hash with
  /// <paramref name="id"/> as the field; otherwise the database is queried directly.
  /// </summary>
  /// <typeparam name="T">Entity type handled by this repository.</typeparam>
  /// <param name="id">Document id to look up.</param>
  /// <returns>The entity produced by the cache shell or by the SQL lookup.</returns>
  public async Task<T> FindById<T>(string id) where T : ID
  {
      CosmosModelInfo info = await InitializeCollection<T>();

      bool cacheEnabled = info.cache && RedisHelper.Instance != null;
      if (!cacheEnabled)
      {
          return await FindByIdAsSql<T>(id);
      }

      string hashKey = CacheCosmosPrefix + info.container.Id;
      return await RedisHelper.CacheShellAsync(hashKey, id, timeoutSeconds, () => FindByIdAsSql<T>(id));
  }
  /// <summary>
  /// Finds the entities with the given ids. With caching enabled, ids present in the container's
  /// Redis hash are served from it; the remaining ids are fetched through <c>FindByDict</c>,
  /// written to the hash, and the hash TTL is then set to <c>timeoutSeconds</c>.
  /// Without caching, all ids are fetched through <c>FindByDict</c>.
  /// </summary>
  /// <remarks>
  /// On the cached path the result lists cache hits first, followed by the entities fetched from
  /// the database, so it is not ordered like <paramref name="ids"/>.
  /// </remarks>
  /// <typeparam name="T">Entity type handled by this repository.</typeparam>
  /// <param name="ids">Document ids to look up.</param>
  /// <returns>The entities that were found.</returns>
  public async Task<List<T>> FindByIds<T>(List<string> ids) where T : ID
  {
      CosmosModelInfo container = await InitializeCollection<T>();
      if (!(container.cache && RedisHelper.Instance != null))
      {
          return await FindByDict<T>(new Dictionary<string, object> { { "id", ids.ToArray() } });
      }

      string cacheKey = CacheCosmosPrefix + container.container.Id;
      List<T> found = new List<T>();
      List<string> missing = new List<string>();
      foreach (string id in ids)
      {
          if (await RedisHelper.HExistsAsync(cacheKey, id))
          {
              found.Add(await RedisHelper.HGetAsync<T>(cacheKey, id));
          }
          else
          {
              missing.Add(id);
          }
      }

      if (missing.Count > 0)
      {
          List<T> fetched = await FindByDict<T>(new Dictionary<string, object> { { "id", missing.ToArray() } });
          if (fetched.Count > 0)
          {
              // Use the async Redis calls instead of blocking inside an async method.
              foreach (T entity in fetched)
              {
                  await RedisHelper.HSetAsync(cacheKey, entity.id, entity);
              }
              // The TTL applies to the whole hash, so set it once after the writes
              // rather than once per fetched entity.
              await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
          }
          found.AddRange(fetched);
      }
      return found;
  }
  /// <summary>
  /// Finds documents by id in the collection registered under <paramref name="CollectionName"/>.
  /// When caching is enabled for that collection and Redis is initialised, the lookup goes through
  /// <c>RedisHelper.CacheShellAsync</c> on the collection's hash with <paramref name="id"/> as the
  /// field; otherwise <c>FindByDict</c> is called directly.
  /// </summary>
  /// <param name="CollectionName">Key into <c>DocumentCollectionDict</c>.</param>
  /// <param name="id">Document id to look up.</param>
  /// <returns>Whatever <c>FindByDict</c> (or the cache shell wrapping it) yields for the id filter.</returns>
  /// <exception cref="BizException">The collection name is not registered.</exception>
  public async Task<dynamic> FindById(string CollectionName, string id)
  {
      if (!DocumentCollectionDict.TryGetValue(CollectionName, out CosmosModelInfo info))
      {
          throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
      }

      // Builds a fresh filter each time it is needed, exactly like the inline initialisers did.
      Dictionary<string, object> BuildFilter()
      {
          return new Dictionary<string, object> { { "id", id } };
      }

      bool cacheEnabled = info.cache && RedisHelper.Instance != null;
      if (cacheEnabled)
      {
          string hashKey = CacheCosmosPrefix + info.container.Id;
          return await RedisHelper.CacheShellAsync(hashKey, id, timeoutSeconds, () => FindByDict(CollectionName, BuildFilter()));
      }

      return await FindByDict(CollectionName, BuildFilter());
  }
  /// <summary>
  /// Finds documents by a list of ids in the collection registered under
  /// <paramref name="CollectionName"/>. When caching is enabled for that collection and Redis is
  /// initialised, the result is cached through <c>RedisHelper.CacheShellAsync</c> under a key
  /// derived from the collection and the requested ids; otherwise <c>FindByDict</c> is called
  /// directly.
  /// </summary>
  /// <param name="CollectionName">Key into <c>DocumentCollectionDict</c>.</param>
  /// <param name="ids">Document ids to look up.</param>
  /// <returns>The documents yielded by <c>FindByDict</c> for the id filter.</returns>
  /// <exception cref="BizException">The collection name is not registered.</exception>
  public async Task<List<dynamic>> FindByIds(string CollectionName, List<string> ids)
  {
      if (!DocumentCollectionDict.TryGetValue(CollectionName, out CosmosModelInfo container))
      {
          throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
      }

      if (container.cache && RedisHelper.Instance != null)
      {
          // The cache key must identify the requested id set. The previous key was only the
          // collection key, so any id list returned whatever had been cached first; it was also
          // the key the other methods use as a Redis hash, while this overload of
          // CacheShellAsync takes no hash field. Ids are sorted so the same set always maps to
          // the same key.
          string cacheKey = CacheCosmosPrefix + container.container.Id + ":ids:"
              + string.Join(",", ids.OrderBy(id => id, StringComparer.Ordinal));
          return await RedisHelper.CacheShellAsync(cacheKey, timeoutSeconds, () => { return FindByDict(CollectionName, new Dictionary<string, object> { { "id", ids.ToArray() } }); });
      }

      return await FindByDict(CollectionName, new Dictionary<string, object> { { "id", ids.ToArray() } });
  }
  941. }
  942. }