AzureCosmosDBRepository.cs 33 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Threading.Tasks;
  4. using System.Linq;
  5. using TEAMModelOS.SDK.Module.AzureCosmosDB.Configuration;
  6. using TEAMModelOS.SDK.Module.AzureCosmosDB.Interfaces;
  7. using Microsoft.Azure.Documents.Client;
  8. using Microsoft.Azure.Documents;
  9. using TEAMModelOS.SDK.Helper.Security.AESCrypt;
  10. using TEAMModelOS.SDK.Context.Exception;
  11. using Microsoft.Azure.Documents.Linq;
  12. using TEAMModelOS.SDK.Helper.Query.LinqHelper;
  13. using System.Reflection;
  14. using Microsoft.Azure.Cosmos;
  15. using Microsoft.Azure.CosmosDB.BulkExecutor;
  16. using Microsoft.Azure.CosmosDB.BulkExecutor.BulkImport;
  17. using System.Threading;
  18. using TEAMModelOS.SDK.Helper.Common.JsonHelper;
  19. using Microsoft.Azure.CosmosDB.BulkExecutor.BulkUpdate;
  20. using TEAMModelOS.SDK.Helper.Common.CollectionHelper;
  21. using Microsoft.Azure.CosmosDB.BulkExecutor.BulkDelete;
  22. using TEAMModelOS.SDK.Context.Attributes.Azure;
  23. using System.Text;
  24. using TEAMModelOS.SDK.Helper.Common.ReflectorExtensions;
  25. using Microsoft.AspNetCore.Hosting;
  26. using System.Collections.Concurrent;
  27. using DataType = Microsoft.Azure.Documents.DataType;
  28. using RequestOptions = Microsoft.Azure.Documents.Client.RequestOptions;
  29. using PartitionKey = Microsoft.Azure.Documents.PartitionKey;
  30. namespace TEAMModelOS.SDK.Module.AzureCosmosDB.Implements
  31. { /// <summary>
  32. /// sdk 文档https://github.com/Azure/azure-cosmos-dotnet-v2/tree/master/samples
  33. /// https://github.com/Azure/azure-cosmos-dotnet-v2/blob/530c8d9cf7c99df7300246da05206c57ce654233/samples/code-samples/DatabaseManagement/Program.cs#L72-L121
  34. /// </summary>
  35. public class AzureCosmosDBRepository : IAzureCosmosDBRepository
  36. {
  37. /// <summary>
  38. /// sdk 文档https://github.com/Azure/azure-cosmos-dotnet-v2/tree/master/samples
  39. /// https://github.com/Azure/azure-cosmos-dotnet-v2/blob/530c8d9cf7c99df7300246da05206c57ce654233/samples/code-samples/DatabaseManagement/Program.cs#L72-L121
  40. /// </summary>
  41. private DocumentClient CosmosClient { get; set; }
  42. /// <summary>
  43. /// 线程安全的dict类型
  44. /// </summary>
  45. private ConcurrentDictionary<string, DocumentCollection> DocumentCollectionDict { get; set; } = new ConcurrentDictionary<string, DocumentCollection>();
  46. // private DocumentCollection CosmosCollection { get; set; }
  47. private string Database { get; set; }
  48. private int CollectionThroughput { get; set; }
  49. public AzureCosmosDBRepository(AzureCosmosDBOptions options)
  50. {
  51. try
  52. {
  53. if (!string.IsNullOrEmpty(options.ConnectionString))
  54. {
  55. CosmosClient = CosmosDBClientSingleton.getInstance(options.ConnectionString, options.ConnectionKey).GetCosmosDBClient();
  56. }
  57. else
  58. {
  59. throw new BizException("请设置正确的AzureCosmosDB数据库配置信息!");
  60. }
  61. Database = options.Database;
  62. CollectionThroughput = options.CollectionThroughput;
  63. CosmosClient.CreateDatabaseIfNotExistsAsync(new Microsoft.Azure.Documents.Database { Id = Database });
  64. // _connectionString = options.ConnectionString;
  65. // CosmosSerializer
  66. //获取数据库所有的表
  67. Microsoft.Azure.Documents.Client.FeedResponse<DocumentCollection> collections = CosmosClient.ReadDocumentCollectionFeedAsync(UriFactory.CreateDatabaseUri(Database)).GetAwaiter().GetResult();
  68. foreach (IGrouping<string, DocumentCollection> group in collections.GroupBy(c => c.Id))
  69. {
  70. DocumentCollectionDict.TryAdd(group.Key, group.First());
  71. }
  72. //collections
  73. List<Type> types = ReflectorExtensions.GetAllTypeAsAttribute<CosmosDBAttribute>(options.ScanModel);
  74. foreach (Type type in types)
  75. {
  76. string PartitionKey = GetPartitionKey(type);
  77. string CollectionName = "";
  78. int RU = 0;
  79. IEnumerable<CosmosDBAttribute> attributes = type.GetCustomAttributes<CosmosDBAttribute>(true);
  80. if (!string.IsNullOrEmpty(attributes.First<CosmosDBAttribute>().Name))
  81. {
  82. CollectionName = attributes.First<CosmosDBAttribute>().Name;
  83. }
  84. else
  85. {
  86. CollectionName = type.Name;
  87. }
  88. if (attributes.First<CosmosDBAttribute>().RU > 400)
  89. {
  90. RU = attributes.First<CosmosDBAttribute>().RU;
  91. }
  92. else
  93. {
  94. RU = options.CollectionThroughput;
  95. }
  96. //如果表存在于数据则检查RU是否变动,如果不存在则执行创建DocumentCollection
  97. if (DocumentCollectionDict.TryGetValue(CollectionName, out DocumentCollection collection))
  98. {
  99. Offer offer = CosmosClient.CreateOfferQuery().Where(o => o.ResourceLink == collection.SelfLink).AsEnumerable().Single();
  100. OfferV2 offerV2 = (OfferV2)offer;
  101. //更新RU
  102. if (offerV2.Content.OfferThroughput < RU)
  103. {
  104. CosmosClient.ReplaceOfferAsync(new OfferV2(offer, RU));
  105. }
  106. }
  107. else
  108. {
  109. DocumentCollection collectionDefinition = new DocumentCollection { Id = CollectionName };
  110. collectionDefinition.IndexingPolicy = new Microsoft.Azure.Documents.IndexingPolicy(new RangeIndex(DataType.String) { Precision = -1 });
  111. // collectionDefinition.PartitionKey = new PartitionKeyDefinition { Paths = new System.Collections.ObjectModel.Collection<string>() };
  112. if (!string.IsNullOrEmpty(PartitionKey))
  113. {
  114. collectionDefinition.PartitionKey.Paths.Add("/" + PartitionKey);
  115. }
  116. // CosmosCollection = await this.CosmosClient.CreateDocumentCollectionIfNotExistsAsync(UriFactory.CreateDatabaseUri(Database), collectionDefinition);
  117. if (RU > CollectionThroughput)
  118. {
  119. CollectionThroughput = RU;
  120. }
  121. DocumentCollection DocumentCollection = CosmosClient.CreateDocumentCollectionIfNotExistsAsync(
  122. UriFactory.CreateDatabaseUri(Database), collectionDefinition, new Microsoft.Azure.Documents.Client.RequestOptions { OfferThroughput = CollectionThroughput }).GetAwaiter().GetResult();
  123. DocumentCollectionDict.TryAdd(CollectionName, DocumentCollection);
  124. }
  125. }
  126. }
  127. catch (DocumentClientException de)
  128. {
  129. Exception baseException = de.GetBaseException();
  130. //Console.WriteLine("{0} error occurred: {1}, Message: {2}", de.StatusCode, de.Message, baseException.Message);
  131. }
  132. catch (Exception e)
  133. {
  134. Exception baseException = e.GetBaseException();
  135. //Console.WriteLine("Error: {0}, Message: {1}", e.Message, baseException.Message);
  136. }
  137. finally
  138. {
  139. // Console.WriteLine("End of demo, press any key to exit.");
  140. // Console.ReadKey();
  141. }
  142. }
  143. private async Task<DocumentCollection> InitializeCollection<T>()
  144. {
  145. Type type = typeof(T);
  146. string partitionKey = GetPartitionKey<T>();
  147. string CollectionName;
  148. IEnumerable<CosmosDBAttribute> attributes = type.GetCustomAttributes<CosmosDBAttribute>(true);
  149. if (!string.IsNullOrEmpty(attributes.First<CosmosDBAttribute>().Name))
  150. {
  151. CollectionName = attributes.First<CosmosDBAttribute>().Name;
  152. }
  153. else
  154. {
  155. CollectionName = type.Name;
  156. }
  157. return await InitializeCollection(CollectionName, partitionKey);
  158. }
  159. private async Task<DocumentCollection> InitializeCollection(string CollectionName, string PartitionKey)
  160. {
  161. /////内存中已经存在这个表则直接返回
  162. if (DocumentCollectionDict.TryGetValue(CollectionName, out DocumentCollection DocumentCollection))
  163. {
  164. return DocumentCollection;
  165. }///如果没有则尝试默认创建
  166. else
  167. {
  168. DocumentCollection documentCollection = new DocumentCollection { Id = CollectionName };
  169. documentCollection.IndexingPolicy = new Microsoft.Azure.Documents.IndexingPolicy(new RangeIndex(DataType.String) { Precision = -1 });
  170. // collectionDefinition.PartitionKey = new PartitionKeyDefinition { Paths = new System.Collections.ObjectModel.Collection<string>() };
  171. if (!string.IsNullOrEmpty(PartitionKey))
  172. {
  173. documentCollection.PartitionKey.Paths.Add("/" + PartitionKey);
  174. }
  175. // CosmosCollection = await this.CosmosClient.CreateDocumentCollectionIfNotExistsAsync(UriFactory.CreateDatabaseUri(Database), collectionDefinition);
  176. documentCollection = await this.CosmosClient.CreateDocumentCollectionIfNotExistsAsync(
  177. UriFactory.CreateDatabaseUri(Database), documentCollection, new RequestOptions { OfferThroughput = CollectionThroughput });
  178. DocumentCollectionDict.TryAdd(CollectionName, documentCollection);
  179. return documentCollection;
  180. }
  181. }
  182. private string GetPartitionKey<T>()
  183. {
  184. Type type = typeof(T);
  185. return GetPartitionKey(type);
  186. }
  187. private string GetPartitionKey(Type type)
  188. {
  189. PropertyInfo[] properties = type.GetProperties();
  190. List<PropertyInfo> attrProperties = new List<PropertyInfo>();
  191. foreach (PropertyInfo property in properties)
  192. {
  193. if (property.Name.Equals("PartitionKey"))
  194. {
  195. attrProperties.Add(property);
  196. break;
  197. }
  198. object[] attributes = property.GetCustomAttributes(true);
  199. foreach (object attribute in attributes) //2.通过映射,找到成员属性上关联的特性类实例,
  200. {
  201. if (attribute is PartitionKeyAttribute)
  202. {
  203. attrProperties.Add(property);
  204. }
  205. }
  206. }
  207. if (attrProperties.Count <= 0)
  208. {
  209. throw new BizException(type.Name +"has no PartitionKey !!!!!!");
  210. }
  211. else
  212. {
  213. if (attrProperties.Count == 1)
  214. {
  215. return attrProperties[0].Name;
  216. }
  217. else { throw new BizException("PartitionKey can only be single!"); }
  218. }
  219. }
  220. public async Task<T> Save<T>(T entity) //where T : object, new()
  221. {
  222. try
  223. {
  224. Type t = typeof(T);
  225. DocumentCollection collection= await InitializeCollection<T>();
  226. ResourceResponse<Document> doc =
  227. await CosmosClient.CreateDocumentAsync(UriFactory.CreateDocumentCollectionUri(Database, collection.Id), entity);
  228. //Console.WriteLine(doc.ActivityId);
  229. return entity;
  230. }
  231. catch (Exception e)
  232. {
  233. throw new BizException(e.Message);
  234. }
  235. }
  236. public async Task<T> Update<T>(T entity)
  237. {
  238. Type t = typeof(T);
  239. DocumentCollection collection = await InitializeCollection<T>();
  240. ResourceResponse<Document> doc =
  241. await CosmosClient.UpsertDocumentAsync(UriFactory.CreateDocumentCollectionUri(Database, collection.Id), entity);
  242. return entity;
  243. }
  244. public async Task<string> ReplaceObject<T>(T entity, string key)
  245. {
  246. Type t = typeof(T);
  247. DocumentCollection collection = await InitializeCollection<T>();
  248. try
  249. {
  250. ResourceResponse<Document> doc =
  251. await CosmosClient.ReplaceDocumentAsync(UriFactory.CreateDocumentUri(Database,collection.Id, key), entity);
  252. return key;
  253. }
  254. catch (Exception e)
  255. {
  256. Console.WriteLine("{0} Exception caught.", e);
  257. //return false;
  258. }
  259. return null;
  260. }
  261. public async Task<string> ReplaceObject<T>(T entity, string key, string partitionKey)
  262. {
  263. Type t = typeof(T);
  264. DocumentCollection collection = await InitializeCollection<T>();
  265. try
  266. {
  267. ResourceResponse<Document> doc =
  268. await CosmosClient.ReplaceDocumentAsync(UriFactory.CreateDocumentUri(Database,collection.Id, key),
  269. entity,
  270. new RequestOptions { PartitionKey = new PartitionKey(partitionKey) });
  271. return key;
  272. }
  273. catch (Exception e)
  274. {
  275. throw new BizException(e.Message);
  276. //Console.WriteLine("{0} Exception caught.", e);
  277. //return false;
  278. }
  279. }
  280. public async Task<List<T>> FindAll<T>()
  281. {
  282. Type t = typeof(T);
  283. Boolean open = true;
  284. List<T> objs = new List<T>();
  285. DocumentCollection collection = await InitializeCollection<T>();
  286. //查询条数 -1是全部
  287. FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = open };
  288. var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database,collection.Id), queryOptions).AsDocumentQuery();
  289. while (query.HasMoreResults)
  290. {
  291. foreach (T obj in await query.ExecuteNextAsync())
  292. {
  293. objs.Add(obj);
  294. }
  295. }
  296. return objs;
  297. //return CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name),sql);
  298. }
  299. public async Task<List<T>> FindLinq<T>(Func<IQueryable<object>, object> singleOrDefault)
  300. {
  301. Type t = typeof(T);
  302. List<T> objs = new List<T>();
  303. DocumentCollection collection = await InitializeCollection<T>();
  304. //查询条数 -1是全部
  305. FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1 };
  306. var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, collection.Id), queryOptions);
  307. // query.Where();
  308. return objs;
  309. //return CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name),sql);
  310. }
  311. public async Task<List<T>> FindSQL<T>(string sql)
  312. {
  313. Type t = typeof(T);
  314. //List<T> objs = new List<T>();
  315. DocumentCollection collection = await InitializeCollection<T>();
  316. var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database,collection.Id), sql);
  317. //foreach (var item in query)
  318. //{
  319. // objs.Add(item);
  320. //}
  321. return query.ToList<T>();
  322. }
  323. public async Task<List<T>> FindSQL<T>(string sql, bool IsPk)
  324. {
  325. Type t = typeof(T);
  326. //List<T> objs = new List<T>();
  327. // Boolean open = IsPk;
  328. DocumentCollection collection = await InitializeCollection<T>();
  329. //查询条数 -1是全部
  330. FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = IsPk };
  331. var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, collection.Id), sql, queryOptions);
  332. //foreach (var item in query)
  333. //{
  334. // objs.Add(item);
  335. //}
  336. return query.ToList<T>();
  337. }
  338. public async Task<List<T>> FindByParams<T>(Dictionary<string, object> dict)
  339. {
  340. DocumentCollection collection = await InitializeCollection<T>();
  341. Type t = typeof(T);
  342. Boolean open = true;
  343. List<Filter> filters = new List<Filter>();
  344. string PKname = "";
  345. PropertyInfo[] properties = t.GetProperties();
  346. foreach (PropertyInfo property in properties)
  347. {
  348. object[] attributes = property.GetCustomAttributes(true);
  349. foreach (object attribute in attributes) //2.通过映射,找到成员属性上关联的特性类实例,
  350. {
  351. if (attribute is PartitionKeyAttribute)
  352. {
  353. PKname = property.Name;
  354. break;
  355. }
  356. }
  357. }
  358. foreach (string key in dict.Keys)
  359. {
  360. //if (t.Name.Equals(key)) {
  361. // open = false;
  362. //}
  363. if (PKname.Equals(key))
  364. {
  365. open = false;
  366. }
  367. filters.Add(new Filter { Contrast = "and", Key = key, Value = dict[key] != null ? dict[key].ToString() : throw new Exception("参数值不能为null") });
  368. }
  369. //List<T> objs = new List<T>();
  370. await InitializeCollection<T>();
  371. //查询条数 -1是全部
  372. FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = open };
  373. var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, collection.Id), queryOptions);
  374. List<T> list = DynamicLinq.GenerateFilter<T>(query, filters).ToList();
  375. return list;
  376. //return CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name),sql);
  377. }
  378. public async Task<string> DeleteAsync<T>(string id)
  379. {
  380. Type t = typeof(T);
  381. DocumentCollection collection = await InitializeCollection<T>();
  382. ResourceResponse<Document> doc =
  383. await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database,collection.Id, id));
  384. //Console.WriteLine(doc.ActivityId);
  385. return id;
  386. }
  387. public async Task<T> DeleteAsync<T>(T entity)
  388. {
  389. DocumentCollection collection = await InitializeCollection<T>();
  390. Type t = typeof(T);
  391. string PartitionKey = GetPartitionKey<T>();
  392. if (!string.IsNullOrEmpty(PartitionKey))
  393. {
  394. string pkValue = entity.GetType().GetProperty(PartitionKey).GetValue(entity).ToString();
  395. string idValue = entity.GetType().GetProperty("id").GetValue(entity).ToString();
  396. ResourceResponse<Document> doc =
  397. await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database, collection.Id, idValue), new RequestOptions { PartitionKey = new PartitionKey(pkValue) });
  398. }
  399. else
  400. {
  401. string idValue = entity.GetType().GetProperty("id").GetValue(entity).ToString();
  402. ResourceResponse<Document> doc =
  403. await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database, collection.Id, idValue));
  404. }
  405. //Console.WriteLine(doc.ActivityId);
  406. return entity;
  407. }
  408. public async Task<string> DeleteAsync<T>(string id, string partitionKey)
  409. {
  410. Type t = typeof(T);
  411. DocumentCollection collection = await InitializeCollection<T>();
  412. ResourceResponse<Document> doc =
  413. await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database, collection.Id, id), new RequestOptions { PartitionKey = new PartitionKey(partitionKey) });
  414. //Console.WriteLine(doc.ActivityId);
  415. return id;
  416. }
  417. public async Task<List<T>> SaveAll<T>(List<T> enyites)
  418. {
  419. DocumentCollection dataCollection = await InitializeCollection<T>();
  420. // Set retry options high for initialization (default values).
  421. CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
  422. CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;
  423. IBulkExecutor bulkExecutor = new BulkExecutor(CosmosClient, dataCollection);
  424. await bulkExecutor.InitializeAsync();
  425. // Set retries to 0 to pass control to bulk executor.
  426. CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
  427. CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;
  428. BulkImportResponse bulkImportResponse = null;
  429. long totalNumberOfDocumentsInserted = 0;
  430. double totalRequestUnitsConsumed = 0;
  431. double totalTimeTakenSec = 0;
  432. var tokenSource = new CancellationTokenSource();
  433. var token = tokenSource.Token;
  434. int pageSize = 100;
  435. int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
  436. for (int i = 0; i < pages; i++)
  437. {
  438. List<string> documentsToImportInBatch = new List<string>();
  439. List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
  440. for (int j = 0; j < lists.Count; j++)
  441. {
  442. documentsToImportInBatch.Add(lists[j].ToJson());
  443. }
  444. var tasks = new List<Task>
  445. { Task.Run(async () =>
  446. {
  447. do
  448. {
  449. //try
  450. //{
  451. bulkImportResponse = await bulkExecutor.BulkImportAsync(
  452. documents: documentsToImportInBatch,
  453. enableUpsert: true,
  454. disableAutomaticIdGeneration: true,
  455. maxConcurrencyPerPartitionKeyRange: null,
  456. maxInMemorySortingBatchSize: null,
  457. cancellationToken: token);
  458. //}
  459. //catch (DocumentClientException de)
  460. //{
  461. // break;
  462. //}
  463. //catch (Exception e)
  464. //{
  465. // break;
  466. //}
  467. } while (bulkImportResponse.NumberOfDocumentsImported < documentsToImportInBatch.Count);
  468. totalNumberOfDocumentsInserted += bulkImportResponse.NumberOfDocumentsImported;
  469. totalRequestUnitsConsumed += bulkImportResponse.TotalRequestUnitsConsumed;
  470. totalTimeTakenSec += bulkImportResponse.TotalTimeTaken.TotalSeconds;
  471. },
  472. token)
  473. };
  474. await Task.WhenAll(tasks);
  475. }
  476. return enyites;
  477. }
  478. public async Task<List<T>> UpdateAll<T>(Dictionary<string, object> dict, Dictionary<string, object> updateFilters, List<string> deleteKeys = null)
  479. {
  480. DocumentCollection dataCollection = await InitializeCollection<T>();
  481. IBulkExecutor bulkExecutor = new BulkExecutor(CosmosClient, dataCollection);
  482. await bulkExecutor.InitializeAsync();
  483. BulkUpdateResponse bulkUpdateResponse = null;
  484. long totalNumberOfDocumentsUpdated = 0;
  485. double totalRequestUnitsConsumed = 0;
  486. double totalTimeTakenSec = 0;
  487. var tokenSource = new CancellationTokenSource();
  488. var token = tokenSource.Token;
  489. // Generate update operations.
  490. List<UpdateOperation> updateOperations = new List<UpdateOperation>();
  491. // Unset the description field.
  492. if (null != updateFilters && updateFilters.Count > 0)
  493. {
  494. var keys = updateFilters.Keys;
  495. foreach (string key in keys)
  496. {
  497. // updateOperations.Add(new SetUpdateOperation<string>())
  498. if (updateFilters[key] != null && !string.IsNullOrEmpty(updateFilters[key].ToString()))
  499. {
  500. updateOperations.Add(SwitchType(key, updateFilters[key]));
  501. }
  502. }
  503. }
  504. if (deleteKeys.IsNotEmpty())
  505. {
  506. foreach (string key in deleteKeys)
  507. {
  508. updateOperations.Add(new UnsetUpdateOperation(key));
  509. }
  510. }
  511. List<T> list = await FindByParams<T>(dict);
  512. int pageSize = 100;
  513. int pages = (int)Math.Ceiling((double)list.Count / pageSize);
  514. string partitionKey = "/" + GetPartitionKey<T>();
  515. Type type = typeof(T);
  516. for (int i = 0; i < pages; i++)
  517. {
  518. List<UpdateItem> updateItemsInBatch = new List<UpdateItem>();
  519. List<T> lists = list.Skip((i) * pageSize).Take(pageSize).ToList();
  520. for (int j = 0; j < lists.Count; j++)
  521. {
  522. string partitionKeyValue = lists[j].GetType().GetProperty(partitionKey).GetValue(lists[j]) + "";
  523. string id = lists[j].GetType().GetProperty("id").GetValue(lists[j]) + "";
  524. updateItemsInBatch.Add(new UpdateItem(id, partitionKeyValue, updateOperations));
  525. }
  526. var tasks = new List<Task>
  527. { Task.Run(async () =>
  528. {
  529. do
  530. {
  531. //try
  532. //{
  533. bulkUpdateResponse = await bulkExecutor.BulkUpdateAsync(
  534. updateItems: updateItemsInBatch,
  535. maxConcurrencyPerPartitionKeyRange: null,
  536. cancellationToken: token);
  537. //}
  538. //catch (DocumentClientException de)
  539. //{
  540. // break;
  541. //}
  542. //catch (Exception e)
  543. //{
  544. // break;
  545. //}
  546. } while (bulkUpdateResponse.NumberOfDocumentsUpdated < updateItemsInBatch.Count);
  547. totalNumberOfDocumentsUpdated += bulkUpdateResponse.NumberOfDocumentsUpdated;
  548. totalRequestUnitsConsumed += bulkUpdateResponse.TotalRequestUnitsConsumed;
  549. totalTimeTakenSec += bulkUpdateResponse.TotalTimeTaken.TotalSeconds;
  550. },
  551. token)
  552. };
  553. await Task.WhenAll(tasks);
  554. }
  555. return list;
  556. }
  557. public async Task<List<T>> DeleteAll<T>(Dictionary<string, object> dict)
  558. {
  559. DocumentCollection dataCollection = await InitializeCollection<T>();
  560. List<T> list = await FindByParams<T>(dict);
  561. List<Tuple<string, string>> pkIdTuplesToDelete = new List<Tuple<string, string>>();
  562. if (list.IsNotEmpty())
  563. {
  564. foreach (T t in list)
  565. {
  566. string id = t.GetType().GetProperty("id").GetValue(t) + "";
  567. pkIdTuplesToDelete.Add(new Tuple<string, string>(id, id));
  568. }
  569. }
  570. else
  571. {
  572. return null;
  573. }
  574. long totalNumberOfDocumentsDeleted = 0;
  575. double totalRequestUnitsConsumed = 0;
  576. double totalTimeTakenSec = 0;
  577. BulkDeleteResponse bulkDeleteResponse = null;
  578. BulkExecutor bulkExecutor = new BulkExecutor(CosmosClient, dataCollection);
  579. await bulkExecutor.InitializeAsync();
  580. bulkDeleteResponse = await bulkExecutor.BulkDeleteAsync(pkIdTuplesToDelete);
  581. totalNumberOfDocumentsDeleted = bulkDeleteResponse.NumberOfDocumentsDeleted;
  582. totalRequestUnitsConsumed = bulkDeleteResponse.TotalRequestUnitsConsumed;
  583. totalTimeTakenSec = bulkDeleteResponse.TotalTimeTaken.TotalSeconds;
  584. return list;
  585. }
  586. private static UpdateOperation SwitchType(string key, object obj)
  587. {
  588. Type s = obj.GetType();
  589. TypeCode typeCode = Type.GetTypeCode(s);
  590. return typeCode switch
  591. {
  592. TypeCode.String => new SetUpdateOperation<string>(key, obj.ToString()),
  593. TypeCode.Int32 => new SetUpdateOperation<Int32>(key, (Int32)obj),
  594. TypeCode.Double => new SetUpdateOperation<Double>(key, (Double)obj),
  595. TypeCode.Byte => new SetUpdateOperation<Byte>(key, (Byte)obj),
  596. TypeCode.Boolean => new SetUpdateOperation<Boolean>(key, (Boolean)obj),
  597. TypeCode.DateTime => new SetUpdateOperation<DateTime>(key, (DateTime)obj),
  598. TypeCode.Int64 => new SetUpdateOperation<Int64>(key, (Int64)obj),
  599. _ => null,
  600. };
  601. }
  602. public async Task<List<T>> FindByDict<T>(Dictionary<string, object> dict, bool IsPk=true)
  603. {
  604. Type t = typeof(T);
  605. // List<T> objs = new List<T>();
  606. DocumentCollection collection= await InitializeCollection<T>();
  607. StringBuilder sql = new StringBuilder("select * from c where 1=1 ");
  608. if (dict != null)
  609. {
  610. foreach (string key in dict.Keys)
  611. {
  612. sql.Append(GenSql(dict[key], key));
  613. }
  614. }
  615. //查询条数 -1是全部
  616. FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = IsPk };
  617. var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, collection.Id), sql.ToString(), queryOptions);
  618. //foreach (var item in query)
  619. //{
  620. // objs.Add(item);
  621. //}
  622. return query.ToList<T>();
  623. }
  624. private static string GenSql(object obj, string key)
  625. {
  626. Type s = obj.GetType();
  627. TypeCode typeCode = Type.GetTypeCode(s);
  628. return typeCode switch
  629. {
  630. TypeCode.String => " and c." + key + "=" + "'" + obj.ToString() + "'",
  631. TypeCode.Int32 => " and c." + key + "=" + int.Parse(obj.ToString()),
  632. TypeCode.Double => " and c." + key + "=" + double.Parse(obj.ToString()),
  633. //case TypeCode.Byte: return "and c." + key + "=" + (Byte)obj ;
  634. TypeCode.Boolean => " and c." + key + "=" + bool.Parse(obj.ToString()),
  635. TypeCode.DateTime => " and c." + key + "=" + (DateTime)obj,
  636. TypeCode.Int64 => " and c." + key + "=" + long.Parse(obj.ToString()),
  637. _ => null,
  638. };
  639. }
  640. public IQueryable<dynamic> FindByDict(string CollectionName, Dictionary<string, object> dict)
  641. {
  642. if (DocumentCollectionDict.TryGetValue(CollectionName, out DocumentCollection collection))
  643. {
  644. // collection = await InitializeCollection(CollectionName, "");
  645. StringBuilder sql = new StringBuilder("select * from " + CollectionName + " c where 1=1 ");
  646. if (dict != null)
  647. {
  648. foreach (string key in dict.Keys)
  649. {
  650. sql.Append(GenSql(dict[key], key));
  651. }
  652. }
  653. FeedOptions queryOptions;
  654. if (collection.PartitionKey.Paths.Count > 0)
  655. {
  656. queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = true };
  657. }
  658. else
  659. {
  660. queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = false };
  661. }
  662. //查询条数 -1是全部
  663. var query = CosmosClient.CreateDocumentQuery(UriFactory.CreateDocumentCollectionUri(Database, CollectionName), sql.ToString(), queryOptions);
  664. return query;
  665. }
  666. else {
  667. throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
  668. }
  669. }
  670. }
  671. }