AzureCosmosDBV3Repository.cs 44 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995
  1. using Microsoft.Azure.Cosmos;
  2. using Microsoft.Azure.Cosmos.Linq;
  3. using System;
  4. using System.Collections.Concurrent;
  5. using System.Collections.Generic;
  6. using System.Diagnostics;
  7. using System.IO;
  8. using System.Linq;
  9. using System.Linq.Expressions;
  10. using System.Net;
  11. using System.Reflection;
  12. using System.Text;
  13. using System.Text.Json;
  14. using System.Threading.Tasks;
  15. using TEAMModelOS.SDK.Context.Attributes.Azure;
  16. using TEAMModelOS.SDK.Context.Exception;
  17. using TEAMModelOS.SDK.Helper.Common.CollectionHelper;
  18. using TEAMModelOS.SDK.Helper.Common.ReflectorExtensions;
  19. using TEAMModelOS.SDK.Module.AzureCosmosDB.Configuration;
  20. namespace TEAMModelOS.SDK.Module.AzureCosmosDBV3
  21. {
  /// <summary>
  /// Bookkeeping entry kept per Cosmos DB container: the container handle
  /// plus the flag the repository checks before mirroring items into Redis.
  /// </summary>
  internal class CosmosModelInfo
  {
      /// <summary>Handle of the Cosmos DB container this entry describes.</summary>
      public Container container { get; set; }

      /// <summary>When true, the repository also writes/removes items in the Redis hash for this container.</summary>
      public bool cache { get; set; }
  }
  26. public class AzureCosmosDBV3Repository : IAzureCosmosDBV3Repository
  27. {
  /// <summary>Cosmos client obtained from the shared client singleton.</summary>
  private CosmosClient CosmosClient { get; set; }

  /// <summary>
  /// Container name to container info lookup.
  /// NOTE(review): this is a plain <see cref="Dictionary{TKey, TValue}"/>, which is not
  /// thread-safe; confirm whether concurrent access is possible.
  /// </summary>
  private Dictionary<string, CosmosModelInfo> DocumentCollectionDict { get; set; } = new Dictionary<string, CosmosModelInfo>();

  /// <summary>Id of the database taken from the options.</summary>
  private string DatabaseId { get; set; }

  /// <summary>Default throughput (RU) taken from the options.</summary>
  private int CollectionThroughput { get; set; }

  /// <summary>Database handle; assigned by <c>InitializeDatabase</c>.</summary>
  private Database database { get; set; }

  /// <summary>Number of items handled per batch by the bulk operations.</summary>
  private int pageSize = 200;

  /// <summary>Prefix of the Redis hash key used for a cached container.</summary>
  private const string CacheCosmosPrefix = "cosmos:";

  /// <summary>Value of <c>options.ScanModel</c>; passed to the reflection scan for <c>CosmosDBAttribute</c> types.</summary>
  private string[] ScanModel { get; set; }

  /// <summary>Expiry applied to cache hashes, in seconds (24 hours).</summary>
  private const int timeoutSeconds = 86400;
  /// <summary>
  /// Creates a repository that uses the shared Cosmos client configured with a custom serializer.
  /// The database itself is not touched here; call <c>InitializeDatabase</c> for that.
  /// </summary>
  /// <param name="options">Connection string, key, database id, default throughput and model scan list.</param>
  /// <param name="cosmosSerializer">Serializer forwarded to the client singleton.</param>
  /// <exception cref="BizException">No connection string is configured, or the client raised a <see cref="CosmosException"/>.</exception>
  public AzureCosmosDBV3Repository(AzureCosmosDBOptions options, CosmosSerializer cosmosSerializer)
  {
      try
      {
          if (string.IsNullOrEmpty(options.ConnectionString))
          {
              throw new BizException("请设置正确的AzureCosmosDB数据库配置信息!");
          }

          CosmosClient = CosmosDBV3ClientSingleton
              .getInstance(options.ConnectionString, options.ConnectionKey, cosmosSerializer)
              .GetCosmosDBClient();
          DatabaseId = options.Database;
          CollectionThroughput = options.CollectionThroughput;
          ScanModel = options.ScanModel;
      }
      catch (CosmosException e)
      {
          throw new BizException(e.Message, 500, e.StackTrace);
      }
  }
  /// <summary>
  /// Creates a repository that uses the shared Cosmos client without a custom serializer.
  /// Delegates to the two-argument constructor, passing a null serializer.
  /// </summary>
  /// <param name="options">Connection string, key, database id, default throughput and model scan list.</param>
  /// <exception cref="BizException">No connection string is configured, or the client raised a <see cref="CosmosException"/>.</exception>
  public AzureCosmosDBV3Repository(AzureCosmosDBOptions options)
      : this(options, (CosmosSerializer)null)
  {
  }
  /// <summary>
  /// Creates the database if needed, registers every existing container, then makes sure each
  /// type marked with <c>CosmosDBAttribute</c> has a container with at least the requested throughput.
  /// </summary>
  /// <remarks>
  /// Creating a missing container no longer raises the repository-wide default throughput;
  /// previously one high-RU model made every container created afterwards equally expensive.
  /// </remarks>
  /// <exception cref="BizException">Wraps any <see cref="CosmosException"/> raised by the SDK.</exception>
  public async Task InitializeDatabase()
  {
      try
      {
          database = await CosmosClient.CreateDatabaseIfNotExistsAsync(DatabaseId, CollectionThroughput);

          // Register the containers that already exist; caching stays off until a model opts in below.
          FeedIterator<ContainerProperties> containerIterator = database.GetContainerQueryIterator<ContainerProperties>();
          while (containerIterator.HasMoreResults)
          {
              foreach (ContainerProperties existing in await containerIterator.ReadNextAsync())
              {
                  DocumentCollectionDict.TryAdd(existing.Id, new CosmosModelInfo { container = database.GetContainer(existing.Id), cache = false });
              }
          }

          // Walk every model type carrying CosmosDBAttribute.
          List<Type> modelTypes = ReflectorExtensions.GetAllTypeAsAttribute<CosmosDBAttribute>(ScanModel);
          foreach (Type modelType in modelTypes)
          {
              string partitionKey = GetPartitionKey(modelType);
              CosmosDBAttribute attribute = modelType.GetCustomAttributes<CosmosDBAttribute>(true).First<CosmosDBAttribute>();

              string collectionName = string.IsNullOrEmpty(attribute.Name) ? modelType.Name : attribute.Name;
              bool cache = attribute.Cache;
              // An attribute RU of 400 or less means "use the configured default".
              int requestedRu = attribute.RU > 400 ? attribute.RU : CollectionThroughput;

              if (DocumentCollectionDict.TryGetValue(collectionName, out CosmosModelInfo known))
              {
                  // Container exists: record the cache flag and only ever scale throughput up.
                  known.cache = cache;
                  int? currentRu = await known.container.ReadThroughputAsync();
                  if (currentRu < requestedRu)
                  {
                      await known.container.ReplaceThroughputAsync(requestedRu);
                  }
              }
              else
              {
                  ContainerProperties containerProperties = new ContainerProperties { Id = collectionName };
                  if (!string.IsNullOrEmpty(partitionKey))
                  {
                      containerProperties.PartitionKeyPath = "/" + partitionKey;
                  }

                  // Use the larger of the two values for this container only, without mutating the default.
                  int throughput = Math.Max(requestedRu, CollectionThroughput);
                  Container created = await database.CreateContainerIfNotExistsAsync(containerProperties, throughput: throughput);
                  DocumentCollectionDict.TryAdd(collectionName, new CosmosModelInfo { container = created, cache = cache });
              }
          }
      }
      catch (CosmosException e)
      {
          throw new BizException(e.Message, 500, e.StackTrace);
      }
  }
  /// <summary>Returns the partition key property name for <typeparamref name="T"/>.</summary>
  /// <returns>Name of the single partition key property.</returns>
  private string GetPartitionKey<T>() => GetPartitionKey(typeof(T));
  /// <summary>
  /// Finds the name of the property that acts as partition key for <paramref name="type"/>.
  /// A property counts when it is named <c>PartitionKey</c> or carries a <c>PartitionKeyAttribute</c>.
  /// </summary>
  /// <param name="type">Model type to inspect (public properties only).</param>
  /// <returns>Name of the single partition key property.</returns>
  /// <exception cref="BizException">No candidate property was found, or more than one was found.</exception>
  private string GetPartitionKey(Type type)
  {
      List<PropertyInfo> candidates = new List<PropertyInfo>();
      foreach (PropertyInfo property in type.GetProperties())
      {
          if (property.Name.Equals("PartitionKey"))
          {
              // A property literally named "PartitionKey" ends the scan. Candidates collected
              // before it are kept, so the outcome depends on declaration order.
              candidates.Add(property);
              break;
          }

          foreach (object attribute in property.GetCustomAttributes(true))
          {
              if (attribute is PartitionKeyAttribute)
              {
                  candidates.Add(property);
              }
          }
      }

      if (candidates.Count <= 0)
      {
          // Fixed: the message previously ran the type name straight into "has".
          throw new BizException(type.Name + " has no PartitionKey !");
      }

      if (candidates.Count != 1)
      {
          throw new BizException("PartitionKey can only be single!");
      }

      return candidates[0].Name;
  }
  /// <summary>
  /// Resolves the container for <typeparamref name="T"/>. The container name is the
  /// <c>CosmosDBAttribute.Name</c> when set, otherwise the type name.
  /// </summary>
  /// <returns>The container info for <typeparamref name="T"/>.</returns>
  private async Task<CosmosModelInfo> InitializeCollection<T>()
  {
      Type modelType = typeof(T);
      string partitionKey = GetPartitionKey<T>();

      CosmosDBAttribute attribute = modelType.GetCustomAttributes<CosmosDBAttribute>(true).First<CosmosDBAttribute>();
      string collectionName = string.IsNullOrEmpty(attribute.Name) ? modelType.Name : attribute.Name;

      return await InitializeCollection(collectionName, partitionKey);
  }
  /// <summary>
  /// Returns the registered container info for <paramref name="CollectionName"/>, creating the
  /// container with the default throughput (and caching disabled) when it is not registered yet.
  /// </summary>
  /// <param name="CollectionName">Container id.</param>
  /// <param name="PartitionKey">Partition key property name; used as "/name" path when not empty.</param>
  /// <returns>The registered or newly created container info.</returns>
  private async Task<CosmosModelInfo> InitializeCollection(string CollectionName, string PartitionKey)
  {
      // Already known in memory: nothing to create.
      if (DocumentCollectionDict.TryGetValue(CollectionName, out CosmosModelInfo registered))
      {
          return registered;
      }

      // Not known yet: create it with default settings.
      ContainerProperties properties = new ContainerProperties { Id = CollectionName };
      if (!string.IsNullOrEmpty(PartitionKey))
      {
          properties.PartitionKeyPath = "/" + PartitionKey;
      }

      Container created = await database.CreateContainerIfNotExistsAsync(properties, throughput: CollectionThroughput);
      CosmosModelInfo createdInfo = new CosmosModelInfo { container = created, cache = false };
      DocumentCollectionDict.TryAdd(CollectionName, createdInfo);
      return createdInfo;
  }
  /// <summary>
  /// Deletes the given items in batches of <c>pageSize</c> and reports the status of each delete.
  /// </summary>
  /// <param name="ids">Pairs whose key is the partition key value and whose value is the item id.</param>
  /// <returns>One <c>IdPk</c> per requested delete, in input order, carrying the response status code.</returns>
  /// <remarks>
  /// Results are collected from the awaited tasks rather than appended to a shared list from
  /// concurrently running continuations, and the Redis removals are awaited rather than
  /// fired as async-void lambdas.
  /// </remarks>
  public async Task<List<IdPk>> DeleteAll<T>(List<KeyValuePair<string, string>> ids) where T : ID
  {
      CosmosModelInfo container = await InitializeCollection<T>();
      List<IdPk> idPks = new List<IdPk>(ids.Count);

      for (int offset = 0; offset < ids.Count; offset += pageSize)
      {
          List<KeyValuePair<string, string>> batch = ids.GetRange(offset, Math.Min(pageSize, ids.Count - offset));

          List<Task<IdPk>> deletions = batch.Select(async item =>
          {
              using (ResponseMessage response = await container.container.DeleteItemStreamAsync(item.Value, new PartitionKey(item.Key)))
              {
                  return new IdPk { id = item.Value, pk = item.Key, StatusCode = response.StatusCode };
              }
          }).ToList();
          idPks.AddRange(await Task.WhenAll(deletions));

          if (container.cache && RedisHelper.Instance != null)
          {
              string cacheKey = CacheCosmosPrefix + container.container.Id;
              List<Task> evictions = batch.Select(async item =>
              {
                  await RedisHelper.HDelAsync(cacheKey, item.Value);
              }).ToList();
              await Task.WhenAll(evictions);
          }
      }

      return idPks;
  }
  /// <summary>Deletes every item of <typeparamref name="T"/> returned by <c>FindByDict</c> for <paramref name="dict"/>.</summary>
  /// <param name="dict">Query conditions passed to <c>FindByDict</c>.</param>
  /// <returns>One <c>IdPk</c> per deleted candidate.</returns>
  public async Task<List<IdPk>> DeleteAll<T>(Dictionary<string,object> dict) where T : ID
  {
      List<T> matches = await FindByDict<T>(dict);
      List<IdPk> outcome = await DeleteAll(matches);
      return outcome;
  }
  /// <summary>
  /// Deletes the given entities in batches of <c>pageSize</c> and reports the status of each delete.
  /// The partition key value is read from each entity through reflection.
  /// </summary>
  /// <param name="enyites">Entities to delete.</param>
  /// <returns>One <c>IdPk</c> per entity, in input order, carrying the response status code.</returns>
  /// <remarks>
  /// <c>pk</c> now holds the raw partition key value, as the single-entity delete reports it,
  /// instead of the text produced by <c>PartitionKey.ToString()</c>. Results are gathered from the
  /// awaited tasks (no shared list mutated from parallel continuations) and Redis removals are awaited.
  /// </remarks>
  public async Task<List<IdPk>> DeleteAll<T>(List<T> enyites) where T : ID
  {
      CosmosModelInfo container = await InitializeCollection<T>();
      PropertyInfo pkProperty = typeof(T).GetProperty(GetPartitionKey<T>());
      List<IdPk> idPks = new List<IdPk>(enyites.Count);

      for (int offset = 0; offset < enyites.Count; offset += pageSize)
      {
          List<T> batch = enyites.GetRange(offset, Math.Min(pageSize, enyites.Count - offset));

          List<Task<IdPk>> deletions = batch.Select(async entity =>
          {
              string pkValue = pkProperty.GetValue(entity, null).ToString();
              using (ResponseMessage response = await container.container.DeleteItemStreamAsync(entity.id, new PartitionKey(pkValue)))
              {
                  return new IdPk { id = entity.id, pk = pkValue, StatusCode = response.StatusCode };
              }
          }).ToList();
          idPks.AddRange(await Task.WhenAll(deletions));

          if (container.cache && RedisHelper.Instance != null)
          {
              string cacheKey = CacheCosmosPrefix + container.container.Id;
              List<Task> evictions = batch.Select(async entity =>
              {
                  await RedisHelper.HDelAsync(cacheKey, entity.id);
              }).ToList();
              await Task.WhenAll(evictions);
          }
      }

      return idPks;
  }
  /// <summary>Deletes the item identified by <paramref name="idPk"/> (its <c>id</c> and <c>pk</c>).</summary>
  /// <returns>The id, partition key and status code of the delete.</returns>
  public async Task<IdPk> DeleteAsync<T>(IdPk idPk) where T : ID
  {
      IdPk outcome = await DeleteAsync<T>(idPk.id, idPk.pk);
      return outcome;
  }
  /// <summary>
  /// Deletes one item by id and partition key value, and removes it from the Redis hash when
  /// caching is enabled for the container.
  /// </summary>
  /// <param name="id">Item id.</param>
  /// <param name="pk">Partition key value.</param>
  /// <returns>The id, partition key and status code of the delete.</returns>
  public async Task<IdPk> DeleteAsync<T>(string id, string pk) where T : ID
  {
      CosmosModelInfo container = await InitializeCollection<T>();

      // ResponseMessage is disposable; the original never released it.
      using (ResponseMessage response = await container.container.DeleteItemStreamAsync(id: id, partitionKey: new PartitionKey(pk)))
      {
          if (container.cache && RedisHelper.Instance != null)
          {
              await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, id);
          }

          return new IdPk { id = id, pk = pk, StatusCode = response.StatusCode };
      }
  }
  /// <summary>
  /// Deletes <paramref name="entity"/>, reading its partition key value through reflection, and
  /// removes it from the Redis hash when caching is enabled for the container.
  /// </summary>
  /// <param name="entity">Entity to delete.</param>
  /// <returns>The id, partition key value and status code of the delete.</returns>
  public async Task<IdPk> DeleteAsync<T>(T entity) where T : ID
  {
      CosmosModelInfo container = await InitializeCollection<T>();
      string partitionKeyName = GetPartitionKey<T>();
      string pkValue = typeof(T).GetProperty(partitionKeyName).GetValue(entity, null).ToString();

      // ResponseMessage is disposable; the original never released it.
      using (ResponseMessage response = await container.container.DeleteItemStreamAsync(id: entity.id, partitionKey: new PartitionKey(pkValue)))
      {
          if (container.cache && RedisHelper.Instance != null)
          {
              await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, entity.id);
          }

          return new IdPk { id = entity.id, pk = pkValue, StatusCode = response.StatusCode };
      }
  }
  350. //public async Task<T> DeleteAsync<T>(string id) where T : ID
  351. //{
  352. // Container container = await InitializeCollection<T>();
  353. // ItemResponse<T> response = await container.DeleteItemAsync<T>(id: id, partitionKey: new PartitionKey(GetPartitionKey<T>()));
  354. // return response.Resource;
  355. //}
  /// <summary>Reads every item of <typeparamref name="T"/>, using the select clause built from <paramref name="propertys"/>.</summary>
  /// <param name="propertys">Property list handed to <c>SQLHelperParametric.GetSQLSelect</c>; may be null.</param>
  /// <returns>All items produced by the query.</returns>
  public async Task<List<T>> FindAll<T>(List<string> propertys = null) where T : ID
  {
      CosmosModelInfo container = await InitializeCollection<T>();
      StringBuilder selectClause = SQLHelperParametric.GetSQLSelect(propertys);
      CosmosDbQuery cosmosDbQuery = new CosmosDbQuery { QueryText = selectClause.ToString() };
      FeedIterator<T> iterator = container.container.GetItemQueryIterator<T>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition);
      return await ResultsFromFeedIterator(iterator);
  }
  /// <summary>
  /// Drains <paramref name="query"/> into a list, stopping as soon as the list holds exactly
  /// <paramref name="maxItemCount"/> items when a count is given.
  /// </summary>
  /// <param name="query">Iterator to read pages from.</param>
  /// <param name="maxItemCount">Optional cap; null means read everything.</param>
  /// <returns>The collected items.</returns>
  private async Task<List<T>> ResultsFromFeedIterator<T>(FeedIterator<T> query, int? maxItemCount = null)
  {
      List<T> collected = new List<T>();
      while (query.HasMoreResults)
      {
          FeedResponse<T> page = await query.ReadNextAsync();
          foreach (T item in page)
          {
              collected.Add(item);
              if (maxItemCount == collected.Count)
              {
                  return collected;
              }
          }
      }
      return collected;
  }
  /// <summary>
  /// Reads <paramref name="query"/> page by page, handing the buffered items to
  /// <paramref name="batchAction"/> whenever the buffer has reached <paramref name="itemsPerPage"/>,
  /// and once more for any remainder.
  /// </summary>
  /// <param name="query">Iterator to read pages from.</param>
  /// <param name="batchAction">Callback invoked with the buffered items.</param>
  /// <param name="itemsPerPage">Buffer size that triggers a flush before the next read.</param>
  /// <returns>The buffer list, which has been cleared after the last flush.</returns>
  private async Task<List<T>> ResultsFromFeedIterator<T>(FeedIterator<T> query, Func<List<T>, Task> batchAction, int itemsPerPage)
  {
      List<T> buffer = new List<T>();
      while (query.HasMoreResults)
      {
          bool bufferFull = buffer.Count >= itemsPerPage;
          if (bufferFull)
          {
              await batchAction(buffer);
              buffer.Clear();
          }
          buffer.AddRange(await query.ReadNextAsync());
      }

      if (buffer.Count > 0)
      {
          await batchAction(buffer);
          buffer.Clear();
      }
      return buffer;
  }
  /// <summary>
  /// Queries a container by name with the conditions in <paramref name="dict"/> and returns dynamic items.
  /// </summary>
  /// <param name="CollectionName">Name of an already registered container.</param>
  /// <param name="dict">Query conditions handed to <c>SQLHelperParametric.GetSQL</c>.</param>
  /// <param name="itemsPerPage">Page size; -1 lets the defaults decide.</param>
  /// <param name="maxItemCount">Optional cap on the number of returned items.</param>
  /// <param name="partitionKey">Not used by this method.</param>
  /// <param name="propertys">Property list for the select clause; may be null.</param>
  /// <exception cref="BizException">The container name is not registered.</exception>
  public async Task<List<dynamic>> FindByDict(string CollectionName, Dictionary<string, object> dict, int itemsPerPage = -1, int? maxItemCount = null, string partitionKey = null, List<string> propertys = null)
  {
      if (!DocumentCollectionDict.TryGetValue(CollectionName, out CosmosModelInfo container))
      {
          throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
      }

      StringBuilder selectClause = SQLHelperParametric.GetSQLSelect(propertys);
      CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, selectClause);
      int effectivePageSize = GetEffectivePageSize(itemsPerPage, maxItemCount);
      QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: effectivePageSize);
      FeedIterator<dynamic> iterator = container.container.GetItemQueryIterator<dynamic>(
          queryDefinition: cosmosDbQuery.CosmosQueryDefinition,
          requestOptions: queryRequestOptions);
      return await ResultsFromFeedIterator(iterator, maxItemCount);
  }
  /// <summary>
  /// Runs a <c>select value count(c)</c> query against a container by name, using the conditions
  /// in <paramref name="dict"/> with the paging and sorting keys left out.
  /// </summary>
  /// <param name="CollectionName">Name of an already registered container.</param>
  /// <param name="dict">Query conditions. The caller's dictionary is no longer modified.</param>
  /// <param name="itemsPerPage">Page size; -1 lets the defaults decide.</param>
  /// <param name="maxItemCount">Optional cap on the number of returned rows.</param>
  /// <param name="partitionKey">Not used by this method.</param>
  /// <exception cref="BizException">The container name is not registered.</exception>
  public async Task<List<dynamic>> FindCountByDict(string CollectionName, Dictionary<string, object> dict, int itemsPerPage = -1, int? maxItemCount = null, string partitionKey = null)
  {
      if (!DocumentCollectionDict.TryGetValue(CollectionName, out CosmosModelInfo container))
      {
          throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
      }

      // Work on a copy: removing the paging/sorting keys from the caller's dictionary would
      // break a paged query the caller issues afterwards with the same dictionary.
      Dictionary<string, object> conditions = new Dictionary<string, object>(dict, dict.Comparer);
      conditions.Remove("@CURRPAGE");
      conditions.Remove("@PAGESIZE");
      conditions.Remove("@ASC");
      conditions.Remove("@DESC");

      StringBuilder sql = new StringBuilder("select value count(c) from c");
      CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(conditions, sql);
      QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(itemsPerPage, maxItemCount));
      FeedIterator<dynamic> query = container.container.GetItemQueryIterator<dynamic>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
      return await ResultsFromFeedIterator(query, maxItemCount);
  }
  /// <summary>Alias of <c>FindByDict&lt;T&gt;</c>; forwards every argument unchanged.</summary>
  public async Task<List<T>> FindByParams<T>(Dictionary<string, object> dict, int itemsPerPage = -1, int? maxItemCount = null, string partitionKey = null, List<string> propertys = null) where T : ID
  {
      List<T> found = await FindByDict<T>(dict, itemsPerPage, maxItemCount, partitionKey, propertys);
      return found;
  }
  /// <summary>
  /// Queries the container of <typeparamref name="T"/> with the conditions in <paramref name="dict"/>.
  /// </summary>
  /// <param name="dict">Query conditions handed to <c>SQLHelperParametric.GetSQL</c>.</param>
  /// <param name="itemsPerPage">Page size; -1 lets the defaults decide.</param>
  /// <param name="maxItemCount">Optional cap on the number of returned items.</param>
  /// <param name="partitionKey">Not used by this method.</param>
  /// <param name="propertys">Property list for the select clause; may be null.</param>
  /// <returns>The matching items, at most <paramref name="maxItemCount"/> when it is given.</returns>
  public async Task<List<T>> FindByDict<T>(Dictionary<string, object> dict, int itemsPerPage = -1, int? maxItemCount = null, string partitionKey = null,List<string> propertys = null) where T : ID
  {
      StringBuilder sql = SQLHelperParametric.GetSQLSelect(propertys);
      CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, sql);
      QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(itemsPerPage, maxItemCount));

      // Forward the cap: without it maxItemCount only shaped the page size and every page was
      // still read, unlike the name-based FindByDict overload.
      return await ResultsFromQueryAndOptions<T>(cosmosDbQuery, queryRequestOptions, maxItemCount);
  }
  /// <summary>Runs <paramref name="cosmosDbQuery"/> against the container of <typeparamref name="T"/> and collects the results.</summary>
  /// <param name="cosmosDbQuery">Query whose definition is executed.</param>
  /// <param name="queryOptions">Request options for the query.</param>
  /// <param name="maxItemCount">Optional cap on the number of returned items.</param>
  private async Task<List<T>> ResultsFromQueryAndOptions<T>(CosmosDbQuery cosmosDbQuery, QueryRequestOptions queryOptions, int? maxItemCount = null)
  {
      CosmosModelInfo target = await InitializeCollection<T>();
      QueryDefinition definition = cosmosDbQuery.CosmosQueryDefinition;
      FeedIterator<T> iterator = target.container.GetItemQueryIterator<T>(queryDefinition: definition, requestOptions: queryOptions);
      return await ResultsFromFeedIterator(iterator, maxItemCount);
  }
  /// <summary>
  /// Combines the requested page size with the optional item cap: with no page size (-1) the cap
  /// (or -1 when there is none) is used, otherwise the smaller of page size and cap.
  /// </summary>
  /// <param name="itemsPerPage">Requested page size; -1 means "not specified".</param>
  /// <param name="maxItemCount">Optional cap on the number of items.</param>
  /// <returns>The page size to request.</returns>
  private int GetEffectivePageSize(int itemsPerPage, int? maxItemCount)
  {
      int cap = maxItemCount ?? itemsPerPage;
      if (itemsPerPage == -1)
      {
          return cap;
      }
      return Math.Min(cap, itemsPerPage);
  }
  /// <summary>
  /// Builds query request options: a page size of -1 becomes 1000, buffered item count defaults
  /// to 100 and concurrency to 50.
  /// </summary>
  /// <param name="itemsPerPage">Page size; -1 is replaced by 1000, null is passed through.</param>
  /// <param name="maxBufferedItemCount">Buffered item count; 100 when null.</param>
  /// <param name="maxConcurrency">Concurrency; 50 when null.</param>
  private QueryRequestOptions GetDefaultQueryRequestOptions(int? itemsPerPage = null,
      int? maxBufferedItemCount = null,
      int? maxConcurrency = null)
  {
      int? effectiveItemCount = itemsPerPage;
      if (itemsPerPage == -1)
      {
          effectiveItemCount = 1000;
      }

      QueryRequestOptions options = new QueryRequestOptions();
      options.MaxItemCount = effectiveItemCount;
      options.MaxBufferedItemCount = maxBufferedItemCount ?? 100;
      options.MaxConcurrency = maxConcurrency ?? 50;
      return options;
  }
  /// <summary>
  /// Runs <paramref name="cosmosDbQuery"/> against the container of <typeparamref name="T"/> and feeds
  /// the results to <paramref name="batchAction"/> in batches sized by the options' MaxItemCount (0 when unset).
  /// </summary>
  private async Task<List<T>> ResultsFromQueryAndOptions<T>(CosmosDbQuery cosmosDbQuery, Func<List<T>, Task> batchAction, QueryRequestOptions queryOptions)
  {
      CosmosModelInfo target = await InitializeCollection<T>();
      QueryDefinition definition = cosmosDbQuery.CosmosQueryDefinition;
      FeedIterator<T> iterator = target.container.GetItemQueryIterator<T>(queryDefinition: definition, requestOptions: queryOptions);
      int batchSize = queryOptions.MaxItemCount ?? 0;
      return await ResultsFromFeedIterator(iterator, batchAction, batchSize);
  }
  /// <summary>Builds request options that only set the page size.</summary>
  /// <param name="itemsPerPage">Value assigned to <c>MaxItemCount</c>.</param>
  private QueryRequestOptions GetQueryRequestOptions(int itemsPerPage)
  {
      return new QueryRequestOptions { MaxItemCount = itemsPerPage };
  }
  /// <summary>
  /// Queries the container of <typeparamref name="T"/> through LINQ with an optional filter and
  /// an optional ordering.
  /// </summary>
  /// <param name="query">Filter predicate; null means no filter.</param>
  /// <param name="order">Ordering key selector; null means no ordering.</param>
  /// <param name="isDesc">When true and <paramref name="order"/> is given, order descending.</param>
  /// <param name="itemsPerPage">Page size; -1 lets the defaults decide.</param>
  /// <param name="maxItemCount">Optional cap on the number of returned items.</param>
  /// <returns>The matching items, at most <paramref name="maxItemCount"/> when it is given.</returns>
  public async Task<List<T>> FindLinq<T>(Expression<Func<T, bool>> query = null, Expression<Func<T, object>> order = null, bool isDesc = false, int itemsPerPage = -1, int? maxItemCount = null) where T : ID
  {
      QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(itemsPerPage, maxItemCount));
      CosmosModelInfo container = await InitializeCollection<T>();

      // Compose the query step by step instead of spelling out every filter/order combination.
      IQueryable<T> queryable = container.container.GetItemLinqQueryable<T>(requestOptions: queryRequestOptions);
      if (query != null)
      {
          queryable = queryable.Where(query);
      }
      if (order != null)
      {
          queryable = isDesc ? queryable.OrderByDescending(order) : queryable.OrderBy(order);
      }

      FeedIterator<T> feedIterator = queryable.ToFeedIterator();
      // Enforce the cap; previously maxItemCount only influenced the page size here.
      return await ResultsFromFeedIterator<T>(feedIterator, maxItemCount);
  }
  /// <summary>
  /// Runs a SQL query, optionally parameterized, against the container of <typeparamref name="T"/>.
  /// </summary>
  /// <param name="sql">Query text.</param>
  /// <param name="Parameters">Query parameters; null runs the text without parameters.</param>
  /// <param name="itemsPerPage">Page size; -1 means not specified.</param>
  /// <param name="maxItemCount">Optional cap on the number of returned items.</param>
  /// <returns>The matching items, at most <paramref name="maxItemCount"/> when it is given.</returns>
  /// <remarks>
  /// Both branches now pass the request options (the parameterless branch used to drop them)
  /// and both enforce <paramref name="maxItemCount"/>.
  /// </remarks>
  public async Task<List<T>> FindSQL<T>(string sql, Dictionary<string, object> Parameters = null, int itemsPerPage = -1, int? maxItemCount = null) where T : ID
  {
      CosmosModelInfo container = await InitializeCollection<T>();
      QueryRequestOptions queryOptions = GetQueryRequestOptions(GetEffectivePageSize(itemsPerPage, maxItemCount));

      FeedIterator<T> feedIterator;
      if (Parameters != null)
      {
          CosmosDbQuery cosmosDbQuery = new CosmosDbQuery
          {
              QueryText = sql,
              Parameters = Parameters
          };
          feedIterator = container.container
              .GetItemQueryIterator<T>(cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryOptions);
      }
      else
      {
          QueryDefinition queryDefinition = new QueryDefinition(sql);
          feedIterator = container.container
              .GetItemQueryIterator<T>(queryDefinition, requestOptions: queryOptions);
      }

      return await ResultsFromFeedIterator(feedIterator, maxItemCount);
  }
  /// <summary>
  /// Creates <paramref name="entity"/> in the container of <typeparamref name="T"/> and, when
  /// caching is enabled for that container, stores it in the Redis hash as well.
  /// </summary>
  /// <param name="entity">Entity to create.</param>
  /// <returns>The resource returned by the create call.</returns>
  /// <exception cref="BizException">Wraps the message of any exception raised while saving.</exception>
  public async Task<T> Save<T>(T entity) where T : ID
  {
      try
      {
          CosmosModelInfo container = await InitializeCollection<T>();
          ItemResponse<T> response = await container.container.CreateItemAsync<T>(entity);
          if (container.cache && RedisHelper.Instance != null)
          {
              string cacheKey = CacheCosmosPrefix + container.container.Id;
              await RedisHelper.HSetAsync(cacheKey, entity.id, entity);
              // Always (re)arm the expiry. Before, a hash created by this very write got no
              // TTL at all, because the expiry was only set when the key already existed.
              await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
          }
          return response.Resource;
      }
      catch (Exception e)
      {
          throw new BizException(e.Message);
      }
  }
  /// <summary>
  /// Creates the given entities in batches of <c>pageSize</c> using the stream API, then mirrors
  /// them into the Redis hash when caching is enabled for the container.
  /// </summary>
  /// <param name="enyites">Entities to create.</param>
  /// <returns>The input list. Individual create failures are not reported (best effort, as before).</returns>
  /// <remarks>
  /// Serialization, the create calls and the Redis writes are all awaited. The original ran
  /// serialization and Redis writes as async-void lambdas, so items could be missing from the
  /// batch and failures could not be observed. Redis is only queried when caching is enabled.
  /// </remarks>
  public async Task<List<T>> SaveAll<T>(List<T> enyites) where T : ID
  {
      CosmosModelInfo container = await InitializeCollection<T>();
      bool useCache = container.cache && RedisHelper.Instance != null;
      string cacheKey = CacheCosmosPrefix + container.container.Id;
      // Remember whether the hash existed up front; a hash created here gets its expiry at the end.
      bool cacheExisted = useCache && RedisHelper.Exists(cacheKey);

      PropertyInfo pkProperty = typeof(T).GetProperty(GetPartitionKey<T>());
      JsonSerializerOptions serializerOptions = new JsonSerializerOptions { IgnoreNullValues = true };

      for (int offset = 0; offset < enyites.Count; offset += pageSize)
      {
          List<T> batch = enyites.GetRange(offset, Math.Min(pageSize, enyites.Count - offset));

          List<Task> writes = batch.Select(async entity =>
          {
              using (MemoryStream stream = new MemoryStream())
              {
                  await JsonSerializer.SerializeAsync(stream, entity, serializerOptions);
                  PartitionKey partitionKey = new PartitionKey(pkProperty.GetValue(entity, null).ToString());
                  // Dispose the response; the status code is intentionally not inspected.
                  using (ResponseMessage response = await container.container.CreateItemStreamAsync(stream, partitionKey))
                  {
                  }
              }
          }).ToList();
          await Task.WhenAll(writes);

          if (useCache)
          {
              List<Task> cacheWrites = batch.Select(async entity =>
              {
                  await RedisHelper.HSetAsync(cacheKey, entity.id, entity);
              }).ToList();
              await Task.WhenAll(cacheWrites);
          }
      }

      if (useCache && !cacheExisted)
      {
          await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
      }
      return enyites;
  }
  /// <summary>
  /// Upserts <paramref name="entity"/> in the container of <typeparamref name="T"/> and, when
  /// caching is enabled for that container, stores it in the Redis hash as well.
  /// </summary>
  /// <param name="entity">Entity to upsert.</param>
  /// <returns>The resource returned by the upsert call.</returns>
  public async Task<T> SaveOrUpdate<T>(T entity) where T : ID
  {
      CosmosModelInfo container = await InitializeCollection<T>();
      ItemResponse<T> response = await container.container.UpsertItemAsync(item: entity);
      if (container.cache && RedisHelper.Instance != null)
      {
          string cacheKey = CacheCosmosPrefix + container.container.Id;
          await RedisHelper.HSetAsync(cacheKey, entity.id, entity);
          // Always (re)arm the expiry. Before, a hash created by this very write got no
          // TTL at all, because the expiry was only set when the key already existed.
          await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
      }
      return response.Resource;
  }
  /// <summary>
  /// Upserts the given entities in batches of <c>pageSize</c> using the stream API, then mirrors
  /// them into the Redis hash when caching is enabled for the container.
  /// </summary>
  /// <param name="enyites">Entities to upsert.</param>
  /// <returns>The input list. Individual upsert failures are not reported (best effort, as before).</returns>
  /// <remarks>
  /// Serialization, the upsert calls and the Redis writes are all awaited, and each response is
  /// disposed. The original ran serialization and Redis writes as async-void lambdas and never
  /// released the responses. Redis is only queried when caching is enabled.
  /// </remarks>
  public async Task<List<T>> SaveOrUpdateAll<T>(List<T> enyites) where T : ID
  {
      CosmosModelInfo container = await InitializeCollection<T>();
      bool useCache = container.cache && RedisHelper.Instance != null;
      string cacheKey = CacheCosmosPrefix + container.container.Id;
      // Remember whether the hash existed up front; a hash created here gets its expiry at the end.
      bool cacheExisted = useCache && RedisHelper.Exists(cacheKey);

      PropertyInfo pkProperty = typeof(T).GetProperty(GetPartitionKey<T>());
      JsonSerializerOptions serializerOptions = new JsonSerializerOptions { IgnoreNullValues = true };

      for (int offset = 0; offset < enyites.Count; offset += pageSize)
      {
          List<T> batch = enyites.GetRange(offset, Math.Min(pageSize, enyites.Count - offset));

          List<Task> writes = batch.Select(async entity =>
          {
              using (MemoryStream stream = new MemoryStream())
              {
                  await JsonSerializer.SerializeAsync(stream, entity, serializerOptions);
                  PartitionKey partitionKey = new PartitionKey(pkProperty.GetValue(entity, null).ToString());
                  // Dispose the response; the status code is intentionally not inspected.
                  using (ResponseMessage response = await container.container.UpsertItemStreamAsync(stream, partitionKey))
                  {
                  }
              }
          }).ToList();
          await Task.WhenAll(writes);

          if (useCache)
          {
              List<Task> cacheWrites = batch.Select(async entity =>
              {
                  await RedisHelper.HSetAsync(cacheKey, entity.id, entity);
              }).ToList();
              await Task.WhenAll(cacheWrites);
          }
      }

      if (useCache && !cacheExisted)
      {
          await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
      }
      return enyites;
  }
  /// <summary>
  /// Replaces the stored document for <paramref name="entity"/> (matched by its <c>id</c> and
  /// partition key value) and, when caching is enabled for the container, writes the entity
  /// into the container's Redis hash.
  /// </summary>
  /// <typeparam name="T">Entity type; its partition key property name comes from <c>GetPartitionKey</c>.</typeparam>
  /// <param name="entity">Entity carrying the new state.</param>
  /// <returns>The resource returned by the replace operation.</returns>
  public async Task<T> Update<T>(T entity) where T : ID
  {
      CosmosModelInfo container = await InitializeCollection<T>();
      string pk = GetPartitionKey<T>();
      object partitionValue = typeof(T).GetProperty(pk).GetValue(entity, null);
      ItemResponse<T> response = await container.container.ReplaceItemAsync(entity, entity.id, new PartitionKey(partitionValue.ToString()));

      if (container.cache && RedisHelper.Instance != null)
      {
          string cacheKey = CacheCosmosPrefix + container.container.Id;
          await RedisHelper.HSetAsync(cacheKey, entity.id, entity);
          // Always apply the TTL. Previously a hash created by this call (key not present yet)
          // was written without any expiry; only the "already exists" branch set one.
          await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
      }
      return response.Resource;
  }
  /// <summary>
  /// Plain holder for one serialized document: its id, its partition key value as a string,
  /// and the stream containing its serialized body.
  /// </summary>
  internal class Item
  {
      /// <summary>Document id.</summary>
      public string id { get; set; }

      /// <summary>Partition key value in string form.</summary>
      public string pk { get; set; }

      /// <summary>In-memory stream holding the serialized document.</summary>
      public MemoryStream stream { get; set; }
  }
  /// <summary>
  /// Replaces the stored document of every entity in the Cosmos container mapped to
  /// <typeparamref name="T"/>, working through the list in pages of <c>pageSize</c>, and mirrors
  /// each page into the container's Redis hash when caching is enabled for that container.
  /// </summary>
  /// <typeparam name="T">Entity type; its partition key property name comes from <c>GetPartitionKey</c>.</typeparam>
  /// <param name="enyites">Entities to replace. A null or empty list is returned as-is without touching the store.</param>
  /// <returns>The same list instance that was passed in.</returns>
  /// <remarks>
  /// Replace outcomes are not inspected: a failed request does not abort the batch and the entity
  /// is still written to the cache (best effort, unchanged from the previous behavior).
  /// </remarks>
  public async Task<List<T>> UpdateAll<T>(List<T> enyites) where T : ID
  {
      // Nothing to write; this also avoids a NullReferenceException on a null list.
      if (enyites == null || enyites.Count == 0)
      {
          return enyites;
      }

      int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
      CosmosModelInfo container = await InitializeCollection<T>();
      string cacheKey = CacheCosmosPrefix + container.container.Id;
      bool useCache = container.cache && RedisHelper.Instance != null;
      // Only ask Redis when caching is on. The TTL is applied at the end only if this call
      // is the one that creates the hash.
      bool existedBefore = useCache && RedisHelper.Exists(cacheKey);

      // Loop-invariant work: resolve the partition key property and build the options once.
      var partitionKeyProperty = typeof(T).GetProperty(GetPartitionKey<T>());
      JsonSerializerOptions serializerOptions = new JsonSerializerOptions { IgnoreNullValues = true };

      for (int page = 0; page < pages; page++)
      {
          List<T> batch = enyites.Skip(page * pageSize).Take(pageSize).ToList();

          // Serialize with a real await. List.ForEach(async ...) would produce async-void lambdas
          // that nothing waits for, so the list below could be read before it is fully populated.
          List<Item> itemsToReplace = new List<Item>(batch.Count);
          foreach (T entity in batch)
          {
              MemoryStream stream = new MemoryStream();
              await JsonSerializer.SerializeAsync(stream, entity, serializerOptions);
              object partitionValue = partitionKeyProperty.GetValue(entity, null);
              itemsToReplace.Add(new Item { id = entity.id, pk = partitionValue.ToString(), stream = stream });
          }

          List<Task> tasks = new List<Task>(itemsToReplace.Count);
          foreach (Item item in itemsToReplace)
          {
              tasks.Add(container.container.ReplaceItemStreamAsync(item.stream, item.id, new PartitionKey(item.pk))
                  .ContinueWith((Task<ResponseMessage> task) =>
                  {
                      // Best effort: the outcome is ignored, but a response that was produced
                      // is released instead of being leaked.
                      if (task.Status == TaskStatus.RanToCompletion)
                      {
                          task.Result?.Dispose();
                      }
                  }));
          }
          await Task.WhenAll(tasks);

          if (useCache)
          {
              // Awaited so every field is in the hash before the TTL is applied below.
              foreach (T entity in batch)
              {
                  await RedisHelper.HSetAsync(cacheKey, entity.id, entity);
              }
          }
      }

      if (useCache && !existedBefore)
      {
          await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
      }
      return enyites;
  }
  809. //public void Dispose()
  810. //{
  811. // Dispose(true);
  812. //}
  813. //protected virtual void Dispose(bool disposing)
  814. //{
  815. // if (disposing)
  816. // {
  817. // CosmosClient?.Dispose();
  818. // }
  819. //}
  /// <summary>
  /// Looks up a document by id with a parameterized SQL query against the container mapped to
  /// <typeparamref name="T"/>, bypassing the cache.
  /// </summary>
  /// <param name="id">Value bound to the <c>@id</c> query parameter.</param>
  /// <returns>
  /// The single matching document, or the default value when nothing matches
  /// (<c>SingleOrDefault</c> semantics, so more than one match throws).
  /// </returns>
  private async Task<T> FindByIdAsSql<T>(string id) where T : ID
  {
      CosmosModelInfo modelInfo = await InitializeCollection<T>();

      CosmosDbQuery query = new CosmosDbQuery
      {
          QueryText = @"SELECT *
                        FROM c
                        WHERE c.id = @id",
          Parameters = new Dictionary<string, object> { { "@id", id } }
      };

      FeedIterator<T> iterator = modelInfo.container.GetItemQueryIterator<T>(query.CosmosQueryDefinition);
      var matches = await ResultsFromFeedIterator(iterator);
      return matches.SingleOrDefault();
  }
  /// <summary>
  /// Reads one document directly by id and partition key from the container mapped to
  /// <typeparamref name="T"/>. The cache is not consulted.
  /// </summary>
  /// <param name="id">Document id.</param>
  /// <param name="pk">Partition key value of the document.</param>
  /// <returns>The resource carried by the read response.</returns>
  public async Task<T> FindByIdPk<T>(string id, string pk) where T : ID
  {
      CosmosModelInfo modelInfo = await InitializeCollection<T>();
      PartitionKey partitionKey = new PartitionKey(pk);
      ItemResponse<T> readResult = await modelInfo.container.ReadItemAsync<T>(id, partitionKey);
      return readResult.Resource;
  }
  /// <summary>
  /// Finds a document by id. When caching is enabled for the container the lookup goes through
  /// <c>RedisHelper.CacheShellAsync</c> (hash key = container cache key, field = id), with the
  /// SQL lookup as the loader; otherwise the SQL lookup is called directly.
  /// </summary>
  /// <param name="id">Document id.</param>
  /// <returns>The result of the cache shell or of the SQL lookup.</returns>
  public async Task<T> FindById<T>(string id) where T : ID
  {
      CosmosModelInfo modelInfo = await InitializeCollection<T>();

      bool cacheEnabled = modelInfo.cache && RedisHelper.Instance != null;
      if (!cacheEnabled)
      {
          return await FindByIdAsSql<T>(id);
      }

      string cacheKey = CacheCosmosPrefix + modelInfo.container.Id;
      return await RedisHelper.CacheShellAsync(cacheKey, id, timeoutSeconds, () => FindByIdAsSql<T>(id));
  }
  /// <summary>
  /// Finds the documents with the given ids. Without caching this is a single <c>FindByDict</c>
  /// query. With caching, each id is first looked up in the container's Redis hash; ids that
  /// are not cached are fetched with one <c>FindByDict</c> query and written back to the hash.
  /// </summary>
  /// <param name="ids">Ids to look up.</param>
  /// <returns>
  /// With caching: cached entities in <paramref name="ids"/> order, followed by the entities
  /// fetched from the store. Without caching: the <c>FindByDict</c> result.
  /// </returns>
  public async Task<List<T>> FindByIds<T>(List<string> ids) where T : ID
  {
      CosmosModelInfo container = await InitializeCollection<T>();
      if (!(container.cache && RedisHelper.Instance != null))
      {
          return await FindByDict<T>(new Dictionary<string, object> { { "id", ids.ToArray() } });
      }

      string cacheKey = CacheCosmosPrefix + container.container.Id;
      List<T> list = new List<T>();
      List<string> notCached = new List<string>();
      foreach (string id in ids)
      {
          if (!await RedisHelper.HExistsAsync(cacheKey, id))
          {
              notCached.Add(id);
              continue;
          }

          T cached = await RedisHelper.HGetAsync<T>(cacheKey, id);
          if (cached == null)
          {
              // The field disappeared between the two calls (for example the hash expired);
              // load it from the store instead of returning a null entry.
              notCached.Add(id);
              continue;
          }
          list.Add(cached);
      }

      if (notCached.Count > 0)
      {
          List<T> fetched = await FindByDict<T>(new Dictionary<string, object> { { "id", notCached.ToArray() } });
          // Use the async Redis calls rather than blocking inside an async method.
          foreach (T entity in fetched)
          {
              await RedisHelper.HSetAsync(cacheKey, entity.id, entity);
          }
          // One TTL refresh after the writes replaces the identical call repeated per item.
          if (fetched.Count > 0)
          {
              await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
          }
          list.AddRange(fetched);
      }
      return list;
  }
  /// <summary>
  /// Looks up documents by id in the collection registered under <paramref name="CollectionName"/>
  /// using <c>FindByDict</c> with an <c>id</c> filter. When caching is enabled for that collection
  /// the query is wrapped in <c>RedisHelper.CacheShellAsync</c> (hash field = id).
  /// </summary>
  /// <param name="CollectionName">Key into <c>DocumentCollectionDict</c>.</param>
  /// <param name="id">Document id to filter on.</param>
  /// <returns>Whatever <c>FindByDict</c> (or the cache shell around it) yields.</returns>
  /// <exception cref="BizException">The collection name is not registered.</exception>
  public async Task<dynamic> FindById(string CollectionName, string id)
  {
      if (!DocumentCollectionDict.TryGetValue(CollectionName, out CosmosModelInfo modelInfo))
      {
          throw new BizException($"CollectionName named:{CollectionName} dose not exsit in Database!");
      }

      bool cacheEnabled = modelInfo.cache && RedisHelper.Instance != null;
      if (!cacheEnabled)
      {
          return await FindByDict(CollectionName, new Dictionary<string, object> { { "id", id } });
      }

      string cacheKey = CacheCosmosPrefix + modelInfo.container.Id;
      return await RedisHelper.CacheShellAsync(
          cacheKey,
          id,
          timeoutSeconds,
          () => FindByDict(CollectionName, new Dictionary<string, object> { { "id", id } }));
  }
  /// <summary>
  /// Looks up the documents with the given ids in the collection registered under
  /// <paramref name="CollectionName"/> using <c>FindByDict</c> with an <c>id</c> filter.
  /// </summary>
  /// <remarks>
  /// The result is always read from the store. The previous cached branch used the container's
  /// cache key alone as the cache entry, so the entry did not depend on <paramref name="ids"/>
  /// and a later call with different ids would be handed the earlier result. That same key is
  /// also written as a hash by the <c>HSetAsync</c> calls elsewhere in this class.
  /// </remarks>
  /// <param name="CollectionName">Key into <c>DocumentCollectionDict</c>.</param>
  /// <param name="ids">Ids to filter on.</param>
  /// <returns>The documents returned by <c>FindByDict</c>.</returns>
  /// <exception cref="BizException">The collection name is not registered.</exception>
  public async Task<List<dynamic>> FindByIds(string CollectionName, List<string> ids)
  {
      // The lookup only validates that the collection is registered.
      if (!DocumentCollectionDict.TryGetValue(CollectionName, out CosmosModelInfo _))
      {
          throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
      }

      return await FindByDict(CollectionName, new Dictionary<string, object> { { "id", ids.ToArray() } });
  }
  918. }
  919. }