AzureCosmosDBV3Repository.cs 45 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029
  1. using Microsoft.Azure.Cosmos;
  2. using Microsoft.Azure.Cosmos.Linq;
  3. using System;
  4. using System.Collections.Concurrent;
  5. using System.Collections.Generic;
  6. using System.Diagnostics;
  7. using System.IO;
  8. using System.Linq;
  9. using System.Linq.Expressions;
  10. using System.Net;
  11. using System.Reflection;
  12. using System.Text;
  13. using System.Text.Json;
  14. using System.Threading;
  15. using System.Threading.Tasks;
  16. using TEAMModelOS.SDK.Context.Attributes.Azure;
  17. using TEAMModelOS.SDK.Context.Exception;
  18. using TEAMModelOS.SDK.Helper.Common.CollectionHelper;
  19. using TEAMModelOS.SDK.Helper.Common.ReflectorExtensions;
  20. using TEAMModelOS.SDK.Module.AzureCosmosDB.Configuration;
  21. namespace TEAMModelOS.SDK.Module.AzureCosmosDBV3
  22. {
  23. public class CosmosModelInfo
  24. {
  25. public Container container { get; set; }
  26. public bool cache { get; set; }
  27. public bool monitor { get; set; } = false;
  28. public Type type { get; set; }
  29. }
  30. public class AzureCosmosDBV3Repository : IAzureCosmosDBV3Repository
  31. {
  32. private CosmosClient CosmosClient { get; set; }
  33. /// <summary>
  34. /// 线程安全的dict类型
  35. /// </summary>
  36. private Dictionary<string, CosmosModelInfo> DocumentCollectionDict { get; set; } = new Dictionary<string, CosmosModelInfo>();
  37. private string DatabaseId { get; set; }
  38. private int CollectionThroughput { get; set; }
  39. private Database database { get; set; }
  40. int pageSize = 200;
  41. private const string CacheCosmosPrefix = "cosmos:";
  42. private string[] ScanModel { get; set; }
  43. private const int timeoutSeconds = 86400;
  44. private string leaseId = "AleaseContainer";
  45. public AzureCosmosDBV3Repository(AzureCosmosDBOptions options, CosmosSerializer cosmosSerializer)
  46. {
  47. try
  48. {
  49. if (!string.IsNullOrEmpty(options.ConnectionString))
  50. {
  51. CosmosClient = CosmosDBV3ClientSingleton.getInstance(options.ConnectionString, options.ConnectionKey, cosmosSerializer).GetCosmosDBClient();
  52. }
  53. else
  54. {
  55. throw new BizException("请设置正确的AzureCosmosDB数据库配置信息!");
  56. }
  57. DatabaseId = options.Database;
  58. CollectionThroughput = options.CollectionThroughput;
  59. ScanModel = options.ScanModel;
  60. // InitializeDatabase().GetAwaiter().GetResult();
  61. }
  62. catch (CosmosException e)
  63. {
  64. // Dispose(true);
  65. throw new BizException(e.Message, 500, e.StackTrace);
  66. }
  67. }
  68. public AzureCosmosDBV3Repository(AzureCosmosDBOptions options)
  69. {
  70. try
  71. {
  72. if (!string.IsNullOrEmpty(options.ConnectionString))
  73. {
  74. CosmosClient = CosmosDBV3ClientSingleton.getInstance(options.ConnectionString, options.ConnectionKey, null).GetCosmosDBClient();
  75. }
  76. else
  77. {
  78. throw new BizException("请设置正确的AzureCosmosDB数据库配置信息!");
  79. }
  80. DatabaseId = options.Database;
  81. CollectionThroughput = options.CollectionThroughput;
  82. ScanModel = options.ScanModel;
  83. // InitializeDatabase().GetAwaiter().GetResult();
  84. }
  85. catch (CosmosException e)
  86. {
  87. // Dispose(true);
  88. throw new BizException(e.Message, 500, e.StackTrace);
  89. }
  90. }
  91. public async Task InitializeDatabase()
  92. {
  93. try
  94. {
  95. database = await CosmosClient.CreateDatabaseIfNotExistsAsync(DatabaseId, CollectionThroughput);
  96. FeedIterator<ContainerProperties> resultSetIterator = database.GetContainerQueryIterator<ContainerProperties>();
  97. while (resultSetIterator.HasMoreResults)
  98. {
  99. foreach (ContainerProperties container in await resultSetIterator.ReadNextAsync())
  100. {
  101. DocumentCollectionDict.TryAdd(container.Id, new CosmosModelInfo { container = database.GetContainer(container.Id), cache = false, monitor = false });
  102. }
  103. }
  104. bool isMonitor = false;
  105. //获取数据库所有的表
  106. List<Type> types = ReflectorExtensions.GetAllTypeAsAttribute<CosmosDBAttribute>(ScanModel);
  107. foreach (Type type in types)
  108. {
  109. string PartitionKey = GetPartitionKey(type);
  110. string CollectionName = "";
  111. int RU = 0;
  112. bool cache = false;
  113. bool monitor = false;
  114. IEnumerable<CosmosDBAttribute> attributes = type.GetCustomAttributes<CosmosDBAttribute>(true);
  115. if (!string.IsNullOrEmpty(attributes.First<CosmosDBAttribute>().Name))
  116. {
  117. CollectionName = attributes.First<CosmosDBAttribute>().Name;
  118. }
  119. else
  120. {
  121. CollectionName = type.Name;
  122. }
  123. if (attributes.First<CosmosDBAttribute>().Cache)
  124. {
  125. cache = attributes.First<CosmosDBAttribute>().Cache;
  126. }
  127. if (attributes.First<CosmosDBAttribute>().Monitor)
  128. {
  129. monitor = attributes.First<CosmosDBAttribute>().Monitor;
  130. if (monitor)
  131. {
  132. isMonitor = true;
  133. }
  134. }
  135. //else
  136. //{
  137. // cache = false;
  138. //}
  139. if (attributes.First<CosmosDBAttribute>().RU > 400)
  140. {
  141. RU = attributes.First<CosmosDBAttribute>().RU;
  142. }
  143. else
  144. {
  145. RU = CollectionThroughput;
  146. }
  147. //如果表存在于数据则检查RU是否变动,如果不存在则执行创建DocumentCollection
  148. if (DocumentCollectionDict.TryGetValue(CollectionName, out CosmosModelInfo cosmosModelInfo))
  149. { //更新RU
  150. cosmosModelInfo.cache = cache;
  151. Container container = CosmosClient.GetDatabase(DatabaseId).GetContainer(cosmosModelInfo.container.Id);
  152. int? throughputResponse = await container.ReadThroughputAsync();
  153. if (throughputResponse < RU)
  154. {
  155. await CosmosClient.GetDatabase(DatabaseId).GetContainer(cosmosModelInfo.container.Id).ReplaceThroughputAsync(RU);
  156. }
  157. DocumentCollectionDict[CollectionName] = new CosmosModelInfo { container = container, cache = cache, monitor = monitor, type = type };
  158. }
  159. else
  160. {
  161. ContainerProperties containerProperties = new ContainerProperties { Id = CollectionName };
  162. if (!string.IsNullOrEmpty(PartitionKey))
  163. {
  164. containerProperties.PartitionKeyPath = "/" + PartitionKey;
  165. }
  166. if (RU > CollectionThroughput)
  167. {
  168. CollectionThroughput = RU;
  169. }
  170. Container containerWithConsistentIndexing = await database.CreateContainerIfNotExistsAsync(containerProperties, throughput: CollectionThroughput);
  171. DocumentCollectionDict.TryAdd(CollectionName, new CosmosModelInfo { container = containerWithConsistentIndexing, cache = cache, monitor = monitor, type = type });
  172. }
  173. }
  174. if (isMonitor)
  175. {
  176. ContainerProperties leaseProperties = new ContainerProperties { Id = leaseId, PartitionKeyPath = "/id" };
  177. Container leaseContainer = await database.CreateContainerIfNotExistsAsync(leaseProperties, throughput: CollectionThroughput);
  178. DocumentCollectionDict.TryAdd(leaseId, new CosmosModelInfo { container = leaseContainer, cache = false, monitor = false });
  179. }
  180. }
  181. catch (CosmosException e)
  182. {
  183. throw new BizException(e.Message, 500, e.StackTrace);
  184. }
  185. }
  186. public Dictionary<string, CosmosModelInfo> GetCosmosModelInfo()
  187. {
  188. return this.DocumentCollectionDict;
  189. }
  190. private string GetPartitionKey<T>()
  191. {
  192. Type type = typeof(T);
  193. return GetPartitionKey(type);
  194. }
  195. private string GetPartitionKey(Type type)
  196. {
  197. PropertyInfo[] properties = type.GetProperties();
  198. List<PropertyInfo> attrProperties = new List<PropertyInfo>();
  199. foreach (PropertyInfo property in properties)
  200. {
  201. if (property.Name.Equals("PartitionKey"))
  202. {
  203. attrProperties.Add(property);
  204. break;
  205. }
  206. object[] attributes = property.GetCustomAttributes(true);
  207. foreach (object attribute in attributes) //2.通过映射,找到成员属性上关联的特性类实例,
  208. {
  209. if (attribute is PartitionKeyAttribute)
  210. {
  211. attrProperties.Add(property);
  212. }
  213. }
  214. }
  215. if (attrProperties.Count <= 0)
  216. {
  217. throw new BizException(type.Name + "has no PartitionKey !");
  218. }
  219. else
  220. {
  221. if (attrProperties.Count == 1)
  222. {
  223. return attrProperties[0].Name;
  224. }
  225. else { throw new BizException("PartitionKey can only be single!"); }
  226. }
  227. }
  228. private async Task<CosmosModelInfo> InitializeCollection<T>()
  229. {
  230. Type type = typeof(T);
  231. string partitionKey = GetPartitionKey<T>();
  232. string CollectionName;
  233. IEnumerable<CosmosDBAttribute> attributes = type.GetCustomAttributes<CosmosDBAttribute>(true);
  234. if (!string.IsNullOrEmpty(attributes.First<CosmosDBAttribute>().Name))
  235. {
  236. CollectionName = attributes.First<CosmosDBAttribute>().Name;
  237. }
  238. else
  239. {
  240. CollectionName = type.Name;
  241. }
  242. return await InitializeCollection(CollectionName, partitionKey);
  243. }
  244. private async Task<CosmosModelInfo> InitializeCollection(string CollectionName, string PartitionKey)
  245. {
  246. /////内存中已经存在这个表则直接返回
  247. if (DocumentCollectionDict.TryGetValue(CollectionName, out CosmosModelInfo cosmosModelInfo))
  248. {
  249. return cosmosModelInfo;
  250. }///如果没有则尝试默认创建
  251. else
  252. {
  253. ContainerProperties containerProperties = new ContainerProperties { Id = CollectionName };
  254. if (!string.IsNullOrEmpty(PartitionKey))
  255. {
  256. containerProperties.PartitionKeyPath = "/" + PartitionKey;
  257. }
  258. Container containerWithConsistentIndexing = await database.CreateContainerIfNotExistsAsync(containerProperties, throughput: CollectionThroughput);
  259. CosmosModelInfo cosmosModel = new CosmosModelInfo { container = containerWithConsistentIndexing, cache = false };
  260. DocumentCollectionDict.TryAdd(CollectionName, cosmosModel);
  261. return cosmosModel;
  262. }
  263. }
  264. public async Task<List<IdPk>> DeleteAll<T>(List<IdPk> ids) where T : ID
  265. {
  266. CosmosModelInfo container = await InitializeCollection<T>();
  267. //string partitionKey = GetPartitionKey<T>();
  268. //await Task.Run(() => Parallel.ForEach(ids, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (item) =>
  269. //{
  270. // Task.WaitAll(DeleteAsync<T>(item.Value, item.Key));
  271. //}));
  272. List<IdPk> idPks = new List<IdPk>();
  273. int pages = (int)Math.Ceiling((double)ids.Count / pageSize);
  274. Stopwatch stopwatch = Stopwatch.StartNew();
  275. for (int i = 0; i < pages; i++)
  276. {
  277. List<IdPk> lists = ids.Skip((i) * pageSize).Take(pageSize).ToList();
  278. List<Task> tasks = new List<Task>(lists.Count);
  279. lists.ForEach(item =>
  280. {
  281. tasks.Add(container.container.DeleteItemStreamAsync(item.id, new PartitionKey(item.pk))
  282. .ContinueWith((Task<ResponseMessage> task) =>
  283. {
  284. using (ResponseMessage response = task.Result)
  285. {
  286. idPks.Add(new IdPk { id = item.id, pk = item.pk.ToString(), StatusCode = response.StatusCode });
  287. // if (!response.IsSuccessStatusCode)
  288. // {
  289. // }
  290. }
  291. }
  292. ));
  293. });
  294. await Task.WhenAll(tasks);
  295. if (container.cache && RedisHelper.Instance != null)
  296. {
  297. lists.ForEach(async x => {
  298. await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, x.id);
  299. });
  300. }
  301. }
  302. stopwatch.Stop();
  303. return idPks;
  304. }
  305. public async Task<List<IdPk>> DeleteAll<T>(Dictionary<string, object> dict) where T : ID
  306. {
  307. List<T> list = await FindByDict<T>(dict);
  308. return await DeleteAll(list);
  309. }
  310. public async Task<List<IdPk>> DeleteAll<T>(List<T> enyites) where T : ID
  311. {
  312. List<IdPk> idPks = new List<IdPk>();
  313. CosmosModelInfo container = await InitializeCollection<T>();
  314. string pk = GetPartitionKey<T>();
  315. Type type = typeof(T);
  316. int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
  317. Stopwatch stopwatch = Stopwatch.StartNew();
  318. for (int i = 0; i < pages; i++)
  319. {
  320. List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
  321. List<KeyValuePair<PartitionKey, string>> itemsToInsert = new List<KeyValuePair<PartitionKey, string>>();
  322. lists.ForEach(x =>
  323. {
  324. object o = type.GetProperty(pk).GetValue(x, null);
  325. KeyValuePair<PartitionKey, string> keyValue = new KeyValuePair<PartitionKey, string>(new PartitionKey(o.ToString()), x.id);
  326. itemsToInsert.Add(keyValue);
  327. });
  328. List<Task> tasks = new List<Task>(lists.Count);
  329. itemsToInsert.ForEach(item =>
  330. {
  331. tasks.Add(container.container.DeleteItemStreamAsync(item.Value, item.Key)
  332. .ContinueWith((Task<ResponseMessage> task) =>
  333. {
  334. using (ResponseMessage response = task.Result)
  335. {
  336. idPks.Add(new IdPk { id = item.Value, pk = item.Key.ToString(), StatusCode = response.StatusCode });
  337. }
  338. }
  339. ));
  340. });
  341. await Task.WhenAll(tasks); if (container.cache && RedisHelper.Instance != null)
  342. {
  343. lists.ForEach(async x => {
  344. await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, x.id);
  345. });
  346. }
  347. }
  348. stopwatch.Stop();
  349. return idPks;
  350. }
  351. public async Task<IdPk> DeleteAsync<T>(IdPk idPk) where T : ID
  352. {
  353. return await DeleteAsync<T>(idPk.id, idPk.pk);
  354. }
  355. public async Task<IdPk> DeleteAsync<T>(string id, string pk) where T : ID
  356. {
  357. CosmosModelInfo container = await InitializeCollection<T>();
  358. ResponseMessage response = await container.container.DeleteItemStreamAsync(id: id, partitionKey: new PartitionKey(pk));
  359. if (container.cache && RedisHelper.Instance != null)
  360. {
  361. await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, id);
  362. }
  363. return new IdPk { id = id, pk = pk, StatusCode = response.StatusCode };
  364. }
  365. public async Task<IdPk> DeleteAsync<T>(T entity) where T : ID
  366. {
  367. CosmosModelInfo container = await InitializeCollection<T>();
  368. string partitionKey = GetPartitionKey<T>();
  369. Type type = typeof(T);
  370. object o = type.GetProperty(partitionKey).GetValue(entity, null);
  371. ResponseMessage response = await container.container.DeleteItemStreamAsync(id: entity.id, partitionKey: new PartitionKey(o.ToString()));
  372. if (container.cache && RedisHelper.Instance != null)
  373. {
  374. await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, entity.id);
  375. }
  376. return new IdPk { id = entity.id, pk = o.ToString(), StatusCode = response.StatusCode };
  377. }
  378. //public async Task<T> DeleteAsync<T>(string id) where T : ID
  379. //{
  380. // Container container = await InitializeCollection<T>();
  381. // ItemResponse<T> response = await container.DeleteItemAsync<T>(id: id, partitionKey: new PartitionKey(GetPartitionKey<T>()));
  382. // return response.Resource;
  383. //}
  384. public async Task<List<T>> FindAll<T>(List<string> propertys = null) where T : ID
  385. {
  386. CosmosModelInfo container = await InitializeCollection<T>();
  387. StringBuilder sql;
  388. sql = SQLHelperParametric.GetSQLSelect(propertys);
  389. CosmosDbQuery cosmosDbQuery = new CosmosDbQuery { QueryText = sql.ToString() };
  390. FeedIterator<T> query = container.container.GetItemQueryIterator<T>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition);
  391. return await ResultsFromFeedIterator(query);
  392. }
  393. private async Task<List<T>> ResultsFromFeedIterator<T>(FeedIterator<T> query, int? maxItemCount = null)
  394. {
  395. List<T> results = new List<T>();
  396. while (query.HasMoreResults)
  397. {
  398. foreach (T t in await query.ReadNextAsync())
  399. {
  400. results.Add(t);
  401. if (results.Count == maxItemCount)
  402. {
  403. return results;
  404. }
  405. }
  406. }
  407. return results;
  408. }
  409. private async Task<List<T>> ResultsFromFeedIterator<T>(FeedIterator<T> query, Func<List<T>, Task> batchAction, int itemsPerPage)
  410. {
  411. List<T> results = new List<T>();
  412. while (query.HasMoreResults)
  413. {
  414. if (results.Count() >= itemsPerPage)
  415. {
  416. await batchAction(results);
  417. results.Clear();
  418. }
  419. results.AddRange(await query.ReadNextAsync());
  420. }
  421. if (results.Count() > 0)
  422. {
  423. await batchAction(results);
  424. results.Clear();
  425. }
  426. return results;
  427. }
  428. public async Task<List<dynamic>> FindByDict(string CollectionName, Dictionary<string, object> dict, string partitionKey = null, List<string> propertys = null)
  429. {
  430. if (DocumentCollectionDict.TryGetValue(CollectionName, out CosmosModelInfo container))
  431. {
  432. //StringBuilder sql = new StringBuilder("select value(c) from c");
  433. //SQLHelper.GetSQL(dict, ref sql);
  434. //CosmosDbQuery cosmosDbQuery = new CosmosDbQuery, int itemsPerPage = -1, int?
  435. //{
  436. // QueryText = sql.ToString()
  437. //};
  438. StringBuilder sql;
  439. sql = SQLHelperParametric.GetSQLSelect(propertys);
  440. CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, sql);
  441. QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
  442. FeedIterator<dynamic> query = container.container.GetItemQueryIterator<dynamic>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
  443. return await ResultsFromFeedIterator(query);
  444. }
  445. else
  446. {
  447. throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
  448. }
  449. }
  450. public async Task<List<dynamic>> FindCountByDict(string CollectionName, Dictionary<string, object> dict, string partitionKey = null)
  451. {
  452. if (DocumentCollectionDict.TryGetValue(CollectionName, out CosmosModelInfo container))
  453. {
  454. dict.Remove("@CURRPAGE");
  455. dict.Remove("@PAGESIZE");
  456. dict.Remove("@ASC");
  457. dict.Remove("@DESC");
  458. StringBuilder sql = new StringBuilder("select value count(c) from c");
  459. CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, sql);
  460. QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
  461. FeedIterator<dynamic> query = container.container.GetItemQueryIterator<dynamic>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
  462. return await ResultsFromFeedIterator(query);
  463. }
  464. else
  465. {
  466. throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
  467. }
  468. }
  469. public async Task<List<T>> FindByParams<T>(Dictionary<string, object> dict, string partitionKey = null, List<string> propertys = null) where T : ID
  470. {
  471. return await FindByDict<T>(dict, partitionKey, propertys);
  472. }
  473. public async Task<List<T>> FindByDict<T>(Dictionary<string, object> dict, string partitionKey = null, List<string> propertys = null) where T : ID
  474. {
  475. StringBuilder sql;
  476. sql = SQLHelperParametric.GetSQLSelect(propertys);
  477. CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, sql);
  478. QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
  479. return await ResultsFromQueryAndOptions<T>(cosmosDbQuery, queryRequestOptions);
  480. }
  481. private async Task<List<T>> ResultsFromQueryAndOptions<T>(CosmosDbQuery cosmosDbQuery, QueryRequestOptions queryOptions)
  482. {
  483. CosmosModelInfo container = await InitializeCollection<T>();
  484. FeedIterator<T> query = container.container.GetItemQueryIterator<T>(
  485. queryDefinition: cosmosDbQuery.CosmosQueryDefinition,
  486. requestOptions: queryOptions);
  487. return await ResultsFromFeedIterator(query);
  488. }
  489. private int GetEffectivePageSize(int itemsPerPage, int? maxItemCount)
  490. {
  491. return itemsPerPage == -1 ? maxItemCount ?? itemsPerPage : Math.Min(maxItemCount ?? itemsPerPage, itemsPerPage);
  492. }
  493. private QueryRequestOptions GetDefaultQueryRequestOptions(int? itemsPerPage = null,
  494. int? maxBufferedItemCount = null,
  495. int? maxConcurrency = null)
  496. {
  497. QueryRequestOptions queryRequestOptions = new QueryRequestOptions
  498. {
  499. MaxItemCount = itemsPerPage == -1 ? 1000 : itemsPerPage,
  500. MaxBufferedItemCount = maxBufferedItemCount ?? 100,
  501. MaxConcurrency = maxConcurrency ?? 50
  502. };
  503. return queryRequestOptions;
  504. }
  505. private async Task<List<T>> ResultsFromQueryAndOptions<T>(CosmosDbQuery cosmosDbQuery, Func<List<T>, Task> batchAction, QueryRequestOptions queryOptions)
  506. {
  507. CosmosModelInfo container = await InitializeCollection<T>();
  508. FeedIterator<T> query = container.container.GetItemQueryIterator<T>(
  509. queryDefinition: cosmosDbQuery.CosmosQueryDefinition,
  510. requestOptions: queryOptions);
  511. return await ResultsFromFeedIterator(query, batchAction, queryOptions.MaxItemCount ?? 0);
  512. }
  513. private QueryRequestOptions GetQueryRequestOptions(int itemsPerPage)
  514. {
  515. QueryRequestOptions queryRequestOptions = new QueryRequestOptions
  516. {
  517. MaxItemCount = itemsPerPage
  518. };
  519. return queryRequestOptions;
  520. }
  521. public async Task<List<T>> FindLinq<T>(Expression<Func<T, bool>> query = null, Expression<Func<T, object>> order = null, bool isDesc = false) where T : ID
  522. {
  523. //QueryRequestOptions queryRequestOptions = GetQueryRequestOptions(itemsPerPage);
  524. QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
  525. FeedIterator<T> feedIterator;
  526. CosmosModelInfo container = await InitializeCollection<T>();
  527. if (query == null)
  528. {
  529. if (order != null)
  530. {
  531. if (isDesc)
  532. {
  533. feedIterator = container.container
  534. .GetItemLinqQueryable<T>(requestOptions: queryRequestOptions).OrderByDescending(order)
  535. .ToFeedIterator();
  536. }
  537. else
  538. {
  539. feedIterator = container.container
  540. .GetItemLinqQueryable<T>(requestOptions: queryRequestOptions).OrderBy(order)
  541. .ToFeedIterator();
  542. }
  543. }
  544. else
  545. {
  546. feedIterator = container.container
  547. .GetItemLinqQueryable<T>(requestOptions: queryRequestOptions)
  548. .ToFeedIterator();
  549. }
  550. }
  551. else
  552. {
  553. if (order != null)
  554. {
  555. if (isDesc)
  556. {
  557. feedIterator = container.container
  558. .GetItemLinqQueryable<T>(requestOptions: queryRequestOptions)
  559. .Where(query).OrderByDescending(order)
  560. .ToFeedIterator();
  561. }
  562. else
  563. {
  564. feedIterator = container.container
  565. .GetItemLinqQueryable<T>(requestOptions: queryRequestOptions)
  566. .Where(query).OrderBy(order)
  567. .ToFeedIterator();
  568. }
  569. }
  570. else
  571. {
  572. feedIterator = container.container
  573. .GetItemLinqQueryable<T>(requestOptions: queryRequestOptions)
  574. .Where(query)
  575. .ToFeedIterator();
  576. }
  577. }
  578. return await ResultsFromFeedIterator<T>(feedIterator);
  579. }
  580. public async Task<List<T>> FindSQL<T>(string sql, Dictionary<string, object> Parameters = null) where T : ID
  581. {
  582. CosmosModelInfo container = await InitializeCollection<T>();
  583. QueryRequestOptions queryOptions = GetQueryRequestOptions(GetEffectivePageSize(-1, null));
  584. if (Parameters != null)
  585. {
  586. CosmosDbQuery cosmosDbQuery = new CosmosDbQuery
  587. {
  588. QueryText = sql,
  589. Parameters = Parameters
  590. };
  591. FeedIterator<T> feedIterator = container.container
  592. .GetItemQueryIterator<T>(cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryOptions);
  593. return await ResultsFromFeedIterator(feedIterator);
  594. }
  595. else
  596. {
  597. QueryDefinition queryDefinition = new QueryDefinition(sql);
  598. return await ResultsFromFeedIterator<T>(container.container.GetItemQueryIterator<T>(queryDefinition));
  599. }
  600. }
  601. public async Task<T> Save<T>(T entity) where T : ID
  602. {
  603. try
  604. {
  605. CosmosModelInfo container = await InitializeCollection<T>();
  606. ItemResponse<T> response = await container.container.CreateItemAsync<T>(entity);
  607. if (container.cache && RedisHelper.Instance != null)
  608. {
  609. if (!RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
  610. {
  611. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
  612. }
  613. else
  614. {
  615. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
  616. await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
  617. }
  618. }
  619. return response.Resource;
  620. }
  621. catch (Exception e)
  622. {
  623. throw new BizException(e.Message);
  624. }
  625. }
  626. public async Task<List<T>> SaveAll<T>(List<T> enyites) where T : ID
  627. {
  628. int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
  629. CosmosModelInfo container = await InitializeCollection<T>();
  630. bool flag = false;
  631. if (RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
  632. {
  633. flag = true;
  634. }
  635. string pk = GetPartitionKey<T>();
  636. Type type = typeof(T);
  637. Stopwatch stopwatch = Stopwatch.StartNew();
  638. for (int i = 0; i < pages; i++)
  639. {
  640. List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
  641. List<KeyValuePair<PartitionKey, Stream>> itemsToInsert = new List<KeyValuePair<PartitionKey, Stream>>();
  642. lists.ForEach(async x =>
  643. {
  644. MemoryStream stream = new MemoryStream();
  645. await JsonSerializer.SerializeAsync(stream, x, new JsonSerializerOptions { IgnoreNullValues = true });
  646. object o = type.GetProperty(pk).GetValue(x, null);
  647. KeyValuePair<PartitionKey, Stream> keyValue = new KeyValuePair<PartitionKey, Stream>(new PartitionKey(o.ToString()), stream);
  648. itemsToInsert.Add(keyValue);
  649. });
  650. List<Task> tasks = new List<Task>(lists.Count);
  651. itemsToInsert.ForEach(item =>
  652. {
  653. tasks.Add(container.container.CreateItemStreamAsync(item.Value, item.Key)
  654. .ContinueWith((Task<ResponseMessage> task) =>
  655. {
  656. using (ResponseMessage response = task.Result)
  657. {
  658. if (!response.IsSuccessStatusCode)
  659. {
  660. }
  661. }
  662. }
  663. ));
  664. });
  665. await Task.WhenAll(tasks);
  666. if (container.cache && RedisHelper.Instance != null)
  667. {
  668. lists.ForEach(async x => {
  669. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, x.id, x);
  670. });
  671. }
  672. }
  673. if (container.cache && RedisHelper.Instance != null && !flag)
  674. {
  675. await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
  676. }
  677. stopwatch.Stop();
  678. return enyites;
  679. }
  680. public async Task<T> SaveOrUpdate<T>(T entity) where T : ID
  681. {
  682. CosmosModelInfo container = await InitializeCollection<T>();
  683. ItemResponse<T> response = await container.container.UpsertItemAsync(item: entity);
  684. if (container.cache && RedisHelper.Instance != null)
  685. {
  686. if (!RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
  687. {
  688. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
  689. }
  690. else
  691. {
  692. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
  693. await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
  694. }
  695. }
  696. return response.Resource;
  697. }
  698. public async Task<List<T>> SaveOrUpdateAll<T>(List<T> enyites) where T : ID
  699. {
  700. //await Task.Run(() => Parallel.ForEach(entities, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (item) =>
  701. //{
  702. // Task.WaitAll(Update(item));
  703. //}));
  704. int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
  705. CosmosModelInfo container = await InitializeCollection<T>();
  706. bool flag = false;
  707. if (RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
  708. {
  709. flag = true;
  710. }
  711. string pk = GetPartitionKey<T>();
  712. Type type = typeof(T);
  713. Stopwatch stopwatch = Stopwatch.StartNew();
  714. for (int i = 0; i < pages; i++)
  715. {
  716. List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
  717. List<KeyValuePair<PartitionKey, Stream>> itemsToInsert = new List<KeyValuePair<PartitionKey, Stream>>();
  718. lists.ForEach(async x =>
  719. {
  720. MemoryStream stream = new MemoryStream();
  721. await JsonSerializer.SerializeAsync(stream, x, new JsonSerializerOptions { IgnoreNullValues = true });
  722. object o = type.GetProperty(pk).GetValue(x, null);
  723. KeyValuePair<PartitionKey, Stream> keyValue = new KeyValuePair<PartitionKey, Stream>(new PartitionKey(o.ToString()), stream);
  724. itemsToInsert.Add(keyValue);
  725. });
  726. List<Task> tasks = new List<Task>(lists.Count);
  727. itemsToInsert.ForEach(item =>
  728. {
  729. tasks.Add(container.container.UpsertItemStreamAsync(item.Value, item.Key)
  730. .ContinueWith((Task<ResponseMessage> task) =>
  731. {
  732. //using (ResponseMessage response = task.Result)
  733. //{
  734. // if (!response.IsSuccessStatusCode)
  735. // {
  736. // }
  737. //}
  738. }
  739. ));
  740. });
  741. await Task.WhenAll(tasks);
  742. if (container.cache && RedisHelper.Instance != null)
  743. {
  744. lists.ForEach(async x => {
  745. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, x.id, x);
  746. });
  747. }
  748. }
  749. if (container.cache && RedisHelper.Instance != null && !flag)
  750. {
  751. await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
  752. }
  753. stopwatch.Stop();
  754. return enyites;
  755. }
  756. public async Task<T> Update<T>(T entity) where T : ID
  757. {
  758. CosmosModelInfo container = await InitializeCollection<T>();
  759. string pk = GetPartitionKey<T>();
  760. object o = typeof(T).GetProperty(pk).GetValue(entity, null);
  761. ItemResponse<T> response = await container.container.ReplaceItemAsync(entity, entity.id, new PartitionKey(o.ToString()));
  762. if (container.cache && RedisHelper.Instance != null)
  763. {
  764. if (!RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
  765. {
  766. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
  767. }
  768. else
  769. {
  770. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
  771. await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
  772. }
  773. }
  774. return response.Resource;
  775. }
  776. internal class Item
  777. {
  778. public string id { get; set; }
  779. public string pk { get; set; }
  780. public MemoryStream stream { get; set; }
  781. }
  782. public async Task<List<T>> UpdateAll<T>(List<T> enyites) where T : ID
  783. {
  784. //await Task.Run(() => Parallel.ForEach(entities, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (item) =>
  785. //{
  786. // Task.WaitAll(Update(item));
  787. //}));
  788. int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
  789. CosmosModelInfo container = await InitializeCollection<T>();
  790. bool flag = false;
  791. if (RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
  792. {
  793. flag = true;
  794. }
  795. string pk = GetPartitionKey<T>();
  796. Type type = typeof(T);
  797. Stopwatch stopwatch = Stopwatch.StartNew();
  798. for (int i = 0; i < pages; i++)
  799. {
  800. List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
  801. List<Item> itemsToInsert = new List<Item>();
  802. lists.ForEach(async x =>
  803. {
  804. MemoryStream stream = new MemoryStream();
  805. await JsonSerializer.SerializeAsync(stream, x, new JsonSerializerOptions { IgnoreNullValues = true });
  806. object o = type.GetProperty(pk).GetValue(x, null);
  807. Item keyValue = new Item { id = x.id, pk = o.ToString(), stream = stream };
  808. itemsToInsert.Add(keyValue);
  809. });
  810. List<Task> tasks = new List<Task>(lists.Count);
  811. itemsToInsert.ForEach(item =>
  812. {
  813. tasks.Add(container.container.ReplaceItemStreamAsync(item.stream, item.id, new PartitionKey(item.pk))
  814. .ContinueWith((Task<ResponseMessage> task) =>
  815. {
  816. //using (ResponseMessage response = task.Result)
  817. //{
  818. // if (!response.IsSuccessStatusCode)
  819. // {
  820. // }
  821. //}
  822. }
  823. ));
  824. });
  825. await Task.WhenAll(tasks);
  826. if (container.cache && RedisHelper.Instance != null)
  827. {
  828. lists.ForEach(async x => {
  829. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, x.id, x);
  830. });
  831. }
  832. }
  833. if (container.cache && RedisHelper.Instance != null && !flag)
  834. {
  835. await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
  836. }
  837. stopwatch.Stop();
  838. return enyites;
  839. }
  840. //public void Dispose()
  841. //{
  842. // Dispose(true);
  843. //}
  844. //protected virtual void Dispose(bool disposing)
  845. //{
  846. // if (disposing)
  847. // {
  848. // CosmosClient?.Dispose();
  849. // }
  850. //}
  851. private async Task<T> FindByIdAsSql<T>(string id) where T : ID
  852. {
  853. CosmosModelInfo container = await InitializeCollection<T>();
  854. CosmosDbQuery cosmosDbQuery = new CosmosDbQuery
  855. {
  856. QueryText = @"SELECT *
  857. FROM c
  858. WHERE c.id = @id",
  859. Parameters = new Dictionary<string, object>
  860. {
  861. { "@id",id}
  862. }
  863. };
  864. FeedIterator<T> feedIterator = container.container
  865. .GetItemQueryIterator<T>(cosmosDbQuery.CosmosQueryDefinition);
  866. return (await ResultsFromFeedIterator(feedIterator)).SingleOrDefault();
  867. }
  868. public async Task<T> FindByIdPk<T>(string id, string pk) where T : ID
  869. {
  870. CosmosModelInfo container = await InitializeCollection<T>();
  871. ItemResponse<T> response = await container.container.ReadItemAsync<T>(id: id, partitionKey: new PartitionKey(pk));
  872. return response.Resource;
  873. }
  874. public async Task<T> FindById<T>(string id) where T : ID
  875. {
  876. CosmosModelInfo container = await InitializeCollection<T>();
  877. if (container.cache && RedisHelper.Instance != null)
  878. {
  879. return await RedisHelper.CacheShellAsync(CacheCosmosPrefix + container.container.Id, id, timeoutSeconds, () => { return FindByIdAsSql<T>(id); });
  880. }
  881. else
  882. {
  883. return await FindByIdAsSql<T>(id);
  884. }
  885. }
  886. public async Task<List<T>> FindByIds<T>(List<string> ids) where T : ID
  887. {
  888. CosmosModelInfo container = await InitializeCollection<T>();
  889. if (container.cache && RedisHelper.Instance != null)
  890. {
  891. List<T> list = new List<T>();
  892. List<string> NotIn = new List<string>();
  893. foreach (string id in ids)
  894. {
  895. if (!await RedisHelper.HExistsAsync(CacheCosmosPrefix + container.container.Id, id))
  896. {
  897. NotIn.Add(id);
  898. }
  899. else
  900. {
  901. list.Add(await RedisHelper.HGetAsync<T>(CacheCosmosPrefix + container.container.Id, id));
  902. }
  903. }
  904. if (NotIn.IsNotEmpty())
  905. {
  906. List<T> noInList = await FindByDict<T>(new Dictionary<string, object> { { "id", NotIn.ToArray() } });
  907. noInList.ForEach(x => { RedisHelper.HSet(CacheCosmosPrefix + container.container.Id, x.id, x); RedisHelper.Expire(CacheCosmosPrefix + container.container.Id, timeoutSeconds); });
  908. list.AddRange(noInList);
  909. }
  910. return list;
  911. }
  912. else
  913. {
  914. return await FindByDict<T>(new Dictionary<string, object> { { "id", ids.ToArray() } });
  915. }
  916. }
  917. public async Task<dynamic> FindById(string CollectionName, string id)
  918. {
  919. if (DocumentCollectionDict.TryGetValue(CollectionName, out CosmosModelInfo container))
  920. {
  921. if (container.cache && RedisHelper.Instance != null)
  922. {
  923. return await RedisHelper.CacheShellAsync(CacheCosmosPrefix + container.container.Id, id, timeoutSeconds, () => { return FindByDict(CollectionName, new Dictionary<string, object> { { "id", id } }); });
  924. }
  925. else
  926. {
  927. return await FindByDict(CollectionName, new Dictionary<string, object> { { "id", id } });
  928. }
  929. }
  930. else
  931. {
  932. throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
  933. }
  934. }
  935. public async Task<List<dynamic>> FindByIds(string CollectionName, List<string> ids)
  936. {
  937. if (DocumentCollectionDict.TryGetValue(CollectionName, out CosmosModelInfo container))
  938. {
  939. if (container.cache && RedisHelper.Instance != null)
  940. {
  941. return await RedisHelper.CacheShellAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds, () => { return FindByDict(CollectionName, new Dictionary<string, object> { { "id", ids.ToArray() } }); });
  942. }
  943. else
  944. {
  945. return await FindByDict(CollectionName, new Dictionary<string, object> { { "id", ids.ToArray() } });
  946. }
  947. }
  948. else
  949. {
  950. throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
  951. }
  952. }
  953. }
  954. }