AzureCosmosDBRepository.cs 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Threading.Tasks;
  4. using System.Linq;
  5. using TEAMModelOS.SDK.Module.AzureCosmosDB.Configuration;
  6. using TEAMModelOS.SDK.Module.AzureCosmosDB.Interfaces;
  7. using Microsoft.Azure.Documents.Client;
  8. using Microsoft.Azure.Documents;
  9. using TEAMModelOS.SDK.Helper.Security.AESCrypt;
  10. using TEAMModelOS.SDK.Context.Exception;
  11. using Microsoft.Azure.Documents.Linq;
  12. using TEAMModelOS.SDK.Helper.Query.LinqHelper;
  13. using System.Reflection;
  14. using Microsoft.Azure.CosmosDB.BulkExecutor;
  15. using Microsoft.Azure.CosmosDB.BulkExecutor.BulkImport;
  16. using System.Threading;
  17. using TEAMModelOS.SDK.Helper.Common.JsonHelper;
  18. using Microsoft.Azure.CosmosDB.BulkExecutor.BulkUpdate;
  19. using TEAMModelOS.SDK.Helper.Common.CollectionHelper;
  20. using Microsoft.Azure.CosmosDB.BulkExecutor.BulkDelete;
  21. using TEAMModelOS.SDK.Context.Attributes.Azure;
  22. namespace TEAMModelOS.SDK.Module.AzureCosmosDB.Implements
  23. { /// <summary>
  24. /// sdk 文档https://github.com/Azure/azure-cosmos-dotnet-v2/tree/master/samples
  25. /// https://github.com/Azure/azure-cosmos-dotnet-v2/blob/530c8d9cf7c99df7300246da05206c57ce654233/samples/code-samples/DatabaseManagement/Program.cs#L72-L121
  26. /// </summary>
  27. public class AzureCosmosDBRepository : IAzureCosmosDBRepository
  28. {
  29. /// <summary>
  30. /// sdk 文档https://github.com/Azure/azure-cosmos-dotnet-v2/tree/master/samples
  31. /// https://github.com/Azure/azure-cosmos-dotnet-v2/blob/530c8d9cf7c99df7300246da05206c57ce654233/samples/code-samples/DatabaseManagement/Program.cs#L72-L121
  32. /// </summary>
  33. private readonly string china_con = "417A7572654368696E6120202020202020202020202020202020202020202020D63873D37F845F9DC7607B4DF4787EE26598454CE32FB5F2EE778A34A5015736196DF7940C67A034CDD4C4B44CD37C20";
  34. private readonly string china_key = "417A7572654368696E61202020202020202020202020202020202020202020203CAA1DF7E3203F0ABCB2D60C1F3DCB6D90676C4D5467167F6E6A2CB3DDE975EA37B06BBAE6E012936BEDB6D5D60B28B13642F755CB25D1958BE5366EA20FA7C47E04A67B6A96111C61C3270CD0E5539CA45E3A77A6B483F47419BBAEDE75C0F6";
  35. private readonly string global_con = "417A757265476C6F62616C2020202020202020202020202020202020202020200E357979CB69243DBF4E41BF5526830F89AB746007AC68A3DD3F9CFDA781509F1C48B2359120A5E58B8C7B1EDAA99DEA";
  36. private readonly string global_key = "417A757265476C6F62616C2020202020202020202020202020202020202020209FF199D61813D1F4857D55CFB0A7D6A797FECF39A7F47553E9C1AF23674CB04BA95748A4A3C07B90F32E5EF26E0982DBF90001E066432075C434351D73FB387D27A50716D90F414F34A4579D846C27804F658705C05A7224EC4D695FD7A5EE23";
  37. private DocumentClient CosmosClient { get; set; }
  38. private DocumentCollection CosmosCollection { get; set; }
  39. private string Database { get; set; }
  40. private int CollectionThroughput { get; set; }
  41. public AzureCosmosDBRepository(AzureCosmosDBOptions options)
  42. {
  43. try
  44. {
  45. if (!string.IsNullOrEmpty(options.ConnectionString))
  46. {
  47. CosmosClient = CosmosDBClientSingleton.getInstance(options.ConnectionString, options.ConnectionKey).GetCosmosDBClient();
  48. }
  49. else if (AzureCosmosDBConfig.AZURE_CHINA.Equals(options.AzureTableDialect))
  50. {
  51. AESCrypt crypt = new AESCrypt();
  52. CosmosClient = CosmosDBClientSingleton.getInstance(crypt.Decrypt(china_con, options.AzureTableDialect), crypt.Decrypt(china_key, options.AzureTableDialect)).GetCosmosDBClient();
  53. }
  54. else if (AzureCosmosDBConfig.AZURE_GLOBAL.Equals(options.AzureTableDialect))
  55. {
  56. AESCrypt crypt = new AESCrypt();
  57. CosmosClient = CosmosDBClientSingleton.getInstance(crypt.Decrypt(global_con, options.AzureTableDialect), crypt.Decrypt(global_key, options.AzureTableDialect)).GetCosmosDBClient();
  58. }
  59. else
  60. {
  61. throw new BizException("请设置正确的AzureCosmosDB数据库配置信息!");
  62. }
  63. Database = options.Database;
  64. CollectionThroughput = options.CollectionThroughput;
  65. CosmosClient.CreateDatabaseIfNotExistsAsync(new Database { Id = Database });
  66. // _connectionString = options.ConnectionString;
  67. }
  68. catch (DocumentClientException de)
  69. {
  70. Exception baseException = de.GetBaseException();
  71. Console.WriteLine("{0} error occurred: {1}, Message: {2}", de.StatusCode, de.Message, baseException.Message);
  72. }
  73. catch (Exception e)
  74. {
  75. Exception baseException = e.GetBaseException();
  76. Console.WriteLine("Error: {0}, Message: {1}", e.Message, baseException.Message);
  77. }
  78. finally
  79. {
  80. Console.WriteLine("End of demo, press any key to exit.");
  81. // Console.ReadKey();
  82. }
  83. }
  84. private async Task<DocumentCollection> InitializeCollection<T>()
  85. {
  86. Type t = typeof(T);
  87. if (CosmosCollection == null || !CosmosCollection.Id.Equals(t.Name))
  88. {
  89. DocumentCollection collectionDefinition = new DocumentCollection { Id = t.Name };
  90. collectionDefinition.IndexingPolicy = new IndexingPolicy(new RangeIndex(DataType.String) { Precision = -1 });
  91. string partitionKey = GetPartitionKey<T>();
  92. // collectionDefinition.PartitionKey = new PartitionKeyDefinition { Paths = new System.Collections.ObjectModel.Collection<string>() };
  93. if (!string.IsNullOrEmpty(partitionKey))
  94. {
  95. collectionDefinition.PartitionKey.Paths.Add("/" + partitionKey);
  96. }
  97. // CosmosCollection = await this.CosmosClient.CreateDocumentCollectionIfNotExistsAsync(UriFactory.CreateDatabaseUri(Database), collectionDefinition);
  98. CosmosCollection = await this.CosmosClient.CreateDocumentCollectionIfNotExistsAsync(
  99. UriFactory.CreateDatabaseUri(Database), collectionDefinition, new RequestOptions { OfferThroughput = CollectionThroughput }
  100. );
  101. }
  102. return CosmosCollection;
  103. }
  104. private string GetPartitionKey<T>()
  105. {
  106. Type type = typeof(T);
  107. PropertyInfo[] properties = type.GetProperties();
  108. List<PropertyInfo> attrProperties = new List<PropertyInfo>();
  109. foreach (PropertyInfo property in properties)
  110. {
  111. if (property.Name.Equals("PartitionKey"))
  112. {
  113. attrProperties.Add(property);
  114. break;
  115. }
  116. object[] attributes = property.GetCustomAttributes(true);
  117. foreach (object attribute in attributes) //2.通过映射,找到成员属性上关联的特性类实例,
  118. {
  119. if (attribute is PartitionKeyAttribute)
  120. {
  121. attrProperties.Add(property);
  122. }
  123. }
  124. }
  125. if (attrProperties.Count <= 0)
  126. {
  127. return null;
  128. }
  129. else
  130. {
  131. if (attrProperties.Count == 1)
  132. {
  133. return attrProperties[0].Name;
  134. }
  135. else { throw new BizException("PartitionKey can only be single!"); }
  136. }
  137. }
  138. public async Task<T> Save<T>(T entity) //where T : object, new()
  139. {
  140. try {
  141. Type t = typeof(T);
  142. DocumentCollection documentCollection = await InitializeCollection<T>();
  143. ResourceResponse<Document> doc =
  144. await CosmosClient.CreateDocumentAsync(UriFactory.CreateDocumentCollectionUri(Database, t.Name), entity);
  145. //Console.WriteLine(doc.ActivityId);
  146. return entity;
  147. } catch (Exception e ) {
  148. throw new BizException(e.Message);
  149. }
  150. }
  151. public async Task<T> Update<T>(T entity)
  152. {
  153. Type t = typeof(T);
  154. await InitializeCollection<T>();
  155. ResourceResponse<Document> doc =
  156. await CosmosClient.UpsertDocumentAsync(UriFactory.CreateDocumentCollectionUri(Database, t.Name), entity);
  157. return entity;
  158. }
  159. public async Task<string> ReplaceObject<T>(T entity, string key)
  160. {
  161. Type t = typeof(T);
  162. await InitializeCollection<T>();
  163. try
  164. {
  165. ResourceResponse<Document> doc =
  166. await CosmosClient.ReplaceDocumentAsync(UriFactory.CreateDocumentUri(Database, t.Name, key), entity);
  167. return key;
  168. }
  169. catch (Exception e)
  170. {
  171. Console.WriteLine("{0} Exception caught.", e);
  172. //return false;
  173. }
  174. return null;
  175. }
  176. public async Task<string> ReplaceObject<T>(T entity, string key, string partitionKey)
  177. {
  178. Type t = typeof(T);
  179. await InitializeCollection<T>();
  180. try
  181. {
  182. ResourceResponse<Document> doc =
  183. await CosmosClient.ReplaceDocumentAsync(UriFactory.CreateDocumentUri(Database, t.Name, key),
  184. entity,
  185. new RequestOptions { PartitionKey = new PartitionKey(partitionKey) });
  186. return key;
  187. }
  188. catch (Exception e)
  189. {
  190. throw new BizException(e.Message);
  191. //Console.WriteLine("{0} Exception caught.", e);
  192. //return false;
  193. }
  194. }
  195. public async Task<List<T>> FindAll<T>()
  196. {
  197. Type t = typeof(T);
  198. Boolean open = true;
  199. List<T> objs = new List<T>();
  200. //await InitializeCollection<T>();
  201. //查询条数 -1是全部
  202. FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = open };
  203. var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name), queryOptions).AsDocumentQuery();
  204. while (query.HasMoreResults)
  205. {
  206. foreach (T obj in await query.ExecuteNextAsync())
  207. {
  208. objs.Add(obj);
  209. }
  210. }
  211. return objs;
  212. //return CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name),sql);
  213. }
  214. public async Task<List<T>> FindLinq<T>(Func<IQueryable<object>, object> singleOrDefault)
  215. {
  216. Type t = typeof(T);
  217. List<T> objs = new List<T>();
  218. await InitializeCollection<T>();
  219. //查询条数 -1是全部
  220. FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1 };
  221. var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name), queryOptions);
  222. // query.Where();
  223. return objs;
  224. //return CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name),sql);
  225. }
  226. public async Task<List<T>> FindSQL<T>(string sql)
  227. {
  228. Type t = typeof(T);
  229. List<T> objs = new List<T>();
  230. await InitializeCollection<T>();
  231. var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name), sql);
  232. foreach (var item in query)
  233. {
  234. objs.Add(item);
  235. }
  236. return objs;
  237. }
  238. public async Task<List<T>> FindSQL<T>(string sql, bool IsPk)
  239. {
  240. Type t = typeof(T);
  241. List<T> objs = new List<T>();
  242. Boolean open = IsPk;
  243. await InitializeCollection<T>();
  244. //查询条数 -1是全部
  245. FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = open };
  246. var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name), sql, queryOptions);
  247. foreach (var item in query)
  248. {
  249. objs.Add(item);
  250. }
  251. return objs;
  252. }
  253. public async Task<List<T>> FindByParams<T>(Dictionary<string, object> dict)
  254. {
  255. //await InitializeCollection<T>();
  256. Type t = typeof(T);
  257. Boolean open = true;
  258. List<Filter> filters = new List<Filter>();
  259. string PKname = "";
  260. PropertyInfo[] properties = t.GetProperties();
  261. foreach (PropertyInfo property in properties)
  262. {
  263. object[] attributes = property.GetCustomAttributes(true);
  264. foreach (object attribute in attributes) //2.通过映射,找到成员属性上关联的特性类实例,
  265. {
  266. if (attribute is PartitionKeyAttribute)
  267. {
  268. PKname = property.Name;
  269. break;
  270. }
  271. }
  272. }
  273. foreach (string key in dict.Keys)
  274. {
  275. //if (t.Name.Equals(key)) {
  276. // open = false;
  277. //}
  278. if (PKname.Equals(key))
  279. {
  280. open = false;
  281. }
  282. filters.Add(new Filter { Key = key, Value = dict[key] != null ? dict[key].ToString() : throw new Exception("参数值不能为null") });
  283. }
  284. //List<T> objs = new List<T>();
  285. await InitializeCollection<T>();
  286. //查询条数 -1是全部
  287. FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = open };
  288. var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name), queryOptions);
  289. var ass = DynamicLinq.GenerateFilter<T>(query, filters);
  290. List<T> list = DynamicLinq.GenerateFilter<T>(query, filters).ToList();
  291. return list;
  292. //return CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name),sql);
  293. }
  294. public async Task<string> DeleteAsync<T>(string id)
  295. {
  296. Type t = typeof(T);
  297. await InitializeCollection<T>();
  298. ResourceResponse<Document> doc =
  299. await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database, t.Name, id));
  300. //Console.WriteLine(doc.ActivityId);
  301. return id;
  302. }
  303. public async Task<T> DeleteAsync<T>(T entity)
  304. {
  305. await InitializeCollection<T>();
  306. Type t = typeof(T);
  307. string PartitionKey = GetPartitionKey<T>();
  308. if (!string.IsNullOrEmpty(PartitionKey))
  309. {
  310. string pkValue = entity.GetType().GetProperty(PartitionKey).GetValue(entity).ToString();
  311. string idValue = entity.GetType().GetProperty("id").GetValue(entity).ToString();
  312. ResourceResponse<Document> doc =
  313. await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database, t.Name, idValue), new RequestOptions { PartitionKey = new PartitionKey(pkValue) });
  314. }
  315. else
  316. {
  317. string idValue = entity.GetType().GetProperty("id").GetValue(entity).ToString();
  318. ResourceResponse<Document> doc =
  319. await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database, t.Name, idValue));
  320. }
  321. //Console.WriteLine(doc.ActivityId);
  322. return entity;
  323. }
  324. public async Task<string> DeleteAsync<T>(string id, string partitionKey)
  325. {
  326. Type t = typeof(T);
  327. await InitializeCollection<T>();
  328. ResourceResponse<Document> doc =
  329. await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database, t.Name, id), new RequestOptions { PartitionKey = new PartitionKey(partitionKey) });
  330. //Console.WriteLine(doc.ActivityId);
  331. return id;
  332. }
  333. public async Task<List<T>> SaveAll<T>(List<T> enyites)
  334. {
  335. DocumentCollection dataCollection = await InitializeCollection<T>();
  336. // Set retry options high for initialization (default values).
  337. CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
  338. CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;
  339. IBulkExecutor bulkExecutor = new BulkExecutor(CosmosClient, dataCollection);
  340. await bulkExecutor.InitializeAsync();
  341. // Set retries to 0 to pass control to bulk executor.
  342. CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
  343. CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;
  344. BulkImportResponse bulkImportResponse = null;
  345. long totalNumberOfDocumentsInserted = 0;
  346. double totalRequestUnitsConsumed = 0;
  347. double totalTimeTakenSec = 0;
  348. var tokenSource = new CancellationTokenSource();
  349. var token = tokenSource.Token;
  350. int pageSize = 100;
  351. int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
  352. for (int i = 0; i < pages; i++)
  353. {
  354. List<string> documentsToImportInBatch = new List<string>();
  355. List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
  356. for (int j = 0; j < lists.Count; j++)
  357. {
  358. documentsToImportInBatch.Add(lists[j].ToJson());
  359. }
  360. var tasks = new List<Task>
  361. { Task.Run(async () =>
  362. {
  363. do
  364. {
  365. //try
  366. //{
  367. bulkImportResponse = await bulkExecutor.BulkImportAsync(
  368. documents: documentsToImportInBatch,
  369. enableUpsert: true,
  370. disableAutomaticIdGeneration: true,
  371. maxConcurrencyPerPartitionKeyRange: null,
  372. maxInMemorySortingBatchSize: null,
  373. cancellationToken: token);
  374. //}
  375. //catch (DocumentClientException de)
  376. //{
  377. // break;
  378. //}
  379. //catch (Exception e)
  380. //{
  381. // break;
  382. //}
  383. } while (bulkImportResponse.NumberOfDocumentsImported < documentsToImportInBatch.Count);
  384. totalNumberOfDocumentsInserted += bulkImportResponse.NumberOfDocumentsImported;
  385. totalRequestUnitsConsumed += bulkImportResponse.TotalRequestUnitsConsumed;
  386. totalTimeTakenSec += bulkImportResponse.TotalTimeTaken.TotalSeconds;
  387. },
  388. token)
  389. };
  390. await Task.WhenAll(tasks);
  391. }
  392. return enyites;
  393. }
  394. public async Task<List<T>> UpdateAll<T>(Dictionary<string, object> dict, Dictionary<string, object> updateFilters, List<string> deleteKeys = null)
  395. {
  396. DocumentCollection dataCollection = await InitializeCollection<T>();
  397. IBulkExecutor bulkExecutor = new BulkExecutor(CosmosClient, dataCollection);
  398. await bulkExecutor.InitializeAsync();
  399. BulkUpdateResponse bulkUpdateResponse = null;
  400. long totalNumberOfDocumentsUpdated = 0;
  401. double totalRequestUnitsConsumed = 0;
  402. double totalTimeTakenSec = 0;
  403. var tokenSource = new CancellationTokenSource();
  404. var token = tokenSource.Token;
  405. // Generate update operations.
  406. List<UpdateOperation> updateOperations = new List<UpdateOperation>();
  407. // Unset the description field.
  408. if (null != updateFilters && updateFilters.Count > 0)
  409. {
  410. var keys = updateFilters.Keys;
  411. foreach (string key in keys)
  412. {
  413. // updateOperations.Add(new SetUpdateOperation<string>())
  414. if (updateFilters[key] != null && !string.IsNullOrEmpty(updateFilters[key].ToString()))
  415. {
  416. updateOperations.Add(SwitchType(key, updateFilters[key]));
  417. }
  418. }
  419. }
  420. if (deleteKeys.IsNotEmpty())
  421. {
  422. foreach (string key in deleteKeys)
  423. {
  424. updateOperations.Add(new UnsetUpdateOperation(key));
  425. }
  426. }
  427. List<T> list = await FindByParams<T>(dict);
  428. int pageSize = 100;
  429. int pages = (int)Math.Ceiling((double)list.Count / pageSize);
  430. string partitionKey = "/" + GetPartitionKey<T>();
  431. Type type = typeof(T);
  432. for (int i = 0; i < pages; i++)
  433. {
  434. List<UpdateItem> updateItemsInBatch = new List<UpdateItem>();
  435. List<T> lists = list.Skip((i) * pageSize).Take(pageSize).ToList();
  436. for (int j = 0; j < lists.Count; j++)
  437. {
  438. string partitionKeyValue = lists[j].GetType().GetProperty(partitionKey).GetValue(lists[j]) + "";
  439. string id = lists[j].GetType().GetProperty("id").GetValue(lists[j]) + "";
  440. updateItemsInBatch.Add(new UpdateItem(id, partitionKeyValue, updateOperations));
  441. }
  442. var tasks = new List<Task>
  443. { Task.Run(async () =>
  444. {
  445. do
  446. {
  447. //try
  448. //{
  449. bulkUpdateResponse = await bulkExecutor.BulkUpdateAsync(
  450. updateItems: updateItemsInBatch,
  451. maxConcurrencyPerPartitionKeyRange: null,
  452. cancellationToken: token);
  453. //}
  454. //catch (DocumentClientException de)
  455. //{
  456. // break;
  457. //}
  458. //catch (Exception e)
  459. //{
  460. // break;
  461. //}
  462. } while (bulkUpdateResponse.NumberOfDocumentsUpdated < updateItemsInBatch.Count);
  463. totalNumberOfDocumentsUpdated += bulkUpdateResponse.NumberOfDocumentsUpdated;
  464. totalRequestUnitsConsumed += bulkUpdateResponse.TotalRequestUnitsConsumed;
  465. totalTimeTakenSec += bulkUpdateResponse.TotalTimeTaken.TotalSeconds;
  466. },
  467. token)
  468. };
  469. await Task.WhenAll(tasks);
  470. }
  471. return list;
  472. }
  473. public async Task<List<T>> DeleteAll<T>(Dictionary<string, object> dict)
  474. {
  475. DocumentCollection dataCollection = await InitializeCollection<T>();
  476. List<T> list = await FindByParams<T>(dict);
  477. List<Tuple<string, string>> pkIdTuplesToDelete = new List<Tuple<string, string>>();
  478. if (list.IsNotEmpty())
  479. {
  480. foreach (T t in list)
  481. {
  482. string id = t.GetType().GetProperty("id").GetValue(t) + "";
  483. pkIdTuplesToDelete.Add(new Tuple<string, string>(id, id));
  484. }
  485. }
  486. else
  487. {
  488. return null;
  489. }
  490. long totalNumberOfDocumentsDeleted = 0;
  491. double totalRequestUnitsConsumed = 0;
  492. double totalTimeTakenSec = 0;
  493. BulkDeleteResponse bulkDeleteResponse = null;
  494. BulkExecutor bulkExecutor = new BulkExecutor(CosmosClient, dataCollection);
  495. await bulkExecutor.InitializeAsync();
  496. bulkDeleteResponse = await bulkExecutor.BulkDeleteAsync(pkIdTuplesToDelete);
  497. totalNumberOfDocumentsDeleted = bulkDeleteResponse.NumberOfDocumentsDeleted;
  498. totalRequestUnitsConsumed = bulkDeleteResponse.TotalRequestUnitsConsumed;
  499. totalTimeTakenSec = bulkDeleteResponse.TotalTimeTaken.TotalSeconds;
  500. return list;
  501. }
  502. private static UpdateOperation SwitchType(string key, object obj)
  503. {
  504. Type s = obj.GetType();
  505. TypeCode typeCode = Type.GetTypeCode(s);
  506. switch (typeCode)
  507. {
  508. case TypeCode.String: return new SetUpdateOperation<string>(key, obj.ToString());
  509. case TypeCode.Int32: return new SetUpdateOperation<Int32>(key, (Int32)obj);
  510. case TypeCode.Double: return new SetUpdateOperation<Double>(key, (Double)obj);
  511. case TypeCode.Byte: return new SetUpdateOperation<Byte>(key, (Byte)obj);
  512. case TypeCode.Boolean: return new SetUpdateOperation<Boolean>(key, (Boolean)obj);
  513. case TypeCode.DateTime: return new SetUpdateOperation<DateTime>(key, (DateTime)obj);
  514. case TypeCode.Int64: return new SetUpdateOperation<Int64>(key, (Int64)obj);
  515. default: return null;
  516. }
  517. }
  518. }
  519. }