AzureCosmosDBV3Repository.cs 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799
  1. using Microsoft.Azure.Cosmos;
  2. using Microsoft.Azure.Cosmos.Linq;
  3. using System;
  4. using System.Collections.Concurrent;
  5. using System.Collections.Generic;
  6. using System.Diagnostics;
  7. using System.IO;
  8. using System.Linq;
  9. using System.Linq.Expressions;
  10. using System.Net;
  11. using System.Reflection;
  12. using System.Text;
  13. using System.Text.Json;
  14. using System.Threading.Tasks;
  15. using TEAMModelOS.SDK.Context.Attributes.Azure;
  16. using TEAMModelOS.SDK.Context.Exception;
  17. using TEAMModelOS.SDK.Helper.Common.CollectionHelper;
  18. using TEAMModelOS.SDK.Helper.Common.ReflectorExtensions;
  19. using TEAMModelOS.SDK.Module.AzureCosmosDB.Configuration;
  20. namespace TEAMModelOS.SDK.Module.AzureCosmosDBV3
  21. {
  22. public class AzureCosmosDBV3Repository : IAzureCosmosDBV3Repository
  23. {
  /// <summary>Cosmos DB client used by every operation of this repository.</summary>
  private CosmosClient CosmosClient { get; set; }
  /// <summary>
  /// Thread-safe cache of container handles, keyed by container id.
  /// A concurrent dictionary is used because the cache is read and filled
  /// from asynchronous operations that may overlap.
  /// </summary>
  private ConcurrentDictionary<string, Container> DocumentCollectionDict { get; set; } = new ConcurrentDictionary<string, Container>();
  /// <summary>Id of the database all containers live in.</summary>
  private string DatabaseId { get; set; }
  /// <summary>Default throughput (RU/s) taken from the options.</summary>
  private int CollectionThroughput { get; set; }
  /// <summary>Database handle; only assigned by <c>InitializeDatabase</c>.</summary>
  private Database database { get; set; }
  // Number of items sent concurrently per batch by the bulk operations.
  int pageSize = 200;
  /// <summary>Scan targets passed to the reflection helper when discovering entity types.</summary>
  private string[] ScanModel { get; set; }
  /// <summary>
  /// Creates the repository with a custom serializer.
  /// </summary>
  /// <param name="options">Connection string, key, database id, default throughput and scan targets.</param>
  /// <param name="cosmosSerializer">Serializer handed to the shared client factory; may be null.</param>
  /// <exception cref="BizException">
  /// Thrown when the connection string is empty, or when a <see cref="CosmosException"/> occurs.
  /// </exception>
  public AzureCosmosDBV3Repository(AzureCosmosDBOptions options, CosmosSerializer cosmosSerializer)
  {
      try
      {
          bool connectionConfigured = !string.IsNullOrEmpty(options.ConnectionString);
          if (!connectionConfigured)
          {
              throw new BizException("请设置正确的AzureCosmosDB数据库配置信息!");
          }

          CosmosClient = CosmosDBV3ClientSingleton
              .getInstance(options.ConnectionString, options.ConnectionKey, cosmosSerializer)
              .GetCosmosDBClient();

          // The database itself is not touched here; InitializeDatabase must be called separately.
          DatabaseId = options.Database;
          CollectionThroughput = options.CollectionThroughput;
          ScanModel = options.ScanModel;
      }
      catch (CosmosException cosmosError)
      {
          throw new BizException(cosmosError.Message, 500, cosmosError.StackTrace);
      }
  }
  /// <summary>
  /// Creates the repository with the default serializer.
  /// Delegates to the two-argument constructor with a null serializer so the
  /// initialization logic lives in one place.
  /// </summary>
  /// <param name="options">Connection string, key, database id, default throughput and scan targets.</param>
  /// <exception cref="BizException">
  /// Thrown when the connection string is empty, or when a <see cref="CosmosException"/> occurs.
  /// </exception>
  public AzureCosmosDBV3Repository(AzureCosmosDBOptions options)
      : this(options, null)
  {
  }
  /// <summary>
  /// Creates the database if needed, caches all existing containers, then makes sure
  /// every type carrying <see cref="CosmosDBAttribute"/> has a container with at least
  /// the requested throughput.
  /// </summary>
  /// <exception cref="BizException">Thrown when a <see cref="CosmosException"/> occurs.</exception>
  public async Task InitializeDatabase()
  {
      try
      {
          database = await CosmosClient.CreateDatabaseIfNotExistsAsync(DatabaseId, CollectionThroughput);

          // Cache a handle for every container that already exists in the database.
          FeedIterator<ContainerProperties> containerIterator = database.GetContainerQueryIterator<ContainerProperties>();
          while (containerIterator.HasMoreResults)
          {
              foreach (ContainerProperties existing in await containerIterator.ReadNextAsync())
              {
                  DocumentCollectionDict.TryAdd(existing.Id, database.GetContainer(existing.Id));
              }
          }

          // Discover every entity type that maps to a container.
          List<Type> types = ReflectorExtensions.GetAllTypeAsAttribute<CosmosDBAttribute>(ScanModel);
          foreach (Type type in types)
          {
              string partitionKeyName = GetPartitionKey(type);
              CosmosDBAttribute attribute = type.GetCustomAttributes<CosmosDBAttribute>(true).First();
              string collectionName = string.IsNullOrEmpty(attribute.Name) ? type.Name : attribute.Name;
              // An attribute RU of 400 or less falls back to the configured default.
              int requestedRu = attribute.RU > 400 ? attribute.RU : CollectionThroughput;

              if (DocumentCollectionDict.TryGetValue(collectionName, out Container collection))
              {
                  // Existing container: only ever raise throughput, never lower it.
                  // A null reading compares false and leaves the throughput untouched.
                  int? currentRu = await collection.ReadThroughputAsync();
                  if (currentRu < requestedRu)
                  {
                      await collection.ReplaceThroughputAsync(requestedRu);
                  }
              }
              else
              {
                  ContainerProperties containerProperties = new ContainerProperties { Id = collectionName };
                  if (!string.IsNullOrEmpty(partitionKeyName))
                  {
                      containerProperties.PartitionKeyPath = "/" + partitionKeyName;
                  }

                  // Use a local value instead of overwriting CollectionThroughput, so one
                  // high-RU entity does not raise the default for every later container.
                  int containerThroughput = Math.Max(requestedRu, CollectionThroughput);
                  Container created = await database.CreateContainerIfNotExistsAsync(containerProperties, throughput: containerThroughput);
                  DocumentCollectionDict.TryAdd(collectionName, created);
              }
          }
      }
      catch (CosmosException e)
      {
          throw new BizException(e.Message, 500, e.StackTrace);
      }
  }
  /// <summary>
  /// Generic convenience wrapper: resolves the partition key property name of <typeparamref name="T"/>.
  /// </summary>
  /// <returns>The property name returned by the <see cref="Type"/>-based overload.</returns>
  private string GetPartitionKey<T>() => GetPartitionKey(typeof(T));
  /// <summary>
  /// Finds the name of the single property that acts as the partition key of <paramref name="type"/>.
  /// A property literally named "PartitionKey" is accepted and ends the scan; otherwise a property
  /// counts once for each <c>PartitionKeyAttribute</c> it carries.
  /// </summary>
  /// <param name="type">Entity type to inspect.</param>
  /// <returns>Name of the partition key property.</returns>
  /// <exception cref="BizException">Thrown when zero or more than one candidate is found.</exception>
  private string GetPartitionKey(Type type)
  {
      List<PropertyInfo> candidates = new List<PropertyInfo>();
      foreach (PropertyInfo candidate in type.GetProperties())
      {
          if (candidate.Name.Equals("PartitionKey"))
          {
              // Conventional name: take it and stop looking at the remaining properties.
              candidates.Add(candidate);
              break;
          }

          // One entry per attribute instance found on the property.
          int markerCount = candidate.GetCustomAttributes(true).OfType<PartitionKeyAttribute>().Count();
          for (int k = 0; k < markerCount; k++)
          {
              candidates.Add(candidate);
          }
      }

      if (candidates.Count <= 0)
      {
          throw new BizException(type.Name + "has no PartitionKey !");
      }
      if (candidates.Count != 1)
      {
          throw new BizException("PartitionKey can only be single!");
      }
      return candidates[0].Name;
  }
  /// <summary>
  /// Resolves the container for entity type <typeparamref name="T"/>. The container name is
  /// the <see cref="CosmosDBAttribute"/> name when set, otherwise the type name.
  /// </summary>
  /// <returns>The container handle from the name-based overload.</returns>
  private async Task<Container> InitializeCollection<T>()
  {
      Type entityType = typeof(T);
      string pkName = GetPartitionKey<T>();
      CosmosDBAttribute marker = entityType.GetCustomAttributes<CosmosDBAttribute>(true).First<CosmosDBAttribute>();
      string containerName = string.IsNullOrEmpty(marker.Name) ? entityType.Name : marker.Name;
      return await InitializeCollection(containerName, pkName);
  }
  /// <summary>
  /// Returns the cached container named <paramref name="CollectionName"/>, creating it
  /// (with the default throughput) and caching it when it is not known yet.
  /// </summary>
  /// <param name="CollectionName">Container id.</param>
  /// <param name="PartitionKey">Partition key property name; when non-empty the path "/name" is used.</param>
  /// <returns>The container handle.</returns>
  private async Task<Container> InitializeCollection(string CollectionName, string PartitionKey)
  {
      // Already cached: return immediately.
      if (DocumentCollectionDict.TryGetValue(CollectionName, out Container cached))
      {
          return cached;
      }

      ContainerProperties containerProperties = new ContainerProperties { Id = CollectionName };
      if (!string.IsNullOrEmpty(PartitionKey))
      {
          containerProperties.PartitionKeyPath = "/" + PartitionKey;
      }

      // The database field is only assigned by InitializeDatabase; fall back to a handle
      // from the client so this path does not fail with a NullReferenceException when
      // InitializeDatabase has not been called.
      Database targetDatabase = database ?? CosmosClient.GetDatabase(DatabaseId);
      Container created = await targetDatabase.CreateContainerIfNotExistsAsync(containerProperties, throughput: CollectionThroughput);
      DocumentCollectionDict.TryAdd(CollectionName, created);
      return created;
  }
  /// <summary>
  /// Deletes the given documents in batches of <c>pageSize</c> concurrent requests.
  /// </summary>
  /// <param name="ids">Pairs whose Key is the partition key value and whose Value is the document id.</param>
  /// <returns>One <see cref="IdPk"/> per input pair, in input order, carrying the response status code.</returns>
  public async Task<List<IdPk>> DeleteAll<T>(List<KeyValuePair<string, string>> ids) where T : ID
  {
      Container container = await InitializeCollection<T>();
      List<IdPk> idPks = new List<IdPk>(ids.Count);

      for (int offset = 0; offset < ids.Count; offset += pageSize)
      {
          int count = Math.Min(pageSize, ids.Count - offset);
          List<Task<IdPk>> tasks = new List<Task<IdPk>>(count);
          foreach (KeyValuePair<string, string> item in ids.GetRange(offset, count))
          {
              tasks.Add(DeleteOneAsync(item.Value, item.Key));
          }

          // Results are gathered from the awaited tasks rather than appended to a shared
          // list from concurrent continuations, which was not thread-safe.
          idPks.AddRange(await Task.WhenAll(tasks));
      }
      return idPks;

      // Deletes one document and reports its status; the response is always disposed.
      async Task<IdPk> DeleteOneAsync(string id, string pk)
      {
          using (ResponseMessage response = await container.DeleteItemStreamAsync(id, new PartitionKey(pk)))
          {
              return new IdPk { id = id, pk = pk, StatusCode = response.StatusCode };
          }
      }
  }
  /// <summary>
  /// Deletes every document of type <typeparamref name="T"/> matched by the given filter dictionary.
  /// </summary>
  /// <param name="dict">Filter passed unchanged to <c>FindByDict</c>.</param>
  /// <returns>The per-document results of the entity-based <c>DeleteAll</c> overload.</returns>
  public async Task<List<IdPk>> DeleteAll<T>(Dictionary<string,object> dict) where T : ID
  {
      List<T> matches = await FindByDict<T>(dict);
      List<IdPk> outcome = await DeleteAll(matches);
      return outcome;
  }
  /// <summary>
  /// Deletes the given entities in batches of <c>pageSize</c> concurrent requests. The partition
  /// key value is read by reflection from the property named by <c>GetPartitionKey</c>.
  /// </summary>
  /// <param name="enyites">Entities to delete.</param>
  /// <returns>
  /// One <see cref="IdPk"/> per entity, in input order, carrying the response status code.
  /// <c>pk</c> holds the raw partition key value, as in the single-item delete overloads.
  /// </returns>
  public async Task<List<IdPk>> DeleteAll<T>(List<T> enyites) where T : ID
  {
      List<IdPk> idPks = new List<IdPk>(enyites.Count);
      Container container = await InitializeCollection<T>();
      PropertyInfo pkProperty = typeof(T).GetProperty(GetPartitionKey<T>());

      for (int offset = 0; offset < enyites.Count; offset += pageSize)
      {
          int count = Math.Min(pageSize, enyites.Count - offset);
          List<Task<IdPk>> tasks = new List<Task<IdPk>>(count);
          foreach (T entity in enyites.GetRange(offset, count))
          {
              string pkValue = pkProperty.GetValue(entity, null).ToString();
              tasks.Add(DeleteOneAsync(entity.id, pkValue));
          }

          // Results are gathered from the awaited tasks rather than appended to a shared
          // list from concurrent continuations, which was not thread-safe.
          idPks.AddRange(await Task.WhenAll(tasks));
      }
      return idPks;

      // Deletes one document and reports its status; the response is always disposed.
      async Task<IdPk> DeleteOneAsync(string id, string pk)
      {
          using (ResponseMessage response = await container.DeleteItemStreamAsync(id, new PartitionKey(pk)))
          {
              return new IdPk { id = id, pk = pk, StatusCode = response.StatusCode };
          }
      }
  }
  /// <summary>
  /// Deletes the document identified by the id and partition key carried in <paramref name="idPk"/>.
  /// </summary>
  /// <returns>The result of the (id, pk) overload.</returns>
  public async Task<IdPk> DeleteAsync<T>(IdPk idPk) where T : ID
  {
      IdPk outcome = await DeleteAsync<T>(idPk.id, idPk.pk);
      return outcome;
  }
  /// <summary>
  /// Deletes one document by id and partition key value.
  /// </summary>
  /// <param name="id">Document id.</param>
  /// <param name="pk">Partition key value.</param>
  /// <returns>The id, partition key and response status code.</returns>
  public async Task<IdPk> DeleteAsync<T>(string id, string pk) where T : ID
  {
      Container container = await InitializeCollection<T>();
      // ResponseMessage is disposable; release it once the status code has been read.
      using (ResponseMessage response = await container.DeleteItemStreamAsync(id: id, partitionKey: new PartitionKey(pk)))
      {
          return new IdPk { id = id, pk = pk, StatusCode = response.StatusCode };
      }
  }
  /// <summary>
  /// Deletes the document backing <paramref name="entity"/>; the partition key value is read
  /// by reflection from the property named by <c>GetPartitionKey</c>.
  /// </summary>
  /// <param name="entity">Entity to delete.</param>
  /// <returns>The id, partition key value and response status code.</returns>
  public async Task<IdPk> DeleteAsync<T>(T entity) where T : ID
  {
      Container container = await InitializeCollection<T>();
      string partitionKeyName = GetPartitionKey<T>();
      string pkValue = typeof(T).GetProperty(partitionKeyName).GetValue(entity, null).ToString();
      // ResponseMessage is disposable; release it once the status code has been read.
      using (ResponseMessage response = await container.DeleteItemStreamAsync(id: entity.id, partitionKey: new PartitionKey(pkValue)))
      {
          return new IdPk { id = entity.id, pk = pkValue, StatusCode = response.StatusCode };
      }
  }
  314. //public async Task<T> DeleteAsync<T>(string id) where T : ID
  315. //{
  316. // Container container = await InitializeCollection<T>();
  317. // ItemResponse<T> response = await container.DeleteItemAsync<T>(id: id, partitionKey: new PartitionKey(GetPartitionKey<T>()));
  318. // return response.Resource;
  319. //}
  /// <summary>
  /// Reads every document of type <typeparamref name="T"/>, with no filter.
  /// </summary>
  /// <param name="propertys">Passed to <c>SQLHelperParametric.GetSQLSelect</c> to build the SELECT clause.</param>
  /// <returns>All items produced by the query.</returns>
  public async Task<List<T>> FindAll<T>(List<string> propertys = null) where T : ID
  {
      Container container = await InitializeCollection<T>();
      StringBuilder selectClause = SQLHelperParametric.GetSQLSelect(propertys);
      CosmosDbQuery selectAll = new CosmosDbQuery { QueryText = selectClause.ToString() };
      FeedIterator<T> iterator = container.GetItemQueryIterator<T>(queryDefinition: selectAll.CosmosQueryDefinition);
      List<T> everything = await ResultsFromFeedIterator(iterator);
      return everything;
  }
  /// <summary>
  /// Drains a feed iterator into a list, stopping early once the list size equals
  /// <paramref name="maxItemCount"/> (when a value is given).
  /// </summary>
  /// <param name="query">Iterator to read pages from.</param>
  /// <param name="maxItemCount">Optional cap on the number of items; null means no cap.</param>
  /// <returns>The collected items.</returns>
  private async Task<List<T>> ResultsFromFeedIterator<T>(FeedIterator<T> query, int? maxItemCount = null)
  {
      List<T> collected = new List<T>();
      while (query.HasMoreResults)
      {
          FeedResponse<T> page = await query.ReadNextAsync();
          foreach (T item in page)
          {
              collected.Add(item);
              bool capReached = maxItemCount.HasValue && collected.Count == maxItemCount.Value;
              if (capReached)
              {
                  return collected;
              }
          }
      }
      return collected;
  }
  /// <summary>
  /// Streams a feed iterator through <paramref name="batchAction"/>: pages are buffered and the
  /// buffer is handed to the action (then cleared) whenever it holds at least
  /// <paramref name="itemsPerPage"/> items before the next read, and once more at the end
  /// if anything is left.
  /// </summary>
  /// <param name="query">Iterator to read pages from.</param>
  /// <param name="batchAction">Callback receiving the buffered items; the same list instance is cleared afterwards.</param>
  /// <param name="itemsPerPage">Buffer size that triggers a flush.</param>
  /// <returns>The buffer list, which is always empty when this method returns.</returns>
  private async Task<List<T>> ResultsFromFeedIterator<T>(FeedIterator<T> query, Func<List<T>, Task> batchAction, int itemsPerPage)
  {
      List<T> buffer = new List<T>();
      while (query.HasMoreResults)
      {
          if (buffer.Count >= itemsPerPage)
          {
              await batchAction(buffer);
              buffer.Clear();
          }
          FeedResponse<T> page = await query.ReadNextAsync();
          buffer.AddRange(page);
      }

      // Flush whatever remains after the last page.
      if (buffer.Count > 0)
      {
          await batchAction(buffer);
          buffer.Clear();
      }
      return buffer;
  }
  /// <summary>
  /// Runs a dictionary-driven query against an already cached container and returns dynamic rows.
  /// </summary>
  /// <param name="CollectionName">Id of a container present in the cache.</param>
  /// <param name="dict">Filter passed to <c>SQLHelperParametric.GetSQL</c>.</param>
  /// <param name="itemsPerPage">Page size; -1 selects the default.</param>
  /// <param name="maxItemCount">Optional cap on the number of returned items.</param>
  /// <param name="partitionKey">Not used by this method.</param>
  /// <param name="propertys">Passed to <c>SQLHelperParametric.GetSQLSelect</c> to build the SELECT clause.</param>
  /// <exception cref="BizException">Thrown when the container is not in the cache.</exception>
  public async Task<List<dynamic>> FindByDict(string CollectionName, Dictionary<string, object> dict, int itemsPerPage = -1, int? maxItemCount = null, string partitionKey = null, List<string> propertys = null)
  {
      bool known = DocumentCollectionDict.TryGetValue(CollectionName, out Container container);
      if (!known)
      {
          throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
      }

      StringBuilder selectClause = SQLHelperParametric.GetSQLSelect(propertys);
      CosmosDbQuery parametricQuery = SQLHelperParametric.GetSQL(dict, selectClause);
      int effectivePageSize = GetEffectivePageSize(itemsPerPage, maxItemCount);
      QueryRequestOptions requestOptions = GetDefaultQueryRequestOptions(itemsPerPage: effectivePageSize);
      FeedIterator<dynamic> iterator = container.GetItemQueryIterator<dynamic>(queryDefinition: parametricQuery.CosmosQueryDefinition, requestOptions: requestOptions);
      return await ResultsFromFeedIterator(iterator, maxItemCount);
  }
  /// <summary>
  /// Runs a dictionary-driven <c>count</c> query against an already cached container.
  /// </summary>
  /// <param name="CollectionName">Id of a container present in the cache.</param>
  /// <param name="dict">Filter passed to <c>SQLHelperParametric.GetSQL</c>.</param>
  /// <param name="itemsPerPage">Page size; -1 selects the default.</param>
  /// <param name="maxItemCount">Optional cap on the number of returned rows.</param>
  /// <param name="partitionKey">Not used by this method.</param>
  /// <exception cref="BizException">Thrown when the container is not in the cache.</exception>
  public async Task<List<dynamic>> FindCountByDict(string CollectionName, Dictionary<string, object> dict, int itemsPerPage = -1, int? maxItemCount = null, string partitionKey = null)
  {
      bool known = DocumentCollectionDict.TryGetValue(CollectionName, out Container container);
      if (!known)
      {
          throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
      }

      StringBuilder countClause = new StringBuilder("select value count(c) from c");
      CosmosDbQuery parametricQuery = SQLHelperParametric.GetSQL(dict, countClause);
      int effectivePageSize = GetEffectivePageSize(itemsPerPage, maxItemCount);
      QueryRequestOptions requestOptions = GetDefaultQueryRequestOptions(itemsPerPage: effectivePageSize);
      FeedIterator<dynamic> iterator = container.GetItemQueryIterator<dynamic>(queryDefinition: parametricQuery.CosmosQueryDefinition, requestOptions: requestOptions);
      return await ResultsFromFeedIterator(iterator, maxItemCount);
  }
  /// <summary>
  /// Alias of <c>FindByDict&lt;T&gt;</c>: forwards every argument unchanged.
  /// </summary>
  public async Task<List<T>> FindByParams<T>(Dictionary<string, object> dict, int itemsPerPage = -1, int? maxItemCount = null, string partitionKey = null, List<string> propertys = null) where T : ID
  {
      List<T> found = await FindByDict<T>(dict, itemsPerPage, maxItemCount, partitionKey, propertys);
      return found;
  }
  /// <summary>
  /// Runs a dictionary-driven query against the container of <typeparamref name="T"/>.
  /// </summary>
  /// <param name="dict">Filter passed to <c>SQLHelperParametric.GetSQL</c>.</param>
  /// <param name="itemsPerPage">Page size; -1 selects the default.</param>
  /// <param name="maxItemCount">Optional cap on the number of returned items.</param>
  /// <param name="partitionKey">Not used by this method.</param>
  /// <param name="propertys">Passed to <c>SQLHelperParametric.GetSQLSelect</c> to build the SELECT clause.</param>
  /// <returns>The matching items, at most <paramref name="maxItemCount"/> when it is given.</returns>
  public async Task<List<T>> FindByDict<T>(Dictionary<string, object> dict, int itemsPerPage = -1, int? maxItemCount = null, string partitionKey = null,List<string> propertys = null) where T : ID
  {
      StringBuilder sql = SQLHelperParametric.GetSQLSelect(propertys);
      CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, sql);
      QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(itemsPerPage, maxItemCount));
      // Forward the cap so it is enforced, as the non-generic FindByDict overload does.
      return await ResultsFromQueryAndOptions<T>(cosmosDbQuery, queryRequestOptions, maxItemCount);
  }
  /// <summary>
  /// Executes <paramref name="cosmosDbQuery"/> on the container of <typeparamref name="T"/>
  /// and collects the results.
  /// </summary>
  /// <param name="cosmosDbQuery">Query whose <c>CosmosQueryDefinition</c> is executed.</param>
  /// <param name="queryOptions">Request options for the query.</param>
  /// <param name="maxItemCount">Optional cap on the number of returned items.</param>
  private async Task<List<T>> ResultsFromQueryAndOptions<T>(CosmosDbQuery cosmosDbQuery, QueryRequestOptions queryOptions, int? maxItemCount = null)
  {
      Container target = await InitializeCollection<T>();
      FeedIterator<T> iterator = target.GetItemQueryIterator<T>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryOptions);
      List<T> rows = await ResultsFromFeedIterator(iterator, maxItemCount);
      return rows;
  }
  /// <summary>
  /// Combines the requested page size with the optional item cap.
  /// With <paramref name="itemsPerPage"/> == -1 the cap is returned (or -1 when there is none);
  /// otherwise the smaller of the cap and the page size is returned.
  /// </summary>
  /// <param name="itemsPerPage">Requested page size, -1 meaning "not specified".</param>
  /// <param name="maxItemCount">Optional cap on the number of items.</param>
  private int GetEffectivePageSize(int itemsPerPage, int? maxItemCount)
  {
      int cap = maxItemCount ?? itemsPerPage;
      if (itemsPerPage == -1)
      {
          return cap;
      }
      return Math.Min(cap, itemsPerPage);
  }
  /// <summary>
  /// Builds query request options with this repository's defaults: a page size of 1000 when
  /// <paramref name="itemsPerPage"/> is -1, 100 buffered items and a concurrency of 50
  /// unless overridden.
  /// </summary>
  private QueryRequestOptions GetDefaultQueryRequestOptions(int? itemsPerPage = null,
      int? maxBufferedItemCount = null,
      int? maxConcurrency = null)
  {
      int? pageSizeHint = itemsPerPage;
      if (itemsPerPage == -1)
      {
          pageSizeHint = 1000;
      }

      QueryRequestOptions options = new QueryRequestOptions();
      options.MaxItemCount = pageSizeHint;
      options.MaxBufferedItemCount = maxBufferedItemCount.HasValue ? maxBufferedItemCount.Value : 100;
      options.MaxConcurrency = maxConcurrency.HasValue ? maxConcurrency.Value : 50;
      return options;
  }
  /// <summary>
  /// Executes <paramref name="cosmosDbQuery"/> on the container of <typeparamref name="T"/>
  /// and feeds the results to <paramref name="batchAction"/> in batches sized by
  /// <c>queryOptions.MaxItemCount</c> (0 when unset).
  /// </summary>
  private async Task<List<T>> ResultsFromQueryAndOptions<T>(CosmosDbQuery cosmosDbQuery, Func<List<T>, Task> batchAction, QueryRequestOptions queryOptions)
  {
      Container target = await InitializeCollection<T>();
      FeedIterator<T> iterator = target.GetItemQueryIterator<T>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryOptions);
      int batchSize = queryOptions.MaxItemCount ?? 0;
      return await ResultsFromFeedIterator(iterator, batchAction, batchSize);
  }
  /// <summary>
  /// Builds request options that only set the page size.
  /// </summary>
  /// <param name="itemsPerPage">Value assigned to <c>MaxItemCount</c>.</param>
  private QueryRequestOptions GetQueryRequestOptions(int itemsPerPage)
  {
      QueryRequestOptions options = new QueryRequestOptions();
      options.MaxItemCount = itemsPerPage;
      return options;
  }
  /// <summary>
  /// Queries the container of <typeparamref name="T"/> with an optional LINQ predicate and ordering.
  /// </summary>
  /// <param name="query">Optional filter; null means no filter.</param>
  /// <param name="order">Optional ordering key; null means no ordering.</param>
  /// <param name="isDesc">When an ordering key is given, sort descending instead of ascending.</param>
  /// <param name="itemsPerPage">Page size; -1 selects the default.</param>
  /// <param name="maxItemCount">Optional cap on the number of returned items.</param>
  /// <returns>The matching items, at most <paramref name="maxItemCount"/> when it is given.</returns>
  public async Task<List<T>> FindLinq<T>(Expression<Func<T, bool>> query = null, Expression<Func<T, object>> order = null, bool isDesc = false, int itemsPerPage = -1, int? maxItemCount = null) where T : ID
  {
      QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(itemsPerPage, maxItemCount));
      Container container = await InitializeCollection<T>();

      // Compose the query step by step instead of spelling out every filter/order combination.
      IQueryable<T> queryable = container.GetItemLinqQueryable<T>(requestOptions: queryRequestOptions);
      if (query != null)
      {
          queryable = queryable.Where(query);
      }
      if (order != null)
      {
          queryable = isDesc ? queryable.OrderByDescending(order) : queryable.OrderBy(order);
      }

      FeedIterator<T> feedIterator = queryable.ToFeedIterator();
      // Forward the cap so it is enforced, not just used as a page-size hint.
      return await ResultsFromFeedIterator<T>(feedIterator, maxItemCount);
  }
  /// <summary>
  /// Runs a raw SQL query against the container of <typeparamref name="T"/>.
  /// </summary>
  /// <param name="sql">Query text.</param>
  /// <param name="Parameters">Optional named parameters; when null the text is executed as-is.</param>
  /// <param name="itemsPerPage">Page size; -1 means not specified.</param>
  /// <param name="maxItemCount">Optional cap on the number of returned items.</param>
  /// <returns>The matching items, at most <paramref name="maxItemCount"/> when it is given.</returns>
  public async Task<List<T>> FindSQL<T>(string sql, Dictionary<string, object> Parameters = null, int itemsPerPage = -1, int? maxItemCount = null) where T : ID
  {
      Container container = await InitializeCollection<T>();
      QueryRequestOptions queryOptions = GetQueryRequestOptions(GetEffectivePageSize(itemsPerPage, maxItemCount));

      QueryDefinition queryDefinition;
      if (Parameters != null)
      {
          CosmosDbQuery cosmosDbQuery = new CosmosDbQuery
          {
              QueryText = sql,
              Parameters = Parameters
          };
          queryDefinition = cosmosDbQuery.CosmosQueryDefinition;
      }
      else
      {
          queryDefinition = new QueryDefinition(sql);
      }

      // Both branches now share the same request options and the same item cap;
      // previously the parameterless branch ignored the paging options.
      FeedIterator<T> feedIterator = container.GetItemQueryIterator<T>(queryDefinition, requestOptions: queryOptions);
      return await ResultsFromFeedIterator(feedIterator, maxItemCount);
  }
  /// <summary>
  /// Creates a new document for <paramref name="entity"/>.
  /// </summary>
  /// <returns>The resource returned by the create call.</returns>
  /// <exception cref="BizException">Wraps the message of any exception raised while creating the item.</exception>
  public async Task<T> Save<T>(T entity) where T : ID
  {
      try
      {
          Container target = await InitializeCollection<T>();
          ItemResponse<T> created = await target.CreateItemAsync<T>(entity);
          T stored = created.Resource;
          return stored;
      }
      catch (Exception failure)
      {
          throw new BizException(failure.Message);
      }
  }
  /// <summary>
  /// Creates documents for all entities in batches of <c>pageSize</c> concurrent requests.
  /// Each entity is serialized with System.Text.Json (null values ignored) and sent through
  /// the stream API.
  /// </summary>
  /// <param name="enyites">Entities to create.</param>
  /// <returns>The input list. Unsuccessful status codes are not reported (best effort, as before).</returns>
  public async Task<List<T>> SaveAll<T>(List<T> enyites) where T : ID
  {
      Container container = await InitializeCollection<T>();
      PropertyInfo pkProperty = typeof(T).GetProperty(GetPartitionKey<T>());
      // One options instance for the whole call instead of one per item.
      JsonSerializerOptions serializerOptions = new JsonSerializerOptions { IgnoreNullValues = true };

      for (int offset = 0; offset < enyites.Count; offset += pageSize)
      {
          int count = Math.Min(pageSize, enyites.Count - offset);
          List<Task> tasks = new List<Task>(count);
          foreach (T entity in enyites.GetRange(offset, count))
          {
              tasks.Add(CreateOneAsync(entity));
          }
          await Task.WhenAll(tasks);
      }
      return enyites;

      // Serializes and creates a single item. Being a Task-returning function (rather than an
      // async void lambda), serialization is awaited and failures surface through WhenAll.
      async Task CreateOneAsync(T entity)
      {
          using (MemoryStream stream = new MemoryStream())
          {
              await JsonSerializer.SerializeAsync(stream, entity, serializerOptions);
              // Rewind defensively; harmless if the SDK already reads from the start.
              stream.Position = 0;
              string pkValue = pkProperty.GetValue(entity, null).ToString();
              using (ResponseMessage response = await container.CreateItemStreamAsync(stream, new PartitionKey(pkValue)))
              {
                  // Best effort: the status code is intentionally not inspected.
              }
          }
      }
  }
  /// <summary>
  /// Upserts <paramref name="entity"/> into the container of <typeparamref name="T"/>.
  /// </summary>
  /// <returns>The resource returned by the upsert call.</returns>
  public async Task<T> SaveOrUpdate<T>(T entity) where T : ID
  {
      Container target = await InitializeCollection<T>();
      ItemResponse<T> upserted = await target.UpsertItemAsync(item: entity);
      T stored = upserted.Resource;
      return stored;
  }
  /// <summary>
  /// Upserts all entities in batches of <c>pageSize</c> concurrent requests.
  /// Each entity is serialized with System.Text.Json (null values ignored) and sent through
  /// the stream API.
  /// </summary>
  /// <param name="enyites">Entities to upsert.</param>
  /// <returns>The input list. Unsuccessful status codes are not reported (best effort, as before).</returns>
  public async Task<List<T>> SaveOrUpdateAll<T>(List<T> enyites) where T : ID
  {
      Container container = await InitializeCollection<T>();
      PropertyInfo pkProperty = typeof(T).GetProperty(GetPartitionKey<T>());
      // One options instance for the whole call instead of one per item.
      JsonSerializerOptions serializerOptions = new JsonSerializerOptions { IgnoreNullValues = true };

      for (int offset = 0; offset < enyites.Count; offset += pageSize)
      {
          int count = Math.Min(pageSize, enyites.Count - offset);
          List<Task> tasks = new List<Task>(count);
          foreach (T entity in enyites.GetRange(offset, count))
          {
              tasks.Add(UpsertOneAsync(entity));
          }
          await Task.WhenAll(tasks);
      }
      return enyites;

      // Serializes and upserts a single item. Being a Task-returning function (rather than an
      // async void lambda), serialization is awaited and failures surface through WhenAll.
      async Task UpsertOneAsync(T entity)
      {
          using (MemoryStream stream = new MemoryStream())
          {
              await JsonSerializer.SerializeAsync(stream, entity, serializerOptions);
              // Rewind defensively; harmless if the SDK already reads from the start.
              stream.Position = 0;
              string pkValue = pkProperty.GetValue(entity, null).ToString();
              using (ResponseMessage response = await container.UpsertItemStreamAsync(stream, new PartitionKey(pkValue)))
              {
                  // Best effort: the status code is intentionally not inspected,
                  // but the response is now disposed.
              }
          }
      }
  }
  /// <summary>
  /// Replaces the stored document of <paramref name="entity"/>; the partition key value is
  /// read by reflection from the property named by <c>GetPartitionKey</c>.
  /// </summary>
  /// <returns>The resource returned by the replace call.</returns>
  public async Task<T> Update<T>(T entity) where T : ID
  {
      Container target = await InitializeCollection<T>();
      string pkName = GetPartitionKey<T>();
      PropertyInfo pkProperty = typeof(T).GetProperty(pkName);
      object pkValue = pkProperty.GetValue(entity, null);
      PartitionKey partition = new PartitionKey(pkValue.ToString());
      ItemResponse<T> replaced = await target.ReplaceItemAsync(entity, entity.id, partition);
      return replaced.Resource;
  }
  /// <summary>
  /// Carrier for one serialized document: its id, partition key value and JSON payload stream.
  /// </summary>
  internal class Item
  {
      /// <summary>Document id.</summary>
      public string id { get; set; }

      /// <summary>Partition key value as a string.</summary>
      public string pk { get; set; }

      /// <summary>Stream holding the serialized document.</summary>
      public MemoryStream stream { get; set; }
  }
  /// <summary>
  /// Replaces the stored documents of all entities in batches of <c>pageSize</c> concurrent
  /// requests. Each entity is serialized with System.Text.Json (null values ignored) and sent
  /// through the stream API.
  /// </summary>
  /// <param name="enyites">Entities to replace.</param>
  /// <returns>The input list. Unsuccessful status codes are not reported (best effort, as before).</returns>
  public async Task<List<T>> UpdateAll<T>(List<T> enyites) where T : ID
  {
      Container container = await InitializeCollection<T>();
      PropertyInfo pkProperty = typeof(T).GetProperty(GetPartitionKey<T>());
      // One options instance for the whole call instead of one per item.
      JsonSerializerOptions serializerOptions = new JsonSerializerOptions { IgnoreNullValues = true };

      for (int offset = 0; offset < enyites.Count; offset += pageSize)
      {
          int count = Math.Min(pageSize, enyites.Count - offset);
          List<Task> tasks = new List<Task>(count);
          foreach (T entity in enyites.GetRange(offset, count))
          {
              tasks.Add(ReplaceOneAsync(entity));
          }
          await Task.WhenAll(tasks);
      }
      return enyites;

      // Serializes and replaces a single item. Being a Task-returning function (rather than an
      // async void lambda), serialization is awaited and failures surface through WhenAll.
      async Task ReplaceOneAsync(T entity)
      {
          using (MemoryStream stream = new MemoryStream())
          {
              await JsonSerializer.SerializeAsync(stream, entity, serializerOptions);
              // Rewind defensively; harmless if the SDK already reads from the start.
              stream.Position = 0;
              string pkValue = pkProperty.GetValue(entity, null).ToString();
              using (ResponseMessage response = await container.ReplaceItemStreamAsync(stream, entity.id, new PartitionKey(pkValue)))
              {
                  // Best effort: the status code is intentionally not inspected,
                  // but the response is now disposed.
              }
          }
      }
  }
  697. //public void Dispose()
  698. //{
  699. // Dispose(true);
  700. //}
  701. //protected virtual void Dispose(bool disposing)
  702. //{
  703. // if (disposing)
  704. // {
  705. // CosmosClient?.Dispose();
  706. // }
  707. //}
  /// <summary>
  /// Looks up a document by id with a parameterized query that does not specify a partition key.
  /// </summary>
  /// <param name="id">Document id bound to the <c>@id</c> parameter.</param>
  /// <returns>
  /// The single match, or the default value when nothing matches. <c>SingleOrDefault</c>
  /// throws when more than one document is returned.
  /// </returns>
  public async Task<T> FindById<T>(string id) where T : ID
  {
      Container target = await InitializeCollection<T>();
      Dictionary<string, object> queryParameters = new Dictionary<string, object>
      {
          { "@id",id}
      };
      CosmosDbQuery byIdQuery = new CosmosDbQuery
      {
          QueryText = @"SELECT *
FROM c
WHERE c.id = @id",
          Parameters = queryParameters
      };
      FeedIterator<T> iterator = target.GetItemQueryIterator<T>(byIdQuery.CosmosQueryDefinition);
      List<T> matches = await ResultsFromFeedIterator(iterator);
      return matches.SingleOrDefault();
  }
  /// <summary>
  /// Reads one document directly by id and partition key value.
  /// </summary>
  /// <param name="id">Document id.</param>
  /// <param name="pk">Partition key value.</param>
  /// <returns>The resource returned by the read call.</returns>
  public async Task<T> FindByIdPk<T>(string id, string pk) where T : ID
  {
      Container target = await InitializeCollection<T>();
      PartitionKey partition = new PartitionKey(pk);
      ItemResponse<T> read = await target.ReadItemAsync<T>(id: id, partitionKey: partition);
      return read.Resource;
  }
  731. }
  732. }