AzureCosmosDBRepository.cs 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Threading.Tasks;
  4. using System.Linq;
  5. using TEAMModelOS.SDK.Module.AzureCosmosDB.Configuration;
  6. using TEAMModelOS.SDK.Module.AzureCosmosDB.Interfaces;
  7. using Microsoft.Azure.Documents.Client;
  8. using Microsoft.Azure.Documents;
  9. using TEAMModelOS.SDK.Helper.Security.AESCrypt;
  10. using TEAMModelOS.SDK.Context.Exception;
  11. using Microsoft.Azure.Documents.Linq;
  12. using TEAMModelOS.SDK.Helper.Query.LinqHelper;
  13. using System.Reflection;
  14. using Microsoft.Azure.CosmosDB.BulkExecutor;
  15. using Microsoft.Azure.CosmosDB.BulkExecutor.BulkImport;
  16. using System.Threading;
  17. using TEAMModelOS.SDK.Helper.Common.JsonHelper;
  18. using Microsoft.Azure.CosmosDB.BulkExecutor.BulkUpdate;
  19. using TEAMModelOS.SDK.Helper.Common.CollectionHelper;
  20. using Microsoft.Azure.CosmosDB.BulkExecutor.BulkDelete;
  21. using TEAMModelOS.SDK.Context.Attributes.Azure;
  22. using System.Text;
  23. namespace TEAMModelOS.SDK.Module.AzureCosmosDB.Implements
  24. { /// <summary>
  25. /// sdk 文档https://github.com/Azure/azure-cosmos-dotnet-v2/tree/master/samples
  26. /// https://github.com/Azure/azure-cosmos-dotnet-v2/blob/530c8d9cf7c99df7300246da05206c57ce654233/samples/code-samples/DatabaseManagement/Program.cs#L72-L121
  27. /// </summary>
  28. public class AzureCosmosDBRepository : IAzureCosmosDBRepository
  29. {
  30. /// <summary>
  31. /// sdk 文档https://github.com/Azure/azure-cosmos-dotnet-v2/tree/master/samples
  32. /// https://github.com/Azure/azure-cosmos-dotnet-v2/blob/530c8d9cf7c99df7300246da05206c57ce654233/samples/code-samples/DatabaseManagement/Program.cs#L72-L121
  33. /// </summary>
  34. private readonly string china_con = "417A7572654368696E6120202020202020202020202020202020202020202020D63873D37F845F9DC7607B4DF4787EE26598454CE32FB5F2EE778A34A5015736196DF7940C67A034CDD4C4B44CD37C20";
  35. private readonly string china_key = "417A7572654368696E61202020202020202020202020202020202020202020203CAA1DF7E3203F0ABCB2D60C1F3DCB6D90676C4D5467167F6E6A2CB3DDE975EA37B06BBAE6E012936BEDB6D5D60B28B13642F755CB25D1958BE5366EA20FA7C47E04A67B6A96111C61C3270CD0E5539CA45E3A77A6B483F47419BBAEDE75C0F6";
  36. private readonly string global_con = "417A757265476C6F62616C2020202020202020202020202020202020202020200E357979CB69243DBF4E41BF5526830F89AB746007AC68A3DD3F9CFDA781509F1C48B2359120A5E58B8C7B1EDAA99DEA";
  37. private readonly string global_key = "417A757265476C6F62616C2020202020202020202020202020202020202020209FF199D61813D1F4857D55CFB0A7D6A797FECF39A7F47553E9C1AF23674CB04BA95748A4A3C07B90F32E5EF26E0982DBF90001E066432075C434351D73FB387D27A50716D90F414F34A4579D846C27804F658705C05A7224EC4D695FD7A5EE23";
  38. private DocumentClient CosmosClient { get; set; }
  39. private DocumentCollection CosmosCollection { get; set; }
  40. private string Database { get; set; }
  41. private int CollectionThroughput { get; set; }
  42. public AzureCosmosDBRepository(AzureCosmosDBOptions options)
  43. {
  44. try
  45. {
  46. if (!string.IsNullOrEmpty(options.ConnectionString))
  47. {
  48. CosmosClient = CosmosDBClientSingleton.getInstance(options.ConnectionString, options.ConnectionKey).GetCosmosDBClient();
  49. }
  50. else if (AzureCosmosDBConfig.AZURE_CHINA.Equals(options.AzureTableDialect))
  51. {
  52. AESCrypt crypt = new AESCrypt();
  53. CosmosClient = CosmosDBClientSingleton.getInstance(crypt.Decrypt(china_con, options.AzureTableDialect), crypt.Decrypt(china_key, options.AzureTableDialect)).GetCosmosDBClient();
  54. }
  55. else if (AzureCosmosDBConfig.AZURE_GLOBAL.Equals(options.AzureTableDialect))
  56. {
  57. AESCrypt crypt = new AESCrypt();
  58. CosmosClient = CosmosDBClientSingleton.getInstance(crypt.Decrypt(global_con, options.AzureTableDialect), crypt.Decrypt(global_key, options.AzureTableDialect)).GetCosmosDBClient();
  59. }
  60. else
  61. {
  62. throw new BizException("请设置正确的AzureCosmosDB数据库配置信息!");
  63. }
  64. Database = options.Database;
  65. CollectionThroughput = options.CollectionThroughput;
  66. CosmosClient.CreateDatabaseIfNotExistsAsync(new Database { Id = Database });
  67. // _connectionString = options.ConnectionString;
  68. }
  69. catch (DocumentClientException de)
  70. {
  71. Exception baseException = de.GetBaseException();
  72. Console.WriteLine("{0} error occurred: {1}, Message: {2}", de.StatusCode, de.Message, baseException.Message);
  73. }
  74. catch (Exception e)
  75. {
  76. Exception baseException = e.GetBaseException();
  77. Console.WriteLine("Error: {0}, Message: {1}", e.Message, baseException.Message);
  78. }
  79. finally
  80. {
  81. Console.WriteLine("End of demo, press any key to exit.");
  82. // Console.ReadKey();
  83. }
  84. }
  85. private async Task<DocumentCollection> InitializeCollection<T>()
  86. {
  87. Type t = typeof(T);
  88. if (CosmosCollection == null || !CosmosCollection.Id.Equals(t.Name))
  89. {
  90. DocumentCollection collectionDefinition = new DocumentCollection { Id = t.Name };
  91. collectionDefinition.IndexingPolicy = new IndexingPolicy(new RangeIndex(DataType.String) { Precision = -1 });
  92. string partitionKey = GetPartitionKey<T>();
  93. // collectionDefinition.PartitionKey = new PartitionKeyDefinition { Paths = new System.Collections.ObjectModel.Collection<string>() };
  94. if (!string.IsNullOrEmpty(partitionKey))
  95. {
  96. collectionDefinition.PartitionKey.Paths.Add("/" + partitionKey);
  97. }
  98. // CosmosCollection = await this.CosmosClient.CreateDocumentCollectionIfNotExistsAsync(UriFactory.CreateDatabaseUri(Database), collectionDefinition);
  99. CosmosCollection = await this.CosmosClient.CreateDocumentCollectionIfNotExistsAsync(
  100. UriFactory.CreateDatabaseUri(Database), collectionDefinition, new RequestOptions { OfferThroughput = CollectionThroughput }
  101. );
  102. }
  103. return CosmosCollection;
  104. }
  105. private string GetPartitionKey<T>()
  106. {
  107. Type type = typeof(T);
  108. PropertyInfo[] properties = type.GetProperties();
  109. List<PropertyInfo> attrProperties = new List<PropertyInfo>();
  110. foreach (PropertyInfo property in properties)
  111. {
  112. if (property.Name.Equals("PartitionKey"))
  113. {
  114. attrProperties.Add(property);
  115. break;
  116. }
  117. object[] attributes = property.GetCustomAttributes(true);
  118. foreach (object attribute in attributes) //2.通过映射,找到成员属性上关联的特性类实例,
  119. {
  120. if (attribute is PartitionKeyAttribute)
  121. {
  122. attrProperties.Add(property);
  123. }
  124. }
  125. }
  126. if (attrProperties.Count <= 0)
  127. {
  128. return null;
  129. }
  130. else
  131. {
  132. if (attrProperties.Count == 1)
  133. {
  134. return attrProperties[0].Name;
  135. }
  136. else { throw new BizException("PartitionKey can only be single!"); }
  137. }
  138. }
  139. public async Task<T> Save<T>(T entity) //where T : object, new()
  140. {
  141. try
  142. {
  143. Type t = typeof(T);
  144. DocumentCollection documentCollection = await InitializeCollection<T>();
  145. ResourceResponse<Document> doc =
  146. await CosmosClient.CreateDocumentAsync(UriFactory.CreateDocumentCollectionUri(Database, t.Name), entity);
  147. //Console.WriteLine(doc.ActivityId);
  148. return entity;
  149. }
  150. catch (Exception e)
  151. {
  152. throw new BizException(e.Message);
  153. }
  154. }
  155. public async Task<T> Update<T>(T entity)
  156. {
  157. Type t = typeof(T);
  158. await InitializeCollection<T>();
  159. ResourceResponse<Document> doc =
  160. await CosmosClient.UpsertDocumentAsync(UriFactory.CreateDocumentCollectionUri(Database, t.Name), entity);
  161. return entity;
  162. }
  163. public async Task<string> ReplaceObject<T>(T entity, string key)
  164. {
  165. Type t = typeof(T);
  166. await InitializeCollection<T>();
  167. try
  168. {
  169. ResourceResponse<Document> doc =
  170. await CosmosClient.ReplaceDocumentAsync(UriFactory.CreateDocumentUri(Database, t.Name, key), entity);
  171. return key;
  172. }
  173. catch (Exception e)
  174. {
  175. Console.WriteLine("{0} Exception caught.", e);
  176. //return false;
  177. }
  178. return null;
  179. }
  180. public async Task<string> ReplaceObject<T>(T entity, string key, string partitionKey)
  181. {
  182. Type t = typeof(T);
  183. await InitializeCollection<T>();
  184. try
  185. {
  186. ResourceResponse<Document> doc =
  187. await CosmosClient.ReplaceDocumentAsync(UriFactory.CreateDocumentUri(Database, t.Name, key),
  188. entity,
  189. new RequestOptions { PartitionKey = new PartitionKey(partitionKey) });
  190. return key;
  191. }
  192. catch (Exception e)
  193. {
  194. throw new BizException(e.Message);
  195. //Console.WriteLine("{0} Exception caught.", e);
  196. //return false;
  197. }
  198. }
  199. public async Task<List<T>> FindAll<T>()
  200. {
  201. Type t = typeof(T);
  202. Boolean open = true;
  203. List<T> objs = new List<T>();
  204. //await InitializeCollection<T>();
  205. //查询条数 -1是全部
  206. FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = open };
  207. var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name), queryOptions).AsDocumentQuery();
  208. while (query.HasMoreResults)
  209. {
  210. foreach (T obj in await query.ExecuteNextAsync())
  211. {
  212. objs.Add(obj);
  213. }
  214. }
  215. return objs;
  216. //return CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name),sql);
  217. }
  218. public async Task<List<T>> FindLinq<T>(Func<IQueryable<object>, object> singleOrDefault)
  219. {
  220. Type t = typeof(T);
  221. List<T> objs = new List<T>();
  222. await InitializeCollection<T>();
  223. //查询条数 -1是全部
  224. FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1 };
  225. var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name), queryOptions);
  226. // query.Where();
  227. return objs;
  228. //return CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name),sql);
  229. }
  230. public async Task<List<T>> FindSQL<T>(string sql)
  231. {
  232. Type t = typeof(T);
  233. List<T> objs = new List<T>();
  234. await InitializeCollection<T>();
  235. var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name), sql);
  236. foreach (var item in query)
  237. {
  238. objs.Add(item);
  239. }
  240. return objs;
  241. }
  242. public async Task<List<T>> FindSQL<T>(string sql, bool IsPk)
  243. {
  244. Type t = typeof(T);
  245. List<T> objs = new List<T>();
  246. Boolean open = IsPk;
  247. await InitializeCollection<T>();
  248. //查询条数 -1是全部
  249. FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = open };
  250. var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name), sql, queryOptions);
  251. foreach (var item in query)
  252. {
  253. objs.Add(item);
  254. }
  255. return objs;
  256. }
  257. public async Task<List<T>> FindByParams<T>(Dictionary<string, object> dict)
  258. {
  259. //await InitializeCollection<T>();
  260. Type t = typeof(T);
  261. Boolean open = true;
  262. List<Filter> filters = new List<Filter>();
  263. string PKname = "";
  264. PropertyInfo[] properties = t.GetProperties();
  265. foreach (PropertyInfo property in properties)
  266. {
  267. object[] attributes = property.GetCustomAttributes(true);
  268. foreach (object attribute in attributes) //2.通过映射,找到成员属性上关联的特性类实例,
  269. {
  270. if (attribute is PartitionKeyAttribute)
  271. {
  272. PKname = property.Name;
  273. break;
  274. }
  275. }
  276. }
  277. foreach (string key in dict.Keys)
  278. {
  279. //if (t.Name.Equals(key)) {
  280. // open = false;
  281. //}
  282. if (PKname.Equals(key))
  283. {
  284. open = false;
  285. }
  286. filters.Add(new Filter { Key = key, Value = dict[key] != null ? dict[key].ToString() : throw new Exception("参数值不能为null") });
  287. }
  288. //List<T> objs = new List<T>();
  289. await InitializeCollection<T>();
  290. //查询条数 -1是全部
  291. FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = open };
  292. var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name), queryOptions);
  293. List<T> list = DynamicLinq.GenerateFilter<T>(query, filters).ToList();
  294. return list;
  295. //return CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name),sql);
  296. }
  297. public async Task<string> DeleteAsync<T>(string id)
  298. {
  299. Type t = typeof(T);
  300. await InitializeCollection<T>();
  301. ResourceResponse<Document> doc =
  302. await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database, t.Name, id));
  303. //Console.WriteLine(doc.ActivityId);
  304. return id;
  305. }
  306. public async Task<T> DeleteAsync<T>(T entity)
  307. {
  308. await InitializeCollection<T>();
  309. Type t = typeof(T);
  310. string PartitionKey = GetPartitionKey<T>();
  311. if (!string.IsNullOrEmpty(PartitionKey))
  312. {
  313. string pkValue = entity.GetType().GetProperty(PartitionKey).GetValue(entity).ToString();
  314. string idValue = entity.GetType().GetProperty("id").GetValue(entity).ToString();
  315. ResourceResponse<Document> doc =
  316. await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database, t.Name, idValue), new RequestOptions { PartitionKey = new PartitionKey(pkValue) });
  317. }
  318. else
  319. {
  320. string idValue = entity.GetType().GetProperty("id").GetValue(entity).ToString();
  321. ResourceResponse<Document> doc =
  322. await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database, t.Name, idValue));
  323. }
  324. //Console.WriteLine(doc.ActivityId);
  325. return entity;
  326. }
  327. public async Task<string> DeleteAsync<T>(string id, string partitionKey)
  328. {
  329. Type t = typeof(T);
  330. await InitializeCollection<T>();
  331. ResourceResponse<Document> doc =
  332. await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database, t.Name, id), new RequestOptions { PartitionKey = new PartitionKey(partitionKey) });
  333. //Console.WriteLine(doc.ActivityId);
  334. return id;
  335. }
  336. public async Task<List<T>> SaveAll<T>(List<T> enyites)
  337. {
  338. DocumentCollection dataCollection = await InitializeCollection<T>();
  339. // Set retry options high for initialization (default values).
  340. CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
  341. CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;
  342. IBulkExecutor bulkExecutor = new BulkExecutor(CosmosClient, dataCollection);
  343. await bulkExecutor.InitializeAsync();
  344. // Set retries to 0 to pass control to bulk executor.
  345. CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
  346. CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;
  347. BulkImportResponse bulkImportResponse = null;
  348. long totalNumberOfDocumentsInserted = 0;
  349. double totalRequestUnitsConsumed = 0;
  350. double totalTimeTakenSec = 0;
  351. var tokenSource = new CancellationTokenSource();
  352. var token = tokenSource.Token;
  353. int pageSize = 100;
  354. int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
  355. for (int i = 0; i < pages; i++)
  356. {
  357. List<string> documentsToImportInBatch = new List<string>();
  358. List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
  359. for (int j = 0; j < lists.Count; j++)
  360. {
  361. documentsToImportInBatch.Add(lists[j].ToJson());
  362. }
  363. var tasks = new List<Task>
  364. { Task.Run(async () =>
  365. {
  366. do
  367. {
  368. //try
  369. //{
  370. bulkImportResponse = await bulkExecutor.BulkImportAsync(
  371. documents: documentsToImportInBatch,
  372. enableUpsert: true,
  373. disableAutomaticIdGeneration: true,
  374. maxConcurrencyPerPartitionKeyRange: null,
  375. maxInMemorySortingBatchSize: null,
  376. cancellationToken: token);
  377. //}
  378. //catch (DocumentClientException de)
  379. //{
  380. // break;
  381. //}
  382. //catch (Exception e)
  383. //{
  384. // break;
  385. //}
  386. } while (bulkImportResponse.NumberOfDocumentsImported < documentsToImportInBatch.Count);
  387. totalNumberOfDocumentsInserted += bulkImportResponse.NumberOfDocumentsImported;
  388. totalRequestUnitsConsumed += bulkImportResponse.TotalRequestUnitsConsumed;
  389. totalTimeTakenSec += bulkImportResponse.TotalTimeTaken.TotalSeconds;
  390. },
  391. token)
  392. };
  393. await Task.WhenAll(tasks);
  394. }
  395. return enyites;
  396. }
  397. public async Task<List<T>> UpdateAll<T>(Dictionary<string, object> dict, Dictionary<string, object> updateFilters, List<string> deleteKeys = null)
  398. {
  399. DocumentCollection dataCollection = await InitializeCollection<T>();
  400. IBulkExecutor bulkExecutor = new BulkExecutor(CosmosClient, dataCollection);
  401. await bulkExecutor.InitializeAsync();
  402. BulkUpdateResponse bulkUpdateResponse = null;
  403. long totalNumberOfDocumentsUpdated = 0;
  404. double totalRequestUnitsConsumed = 0;
  405. double totalTimeTakenSec = 0;
  406. var tokenSource = new CancellationTokenSource();
  407. var token = tokenSource.Token;
  408. // Generate update operations.
  409. List<UpdateOperation> updateOperations = new List<UpdateOperation>();
  410. // Unset the description field.
  411. if (null != updateFilters && updateFilters.Count > 0)
  412. {
  413. var keys = updateFilters.Keys;
  414. foreach (string key in keys)
  415. {
  416. // updateOperations.Add(new SetUpdateOperation<string>())
  417. if (updateFilters[key] != null && !string.IsNullOrEmpty(updateFilters[key].ToString()))
  418. {
  419. updateOperations.Add(SwitchType(key, updateFilters[key]));
  420. }
  421. }
  422. }
  423. if (deleteKeys.IsNotEmpty())
  424. {
  425. foreach (string key in deleteKeys)
  426. {
  427. updateOperations.Add(new UnsetUpdateOperation(key));
  428. }
  429. }
  430. List<T> list = await FindByParams<T>(dict);
  431. int pageSize = 100;
  432. int pages = (int)Math.Ceiling((double)list.Count / pageSize);
  433. string partitionKey = "/" + GetPartitionKey<T>();
  434. Type type = typeof(T);
  435. for (int i = 0; i < pages; i++)
  436. {
  437. List<UpdateItem> updateItemsInBatch = new List<UpdateItem>();
  438. List<T> lists = list.Skip((i) * pageSize).Take(pageSize).ToList();
  439. for (int j = 0; j < lists.Count; j++)
  440. {
  441. string partitionKeyValue = lists[j].GetType().GetProperty(partitionKey).GetValue(lists[j]) + "";
  442. string id = lists[j].GetType().GetProperty("id").GetValue(lists[j]) + "";
  443. updateItemsInBatch.Add(new UpdateItem(id, partitionKeyValue, updateOperations));
  444. }
  445. var tasks = new List<Task>
  446. { Task.Run(async () =>
  447. {
  448. do
  449. {
  450. //try
  451. //{
  452. bulkUpdateResponse = await bulkExecutor.BulkUpdateAsync(
  453. updateItems: updateItemsInBatch,
  454. maxConcurrencyPerPartitionKeyRange: null,
  455. cancellationToken: token);
  456. //}
  457. //catch (DocumentClientException de)
  458. //{
  459. // break;
  460. //}
  461. //catch (Exception e)
  462. //{
  463. // break;
  464. //}
  465. } while (bulkUpdateResponse.NumberOfDocumentsUpdated < updateItemsInBatch.Count);
  466. totalNumberOfDocumentsUpdated += bulkUpdateResponse.NumberOfDocumentsUpdated;
  467. totalRequestUnitsConsumed += bulkUpdateResponse.TotalRequestUnitsConsumed;
  468. totalTimeTakenSec += bulkUpdateResponse.TotalTimeTaken.TotalSeconds;
  469. },
  470. token)
  471. };
  472. await Task.WhenAll(tasks);
  473. }
  474. return list;
  475. }
  476. public async Task<List<T>> DeleteAll<T>(Dictionary<string, object> dict)
  477. {
  478. DocumentCollection dataCollection = await InitializeCollection<T>();
  479. List<T> list = await FindByParams<T>(dict);
  480. List<Tuple<string, string>> pkIdTuplesToDelete = new List<Tuple<string, string>>();
  481. if (list.IsNotEmpty())
  482. {
  483. foreach (T t in list)
  484. {
  485. string id = t.GetType().GetProperty("id").GetValue(t) + "";
  486. pkIdTuplesToDelete.Add(new Tuple<string, string>(id, id));
  487. }
  488. }
  489. else
  490. {
  491. return null;
  492. }
  493. long totalNumberOfDocumentsDeleted = 0;
  494. double totalRequestUnitsConsumed = 0;
  495. double totalTimeTakenSec = 0;
  496. BulkDeleteResponse bulkDeleteResponse = null;
  497. BulkExecutor bulkExecutor = new BulkExecutor(CosmosClient, dataCollection);
  498. await bulkExecutor.InitializeAsync();
  499. bulkDeleteResponse = await bulkExecutor.BulkDeleteAsync(pkIdTuplesToDelete);
  500. totalNumberOfDocumentsDeleted = bulkDeleteResponse.NumberOfDocumentsDeleted;
  501. totalRequestUnitsConsumed = bulkDeleteResponse.TotalRequestUnitsConsumed;
  502. totalTimeTakenSec = bulkDeleteResponse.TotalTimeTaken.TotalSeconds;
  503. return list;
  504. }
  505. private static UpdateOperation SwitchType(string key, object obj)
  506. {
  507. Type s = obj.GetType();
  508. TypeCode typeCode = Type.GetTypeCode(s);
  509. switch (typeCode)
  510. {
  511. case TypeCode.String: return new SetUpdateOperation<string>(key, obj.ToString());
  512. case TypeCode.Int32: return new SetUpdateOperation<Int32>(key, (Int32)obj);
  513. case TypeCode.Double: return new SetUpdateOperation<Double>(key, (Double)obj);
  514. case TypeCode.Byte: return new SetUpdateOperation<Byte>(key, (Byte)obj);
  515. case TypeCode.Boolean: return new SetUpdateOperation<Boolean>(key, (Boolean)obj);
  516. case TypeCode.DateTime: return new SetUpdateOperation<DateTime>(key, (DateTime)obj);
  517. case TypeCode.Int64: return new SetUpdateOperation<Int64>(key, (Int64)obj);
  518. default: return null;
  519. }
  520. }
  521. public async Task<List<T>> FindByDict<T>(Dictionary<string, object> dict, bool IsPk)
  522. {
  523. Type t = typeof(T);
  524. List<T> objs = new List<T>();
  525. Boolean open = IsPk;
  526. await InitializeCollection<T>();
  527. StringBuilder sql = new StringBuilder("select * from c where 1=1 ");
  528. if (dict != null)
  529. {
  530. foreach (string key in dict.Keys)
  531. {
  532. sql.Append(GenSql(dict[key], key));
  533. }
  534. }
  535. //查询条数 -1是全部
  536. FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = IsPk };
  537. var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name), sql.ToString(), queryOptions);
  538. foreach (var item in query)
  539. {
  540. objs.Add(item);
  541. }
  542. return objs;
  543. }
  544. private static string GenSql(object obj, string key)
  545. {
  546. Type s = obj.GetType();
  547. TypeCode typeCode = Type.GetTypeCode(s);
  548. switch (typeCode)
  549. {
  550. case TypeCode.String: return "and c." + key + "=" + "'" + obj.ToString() + "'";
  551. case TypeCode.Int32: return "and c." + key + "=" + int.Parse(obj.ToString());
  552. case TypeCode.Double: return "and c." + key + "=" + double.Parse(obj.ToString());
  553. //case TypeCode.Byte: return "and c." + key + "=" + (Byte)obj ;
  554. case TypeCode.Boolean: return "and c." + key + "=" + bool.Parse(obj.ToString());
  555. case TypeCode.DateTime: return "and c." + key + "=" + (DateTime)obj;
  556. case TypeCode.Int64: return "and c." + key + "=" + long.Parse(obj.ToString());
  557. default: return null;
  558. }
  559. }
  560. }
  561. }