// AzureCosmosDBV3Repository.cs (31 KB)
  1. using Microsoft.Azure.Cosmos;
  2. using Microsoft.Azure.Cosmos.Linq;
  3. using System;
  4. using System.Collections.Concurrent;
  5. using System.Collections.Generic;
  6. using System.Diagnostics;
  7. using System.IO;
  8. using System.Linq;
  9. using System.Linq.Expressions;
  10. using System.Net;
  11. using System.Reflection;
  12. using System.Text;
  13. using System.Text.Json;
  14. using System.Threading.Tasks;
  15. using TEAMModelOS.SDK.Context.Attributes.Azure;
  16. using TEAMModelOS.SDK.Context.Exception;
  17. using TEAMModelOS.SDK.Helper.Common.ReflectorExtensions;
  18. using TEAMModelOS.SDK.Module.AzureCosmosDB.Configuration;
  19. namespace TEAMModelOS.SDK.Module.AzureCosmosDBV3
  20. {
  21. public class AzureCosmosDBV3Repository : IAzureCosmosDBV3Repository
  22. {
  /// <summary>Cosmos DB client obtained from <c>CosmosDBV3ClientSingleton</c> in the constructors.</summary>
  private CosmosClient CosmosClient { get; set; }

  /// <summary>
  /// Thread-safe cache of container handles keyed by container id.
  /// It is written by <c>InitializeDatabase</c> and <c>InitializeCollection</c>, so a
  /// <see cref="ConcurrentDictionary{TKey, TValue}"/> is used to keep concurrent
  /// <c>TryAdd</c>/<c>TryGetValue</c> calls safe.
  /// </summary>
  private ConcurrentDictionary<string, Container> DocumentCollectionDict { get; set; } = new ConcurrentDictionary<string, Container>();

  /// <summary>Id of the database, copied from the options.</summary>
  private string DatabaseId { get; set; }

  /// <summary>Default throughput (RU) used when creating the database and containers.</summary>
  private int CollectionThroughput { get; set; }

  /// <summary>Database handle; assigned by <c>InitializeDatabase</c>.</summary>
  private Database database { get; set; }

  /// <summary>Scan list passed to the reflection helper that finds model types.</summary>
  private string[] ScanModel { get; set; }
  /// <summary>
  /// Creates a repository whose client is built with a custom <see cref="CosmosSerializer"/>.
  /// The database itself is not initialised here; call <c>InitializeDatabase</c> separately.
  /// </summary>
  /// <param name="options">Connection string and key, database id, default throughput and model scan list.</param>
  /// <param name="cosmosSerializer">Serializer forwarded to the shared client singleton.</param>
  /// <exception cref="BizException">
  /// The connection string is null or empty, or a <see cref="CosmosException"/> was raised while obtaining the client.
  /// </exception>
  public AzureCosmosDBV3Repository(AzureCosmosDBOptions options, CosmosSerializer cosmosSerializer)
  {
      try
      {
          // Guard first: without a connection string no client can be obtained.
          if (string.IsNullOrEmpty(options.ConnectionString))
          {
              throw new BizException("请设置正确的AzureCosmosDB数据库配置信息!");
          }

          CosmosClient = CosmosDBV3ClientSingleton
              .getInstance(options.ConnectionString, options.ConnectionKey, cosmosSerializer)
              .GetCosmosDBClient();

          DatabaseId = options.Database;
          CollectionThroughput = options.CollectionThroughput;
          ScanModel = options.ScanModel;
      }
      catch (CosmosException cosmosFailure)
      {
          throw new BizException(cosmosFailure.Message, 500, cosmosFailure.StackTrace);
      }
  }
  /// <summary>
  /// Creates a repository without a custom serializer; equivalent to calling the
  /// two-argument constructor with a <c>null</c> <see cref="CosmosSerializer"/>.
  /// </summary>
  /// <param name="options">Connection string and key, database id, default throughput and model scan list.</param>
  /// <exception cref="BizException">
  /// The connection string is null or empty, or a <see cref="CosmosException"/> was raised while obtaining the client.
  /// </exception>
  public AzureCosmosDBV3Repository(AzureCosmosDBOptions options)
      : this(options, (CosmosSerializer)null)
  {
  }
  /// <summary>
  /// Creates the database if needed, caches a handle for every existing container, then
  /// makes sure every model type marked with <c>CosmosDBAttribute</c> has a container:
  /// existing containers get their throughput raised when it is below the requested RU,
  /// missing ones are created.
  /// </summary>
  /// <remarks>
  /// The repository-wide default throughput is no longer modified while iterating the
  /// models, so one model with a large RU does not change what later containers receive.
  /// </remarks>
  /// <exception cref="BizException">Wraps any <see cref="CosmosException"/> raised by the SDK.</exception>
  public async Task InitializeDatabase()
  {
      try
      {
          database = await CosmosClient.CreateDatabaseIfNotExistsAsync(DatabaseId, CollectionThroughput);

          // Cache a handle for every container that already exists in the database.
          FeedIterator<ContainerProperties> existingContainers = database.GetContainerQueryIterator<ContainerProperties>();
          while (existingContainers.HasMoreResults)
          {
              foreach (ContainerProperties existing in await existingContainers.ReadNextAsync())
              {
                  DocumentCollectionDict.TryAdd(existing.Id, database.GetContainer(existing.Id));
              }
          }

          // Walk all model types that declare a CosmosDBAttribute.
          List<Type> types = ReflectorExtensions.GetAllTypeAsAttribute<CosmosDBAttribute>(ScanModel);
          foreach (Type type in types)
          {
              string partitionKey = GetPartitionKey(type);
              CosmosDBAttribute attribute = type.GetCustomAttributes<CosmosDBAttribute>(true).First();

              // The attribute may override the container name; otherwise the type name is used.
              string collectionName = string.IsNullOrEmpty(attribute.Name) ? type.Name : attribute.Name;

              // Only an attribute RU above 400 overrides the configured default.
              int requestedRu = attribute.RU > 400 ? attribute.RU : CollectionThroughput;

              if (DocumentCollectionDict.TryGetValue(collectionName, out Container collection))
              {
                  // Container exists: only ever raise its throughput, never lower it.
                  // A null reading makes the lifted comparison false, so nothing is replaced.
                  int? currentRu = await collection.ReadThroughputAsync();
                  if (currentRu < requestedRu)
                  {
                      await collection.ReplaceThroughputAsync(requestedRu);
                  }
              }
              else
              {
                  ContainerProperties containerProperties = new ContainerProperties { Id = collectionName };
                  if (!string.IsNullOrEmpty(partitionKey))
                  {
                      containerProperties.PartitionKeyPath = "/" + partitionKey;
                  }

                  // Use the larger of the requested RU and the default for this container only.
                  int throughput = Math.Max(requestedRu, CollectionThroughput);
                  Container created = await database.CreateContainerIfNotExistsAsync(containerProperties, throughput: throughput);
                  DocumentCollectionDict.TryAdd(collectionName, created);
              }
          }
      }
      catch (CosmosException e)
      {
          throw new BizException(e.Message, 500, e.StackTrace);
      }
  }
  /// <summary>Returns the partition key property name for <typeparamref name="T"/>.</summary>
  /// <typeparam name="T">Model type to inspect.</typeparam>
  /// <returns>Name of the single partition key property.</returns>
  private string GetPartitionKey<T>() => GetPartitionKey(typeof(T));
  /// <summary>
  /// Finds the partition key property of <paramref name="type"/>. A property literally
  /// named <c>PartitionKey</c> is accepted and ends the scan; otherwise properties carrying
  /// a <c>PartitionKeyAttribute</c> are collected.
  /// </summary>
  /// <param name="type">Model type whose public properties are scanned.</param>
  /// <returns>Name of the single matching property.</returns>
  /// <exception cref="BizException">No candidate, or more than one candidate, was found.</exception>
  private string GetPartitionKey(Type type)
  {
      List<PropertyInfo> candidates = new List<PropertyInfo>();

      foreach (PropertyInfo candidate in type.GetProperties())
      {
          if (candidate.Name.Equals("PartitionKey"))
          {
              // A property with the conventional name stops the scan immediately.
              candidates.Add(candidate);
              break;
          }

          // One entry is recorded per PartitionKeyAttribute instance found on the property.
          int markerCount = candidate.GetCustomAttributes(true).Count(marker => marker is PartitionKeyAttribute);
          candidates.AddRange(Enumerable.Repeat(candidate, markerCount));
      }

      if (candidates.Count == 1)
      {
          return candidates[0].Name;
      }

      if (candidates.Count <= 0)
      {
          throw new BizException(type.Name + "has no PartitionKey !");
      }

      throw new BizException("PartitionKey can only be single!");
  }
  /// <summary>
  /// Resolves the container for <typeparamref name="T"/>: the name comes from the type's
  /// <c>CosmosDBAttribute</c> when it declares one, otherwise from the type name.
  /// </summary>
  /// <typeparam name="T">Model type stored in the container.</typeparam>
  /// <returns>The cached or newly created container.</returns>
  private async Task<Container> InitializeCollection<T>()
  {
      Type modelType = typeof(T);
      string partitionKey = GetPartitionKey<T>();

      IEnumerable<CosmosDBAttribute> markers = modelType.GetCustomAttributes<CosmosDBAttribute>(true);
      string declaredName = markers.First<CosmosDBAttribute>().Name;
      string collectionName = string.IsNullOrEmpty(declaredName) ? modelType.Name : declaredName;

      return await InitializeCollection(collectionName, partitionKey);
  }
  /// <summary>
  /// Returns the cached container named <paramref name="CollectionName"/>, creating it
  /// with the default throughput and caching it when it is not known yet.
  /// </summary>
  /// <param name="CollectionName">Container id.</param>
  /// <param name="PartitionKey">Partition key property name; prefixed with "/" to form the key path when not empty.</param>
  /// <returns>The cached or newly created container.</returns>
  /// <remarks>
  /// The <c>database</c> handle is only assigned by <c>InitializeDatabase</c>. When that has
  /// not run yet, a handle is obtained from the client here instead of dereferencing null.
  /// NOTE(review): this does not create the database itself; presumably the SDK then reports
  /// a missing database when the container is created. Confirm.
  /// </remarks>
  private async Task<Container> InitializeCollection(string CollectionName, string PartitionKey)
  {
      // Already cached in memory: return it directly.
      if (DocumentCollectionDict.TryGetValue(CollectionName, out Container cached))
      {
          return cached;
      }

      if (database == null)
      {
          database = CosmosClient.GetDatabase(DatabaseId);
      }

      // Not cached: try to create it with the default settings.
      ContainerProperties containerProperties = new ContainerProperties { Id = CollectionName };
      if (!string.IsNullOrEmpty(PartitionKey))
      {
          containerProperties.PartitionKeyPath = "/" + PartitionKey;
      }

      Container created = await database.CreateContainerIfNotExistsAsync(containerProperties, throughput: CollectionThroughput);
      DocumentCollectionDict.TryAdd(CollectionName, created);
      return created;
  }
  /// <summary>
  /// Deletes many items concurrently through the stream API.
  /// </summary>
  /// <typeparam name="T">Model type that selects the container.</typeparam>
  /// <param name="ids">Pairs whose <c>Key</c> is the partition key value and whose <c>Value</c> is the item id.</param>
  /// <remarks>
  /// Each <see cref="ResponseMessage"/> is now disposed. As before, individual failures are
  /// not reported to the caller.
  /// NOTE(review): consider surfacing non-success responses.
  /// </remarks>
  public async Task DeleteAll<T>(List<KeyValuePair<string, string>> ids) where T : ID
  {
      Container container = await InitializeCollection<T>();

      List<Task> tasks = new List<Task>(ids.Count);
      foreach (KeyValuePair<string, string> item in ids)
      {
          tasks.Add(container.DeleteItemStreamAsync(item.Value, new PartitionKey(item.Key))
              .ContinueWith((Task<ResponseMessage> task) =>
              {
                  if (task.Status == TaskStatus.RanToCompletion)
                  {
                      // The response must be disposed to release its resources.
                      task.Result?.Dispose();
                  }
                  else
                  {
                      // Touch the exception so a faulted delete is not left unobserved.
                      _ = task.Exception;
                  }
              }));
      }

      await Task.WhenAll(tasks);
  }
  /// <summary>
  /// Deletes the given entities concurrently through the stream API. The partition key
  /// value of each entity is read by reflection from its partition key property.
  /// </summary>
  /// <typeparam name="T">Model type that selects the container.</typeparam>
  /// <param name="entities">Entities to delete; each needs an <c>id</c> and a non-null partition key value.</param>
  /// <remarks>
  /// The property lookup is done once instead of per entity, and each
  /// <see cref="ResponseMessage"/> is now disposed. As before, individual failures are not
  /// reported to the caller.
  /// </remarks>
  public async Task DeleteAll<T>(List<T> entities) where T : ID
  {
      Container container = await InitializeCollection<T>();
      PropertyInfo keyProperty = typeof(T).GetProperty(GetPartitionKey<T>());

      // Resolve every (partition key, id) pair before any request is issued.
      List<KeyValuePair<PartitionKey, string>> targets = new List<KeyValuePair<PartitionKey, string>>(entities.Count);
      foreach (T entity in entities)
      {
          object keyValue = keyProperty.GetValue(entity, null);
          targets.Add(new KeyValuePair<PartitionKey, string>(new PartitionKey(keyValue.ToString()), entity.id));
      }

      List<Task> tasks = new List<Task>(targets.Count);
      foreach (KeyValuePair<PartitionKey, string> target in targets)
      {
          tasks.Add(container.DeleteItemStreamAsync(target.Value, target.Key)
              .ContinueWith((Task<ResponseMessage> task) =>
              {
                  if (task.Status == TaskStatus.RanToCompletion)
                  {
                      // The response must be disposed to release its resources.
                      task.Result?.Dispose();
                  }
                  else
                  {
                      // Touch the exception so a faulted delete is not left unobserved.
                      _ = task.Exception;
                  }
              }));
      }

      await Task.WhenAll(tasks);
  }
  /// <summary>Deletes one item identified by id and partition key value.</summary>
  /// <typeparam name="T">Model type that selects the container.</typeparam>
  /// <param name="id">Item id.</param>
  /// <param name="pk">Partition key value of the item.</param>
  /// <returns>The <c>Resource</c> of the delete response.</returns>
  public async Task<T> DeleteAsync<T>(string id, string pk) where T : ID
  {
      Container target = await InitializeCollection<T>();
      PartitionKey key = new PartitionKey(pk);
      ItemResponse<T> deleted = await target.DeleteItemAsync<T>(id: id, partitionKey: key);
      return deleted.Resource;
  }
  /// <summary>
  /// Deletes one entity; its partition key value is read by reflection from the
  /// partition key property of <typeparamref name="T"/>.
  /// </summary>
  /// <typeparam name="T">Model type that selects the container.</typeparam>
  /// <param name="entity">Entity to delete.</param>
  /// <returns>The <c>Resource</c> of the delete response.</returns>
  public async Task<T> DeleteAsync<T>(T entity) where T : ID
  {
      Container target = await InitializeCollection<T>();

      PropertyInfo keyProperty = typeof(T).GetProperty(GetPartitionKey<T>());
      object keyValue = keyProperty.GetValue(entity, null);

      ItemResponse<T> deleted = await target.DeleteItemAsync<T>(
          id: entity.id,
          partitionKey: new PartitionKey(keyValue.ToString()));
      return deleted.Resource;
  }
  305. //public async Task<T> DeleteAsync<T>(string id) where T : ID
  306. //{
  307. // Container container = await InitializeCollection<T>();
  308. // ItemResponse<T> response = await container.DeleteItemAsync<T>(id: id, partitionKey: new PartitionKey(GetPartitionKey<T>()));
  309. // return response.Resource;
  310. //}
  /// <summary>Reads every item of the container that belongs to <typeparamref name="T"/>.</summary>
  /// <typeparam name="T">Model type that selects the container.</typeparam>
  /// <returns>All items, drained from an unfiltered query iterator.</returns>
  public async Task<List<T>> FindAll<T>() where T : ID
  {
      Container source = await InitializeCollection<T>();
      FeedIterator<T> everything = source.GetItemQueryIterator<T>();
      return await ResultsFromFeedIterator(everything);
  }
  /// <summary>
  /// Drains a feed iterator into a list, stopping early once the list holds exactly
  /// <paramref name="maxItemCount"/> items.
  /// </summary>
  /// <param name="query">Iterator to read from.</param>
  /// <param name="maxItemCount">Optional cap; when <c>null</c> every item is returned.</param>
  /// <returns>The collected items.</returns>
  private async Task<List<T>> ResultsFromFeedIterator<T>(FeedIterator<T> query, int? maxItemCount = null)
  {
      List<T> collected = new List<T>();

      while (query.HasMoreResults)
      {
          FeedResponse<T> page = await query.ReadNextAsync();
          foreach (T item in page)
          {
              collected.Add(item);

              // Exact-match check: a cap of zero or less is never reached.
              if (maxItemCount.HasValue && collected.Count == maxItemCount.Value)
              {
                  return collected;
              }
          }
      }

      return collected;
  }
  /// <summary>
  /// Drains a feed iterator in batches: whenever at least <paramref name="itemsPerPage"/>
  /// items are buffered they are handed to <paramref name="batchAction"/> and the buffer
  /// is cleared; any remainder is flushed at the end.
  /// </summary>
  /// <param name="query">Iterator to read from.</param>
  /// <param name="batchAction">Callback that receives each batch; the list is cleared after it returns.</param>
  /// <param name="itemsPerPage">Minimum number of buffered items that triggers a flush.</param>
  /// <returns>
  /// The buffer itself, which is always empty on return because every batch is cleared
  /// after being processed.
  /// </returns>
  /// <remarks>
  /// An empty buffer is never passed to <paramref name="batchAction"/>, even when
  /// <paramref name="itemsPerPage"/> is zero or negative.
  /// </remarks>
  private async Task<List<T>> ResultsFromFeedIterator<T>(FeedIterator<T> query, Func<List<T>, Task> batchAction, int itemsPerPage)
  {
      List<T> buffer = new List<T>();

      while (query.HasMoreResults)
      {
          // Flush before reading the next page; skip the flush while nothing is buffered.
          if (buffer.Count > 0 && buffer.Count >= itemsPerPage)
          {
              await batchAction(buffer);
              buffer.Clear();
          }

          buffer.AddRange(await query.ReadNextAsync());
      }

      if (buffer.Count > 0)
      {
          await batchAction(buffer);
          buffer.Clear();
      }

      return buffer;
  }
  /// <summary>
  /// Queries a container by name using conditions built from <paramref name="dict"/> and
  /// returns the matching documents as dynamic objects.
  /// </summary>
  /// <param name="CollectionName">Id of a container that must already be in the local cache.</param>
  /// <param name="dict">Conditions handed to <c>SQLHelperParametric.GetSQL</c>.</param>
  /// <param name="itemsPerPage">Page size; -1 leaves the choice to <paramref name="maxItemCount"/> or the default.</param>
  /// <param name="maxItemCount">Optional cap on the number of returned items.</param>
  /// <param name="partitionKey">Accepted but not used by this implementation.</param>
  /// <returns>The matching documents.</returns>
  /// <exception cref="BizException">The container is not in the local cache.</exception>
  public async Task<List<dynamic>> FindByDict(string CollectionName, Dictionary<string, object> dict, int itemsPerPage = -1, int? maxItemCount = null, string partitionKey = null)
  {
      if (!DocumentCollectionDict.TryGetValue(CollectionName, out Container container))
      {
          throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
      }

      StringBuilder selectText = new StringBuilder("select value(c) from c");
      CosmosDbQuery builtQuery = SQLHelperParametric.GetSQL(dict, selectText);

      int pageSize = GetEffectivePageSize(itemsPerPage, maxItemCount);
      QueryRequestOptions requestOptions = GetDefaultQueryRequestOptions(itemsPerPage: pageSize);

      FeedIterator<dynamic> iterator = container.GetItemQueryIterator<dynamic>(
          queryDefinition: builtQuery.CosmosQueryDefinition,
          requestOptions: requestOptions);
      return await ResultsFromFeedIterator(iterator, maxItemCount);
  }
  /// <summary>
  /// Runs a <c>count</c> query against a container by name, using conditions built from
  /// <paramref name="dict"/>.
  /// </summary>
  /// <param name="CollectionName">Id of a container that must already be in the local cache.</param>
  /// <param name="dict">Conditions handed to <c>SQLHelperParametric.GetSQL</c>.</param>
  /// <param name="itemsPerPage">Page size; -1 leaves the choice to <paramref name="maxItemCount"/> or the default.</param>
  /// <param name="maxItemCount">Optional cap on the number of returned rows.</param>
  /// <param name="partitionKey">Accepted but not used by this implementation.</param>
  /// <returns>The rows produced by the count query.</returns>
  /// <exception cref="BizException">The container is not in the local cache.</exception>
  public async Task<List<dynamic>> FindCountByDict(string CollectionName, Dictionary<string, object> dict, int itemsPerPage = -1, int? maxItemCount = null, string partitionKey = null)
  {
      if (!DocumentCollectionDict.TryGetValue(CollectionName, out Container container))
      {
          throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
      }

      StringBuilder countText = new StringBuilder("select value count(c) from c");
      CosmosDbQuery builtQuery = SQLHelperParametric.GetSQL(dict, countText);

      int pageSize = GetEffectivePageSize(itemsPerPage, maxItemCount);
      QueryRequestOptions requestOptions = GetDefaultQueryRequestOptions(itemsPerPage: pageSize);

      FeedIterator<dynamic> iterator = container.GetItemQueryIterator<dynamic>(
          queryDefinition: builtQuery.CosmosQueryDefinition,
          requestOptions: requestOptions);
      return await ResultsFromFeedIterator(iterator, maxItemCount);
  }
  /// <summary>Alias of <c>FindByDict&lt;T&gt;</c>; all arguments are forwarded unchanged.</summary>
  /// <typeparam name="T">Model type that selects the container.</typeparam>
  /// <param name="dict">Query conditions.</param>
  /// <param name="itemsPerPage">Page size, -1 for the default.</param>
  /// <param name="maxItemCount">Optional item cap.</param>
  /// <param name="partitionKey">Forwarded as-is.</param>
  /// <returns>The items returned by <c>FindByDict&lt;T&gt;</c>.</returns>
  public async Task<List<T>> FindByParams<T>(Dictionary<string, object> dict, int itemsPerPage = -1, int? maxItemCount = null, string partitionKey = null) where T : ID
  {
      List<T> found = await FindByDict<T>(dict, itemsPerPage, maxItemCount, partitionKey);
      return found;
  }
  /// <summary>
  /// Queries the container of <typeparamref name="T"/> using conditions built from
  /// <paramref name="dict"/>.
  /// </summary>
  /// <typeparam name="T">Model type that selects the container.</typeparam>
  /// <param name="dict">Conditions handed to <c>SQLHelperParametric.GetSQL</c>.</param>
  /// <param name="itemsPerPage">Page size; -1 leaves the choice to <paramref name="maxItemCount"/> or the default.</param>
  /// <param name="maxItemCount">
  /// Optional cap on the number of returned items. It is now forwarded to the result reader
  /// so the cap is enforced, matching the non-generic <c>FindByDict</c>.
  /// </param>
  /// <param name="partitionKey">Accepted but not used by this implementation.</param>
  /// <returns>The matching items.</returns>
  public async Task<List<T>> FindByDict<T>(Dictionary<string, object> dict, int itemsPerPage = -1, int? maxItemCount = null, string partitionKey = null) where T : ID
  {
      StringBuilder sql = new StringBuilder("select value(c) from c");
      CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, sql);
      QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(itemsPerPage, maxItemCount));
      return await ResultsFromQueryAndOptions<T>(cosmosDbQuery, queryRequestOptions, maxItemCount);
  }
  /// <summary>
  /// Runs <paramref name="cosmosDbQuery"/> against the container of <typeparamref name="T"/>
  /// and collects the results, optionally capped at <paramref name="maxItemCount"/>.
  /// </summary>
  /// <param name="cosmosDbQuery">Query whose <c>CosmosQueryDefinition</c> is executed.</param>
  /// <param name="queryOptions">Request options passed to the SDK.</param>
  /// <param name="maxItemCount">Optional cap on the number of returned items.</param>
  /// <returns>The collected items.</returns>
  private async Task<List<T>> ResultsFromQueryAndOptions<T>(CosmosDbQuery cosmosDbQuery, QueryRequestOptions queryOptions, int? maxItemCount = null)
  {
      Container source = await InitializeCollection<T>();
      FeedIterator<T> iterator = source.GetItemQueryIterator<T>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryOptions);
      List<T> items = await ResultsFromFeedIterator(iterator, maxItemCount);
      return items;
  }
  /// <summary>
  /// Combines the requested page size with the optional item cap.
  /// </summary>
  /// <param name="itemsPerPage">Requested page size; -1 means "not specified".</param>
  /// <param name="maxItemCount">Optional cap on the total number of items.</param>
  /// <returns>
  /// When <paramref name="itemsPerPage"/> is -1: the cap, or -1 when there is no cap.
  /// Otherwise: the smaller of the cap and <paramref name="itemsPerPage"/>.
  /// </returns>
  private int GetEffectivePageSize(int itemsPerPage, int? maxItemCount)
  {
      // Without a cap the page size stands in for it, which makes the Min below a no-op.
      int cap = maxItemCount ?? itemsPerPage;

      if (itemsPerPage == -1)
      {
          return cap;
      }

      return Math.Min(cap, itemsPerPage);
  }
  /// <summary>Builds query request options, filling in defaults for unspecified values.</summary>
  /// <param name="itemsPerPage">Page size; -1 is replaced by 1000, any other value (including null) is used as-is.</param>
  /// <param name="maxBufferedItemCount">Buffered item limit; defaults to 100.</param>
  /// <param name="maxConcurrency">Concurrency limit; defaults to 50.</param>
  /// <returns>The populated options object.</returns>
  private QueryRequestOptions GetDefaultQueryRequestOptions(int? itemsPerPage = null,
      int? maxBufferedItemCount = null,
      int? maxConcurrency = null)
  {
      QueryRequestOptions options = new QueryRequestOptions();

      if (itemsPerPage == -1)
      {
          options.MaxItemCount = 1000;
      }
      else
      {
          options.MaxItemCount = itemsPerPage;
      }

      options.MaxBufferedItemCount = maxBufferedItemCount ?? 100;
      options.MaxConcurrency = maxConcurrency ?? 50;
      return options;
  }
  /// <summary>
  /// Runs <paramref name="cosmosDbQuery"/> against the container of <typeparamref name="T"/>
  /// and feeds the results to <paramref name="batchAction"/> in batches.
  /// </summary>
  /// <param name="cosmosDbQuery">Query whose <c>CosmosQueryDefinition</c> is executed.</param>
  /// <param name="batchAction">Callback that receives each batch.</param>
  /// <param name="queryOptions">Request options; its <c>MaxItemCount</c> (0 when unset) is the batch threshold.</param>
  /// <returns>The list returned by the batching reader.</returns>
  private async Task<List<T>> ResultsFromQueryAndOptions<T>(CosmosDbQuery cosmosDbQuery, Func<List<T>, Task> batchAction, QueryRequestOptions queryOptions)
  {
      Container source = await InitializeCollection<T>();
      FeedIterator<T> iterator = source.GetItemQueryIterator<T>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryOptions);
      int batchThreshold = queryOptions.MaxItemCount ?? 0;
      return await ResultsFromFeedIterator(iterator, batchAction, batchThreshold);
  }
  /// <summary>Builds request options that only set the page size.</summary>
  /// <param name="itemsPerPage">Value assigned to <c>MaxItemCount</c>.</param>
  /// <returns>A new options object.</returns>
  private QueryRequestOptions GetQueryRequestOptions(int itemsPerPage)
  {
      return new QueryRequestOptions { MaxItemCount = itemsPerPage };
  }
  /// <summary>
  /// Queries the container of <typeparamref name="T"/> with an optional LINQ filter and
  /// an optional ordering.
  /// </summary>
  /// <typeparam name="T">Model type that selects the container.</typeparam>
  /// <param name="query">Optional filter predicate; no filter when <c>null</c>.</param>
  /// <param name="order">Optional ordering key; no ordering when <c>null</c>.</param>
  /// <param name="isDesc">Orders descending when <c>true</c>; only relevant when <paramref name="order"/> is set.</param>
  /// <param name="itemsPerPage">Page size; -1 leaves the choice to <paramref name="maxItemCount"/> or the default.</param>
  /// <param name="maxItemCount">
  /// Optional cap on the number of returned items. It is now forwarded to the result reader
  /// so the cap is enforced rather than only influencing the page size.
  /// </param>
  /// <returns>The matching items.</returns>
  public async Task<List<T>> FindLinq<T>(Expression<Func<T, bool>> query = null, Expression<Func<T, object>> order = null, bool isDesc = false, int itemsPerPage = -1, int? maxItemCount = null) where T : ID
  {
      QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(itemsPerPage, maxItemCount));
      Container container = await InitializeCollection<T>();

      // Compose the query step by step instead of enumerating every filter/order combination.
      IQueryable<T> queryable = container.GetItemLinqQueryable<T>(requestOptions: queryRequestOptions);
      if (query != null)
      {
          queryable = queryable.Where(query);
      }

      if (order != null)
      {
          queryable = isDesc ? queryable.OrderByDescending(order) : queryable.OrderBy(order);
      }

      FeedIterator<T> feedIterator = queryable.ToFeedIterator();
      return await ResultsFromFeedIterator<T>(feedIterator, maxItemCount);
  }
  /// <summary>
  /// Runs a SQL query, optionally parameterised, against the container of <typeparamref name="T"/>.
  /// </summary>
  /// <typeparam name="T">Model type that selects the container.</typeparam>
  /// <param name="sql">Query text.</param>
  /// <param name="Parameters">Optional parameter values keyed by parameter name.</param>
  /// <param name="itemsPerPage">Page size; -1 leaves the choice to <paramref name="maxItemCount"/>, or stays -1 when there is no cap.</param>
  /// <param name="maxItemCount">Optional cap on the number of returned items; now enforced when reading results.</param>
  /// <returns>The matching items.</returns>
  /// <remarks>
  /// The request options are now applied whether or not parameters are supplied;
  /// previously the parameterless path ignored them.
  /// </remarks>
  public async Task<List<T>> FindSQL<T>(string sql, Dictionary<string, object> Parameters = null, int itemsPerPage = -1, int? maxItemCount = null) where T : ID
  {
      Container container = await InitializeCollection<T>();
      QueryRequestOptions queryOptions = GetQueryRequestOptions(GetEffectivePageSize(itemsPerPage, maxItemCount));

      FeedIterator<T> feedIterator;
      if (Parameters != null)
      {
          CosmosDbQuery cosmosDbQuery = new CosmosDbQuery
          {
              QueryText = sql,
              Parameters = Parameters
          };
          feedIterator = container.GetItemQueryIterator<T>(cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryOptions);
      }
      else
      {
          QueryDefinition queryDefinition = new QueryDefinition(sql);
          feedIterator = container.GetItemQueryIterator<T>(queryDefinition, requestOptions: queryOptions);
      }

      return await ResultsFromFeedIterator(feedIterator, maxItemCount);
  }
  524. //public async Task<List<T>> FindSQL<T>(string sql, bool isPK) where T : ID
  525. //{
  526. // Container container = await InitializeCollection<T>();
  527. // QueryDefinition queryDefinition = new QueryDefinition(sql);
  528. // return await ResultsFromFeedIterator<T>(container.GetItemQueryIterator<T>(queryDefinition));
  529. //}
  /// <summary>Replaces the stored item that has the same id as <paramref name="entity"/>.</summary>
  /// <typeparam name="T">Model type that selects the container.</typeparam>
  /// <param name="entity">New item content; its <c>id</c> identifies the item to replace.</param>
  /// <returns>The <c>Resource</c> of the replace response.</returns>
  /// <exception cref="BizException">The response status code is not <see cref="HttpStatusCode.OK"/>.</exception>
  public async Task<T> ReplaceObject<T>(T entity) where T : ID
  {
      Container target = await InitializeCollection<T>();
      ItemResponse<T> replaced = await target.ReplaceItemAsync(item: entity, id: entity.id);

      if (replaced.StatusCode != HttpStatusCode.OK)
      {
          throw new BizException("error");
      }

      return replaced.Resource;
  }
  540. //public async Task<T> ReplaceObject<T>(T entity, string key, string partitionKey) where T : ID
  541. //{
  542. // Container container = await InitializeCollection<T>();
  543. // ItemResponse<T> response = await container.ReplaceItemAsync(item: entity, id: entity.id);
  544. // if (response.StatusCode.Equals(HttpStatusCode.OK))
  545. // {
  546. // return response.Resource;
  547. // }
  548. // else { throw new BizException("error"); }
  549. //}
  /// <summary>Creates a new item in the container of <typeparamref name="T"/>.</summary>
  /// <typeparam name="T">Model type that selects the container.</typeparam>
  /// <param name="entity">Item to create.</param>
  /// <returns>The <c>Resource</c> of the create response.</returns>
  /// <exception cref="BizException">Any exception raised while resolving the container or creating the item; only its message is carried over.</exception>
  public async Task<T> Save<T>(T entity) where T : ID
  {
      try
      {
          Container target = await InitializeCollection<T>();
          ItemResponse<T> created = await target.CreateItemAsync<T>(entity);
          return created.Resource;
      }
      catch (Exception failure)
      {
          throw new BizException(failure.Message);
      }
  }
  /// <summary>
  /// Creates many items concurrently through the stream API. Each entity is serialised
  /// with <see cref="JsonSerializer"/> and its partition key value is read by reflection.
  /// </summary>
  /// <typeparam name="T">Model type that selects the container.</typeparam>
  /// <param name="enyites">Entities to create; each needs a non-null partition key value.</param>
  /// <returns>The same list that was passed in.</returns>
  /// <remarks>
  /// Serialisation is awaited in a plain loop. The previous <c>List.ForEach(async ...)</c>
  /// produced async-void lambdas, so nothing guaranteed every payload was queued before
  /// the requests were issued, and exceptions thrown there could not reach the caller.
  /// The failure counter is incremented atomically because continuations may run in
  /// parallel. Failed requests and the final failure count are written to the console,
  /// as before.
  /// </remarks>
  public async Task<List<T>> SaveAll<T>(List<T> enyites) where T : ID
  {
      Container container = await InitializeCollection<T>();
      PropertyInfo keyProperty = typeof(T).GetProperty(GetPartitionKey<T>());

      // Serialise every entity up front so all payloads exist before any request starts.
      List<KeyValuePair<PartitionKey, Stream>> itemsToInsert = new List<KeyValuePair<PartitionKey, Stream>>(enyites.Count);
      foreach (T entity in enyites)
      {
          MemoryStream stream = new MemoryStream();
          await JsonSerializer.SerializeAsync(stream, entity);
          object keyValue = keyProperty.GetValue(entity, null);
          itemsToInsert.Add(new KeyValuePair<PartitionKey, Stream>(new PartitionKey(keyValue.ToString()), stream));
      }

      int count = 0;
      List<Task> tasks = new List<Task>(itemsToInsert.Count);
      foreach (KeyValuePair<PartitionKey, Stream> item in itemsToInsert)
      {
          tasks.Add(container.CreateItemStreamAsync(item.Value, item.Key)
              .ContinueWith((Task<ResponseMessage> task) =>
              {
                  using (ResponseMessage response = task.Result)
                  {
                      if (!response.IsSuccessStatusCode)
                      {
                          System.Threading.Interlocked.Increment(ref count);
                          Console.WriteLine(response.RequestMessage);
                      }
                  }
              }));
      }

      await Task.WhenAll(tasks);
      Console.WriteLine(count);
      return enyites;
  }
  /// <summary>Upserts <paramref name="entity"/> into the container of <typeparamref name="T"/>.</summary>
  /// <typeparam name="T">Model type that selects the container.</typeparam>
  /// <param name="entity">Item to upsert.</param>
  /// <returns>The <c>Resource</c> of the upsert response.</returns>
  public async Task<T> Update<T>(T entity) where T : ID
  {
      Container target = await InitializeCollection<T>();
      ItemResponse<T> upserted = await target.UpsertItemAsync(entity);
      return upserted.Resource;
  }
  /// <summary>
  /// Upserts many items concurrently through the stream API. Each entity is serialised
  /// with <see cref="JsonSerializer"/> and its partition key value is read by reflection.
  /// </summary>
  /// <typeparam name="T">Model type that selects the container.</typeparam>
  /// <param name="entities">Entities to upsert; each needs a non-null partition key value.</param>
  /// <returns>The same list that was passed in.</returns>
  /// <remarks>
  /// Serialisation is awaited in a plain loop. The previous <c>List.ForEach(async ...)</c>
  /// produced async-void lambdas, so nothing guaranteed every payload was queued before
  /// the requests were issued. Each <see cref="ResponseMessage"/> is now disposed. As
  /// before, individual request failures are not reported to the caller.
  /// </remarks>
  public async Task<List<T>> UpdateAll<T>(List<T> entities) where T : ID
  {
      Container container = await InitializeCollection<T>();
      PropertyInfo keyProperty = typeof(T).GetProperty(GetPartitionKey<T>());

      // Serialise every entity up front so all payloads exist before any request starts.
      List<KeyValuePair<PartitionKey, Stream>> itemsToUpsert = new List<KeyValuePair<PartitionKey, Stream>>(entities.Count);
      foreach (T entity in entities)
      {
          MemoryStream stream = new MemoryStream();
          await JsonSerializer.SerializeAsync(stream, entity);
          object keyValue = keyProperty.GetValue(entity, null);
          itemsToUpsert.Add(new KeyValuePair<PartitionKey, Stream>(new PartitionKey(keyValue.ToString()), stream));
      }

      List<Task> tasks = new List<Task>(itemsToUpsert.Count);
      foreach (KeyValuePair<PartitionKey, Stream> item in itemsToUpsert)
      {
          tasks.Add(container.UpsertItemStreamAsync(item.Value, item.Key)
              .ContinueWith((Task<ResponseMessage> task) =>
              {
                  if (task.Status == TaskStatus.RanToCompletion)
                  {
                      // The response must be disposed to release its resources.
                      task.Result?.Dispose();
                  }
                  else
                  {
                      // Touch the exception so a faulted upsert is not left unobserved.
                      _ = task.Exception;
                  }
              }));
      }

      await Task.WhenAll(tasks);
      return entities;
  }
  645. //public void Dispose()
  646. //{
  647. // Dispose(true);
  648. //}
  649. //protected virtual void Dispose(bool disposing)
  650. //{
  651. // if (disposing)
  652. // {
  653. // CosmosClient?.Dispose();
  654. // }
  655. //}
  /// <summary>
  /// Looks up an item by id with a parameterised query, without specifying a partition key.
  /// </summary>
  /// <typeparam name="T">Model type that selects the container.</typeparam>
  /// <param name="id">Item id bound to the <c>@id</c> query parameter.</param>
  /// <returns>The matching item, or the default value of <typeparamref name="T"/> when none matches.</returns>
  /// <exception cref="InvalidOperationException">
  /// More than one item matches (raised by <c>SingleOrDefault</c>).
  /// NOTE(review): presumably possible when the same id exists in several partitions. Confirm.
  /// </exception>
  public async Task<T> FindById<T>(string id) where T : ID
  {
      Container container = await InitializeCollection<T>();

      // Single-line literal: the earlier multi-line verbatim string had stray text
      // embedded between its lines, which would corrupt the query.
      CosmosDbQuery cosmosDbQuery = new CosmosDbQuery
      {
          QueryText = "SELECT * FROM c WHERE c.id = @id",
          Parameters = new Dictionary<string, object>
          {
              { "@id", id }
          }
      };

      FeedIterator<T> feedIterator = container.GetItemQueryIterator<T>(cosmosDbQuery.CosmosQueryDefinition);
      return (await ResultsFromFeedIterator(feedIterator)).SingleOrDefault();
  }
  /// <summary>Reads one item by id and partition key value (point read).</summary>
  /// <typeparam name="T">Model type that selects the container.</typeparam>
  /// <param name="id">Item id.</param>
  /// <param name="pk">Partition key value of the item.</param>
  /// <returns>The <c>Resource</c> of the read response.</returns>
  public async Task<T> FindByIdPk<T>(string id, string pk) where T : ID
  {
      Container source = await InitializeCollection<T>();
      PartitionKey key = new PartitionKey(pk);
      ItemResponse<T> found = await source.ReadItemAsync<T>(id: id, partitionKey: key);
      return found.Resource;
  }
  679. }
  680. }