// AzureCosmosDBV3Repository.cs
  1. using Microsoft.Azure.Cosmos;
  2. using Microsoft.Azure.Cosmos.Linq;
  3. using System;
  4. using System.Collections.Concurrent;
  5. using System.Collections.Generic;
  6. using System.Diagnostics;
  7. using System.IO;
  8. using System.Linq;
  9. using System.Linq.Expressions;
  10. using System.Net;
  11. using System.Reflection;
  12. using System.Text;
  13. using System.Text.Json;
  14. using System.Threading.Tasks;
  15. using TEAMModelOS.SDK.Context.Attributes.Azure;
  16. using TEAMModelOS.SDK.Context.Exception;
  17. using TEAMModelOS.SDK.Helper.Common.CollectionHelper;
  18. using TEAMModelOS.SDK.Helper.Common.ReflectorExtensions;
  19. using TEAMModelOS.SDK.Module.AzureCosmosDB.Configuration;
  20. namespace TEAMModelOS.SDK.Module.AzureCosmosDBV3
  21. {
  22. public class AzureCosmosDBV3Repository : IAzureCosmosDBV3Repository
  23. {
  /// <summary>Cosmos DB client used by every operation of this repository.</summary>
  private CosmosClient CosmosClient { get; set; }

  /// <summary>
  /// Thread-safe cache mapping a container name to its <see cref="Container"/> proxy.
  /// A <see cref="ConcurrentDictionary{TKey, TValue}"/> is used so that lazy container
  /// registration from concurrent callers cannot corrupt the cache.
  /// </summary>
  private ConcurrentDictionary<string, Container> DocumentCollectionDict { get; set; } = new ConcurrentDictionary<string, Container>();

  /// <summary>Id of the database, taken from the options passed to the constructor.</summary>
  private string DatabaseId { get; set; }

  /// <summary>Default throughput (RU/s) used when creating the database and containers.</summary>
  private int CollectionThroughput { get; set; }

  /// <summary>Database proxy; assigned by <see cref="InitializeDatabase"/>.</summary>
  private Database database { get; set; }

  // Number of items processed concurrently per batch by the bulk operations.
  int pageSize = 200;

  /// <summary>Scan targets handed to the reflection helper when looking for model types.</summary>
  private string[] ScanModel { get; set; }
  /// <summary>
  /// Creates a repository whose client uses the supplied serializer.
  /// The database itself is not touched here; <see cref="InitializeDatabase"/> must be
  /// called separately.
  /// </summary>
  /// <param name="options">Connection string, key, database id, default throughput and scan targets.</param>
  /// <param name="cosmosSerializer">Serializer forwarded to the shared client factory.</param>
  /// <exception cref="BizException">The connection string is missing, or the SDK raised a <see cref="CosmosException"/>.</exception>
  public AzureCosmosDBV3Repository(AzureCosmosDBOptions options, CosmosSerializer cosmosSerializer)
  {
      try
      {
          bool connectionConfigured = !string.IsNullOrEmpty(options.ConnectionString);
          if (!connectionConfigured)
          {
              throw new BizException("请设置正确的AzureCosmosDB数据库配置信息!");
          }

          CosmosClient = CosmosDBV3ClientSingleton
              .getInstance(options.ConnectionString, options.ConnectionKey, cosmosSerializer)
              .GetCosmosDBClient();

          DatabaseId = options.Database;
          CollectionThroughput = options.CollectionThroughput;
          ScanModel = options.ScanModel;
      }
      catch (CosmosException cosmosError)
      {
          // Surface SDK failures as the project's own exception type.
          throw new BizException(cosmosError.Message, 500, cosmosError.StackTrace);
      }
  }
  /// <summary>
  /// Creates a repository that uses the client's default serializer.
  /// Delegates to the two-argument constructor with a <c>null</c> serializer, which performs
  /// the same validation, assignments and exception translation.
  /// </summary>
  /// <param name="options">Connection string, key, database id, default throughput and scan targets.</param>
  /// <exception cref="BizException">The connection string is missing, or the SDK raised a <see cref="CosmosException"/>.</exception>
  public AzureCosmosDBV3Repository(AzureCosmosDBOptions options)
      : this(options, null)
  {
  }
  /// <summary>
  /// Creates the database if needed, caches every existing container, and then makes sure
  /// a container exists for each type carrying <see cref="CosmosDBAttribute"/>.
  /// For containers that already exist the throughput is raised (never lowered) to the
  /// requested RU value.
  /// </summary>
  /// <exception cref="BizException">The SDK raised a <see cref="CosmosException"/>, or a model type has no usable partition key.</exception>
  public async Task InitializeDatabase()
  {
      try
      {
          database = await CosmosClient.CreateDatabaseIfNotExistsAsync(DatabaseId, CollectionThroughput);

          // Register a proxy for every container that is already present in the database.
          FeedIterator<ContainerProperties> containerIterator = database.GetContainerQueryIterator<ContainerProperties>();
          while (containerIterator.HasMoreResults)
          {
              foreach (ContainerProperties existing in await containerIterator.ReadNextAsync())
              {
                  DocumentCollectionDict.TryAdd(existing.Id, database.GetContainer(existing.Id));
              }
          }

          // Walk every model type discovered by the reflection helper.
          List<Type> modelTypes = ReflectorExtensions.GetAllTypeAsAttribute<CosmosDBAttribute>(ScanModel);
          foreach (Type modelType in modelTypes)
          {
              string partitionKey = GetPartitionKey(modelType);
              CosmosDBAttribute attribute = modelType.GetCustomAttributes<CosmosDBAttribute>(true).First();

              // The attribute may override the container name; otherwise the type name is used.
              string collectionName = string.IsNullOrEmpty(attribute.Name) ? modelType.Name : attribute.Name;

              // Attribute RU values of 400 or less fall back to the configured default.
              int requestedRu = attribute.RU > 400 ? attribute.RU : CollectionThroughput;

              if (DocumentCollectionDict.TryGetValue(collectionName, out Container collection))
              {
                  // Container exists: only ever scale up. A null reading compares false and is left alone.
                  int? currentRu = await collection.ReadThroughputAsync();
                  if (currentRu < requestedRu)
                  {
                      await collection.ReplaceThroughputAsync(requestedRu);
                  }
              }
              else
              {
                  ContainerProperties containerProperties = new ContainerProperties { Id = collectionName };
                  if (!string.IsNullOrEmpty(partitionKey))
                  {
                      containerProperties.PartitionKeyPath = "/" + partitionKey;
                  }

                  // Use a local value: writing the per-type RU back into CollectionThroughput
                  // would silently raise the default for every container created afterwards.
                  int throughput = Math.Max(requestedRu, CollectionThroughput);
                  Container created = await database.CreateContainerIfNotExistsAsync(containerProperties, throughput: throughput);
                  DocumentCollectionDict.TryAdd(collectionName, created);
              }
          }
      }
      catch (CosmosException e)
      {
          throw new BizException(e.Message, 500, e.StackTrace);
      }
  }
  /// <summary>Returns the partition key property name declared on <typeparamref name="T"/>.</summary>
  /// <exception cref="BizException">The type declares no partition key, or more than one.</exception>
  private string GetPartitionKey<T>() => GetPartitionKey(typeof(T));
  /// <summary>
  /// Finds the name of the partition key property of <paramref name="type"/>.
  /// A property literally named <c>PartitionKey</c> is accepted and ends the scan; any other
  /// property counts once per <see cref="PartitionKeyAttribute"/> found on it.
  /// </summary>
  /// <param name="type">Model type whose public properties are inspected.</param>
  /// <returns>The name of the single matching property.</returns>
  /// <exception cref="BizException">No match was found, or more than one.</exception>
  private string GetPartitionKey(Type type)
  {
      List<PropertyInfo> candidates = new List<PropertyInfo>();

      foreach (PropertyInfo candidate in type.GetProperties())
      {
          if (candidate.Name.Equals("PartitionKey"))
          {
              // Name match: record it and stop looking at the remaining properties.
              candidates.Add(candidate);
              break;
          }

          // One entry per attribute instance, exactly as many times as it is applied.
          int markers = candidate.GetCustomAttributes(true).Count(a => a is PartitionKeyAttribute);
          candidates.AddRange(Enumerable.Repeat(candidate, markers));
      }

      if (candidates.Count == 1)
      {
          return candidates[0].Name;
      }

      if (candidates.Count <= 0)
      {
          throw new BizException(type.Name + "has no PartitionKey !");
      }

      throw new BizException("PartitionKey can only be single!");
  }
  /// <summary>
  /// Resolves the container for <typeparamref name="T"/>: the name comes from the type's
  /// <see cref="CosmosDBAttribute"/> when it sets one, otherwise from the type name.
  /// </summary>
  /// <returns>The cached or newly created container.</returns>
  private async Task<Container> InitializeCollection<T>()
  {
      Type modelType = typeof(T);
      string partitionKey = GetPartitionKey<T>();

      // First() throws when the type carries no CosmosDBAttribute at all.
      string declaredName = modelType.GetCustomAttributes<CosmosDBAttribute>(true).First<CosmosDBAttribute>().Name;
      string collectionName = string.IsNullOrEmpty(declaredName) ? modelType.Name : declaredName;

      return await InitializeCollection(collectionName, partitionKey);
  }
  /// <summary>
  /// Returns the container named <paramref name="CollectionName"/>, creating it with the
  /// default throughput when it is not cached yet.
  /// </summary>
  /// <param name="CollectionName">Container id.</param>
  /// <param name="PartitionKey">Partition key property name; when empty no partition key path is set.</param>
  /// <returns>The cached or newly created container.</returns>
  private async Task<Container> InitializeCollection(string CollectionName, string PartitionKey)
  {
      // Already known: hand back the cached proxy.
      if (DocumentCollectionDict.TryGetValue(CollectionName, out Container cached))
      {
          return cached;
      }

      ContainerProperties containerProperties = new ContainerProperties { Id = CollectionName };
      if (!string.IsNullOrEmpty(PartitionKey))
      {
          containerProperties.PartitionKeyPath = "/" + PartitionKey;
      }

      // The database field is only assigned by InitializeDatabase(); fall back to a proxy
      // obtained from the client so this path does not fail with a NullReferenceException
      // when that method has not run yet.
      Database targetDatabase = database ?? CosmosClient.GetDatabase(DatabaseId);

      Container created = await targetDatabase.CreateContainerIfNotExistsAsync(containerProperties, throughput: CollectionThroughput);
      DocumentCollectionDict.TryAdd(CollectionName, created);
      return created;
  }
  /// <summary>
  /// Deletes the given documents in batches of <c>pageSize</c> concurrent requests.
  /// </summary>
  /// <param name="ids">Pairs whose <c>Key</c> is the partition key value and whose <c>Value</c> is the document id.</param>
  /// <returns>One <see cref="IdPk"/> per input pair, in input order, carrying the response status code.</returns>
  public async Task<List<IdPk>> DeleteAll<T>(List<KeyValuePair<string, string>> ids) where T : ID
  {
      Container container = await InitializeCollection<T>();
      List<IdPk> idPks = new List<IdPk>(ids.Count);
      int pages = (int)Math.Ceiling((double)ids.Count / pageSize);

      for (int i = 0; i < pages; i++)
      {
          // Each request yields its own result; nothing is written to shared state from
          // concurrently running continuations (List<T>.Add is not thread-safe).
          List<Task<IdPk>> deletions = ids
              .Skip(i * pageSize)
              .Take(pageSize)
              .Select(async item =>
              {
                  using (ResponseMessage response = await container.DeleteItemStreamAsync(item.Value, new PartitionKey(item.Key)))
                  {
                      return new IdPk { id = item.Value, pk = item.Key, StatusCode = response.StatusCode };
                  }
              })
              .ToList();

          idPks.AddRange(await Task.WhenAll(deletions));
      }

      return idPks;
  }
  /// <summary>
  /// Deletes every document matched by <paramref name="dict"/>: the documents are first
  /// loaded with <c>FindByDict</c> and then passed to the entity-list overload.
  /// </summary>
  /// <param name="dict">Filter dictionary forwarded to <c>FindByDict</c>.</param>
  /// <returns>The delete result for each matched document.</returns>
  public async Task<List<IdPk>> DeleteAll<T>(Dictionary<string, object> dict) where T : ID
  {
      List<T> matches = await FindByDict<T>(dict);
      List<IdPk> outcome = await DeleteAll(matches);
      return outcome;
  }
  /// <summary>
  /// Deletes the given entities in batches of <c>pageSize</c> concurrent requests. The
  /// partition key value of each entity is read by reflection from its partition key property.
  /// </summary>
  /// <param name="enyites">Entities to delete.</param>
  /// <returns>
  /// One <see cref="IdPk"/> per entity, in input order. <c>pk</c> holds the raw partition key
  /// value, the same form the single-entity <c>DeleteAsync</c> overloads report.
  /// </returns>
  public async Task<List<IdPk>> DeleteAll<T>(List<T> enyites) where T : ID
  {
      List<IdPk> idPks = new List<IdPk>(enyites.Count);
      Container container = await InitializeCollection<T>();

      // Resolve the reflected property once instead of once per entity.
      PropertyInfo pkProperty = typeof(T).GetProperty(GetPartitionKey<T>());
      int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);

      for (int i = 0; i < pages; i++)
      {
          // Each request yields its own result; nothing is written to shared state from
          // concurrently running continuations (List<T>.Add is not thread-safe).
          List<Task<IdPk>> deletions = enyites
              .Skip(i * pageSize)
              .Take(pageSize)
              .Select(async entity =>
              {
                  string pkValue = pkProperty.GetValue(entity, null).ToString();
                  using (ResponseMessage response = await container.DeleteItemStreamAsync(entity.id, new PartitionKey(pkValue)))
                  {
                      return new IdPk { id = entity.id, pk = pkValue, StatusCode = response.StatusCode };
                  }
              })
              .ToList();

          idPks.AddRange(await Task.WhenAll(deletions));
      }

      return idPks;
  }
  /// <summary>Deletes the document identified by <paramref name="idPk"/> (its <c>id</c> and <c>pk</c>).</summary>
  /// <returns>The id, partition key and response status code.</returns>
  public async Task<IdPk> DeleteAsync<T>(IdPk idPk) where T : ID
      => await DeleteAsync<T>(idPk.id, idPk.pk);
  /// <summary>Deletes one document by id and partition key value.</summary>
  /// <param name="id">Document id.</param>
  /// <param name="pk">Partition key value.</param>
  /// <returns>The id, partition key and response status code.</returns>
  public async Task<IdPk> DeleteAsync<T>(string id, string pk) where T : ID
  {
      Container container = await InitializeCollection<T>();

      // ResponseMessage is disposable; release it once the status code has been copied.
      using (ResponseMessage response = await container.DeleteItemStreamAsync(id: id, partitionKey: new PartitionKey(pk)))
      {
          return new IdPk { id = id, pk = pk, StatusCode = response.StatusCode };
      }
  }
  /// <summary>
  /// Deletes <paramref name="entity"/>; its partition key value is read by reflection from
  /// the type's partition key property.
  /// </summary>
  /// <returns>The id, partition key value and response status code.</returns>
  public async Task<IdPk> DeleteAsync<T>(T entity) where T : ID
  {
      Container container = await InitializeCollection<T>();
      string partitionKey = GetPartitionKey<T>();
      string pkValue = typeof(T).GetProperty(partitionKey).GetValue(entity, null).ToString();

      // ResponseMessage is disposable; release it once the status code has been copied.
      using (ResponseMessage response = await container.DeleteItemStreamAsync(id: entity.id, partitionKey: new PartitionKey(pkValue)))
      {
          return new IdPk { id = entity.id, pk = pkValue, StatusCode = response.StatusCode };
      }
  }
  314. //public async Task<T> DeleteAsync<T>(string id) where T : ID
  315. //{
  316. // Container container = await InitializeCollection<T>();
  317. // ItemResponse<T> response = await container.DeleteItemAsync<T>(id: id, partitionKey: new PartitionKey(GetPartitionKey<T>()));
  318. // return response.Resource;
  319. //}
  /// <summary>Returns every document of the container for <typeparamref name="T"/>.</summary>
  /// <param name="propertys">Property list handed to the SELECT builder; may be <c>null</c>.</param>
  public async Task<List<T>> FindAll<T>(List<string> propertys = null) where T : ID
  {
      Container container = await InitializeCollection<T>();

      string selectText = SQLHelperParametric.GetSQLSelect(propertys).ToString();
      CosmosDbQuery cosmosDbQuery = new CosmosDbQuery { QueryText = selectText };

      FeedIterator<T> iterator = container.GetItemQueryIterator<T>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition);
      return await ResultsFromFeedIterator(iterator);
  }
  /// <summary>
  /// Drains <paramref name="query"/> into a list, stopping as soon as the list holds
  /// exactly <paramref name="maxItemCount"/> items when a cap is given.
  /// </summary>
  /// <param name="query">Iterator to read pages from.</param>
  /// <param name="maxItemCount">Optional cap on the number of items; <c>null</c> reads everything.</param>
  private async Task<List<T>> ResultsFromFeedIterator<T>(FeedIterator<T> query, int? maxItemCount = null)
  {
      List<T> collected = new List<T>();

      while (query.HasMoreResults)
      {
          FeedResponse<T> page = await query.ReadNextAsync();
          foreach (T item in page)
          {
              collected.Add(item);

              // Checked after every add, so the cap can cut a page short.
              if (maxItemCount.HasValue && collected.Count == maxItemCount.Value)
              {
                  return collected;
              }
          }
      }

      return collected;
  }
  /// <summary>
  /// Reads <paramref name="query"/> page by page and hands the buffered items to
  /// <paramref name="batchAction"/> whenever at least <paramref name="itemsPerPage"/> items
  /// have accumulated, plus once more for any remainder.
  /// </summary>
  /// <returns>
  /// The buffer list, which is cleared after every batch and is therefore empty when returned.
  /// The same list instance is passed to every <paramref name="batchAction"/> call.
  /// </returns>
  private async Task<List<T>> ResultsFromFeedIterator<T>(FeedIterator<T> query, Func<List<T>, Task> batchAction, int itemsPerPage)
  {
      List<T> buffer = new List<T>();

      while (query.HasMoreResults)
      {
          // Flush before reading the next page once the threshold has been reached.
          bool thresholdReached = buffer.Count >= itemsPerPage;
          if (thresholdReached)
          {
              await batchAction(buffer);
              buffer.Clear();
          }

          FeedResponse<T> page = await query.ReadNextAsync();
          buffer.AddRange(page);
      }

      // Flush whatever is left after the iterator is exhausted.
      if (buffer.Count > 0)
      {
          await batchAction(buffer);
          buffer.Clear();
      }

      return buffer;
  }
  /// <summary>
  /// Queries an already-registered container by name using a filter dictionary.
  /// </summary>
  /// <param name="CollectionName">Container name; must already be in the container cache.</param>
  /// <param name="dict">Filter dictionary turned into a parameterised query by the SQL helper.</param>
  /// <param name="itemsPerPage">Page size; -1 lets the defaults decide.</param>
  /// <param name="maxItemCount">Optional cap on the number of returned items.</param>
  /// <param name="partitionKey">Not used by this method.</param>
  /// <param name="propertys">Property list handed to the SELECT builder; may be <c>null</c>.</param>
  /// <exception cref="BizException">The container name is not in the cache.</exception>
  public async Task<List<dynamic>> FindByDict(string CollectionName, Dictionary<string, object> dict, int itemsPerPage = -1, int? maxItemCount = null, string partitionKey = null, List<string> propertys = null)
  {
      if (!DocumentCollectionDict.TryGetValue(CollectionName, out Container container))
      {
          throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
      }

      StringBuilder selectClause = SQLHelperParametric.GetSQLSelect(propertys);
      CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, selectClause);

      int effectivePageSize = GetEffectivePageSize(itemsPerPage, maxItemCount);
      QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: effectivePageSize);

      FeedIterator<dynamic> iterator = container.GetItemQueryIterator<dynamic>(
          queryDefinition: cosmosDbQuery.CosmosQueryDefinition,
          requestOptions: queryRequestOptions);
      return await ResultsFromFeedIterator(iterator, maxItemCount);
  }
  /// <summary>
  /// Runs a <c>count</c> query against an already-registered container. Paging and sort
  /// entries (<c>@CURRPAGE</c>, <c>@PAGESIZE</c>, <c>@ASC</c>, <c>@DESC</c>) are ignored.
  /// </summary>
  /// <param name="CollectionName">Container name; must already be in the container cache.</param>
  /// <param name="dict">Filter dictionary. It is copied, so the caller's instance is left unchanged.</param>
  /// <param name="itemsPerPage">Page size; -1 lets the defaults decide.</param>
  /// <param name="maxItemCount">Optional cap on the number of returned items.</param>
  /// <param name="partitionKey">Not used by this method.</param>
  /// <exception cref="BizException">The container name is not in the cache.</exception>
  public async Task<List<dynamic>> FindCountByDict(string CollectionName, Dictionary<string, object> dict, int itemsPerPage = -1, int? maxItemCount = null, string partitionKey = null)
  {
      if (!DocumentCollectionDict.TryGetValue(CollectionName, out Container container))
      {
          throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
      }

      // Work on a copy: removing the paging/sort keys from the caller's dictionary would
      // break a later paged query that reuses the same instance.
      Dictionary<string, object> filter = new Dictionary<string, object>(dict, dict.Comparer);
      filter.Remove("@CURRPAGE");
      filter.Remove("@PAGESIZE");
      filter.Remove("@ASC");
      filter.Remove("@DESC");

      StringBuilder sql = new StringBuilder("select value count(c) from c");
      CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(filter, sql);
      QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(itemsPerPage, maxItemCount));
      FeedIterator<dynamic> query = container.GetItemQueryIterator<dynamic>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
      return await ResultsFromFeedIterator(query, maxItemCount);
  }
  /// <summary>Alias of <c>FindByDict&lt;T&gt;</c>; every argument is forwarded unchanged.</summary>
  public async Task<List<T>> FindByParams<T>(Dictionary<string, object> dict, int itemsPerPage = -1, int? maxItemCount = null, string partitionKey = null, List<string> propertys = null) where T : ID
  {
      List<T> found = await FindByDict<T>(
          dict,
          itemsPerPage: itemsPerPage,
          maxItemCount: maxItemCount,
          partitionKey: partitionKey,
          propertys: propertys);
      return found;
  }
  /// <summary>
  /// Queries the container of <typeparamref name="T"/> using a filter dictionary.
  /// </summary>
  /// <param name="dict">Filter dictionary turned into a parameterised query by the SQL helper.</param>
  /// <param name="itemsPerPage">Page size; -1 lets the defaults decide.</param>
  /// <param name="maxItemCount">Optional cap on the total number of returned items.</param>
  /// <param name="partitionKey">Not used by this method.</param>
  /// <param name="propertys">Property list handed to the SELECT builder; may be <c>null</c>.</param>
  public async Task<List<T>> FindByDict<T>(Dictionary<string, object> dict, int itemsPerPage = -1, int? maxItemCount = null, string partitionKey = null, List<string> propertys = null) where T : ID
  {
      StringBuilder sql = SQLHelperParametric.GetSQLSelect(propertys);
      CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, sql);
      QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(itemsPerPage, maxItemCount));

      // Forward the cap as well, so it limits the result like in the by-name FindByDict
      // overload instead of only influencing the page size.
      return await ResultsFromQueryAndOptions<T>(cosmosDbQuery, queryRequestOptions, maxItemCount);
  }
  /// <summary>
  /// Executes <paramref name="cosmosDbQuery"/> against the container of
  /// <typeparamref name="T"/> and collects the results, optionally capped.
  /// </summary>
  private async Task<List<T>> ResultsFromQueryAndOptions<T>(CosmosDbQuery cosmosDbQuery, QueryRequestOptions queryOptions, int? maxItemCount = null)
  {
      Container target = await InitializeCollection<T>();
      FeedIterator<T> iterator = target.GetItemQueryIterator<T>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryOptions);
      List<T> items = await ResultsFromFeedIterator(iterator, maxItemCount);
      return items;
  }
  /// <summary>
  /// Combines the requested page size with the optional item cap: without a cap the page
  /// size is returned as is; with a cap, -1 yields the cap and any other page size yields
  /// the smaller of the two.
  /// </summary>
  private int GetEffectivePageSize(int itemsPerPage, int? maxItemCount)
  {
      if (!maxItemCount.HasValue)
      {
          return itemsPerPage;
      }

      if (itemsPerPage == -1)
      {
          return maxItemCount.Value;
      }

      return Math.Min(maxItemCount.Value, itemsPerPage);
  }
  /// <summary>
  /// Builds query options with the repository defaults: a page size of 1000 when -1 is
  /// passed, 100 buffered items and a concurrency of 50 unless overridden.
  /// </summary>
  private QueryRequestOptions GetDefaultQueryRequestOptions(int? itemsPerPage = null,
      int? maxBufferedItemCount = null,
      int? maxConcurrency = null)
  {
      int? pageLimit = itemsPerPage;
      if (itemsPerPage == -1)
      {
          pageLimit = 1000;
      }

      QueryRequestOptions options = new QueryRequestOptions();
      options.MaxItemCount = pageLimit;
      options.MaxBufferedItemCount = maxBufferedItemCount.HasValue ? maxBufferedItemCount.Value : 100;
      options.MaxConcurrency = maxConcurrency.HasValue ? maxConcurrency.Value : 50;
      return options;
  }
  /// <summary>
  /// Executes <paramref name="cosmosDbQuery"/> against the container of
  /// <typeparamref name="T"/> and feeds the results to <paramref name="batchAction"/>,
  /// using the options' <c>MaxItemCount</c> (or 0 when unset) as the batch threshold.
  /// </summary>
  private async Task<List<T>> ResultsFromQueryAndOptions<T>(CosmosDbQuery cosmosDbQuery, Func<List<T>, Task> batchAction, QueryRequestOptions queryOptions)
  {
      Container target = await InitializeCollection<T>();
      FeedIterator<T> iterator = target.GetItemQueryIterator<T>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryOptions);

      int batchThreshold = queryOptions.MaxItemCount ?? 0;
      return await ResultsFromFeedIterator(iterator, batchAction, batchThreshold);
  }
  /// <summary>Builds query options that only set the page size.</summary>
  private QueryRequestOptions GetQueryRequestOptions(int itemsPerPage)
  {
      QueryRequestOptions options = new QueryRequestOptions();
      options.MaxItemCount = itemsPerPage;
      return options;
  }
  /// <summary>
  /// Queries the container of <typeparamref name="T"/> with an optional LINQ filter and an
  /// optional ordering.
  /// </summary>
  /// <param name="query">Filter predicate; <c>null</c> means no filter.</param>
  /// <param name="order">Ordering key selector; <c>null</c> means no ordering.</param>
  /// <param name="isDesc">Order descending instead of ascending (only relevant with <paramref name="order"/>).</param>
  /// <param name="itemsPerPage">Page size; -1 lets the defaults decide.</param>
  /// <param name="maxItemCount">Optional cap on the total number of returned items.</param>
  public async Task<List<T>> FindLinq<T>(Expression<Func<T, bool>> query = null, Expression<Func<T, object>> order = null, bool isDesc = false, int itemsPerPage = -1, int? maxItemCount = null) where T : ID
  {
      QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(itemsPerPage, maxItemCount));
      Container container = await InitializeCollection<T>();

      // Compose the query step by step instead of spelling out every filter/order combination.
      IQueryable<T> queryable = container.GetItemLinqQueryable<T>(requestOptions: queryRequestOptions);
      if (query != null)
      {
          queryable = queryable.Where(query);
      }

      if (order != null)
      {
          queryable = isDesc ? queryable.OrderByDescending(order) : queryable.OrderBy(order);
      }

      FeedIterator<T> feedIterator = queryable.ToFeedIterator();

      // Forward the cap so it limits the result, not just the page size.
      return await ResultsFromFeedIterator<T>(feedIterator, maxItemCount);
  }
  /// <summary>
  /// Runs a SQL query against the container of <typeparamref name="T"/>.
  /// </summary>
  /// <param name="sql">Query text.</param>
  /// <param name="Parameters">Optional query parameters; when <c>null</c> the text is executed as is.</param>
  /// <param name="itemsPerPage">Page size; -1 is passed through to the request options.</param>
  /// <param name="maxItemCount">Optional cap on the total number of returned items.</param>
  public async Task<List<T>> FindSQL<T>(string sql, Dictionary<string, object> Parameters = null, int itemsPerPage = -1, int? maxItemCount = null) where T : ID
  {
      Container container = await InitializeCollection<T>();
      QueryRequestOptions queryOptions = GetQueryRequestOptions(GetEffectivePageSize(itemsPerPage, maxItemCount));

      FeedIterator<T> feedIterator;
      if (Parameters != null)
      {
          CosmosDbQuery cosmosDbQuery = new CosmosDbQuery
          {
              QueryText = sql,
              Parameters = Parameters
          };
          feedIterator = container.GetItemQueryIterator<T>(cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryOptions);
      }
      else
      {
          // Apply the same request options as the parameterised branch; previously the
          // paging arguments were ignored whenever no parameters were supplied.
          feedIterator = container.GetItemQueryIterator<T>(new QueryDefinition(sql), requestOptions: queryOptions);
      }

      // Forward the cap so it limits the result, not just the page size.
      return await ResultsFromFeedIterator(feedIterator, maxItemCount);
  }
  /// <summary>Creates <paramref name="entity"/> as a new document and returns the stored resource.</summary>
  /// <exception cref="BizException">Any failure; only the original message is carried over.</exception>
  public async Task<T> Save<T>(T entity) where T : ID
  {
      try
      {
          Container target = await InitializeCollection<T>();
          return (await target.CreateItemAsync<T>(entity)).Resource;
      }
      catch (Exception failure)
      {
          throw new BizException(failure.Message);
      }
  }
  /// <summary>
  /// Creates the given entities in batches of <c>pageSize</c> concurrent requests. Each entity
  /// is serialised to JSON (null values omitted) and sent through the stream API.
  /// </summary>
  /// <param name="enyites">Entities to create.</param>
  /// <returns>The input list. Non-success responses are not reported, as before.</returns>
  public async Task<List<T>> SaveAll<T>(List<T> enyites) where T : ID
  {
      int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
      Container container = await InitializeCollection<T>();

      // Resolve the reflected property and the serializer options once, not per entity.
      PropertyInfo pkProperty = typeof(T).GetProperty(GetPartitionKey<T>());
      JsonSerializerOptions serializerOptions = new JsonSerializerOptions { IgnoreNullValues = true };

      for (int i = 0; i < pages; i++)
      {
          // Serialisation and request live in one awaited task per entity. The previous
          // List.ForEach(async ...) produced async-void lambdas, so the requests could be
          // built before serialisation had finished and failures could not be observed.
          List<Task> tasks = enyites
              .Skip(i * pageSize)
              .Take(pageSize)
              .Select(async entity =>
              {
                  using (MemoryStream stream = new MemoryStream())
                  {
                      await JsonSerializer.SerializeAsync(stream, entity, serializerOptions);
                      // Rewind so the body is read from the start however the SDK consumes the stream.
                      stream.Position = 0;
                      PartitionKey partitionKey = new PartitionKey(pkProperty.GetValue(entity, null).ToString());
                      using (ResponseMessage response = await container.CreateItemStreamAsync(stream, partitionKey))
                      {
                          // Best-effort: the status code is intentionally not inspected.
                      }
                  }
              })
              .ToList();

          await Task.WhenAll(tasks);
      }

      return enyites;
  }
  /// <summary>Upserts <paramref name="entity"/> and returns the stored resource.</summary>
  public async Task<T> SaveOrUpdate<T>(T entity) where T : ID
  {
      Container target = await InitializeCollection<T>();
      return (await target.UpsertItemAsync(item: entity)).Resource;
  }
  /// <summary>
  /// Upserts the given entities in batches of <c>pageSize</c> concurrent requests. Each entity
  /// is serialised to JSON (null values omitted) and sent through the stream API.
  /// </summary>
  /// <param name="enyites">Entities to upsert.</param>
  /// <returns>The input list. Non-success responses are not reported, as before.</returns>
  public async Task<List<T>> SaveOrUpdateAll<T>(List<T> enyites) where T : ID
  {
      int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
      Container container = await InitializeCollection<T>();

      // Resolve the reflected property and the serializer options once, not per entity.
      PropertyInfo pkProperty = typeof(T).GetProperty(GetPartitionKey<T>());
      JsonSerializerOptions serializerOptions = new JsonSerializerOptions { IgnoreNullValues = true };

      for (int i = 0; i < pages; i++)
      {
          // Serialisation and request live in one awaited task per entity. The previous
          // List.ForEach(async ...) produced async-void lambdas, so the requests could be
          // built before serialisation had finished and failures could not be observed.
          List<Task> tasks = enyites
              .Skip(i * pageSize)
              .Take(pageSize)
              .Select(async entity =>
              {
                  using (MemoryStream stream = new MemoryStream())
                  {
                      await JsonSerializer.SerializeAsync(stream, entity, serializerOptions);
                      // Rewind so the body is read from the start however the SDK consumes the stream.
                      stream.Position = 0;
                      PartitionKey partitionKey = new PartitionKey(pkProperty.GetValue(entity, null).ToString());
                      using (ResponseMessage response = await container.UpsertItemStreamAsync(stream, partitionKey))
                      {
                          // Best-effort: the status code is intentionally not inspected,
                          // but the response is disposed.
                      }
                  }
              })
              .ToList();

          await Task.WhenAll(tasks);
      }

      return enyites;
  }
  /// <summary>
  /// Replaces the stored document that has the entity's id, within the partition given by
  /// the entity's partition key property, and returns the stored resource.
  /// </summary>
  public async Task<T> Update<T>(T entity) where T : ID
  {
      Container target = await InitializeCollection<T>();

      string pkPropertyName = GetPartitionKey<T>();
      object pkValue = typeof(T).GetProperty(pkPropertyName).GetValue(entity, null);
      PartitionKey partition = new PartitionKey(pkValue.ToString());

      return (await target.ReplaceItemAsync(entity, entity.id, partition)).Resource;
  }
  /// <summary>Carrier for one serialised document: its id, partition key value and JSON payload.</summary>
  internal class Item
  {
      /// <summary>Document id.</summary>
      public string id { get; set; }

      /// <summary>Partition key value.</summary>
      public string pk { get; set; }

      /// <summary>Stream holding the serialised document.</summary>
      public MemoryStream stream { get; set; }
  }
  /// <summary>
  /// Replaces the given entities in batches of <c>pageSize</c> concurrent requests. Each entity
  /// is serialised to JSON (null values omitted) and sent through the stream API.
  /// </summary>
  /// <param name="enyites">Entities to replace; each must carry its id and partition key value.</param>
  /// <returns>The input list. Non-success responses are not reported, as before.</returns>
  public async Task<List<T>> UpdateAll<T>(List<T> enyites) where T : ID
  {
      int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
      Container container = await InitializeCollection<T>();

      // Resolve the reflected property and the serializer options once, not per entity.
      PropertyInfo pkProperty = typeof(T).GetProperty(GetPartitionKey<T>());
      JsonSerializerOptions serializerOptions = new JsonSerializerOptions { IgnoreNullValues = true };

      for (int i = 0; i < pages; i++)
      {
          // Serialisation and request live in one awaited task per entity. The previous
          // List.ForEach(async ...) produced async-void lambdas, so the requests could be
          // built before serialisation had finished and failures could not be observed.
          List<Task> tasks = enyites
              .Skip(i * pageSize)
              .Take(pageSize)
              .Select(async entity =>
              {
                  using (MemoryStream stream = new MemoryStream())
                  {
                      await JsonSerializer.SerializeAsync(stream, entity, serializerOptions);
                      // Rewind so the body is read from the start however the SDK consumes the stream.
                      stream.Position = 0;
                      PartitionKey partitionKey = new PartitionKey(pkProperty.GetValue(entity, null).ToString());
                      using (ResponseMessage response = await container.ReplaceItemStreamAsync(stream, entity.id, partitionKey))
                      {
                          // Best-effort: the status code is intentionally not inspected,
                          // but the response is disposed.
                      }
                  }
              })
              .ToList();

          await Task.WhenAll(tasks);
      }

      return enyites;
  }
  701. //public void Dispose()
  702. //{
  703. // Dispose(true);
  704. //}
  705. //protected virtual void Dispose(bool disposing)
  706. //{
  707. // if (disposing)
  708. // {
  709. // CosmosClient?.Dispose();
  710. // }
  711. //}
  /// <summary>
  /// Looks a document up by id with a parameterised query that does not specify a partition key.
  /// </summary>
  /// <returns>The single match, or <c>default</c> when nothing matches.</returns>
  /// <exception cref="InvalidOperationException">More than one document has the given id.</exception>
  public async Task<T> FindById<T>(string id) where T : ID
  {
      Container target = await InitializeCollection<T>();

      Dictionary<string, object> queryParameters = new Dictionary<string, object>();
      queryParameters.Add("@id", id);

      CosmosDbQuery byIdQuery = new CosmosDbQuery
      {
          QueryText = @"SELECT *
  FROM c
  WHERE c.id = @id",
          Parameters = queryParameters
      };

      FeedIterator<T> iterator = target.GetItemQueryIterator<T>(byIdQuery.CosmosQueryDefinition);
      List<T> matches = await ResultsFromFeedIterator(iterator);
      return matches.SingleOrDefault();
  }
  /// <summary>Reads one document by id and partition key value (a point read).</summary>
  /// <param name="id">Document id.</param>
  /// <param name="pk">Partition key value.</param>
  public async Task<T> FindByIdPk<T>(string id, string pk) where T : ID
  {
      Container target = await InitializeCollection<T>();
      PartitionKey partition = new PartitionKey(pk);
      return (await target.ReadItemAsync<T>(id: id, partitionKey: partition)).Resource;
  }
  735. }
  736. }