AzureCosmosDBRepository.cs 38 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Threading.Tasks;
  4. using System.Linq;
  5. using TEAMModelOS.SDK.Module.AzureCosmosDB.Configuration;
  6. using TEAMModelOS.SDK.Module.AzureCosmosDB.Interfaces;
  7. using Microsoft.Azure.Documents.Client;
  8. using Microsoft.Azure.Documents;
  9. using TEAMModelOS.SDK.Helper.Security.AESCrypt;
  10. using TEAMModelOS.SDK.Context.Exception;
  11. using Microsoft.Azure.Documents.Linq;
  12. using TEAMModelOS.SDK.Helper.Query.LinqHelper;
  13. using System.Reflection;
  14. using Microsoft.Azure.Cosmos;
  15. using Microsoft.Azure.CosmosDB.BulkExecutor;
  16. using Microsoft.Azure.CosmosDB.BulkExecutor.BulkImport;
  17. using System.Threading;
  18. using TEAMModelOS.SDK.Helper.Common.JsonHelper;
  19. using Microsoft.Azure.CosmosDB.BulkExecutor.BulkUpdate;
  20. using TEAMModelOS.SDK.Helper.Common.CollectionHelper;
  21. using Microsoft.Azure.CosmosDB.BulkExecutor.BulkDelete;
  22. using TEAMModelOS.SDK.Context.Attributes.Azure;
  23. using System.Text;
  24. using TEAMModelOS.SDK.Helper.Common.ReflectorExtensions;
  25. using Microsoft.AspNetCore.Hosting;
  26. using System.Collections.Concurrent;
  27. using DataType = Microsoft.Azure.Documents.DataType;
  28. using RequestOptions = Microsoft.Azure.Documents.Client.RequestOptions;
  29. using PartitionKey = Microsoft.Azure.Documents.PartitionKey;
  30. using Newtonsoft.Json.Linq;
  31. using TEAMModelOS.SDK.Helper.Common.LogHelper;
  32. using System.Text.Json;
  33. using System.Collections;
  34. namespace TEAMModelOS.SDK.Module.AzureCosmosDB.Implements
  35. { /// <summary>
  36. /// sdk 文档https://github.com/Azure/azure-cosmos-dotnet-v2/tree/master/samples
  37. /// https://github.com/Azure/azure-cosmos-dotnet-v2/blob/530c8d9cf7c99df7300246da05206c57ce654233/samples/code-samples/DatabaseManagement/Program.cs#L72-L121
  38. /// </summary>
  39. public class AzureCosmosDBRepository : IAzureCosmosDBRepository
  40. {
  41. /// <summary>
  42. /// sdk 文档https://github.com/Azure/azure-cosmos-dotnet-v2/tree/master/samples
  43. /// https://github.com/Azure/azure-cosmos-dotnet-v2/blob/530c8d9cf7c99df7300246da05206c57ce654233/samples/code-samples/DatabaseManagement/Program.cs#L72-L121
  44. /// </summary>
  45. private DocumentClient CosmosClient { get; set; }
  46. /// <summary>
  47. /// 线程安全的dict类型
  48. /// </summary>
  49. private ConcurrentDictionary<string, DocumentCollection> DocumentCollectionDict { get; set; } = new ConcurrentDictionary<string, DocumentCollection>();
  50. // private DocumentCollection CosmosCollection { get; set; }
  51. private string Database { get; set; }
  52. private int CollectionThroughput { get; set; }
  53. public AzureCosmosDBRepository(AzureCosmosDBOptions options)
  54. {
  55. try
  56. {
  57. if (!string.IsNullOrEmpty(options.ConnectionString))
  58. {
  59. CosmosClient = CosmosDBClientSingleton.getInstance(options.ConnectionString, options.ConnectionKey).GetCosmosDBClient();
  60. }
  61. else
  62. {
  63. throw new BizException("请设置正确的AzureCosmosDB数据库配置信息!");
  64. }
  65. Database = options.Database;
  66. CollectionThroughput = options.CollectionThroughput;
  67. CosmosClient.CreateDatabaseIfNotExistsAsync(new Microsoft.Azure.Documents.Database { Id = Database });
  68. // _connectionString = options.ConnectionString;
  69. // CosmosSerializer
  70. //获取数据库所有的表
  71. Microsoft.Azure.Documents.Client.FeedResponse<DocumentCollection> collections = CosmosClient.ReadDocumentCollectionFeedAsync(UriFactory.CreateDatabaseUri(Database)).GetAwaiter().GetResult();
  72. foreach (IGrouping<string, DocumentCollection> group in collections.GroupBy(c => c.Id))
  73. {
  74. DocumentCollectionDict.TryAdd(group.Key, group.First());
  75. }
  76. //collections
  77. List<Type> types = ReflectorExtensions.GetAllTypeAsAttribute<CosmosDBAttribute>(options.ScanModel);
  78. foreach (Type type in types)
  79. {
  80. string PartitionKey = GetPartitionKey(type);
  81. string CollectionName = "";
  82. int RU = 0;
  83. IEnumerable<CosmosDBAttribute> attributes = type.GetCustomAttributes<CosmosDBAttribute>(true);
  84. if (!string.IsNullOrEmpty(attributes.First<CosmosDBAttribute>().Name))
  85. {
  86. CollectionName = attributes.First<CosmosDBAttribute>().Name;
  87. }
  88. else
  89. {
  90. CollectionName = type.Name;
  91. }
  92. if (attributes.First<CosmosDBAttribute>().RU > 400)
  93. {
  94. RU = attributes.First<CosmosDBAttribute>().RU;
  95. }
  96. else
  97. {
  98. RU = options.CollectionThroughput;
  99. }
  100. //如果表存在于数据则检查RU是否变动,如果不存在则执行创建DocumentCollection
  101. if (DocumentCollectionDict.TryGetValue(CollectionName, out DocumentCollection collection))
  102. {
  103. Offer offer = CosmosClient.CreateOfferQuery().Where(o => o.ResourceLink == collection.SelfLink).AsEnumerable().Single();
  104. OfferV2 offerV2 = (OfferV2)offer;
  105. //更新RU
  106. if (offerV2.Content.OfferThroughput < RU)
  107. {
  108. CosmosClient.ReplaceOfferAsync(new OfferV2(offer, RU));
  109. }
  110. }
  111. else
  112. {
  113. DocumentCollection collectionDefinition = new DocumentCollection { Id = CollectionName };
  114. collectionDefinition.IndexingPolicy = new Microsoft.Azure.Documents.IndexingPolicy(new RangeIndex(DataType.String) { Precision = -1 });
  115. // collectionDefinition.PartitionKey = new PartitionKeyDefinition { Paths = new System.Collections.ObjectModel.Collection<string>() };
  116. if (!string.IsNullOrEmpty(PartitionKey))
  117. {
  118. collectionDefinition.PartitionKey.Paths.Add("/" + PartitionKey);
  119. }
  120. // CosmosCollection = await this.CosmosClient.CreateDocumentCollectionIfNotExistsAsync(UriFactory.CreateDatabaseUri(Database), collectionDefinition);
  121. if (RU > CollectionThroughput)
  122. {
  123. CollectionThroughput = RU;
  124. }
  125. DocumentCollection DocumentCollection = CosmosClient.CreateDocumentCollectionIfNotExistsAsync(
  126. UriFactory.CreateDatabaseUri(Database), collectionDefinition, new Microsoft.Azure.Documents.Client.RequestOptions { OfferThroughput = CollectionThroughput }).GetAwaiter().GetResult();
  127. DocumentCollectionDict.TryAdd(CollectionName, DocumentCollection);
  128. }
  129. }
  130. }
  131. catch (DocumentClientException de)
  132. {
  133. Exception baseException = de.GetBaseException();
  134. //Console.WriteLine("{0} error occurred: {1}, Message: {2}", de.StatusCode, de.Message, baseException.Message);
  135. }
  136. catch (Exception e)
  137. {
  138. Exception baseException = e.GetBaseException();
  139. //Console.WriteLine("Error: {0}, Message: {1}", e.Message, baseException.Message);
  140. }
  141. finally
  142. {
  143. // Console.WriteLine("End of demo, press any key to exit.");
  144. // Console.ReadKey();
  145. }
  146. }
  147. private async Task<DocumentCollection> InitializeCollection<T>()
  148. {
  149. Type type = typeof(T);
  150. string partitionKey = GetPartitionKey<T>();
  151. string CollectionName;
  152. IEnumerable<CosmosDBAttribute> attributes = type.GetCustomAttributes<CosmosDBAttribute>(true);
  153. if (!string.IsNullOrEmpty(attributes.First<CosmosDBAttribute>().Name))
  154. {
  155. CollectionName = attributes.First<CosmosDBAttribute>().Name;
  156. }
  157. else
  158. {
  159. CollectionName = type.Name;
  160. }
  161. return await InitializeCollection(CollectionName, partitionKey);
  162. }
  163. private async Task<DocumentCollection> InitializeCollection(string CollectionName, string PartitionKey)
  164. {
  165. /////内存中已经存在这个表则直接返回
  166. if (DocumentCollectionDict.TryGetValue(CollectionName, out DocumentCollection DocumentCollection))
  167. {
  168. return DocumentCollection;
  169. }///如果没有则尝试默认创建
  170. else
  171. {
  172. DocumentCollection documentCollection = new DocumentCollection { Id = CollectionName };
  173. documentCollection.IndexingPolicy = new Microsoft.Azure.Documents.IndexingPolicy(new RangeIndex(DataType.String) { Precision = -1 });
  174. // collectionDefinition.PartitionKey = new PartitionKeyDefinition { Paths = new System.Collections.ObjectModel.Collection<string>() };
  175. if (!string.IsNullOrEmpty(PartitionKey))
  176. {
  177. documentCollection.PartitionKey.Paths.Add("/" + PartitionKey);
  178. }
  179. // CosmosCollection = await this.CosmosClient.CreateDocumentCollectionIfNotExistsAsync(UriFactory.CreateDatabaseUri(Database), collectionDefinition);
  180. documentCollection = await this.CosmosClient.CreateDocumentCollectionIfNotExistsAsync(
  181. UriFactory.CreateDatabaseUri(Database), documentCollection, new RequestOptions { OfferThroughput = CollectionThroughput });
  182. DocumentCollectionDict.TryAdd(CollectionName, documentCollection);
  183. return documentCollection;
  184. }
  185. }
  186. private string GetPartitionKey<T>()
  187. {
  188. Type type = typeof(T);
  189. return GetPartitionKey(type);
  190. }
  191. private string GetPartitionKey(Type type)
  192. {
  193. PropertyInfo[] properties = type.GetProperties();
  194. List<PropertyInfo> attrProperties = new List<PropertyInfo>();
  195. foreach (PropertyInfo property in properties)
  196. {
  197. if (property.Name.Equals("PartitionKey"))
  198. {
  199. attrProperties.Add(property);
  200. break;
  201. }
  202. object[] attributes = property.GetCustomAttributes(true);
  203. foreach (object attribute in attributes) //2.通过映射,找到成员属性上关联的特性类实例,
  204. {
  205. if (attribute is PartitionKeyAttribute)
  206. {
  207. attrProperties.Add(property);
  208. }
  209. }
  210. }
  211. if (attrProperties.Count <= 0)
  212. {
  213. throw new BizException(type.Name +"has no PartitionKey !!!!!!");
  214. }
  215. else
  216. {
  217. if (attrProperties.Count == 1)
  218. {
  219. return attrProperties[0].Name;
  220. }
  221. else { throw new BizException("PartitionKey can only be single!"); }
  222. }
  223. }
  224. public async Task<T> Save<T>(T entity) //where T : object, new()
  225. {
  226. try
  227. {
  228. Type t = typeof(T);
  229. DocumentCollection collection= await InitializeCollection<T>();
  230. ResourceResponse<Document> doc =
  231. await CosmosClient.CreateDocumentAsync(UriFactory.CreateDocumentCollectionUri(Database, collection.Id), entity);
  232. //Console.WriteLine(doc.ActivityId);
  233. return entity;
  234. }
  235. catch (Exception e)
  236. {
  237. throw new BizException(e.Message);
  238. }
  239. }
  240. public async Task<T> Update<T>(T entity)
  241. {
  242. Type t = typeof(T);
  243. DocumentCollection collection = await InitializeCollection<T>();
  244. ResourceResponse<Document> doc =
  245. await CosmosClient.UpsertDocumentAsync(UriFactory.CreateDocumentCollectionUri(Database, collection.Id), entity);
  246. return entity;
  247. }
  248. public async Task<string> ReplaceObject<T>(T entity, string key)
  249. {
  250. Type t = typeof(T);
  251. DocumentCollection collection = await InitializeCollection<T>();
  252. try
  253. {
  254. ResourceResponse<Document> doc =
  255. await CosmosClient.ReplaceDocumentAsync(UriFactory.CreateDocumentUri(Database,collection.Id, key), entity);
  256. return key;
  257. }
  258. catch (Exception e)
  259. {
  260. Console.WriteLine("{0} Exception caught.", e);
  261. //return false;
  262. }
  263. return null;
  264. }
  265. public async Task<string> ReplaceObject<T>(T entity, string key, string partitionKey)
  266. {
  267. Type t = typeof(T);
  268. DocumentCollection collection = await InitializeCollection<T>();
  269. try
  270. {
  271. ResourceResponse<Document> doc =
  272. await CosmosClient.ReplaceDocumentAsync(UriFactory.CreateDocumentUri(Database,collection.Id, key),
  273. entity,
  274. new RequestOptions { PartitionKey = new PartitionKey(partitionKey) });
  275. return key;
  276. }
  277. catch (Exception e)
  278. {
  279. throw new BizException(e.Message);
  280. //Console.WriteLine("{0} Exception caught.", e);
  281. //return false;
  282. }
  283. }
  284. public async Task<List<T>> FindAll<T>()
  285. {
  286. Type t = typeof(T);
  287. Boolean open = true;
  288. List<T> objs = new List<T>();
  289. DocumentCollection collection = await InitializeCollection<T>();
  290. //查询条数 -1是全部
  291. FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = open };
  292. var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database,collection.Id), queryOptions).AsDocumentQuery();
  293. while (query.HasMoreResults)
  294. {
  295. foreach (T obj in await query.ExecuteNextAsync())
  296. {
  297. objs.Add(obj);
  298. }
  299. }
  300. return objs;
  301. //return CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name),sql);
  302. }
  303. public async Task<List<T>> FindLinq<T>(Func<IQueryable<object>, object> singleOrDefault)
  304. {
  305. Type t = typeof(T);
  306. List<T> objs = new List<T>();
  307. DocumentCollection collection = await InitializeCollection<T>();
  308. //查询条数 -1是全部
  309. FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1 };
  310. var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, collection.Id), queryOptions);
  311. // query.Where();
  312. return objs;
  313. //return CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name),sql);
  314. }
  315. public async Task<List<T>> FindSQL<T>(string sql)
  316. {
  317. Type t = typeof(T);
  318. //List<T> objs = new List<T>();
  319. DocumentCollection collection = await InitializeCollection<T>();
  320. var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database,collection.Id), sql);
  321. //foreach (var item in query)
  322. //{
  323. // objs.Add(item);
  324. //}
  325. return query.ToList<T>();
  326. }
  327. public async Task<List<T>> FindSQL<T>(string sql, bool IsPk)
  328. {
  329. Type t = typeof(T);
  330. //List<T> objs = new List<T>();
  331. // Boolean open = IsPk;
  332. DocumentCollection collection = await InitializeCollection<T>();
  333. //查询条数 -1是全部
  334. FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = IsPk };
  335. var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, collection.Id), sql, queryOptions);
  336. //foreach (var item in query)
  337. //{
  338. // objs.Add(item);
  339. //}
  340. return query.ToList<T>();
  341. }
  342. public async Task<List<T>> FindByParams<T>(Dictionary<string, object> dict)
  343. {
  344. DocumentCollection collection = await InitializeCollection<T>();
  345. Type t = typeof(T);
  346. Boolean open = true;
  347. List<Filter> filters = new List<Filter>();
  348. string PKname = "";
  349. PropertyInfo[] properties = t.GetProperties();
  350. foreach (PropertyInfo property in properties)
  351. {
  352. object[] attributes = property.GetCustomAttributes(true);
  353. foreach (object attribute in attributes) //2.通过映射,找到成员属性上关联的特性类实例,
  354. {
  355. if (attribute is PartitionKeyAttribute)
  356. {
  357. PKname = property.Name;
  358. break;
  359. }
  360. }
  361. }
  362. foreach (string key in dict.Keys)
  363. {
  364. //if (t.Name.Equals(key)) {
  365. // open = false;
  366. //}
  367. if (PKname.Equals(key))
  368. {
  369. open = false;
  370. }
  371. filters.Add(new Filter { Contrast = "and", Key = key, Value = dict[key] != null ? dict[key].ToString() : throw new Exception("参数值不能为null") });
  372. }
  373. //List<T> objs = new List<T>();
  374. await InitializeCollection<T>();
  375. //查询条数 -1是全部
  376. FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = open };
  377. var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, collection.Id), queryOptions);
  378. List<T> list = DynamicLinq.GenerateFilter<T>(query, filters).ToList();
  379. return list;
  380. //return CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name),sql);
  381. }
  382. public async Task<string> DeleteAsync<T>(string id)
  383. {
  384. Type t = typeof(T);
  385. DocumentCollection collection = await InitializeCollection<T>();
  386. ResourceResponse<Document> doc =
  387. await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database,collection.Id, id));
  388. //Console.WriteLine(doc.ActivityId);
  389. return id;
  390. }
  391. public async Task<T> DeleteAsync<T>(T entity)
  392. {
  393. DocumentCollection collection = await InitializeCollection<T>();
  394. Type t = typeof(T);
  395. string PartitionKey = GetPartitionKey<T>();
  396. if (!string.IsNullOrEmpty(PartitionKey))
  397. {
  398. string pkValue = entity.GetType().GetProperty(PartitionKey).GetValue(entity).ToString();
  399. string idValue = entity.GetType().GetProperty("id").GetValue(entity).ToString();
  400. ResourceResponse<Document> doc =
  401. await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database, collection.Id, idValue), new RequestOptions { PartitionKey = new PartitionKey(pkValue) });
  402. }
  403. else
  404. {
  405. string idValue = entity.GetType().GetProperty("id").GetValue(entity).ToString();
  406. ResourceResponse<Document> doc =
  407. await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database, collection.Id, idValue));
  408. }
  409. //log4net 日志記錄
  410. string uuidKey = Guid.NewGuid().ToString();
  411. string logkey = "【" + uuidKey + "】";
  412. LogHelper.Info(this,
  413. logkey
  414. +"【删除】"
  415. + "【表】"
  416. + t.Name
  417. + "【数据】"
  418. + entity.ToJson()
  419. + logkey);
  420. //Console.WriteLine(doc.ActivityId);
  421. return entity;
  422. }
  423. public async Task<string> DeleteAsync<T>(string id, string partitionKey)
  424. {
  425. Type t = typeof(T);
  426. DocumentCollection collection = await InitializeCollection<T>();
  427. ResourceResponse<Document> doc =
  428. await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database, collection.Id, id), new RequestOptions { PartitionKey = new PartitionKey(partitionKey) });
  429. //Console.WriteLine(doc.ActivityId);
  430. return id;
  431. }
  432. public async Task<List<T>> SaveAll<T>(List<T> enyites)
  433. {
  434. DocumentCollection dataCollection = await InitializeCollection<T>();
  435. // Set retry options high for initialization (default values).
  436. CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
  437. CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;
  438. IBulkExecutor bulkExecutor = new BulkExecutor(CosmosClient, dataCollection);
  439. await bulkExecutor.InitializeAsync();
  440. // Set retries to 0 to pass control to bulk executor.
  441. CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
  442. CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;
  443. BulkImportResponse bulkImportResponse = null;
  444. long totalNumberOfDocumentsInserted = 0;
  445. double totalRequestUnitsConsumed = 0;
  446. double totalTimeTakenSec = 0;
  447. var tokenSource = new CancellationTokenSource();
  448. var token = tokenSource.Token;
  449. int pageSize = 50;
  450. int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
  451. for (int i = 0; i < pages; i++)
  452. {
  453. List<string> documentsToImportInBatch = new List<string>();
  454. List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
  455. for (int j = 0; j < lists.Count; j++)
  456. {
  457. documentsToImportInBatch.Add(lists[j].ToJson());
  458. }
  459. var tasks = new List<Task>
  460. { Task.Run(async () =>
  461. {
  462. do
  463. {
  464. bulkImportResponse = await bulkExecutor.BulkImportAsync(
  465. documents: documentsToImportInBatch,
  466. enableUpsert: true,
  467. disableAutomaticIdGeneration: true,
  468. maxConcurrencyPerPartitionKeyRange: null,
  469. maxInMemorySortingBatchSize: null,
  470. cancellationToken: token);
  471. } while (bulkImportResponse.NumberOfDocumentsImported < documentsToImportInBatch.Count);
  472. totalNumberOfDocumentsInserted += bulkImportResponse.NumberOfDocumentsImported;
  473. totalRequestUnitsConsumed += bulkImportResponse.TotalRequestUnitsConsumed;
  474. totalTimeTakenSec += bulkImportResponse.TotalTimeTaken.TotalSeconds;
  475. },
  476. token)
  477. };
  478. await Task.WhenAll(tasks);
  479. }
  480. return enyites;
  481. }
  482. public async Task<List<T>> UpdateAll<T>(Dictionary<string, object> dict, Dictionary<string, object> updateFilters, List<string> deleteKeys = null)
  483. {
  484. DocumentCollection dataCollection = await InitializeCollection<T>();
  485. IBulkExecutor bulkExecutor = new BulkExecutor(CosmosClient, dataCollection);
  486. await bulkExecutor.InitializeAsync();
  487. BulkUpdateResponse bulkUpdateResponse = null;
  488. long totalNumberOfDocumentsUpdated = 0;
  489. double totalRequestUnitsConsumed = 0;
  490. double totalTimeTakenSec = 0;
  491. var tokenSource = new CancellationTokenSource();
  492. var token = tokenSource.Token;
  493. // Generate update operations.
  494. List<UpdateOperation> updateOperations = new List<UpdateOperation>();
  495. // Unset the description field.
  496. if (null != updateFilters && updateFilters.Count > 0)
  497. {
  498. var keys = updateFilters.Keys;
  499. foreach (string key in keys)
  500. {
  501. // updateOperations.Add(new SetUpdateOperation<string>())
  502. if (updateFilters[key] != null && !string.IsNullOrEmpty(updateFilters[key].ToString()))
  503. {
  504. updateOperations.Add(SwitchType(key, updateFilters[key]));
  505. }
  506. }
  507. }
  508. if (deleteKeys.IsNotEmpty())
  509. {
  510. foreach (string key in deleteKeys)
  511. {
  512. updateOperations.Add(new UnsetUpdateOperation(key));
  513. }
  514. }
  515. List<T> list = await FindByParams<T>(dict);
  516. int pageSize = 100;
  517. int pages = (int)Math.Ceiling((double)list.Count / pageSize);
  518. string partitionKey = "/" + GetPartitionKey<T>();
  519. Type type = typeof(T);
  520. for (int i = 0; i < pages; i++)
  521. {
  522. List<UpdateItem> updateItemsInBatch = new List<UpdateItem>();
  523. List<T> lists = list.Skip((i) * pageSize).Take(pageSize).ToList();
  524. for (int j = 0; j < lists.Count; j++)
  525. {
  526. string partitionKeyValue = lists[j].GetType().GetProperty(partitionKey).GetValue(lists[j]) + "";
  527. string id = lists[j].GetType().GetProperty("id").GetValue(lists[j]) + "";
  528. updateItemsInBatch.Add(new UpdateItem(id, partitionKeyValue, updateOperations));
  529. }
  530. var tasks = new List<Task>
  531. { Task.Run(async () =>
  532. {
  533. do
  534. {
  535. //try
  536. //{
  537. bulkUpdateResponse = await bulkExecutor.BulkUpdateAsync(
  538. updateItems: updateItemsInBatch,
  539. maxConcurrencyPerPartitionKeyRange: null,
  540. cancellationToken: token);
  541. //}
  542. //catch (DocumentClientException de)
  543. //{
  544. // break;
  545. //}
  546. //catch (Exception e)
  547. //{
  548. // break;
  549. //}
  550. } while (bulkUpdateResponse.NumberOfDocumentsUpdated < updateItemsInBatch.Count);
  551. totalNumberOfDocumentsUpdated += bulkUpdateResponse.NumberOfDocumentsUpdated;
  552. totalRequestUnitsConsumed += bulkUpdateResponse.TotalRequestUnitsConsumed;
  553. totalTimeTakenSec += bulkUpdateResponse.TotalTimeTaken.TotalSeconds;
  554. },
  555. token)
  556. };
  557. await Task.WhenAll(tasks);
  558. }
  559. return list;
  560. }
  561. public async Task<List<T>> DeleteAll<T>(Dictionary<string, object> dict)
  562. {
  563. DocumentCollection dataCollection = await InitializeCollection<T>();
  564. List<T> list = await FindByDict<T>(dict);
  565. List<Tuple<string, string>> pkIdTuplesToDelete = new List<Tuple<string, string>>();
  566. string PartitionKey = GetPartitionKey<T>();
  567. if (list.IsNotEmpty())
  568. {
  569. foreach (T t in list)
  570. {
  571. string pkValue = t.GetType().GetProperty(PartitionKey).GetValue(t).ToString();
  572. string id = t.GetType().GetProperty("id").GetValue(t) + "";
  573. pkIdTuplesToDelete.Add(new Tuple<string, string>(pkValue, id));
  574. //log4net 日志記錄
  575. string uuidKey = Guid.NewGuid().ToString();
  576. string logkey = "【" + uuidKey + "】";
  577. LogHelper.Info(this,
  578. logkey
  579. + "【删除】"
  580. + "【表】"
  581. + t.GetType()
  582. + "【数据】"
  583. + t.ToJson()
  584. + logkey);
  585. }
  586. }
  587. else
  588. {
  589. return list;
  590. }
  591. long totalNumberOfDocumentsDeleted = 0;
  592. double totalRequestUnitsConsumed = 0;
  593. double totalTimeTakenSec = 0;
  594. BulkDeleteResponse bulkDeleteResponse = null;
  595. BulkExecutor bulkExecutor = new BulkExecutor(CosmosClient, dataCollection);
  596. await bulkExecutor.InitializeAsync();
  597. try
  598. {
  599. bulkDeleteResponse = await bulkExecutor.BulkDeleteAsync(pkIdTuplesToDelete);
  600. totalNumberOfDocumentsDeleted = bulkDeleteResponse.NumberOfDocumentsDeleted;
  601. totalRequestUnitsConsumed = bulkDeleteResponse.TotalRequestUnitsConsumed;
  602. totalTimeTakenSec = bulkDeleteResponse.TotalTimeTaken.TotalSeconds;
  603. }
  604. catch (DocumentClientException de)
  605. {
  606. throw new BizException("Document client exception: {0}"+ de);
  607. }
  608. catch (Exception e)
  609. {
  610. throw new BizException("Exception: "+ e);
  611. }
  612. return list;
  613. }
  614. private static UpdateOperation SwitchType(string key, object obj)
  615. {
  616. Type s = obj.GetType();
  617. TypeCode typeCode = Type.GetTypeCode(s);
  618. return typeCode switch
  619. {
  620. TypeCode.String => new SetUpdateOperation<string>(key, obj.ToString()),
  621. TypeCode.Int32 => new SetUpdateOperation<Int32>(key, (Int32)obj),
  622. TypeCode.Double => new SetUpdateOperation<Double>(key, (Double)obj),
  623. TypeCode.Byte => new SetUpdateOperation<Byte>(key, (Byte)obj),
  624. TypeCode.Boolean => new SetUpdateOperation<Boolean>(key, (Boolean)obj),
  625. TypeCode.DateTime => new SetUpdateOperation<DateTime>(key, (DateTime)obj),
  626. TypeCode.Int64 => new SetUpdateOperation<Int64>(key, (Int64)obj),
  627. _ => null,
  628. };
  629. }
  630. public async Task<List<T>> FindByDict<T>(Dictionary<string, object> dict, bool IsPk=true)
  631. {
  632. Type t = typeof(T);
  633. // List<T> objs = new List<T>();
  634. DocumentCollection collection= await InitializeCollection<T>();
  635. StringBuilder sql = new StringBuilder("select value(c) from c");
  636. SQLHelper.GetSQL(dict,ref sql);
  637. //查询条数 -1是全部
  638. FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = IsPk };
  639. var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, collection.Id), sql.ToString(), queryOptions);
  640. //foreach (var item in query)
  641. //{
  642. // objs.Add(item);
  643. //}
  644. return query.ToList<T>();
  645. }
  646. private static string GenSql(object obj, string key)
  647. {
  648. if (obj is JArray array)
  649. {
  650. StringBuilder sql = new StringBuilder(" and c." + key + " in (");
  651. foreach (JValue obja in array)
  652. {
  653. if (obja.Value is string a)
  654. {
  655. sql.Append("'" + a + "',");
  656. }
  657. if (obja.Value is int b)
  658. {
  659. sql.Append(b + ",");
  660. }
  661. if (obja.Value is double c)
  662. {
  663. sql.Append(c + ",");
  664. }
  665. if (obja.Value is bool d)
  666. {
  667. sql.Append(d + ",");
  668. }
  669. if (obja.Value is long e)
  670. {
  671. sql.Append(e + ",");
  672. }
  673. if (obja.Value is DateTime f)
  674. {
  675. sql.Append(f + ",");
  676. }
  677. }
  678. string sqls = sql.ToString().Substring(0, sql.Length - 1);
  679. sqls += ") ";
  680. return sqls;
  681. }
  682. else if (key.StartsWith("$."))
  683. {
  684. Type s = obj.GetType();
  685. TypeCode typeCode = Type.GetTypeCode(s);
  686. string keyLike = key.Replace("$.", "");
  687. return typeCode switch
  688. {
  689. TypeCode.String => "and " + "Contains( c." + keyLike + " , \'" + obj.ToString() + "\') = true ",
  690. TypeCode.Int32 => "and " + "Contains( c." + keyLike + " , \'" + int.Parse(obj.ToString()) + "\') = true ",
  691. TypeCode.Double => "and " + "Contains( c." + keyLike + " , \'" + double.Parse(obj.ToString()) + "\') = true ",
  692. //case TypeCode.Byte: return "and c." + key + "=" + (Byte)obj ;
  693. TypeCode.Boolean => "and " + "Contains( c." + keyLike + " , \'" + bool.Parse(obj.ToString()) + "\') = true ",
  694. TypeCode.DateTime => "and " + "Contains( c." + keyLike + " , \'" + (DateTime)obj + "\') = true ",
  695. TypeCode.Int64 => "and " + "Contains( c." + keyLike + " , \'" + long.Parse(obj.ToString()) + "\') = true ",
  696. _ => null,
  697. };
  698. }
  699. else
  700. {
  701. Type s = obj.GetType();
  702. TypeCode typeCode = Type.GetTypeCode(s);
  703. return typeCode switch
  704. {
  705. TypeCode.String => " and c." + key + "=" + "'" + obj.ToString() + "'",
  706. TypeCode.Int32 => " and c." + key + "=" + int.Parse(obj.ToString()),
  707. TypeCode.Double => " and c." + key + "=" + double.Parse(obj.ToString()),
  708. //case TypeCode.Byte: return "and c." + key + "=" + (Byte)obj ;
  709. TypeCode.Boolean => " and c." + key + "=" + bool.Parse(obj.ToString()),
  710. TypeCode.DateTime => " and c." + key + "=" + (DateTime)obj,
  711. TypeCode.Int64 => " and c." + key + "=" + long.Parse(obj.ToString()),
  712. _ => null,
  713. };
  714. }
  715. }
  716. public IQueryable<dynamic> FindByDict(string CollectionName, Dictionary<string, object> dict)
  717. {
  718. if (DocumentCollectionDict.TryGetValue(CollectionName, out DocumentCollection collection))
  719. {
  720. // collection = await InitializeCollection(CollectionName, "");
  721. /* StringBuilder sql = new StringBuilder("select * from " + CollectionName + " c where 1=1 ");
  722. if (dict != null)
  723. {
  724. foreach (string key in dict.Keys)
  725. {
  726. sql.Append(GenSql(dict[key], key));
  727. }
  728. }*/
  729. StringBuilder sql = new StringBuilder("select value(c) from c");
  730. SQLHelper.GetSQL(dict,ref sql);
  731. FeedOptions queryOptions;
  732. if (collection.PartitionKey.Paths.Count > 0)
  733. {
  734. queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = true };
  735. }
  736. else
  737. {
  738. queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = false };
  739. }
  740. //查询条数 -1是全部
  741. var query = CosmosClient.CreateDocumentQuery(UriFactory.CreateDocumentCollectionUri(Database, CollectionName), sql.ToString(), queryOptions);
  742. return query;
  743. }
  744. else {
  745. throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
  746. }
  747. }
  748. public IQueryable<dynamic> FindCountByDict(string CollectionName, Dictionary<string, object> dict)
  749. {
  750. if (DocumentCollectionDict.TryGetValue(CollectionName, out DocumentCollection collection))
  751. {
  752. StringBuilder sql = new StringBuilder("select value count(c) from c");
  753. SQLHelper.GetSQL(dict, ref sql);
  754. FeedOptions queryOptions;
  755. if (collection.PartitionKey.Paths.Count > 0)
  756. {
  757. queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = true };
  758. }
  759. else
  760. {
  761. queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = false };
  762. }
  763. //查询条数 -1是全部
  764. var query = CosmosClient.CreateDocumentQuery(UriFactory.CreateDocumentCollectionUri(Database, CollectionName), sql.ToString(), queryOptions);
  765. return query;
  766. }
  767. else
  768. {
  769. throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
  770. }
  771. }
  772. private static void GetPageNum(Dictionary<string, object> dict, ref int offsetNum, ref int limitNum, ref bool pageBool)
  773. {
  774. dict.TryGetValue("OFFSET", out object offset);
  775. dict.Remove("OFFSET");
  776. dict.TryGetValue("LIMIT", out object limit);
  777. dict.Remove("LIMIT");
  778. if (offset != null && limit != null)
  779. {
  780. pageBool = true;
  781. offsetNum = int.Parse(offset.ToString());
  782. limitNum = int.Parse(limit.ToString());
  783. }
  784. }
  785. }
  786. }