AzureCosmosDBRepository.cs 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Threading.Tasks;
  4. using System.Linq;
  5. using TEAMModelOS.SDK.Module.AzureCosmosDB.Configuration;
  6. using TEAMModelOS.SDK.Module.AzureCosmosDB.Interfaces;
  7. using Microsoft.Azure.Documents.Client;
  8. using Microsoft.Azure.Documents;
  9. using TEAMModelOS.SDK.Helper.Security.AESCrypt;
  10. using TEAMModelOS.SDK.Context.Exception;
  11. using Microsoft.Azure.Documents.Linq;
  12. using TEAMModelOS.SDK.Helper.Query.LinqHelper;
  13. using System.Reflection;
  14. using Microsoft.Azure.CosmosDB.BulkExecutor;
  15. using Microsoft.Azure.CosmosDB.BulkExecutor.BulkImport;
  16. using System.Threading;
  17. using TEAMModelOS.SDK.Helper.Common.JsonHelper;
  18. using Microsoft.Azure.CosmosDB.BulkExecutor.BulkUpdate;
  19. using TEAMModelOS.SDK.Helper.Common.CollectionHelper;
  20. using Microsoft.Azure.CosmosDB.BulkExecutor.BulkDelete;
  21. using TEAMModelOS.SDK.Context.Attributes.Azure;
  22. using System.Text;
  23. using TEAMModelOS.SDK.Helper.Common.ReflectorExtensions;
  24. namespace TEAMModelOS.SDK.Module.AzureCosmosDB.Implements
  25. { /// <summary>
  26. /// sdk 文档https://github.com/Azure/azure-cosmos-dotnet-v2/tree/master/samples
  27. /// https://github.com/Azure/azure-cosmos-dotnet-v2/blob/530c8d9cf7c99df7300246da05206c57ce654233/samples/code-samples/DatabaseManagement/Program.cs#L72-L121
  28. /// </summary>
  29. public class AzureCosmosDBRepository : IAzureCosmosDBRepository
  30. {
  31. /// <summary>
  32. /// sdk 文档https://github.com/Azure/azure-cosmos-dotnet-v2/tree/master/samples
  33. /// https://github.com/Azure/azure-cosmos-dotnet-v2/blob/530c8d9cf7c99df7300246da05206c57ce654233/samples/code-samples/DatabaseManagement/Program.cs#L72-L121
  34. /// </summary>
  35. private readonly string china_con = "417A7572654368696E6120202020202020202020202020202020202020202020D63873D37F845F9DC7607B4DF4787EE26598454CE32FB5F2EE778A34A5015736196DF7940C67A034CDD4C4B44CD37C20";
  36. private readonly string china_key = "417A7572654368696E61202020202020202020202020202020202020202020203CAA1DF7E3203F0ABCB2D60C1F3DCB6D90676C4D5467167F6E6A2CB3DDE975EA37B06BBAE6E012936BEDB6D5D60B28B13642F755CB25D1958BE5366EA20FA7C47E04A67B6A96111C61C3270CD0E5539CA45E3A77A6B483F47419BBAEDE75C0F6";
  37. private readonly string global_con = "417A757265476C6F62616C2020202020202020202020202020202020202020200E357979CB69243DBF4E41BF5526830F89AB746007AC68A3DD3F9CFDA781509F1C48B2359120A5E58B8C7B1EDAA99DEA";
  38. private readonly string global_key = "417A757265476C6F62616C2020202020202020202020202020202020202020209FF199D61813D1F4857D55CFB0A7D6A797FECF39A7F47553E9C1AF23674CB04BA95748A4A3C07B90F32E5EF26E0982DBF90001E066432075C434351D73FB387D27A50716D90F414F34A4579D846C27804F658705C05A7224EC4D695FD7A5EE23";
  39. private DocumentClient CosmosClient { get; set; }
  40. private DocumentCollection CosmosCollection { get; set; }
  41. private string Database { get; set; }
  42. private int CollectionThroughput { get; set; }
  43. public AzureCosmosDBRepository(AzureCosmosDBOptions options)
  44. {
  45. try
  46. {
  47. if (!string.IsNullOrEmpty(options.ConnectionString))
  48. {
  49. CosmosClient = CosmosDBClientSingleton.getInstance(options.ConnectionString, options.ConnectionKey).GetCosmosDBClient();
  50. }
  51. else if (AzureCosmosDBConfig.AZURE_CHINA.Equals(options.AzureTableDialect))
  52. {
  53. AESCrypt crypt = new AESCrypt();
  54. CosmosClient = CosmosDBClientSingleton.getInstance(crypt.Decrypt(china_con, options.AzureTableDialect), crypt.Decrypt(china_key, options.AzureTableDialect)).GetCosmosDBClient();
  55. }
  56. else if (AzureCosmosDBConfig.AZURE_GLOBAL.Equals(options.AzureTableDialect))
  57. {
  58. AESCrypt crypt = new AESCrypt();
  59. CosmosClient = CosmosDBClientSingleton.getInstance(crypt.Decrypt(global_con, options.AzureTableDialect), crypt.Decrypt(global_key, options.AzureTableDialect)).GetCosmosDBClient();
  60. }
  61. else
  62. {
  63. throw new BizException("请设置正确的AzureCosmosDB数据库配置信息!");
  64. }
  65. Database = options.Database;
  66. CollectionThroughput = options.CollectionThroughput;
  67. CosmosClient.CreateDatabaseIfNotExistsAsync(new Database { Id = Database });
  68. // _connectionString = options.ConnectionString;
  69. //获取数据库所有的表
  70. FeedResponse<DocumentCollection> collections = CosmosClient.ReadDocumentCollectionFeedAsync(UriFactory.CreateDatabaseUri(Database)).GetAwaiter().GetResult();
  71. //collections
  72. // IEnumerable<Type> types= ReflectorExtensions.GetAllTypeAsAttribute<CosmosDBAttribute>();
  73. }
  74. catch (DocumentClientException de)
  75. {
  76. Exception baseException = de.GetBaseException();
  77. //Console.WriteLine("{0} error occurred: {1}, Message: {2}", de.StatusCode, de.Message, baseException.Message);
  78. }
  79. catch (Exception e)
  80. {
  81. Exception baseException = e.GetBaseException();
  82. //Console.WriteLine("Error: {0}, Message: {1}", e.Message, baseException.Message);
  83. }
  84. finally
  85. {
  86. // Console.WriteLine("End of demo, press any key to exit.");
  87. // Console.ReadKey();
  88. }
  89. }
  90. private async Task<DocumentCollection> InitializeCollection<T>()
  91. {
  92. Type t = typeof(T);
  93. if (CosmosCollection == null || !CosmosCollection.Id.Equals(t.Name))
  94. {
  95. DocumentCollection collectionDefinition = new DocumentCollection { Id = t.Name };
  96. collectionDefinition.IndexingPolicy = new IndexingPolicy(new RangeIndex(DataType.String) { Precision = -1 });
  97. string partitionKey = GetPartitionKey<T>();
  98. // collectionDefinition.PartitionKey = new PartitionKeyDefinition { Paths = new System.Collections.ObjectModel.Collection<string>() };
  99. if (!string.IsNullOrEmpty(partitionKey))
  100. {
  101. collectionDefinition.PartitionKey.Paths.Add("/" + partitionKey);
  102. }
  103. // CosmosCollection = await this.CosmosClient.CreateDocumentCollectionIfNotExistsAsync(UriFactory.CreateDatabaseUri(Database), collectionDefinition);
  104. CosmosCollection = await this.CosmosClient.CreateDocumentCollectionIfNotExistsAsync(
  105. UriFactory.CreateDatabaseUri(Database), collectionDefinition, new RequestOptions { OfferThroughput = CollectionThroughput }
  106. );
  107. }
  108. return CosmosCollection;
  109. }
  110. private async Task<DocumentCollection> InitializeCollection(string CollectionName, string PartitionKey)
  111. {
  112. if (CosmosCollection == null || !CosmosCollection.Id.Equals(CollectionName))
  113. {
  114. DocumentCollection collectionDefinition = new DocumentCollection { Id = CollectionName };
  115. collectionDefinition.IndexingPolicy = new IndexingPolicy(new RangeIndex(DataType.String) { Precision = -1 });
  116. // collectionDefinition.PartitionKey = new PartitionKeyDefinition { Paths = new System.Collections.ObjectModel.Collection<string>() };
  117. if (!string.IsNullOrEmpty(PartitionKey))
  118. {
  119. collectionDefinition.PartitionKey.Paths.Add("/" + PartitionKey);
  120. }
  121. // CosmosCollection = await this.CosmosClient.CreateDocumentCollectionIfNotExistsAsync(UriFactory.CreateDatabaseUri(Database), collectionDefinition);
  122. CosmosCollection = await this.CosmosClient.CreateDocumentCollectionIfNotExistsAsync(
  123. UriFactory.CreateDatabaseUri(Database), collectionDefinition, new RequestOptions { OfferThroughput = CollectionThroughput }
  124. );
  125. }
  126. return CosmosCollection;
  127. }
  128. private string GetPartitionKey<T>()
  129. {
  130. Type type = typeof(T);
  131. PropertyInfo[] properties = type.GetProperties();
  132. List<PropertyInfo> attrProperties = new List<PropertyInfo>();
  133. foreach (PropertyInfo property in properties)
  134. {
  135. if (property.Name.Equals("PartitionKey"))
  136. {
  137. attrProperties.Add(property);
  138. break;
  139. }
  140. object[] attributes = property.GetCustomAttributes(true);
  141. foreach (object attribute in attributes) //2.通过映射,找到成员属性上关联的特性类实例,
  142. {
  143. if (attribute is PartitionKeyAttribute)
  144. {
  145. attrProperties.Add(property);
  146. }
  147. }
  148. }
  149. if (attrProperties.Count <= 0)
  150. {
  151. return null;
  152. }
  153. else
  154. {
  155. if (attrProperties.Count == 1)
  156. {
  157. return attrProperties[0].Name;
  158. }
  159. else { throw new BizException("PartitionKey can only be single!"); }
  160. }
  161. }
  162. public async Task<T> Save<T>(T entity) //where T : object, new()
  163. {
  164. try
  165. {
  166. Type t = typeof(T);
  167. DocumentCollection documentCollection = await InitializeCollection<T>();
  168. ResourceResponse<Document> doc =
  169. await CosmosClient.CreateDocumentAsync(UriFactory.CreateDocumentCollectionUri(Database, t.Name), entity);
  170. //Console.WriteLine(doc.ActivityId);
  171. return entity;
  172. }
  173. catch (Exception e)
  174. {
  175. throw new BizException(e.Message);
  176. }
  177. }
  178. public async Task<T> Update<T>(T entity)
  179. {
  180. Type t = typeof(T);
  181. await InitializeCollection<T>();
  182. ResourceResponse<Document> doc =
  183. await CosmosClient.UpsertDocumentAsync(UriFactory.CreateDocumentCollectionUri(Database, t.Name), entity);
  184. return entity;
  185. }
  186. public async Task<string> ReplaceObject<T>(T entity, string key)
  187. {
  188. Type t = typeof(T);
  189. await InitializeCollection<T>();
  190. try
  191. {
  192. ResourceResponse<Document> doc =
  193. await CosmosClient.ReplaceDocumentAsync(UriFactory.CreateDocumentUri(Database, t.Name, key), entity);
  194. return key;
  195. }
  196. catch (Exception e)
  197. {
  198. Console.WriteLine("{0} Exception caught.", e);
  199. //return false;
  200. }
  201. return null;
  202. }
  203. public async Task<string> ReplaceObject<T>(T entity, string key, string partitionKey)
  204. {
  205. Type t = typeof(T);
  206. await InitializeCollection<T>();
  207. try
  208. {
  209. ResourceResponse<Document> doc =
  210. await CosmosClient.ReplaceDocumentAsync(UriFactory.CreateDocumentUri(Database, t.Name, key),
  211. entity,
  212. new RequestOptions { PartitionKey = new PartitionKey(partitionKey) });
  213. return key;
  214. }
  215. catch (Exception e)
  216. {
  217. throw new BizException(e.Message);
  218. //Console.WriteLine("{0} Exception caught.", e);
  219. //return false;
  220. }
  221. }
  222. public async Task<List<T>> FindAll<T>()
  223. {
  224. Type t = typeof(T);
  225. Boolean open = true;
  226. List<T> objs = new List<T>();
  227. //await InitializeCollection<T>();
  228. //查询条数 -1是全部
  229. FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = open };
  230. var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name), queryOptions).AsDocumentQuery();
  231. while (query.HasMoreResults)
  232. {
  233. foreach (T obj in await query.ExecuteNextAsync())
  234. {
  235. objs.Add(obj);
  236. }
  237. }
  238. return objs;
  239. //return CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name),sql);
  240. }
  241. public async Task<List<T>> FindLinq<T>(Func<IQueryable<object>, object> singleOrDefault)
  242. {
  243. Type t = typeof(T);
  244. List<T> objs = new List<T>();
  245. await InitializeCollection<T>();
  246. //查询条数 -1是全部
  247. FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1 };
  248. var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name), queryOptions);
  249. // query.Where();
  250. return objs;
  251. //return CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name),sql);
  252. }
  253. public async Task<List<T>> FindSQL<T>(string sql)
  254. {
  255. Type t = typeof(T);
  256. //List<T> objs = new List<T>();
  257. await InitializeCollection<T>();
  258. var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name), sql);
  259. //foreach (var item in query)
  260. //{
  261. // objs.Add(item);
  262. //}
  263. return query.ToList<T>();
  264. }
  265. public async Task<List<T>> FindSQL<T>(string sql, bool IsPk)
  266. {
  267. Type t = typeof(T);
  268. //List<T> objs = new List<T>();
  269. // Boolean open = IsPk;
  270. await InitializeCollection<T>();
  271. //查询条数 -1是全部
  272. FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = IsPk };
  273. var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name), sql, queryOptions);
  274. //foreach (var item in query)
  275. //{
  276. // objs.Add(item);
  277. //}
  278. return query.ToList<T>();
  279. }
  280. public async Task<List<T>> FindByParams<T>(Dictionary<string, object> dict)
  281. {
  282. //await InitializeCollection<T>();
  283. Type t = typeof(T);
  284. Boolean open = true;
  285. List<Filter> filters = new List<Filter>();
  286. string PKname = "";
  287. PropertyInfo[] properties = t.GetProperties();
  288. foreach (PropertyInfo property in properties)
  289. {
  290. object[] attributes = property.GetCustomAttributes(true);
  291. foreach (object attribute in attributes) //2.通过映射,找到成员属性上关联的特性类实例,
  292. {
  293. if (attribute is PartitionKeyAttribute)
  294. {
  295. PKname = property.Name;
  296. break;
  297. }
  298. }
  299. }
  300. foreach (string key in dict.Keys)
  301. {
  302. //if (t.Name.Equals(key)) {
  303. // open = false;
  304. //}
  305. if (PKname.Equals(key))
  306. {
  307. open = false;
  308. }
  309. filters.Add(new Filter {Contrast="and", Key = key, Value = dict[key] != null ? dict[key].ToString() : throw new Exception("参数值不能为null") });
  310. }
  311. //List<T> objs = new List<T>();
  312. await InitializeCollection<T>();
  313. //查询条数 -1是全部
  314. FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = open };
  315. var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name), queryOptions);
  316. List<T> list = DynamicLinq.GenerateFilter<T>(query, filters).ToList();
  317. return list;
  318. //return CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name),sql);
  319. }
  320. public async Task<string> DeleteAsync<T>(string id)
  321. {
  322. Type t = typeof(T);
  323. await InitializeCollection<T>();
  324. ResourceResponse<Document> doc =
  325. await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database, t.Name, id));
  326. //Console.WriteLine(doc.ActivityId);
  327. return id;
  328. }
  329. public async Task<T> DeleteAsync<T>(T entity)
  330. {
  331. await InitializeCollection<T>();
  332. Type t = typeof(T);
  333. string PartitionKey = GetPartitionKey<T>();
  334. if (!string.IsNullOrEmpty(PartitionKey))
  335. {
  336. string pkValue = entity.GetType().GetProperty(PartitionKey).GetValue(entity).ToString();
  337. string idValue = entity.GetType().GetProperty("id").GetValue(entity).ToString();
  338. ResourceResponse<Document> doc =
  339. await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database, t.Name, idValue), new RequestOptions { PartitionKey = new PartitionKey(pkValue) });
  340. }
  341. else
  342. {
  343. string idValue = entity.GetType().GetProperty("id").GetValue(entity).ToString();
  344. ResourceResponse<Document> doc =
  345. await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database, t.Name, idValue));
  346. }
  347. //Console.WriteLine(doc.ActivityId);
  348. return entity;
  349. }
  350. public async Task<string> DeleteAsync<T>(string id, string partitionKey)
  351. {
  352. Type t = typeof(T);
  353. await InitializeCollection<T>();
  354. ResourceResponse<Document> doc =
  355. await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database, t.Name, id), new RequestOptions { PartitionKey = new PartitionKey(partitionKey) });
  356. //Console.WriteLine(doc.ActivityId);
  357. return id;
  358. }
  359. public async Task<List<T>> SaveAll<T>(List<T> enyites)
  360. {
  361. DocumentCollection dataCollection = await InitializeCollection<T>();
  362. // Set retry options high for initialization (default values).
  363. CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
  364. CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;
  365. IBulkExecutor bulkExecutor = new BulkExecutor(CosmosClient, dataCollection);
  366. await bulkExecutor.InitializeAsync();
  367. // Set retries to 0 to pass control to bulk executor.
  368. CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
  369. CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;
  370. BulkImportResponse bulkImportResponse = null;
  371. long totalNumberOfDocumentsInserted = 0;
  372. double totalRequestUnitsConsumed = 0;
  373. double totalTimeTakenSec = 0;
  374. var tokenSource = new CancellationTokenSource();
  375. var token = tokenSource.Token;
  376. int pageSize = 100;
  377. int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
  378. for (int i = 0; i < pages; i++)
  379. {
  380. List<string> documentsToImportInBatch = new List<string>();
  381. List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
  382. for (int j = 0; j < lists.Count; j++)
  383. {
  384. documentsToImportInBatch.Add(lists[j].ToJson());
  385. }
  386. var tasks = new List<Task>
  387. { Task.Run(async () =>
  388. {
  389. do
  390. {
  391. //try
  392. //{
  393. bulkImportResponse = await bulkExecutor.BulkImportAsync(
  394. documents: documentsToImportInBatch,
  395. enableUpsert: true,
  396. disableAutomaticIdGeneration: true,
  397. maxConcurrencyPerPartitionKeyRange: null,
  398. maxInMemorySortingBatchSize: null,
  399. cancellationToken: token);
  400. //}
  401. //catch (DocumentClientException de)
  402. //{
  403. // break;
  404. //}
  405. //catch (Exception e)
  406. //{
  407. // break;
  408. //}
  409. } while (bulkImportResponse.NumberOfDocumentsImported < documentsToImportInBatch.Count);
  410. totalNumberOfDocumentsInserted += bulkImportResponse.NumberOfDocumentsImported;
  411. totalRequestUnitsConsumed += bulkImportResponse.TotalRequestUnitsConsumed;
  412. totalTimeTakenSec += bulkImportResponse.TotalTimeTaken.TotalSeconds;
  413. },
  414. token)
  415. };
  416. await Task.WhenAll(tasks);
  417. }
  418. return enyites;
  419. }
  420. public async Task<List<T>> UpdateAll<T>(Dictionary<string, object> dict, Dictionary<string, object> updateFilters, List<string> deleteKeys = null)
  421. {
  422. DocumentCollection dataCollection = await InitializeCollection<T>();
  423. IBulkExecutor bulkExecutor = new BulkExecutor(CosmosClient, dataCollection);
  424. await bulkExecutor.InitializeAsync();
  425. BulkUpdateResponse bulkUpdateResponse = null;
  426. long totalNumberOfDocumentsUpdated = 0;
  427. double totalRequestUnitsConsumed = 0;
  428. double totalTimeTakenSec = 0;
  429. var tokenSource = new CancellationTokenSource();
  430. var token = tokenSource.Token;
  431. // Generate update operations.
  432. List<UpdateOperation> updateOperations = new List<UpdateOperation>();
  433. // Unset the description field.
  434. if (null != updateFilters && updateFilters.Count > 0)
  435. {
  436. var keys = updateFilters.Keys;
  437. foreach (string key in keys)
  438. {
  439. // updateOperations.Add(new SetUpdateOperation<string>())
  440. if (updateFilters[key] != null && !string.IsNullOrEmpty(updateFilters[key].ToString()))
  441. {
  442. updateOperations.Add(SwitchType(key, updateFilters[key]));
  443. }
  444. }
  445. }
  446. if (deleteKeys.IsNotEmpty())
  447. {
  448. foreach (string key in deleteKeys)
  449. {
  450. updateOperations.Add(new UnsetUpdateOperation(key));
  451. }
  452. }
  453. List<T> list = await FindByParams<T>(dict);
  454. int pageSize = 100;
  455. int pages = (int)Math.Ceiling((double)list.Count / pageSize);
  456. string partitionKey = "/" + GetPartitionKey<T>();
  457. Type type = typeof(T);
  458. for (int i = 0; i < pages; i++)
  459. {
  460. List<UpdateItem> updateItemsInBatch = new List<UpdateItem>();
  461. List<T> lists = list.Skip((i) * pageSize).Take(pageSize).ToList();
  462. for (int j = 0; j < lists.Count; j++)
  463. {
  464. string partitionKeyValue = lists[j].GetType().GetProperty(partitionKey).GetValue(lists[j]) + "";
  465. string id = lists[j].GetType().GetProperty("id").GetValue(lists[j]) + "";
  466. updateItemsInBatch.Add(new UpdateItem(id, partitionKeyValue, updateOperations));
  467. }
  468. var tasks = new List<Task>
  469. { Task.Run(async () =>
  470. {
  471. do
  472. {
  473. //try
  474. //{
  475. bulkUpdateResponse = await bulkExecutor.BulkUpdateAsync(
  476. updateItems: updateItemsInBatch,
  477. maxConcurrencyPerPartitionKeyRange: null,
  478. cancellationToken: token);
  479. //}
  480. //catch (DocumentClientException de)
  481. //{
  482. // break;
  483. //}
  484. //catch (Exception e)
  485. //{
  486. // break;
  487. //}
  488. } while (bulkUpdateResponse.NumberOfDocumentsUpdated < updateItemsInBatch.Count);
  489. totalNumberOfDocumentsUpdated += bulkUpdateResponse.NumberOfDocumentsUpdated;
  490. totalRequestUnitsConsumed += bulkUpdateResponse.TotalRequestUnitsConsumed;
  491. totalTimeTakenSec += bulkUpdateResponse.TotalTimeTaken.TotalSeconds;
  492. },
  493. token)
  494. };
  495. await Task.WhenAll(tasks);
  496. }
  497. return list;
  498. }
  499. public async Task<List<T>> DeleteAll<T>(Dictionary<string, object> dict)
  500. {
  501. DocumentCollection dataCollection = await InitializeCollection<T>();
  502. List<T> list = await FindByParams<T>(dict);
  503. List<Tuple<string, string>> pkIdTuplesToDelete = new List<Tuple<string, string>>();
  504. if (list.IsNotEmpty())
  505. {
  506. foreach (T t in list)
  507. {
  508. string id = t.GetType().GetProperty("id").GetValue(t) + "";
  509. pkIdTuplesToDelete.Add(new Tuple<string, string>(id, id));
  510. }
  511. }
  512. else
  513. {
  514. return null;
  515. }
  516. long totalNumberOfDocumentsDeleted = 0;
  517. double totalRequestUnitsConsumed = 0;
  518. double totalTimeTakenSec = 0;
  519. BulkDeleteResponse bulkDeleteResponse = null;
  520. BulkExecutor bulkExecutor = new BulkExecutor(CosmosClient, dataCollection);
  521. await bulkExecutor.InitializeAsync();
  522. bulkDeleteResponse = await bulkExecutor.BulkDeleteAsync(pkIdTuplesToDelete);
  523. totalNumberOfDocumentsDeleted = bulkDeleteResponse.NumberOfDocumentsDeleted;
  524. totalRequestUnitsConsumed = bulkDeleteResponse.TotalRequestUnitsConsumed;
  525. totalTimeTakenSec = bulkDeleteResponse.TotalTimeTaken.TotalSeconds;
  526. return list;
  527. }
  528. private static UpdateOperation SwitchType(string key, object obj)
  529. {
  530. Type s = obj.GetType();
  531. TypeCode typeCode = Type.GetTypeCode(s);
  532. switch (typeCode)
  533. {
  534. case TypeCode.String: return new SetUpdateOperation<string>(key, obj.ToString());
  535. case TypeCode.Int32: return new SetUpdateOperation<Int32>(key, (Int32)obj);
  536. case TypeCode.Double: return new SetUpdateOperation<Double>(key, (Double)obj);
  537. case TypeCode.Byte: return new SetUpdateOperation<Byte>(key, (Byte)obj);
  538. case TypeCode.Boolean: return new SetUpdateOperation<Boolean>(key, (Boolean)obj);
  539. case TypeCode.DateTime: return new SetUpdateOperation<DateTime>(key, (DateTime)obj);
  540. case TypeCode.Int64: return new SetUpdateOperation<Int64>(key, (Int64)obj);
  541. default: return null;
  542. }
  543. }
  544. public async Task<List<T>> FindByDict<T>(Dictionary<string, object> dict, bool IsPk)
  545. {
  546. Type t = typeof(T);
  547. // List<T> objs = new List<T>();
  548. await InitializeCollection<T>();
  549. StringBuilder sql = new StringBuilder("select * from c where 1=1 ");
  550. if (dict != null)
  551. {
  552. foreach (string key in dict.Keys)
  553. {
  554. sql.Append(GenSql(dict[key], key));
  555. }
  556. }
  557. //查询条数 -1是全部
  558. FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = IsPk };
  559. var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name), sql.ToString(), queryOptions);
  560. //foreach (var item in query)
  561. //{
  562. // objs.Add(item);
  563. //}
  564. return query.ToList<T>();
  565. }
  566. private static string GenSql(object obj, string key)
  567. {
  568. Type s = obj.GetType();
  569. TypeCode typeCode = Type.GetTypeCode(s);
  570. return typeCode switch
  571. {
  572. TypeCode.String => "and c." + key + "=" + "'" + obj.ToString() + "'",
  573. TypeCode.Int32 => "and c." + key + "=" + int.Parse(obj.ToString()),
  574. TypeCode.Double => "and c." + key + "=" + double.Parse(obj.ToString()),
  575. //case TypeCode.Byte: return "and c." + key + "=" + (Byte)obj ;
  576. TypeCode.Boolean => "and c." + key + "=" + bool.Parse(obj.ToString()),
  577. TypeCode.DateTime => "and c." + key + "=" + (DateTime)obj,
  578. TypeCode.Int64 => "and c." + key + "=" + long.Parse(obj.ToString()),
  579. _ => null,
  580. };
  581. }
  582. public async Task<IQueryable<dynamic>> FindByDict(string CollectionName, string PartitionKey, Dictionary<string, object> dict) {
  583. await InitializeCollection(CollectionName, PartitionKey);
  584. StringBuilder sql = new StringBuilder("select * from "+ CollectionName + " c where 1=1 ");
  585. if (dict != null)
  586. {
  587. foreach (string key in dict.Keys)
  588. {
  589. sql.Append(GenSql(dict[key], key));
  590. }
  591. }
  592. FeedOptions queryOptions;
  593. if (!string.IsNullOrEmpty(PartitionKey))
  594. {
  595. queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = true };
  596. }
  597. else {
  598. queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = false };
  599. }
  600. //查询条数 -1是全部
  601. var query = CosmosClient.CreateDocumentQuery(UriFactory.CreateDocumentCollectionUri(Database, CollectionName), sql.ToString(), queryOptions);
  602. return query;
  603. }
  604. }
  605. }