// AzureCosmosDBRepository.cs
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Threading.Tasks;
  4. using System.Linq;
  5. using TEAMModelOS.SDK.Module.AzureCosmosDB.Configuration;
  6. using TEAMModelOS.SDK.Module.AzureCosmosDB.Interfaces;
  7. using Microsoft.Azure.Documents.Client;
  8. using Microsoft.Azure.Documents;
  9. using TEAMModelOS.SDK.Helper.Security.AESCrypt;
  10. using TEAMModelOS.SDK.Context.Exception;
  11. using Microsoft.Azure.Documents.Linq;
  12. using TEAMModelOS.SDK.Helper.Query.LinqHelper;
  13. using System.Reflection;
  14. using Microsoft.Azure.Cosmos;
  15. using Microsoft.Azure.CosmosDB.BulkExecutor;
  16. using Microsoft.Azure.CosmosDB.BulkExecutor.BulkImport;
  17. using System.Threading;
  18. using TEAMModelOS.SDK.Helper.Common.JsonHelper;
  19. using Microsoft.Azure.CosmosDB.BulkExecutor.BulkUpdate;
  20. using TEAMModelOS.SDK.Helper.Common.CollectionHelper;
  21. using Microsoft.Azure.CosmosDB.BulkExecutor.BulkDelete;
  22. using TEAMModelOS.SDK.Context.Attributes.Azure;
  23. using System.Text;
  24. using TEAMModelOS.SDK.Helper.Common.ReflectorExtensions;
  25. using Microsoft.AspNetCore.Hosting;
  26. using System.Collections.Concurrent;
  27. namespace TEAMModelOS.SDK.Module.AzureCosmosDB.Implements
  28. { /// <summary>
  /// SDK documentation: https://github.com/Azure/azure-cosmos-dotnet-v2/tree/master/samples
  30. /// https://github.com/Azure/azure-cosmos-dotnet-v2/blob/530c8d9cf7c99df7300246da05206c57ce654233/samples/code-samples/DatabaseManagement/Program.cs#L72-L121
  31. /// </summary>
  32. public class AzureCosmosDBRepository : IAzureCosmosDBRepository
  33. {
  /// <summary>
  /// Document client (v2 SDK) through which every call in this repository is issued.
  /// SDK samples: https://github.com/Azure/azure-cosmos-dotnet-v2/tree/master/samples
  /// https://github.com/Azure/azure-cosmos-dotnet-v2/blob/530c8d9cf7c99df7300246da05206c57ce654233/samples/code-samples/DatabaseManagement/Program.cs#L72-L121
  /// </summary>
  private DocumentClient CosmosClient { get; set; }

  /// <summary>
  /// Thread-safe cache of the collections this repository already knows about,
  /// keyed by collection id.
  /// </summary>
  private ConcurrentDictionary<string, DocumentCollection> DocumentCollectionDict { get; set; } = new ConcurrentDictionary<string, DocumentCollection>();

  /// <summary>Id of the database, taken from the options passed to the constructor.</summary>
  private string Database { get; set; }

  /// <summary>Throughput value passed as <c>OfferThroughput</c> when a collection is created.</summary>
  private int CollectionThroughput { get; set; }
  /// <summary>
  /// Builds the repository: obtains the shared client, makes sure the database exists,
  /// caches the collections already present and then, for every type carrying
  /// <see cref="CosmosDBAttribute"/> found via <c>options.ScanModel</c>, either raises the
  /// collection throughput (when the configured RU is higher) or creates the collection.
  /// </summary>
  /// <param name="options">Connection, database and throughput settings.</param>
  /// <exception cref="BizException">
  /// Thrown when <paramref name="options"/> is null or has no connection string.
  /// </exception>
  /// <remarks>
  /// Failures while talking to Cosmos DB are logged to the console and do not abort
  /// construction (best-effort initialisation; collections are also created lazily later).
  /// </remarks>
  public AzureCosmosDBRepository(AzureCosmosDBOptions options)
  {
      // Validate outside the try block: previously this exception was thrown inside it and
      // silently swallowed by the generic catch, leaving a repository with a null client.
      if (options == null || string.IsNullOrEmpty(options.ConnectionString))
      {
          throw new BizException("请设置正确的AzureCosmosDB数据库配置信息!");
      }

      try
      {
          CosmosClient = CosmosDBClientSingleton.getInstance(options.ConnectionString, options.ConnectionKey).GetCosmosDBClient();
          Database = options.Database;
          CollectionThroughput = options.CollectionThroughput;

          // Wait for the database to exist before listing its collections; the call used to be
          // fire-and-forget, so the feed read below could race the creation.
          CosmosClient.CreateDatabaseIfNotExistsAsync(new Database { Id = Database }).GetAwaiter().GetResult();

          // Cache every collection returned by the feed read (first occurrence of an id wins).
          FeedResponse<DocumentCollection> collections = CosmosClient.ReadDocumentCollectionFeedAsync(UriFactory.CreateDatabaseUri(Database)).GetAwaiter().GetResult();
          foreach (DocumentCollection existing in collections)
          {
              DocumentCollectionDict.TryAdd(existing.Id, existing);
          }

          List<Type> types = ReflectorExtensions.GetAllTypeAsAttribute<CosmosDBAttribute>(options.ScanModel);
          foreach (Type type in types)
          {
              string partitionKey = GetPartitionKey(type);
              CosmosDBAttribute attribute = type.GetCustomAttributes<CosmosDBAttribute>(true).First();

              // Attribute name wins; otherwise the collection is named after the type.
              string collectionName = string.IsNullOrEmpty(attribute.Name) ? type.Name : attribute.Name;
              // An attribute RU is only honoured above 400; otherwise use the configured default.
              int ru = attribute.RU > 400 ? attribute.RU : options.CollectionThroughput;

              if (DocumentCollectionDict.TryGetValue(collectionName, out DocumentCollection collection))
              {
                  // The collection exists: only ever scale its throughput up, never down.
                  // SingleOrDefault + type test: a collection without its own OfferV2 is skipped
                  // instead of throwing and aborting the remaining types.
                  Offer offer = CosmosClient.CreateOfferQuery()
                      .Where(o => o.ResourceLink == collection.SelfLink)
                      .AsEnumerable()
                      .SingleOrDefault();
                  if (offer is OfferV2 offerV2 && offerV2.Content.OfferThroughput < ru)
                  {
                      // Wait for completion so a failure is reported below rather than lost.
                      CosmosClient.ReplaceOfferAsync(new OfferV2(offer, ru)).GetAwaiter().GetResult();
                  }
              }
              else
              {
                  DocumentCollection definition = new DocumentCollection { Id = collectionName };
                  definition.IndexingPolicy = new IndexingPolicy(new RangeIndex(DataType.String) { Precision = -1 });
                  if (!string.IsNullOrEmpty(partitionKey))
                  {
                      definition.PartitionKey.Paths.Add("/" + partitionKey);
                  }

                  // Use the larger of the per-type RU and the default for THIS collection only.
                  // The default is no longer overwritten, so one high-RU model does not inflate
                  // the throughput of every collection created afterwards.
                  int throughput = Math.Max(ru, CollectionThroughput);
                  DocumentCollection created = CosmosClient.CreateDocumentCollectionIfNotExistsAsync(
                      UriFactory.CreateDatabaseUri(Database),
                      definition,
                      new RequestOptions { OfferThroughput = throughput }).GetAwaiter().GetResult();
                  DocumentCollectionDict.TryAdd(collectionName, created);
              }
          }
      }
      catch (DocumentClientException de)
      {
          // Best-effort initialisation: report instead of swallowing silently.
          Console.WriteLine("{0} error occurred: {1}, Message: {2}", de.StatusCode, de.Message, de.GetBaseException().Message);
      }
      catch (Exception e)
      {
          Console.WriteLine("Error: {0}, Message: {1}", e.Message, e.GetBaseException().Message);
      }
  }
  /// <summary>
  /// Resolves the collection that stores <typeparamref name="T"/>. The collection name is the
  /// <see cref="CosmosDBAttribute"/> name when one is set, otherwise the type name.
  /// </summary>
  /// <returns>The cached or newly created collection.</returns>
  private async Task<DocumentCollection> InitializeCollection<T>()
  {
      Type modelType = typeof(T);
      string pkName = GetPartitionKey<T>();
      string declaredName = modelType.GetCustomAttributes<CosmosDBAttribute>(true).First().Name;
      string collectionName = string.IsNullOrEmpty(declaredName) ? modelType.Name : declaredName;
      return await InitializeCollection(collectionName, pkName);
  }
  /// <summary>
  /// Returns the collection named <paramref name="CollectionName"/> from the in-memory cache,
  /// or creates it (if it does not exist) with the default throughput and caches it.
  /// </summary>
  /// <param name="CollectionName">Id of the collection.</param>
  /// <param name="PartitionKey">Partition key property name; skipped when null or empty.</param>
  /// <returns>The cached or newly created collection.</returns>
  private async Task<DocumentCollection> InitializeCollection(string CollectionName, string PartitionKey)
  {
      // Fast path: the collection is already known.
      if (DocumentCollectionDict.TryGetValue(CollectionName, out DocumentCollection cached))
      {
          return cached;
      }

      // Otherwise create it with the default settings.
      DocumentCollection definition = new DocumentCollection
      {
          Id = CollectionName,
          IndexingPolicy = new IndexingPolicy(new RangeIndex(DataType.String) { Precision = -1 })
      };
      if (!string.IsNullOrEmpty(PartitionKey))
      {
          definition.PartitionKey.Paths.Add("/" + PartitionKey);
      }

      Uri databaseUri = UriFactory.CreateDatabaseUri(Database);
      RequestOptions creationOptions = new RequestOptions { OfferThroughput = CollectionThroughput };
      DocumentCollection created = await CosmosClient.CreateDocumentCollectionIfNotExistsAsync(databaseUri, definition, creationOptions);
      DocumentCollectionDict.TryAdd(CollectionName, created);
      return created;
  }
  /// <summary>Generic shortcut for <see cref="GetPartitionKey(Type)"/> using <typeparamref name="T"/>.</summary>
  private string GetPartitionKey<T>() => GetPartitionKey(typeof(T));
  /// <summary>
  /// Finds the name of the partition key property of <paramref name="type"/>. A property counts
  /// when it is literally named <c>PartitionKey</c> (scanning stops there) or carries a
  /// <see cref="PartitionKeyAttribute"/>.
  /// </summary>
  /// <param name="type">Model type to inspect.</param>
  /// <returns>The name of the single matching property.</returns>
  /// <exception cref="BizException">No property matched, or more than one match was collected.</exception>
  private string GetPartitionKey(Type type)
  {
      List<PropertyInfo> candidates = new List<PropertyInfo>();
      foreach (PropertyInfo candidate in type.GetProperties())
      {
          if (candidate.Name.Equals("PartitionKey"))
          {
              candidates.Add(candidate);
              break;
          }

          // One entry per PartitionKeyAttribute instance found on the property.
          candidates.AddRange(candidate.GetCustomAttributes(true)
              .OfType<PartitionKeyAttribute>()
              .Select(_ => candidate));
      }

      switch (candidates.Count)
      {
          case 0:
              throw new BizException(type.Name +"has no PartitionKey !!!!!!");
          case 1:
              return candidates[0].Name;
          default:
              throw new BizException("PartitionKey can only be single!");
      }
  }
  /// <summary>
  /// Creates <paramref name="entity"/> as a new document in the collection of <typeparamref name="T"/>.
  /// </summary>
  /// <param name="entity">Object to store.</param>
  /// <returns>The same <paramref name="entity"/> instance.</returns>
  /// <exception cref="BizException">Wraps the message of any failure raised while saving.</exception>
  public async Task<T> Save<T>(T entity) //where T : object, new()
  {
      try
      {
          DocumentCollection target = await InitializeCollection<T>();
          Uri collectionUri = UriFactory.CreateDocumentCollectionUri(Database, target.Id);
          await CosmosClient.CreateDocumentAsync(collectionUri, entity);
          return entity;
      }
      catch (Exception e)
      {
          throw new BizException(e.Message);
      }
  }
  /// <summary>
  /// Upserts <paramref name="entity"/> into the collection of <typeparamref name="T"/>.
  /// </summary>
  /// <param name="entity">Object to insert or replace.</param>
  /// <returns>The same <paramref name="entity"/> instance.</returns>
  public async Task<T> Update<T>(T entity)
  {
      DocumentCollection target = await InitializeCollection<T>();
      Uri collectionUri = UriFactory.CreateDocumentCollectionUri(Database, target.Id);
      await CosmosClient.UpsertDocumentAsync(collectionUri, entity);
      return entity;
  }
  /// <summary>
  /// Replaces the document whose id is <paramref name="key"/> with <paramref name="entity"/>.
  /// </summary>
  /// <param name="entity">New document content.</param>
  /// <param name="key">Id of the document to replace.</param>
  /// <returns>
  /// <paramref name="key"/> on success; <c>null</c> when the replace fails (the exception is
  /// written to the console, not rethrown).
  /// </returns>
  public async Task<string> ReplaceObject<T>(T entity, string key)
  {
      DocumentCollection target = await InitializeCollection<T>();
      try
      {
          Uri documentUri = UriFactory.CreateDocumentUri(Database, target.Id, key);
          await CosmosClient.ReplaceDocumentAsync(documentUri, entity);
          return key;
      }
      catch (Exception e)
      {
          Console.WriteLine("{0} Exception caught.", e);
          return null;
      }
  }
  /// <summary>
  /// Replaces the document whose id is <paramref name="key"/> inside the partition identified
  /// by <paramref name="partitionKey"/>.
  /// </summary>
  /// <param name="entity">New document content.</param>
  /// <param name="key">Id of the document to replace.</param>
  /// <param name="partitionKey">Partition key value sent with the request.</param>
  /// <returns><paramref name="key"/>.</returns>
  /// <exception cref="BizException">Wraps the message of any failure raised by the replace call.</exception>
  public async Task<string> ReplaceObject<T>(T entity, string key, string partitionKey)
  {
      DocumentCollection target = await InitializeCollection<T>();
      try
      {
          Uri documentUri = UriFactory.CreateDocumentUri(Database, target.Id, key);
          RequestOptions scoped = new RequestOptions { PartitionKey = new PartitionKey(partitionKey) };
          await CosmosClient.ReplaceDocumentAsync(documentUri, entity, scoped);
          return key;
      }
      catch (Exception e)
      {
          throw new BizException(e.Message);
      }
  }
  /// <summary>
  /// Reads every document of the collection of <typeparamref name="T"/>, page by page, with
  /// cross-partition querying enabled.
  /// </summary>
  /// <returns>All documents converted to <typeparamref name="T"/>.</returns>
  public async Task<List<T>> FindAll<T>()
  {
      List<T> results = new List<T>();
      DocumentCollection source = await InitializeCollection<T>();

      // MaxItemCount = -1 is the original "no limit" setting for the page size.
      FeedOptions feed = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = true };
      Uri collectionUri = UriFactory.CreateDocumentCollectionUri(Database, source.Id);
      var pager = CosmosClient.CreateDocumentQuery<T>(collectionUri, feed).AsDocumentQuery();

      while (pager.HasMoreResults)
      {
          foreach (T item in await pager.ExecuteNextAsync())
          {
              results.Add(item);
          }
      }

      return results;
  }
  /// <summary>
  /// Placeholder for LINQ-based lookups: initialises the collection and builds a document
  /// query, but never applies <paramref name="singleOrDefault"/> nor enumerates the query.
  /// </summary>
  /// <param name="singleOrDefault">Currently unused.</param>
  /// <returns>Always an empty list.</returns>
  public async Task<List<T>> FindLinq<T>(Func<IQueryable<object>, object> singleOrDefault)
  {
      DocumentCollection source = await InitializeCollection<T>();

      // NOTE(review): the query is created and then discarded - no filter is applied yet.
      FeedOptions feed = new FeedOptions { MaxItemCount = -1 };
      _ = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, source.Id), feed);

      return new List<T>();
  }
  /// <summary>
  /// Runs the raw SQL text <paramref name="sql"/> against the collection of
  /// <typeparamref name="T"/> with default feed options and materialises the result.
  /// </summary>
  /// <param name="sql">Query text, passed to the client unchanged.</param>
  /// <returns>The query results as a list.</returns>
  public async Task<List<T>> FindSQL<T>(string sql)
  {
      DocumentCollection source = await InitializeCollection<T>();
      Uri collectionUri = UriFactory.CreateDocumentCollectionUri(Database, source.Id);
      IQueryable<T> rows = CosmosClient.CreateDocumentQuery<T>(collectionUri, sql);
      return rows.ToList();
  }
  /// <summary>
  /// Runs the raw SQL text <paramref name="sql"/> against the collection of
  /// <typeparamref name="T"/> and materialises the result.
  /// </summary>
  /// <param name="sql">Query text, passed to the client unchanged.</param>
  /// <param name="IsPk">Forwarded as-is to <c>FeedOptions.EnableCrossPartitionQuery</c>.</param>
  /// <returns>The query results as a list.</returns>
  public async Task<List<T>> FindSQL<T>(string sql, bool IsPk)
  {
      DocumentCollection source = await InitializeCollection<T>();

      // MaxItemCount = -1 is the original "no limit" setting.
      FeedOptions feed = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = IsPk };
      Uri collectionUri = UriFactory.CreateDocumentCollectionUri(Database, source.Id);
      IQueryable<T> rows = CosmosClient.CreateDocumentQuery<T>(collectionUri, sql, feed);
      return rows.ToList();
  }
  /// <summary>
  /// Finds the documents of <typeparamref name="T"/> whose properties equal every key/value
  /// pair in <paramref name="dict"/> (conditions are combined with "and").
  /// </summary>
  /// <param name="dict">
  /// Property name to expected value. <c>null</c> is treated like an empty dictionary.
  /// </param>
  /// <returns>The matching documents.</returns>
  /// <exception cref="ArgumentException">A value in <paramref name="dict"/> is null.</exception>
  /// <remarks>
  /// Cross-partition querying is switched off only when one of the keys is the partition key
  /// property of <typeparamref name="T"/>.
  /// </remarks>
  public async Task<List<T>> FindByParams<T>(Dictionary<string, object> dict)
  {
      // Initialise once; the collection used to be initialised a second time further down.
      DocumentCollection collection = await InitializeCollection<T>();

      // Reuse the same partition key resolution that collection creation relies on, so a
      // property literally named "PartitionKey" is recognised here as well.
      string partitionKeyName = GetPartitionKey<T>();

      bool crossPartition = true;
      List<Filter> filters = new List<Filter>();
      if (dict != null)
      {
          foreach (KeyValuePair<string, object> pair in dict)
          {
              if (pair.Value == null)
              {
                  throw new ArgumentException("参数值不能为null");
              }
              if (partitionKeyName.Equals(pair.Key))
              {
                  crossPartition = false;
              }
              filters.Add(new Filter { Contrast = "and", Key = pair.Key, Value = pair.Value.ToString() });
          }
      }

      // MaxItemCount = -1 is the original "no limit" setting.
      FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = crossPartition };
      var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, collection.Id), queryOptions);
      return DynamicLinq.GenerateFilter<T>(query, filters).ToList();
  }
  /// <summary>
  /// Deletes the document with the given <paramref name="id"/> from the collection of
  /// <typeparamref name="T"/> without sending a partition key.
  /// </summary>
  /// <param name="id">Id of the document to delete.</param>
  /// <returns><paramref name="id"/>.</returns>
  public async Task<string> DeleteAsync<T>(string id)
  {
      DocumentCollection target = await InitializeCollection<T>();
      Uri documentUri = UriFactory.CreateDocumentUri(Database, target.Id, id);
      await CosmosClient.DeleteDocumentAsync(documentUri);
      return id;
  }
  /// <summary>
  /// Deletes the document that corresponds to <paramref name="entity"/>. The document id is read
  /// from the entity's <c>id</c> property and the partition key value from its partition key
  /// property, both via reflection.
  /// </summary>
  /// <param name="entity">Entity to delete.</param>
  /// <returns>The same <paramref name="entity"/> instance.</returns>
  public async Task<T> DeleteAsync<T>(T entity)
  {
      DocumentCollection target = await InitializeCollection<T>();
      string pkName = GetPartitionKey<T>();

      // Only attach a partition key to the request when the type declares one.
      RequestOptions scope = null;
      if (!string.IsNullOrEmpty(pkName))
      {
          string pkValue = entity.GetType().GetProperty(pkName).GetValue(entity).ToString();
          scope = new RequestOptions { PartitionKey = new PartitionKey(pkValue) };
      }

      string idValue = entity.GetType().GetProperty("id").GetValue(entity).ToString();
      await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database, target.Id, idValue), scope);
      return entity;
  }
  /// <summary>
  /// Deletes the document with the given <paramref name="id"/> from the partition identified by
  /// <paramref name="partitionKey"/>.
  /// </summary>
  /// <param name="id">Id of the document to delete.</param>
  /// <param name="partitionKey">Partition key value sent with the request.</param>
  /// <returns><paramref name="id"/>.</returns>
  public async Task<string> DeleteAsync<T>(string id, string partitionKey)
  {
      DocumentCollection target = await InitializeCollection<T>();
      Uri documentUri = UriFactory.CreateDocumentUri(Database, target.Id, id);
      RequestOptions scope = new RequestOptions { PartitionKey = new PartitionKey(partitionKey) };
      await CosmosClient.DeleteDocumentAsync(documentUri, scope);
      return id;
  }
  /// <summary>
  /// Bulk-upserts <paramref name="enyites"/> into the collection of <typeparamref name="T"/>
  /// in batches of 100 documents using the bulk executor.
  /// </summary>
  /// <param name="enyites">Entities to import; each one is serialised with <c>ToJson()</c>.</param>
  /// <returns>The same <paramref name="enyites"/> list.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="enyites"/> is null.</exception>
  public async Task<List<T>> SaveAll<T>(List<T> enyites)
  {
      // Fail before any I/O instead of with a NullReferenceException half-way through.
      if (enyites == null)
      {
          throw new ArgumentNullException(nameof(enyites));
      }

      DocumentCollection dataCollection = await InitializeCollection<T>();
      if (enyites.Count == 0)
      {
          // Nothing to import: skip bulk executor set-up entirely.
          return enyites;
      }

      // Set retry options high for initialization (default values).
      CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
      CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;
      IBulkExecutor bulkExecutor = new BulkExecutor(CosmosClient, dataCollection);
      await bulkExecutor.InitializeAsync();
      // Set retries to 0 to pass control to bulk executor.
      // NOTE(review): these options live on the client held by this repository and are not
      // restored afterwards - confirm that is acceptable for the non-bulk operations.
      CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
      CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;

      const int pageSize = 100;
      for (int offset = 0; offset < enyites.Count; offset += pageSize)
      {
          int size = Math.Min(pageSize, enyites.Count - offset);
          List<string> documentsToImportInBatch = new List<string>(size);
          foreach (T entity in enyites.GetRange(offset, size))
          {
              documentsToImportInBatch.Add(entity.ToJson());
          }

          // The import is awaited directly; wrapping this I/O in Task.Run + WhenAll over a
          // single task added nothing. Upserts make repeating a partially imported batch safe.
          // NOTE(review): the loop repeats until the whole batch is reported as imported and
          // has no attempt limit (unchanged from the previous behaviour).
          BulkImportResponse bulkImportResponse;
          do
          {
              bulkImportResponse = await bulkExecutor.BulkImportAsync(
                  documents: documentsToImportInBatch,
                  enableUpsert: true,
                  disableAutomaticIdGeneration: true,
                  maxConcurrencyPerPartitionKeyRange: null,
                  maxInMemorySortingBatchSize: null,
                  cancellationToken: CancellationToken.None);
          } while (bulkImportResponse.NumberOfDocumentsImported < documentsToImportInBatch.Count);
      }

      return enyites;
  }
  /// <summary>
  /// Applies the same set/unset operations to every document of <typeparamref name="T"/> that
  /// matches <paramref name="dict"/>, in batches of 100, using the bulk executor.
  /// </summary>
  /// <param name="dict">Equality conditions selecting the documents (see <c>FindByParams</c>).</param>
  /// <param name="updateFilters">
  /// Field name to new value. Entries whose value is null or converts to an empty string are skipped.
  /// </param>
  /// <param name="deleteKeys">Optional field names to remove from the documents.</param>
  /// <returns>The documents that matched <paramref name="dict"/> (as read before the update).</returns>
  /// <exception cref="BizException">A value in <paramref name="updateFilters"/> has an unsupported type.</exception>
  public async Task<List<T>> UpdateAll<T>(Dictionary<string, object> dict, Dictionary<string, object> updateFilters, List<string> deleteKeys = null)
  {
      DocumentCollection dataCollection = await InitializeCollection<T>();
      IBulkExecutor bulkExecutor = new BulkExecutor(CosmosClient, dataCollection);
      await bulkExecutor.InitializeAsync();

      // Build the operations applied to every matched document.
      List<UpdateOperation> updateOperations = new List<UpdateOperation>();
      if (updateFilters != null)
      {
          foreach (KeyValuePair<string, object> pair in updateFilters)
          {
              if (pair.Value == null || string.IsNullOrEmpty(pair.Value.ToString()))
              {
                  continue;
              }

              // SwitchType returns null for value types it does not support; a null entry
              // must never reach the bulk executor, so fail with a clear message instead.
              UpdateOperation operation = SwitchType(pair.Key, pair.Value);
              if (operation == null)
              {
                  throw new BizException("Unsupported value type for update key: " + pair.Key);
              }
              updateOperations.Add(operation);
          }
      }
      if (deleteKeys != null)
      {
          foreach (string key in deleteKeys)
          {
              updateOperations.Add(new UnsetUpdateOperation(key));
          }
      }

      List<T> list = await FindByParams<T>(dict);
      if (updateOperations.Count == 0 || list.Count == 0)
      {
          // Nothing to apply or nothing matched; avoids a retry loop that could never finish.
          return list;
      }

      // Property NAME of the partition key. The former code prefixed it with "/" (the path
      // syntax used when creating a collection), so GetProperty always returned null.
      string partitionKeyName = GetPartitionKey<T>();

      const int pageSize = 100;
      for (int offset = 0; offset < list.Count; offset += pageSize)
      {
          int size = Math.Min(pageSize, list.Count - offset);
          List<UpdateItem> updateItemsInBatch = new List<UpdateItem>(size);
          foreach (T item in list.GetRange(offset, size))
          {
              Type itemType = item.GetType();
              string partitionKeyValue = itemType.GetProperty(partitionKeyName).GetValue(item) + "";
              string id = itemType.GetProperty("id").GetValue(item) + "";
              updateItemsInBatch.Add(new UpdateItem(id, partitionKeyValue, updateOperations));
          }

          // Awaited directly instead of through Task.Run + WhenAll over a single task.
          // NOTE(review): repeats until the whole batch is reported as updated, with no
          // attempt limit (unchanged from the previous behaviour).
          BulkUpdateResponse bulkUpdateResponse;
          do
          {
              bulkUpdateResponse = await bulkExecutor.BulkUpdateAsync(
                  updateItems: updateItemsInBatch,
                  maxConcurrencyPerPartitionKeyRange: null,
                  cancellationToken: CancellationToken.None);
          } while (bulkUpdateResponse.NumberOfDocumentsUpdated < updateItemsInBatch.Count);
      }

      return list;
  }
  /// <summary>
  /// Bulk-deletes every document of <typeparamref name="T"/> that matches <paramref name="dict"/>.
  /// </summary>
  /// <param name="dict">Equality conditions selecting the documents (see <c>FindByParams</c>).</param>
  /// <returns>
  /// The documents that were selected for deletion, or <c>null</c> when nothing matched
  /// (kept for backward compatibility with existing callers).
  /// </returns>
  public async Task<List<T>> DeleteAll<T>(Dictionary<string, object> dict)
  {
      DocumentCollection dataCollection = await InitializeCollection<T>();
      List<T> list = await FindByParams<T>(dict);
      if (list == null || list.Count == 0)
      {
          return null;
      }

      // The bulk executor expects (partition key value, document id) pairs. The previous code
      // passed the id in both positions, which is only right when the partition key IS the id.
      string partitionKeyName = GetPartitionKey<T>();
      List<Tuple<string, string>> pkIdTuplesToDelete = new List<Tuple<string, string>>(list.Count);
      foreach (T item in list)
      {
          Type itemType = item.GetType();
          string partitionKeyValue = itemType.GetProperty(partitionKeyName).GetValue(item) + "";
          string id = itemType.GetProperty("id").GetValue(item) + "";
          pkIdTuplesToDelete.Add(new Tuple<string, string>(partitionKeyValue, id));
      }

      BulkExecutor bulkExecutor = new BulkExecutor(CosmosClient, dataCollection);
      await bulkExecutor.InitializeAsync();
      await bulkExecutor.BulkDeleteAsync(pkIdTuplesToDelete);
      return list;
  }
  /// <summary>
  /// Builds the typed "set" operation for <paramref name="key"/> from the runtime type code
  /// of <paramref name="obj"/>.
  /// </summary>
  /// <param name="key">Name of the field to set.</param>
  /// <param name="obj">New value; must not be null.</param>
  /// <returns>
  /// A <c>SetUpdateOperation</c> for string, Int32, Double, Byte, Boolean, DateTime and Int64
  /// values; <c>null</c> for every other type code.
  /// </returns>
  private static UpdateOperation SwitchType(string key, object obj)
  {
      switch (Type.GetTypeCode(obj.GetType()))
      {
          case TypeCode.String:
              return new SetUpdateOperation<string>(key, obj.ToString());
          case TypeCode.Int32:
              return new SetUpdateOperation<Int32>(key, (Int32)obj);
          case TypeCode.Double:
              return new SetUpdateOperation<Double>(key, (Double)obj);
          case TypeCode.Byte:
              return new SetUpdateOperation<Byte>(key, (Byte)obj);
          case TypeCode.Boolean:
              return new SetUpdateOperation<Boolean>(key, (Boolean)obj);
          case TypeCode.DateTime:
              return new SetUpdateOperation<DateTime>(key, (DateTime)obj);
          case TypeCode.Int64:
              return new SetUpdateOperation<Int64>(key, (Int64)obj);
          default:
              return null;
      }
  }
  /// <summary>
  /// Queries the collection of <typeparamref name="T"/> with a SQL statement assembled from
  /// <paramref name="dict"/>: one clause per entry, produced by <c>GenSql</c>, appended to
  /// <c>select * from c where 1=1</c>.
  /// </summary>
  /// <param name="dict">Field name to expected value; may be null for "no conditions".</param>
  /// <param name="IsPk">Forwarded as-is to <c>FeedOptions.EnableCrossPartitionQuery</c>.</param>
  /// <returns>The query results as a list.</returns>
  public async Task<List<T>> FindByDict<T>(Dictionary<string, object> dict, bool IsPk)
  {
      DocumentCollection source = await InitializeCollection<T>();

      StringBuilder statement = new StringBuilder("select * from c where 1=1 ");
      if (dict != null)
      {
          foreach (KeyValuePair<string, object> condition in dict)
          {
              statement.Append(GenSql(condition.Value, condition.Key));
          }
      }

      // MaxItemCount = -1 is the original "no limit" setting.
      FeedOptions feed = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = IsPk };
      Uri collectionUri = UriFactory.CreateDocumentCollectionUri(Database, source.Id);
      IQueryable<T> rows = CosmosClient.CreateDocumentQuery<T>(collectionUri, statement.ToString(), feed);
      return rows.ToList();
  }
  /// <summary>
  /// Renders one equality condition, <c>and c.&lt;key&gt;=&lt;literal&gt; </c>, for the SQL text
  /// assembled by the <c>FindByDict</c> methods.
  /// </summary>
  /// <param name="obj">Value to compare with; its runtime type code selects the literal form.</param>
  /// <param name="key">Property path placed after <c>c.</c>; inserted verbatim.</param>
  /// <returns>
  /// The clause followed by a single space, or <c>null</c> when the value's type code is not
  /// one of String, Int32, Int64, Double, Boolean or DateTime (callers append nothing then).
  /// </returns>
  /// <exception cref="ArgumentNullException"><paramref name="obj"/> is null.</exception>
  /// <remarks>
  /// NOTE(review): <paramref name="key"/> cannot be escaped as a literal; callers must not
  /// pass untrusted text as a key. Values are escaped / formatted here.
  /// </remarks>
  private static string GenSql(object obj, string key)
  {
      if (obj == null)
      {
          throw new ArgumentNullException(nameof(obj));
      }

      // Literals must not depend on the current culture (decimal comma, date layout, ...).
      System.Globalization.CultureInfo invariant = System.Globalization.CultureInfo.InvariantCulture;
      string literal;
      switch (Type.GetTypeCode(obj.GetType()))
      {
          case TypeCode.String:
              // Escape backslashes first, then quotes, so a value cannot terminate the literal.
              literal = "'" + obj.ToString().Replace("\\", "\\\\").Replace("'", "\\'") + "'";
              break;
          case TypeCode.Int32:
          case TypeCode.Int64:
              literal = Convert.ToInt64(obj, invariant).ToString(invariant);
              break;
          case TypeCode.Double:
              literal = Convert.ToDouble(obj, invariant).ToString("R", invariant);
              break;
          case TypeCode.Boolean:
              // Lower-case JSON booleans rather than .NET's "True"/"False".
              literal = (bool)obj ? "true" : "false";
              break;
          case TypeCode.DateTime:
              // Quoted ISO 8601 text. Presumably documents store dates in Json.NET's default
              // layout, which this format mirrors - TODO confirm against stored data.
              literal = "'" + ((DateTime)obj).ToString("yyyy'-'MM'-'dd'T'HH':'mm':'ss.FFFFFFFK", invariant) + "'";
              break;
          default:
              return null;
      }

      // Trailing space keeps consecutive clauses from running into each other.
      return "and c." + key + "=" + literal + " ";
  }
  /// <summary>
  /// Builds a dynamic query over an already known collection from the conditions in
  /// <paramref name="dict"/> (one clause per entry, produced by <c>GenSql</c>).
  /// </summary>
  /// <param name="CollectionName">Id of the collection; must already be in the in-memory cache.</param>
  /// <param name="PartitionKey">
  /// Partition key property name; cross-partition querying is enabled exactly when this is
  /// neither null nor empty.
  /// </param>
  /// <param name="dict">Field name to expected value; may be null for "no conditions".</param>
  /// <returns>The query object returned by the client (not materialised here).</returns>
  /// <exception cref="BizException">The collection is not in the in-memory cache.</exception>
  public async Task<IQueryable<dynamic>> FindByDict(string CollectionName, string PartitionKey, Dictionary<string, object> dict)
  {
      if (!DocumentCollectionDict.TryGetValue(CollectionName, out _))
      {
          throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
      }

      await InitializeCollection(CollectionName, PartitionKey);

      StringBuilder statement = new StringBuilder("select * from " + CollectionName + " c where 1=1 ");
      if (dict != null)
      {
          foreach (KeyValuePair<string, object> condition in dict)
          {
              statement.Append(GenSql(condition.Value, condition.Key));
          }
      }

      // MaxItemCount = -1 is the original "no limit" setting.
      FeedOptions feed = new FeedOptions
      {
          MaxItemCount = -1,
          EnableCrossPartitionQuery = !string.IsNullOrEmpty(PartitionKey)
      };
      return CosmosClient.CreateDocumentQuery(UriFactory.CreateDocumentCollectionUri(Database, CollectionName), statement.ToString(), feed);
  }
  667. }
  668. }