AzureCosmosDBRepository.cs 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Threading.Tasks;
  4. using System.Linq;
  5. using TEAMModelOS.SDK.Module.AzureCosmosDB.Configuration;
  6. using TEAMModelOS.SDK.Module.AzureCosmosDB.Interfaces;
  7. using Microsoft.Azure.Documents.Client;
  8. using Microsoft.Azure.Documents;
  9. using TEAMModelOS.SDK.Helper.Security.AESCrypt;
  10. using TEAMModelOS.SDK.Context.Exception;
  11. using Microsoft.Azure.Documents.Linq;
  12. using TEAMModelOS.SDK.Helper.Query.LinqHelper;
  13. using System.Reflection;
  14. using Microsoft.Azure.CosmosDB.BulkExecutor;
  15. using Microsoft.Azure.CosmosDB.BulkExecutor.BulkImport;
  16. using System.Threading;
  17. using TEAMModelOS.SDK.Helper.Common.JsonHelper;
  18. using Microsoft.Azure.CosmosDB.BulkExecutor.BulkUpdate;
  19. using TEAMModelOS.SDK.Helper.Common.CollectionHelper;
  20. using Microsoft.Azure.CosmosDB.BulkExecutor.BulkDelete;
  21. using TEAMModelOS.SDK.Context.Attributes.Azure;
  22. using System.Text;
  23. namespace TEAMModelOS.SDK.Module.AzureCosmosDB.Implements
  24. { /// <summary>
  25. /// sdk 文档https://github.com/Azure/azure-cosmos-dotnet-v2/tree/master/samples
  26. /// https://github.com/Azure/azure-cosmos-dotnet-v2/blob/530c8d9cf7c99df7300246da05206c57ce654233/samples/code-samples/DatabaseManagement/Program.cs#L72-L121
  27. /// </summary>
  28. public class AzureCosmosDBRepository : IAzureCosmosDBRepository
  29. {
  30. /// <summary>
  31. /// sdk 文档https://github.com/Azure/azure-cosmos-dotnet-v2/tree/master/samples
  32. /// https://github.com/Azure/azure-cosmos-dotnet-v2/blob/530c8d9cf7c99df7300246da05206c57ce654233/samples/code-samples/DatabaseManagement/Program.cs#L72-L121
  33. /// </summary>
  34. private readonly string china_con = "417A7572654368696E6120202020202020202020202020202020202020202020D63873D37F845F9DC7607B4DF4787EE26598454CE32FB5F2EE778A34A5015736196DF7940C67A034CDD4C4B44CD37C20";
  35. private readonly string china_key = "417A7572654368696E61202020202020202020202020202020202020202020203CAA1DF7E3203F0ABCB2D60C1F3DCB6D90676C4D5467167F6E6A2CB3DDE975EA37B06BBAE6E012936BEDB6D5D60B28B13642F755CB25D1958BE5366EA20FA7C47E04A67B6A96111C61C3270CD0E5539CA45E3A77A6B483F47419BBAEDE75C0F6";
  36. private readonly string global_con = "417A757265476C6F62616C2020202020202020202020202020202020202020200E357979CB69243DBF4E41BF5526830F89AB746007AC68A3DD3F9CFDA781509F1C48B2359120A5E58B8C7B1EDAA99DEA";
  37. private readonly string global_key = "417A757265476C6F62616C2020202020202020202020202020202020202020209FF199D61813D1F4857D55CFB0A7D6A797FECF39A7F47553E9C1AF23674CB04BA95748A4A3C07B90F32E5EF26E0982DBF90001E066432075C434351D73FB387D27A50716D90F414F34A4579D846C27804F658705C05A7224EC4D695FD7A5EE23";
  38. private DocumentClient CosmosClient { get; set; }
  39. private DocumentCollection CosmosCollection { get; set; }
  40. private string Database { get; set; }
  41. private int CollectionThroughput { get; set; }
  42. public AzureCosmosDBRepository(AzureCosmosDBOptions options)
  43. {
  44. try
  45. {
  46. if (!string.IsNullOrEmpty(options.ConnectionString))
  47. {
  48. CosmosClient = CosmosDBClientSingleton.getInstance(options.ConnectionString, options.ConnectionKey).GetCosmosDBClient();
  49. }
  50. else if (AzureCosmosDBConfig.AZURE_CHINA.Equals(options.AzureTableDialect))
  51. {
  52. AESCrypt crypt = new AESCrypt();
  53. CosmosClient = CosmosDBClientSingleton.getInstance(crypt.Decrypt(china_con, options.AzureTableDialect), crypt.Decrypt(china_key, options.AzureTableDialect)).GetCosmosDBClient();
  54. }
  55. else if (AzureCosmosDBConfig.AZURE_GLOBAL.Equals(options.AzureTableDialect))
  56. {
  57. AESCrypt crypt = new AESCrypt();
  58. CosmosClient = CosmosDBClientSingleton.getInstance(crypt.Decrypt(global_con, options.AzureTableDialect), crypt.Decrypt(global_key, options.AzureTableDialect)).GetCosmosDBClient();
  59. }
  60. else
  61. {
  62. throw new BizException("请设置正确的AzureCosmosDB数据库配置信息!");
  63. }
  64. Database = options.Database;
  65. CollectionThroughput = options.CollectionThroughput;
  66. CosmosClient.CreateDatabaseIfNotExistsAsync(new Database { Id = Database });
  67. // _connectionString = options.ConnectionString;
  68. }
  69. catch (DocumentClientException de)
  70. {
  71. Exception baseException = de.GetBaseException();
  72. Console.WriteLine("{0} error occurred: {1}, Message: {2}", de.StatusCode, de.Message, baseException.Message);
  73. }
  74. catch (Exception e)
  75. {
  76. Exception baseException = e.GetBaseException();
  77. Console.WriteLine("Error: {0}, Message: {1}", e.Message, baseException.Message);
  78. }
  79. finally
  80. {
  81. Console.WriteLine("End of demo, press any key to exit.");
  82. // Console.ReadKey();
  83. }
  84. }
  85. private async Task<DocumentCollection> InitializeCollection<T>()
  86. {
  87. Type t = typeof(T);
  88. if (CosmosCollection == null || !CosmosCollection.Id.Equals(t.Name))
  89. {
  90. DocumentCollection collectionDefinition = new DocumentCollection { Id = t.Name };
  91. collectionDefinition.IndexingPolicy = new IndexingPolicy(new RangeIndex(DataType.String) { Precision = -1 });
  92. string partitionKey = GetPartitionKey<T>();
  93. // collectionDefinition.PartitionKey = new PartitionKeyDefinition { Paths = new System.Collections.ObjectModel.Collection<string>() };
  94. if (!string.IsNullOrEmpty(partitionKey))
  95. {
  96. collectionDefinition.PartitionKey.Paths.Add("/" + partitionKey);
  97. }
  98. // CosmosCollection = await this.CosmosClient.CreateDocumentCollectionIfNotExistsAsync(UriFactory.CreateDatabaseUri(Database), collectionDefinition);
  99. CosmosCollection = await this.CosmosClient.CreateDocumentCollectionIfNotExistsAsync(
  100. UriFactory.CreateDatabaseUri(Database), collectionDefinition, new RequestOptions { OfferThroughput = CollectionThroughput }
  101. );
  102. }
  103. return CosmosCollection;
  104. }
  105. private async Task<DocumentCollection> InitializeCollection(string CollectionName, string PartitionKey)
  106. {
  107. if (CosmosCollection == null || !CosmosCollection.Id.Equals(CollectionName))
  108. {
  109. DocumentCollection collectionDefinition = new DocumentCollection { Id = CollectionName };
  110. collectionDefinition.IndexingPolicy = new IndexingPolicy(new RangeIndex(DataType.String) { Precision = -1 });
  111. // collectionDefinition.PartitionKey = new PartitionKeyDefinition { Paths = new System.Collections.ObjectModel.Collection<string>() };
  112. if (!string.IsNullOrEmpty(PartitionKey))
  113. {
  114. collectionDefinition.PartitionKey.Paths.Add("/" + PartitionKey);
  115. }
  116. // CosmosCollection = await this.CosmosClient.CreateDocumentCollectionIfNotExistsAsync(UriFactory.CreateDatabaseUri(Database), collectionDefinition);
  117. CosmosCollection = await this.CosmosClient.CreateDocumentCollectionIfNotExistsAsync(
  118. UriFactory.CreateDatabaseUri(Database), collectionDefinition, new RequestOptions { OfferThroughput = CollectionThroughput }
  119. );
  120. }
  121. return CosmosCollection;
  122. }
  123. private string GetPartitionKey<T>()
  124. {
  125. Type type = typeof(T);
  126. PropertyInfo[] properties = type.GetProperties();
  127. List<PropertyInfo> attrProperties = new List<PropertyInfo>();
  128. foreach (PropertyInfo property in properties)
  129. {
  130. if (property.Name.Equals("PartitionKey"))
  131. {
  132. attrProperties.Add(property);
  133. break;
  134. }
  135. object[] attributes = property.GetCustomAttributes(true);
  136. foreach (object attribute in attributes) //2.通过映射,找到成员属性上关联的特性类实例,
  137. {
  138. if (attribute is PartitionKeyAttribute)
  139. {
  140. attrProperties.Add(property);
  141. }
  142. }
  143. }
  144. if (attrProperties.Count <= 0)
  145. {
  146. return null;
  147. }
  148. else
  149. {
  150. if (attrProperties.Count == 1)
  151. {
  152. return attrProperties[0].Name;
  153. }
  154. else { throw new BizException("PartitionKey can only be single!"); }
  155. }
  156. }
  157. public async Task<T> Save<T>(T entity) //where T : object, new()
  158. {
  159. try
  160. {
  161. Type t = typeof(T);
  162. DocumentCollection documentCollection = await InitializeCollection<T>();
  163. ResourceResponse<Document> doc =
  164. await CosmosClient.CreateDocumentAsync(UriFactory.CreateDocumentCollectionUri(Database, t.Name), entity);
  165. //Console.WriteLine(doc.ActivityId);
  166. return entity;
  167. }
  168. catch (Exception e)
  169. {
  170. throw new BizException(e.Message);
  171. }
  172. }
  173. public async Task<T> Update<T>(T entity)
  174. {
  175. Type t = typeof(T);
  176. await InitializeCollection<T>();
  177. ResourceResponse<Document> doc =
  178. await CosmosClient.UpsertDocumentAsync(UriFactory.CreateDocumentCollectionUri(Database, t.Name), entity);
  179. return entity;
  180. }
  181. public async Task<string> ReplaceObject<T>(T entity, string key)
  182. {
  183. Type t = typeof(T);
  184. await InitializeCollection<T>();
  185. try
  186. {
  187. ResourceResponse<Document> doc =
  188. await CosmosClient.ReplaceDocumentAsync(UriFactory.CreateDocumentUri(Database, t.Name, key), entity);
  189. return key;
  190. }
  191. catch (Exception e)
  192. {
  193. Console.WriteLine("{0} Exception caught.", e);
  194. //return false;
  195. }
  196. return null;
  197. }
  198. public async Task<string> ReplaceObject<T>(T entity, string key, string partitionKey)
  199. {
  200. Type t = typeof(T);
  201. await InitializeCollection<T>();
  202. try
  203. {
  204. ResourceResponse<Document> doc =
  205. await CosmosClient.ReplaceDocumentAsync(UriFactory.CreateDocumentUri(Database, t.Name, key),
  206. entity,
  207. new RequestOptions { PartitionKey = new PartitionKey(partitionKey) });
  208. return key;
  209. }
  210. catch (Exception e)
  211. {
  212. throw new BizException(e.Message);
  213. //Console.WriteLine("{0} Exception caught.", e);
  214. //return false;
  215. }
  216. }
  217. public async Task<List<T>> FindAll<T>()
  218. {
  219. Type t = typeof(T);
  220. Boolean open = true;
  221. List<T> objs = new List<T>();
  222. //await InitializeCollection<T>();
  223. //查询条数 -1是全部
  224. FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = open };
  225. var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name), queryOptions).AsDocumentQuery();
  226. while (query.HasMoreResults)
  227. {
  228. foreach (T obj in await query.ExecuteNextAsync())
  229. {
  230. objs.Add(obj);
  231. }
  232. }
  233. return objs;
  234. //return CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name),sql);
  235. }
  236. public async Task<List<T>> FindLinq<T>(Func<IQueryable<object>, object> singleOrDefault)
  237. {
  238. Type t = typeof(T);
  239. List<T> objs = new List<T>();
  240. await InitializeCollection<T>();
  241. //查询条数 -1是全部
  242. FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1 };
  243. var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name), queryOptions);
  244. // query.Where();
  245. return objs;
  246. //return CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name),sql);
  247. }
  248. public async Task<List<T>> FindSQL<T>(string sql)
  249. {
  250. Type t = typeof(T);
  251. //List<T> objs = new List<T>();
  252. await InitializeCollection<T>();
  253. var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name), sql);
  254. //foreach (var item in query)
  255. //{
  256. // objs.Add(item);
  257. //}
  258. return query.ToList<T>();
  259. }
  260. public async Task<List<T>> FindSQL<T>(string sql, bool IsPk)
  261. {
  262. Type t = typeof(T);
  263. //List<T> objs = new List<T>();
  264. // Boolean open = IsPk;
  265. await InitializeCollection<T>();
  266. //查询条数 -1是全部
  267. FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = IsPk };
  268. var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name), sql, queryOptions);
  269. //foreach (var item in query)
  270. //{
  271. // objs.Add(item);
  272. //}
  273. return query.ToList<T>();
  274. }
  275. public async Task<List<T>> FindByParams<T>(Dictionary<string, object> dict)
  276. {
  277. //await InitializeCollection<T>();
  278. Type t = typeof(T);
  279. Boolean open = true;
  280. List<Filter> filters = new List<Filter>();
  281. string PKname = "";
  282. PropertyInfo[] properties = t.GetProperties();
  283. foreach (PropertyInfo property in properties)
  284. {
  285. object[] attributes = property.GetCustomAttributes(true);
  286. foreach (object attribute in attributes) //2.通过映射,找到成员属性上关联的特性类实例,
  287. {
  288. if (attribute is PartitionKeyAttribute)
  289. {
  290. PKname = property.Name;
  291. break;
  292. }
  293. }
  294. }
  295. foreach (string key in dict.Keys)
  296. {
  297. //if (t.Name.Equals(key)) {
  298. // open = false;
  299. //}
  300. if (PKname.Equals(key))
  301. {
  302. open = false;
  303. }
  304. filters.Add(new Filter { Key = key, Value = dict[key] != null ? dict[key].ToString() : throw new Exception("参数值不能为null") });
  305. }
  306. //List<T> objs = new List<T>();
  307. await InitializeCollection<T>();
  308. //查询条数 -1是全部
  309. FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = open };
  310. var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name), queryOptions);
  311. List<T> list = DynamicLinq.GenerateFilter<T>(query, filters).ToList();
  312. return list;
  313. //return CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name),sql);
  314. }
  315. public async Task<string> DeleteAsync<T>(string id)
  316. {
  317. Type t = typeof(T);
  318. await InitializeCollection<T>();
  319. ResourceResponse<Document> doc =
  320. await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database, t.Name, id));
  321. //Console.WriteLine(doc.ActivityId);
  322. return id;
  323. }
  324. public async Task<T> DeleteAsync<T>(T entity)
  325. {
  326. await InitializeCollection<T>();
  327. Type t = typeof(T);
  328. string PartitionKey = GetPartitionKey<T>();
  329. if (!string.IsNullOrEmpty(PartitionKey))
  330. {
  331. string pkValue = entity.GetType().GetProperty(PartitionKey).GetValue(entity).ToString();
  332. string idValue = entity.GetType().GetProperty("id").GetValue(entity).ToString();
  333. ResourceResponse<Document> doc =
  334. await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database, t.Name, idValue), new RequestOptions { PartitionKey = new PartitionKey(pkValue) });
  335. }
  336. else
  337. {
  338. string idValue = entity.GetType().GetProperty("id").GetValue(entity).ToString();
  339. ResourceResponse<Document> doc =
  340. await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database, t.Name, idValue));
  341. }
  342. //Console.WriteLine(doc.ActivityId);
  343. return entity;
  344. }
  345. public async Task<string> DeleteAsync<T>(string id, string partitionKey)
  346. {
  347. Type t = typeof(T);
  348. await InitializeCollection<T>();
  349. ResourceResponse<Document> doc =
  350. await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database, t.Name, id), new RequestOptions { PartitionKey = new PartitionKey(partitionKey) });
  351. //Console.WriteLine(doc.ActivityId);
  352. return id;
  353. }
  354. public async Task<List<T>> SaveAll<T>(List<T> enyites)
  355. {
  356. DocumentCollection dataCollection = await InitializeCollection<T>();
  357. // Set retry options high for initialization (default values).
  358. CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
  359. CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;
  360. IBulkExecutor bulkExecutor = new BulkExecutor(CosmosClient, dataCollection);
  361. await bulkExecutor.InitializeAsync();
  362. // Set retries to 0 to pass control to bulk executor.
  363. CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
  364. CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;
  365. BulkImportResponse bulkImportResponse = null;
  366. long totalNumberOfDocumentsInserted = 0;
  367. double totalRequestUnitsConsumed = 0;
  368. double totalTimeTakenSec = 0;
  369. var tokenSource = new CancellationTokenSource();
  370. var token = tokenSource.Token;
  371. int pageSize = 100;
  372. int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
  373. for (int i = 0; i < pages; i++)
  374. {
  375. List<string> documentsToImportInBatch = new List<string>();
  376. List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
  377. for (int j = 0; j < lists.Count; j++)
  378. {
  379. documentsToImportInBatch.Add(lists[j].ToJson());
  380. }
  381. var tasks = new List<Task>
  382. { Task.Run(async () =>
  383. {
  384. do
  385. {
  386. //try
  387. //{
  388. bulkImportResponse = await bulkExecutor.BulkImportAsync(
  389. documents: documentsToImportInBatch,
  390. enableUpsert: true,
  391. disableAutomaticIdGeneration: true,
  392. maxConcurrencyPerPartitionKeyRange: null,
  393. maxInMemorySortingBatchSize: null,
  394. cancellationToken: token);
  395. //}
  396. //catch (DocumentClientException de)
  397. //{
  398. // break;
  399. //}
  400. //catch (Exception e)
  401. //{
  402. // break;
  403. //}
  404. } while (bulkImportResponse.NumberOfDocumentsImported < documentsToImportInBatch.Count);
  405. totalNumberOfDocumentsInserted += bulkImportResponse.NumberOfDocumentsImported;
  406. totalRequestUnitsConsumed += bulkImportResponse.TotalRequestUnitsConsumed;
  407. totalTimeTakenSec += bulkImportResponse.TotalTimeTaken.TotalSeconds;
  408. },
  409. token)
  410. };
  411. await Task.WhenAll(tasks);
  412. }
  413. return enyites;
  414. }
  415. public async Task<List<T>> UpdateAll<T>(Dictionary<string, object> dict, Dictionary<string, object> updateFilters, List<string> deleteKeys = null)
  416. {
  417. DocumentCollection dataCollection = await InitializeCollection<T>();
  418. IBulkExecutor bulkExecutor = new BulkExecutor(CosmosClient, dataCollection);
  419. await bulkExecutor.InitializeAsync();
  420. BulkUpdateResponse bulkUpdateResponse = null;
  421. long totalNumberOfDocumentsUpdated = 0;
  422. double totalRequestUnitsConsumed = 0;
  423. double totalTimeTakenSec = 0;
  424. var tokenSource = new CancellationTokenSource();
  425. var token = tokenSource.Token;
  426. // Generate update operations.
  427. List<UpdateOperation> updateOperations = new List<UpdateOperation>();
  428. // Unset the description field.
  429. if (null != updateFilters && updateFilters.Count > 0)
  430. {
  431. var keys = updateFilters.Keys;
  432. foreach (string key in keys)
  433. {
  434. // updateOperations.Add(new SetUpdateOperation<string>())
  435. if (updateFilters[key] != null && !string.IsNullOrEmpty(updateFilters[key].ToString()))
  436. {
  437. updateOperations.Add(SwitchType(key, updateFilters[key]));
  438. }
  439. }
  440. }
  441. if (deleteKeys.IsNotEmpty())
  442. {
  443. foreach (string key in deleteKeys)
  444. {
  445. updateOperations.Add(new UnsetUpdateOperation(key));
  446. }
  447. }
  448. List<T> list = await FindByParams<T>(dict);
  449. int pageSize = 100;
  450. int pages = (int)Math.Ceiling((double)list.Count / pageSize);
  451. string partitionKey = "/" + GetPartitionKey<T>();
  452. Type type = typeof(T);
  453. for (int i = 0; i < pages; i++)
  454. {
  455. List<UpdateItem> updateItemsInBatch = new List<UpdateItem>();
  456. List<T> lists = list.Skip((i) * pageSize).Take(pageSize).ToList();
  457. for (int j = 0; j < lists.Count; j++)
  458. {
  459. string partitionKeyValue = lists[j].GetType().GetProperty(partitionKey).GetValue(lists[j]) + "";
  460. string id = lists[j].GetType().GetProperty("id").GetValue(lists[j]) + "";
  461. updateItemsInBatch.Add(new UpdateItem(id, partitionKeyValue, updateOperations));
  462. }
  463. var tasks = new List<Task>
  464. { Task.Run(async () =>
  465. {
  466. do
  467. {
  468. //try
  469. //{
  470. bulkUpdateResponse = await bulkExecutor.BulkUpdateAsync(
  471. updateItems: updateItemsInBatch,
  472. maxConcurrencyPerPartitionKeyRange: null,
  473. cancellationToken: token);
  474. //}
  475. //catch (DocumentClientException de)
  476. //{
  477. // break;
  478. //}
  479. //catch (Exception e)
  480. //{
  481. // break;
  482. //}
  483. } while (bulkUpdateResponse.NumberOfDocumentsUpdated < updateItemsInBatch.Count);
  484. totalNumberOfDocumentsUpdated += bulkUpdateResponse.NumberOfDocumentsUpdated;
  485. totalRequestUnitsConsumed += bulkUpdateResponse.TotalRequestUnitsConsumed;
  486. totalTimeTakenSec += bulkUpdateResponse.TotalTimeTaken.TotalSeconds;
  487. },
  488. token)
  489. };
  490. await Task.WhenAll(tasks);
  491. }
  492. return list;
  493. }
  494. public async Task<List<T>> DeleteAll<T>(Dictionary<string, object> dict)
  495. {
  496. DocumentCollection dataCollection = await InitializeCollection<T>();
  497. List<T> list = await FindByParams<T>(dict);
  498. List<Tuple<string, string>> pkIdTuplesToDelete = new List<Tuple<string, string>>();
  499. if (list.IsNotEmpty())
  500. {
  501. foreach (T t in list)
  502. {
  503. string id = t.GetType().GetProperty("id").GetValue(t) + "";
  504. pkIdTuplesToDelete.Add(new Tuple<string, string>(id, id));
  505. }
  506. }
  507. else
  508. {
  509. return null;
  510. }
  511. long totalNumberOfDocumentsDeleted = 0;
  512. double totalRequestUnitsConsumed = 0;
  513. double totalTimeTakenSec = 0;
  514. BulkDeleteResponse bulkDeleteResponse = null;
  515. BulkExecutor bulkExecutor = new BulkExecutor(CosmosClient, dataCollection);
  516. await bulkExecutor.InitializeAsync();
  517. bulkDeleteResponse = await bulkExecutor.BulkDeleteAsync(pkIdTuplesToDelete);
  518. totalNumberOfDocumentsDeleted = bulkDeleteResponse.NumberOfDocumentsDeleted;
  519. totalRequestUnitsConsumed = bulkDeleteResponse.TotalRequestUnitsConsumed;
  520. totalTimeTakenSec = bulkDeleteResponse.TotalTimeTaken.TotalSeconds;
  521. return list;
  522. }
  523. private static UpdateOperation SwitchType(string key, object obj)
  524. {
  525. Type s = obj.GetType();
  526. TypeCode typeCode = Type.GetTypeCode(s);
  527. switch (typeCode)
  528. {
  529. case TypeCode.String: return new SetUpdateOperation<string>(key, obj.ToString());
  530. case TypeCode.Int32: return new SetUpdateOperation<Int32>(key, (Int32)obj);
  531. case TypeCode.Double: return new SetUpdateOperation<Double>(key, (Double)obj);
  532. case TypeCode.Byte: return new SetUpdateOperation<Byte>(key, (Byte)obj);
  533. case TypeCode.Boolean: return new SetUpdateOperation<Boolean>(key, (Boolean)obj);
  534. case TypeCode.DateTime: return new SetUpdateOperation<DateTime>(key, (DateTime)obj);
  535. case TypeCode.Int64: return new SetUpdateOperation<Int64>(key, (Int64)obj);
  536. default: return null;
  537. }
  538. }
  539. public async Task<List<T>> FindByDict<T>(Dictionary<string, object> dict, bool IsPk)
  540. {
  541. Type t = typeof(T);
  542. // List<T> objs = new List<T>();
  543. await InitializeCollection<T>();
  544. StringBuilder sql = new StringBuilder("select * from c where 1=1 ");
  545. if (dict != null)
  546. {
  547. foreach (string key in dict.Keys)
  548. {
  549. sql.Append(GenSql(dict[key], key));
  550. }
  551. }
  552. //查询条数 -1是全部
  553. FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = IsPk };
  554. var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name), sql.ToString(), queryOptions);
  555. //foreach (var item in query)
  556. //{
  557. // objs.Add(item);
  558. //}
  559. return query.ToList<T>();
  560. }
  561. private static string GenSql(object obj, string key)
  562. {
  563. Type s = obj.GetType();
  564. TypeCode typeCode = Type.GetTypeCode(s);
  565. return typeCode switch
  566. {
  567. TypeCode.String => "and c." + key + "=" + "'" + obj.ToString() + "'",
  568. TypeCode.Int32 => "and c." + key + "=" + int.Parse(obj.ToString()),
  569. TypeCode.Double => "and c." + key + "=" + double.Parse(obj.ToString()),
  570. //case TypeCode.Byte: return "and c." + key + "=" + (Byte)obj ;
  571. TypeCode.Boolean => "and c." + key + "=" + bool.Parse(obj.ToString()),
  572. TypeCode.DateTime => "and c." + key + "=" + (DateTime)obj,
  573. TypeCode.Int64 => "and c." + key + "=" + long.Parse(obj.ToString()),
  574. _ => null,
  575. };
  576. }
  577. public async Task<IQueryable<dynamic>> FindByDict(string CollectionName, string PartitionKey, Dictionary<string, object> dict) {
  578. await InitializeCollection(CollectionName, PartitionKey);
  579. StringBuilder sql = new StringBuilder("select * from "+ CollectionName + " c where 1=1 ");
  580. if (dict != null)
  581. {
  582. foreach (string key in dict.Keys)
  583. {
  584. sql.Append(GenSql(dict[key], key));
  585. }
  586. }
  587. FeedOptions queryOptions;
  588. if (!string.IsNullOrEmpty(PartitionKey))
  589. {
  590. queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = true };
  591. }
  592. else {
  593. queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = false };
  594. }
  595. //查询条数 -1是全部
  596. var query = CosmosClient.CreateDocumentQuery(UriFactory.CreateDocumentCollectionUri(Database, CollectionName), sql.ToString(), queryOptions);
  597. return query;
  598. }
  599. }
  600. }