AzureCosmosDBRepository.cs 42 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Threading.Tasks;
  4. using System.Linq;
  5. using TEAMModelOS.SDK.Module.AzureCosmosDB.Configuration;
  6. using TEAMModelOS.SDK.Module.AzureCosmosDB.Interfaces;
  7. using Microsoft.Azure.Documents.Client;
  8. using Microsoft.Azure.Documents;
  9. using TEAMModelOS.SDK.Helper.Security.AESCrypt;
  10. using TEAMModelOS.SDK.Context.Exception;
  11. using Microsoft.Azure.Documents.Linq;
  12. using TEAMModelOS.SDK.Helper.Query.LinqHelper;
  13. using System.Reflection;
  14. using Microsoft.Azure.Cosmos;
  15. using Microsoft.Azure.CosmosDB.BulkExecutor;
  16. using Microsoft.Azure.CosmosDB.BulkExecutor.BulkImport;
  17. using System.Threading;
  18. using TEAMModelOS.SDK.Helper.Common.JsonHelper;
  19. using Microsoft.Azure.CosmosDB.BulkExecutor.BulkUpdate;
  20. using TEAMModelOS.SDK.Helper.Common.CollectionHelper;
  21. using Microsoft.Azure.CosmosDB.BulkExecutor.BulkDelete;
  22. using TEAMModelOS.SDK.Context.Attributes.Azure;
  23. using System.Text;
  24. using TEAMModelOS.SDK.Helper.Common.ReflectorExtensions;
  25. using Microsoft.AspNetCore.Hosting;
  26. using System.Collections.Concurrent;
  27. using DataType = Microsoft.Azure.Documents.DataType;
  28. using RequestOptions = Microsoft.Azure.Documents.Client.RequestOptions;
  29. using PartitionKey = Microsoft.Azure.Documents.PartitionKey;
  30. using Newtonsoft.Json.Linq;
  31. namespace TEAMModelOS.SDK.Module.AzureCosmosDB.Implements
  32. { /// <summary>
  33. /// sdk 文档https://github.com/Azure/azure-cosmos-dotnet-v2/tree/master/samples
  34. /// https://github.com/Azure/azure-cosmos-dotnet-v2/blob/530c8d9cf7c99df7300246da05206c57ce654233/samples/code-samples/DatabaseManagement/Program.cs#L72-L121
  35. /// </summary>
  36. public class AzureCosmosDBRepository : IAzureCosmosDBRepository
  37. {
  38. /// <summary>
  39. /// sdk 文档https://github.com/Azure/azure-cosmos-dotnet-v2/tree/master/samples
  40. /// https://github.com/Azure/azure-cosmos-dotnet-v2/blob/530c8d9cf7c99df7300246da05206c57ce654233/samples/code-samples/DatabaseManagement/Program.cs#L72-L121
  41. /// </summary>
  42. private DocumentClient CosmosClient { get; set; }
  43. /// <summary>
  44. /// 线程安全的dict类型
  45. /// </summary>
  46. private ConcurrentDictionary<string, DocumentCollection> DocumentCollectionDict { get; set; } = new ConcurrentDictionary<string, DocumentCollection>();
  47. // private DocumentCollection CosmosCollection { get; set; }
  48. private string Database { get; set; }
  49. private int CollectionThroughput { get; set; }
  50. public AzureCosmosDBRepository(AzureCosmosDBOptions options)
  51. {
  52. try
  53. {
  54. if (!string.IsNullOrEmpty(options.ConnectionString))
  55. {
  56. CosmosClient = CosmosDBClientSingleton.getInstance(options.ConnectionString, options.ConnectionKey).GetCosmosDBClient();
  57. }
  58. else
  59. {
  60. throw new BizException("请设置正确的AzureCosmosDB数据库配置信息!");
  61. }
  62. Database = options.Database;
  63. CollectionThroughput = options.CollectionThroughput;
  64. CosmosClient.CreateDatabaseIfNotExistsAsync(new Microsoft.Azure.Documents.Database { Id = Database });
  65. // _connectionString = options.ConnectionString;
  66. // CosmosSerializer
  67. //获取数据库所有的表
  68. Microsoft.Azure.Documents.Client.FeedResponse<DocumentCollection> collections = CosmosClient.ReadDocumentCollectionFeedAsync(UriFactory.CreateDatabaseUri(Database)).GetAwaiter().GetResult();
  69. foreach (IGrouping<string, DocumentCollection> group in collections.GroupBy(c => c.Id))
  70. {
  71. DocumentCollectionDict.TryAdd(group.Key, group.First());
  72. }
  73. //collections
  74. List<Type> types = ReflectorExtensions.GetAllTypeAsAttribute<CosmosDBAttribute>(options.ScanModel);
  75. foreach (Type type in types)
  76. {
  77. string PartitionKey = GetPartitionKey(type);
  78. string CollectionName = "";
  79. int RU = 0;
  80. IEnumerable<CosmosDBAttribute> attributes = type.GetCustomAttributes<CosmosDBAttribute>(true);
  81. if (!string.IsNullOrEmpty(attributes.First<CosmosDBAttribute>().Name))
  82. {
  83. CollectionName = attributes.First<CosmosDBAttribute>().Name;
  84. }
  85. else
  86. {
  87. CollectionName = type.Name;
  88. }
  89. if (attributes.First<CosmosDBAttribute>().RU > 400)
  90. {
  91. RU = attributes.First<CosmosDBAttribute>().RU;
  92. }
  93. else
  94. {
  95. RU = options.CollectionThroughput;
  96. }
  97. //如果表存在于数据则检查RU是否变动,如果不存在则执行创建DocumentCollection
  98. if (DocumentCollectionDict.TryGetValue(CollectionName, out DocumentCollection collection))
  99. {
  100. Offer offer = CosmosClient.CreateOfferQuery().Where(o => o.ResourceLink == collection.SelfLink).AsEnumerable().Single();
  101. OfferV2 offerV2 = (OfferV2)offer;
  102. //更新RU
  103. if (offerV2.Content.OfferThroughput < RU)
  104. {
  105. CosmosClient.ReplaceOfferAsync(new OfferV2(offer, RU));
  106. }
  107. }
  108. else
  109. {
  110. DocumentCollection collectionDefinition = new DocumentCollection { Id = CollectionName };
  111. collectionDefinition.IndexingPolicy = new Microsoft.Azure.Documents.IndexingPolicy(new RangeIndex(DataType.String) { Precision = -1 });
  112. // collectionDefinition.PartitionKey = new PartitionKeyDefinition { Paths = new System.Collections.ObjectModel.Collection<string>() };
  113. if (!string.IsNullOrEmpty(PartitionKey))
  114. {
  115. collectionDefinition.PartitionKey.Paths.Add("/" + PartitionKey);
  116. }
  117. // CosmosCollection = await this.CosmosClient.CreateDocumentCollectionIfNotExistsAsync(UriFactory.CreateDatabaseUri(Database), collectionDefinition);
  118. if (RU > CollectionThroughput)
  119. {
  120. CollectionThroughput = RU;
  121. }
  122. DocumentCollection DocumentCollection = CosmosClient.CreateDocumentCollectionIfNotExistsAsync(
  123. UriFactory.CreateDatabaseUri(Database), collectionDefinition, new Microsoft.Azure.Documents.Client.RequestOptions { OfferThroughput = CollectionThroughput }).GetAwaiter().GetResult();
  124. DocumentCollectionDict.TryAdd(CollectionName, DocumentCollection);
  125. }
  126. }
  127. }
  128. catch (DocumentClientException de)
  129. {
  130. Exception baseException = de.GetBaseException();
  131. //Console.WriteLine("{0} error occurred: {1}, Message: {2}", de.StatusCode, de.Message, baseException.Message);
  132. }
  133. catch (Exception e)
  134. {
  135. Exception baseException = e.GetBaseException();
  136. //Console.WriteLine("Error: {0}, Message: {1}", e.Message, baseException.Message);
  137. }
  138. finally
  139. {
  140. // Console.WriteLine("End of demo, press any key to exit.");
  141. // Console.ReadKey();
  142. }
  143. }
  144. private async Task<DocumentCollection> InitializeCollection<T>()
  145. {
  146. Type type = typeof(T);
  147. string partitionKey = GetPartitionKey<T>();
  148. string CollectionName;
  149. IEnumerable<CosmosDBAttribute> attributes = type.GetCustomAttributes<CosmosDBAttribute>(true);
  150. if (!string.IsNullOrEmpty(attributes.First<CosmosDBAttribute>().Name))
  151. {
  152. CollectionName = attributes.First<CosmosDBAttribute>().Name;
  153. }
  154. else
  155. {
  156. CollectionName = type.Name;
  157. }
  158. return await InitializeCollection(CollectionName, partitionKey);
  159. }
  160. private async Task<DocumentCollection> InitializeCollection(string CollectionName, string PartitionKey)
  161. {
  162. /////内存中已经存在这个表则直接返回
  163. if (DocumentCollectionDict.TryGetValue(CollectionName, out DocumentCollection DocumentCollection))
  164. {
  165. return DocumentCollection;
  166. }///如果没有则尝试默认创建
  167. else
  168. {
  169. DocumentCollection documentCollection = new DocumentCollection { Id = CollectionName };
  170. documentCollection.IndexingPolicy = new Microsoft.Azure.Documents.IndexingPolicy(new RangeIndex(DataType.String) { Precision = -1 });
  171. // collectionDefinition.PartitionKey = new PartitionKeyDefinition { Paths = new System.Collections.ObjectModel.Collection<string>() };
  172. if (!string.IsNullOrEmpty(PartitionKey))
  173. {
  174. documentCollection.PartitionKey.Paths.Add("/" + PartitionKey);
  175. }
  176. // CosmosCollection = await this.CosmosClient.CreateDocumentCollectionIfNotExistsAsync(UriFactory.CreateDatabaseUri(Database), collectionDefinition);
  177. documentCollection = await this.CosmosClient.CreateDocumentCollectionIfNotExistsAsync(
  178. UriFactory.CreateDatabaseUri(Database), documentCollection, new RequestOptions { OfferThroughput = CollectionThroughput });
  179. DocumentCollectionDict.TryAdd(CollectionName, documentCollection);
  180. return documentCollection;
  181. }
  182. }
  183. private string GetPartitionKey<T>()
  184. {
  185. Type type = typeof(T);
  186. return GetPartitionKey(type);
  187. }
  188. private string GetPartitionKey(Type type)
  189. {
  190. PropertyInfo[] properties = type.GetProperties();
  191. List<PropertyInfo> attrProperties = new List<PropertyInfo>();
  192. foreach (PropertyInfo property in properties)
  193. {
  194. if (property.Name.Equals("PartitionKey"))
  195. {
  196. attrProperties.Add(property);
  197. break;
  198. }
  199. object[] attributes = property.GetCustomAttributes(true);
  200. foreach (object attribute in attributes) //2.通过映射,找到成员属性上关联的特性类实例,
  201. {
  202. if (attribute is PartitionKeyAttribute)
  203. {
  204. attrProperties.Add(property);
  205. }
  206. }
  207. }
  208. if (attrProperties.Count <= 0)
  209. {
  210. throw new BizException(type.Name +"has no PartitionKey !!!!!!");
  211. }
  212. else
  213. {
  214. if (attrProperties.Count == 1)
  215. {
  216. return attrProperties[0].Name;
  217. }
  218. else { throw new BizException("PartitionKey can only be single!"); }
  219. }
  220. }
  221. public async Task<T> Save<T>(T entity) //where T : object, new()
  222. {
  223. try
  224. {
  225. Type t = typeof(T);
  226. DocumentCollection collection= await InitializeCollection<T>();
  227. ResourceResponse<Document> doc =
  228. await CosmosClient.CreateDocumentAsync(UriFactory.CreateDocumentCollectionUri(Database, collection.Id), entity);
  229. //Console.WriteLine(doc.ActivityId);
  230. return entity;
  231. }
  232. catch (Exception e)
  233. {
  234. throw new BizException(e.Message);
  235. }
  236. }
  237. public async Task<T> Update<T>(T entity)
  238. {
  239. Type t = typeof(T);
  240. DocumentCollection collection = await InitializeCollection<T>();
  241. ResourceResponse<Document> doc =
  242. await CosmosClient.UpsertDocumentAsync(UriFactory.CreateDocumentCollectionUri(Database, collection.Id), entity);
  243. return entity;
  244. }
  245. public async Task<string> ReplaceObject<T>(T entity, string key)
  246. {
  247. Type t = typeof(T);
  248. DocumentCollection collection = await InitializeCollection<T>();
  249. try
  250. {
  251. ResourceResponse<Document> doc =
  252. await CosmosClient.ReplaceDocumentAsync(UriFactory.CreateDocumentUri(Database,collection.Id, key), entity);
  253. return key;
  254. }
  255. catch (Exception e)
  256. {
  257. Console.WriteLine("{0} Exception caught.", e);
  258. //return false;
  259. }
  260. return null;
  261. }
  262. public async Task<string> ReplaceObject<T>(T entity, string key, string partitionKey)
  263. {
  264. Type t = typeof(T);
  265. DocumentCollection collection = await InitializeCollection<T>();
  266. try
  267. {
  268. ResourceResponse<Document> doc =
  269. await CosmosClient.ReplaceDocumentAsync(UriFactory.CreateDocumentUri(Database,collection.Id, key),
  270. entity,
  271. new RequestOptions { PartitionKey = new PartitionKey(partitionKey) });
  272. return key;
  273. }
  274. catch (Exception e)
  275. {
  276. throw new BizException(e.Message);
  277. //Console.WriteLine("{0} Exception caught.", e);
  278. //return false;
  279. }
  280. }
  281. public async Task<List<T>> FindAll<T>()
  282. {
  283. Type t = typeof(T);
  284. Boolean open = true;
  285. List<T> objs = new List<T>();
  286. DocumentCollection collection = await InitializeCollection<T>();
  287. //查询条数 -1是全部
  288. FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = open };
  289. var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database,collection.Id), queryOptions).AsDocumentQuery();
  290. while (query.HasMoreResults)
  291. {
  292. foreach (T obj in await query.ExecuteNextAsync())
  293. {
  294. objs.Add(obj);
  295. }
  296. }
  297. return objs;
  298. //return CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name),sql);
  299. }
  300. public async Task<List<T>> FindLinq<T>(Func<IQueryable<object>, object> singleOrDefault)
  301. {
  302. Type t = typeof(T);
  303. List<T> objs = new List<T>();
  304. DocumentCollection collection = await InitializeCollection<T>();
  305. //查询条数 -1是全部
  306. FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1 };
  307. var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, collection.Id), queryOptions);
  308. // query.Where();
  309. return objs;
  310. //return CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name),sql);
  311. }
  312. public async Task<List<T>> FindSQL<T>(string sql)
  313. {
  314. Type t = typeof(T);
  315. //List<T> objs = new List<T>();
  316. DocumentCollection collection = await InitializeCollection<T>();
  317. var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database,collection.Id), sql);
  318. //foreach (var item in query)
  319. //{
  320. // objs.Add(item);
  321. //}
  322. return query.ToList<T>();
  323. }
  324. public async Task<List<T>> FindSQL<T>(string sql, bool IsPk)
  325. {
  326. Type t = typeof(T);
  327. //List<T> objs = new List<T>();
  328. // Boolean open = IsPk;
  329. DocumentCollection collection = await InitializeCollection<T>();
  330. //查询条数 -1是全部
  331. FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = IsPk };
  332. var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, collection.Id), sql, queryOptions);
  333. //foreach (var item in query)
  334. //{
  335. // objs.Add(item);
  336. //}
  337. return query.ToList<T>();
  338. }
  339. public async Task<List<T>> FindByParams<T>(Dictionary<string, object> dict)
  340. {
  341. DocumentCollection collection = await InitializeCollection<T>();
  342. Type t = typeof(T);
  343. Boolean open = true;
  344. List<Filter> filters = new List<Filter>();
  345. string PKname = "";
  346. PropertyInfo[] properties = t.GetProperties();
  347. foreach (PropertyInfo property in properties)
  348. {
  349. object[] attributes = property.GetCustomAttributes(true);
  350. foreach (object attribute in attributes) //2.通过映射,找到成员属性上关联的特性类实例,
  351. {
  352. if (attribute is PartitionKeyAttribute)
  353. {
  354. PKname = property.Name;
  355. break;
  356. }
  357. }
  358. }
  359. foreach (string key in dict.Keys)
  360. {
  361. //if (t.Name.Equals(key)) {
  362. // open = false;
  363. //}
  364. if (PKname.Equals(key))
  365. {
  366. open = false;
  367. }
  368. filters.Add(new Filter { Contrast = "and", Key = key, Value = dict[key] != null ? dict[key].ToString() : throw new Exception("参数值不能为null") });
  369. }
  370. //List<T> objs = new List<T>();
  371. await InitializeCollection<T>();
  372. //查询条数 -1是全部
  373. FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = open };
  374. var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, collection.Id), queryOptions);
  375. List<T> list = DynamicLinq.GenerateFilter<T>(query, filters).ToList();
  376. return list;
  377. //return CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name),sql);
  378. }
  379. public async Task<string> DeleteAsync<T>(string id)
  380. {
  381. Type t = typeof(T);
  382. DocumentCollection collection = await InitializeCollection<T>();
  383. ResourceResponse<Document> doc =
  384. await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database,collection.Id, id));
  385. //Console.WriteLine(doc.ActivityId);
  386. return id;
  387. }
  388. public async Task<T> DeleteAsync<T>(T entity)
  389. {
  390. DocumentCollection collection = await InitializeCollection<T>();
  391. Type t = typeof(T);
  392. string PartitionKey = GetPartitionKey<T>();
  393. if (!string.IsNullOrEmpty(PartitionKey))
  394. {
  395. string pkValue = entity.GetType().GetProperty(PartitionKey).GetValue(entity).ToString();
  396. string idValue = entity.GetType().GetProperty("id").GetValue(entity).ToString();
  397. ResourceResponse<Document> doc =
  398. await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database, collection.Id, idValue), new RequestOptions { PartitionKey = new PartitionKey(pkValue) });
  399. }
  400. else
  401. {
  402. string idValue = entity.GetType().GetProperty("id").GetValue(entity).ToString();
  403. ResourceResponse<Document> doc =
  404. await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database, collection.Id, idValue));
  405. }
  406. //Console.WriteLine(doc.ActivityId);
  407. return entity;
  408. }
  409. public async Task<string> DeleteAsync<T>(string id, string partitionKey)
  410. {
  411. Type t = typeof(T);
  412. DocumentCollection collection = await InitializeCollection<T>();
  413. ResourceResponse<Document> doc =
  414. await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database, collection.Id, id), new RequestOptions { PartitionKey = new PartitionKey(partitionKey) });
  415. //Console.WriteLine(doc.ActivityId);
  416. return id;
  417. }
  418. public async Task<List<T>> SaveAll<T>(List<T> enyites)
  419. {
  420. DocumentCollection dataCollection = await InitializeCollection<T>();
  421. // Set retry options high for initialization (default values).
  422. CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
  423. CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;
  424. IBulkExecutor bulkExecutor = new BulkExecutor(CosmosClient, dataCollection);
  425. await bulkExecutor.InitializeAsync();
  426. // Set retries to 0 to pass control to bulk executor.
  427. CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
  428. CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;
  429. BulkImportResponse bulkImportResponse = null;
  430. long totalNumberOfDocumentsInserted = 0;
  431. double totalRequestUnitsConsumed = 0;
  432. double totalTimeTakenSec = 0;
  433. var tokenSource = new CancellationTokenSource();
  434. var token = tokenSource.Token;
  435. int pageSize = 100;
  436. int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
  437. for (int i = 0; i < pages; i++)
  438. {
  439. List<string> documentsToImportInBatch = new List<string>();
  440. List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
  441. for (int j = 0; j < lists.Count; j++)
  442. {
  443. documentsToImportInBatch.Add(lists[j].ToJson());
  444. }
  445. var tasks = new List<Task>
  446. { Task.Run(async () =>
  447. {
  448. do
  449. {
  450. //try
  451. //{
  452. bulkImportResponse = await bulkExecutor.BulkImportAsync(
  453. documents: documentsToImportInBatch,
  454. enableUpsert: true,
  455. disableAutomaticIdGeneration: true,
  456. maxConcurrencyPerPartitionKeyRange: null,
  457. maxInMemorySortingBatchSize: null,
  458. cancellationToken: token);
  459. //}
  460. //catch (DocumentClientException de)
  461. //{
  462. // break;
  463. //}
  464. //catch (Exception e)
  465. //{
  466. // break;
  467. //}
  468. } while (bulkImportResponse.NumberOfDocumentsImported < documentsToImportInBatch.Count);
  469. totalNumberOfDocumentsInserted += bulkImportResponse.NumberOfDocumentsImported;
  470. totalRequestUnitsConsumed += bulkImportResponse.TotalRequestUnitsConsumed;
  471. totalTimeTakenSec += bulkImportResponse.TotalTimeTaken.TotalSeconds;
  472. },
  473. token)
  474. };
  475. await Task.WhenAll(tasks);
  476. }
  477. return enyites;
  478. }
  479. public async Task<List<T>> UpdateAll<T>(Dictionary<string, object> dict, Dictionary<string, object> updateFilters, List<string> deleteKeys = null)
  480. {
  481. DocumentCollection dataCollection = await InitializeCollection<T>();
  482. IBulkExecutor bulkExecutor = new BulkExecutor(CosmosClient, dataCollection);
  483. await bulkExecutor.InitializeAsync();
  484. BulkUpdateResponse bulkUpdateResponse = null;
  485. long totalNumberOfDocumentsUpdated = 0;
  486. double totalRequestUnitsConsumed = 0;
  487. double totalTimeTakenSec = 0;
  488. var tokenSource = new CancellationTokenSource();
  489. var token = tokenSource.Token;
  490. // Generate update operations.
  491. List<UpdateOperation> updateOperations = new List<UpdateOperation>();
  492. // Unset the description field.
  493. if (null != updateFilters && updateFilters.Count > 0)
  494. {
  495. var keys = updateFilters.Keys;
  496. foreach (string key in keys)
  497. {
  498. // updateOperations.Add(new SetUpdateOperation<string>())
  499. if (updateFilters[key] != null && !string.IsNullOrEmpty(updateFilters[key].ToString()))
  500. {
  501. updateOperations.Add(SwitchType(key, updateFilters[key]));
  502. }
  503. }
  504. }
  505. if (deleteKeys.IsNotEmpty())
  506. {
  507. foreach (string key in deleteKeys)
  508. {
  509. updateOperations.Add(new UnsetUpdateOperation(key));
  510. }
  511. }
  512. List<T> list = await FindByParams<T>(dict);
  513. int pageSize = 100;
  514. int pages = (int)Math.Ceiling((double)list.Count / pageSize);
  515. string partitionKey = "/" + GetPartitionKey<T>();
  516. Type type = typeof(T);
  517. for (int i = 0; i < pages; i++)
  518. {
  519. List<UpdateItem> updateItemsInBatch = new List<UpdateItem>();
  520. List<T> lists = list.Skip((i) * pageSize).Take(pageSize).ToList();
  521. for (int j = 0; j < lists.Count; j++)
  522. {
  523. string partitionKeyValue = lists[j].GetType().GetProperty(partitionKey).GetValue(lists[j]) + "";
  524. string id = lists[j].GetType().GetProperty("id").GetValue(lists[j]) + "";
  525. updateItemsInBatch.Add(new UpdateItem(id, partitionKeyValue, updateOperations));
  526. }
  527. var tasks = new List<Task>
  528. { Task.Run(async () =>
  529. {
  530. do
  531. {
  532. //try
  533. //{
  534. bulkUpdateResponse = await bulkExecutor.BulkUpdateAsync(
  535. updateItems: updateItemsInBatch,
  536. maxConcurrencyPerPartitionKeyRange: null,
  537. cancellationToken: token);
  538. //}
  539. //catch (DocumentClientException de)
  540. //{
  541. // break;
  542. //}
  543. //catch (Exception e)
  544. //{
  545. // break;
  546. //}
  547. } while (bulkUpdateResponse.NumberOfDocumentsUpdated < updateItemsInBatch.Count);
  548. totalNumberOfDocumentsUpdated += bulkUpdateResponse.NumberOfDocumentsUpdated;
  549. totalRequestUnitsConsumed += bulkUpdateResponse.TotalRequestUnitsConsumed;
  550. totalTimeTakenSec += bulkUpdateResponse.TotalTimeTaken.TotalSeconds;
  551. },
  552. token)
  553. };
  554. await Task.WhenAll(tasks);
  555. }
  556. return list;
  557. }
  558. public async Task<List<T>> DeleteAll<T>(Dictionary<string, object> dict)
  559. {
  560. DocumentCollection dataCollection = await InitializeCollection<T>();
  561. List<T> list = await FindByParams<T>(dict);
  562. List<Tuple<string, string>> pkIdTuplesToDelete = new List<Tuple<string, string>>();
  563. if (list.IsNotEmpty())
  564. {
  565. foreach (T t in list)
  566. {
  567. string id = t.GetType().GetProperty("id").GetValue(t) + "";
  568. pkIdTuplesToDelete.Add(new Tuple<string, string>(id, id));
  569. }
  570. }
  571. else
  572. {
  573. return null;
  574. }
  575. long totalNumberOfDocumentsDeleted = 0;
  576. double totalRequestUnitsConsumed = 0;
  577. double totalTimeTakenSec = 0;
  578. BulkDeleteResponse bulkDeleteResponse = null;
  579. BulkExecutor bulkExecutor = new BulkExecutor(CosmosClient, dataCollection);
  580. await bulkExecutor.InitializeAsync();
  581. bulkDeleteResponse = await bulkExecutor.BulkDeleteAsync(pkIdTuplesToDelete);
  582. totalNumberOfDocumentsDeleted = bulkDeleteResponse.NumberOfDocumentsDeleted;
  583. totalRequestUnitsConsumed = bulkDeleteResponse.TotalRequestUnitsConsumed;
  584. totalTimeTakenSec = bulkDeleteResponse.TotalTimeTaken.TotalSeconds;
  585. return list;
  586. }
  587. private static UpdateOperation SwitchType(string key, object obj)
  588. {
  589. Type s = obj.GetType();
  590. TypeCode typeCode = Type.GetTypeCode(s);
  591. return typeCode switch
  592. {
  593. TypeCode.String => new SetUpdateOperation<string>(key, obj.ToString()),
  594. TypeCode.Int32 => new SetUpdateOperation<Int32>(key, (Int32)obj),
  595. TypeCode.Double => new SetUpdateOperation<Double>(key, (Double)obj),
  596. TypeCode.Byte => new SetUpdateOperation<Byte>(key, (Byte)obj),
  597. TypeCode.Boolean => new SetUpdateOperation<Boolean>(key, (Boolean)obj),
  598. TypeCode.DateTime => new SetUpdateOperation<DateTime>(key, (DateTime)obj),
  599. TypeCode.Int64 => new SetUpdateOperation<Int64>(key, (Int64)obj),
  600. _ => null,
  601. };
  602. }
  603. public async Task<List<T>> FindByDict<T>(Dictionary<string, object> dict, bool IsPk=true)
  604. {
  605. Type t = typeof(T);
  606. // List<T> objs = new List<T>();
  607. DocumentCollection collection= await InitializeCollection<T>();
  608. StringBuilder sql = new StringBuilder("select * from c where 1=1 ");
  609. if (dict != null)
  610. {
  611. foreach (string key in dict.Keys)
  612. {
  613. sql.Append(GenSql(dict[key], key));
  614. }
  615. }
  616. //查询条数 -1是全部
  617. FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = IsPk };
  618. var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, collection.Id), sql.ToString(), queryOptions);
  619. //foreach (var item in query)
  620. //{
  621. // objs.Add(item);
  622. //}
  623. return query.ToList<T>();
  624. }
  625. private static string GenSql(object obj, string key)
  626. {
  627. if (obj is JArray array)
  628. {
  629. StringBuilder sql = new StringBuilder(" and c." + key + " in (");
  630. foreach (JValue obja in array) {
  631. if (obja.Value is string a) {
  632. sql.Append("'" + a + "',");
  633. }
  634. if (obja.Value is int b)
  635. {
  636. sql.Append(b + ",");
  637. }
  638. if (obja.Value is double c)
  639. {
  640. sql.Append(c + ",");
  641. }
  642. if (obja.Value is bool d)
  643. {
  644. sql.Append(d + ",");
  645. }
  646. if (obja.Value is long e)
  647. {
  648. sql.Append(e + ",");
  649. }
  650. if (obja.Value is DateTime f)
  651. {
  652. sql.Append(f + ",");
  653. }
  654. }
  655. string sqls= sql.ToString().Substring(0, sql.Length - 1);
  656. sqls += ") ";
  657. return sqls;
  658. }
  659. // if(va)
  660. ///处理JsonElement类型
  661. else {
  662. Type s = obj.GetType();
  663. TypeCode typeCode = Type.GetTypeCode(s);
  664. return typeCode switch
  665. {
  666. TypeCode.String => " and c." + key + "=" + "'" + obj.ToString() + "'",
  667. TypeCode.Int32 => " and c." + key + "=" + int.Parse(obj.ToString()),
  668. TypeCode.Double => " and c." + key + "=" + double.Parse(obj.ToString()),
  669. //case TypeCode.Byte: return "and c." + key + "=" + (Byte)obj ;
  670. TypeCode.Boolean => " and c." + key + "=" + bool.Parse(obj.ToString()),
  671. TypeCode.DateTime => " and c." + key + "=" + (DateTime)obj,
  672. TypeCode.Int64 => " and c." + key + "=" + long.Parse(obj.ToString()),
  673. _ => null,
  674. };
  675. }
  676. }
  677. public IQueryable<dynamic> FindByDict(string CollectionName, Dictionary<string, object> dict)
  678. {
  679. if (DocumentCollectionDict.TryGetValue(CollectionName, out DocumentCollection collection))
  680. {
  681. // collection = await InitializeCollection(CollectionName, "");
  682. /* StringBuilder sql = new StringBuilder("select * from " + CollectionName + " c where 1=1 ");
  683. if (dict != null)
  684. {
  685. foreach (string key in dict.Keys)
  686. {
  687. sql.Append(GenSql(dict[key], key));
  688. }
  689. }*/
  690. StringBuilder sql = new StringBuilder("select value(c) from c");
  691. if (dict != null)
  692. {
  693. string Join = " join ";
  694. string instring = "in ";
  695. Dictionary<string, string> keyValues = new Dictionary<string, string>();
  696. StringBuilder ps = new StringBuilder();
  697. int heada = 0;
  698. string[] sqlHead = new string[] { "A", "B", "C", "D", "E", "F" };
  699. int kslength = 0;
  700. foreach (KeyValuePair<string, object> item in dict)
  701. {
  702. int index = 0;
  703. string[] ks = item.Key.Split("[*]");
  704. if (ks.Length > 1)
  705. {
  706. kslength += ks.Length;
  707. if (kslength < (7 + heada))
  708. {
  709. StringBuilder sqlitem = new StringBuilder();
  710. for (int i = 0; i < ks.Length - 1; i++)
  711. {
  712. //Console.WriteLine(ks[i]);
  713. if (i == 0)
  714. {
  715. sqlitem.Append(Join);
  716. string a = sqlHead[heada] + index;
  717. sqlitem.Append(a + " ");
  718. //keyValues.Add(ks[i], a);
  719. keyValues[ks[i]] = a;
  720. sqlitem.Append(instring);
  721. sqlitem.Append("c.");
  722. sqlitem.Append(ks[i]);
  723. }
  724. else
  725. {
  726. sqlitem.Append(Join + " ");
  727. string a = sqlHead[heada] + index;
  728. sqlitem.Append(a + " ");
  729. //keyValues.Add(ks[i], a);
  730. keyValues[ks[i]] = a;
  731. sqlitem.Append(instring);
  732. sqlitem.Append(keyValues[ks[i - 1]]);
  733. sqlitem.Append(ks[i]);
  734. }
  735. index += 1;
  736. }
  737. // Console.WriteLine(sqlitem);
  738. sql.Append(sqlitem);
  739. string s;
  740. s = "and " + sqlHead[heada] + (ks.Length - 2) + ks[index] + " = " + "\"" + item.Value.ToString() + "\"";
  741. ps.Append(s);
  742. }
  743. else
  744. {
  745. throw new BizException("数组总共深度不能超过5层",ResponseCode.PARAMS_ERROR);
  746. }
  747. }
  748. else
  749. {
  750. ps.Append(GenSql(dict[item.Key], item.Key));
  751. }
  752. heada += 1;
  753. }
  754. sql.Append(" where 1=1 ").Append(ps);
  755. }
  756. FeedOptions queryOptions;
  757. if (collection.PartitionKey.Paths.Count > 0)
  758. {
  759. queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = true };
  760. }
  761. else
  762. {
  763. queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = false };
  764. }
  765. //查询条数 -1是全部
  766. var query = CosmosClient.CreateDocumentQuery(UriFactory.CreateDocumentCollectionUri(Database, CollectionName), sql.ToString(), queryOptions);
  767. return query;
  768. }
  769. else {
  770. throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
  771. }
  772. }
  773. public IQueryable<dynamic> FindCountByDict(string CollectionName, Dictionary<string, object> dict)
  774. {
  775. if (DocumentCollectionDict.TryGetValue(CollectionName, out DocumentCollection collection))
  776. {
  777. // collection = await InitializeCollection(CollectionName, "");
  778. /* StringBuilder sql = new StringBuilder("select * from " + CollectionName + " c where 1=1 ");
  779. if (dict != null)
  780. {
  781. foreach (string key in dict.Keys)
  782. {
  783. sql.Append(GenSql(dict[key], key));
  784. }
  785. }*/
  786. StringBuilder sql = new StringBuilder("select value count(c) from c");
  787. if (dict != null)
  788. {
  789. string Join = " join ";
  790. string instring = "in ";
  791. Dictionary<string, string> keyValues = new Dictionary<string, string>();
  792. StringBuilder ps = new StringBuilder();
  793. int heada = 0;
  794. string[] sqlHead = new string[] { "A", "B", "C", "D", "E", "F" };
  795. int kslength = 0;
  796. foreach (KeyValuePair<string, object> item in dict)
  797. {
  798. int index = 0;
  799. string[] ks = item.Key.Split("[*]");
  800. if (ks.Length > 1)
  801. {
  802. kslength += ks.Length;
  803. if (kslength < (7 + heada))
  804. {
  805. StringBuilder sqlitem = new StringBuilder();
  806. for (int i = 0; i < ks.Length - 1; i++)
  807. {
  808. //Console.WriteLine(ks[i]);
  809. if (i == 0)
  810. {
  811. sqlitem.Append(Join);
  812. string a = sqlHead[heada] + index;
  813. sqlitem.Append(a + " ");
  814. //keyValues.Add(ks[i], a);
  815. keyValues[ks[i]] = a;
  816. sqlitem.Append(instring);
  817. sqlitem.Append("c.");
  818. sqlitem.Append(ks[i]);
  819. }
  820. else
  821. {
  822. sqlitem.Append(Join + " ");
  823. string a = sqlHead[heada] + index;
  824. sqlitem.Append(a + " ");
  825. //keyValues.Add(ks[i], a);
  826. keyValues[ks[i]] = a;
  827. sqlitem.Append(instring);
  828. sqlitem.Append(keyValues[ks[i - 1]]);
  829. sqlitem.Append(ks[i]);
  830. }
  831. index += 1;
  832. }
  833. // Console.WriteLine(sqlitem);
  834. sql.Append(sqlitem);
  835. string s;
  836. s = "and " + sqlHead[heada] + (ks.Length - 2) + ks[index] + " = " + "\"" + item.Value.ToString() + "\"";
  837. ps.Append(s);
  838. }
  839. else
  840. {
  841. throw new BizException("数组总共深度不能超过5层", ResponseCode.PARAMS_ERROR);
  842. }
  843. }
  844. else
  845. {
  846. ps.Append(GenSql(dict[item.Key], item.Key));
  847. }
  848. heada += 1;
  849. }
  850. sql.Append(" where 1=1 ").Append(ps);
  851. }
  852. FeedOptions queryOptions;
  853. if (collection.PartitionKey.Paths.Count > 0)
  854. {
  855. queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = true };
  856. }
  857. else
  858. {
  859. queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = false };
  860. }
  861. //查询条数 -1是全部
  862. var query = CosmosClient.CreateDocumentQuery(UriFactory.CreateDocumentCollectionUri(Database, CollectionName), sql.ToString(), queryOptions);
  863. return query;
  864. }
  865. else
  866. {
  867. throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
  868. }
  869. }
  870. }
  871. }