AzureCosmosDBV3Repository.cs 33 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779
  1. using Microsoft.Azure.Cosmos;
  2. using Microsoft.Azure.Cosmos.Linq;
  3. using System;
  4. using System.Collections.Concurrent;
  5. using System.Collections.Generic;
  6. using System.Diagnostics;
  7. using System.IO;
  8. using System.Linq;
  9. using System.Linq.Expressions;
  10. using System.Net;
  11. using System.Reflection;
  12. using System.Text;
  13. using System.Text.Json;
  14. using System.Threading.Tasks;
  15. using TEAMModelOS.SDK.Context.Attributes.Azure;
  16. using TEAMModelOS.SDK.Context.Exception;
  17. using TEAMModelOS.SDK.Helper.Common.ReflectorExtensions;
  18. using TEAMModelOS.SDK.Module.AzureCosmosDB.Configuration;
  19. namespace TEAMModelOS.SDK.Module.AzureCosmosDBV3
  20. {
  21. public class AzureCosmosDBV3Repository : IAzureCosmosDBV3Repository
  22. {
  23. private CosmosClient CosmosClient { get; set; }
  24. /// <summary>
  25. /// 线程安全的dict类型
  26. /// </summary>
  27. private Dictionary<string, Container> DocumentCollectionDict { get; set; } = new Dictionary<string, Container>();
  28. private string DatabaseId { get; set; }
  29. private int CollectionThroughput { get; set; }
  30. private Database database { get; set; }
  31. int pageSize =200;
  32. private string[] ScanModel { get; set; }
  33. public AzureCosmosDBV3Repository(AzureCosmosDBOptions options, CosmosSerializer cosmosSerializer)
  34. {
  35. try
  36. {
  37. if (!string.IsNullOrEmpty(options.ConnectionString))
  38. {
  39. CosmosClient = CosmosDBV3ClientSingleton.getInstance(options.ConnectionString, options.ConnectionKey, cosmosSerializer).GetCosmosDBClient();
  40. }
  41. else
  42. {
  43. throw new BizException("请设置正确的AzureCosmosDB数据库配置信息!");
  44. }
  45. DatabaseId = options.Database;
  46. CollectionThroughput = options.CollectionThroughput;
  47. ScanModel = options.ScanModel;
  48. // InitializeDatabase().GetAwaiter().GetResult();
  49. }
  50. catch (CosmosException e)
  51. {
  52. // Dispose(true);
  53. throw new BizException(e.Message, 500, e.StackTrace);
  54. }
  55. }
  56. public AzureCosmosDBV3Repository(AzureCosmosDBOptions options)
  57. {
  58. try
  59. {
  60. if (!string.IsNullOrEmpty(options.ConnectionString))
  61. {
  62. CosmosClient = CosmosDBV3ClientSingleton.getInstance(options.ConnectionString, options.ConnectionKey, null).GetCosmosDBClient();
  63. }
  64. else
  65. {
  66. throw new BizException("请设置正确的AzureCosmosDB数据库配置信息!");
  67. }
  68. DatabaseId = options.Database;
  69. CollectionThroughput = options.CollectionThroughput;
  70. ScanModel = options.ScanModel;
  71. // InitializeDatabase().GetAwaiter().GetResult();
  72. }
  73. catch (CosmosException e)
  74. {
  75. // Dispose(true);
  76. throw new BizException(e.Message, 500, e.StackTrace);
  77. }
  78. }
  79. public async Task InitializeDatabase()
  80. {
  81. try
  82. {
  83. database = await CosmosClient.CreateDatabaseIfNotExistsAsync(DatabaseId, CollectionThroughput);
  84. FeedIterator<ContainerProperties> resultSetIterator = database.GetContainerQueryIterator<ContainerProperties>();
  85. while (resultSetIterator.HasMoreResults)
  86. {
  87. foreach (ContainerProperties container in await resultSetIterator.ReadNextAsync())
  88. {
  89. DocumentCollectionDict.TryAdd(container.Id, database.GetContainer(container.Id));
  90. }
  91. }
  92. //获取数据库所有的表
  93. List<Type> types = ReflectorExtensions.GetAllTypeAsAttribute<CosmosDBAttribute>(ScanModel);
  94. foreach (Type type in types)
  95. {
  96. string PartitionKey = GetPartitionKey(type);
  97. string CollectionName = "";
  98. int RU = 0;
  99. IEnumerable<CosmosDBAttribute> attributes = type.GetCustomAttributes<CosmosDBAttribute>(true);
  100. if (!string.IsNullOrEmpty(attributes.First<CosmosDBAttribute>().Name))
  101. {
  102. CollectionName = attributes.First<CosmosDBAttribute>().Name;
  103. }
  104. else
  105. {
  106. CollectionName = type.Name;
  107. }
  108. if (attributes.First<CosmosDBAttribute>().RU > 400)
  109. {
  110. RU = attributes.First<CosmosDBAttribute>().RU;
  111. }
  112. else
  113. {
  114. RU = CollectionThroughput;
  115. }
  116. //如果表存在于数据则检查RU是否变动,如果不存在则执行创建DocumentCollection
  117. if (DocumentCollectionDict.TryGetValue(CollectionName, out Container collection))
  118. { //更新RU
  119. int? throughputResponse = await CosmosClient.GetDatabase(DatabaseId).GetContainer(collection.Id).ReadThroughputAsync();
  120. if (throughputResponse < RU)
  121. {
  122. await CosmosClient.GetDatabase(DatabaseId).GetContainer(collection.Id).ReplaceThroughputAsync(RU);
  123. }
  124. }
  125. else
  126. {
  127. ContainerProperties containerProperties = new ContainerProperties { Id = CollectionName };
  128. if (!string.IsNullOrEmpty(PartitionKey))
  129. {
  130. containerProperties.PartitionKeyPath = "/" + PartitionKey;
  131. }
  132. if (RU > CollectionThroughput)
  133. {
  134. CollectionThroughput = RU;
  135. }
  136. Container containerWithConsistentIndexing = await database.CreateContainerIfNotExistsAsync(containerProperties, throughput: CollectionThroughput);
  137. DocumentCollectionDict.TryAdd(CollectionName, containerWithConsistentIndexing);
  138. }
  139. }
  140. }
  141. catch (CosmosException e)
  142. {
  143. throw new BizException(e.Message, 500, e.StackTrace);
  144. }
  145. }
  146. private string GetPartitionKey<T>()
  147. {
  148. Type type = typeof(T);
  149. return GetPartitionKey(type);
  150. }
  151. private string GetPartitionKey(Type type)
  152. {
  153. PropertyInfo[] properties = type.GetProperties();
  154. List<PropertyInfo> attrProperties = new List<PropertyInfo>();
  155. foreach (PropertyInfo property in properties)
  156. {
  157. if (property.Name.Equals("PartitionKey"))
  158. {
  159. attrProperties.Add(property);
  160. break;
  161. }
  162. object[] attributes = property.GetCustomAttributes(true);
  163. foreach (object attribute in attributes) //2.通过映射,找到成员属性上关联的特性类实例,
  164. {
  165. if (attribute is PartitionKeyAttribute)
  166. {
  167. attrProperties.Add(property);
  168. }
  169. }
  170. }
  171. if (attrProperties.Count <= 0)
  172. {
  173. throw new BizException(type.Name + "has no PartitionKey !");
  174. }
  175. else
  176. {
  177. if (attrProperties.Count == 1)
  178. {
  179. return attrProperties[0].Name;
  180. }
  181. else { throw new BizException("PartitionKey can only be single!"); }
  182. }
  183. }
  184. private async Task<Container> InitializeCollection<T>()
  185. {
  186. Type type = typeof(T);
  187. string partitionKey = GetPartitionKey<T>();
  188. string CollectionName;
  189. IEnumerable<CosmosDBAttribute> attributes = type.GetCustomAttributes<CosmosDBAttribute>(true);
  190. if (!string.IsNullOrEmpty(attributes.First<CosmosDBAttribute>().Name))
  191. {
  192. CollectionName = attributes.First<CosmosDBAttribute>().Name;
  193. }
  194. else
  195. {
  196. CollectionName = type.Name;
  197. }
  198. return await InitializeCollection(CollectionName, partitionKey);
  199. }
  200. private async Task<Container> InitializeCollection(string CollectionName, string PartitionKey)
  201. {
  202. /////内存中已经存在这个表则直接返回
  203. if (DocumentCollectionDict.TryGetValue(CollectionName, out Container DocumentCollection))
  204. {
  205. return DocumentCollection;
  206. }///如果没有则尝试默认创建
  207. else
  208. {
  209. ContainerProperties containerProperties = new ContainerProperties { Id = CollectionName };
  210. if (!string.IsNullOrEmpty(PartitionKey))
  211. {
  212. containerProperties.PartitionKeyPath = "/" + PartitionKey;
  213. }
  214. Container containerWithConsistentIndexing = await database.CreateContainerIfNotExistsAsync(containerProperties, throughput: CollectionThroughput);
  215. DocumentCollectionDict.TryAdd(CollectionName, containerWithConsistentIndexing);
  216. return containerWithConsistentIndexing;
  217. }
  218. }
  219. public async Task DeleteAll<T>(List<KeyValuePair<string, string>> ids) where T : ID
  220. {
  221. Container container = await InitializeCollection<T>();
  222. //string partitionKey = GetPartitionKey<T>();
  223. //await Task.Run(() => Parallel.ForEach(ids, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (item) =>
  224. //{
  225. // Task.WaitAll(DeleteAsync<T>(item.Value, item.Key));
  226. //}));
  227. int pages = (int)Math.Ceiling((double)ids.Count / pageSize);
  228. Stopwatch stopwatch = Stopwatch.StartNew();
  229. for (int i = 0; i < pages; i++)
  230. {
  231. List<KeyValuePair<string, string>> lists = ids.Skip((i) * pageSize).Take(pageSize).ToList();
  232. List<Task> tasks = new List<Task>(lists.Count);
  233. lists.ForEach(item =>
  234. {
  235. tasks.Add(container.DeleteItemStreamAsync(item.Value, new PartitionKey(item.Key))
  236. .ContinueWith((Task<ResponseMessage> task) =>
  237. {
  238. //using (ResponseMessage response = task.Result)
  239. //{
  240. // if (!response.IsSuccessStatusCode)
  241. // {
  242. // }
  243. //}
  244. }
  245. ));
  246. });
  247. await Task.WhenAll(tasks);
  248. }
  249. stopwatch.Stop();
  250. }
  251. public async Task DeleteAll<T>(List<T> enyites) where T : ID
  252. {
  253. Container container = await InitializeCollection<T>();
  254. string pk = GetPartitionKey<T>();
  255. Type type = typeof(T);
  256. int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
  257. Stopwatch stopwatch = Stopwatch.StartNew();
  258. for (int i = 0; i < pages; i++)
  259. {
  260. List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
  261. List<KeyValuePair<PartitionKey, string>> itemsToInsert = new List<KeyValuePair<PartitionKey, string>>();
  262. lists.ForEach(x =>
  263. {
  264. object o = type.GetProperty(pk).GetValue(x, null);
  265. KeyValuePair<PartitionKey, string> keyValue = new KeyValuePair<PartitionKey, string>(new PartitionKey(o.ToString()), x.id);
  266. itemsToInsert.Add(keyValue);
  267. });
  268. List<Task> tasks = new List<Task>(lists.Count);
  269. itemsToInsert.ForEach(item =>
  270. {
  271. tasks.Add(container.DeleteItemStreamAsync(item.Value, item.Key)
  272. .ContinueWith((Task<ResponseMessage> task) =>
  273. {
  274. //using (ResponseMessage response = task.Result)
  275. //{
  276. // if (!response.IsSuccessStatusCode)
  277. // {
  278. // }
  279. //}
  280. }
  281. ));
  282. });
  283. await Task.WhenAll(tasks);
  284. }
  285. stopwatch.Stop();
  286. }
  287. public async Task<T> DeleteAsync<T>(string id, string pk) where T : ID
  288. {
  289. Container container = await InitializeCollection<T>();
  290. ItemResponse<T> response = await container.DeleteItemAsync<T>(id: id, partitionKey: new PartitionKey(pk));
  291. return response.Resource;
  292. }
  293. public async Task<T> DeleteAsync<T>(T entity) where T : ID
  294. {
  295. Container container = await InitializeCollection<T>();
  296. string partitionKey = GetPartitionKey<T>();
  297. Type type = typeof(T);
  298. object o = type.GetProperty(partitionKey).GetValue(entity, null);
  299. ItemResponse<T> response = await container.DeleteItemAsync<T>(id: entity.id, partitionKey: new PartitionKey(o.ToString()));
  300. return response.Resource;
  301. }
  302. //public async Task<T> DeleteAsync<T>(string id) where T : ID
  303. //{
  304. // Container container = await InitializeCollection<T>();
  305. // ItemResponse<T> response = await container.DeleteItemAsync<T>(id: id, partitionKey: new PartitionKey(GetPartitionKey<T>()));
  306. // return response.Resource;
  307. //}
  308. public async Task<List<T>> FindAll<T>() where T : ID
  309. {
  310. Container container = await InitializeCollection<T>();
  311. return await ResultsFromFeedIterator(container.GetItemQueryIterator<T>());
  312. }
  313. private async Task<List<T>> ResultsFromFeedIterator<T>(FeedIterator<T> query, int? maxItemCount = null)
  314. {
  315. List<T> results = new List<T>();
  316. while (query.HasMoreResults)
  317. {
  318. foreach (T t in await query.ReadNextAsync())
  319. {
  320. results.Add(t);
  321. if (results.Count == maxItemCount)
  322. {
  323. return results;
  324. }
  325. }
  326. }
  327. return results;
  328. }
  329. private async Task<List<T>> ResultsFromFeedIterator<T>(FeedIterator<T> query, Func<List<T>, Task> batchAction, int itemsPerPage)
  330. {
  331. List<T> results = new List<T>();
  332. while (query.HasMoreResults)
  333. {
  334. if (results.Count() >= itemsPerPage)
  335. {
  336. await batchAction(results);
  337. results.Clear();
  338. }
  339. results.AddRange(await query.ReadNextAsync());
  340. }
  341. if (results.Count() > 0)
  342. {
  343. await batchAction(results);
  344. results.Clear();
  345. }
  346. return results;
  347. }
  348. public async Task<List<dynamic>> FindByDict(string CollectionName, Dictionary<string, object> dict, int itemsPerPage = -1, int? maxItemCount = null, string partitionKey = null)
  349. {
  350. if (DocumentCollectionDict.TryGetValue(CollectionName, out Container container))
  351. {
  352. //StringBuilder sql = new StringBuilder("select value(c) from c");
  353. //SQLHelper.GetSQL(dict, ref sql);
  354. //CosmosDbQuery cosmosDbQuery = new CosmosDbQuery
  355. //{
  356. // QueryText = sql.ToString()
  357. //};
  358. StringBuilder sql = new StringBuilder("select value(c) from c");
  359. CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict,sql);
  360. QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(itemsPerPage, maxItemCount));
  361. FeedIterator<dynamic> query = container.GetItemQueryIterator<dynamic>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
  362. return await ResultsFromFeedIterator(query, maxItemCount);
  363. }
  364. else
  365. {
  366. throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
  367. }
  368. }
  369. public async Task<List<dynamic>> FindCountByDict(string CollectionName, Dictionary<string, object> dict, int itemsPerPage = -1, int? maxItemCount = null, string partitionKey = null)
  370. {
  371. if (DocumentCollectionDict.TryGetValue(CollectionName, out Container container))
  372. {
  373. //StringBuilder sql = new StringBuilder("select value count(c) from c");
  374. //SQLHelper.GetSQL(dict, ref sql);
  375. //CosmosDbQuery cosmosDbQuery = new CosmosDbQuery
  376. //{
  377. // QueryText = sql.ToString()
  378. //};
  379. StringBuilder sql = new StringBuilder("select value count(c) from c");
  380. CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, sql);
  381. QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(itemsPerPage, maxItemCount));
  382. FeedIterator<dynamic> query = container.GetItemQueryIterator<dynamic>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
  383. return await ResultsFromFeedIterator(query, maxItemCount);
  384. }
  385. else
  386. {
  387. throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
  388. }
  389. }
  390. public async Task<List<T>> FindByParams<T>(Dictionary<string, object> dict, int itemsPerPage = -1, int? maxItemCount = null, string partitionKey = null) where T : ID
  391. {
  392. return await FindByDict<T>(dict, itemsPerPage, maxItemCount, partitionKey);
  393. }
  394. public async Task<List<T>> FindByDict<T>(Dictionary<string, object> dict, int itemsPerPage = -1, int? maxItemCount = null, string partitionKey = null) where T : ID
  395. {
  396. StringBuilder sql = new StringBuilder("select value(c) from c");
  397. CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, sql);
  398. QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(itemsPerPage, maxItemCount));
  399. return await ResultsFromQueryAndOptions<T>(cosmosDbQuery, queryRequestOptions);
  400. }
  401. private async Task<List<T>> ResultsFromQueryAndOptions<T>(CosmosDbQuery cosmosDbQuery, QueryRequestOptions queryOptions, int? maxItemCount = null)
  402. {
  403. Container container = await InitializeCollection<T>();
  404. FeedIterator<T> query = container.GetItemQueryIterator<T>(
  405. queryDefinition: cosmosDbQuery.CosmosQueryDefinition,
  406. requestOptions: queryOptions);
  407. return await ResultsFromFeedIterator(query, maxItemCount);
  408. }
  409. private int GetEffectivePageSize(int itemsPerPage, int? maxItemCount)
  410. {
  411. return itemsPerPage == -1 ? maxItemCount ?? itemsPerPage : Math.Min(maxItemCount ?? itemsPerPage, itemsPerPage);
  412. }
  413. private QueryRequestOptions GetDefaultQueryRequestOptions(int? itemsPerPage = null,
  414. int? maxBufferedItemCount = null,
  415. int? maxConcurrency = null)
  416. {
  417. QueryRequestOptions queryRequestOptions = new QueryRequestOptions
  418. {
  419. MaxItemCount = itemsPerPage == -1 ? 1000 : itemsPerPage,
  420. MaxBufferedItemCount = maxBufferedItemCount ?? 100,
  421. MaxConcurrency = maxConcurrency ?? 50
  422. };
  423. return queryRequestOptions;
  424. }
  425. private async Task<List<T>> ResultsFromQueryAndOptions<T>(CosmosDbQuery cosmosDbQuery, Func<List<T>, Task> batchAction, QueryRequestOptions queryOptions)
  426. {
  427. Container container = await InitializeCollection<T>();
  428. FeedIterator<T> query = container.GetItemQueryIterator<T>(
  429. queryDefinition: cosmosDbQuery.CosmosQueryDefinition,
  430. requestOptions: queryOptions);
  431. return await ResultsFromFeedIterator(query, batchAction, queryOptions.MaxItemCount ?? 0);
  432. }
  433. private QueryRequestOptions GetQueryRequestOptions(int itemsPerPage)
  434. {
  435. QueryRequestOptions queryRequestOptions = new QueryRequestOptions
  436. {
  437. MaxItemCount = itemsPerPage
  438. };
  439. return queryRequestOptions;
  440. }
  441. public async Task<List<T>> FindLinq<T>(Expression<Func<T, bool>> query = null, Expression<Func<T, object>> order = null, bool isDesc = false, int itemsPerPage = -1, int? maxItemCount = null) where T : ID
  442. {
  443. //QueryRequestOptions queryRequestOptions = GetQueryRequestOptions(itemsPerPage);
  444. QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(itemsPerPage, maxItemCount));
  445. FeedIterator<T> feedIterator;
  446. Container container = await InitializeCollection<T>();
  447. if (query == null)
  448. {
  449. if (order != null)
  450. {
  451. if (isDesc)
  452. {
  453. feedIterator = container
  454. .GetItemLinqQueryable<T>(requestOptions: queryRequestOptions).OrderByDescending(order)
  455. .ToFeedIterator();
  456. }
  457. else
  458. {
  459. feedIterator = container
  460. .GetItemLinqQueryable<T>(requestOptions: queryRequestOptions).OrderBy(order)
  461. .ToFeedIterator();
  462. }
  463. }
  464. else
  465. {
  466. feedIterator = container
  467. .GetItemLinqQueryable<T>(requestOptions: queryRequestOptions)
  468. .ToFeedIterator();
  469. }
  470. }
  471. else
  472. {
  473. if (order != null)
  474. {
  475. if (isDesc)
  476. {
  477. feedIterator = container
  478. .GetItemLinqQueryable<T>(requestOptions: queryRequestOptions)
  479. .Where(query).OrderByDescending(order)
  480. .ToFeedIterator();
  481. }
  482. else
  483. {
  484. feedIterator = container
  485. .GetItemLinqQueryable<T>(requestOptions: queryRequestOptions)
  486. .Where(query).OrderBy(order)
  487. .ToFeedIterator();
  488. }
  489. }
  490. else
  491. {
  492. feedIterator = container
  493. .GetItemLinqQueryable<T>(requestOptions: queryRequestOptions)
  494. .Where(query)
  495. .ToFeedIterator();
  496. }
  497. }
  498. return await ResultsFromFeedIterator<T>(feedIterator);
  499. }
  500. public async Task<List<T>> FindSQL<T>(string sql, Dictionary<string, object> Parameters = null, int itemsPerPage = -1, int? maxItemCount = null) where T : ID
  501. {
  502. Container container = await InitializeCollection<T>();
  503. QueryRequestOptions queryOptions = GetQueryRequestOptions(GetEffectivePageSize(itemsPerPage, maxItemCount));
  504. if (Parameters != null)
  505. {
  506. CosmosDbQuery cosmosDbQuery = new CosmosDbQuery
  507. {
  508. QueryText = sql,
  509. Parameters = Parameters
  510. };
  511. FeedIterator<T> feedIterator = container
  512. .GetItemQueryIterator<T>(cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryOptions);
  513. return await ResultsFromFeedIterator(feedIterator);
  514. }
  515. else
  516. {
  517. QueryDefinition queryDefinition = new QueryDefinition(sql);
  518. return await ResultsFromFeedIterator<T>(container.GetItemQueryIterator<T>(queryDefinition));
  519. }
  520. }
  521. public async Task<T> Save<T>(T entity) where T : ID
  522. {
  523. try
  524. {
  525. Container container = await InitializeCollection<T>();
  526. ItemResponse<T> response = await container.CreateItemAsync<T>(entity);
  527. return response.Resource;
  528. }
  529. catch (Exception e)
  530. {
  531. throw new BizException(e.Message);
  532. }
  533. }
  534. public async Task<List<T>> SaveAll<T>(List<T> enyites) where T : ID
  535. {
  536. int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
  537. Container container = await InitializeCollection<T>();
  538. string pk= GetPartitionKey<T>();
  539. Type type = typeof(T);
  540. Stopwatch stopwatch = Stopwatch.StartNew();
  541. for (int i = 0; i < pages; i++)
  542. {
  543. List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
  544. List<KeyValuePair<PartitionKey, Stream>> itemsToInsert = new List<KeyValuePair<PartitionKey, Stream>>();
  545. lists.ForEach(async x =>
  546. {
  547. MemoryStream stream = new MemoryStream();
  548. await JsonSerializer.SerializeAsync(stream, x);
  549. object o = type.GetProperty(pk).GetValue(x, null);
  550. KeyValuePair<PartitionKey, Stream> keyValue = new KeyValuePair<PartitionKey, Stream>(new PartitionKey(o.ToString()), stream);
  551. itemsToInsert.Add(keyValue);
  552. });
  553. List<Task> tasks = new List<Task>(lists.Count);
  554. itemsToInsert.ForEach(item =>
  555. {
  556. tasks.Add(container.CreateItemStreamAsync(item.Value, item.Key)
  557. .ContinueWith((Task<ResponseMessage> task) =>
  558. {
  559. using (ResponseMessage response = task.Result)
  560. {
  561. if (!response.IsSuccessStatusCode)
  562. {
  563. }
  564. }
  565. }
  566. ));
  567. });
  568. await Task.WhenAll(tasks);
  569. }
  570. stopwatch.Stop();
  571. return enyites;
  572. }
  573. public async Task<T> SaveOrUpdate<T>(T entity) where T : ID
  574. {
  575. Container container = await InitializeCollection<T>();
  576. ItemResponse<T> response = await container.UpsertItemAsync(item: entity);
  577. if (response.StatusCode.Equals(HttpStatusCode.OK))
  578. {
  579. return response.Resource;
  580. }
  581. else { throw new BizException("error"); }
  582. }
  583. public async Task<List<T>> SaveOrUpdateAll<T>(List<T> enyites) where T : ID
  584. {
  585. //await Task.Run(() => Parallel.ForEach(entities, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (item) =>
  586. //{
  587. // Task.WaitAll(Update(item));
  588. //}));
  589. int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
  590. Container container = await InitializeCollection<T>();
  591. string pk = GetPartitionKey<T>();
  592. Type type = typeof(T);
  593. Stopwatch stopwatch = Stopwatch.StartNew();
  594. for (int i = 0; i < pages; i++)
  595. {
  596. List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
  597. List<KeyValuePair<PartitionKey, Stream>> itemsToInsert = new List<KeyValuePair<PartitionKey, Stream>>();
  598. lists.ForEach(async x =>
  599. {
  600. MemoryStream stream = new MemoryStream();
  601. await JsonSerializer.SerializeAsync(stream, x);
  602. object o = type.GetProperty(pk).GetValue(x, null);
  603. KeyValuePair<PartitionKey, Stream> keyValue = new KeyValuePair<PartitionKey, Stream>(new PartitionKey(o.ToString()), stream);
  604. itemsToInsert.Add(keyValue);
  605. });
  606. List<Task> tasks = new List<Task>(lists.Count);
  607. itemsToInsert.ForEach(item =>
  608. {
  609. tasks.Add(container.UpsertItemStreamAsync(item.Value, item.Key)
  610. .ContinueWith((Task<ResponseMessage> task) =>
  611. {
  612. //using (ResponseMessage response = task.Result)
  613. //{
  614. // if (!response.IsSuccessStatusCode)
  615. // {
  616. // }
  617. //}
  618. }
  619. ));
  620. });
  621. await Task.WhenAll(tasks);
  622. }
  623. stopwatch.Stop();
  624. return enyites;
  625. }
  626. public async Task<T> Update<T>(T entity) where T : ID
  627. {
  628. Container container = await InitializeCollection<T>();
  629. string pk = GetPartitionKey<T>();
  630. object o = typeof(T).GetProperty(pk).GetValue(entity, null);
  631. ItemResponse<T> response = await container.ReplaceItemAsync(entity,entity.id,new PartitionKey(o.ToString()));
  632. return response.Resource;
  633. }
  634. internal class Item {
  635. public string id { get; set; }
  636. public string pk { get; set; }
  637. public MemoryStream stream { get; set; }
  638. }
  639. public async Task<List<T>> UpdateAll<T>(List<T> enyites) where T : ID
  640. {
  641. //await Task.Run(() => Parallel.ForEach(entities, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (item) =>
  642. //{
  643. // Task.WaitAll(Update(item));
  644. //}));
  645. int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
  646. Container container = await InitializeCollection<T>();
  647. string pk = GetPartitionKey<T>();
  648. Type type = typeof(T);
  649. Stopwatch stopwatch = Stopwatch.StartNew();
  650. for (int i = 0; i < pages; i++)
  651. {
  652. List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
  653. List<Item> itemsToInsert = new List<Item>();
  654. lists.ForEach(async x =>
  655. {
  656. MemoryStream stream = new MemoryStream();
  657. await JsonSerializer.SerializeAsync(stream, x);
  658. object o = type.GetProperty(pk).GetValue(x, null);
  659. Item keyValue = new Item { id=x.id,pk=o.ToString(),stream=stream};
  660. itemsToInsert.Add(keyValue);
  661. });
  662. List<Task> tasks = new List<Task>(lists.Count);
  663. itemsToInsert.ForEach(item =>
  664. {
  665. tasks.Add(container.ReplaceItemStreamAsync(item.stream, item.id,new PartitionKey(item.pk))
  666. .ContinueWith((Task<ResponseMessage> task) =>
  667. {
  668. //using (ResponseMessage response = task.Result)
  669. //{
  670. // if (!response.IsSuccessStatusCode)
  671. // {
  672. // }
  673. //}
  674. }
  675. ));
  676. });
  677. await Task.WhenAll(tasks);
  678. }
  679. stopwatch.Stop();
  680. return enyites;
  681. }
  682. //public void Dispose()
  683. //{
  684. // Dispose(true);
  685. //}
  686. //protected virtual void Dispose(bool disposing)
  687. //{
  688. // if (disposing)
  689. // {
  690. // CosmosClient?.Dispose();
  691. // }
  692. //}
  693. public async Task<T> FindById<T>(string id) where T : ID
  694. {
  695. Container container = await InitializeCollection<T>();
  696. CosmosDbQuery cosmosDbQuery = new CosmosDbQuery
  697. {
  698. QueryText = @"SELECT *
  699. FROM c
  700. WHERE c.id = @id",
  701. Parameters = new Dictionary<string, object>
  702. {
  703. { "@id",id}
  704. }
  705. };
  706. FeedIterator<T> feedIterator = container
  707. .GetItemQueryIterator<T>(cosmosDbQuery.CosmosQueryDefinition);
  708. return (await ResultsFromFeedIterator(feedIterator)).SingleOrDefault();
  709. }
  710. public async Task<T> FindByIdPk<T>(string id, string pk) where T : ID
  711. {
  712. Container container = await InitializeCollection<T>();
  713. ItemResponse<T> response = await container.ReadItemAsync<T>(id: id, partitionKey: new PartitionKey(pk));
  714. return response.Resource;
  715. }
  716. }
  717. }