// AzureCosmosDBRepository.cs
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Threading.Tasks;
  4. using System.Linq;
  5. using TEAMModelOS.SDK.Module.AzureCosmosDB.Configuration;
  6. using TEAMModelOS.SDK.Module.AzureCosmosDB.Interfaces;
  7. using Microsoft.Azure.Documents.Client;
  8. using Microsoft.Azure.Documents;
  9. using TEAMModelOS.SDK.Helper.Security.AESCrypt;
  10. using TEAMModelOS.SDK.Context.Exception;
  11. using Microsoft.Azure.Documents.Linq;
  12. using TEAMModelOS.SDK.Helper.Query.LinqHelper;
  13. using System.Reflection;
  14. using Microsoft.Azure.Cosmos;
  15. using Microsoft.Azure.CosmosDB.BulkExecutor;
  16. using Microsoft.Azure.CosmosDB.BulkExecutor.BulkImport;
  17. using System.Threading;
  18. using TEAMModelOS.SDK.Helper.Common.JsonHelper;
  19. using Microsoft.Azure.CosmosDB.BulkExecutor.BulkUpdate;
  20. using TEAMModelOS.SDK.Helper.Common.CollectionHelper;
  21. using Microsoft.Azure.CosmosDB.BulkExecutor.BulkDelete;
  22. using TEAMModelOS.SDK.Context.Attributes.Azure;
  23. using System.Text;
  24. using TEAMModelOS.SDK.Helper.Common.ReflectorExtensions;
  25. using Microsoft.AspNetCore.Hosting;
  26. using System.Collections.Concurrent;
  27. using DataType = Microsoft.Azure.Documents.DataType;
  28. using RequestOptions = Microsoft.Azure.Documents.Client.RequestOptions;
  29. using PartitionKey = Microsoft.Azure.Documents.PartitionKey;
  30. using Newtonsoft.Json.Linq;
  31. using TEAMModelOS.SDK.Helper.Common.LogHelper;
  32. using System.Text.Json;
  33. using System.Collections;
  34. namespace TEAMModelOS.SDK.Module.AzureCosmosDB.Implements
  35. { /// <summary>
  36. /// sdk 文档https://github.com/Azure/azure-cosmos-dotnet-v2/tree/master/samples
  37. /// https://github.com/Azure/azure-cosmos-dotnet-v2/blob/530c8d9cf7c99df7300246da05206c57ce654233/samples/code-samples/DatabaseManagement/Program.cs#L72-L121
  38. /// </summary>
  39. public class AzureCosmosDBRepository : IAzureCosmosDBRepository
  40. {
  /// <summary>
  /// Document client (Azure Cosmos DB .NET SDK v2) used by every operation in this repository.
  /// SDK samples: https://github.com/Azure/azure-cosmos-dotnet-v2/tree/master/samples
  /// https://github.com/Azure/azure-cosmos-dotnet-v2/blob/530c8d9cf7c99df7300246da05206c57ce654233/samples/code-samples/DatabaseManagement/Program.cs#L72-L121
  /// </summary>
  private DocumentClient CosmosClient { get; set; }
  /// <summary>
  /// Thread-safe cache of the collections known to this repository, keyed by collection id.
  /// </summary>
  private ConcurrentDictionary<string, DocumentCollection> DocumentCollectionDict { get; set; } = new ConcurrentDictionary<string, DocumentCollection>();
  // private DocumentCollection CosmosCollection { get; set; }
  /// <summary>
  /// Id of the database all collection and document URIs are built against.
  /// </summary>
  private string Database { get; set; }
  /// <summary>
  /// Throughput (RU/s) requested when a collection has to be created.
  /// </summary>
  private int CollectionThroughput { get; set; }
  /// <summary>
  /// Builds the repository: obtains the shared <see cref="DocumentClient"/>, makes sure the configured
  /// database exists, caches the collections already present in it, and then creates (or scales up)
  /// one collection for every model type carrying <see cref="CosmosDBAttribute"/> in <c>options.ScanModel</c>.
  /// </summary>
  /// <param name="options">Connection string and key, database id, default throughput and the model scan target.</param>
  /// <exception cref="BizException">Thrown when <paramref name="options"/> is null or has no connection string.</exception>
  public AzureCosmosDBRepository(AzureCosmosDBOptions options)
  {
      // Validate before entering the try block: this exception used to be thrown inside it and was
      // immediately swallowed by the catch-all handler, leaving the repository with a null client.
      if (options == null || string.IsNullOrEmpty(options.ConnectionString))
      {
          throw new BizException("请设置正确的AzureCosmosDB数据库配置信息!");
      }
      try
      {
          CosmosClient = CosmosDBClientSingleton.getInstance(options.ConnectionString, options.ConnectionKey).GetCosmosDBClient();
          Database = options.Database;
          CollectionThroughput = options.CollectionThroughput;
          // A constructor cannot await, so block until the database exists. Leaving this call
          // un-awaited raced the collection feed read on the next statement.
          CosmosClient.CreateDatabaseIfNotExistsAsync(new Microsoft.Azure.Documents.Database { Id = Database }).GetAwaiter().GetResult();
          // Cache every collection that already exists in the database.
          Microsoft.Azure.Documents.Client.FeedResponse<DocumentCollection> collections = CosmosClient.ReadDocumentCollectionFeedAsync(UriFactory.CreateDatabaseUri(Database)).GetAwaiter().GetResult();
          foreach (IGrouping<string, DocumentCollection> group in collections.GroupBy(c => c.Id))
          {
              DocumentCollectionDict.TryAdd(group.Key, group.First());
          }
          List<Type> types = ReflectorExtensions.GetAllTypeAsAttribute<CosmosDBAttribute>(options.ScanModel);
          foreach (Type type in types)
          {
              string partitionKeyName = GetPartitionKey(type);
              CosmosDBAttribute attribute = type.GetCustomAttributes<CosmosDBAttribute>(true).First();
              // The attribute may override the collection name; otherwise the type name is used.
              string collectionName = string.IsNullOrEmpty(attribute.Name) ? type.Name : attribute.Name;
              // An attribute RU above 400 wins over the configured default throughput.
              int ru = attribute.RU > 400 ? attribute.RU : options.CollectionThroughput;
              if (DocumentCollectionDict.TryGetValue(collectionName, out DocumentCollection collection))
              {
                  // Collection already exists: only raise its throughput, never lower it.
                  Offer offer = CosmosClient.CreateOfferQuery().Where(o => o.ResourceLink == collection.SelfLink).AsEnumerable().Single();
                  OfferV2 offerV2 = (OfferV2)offer;
                  if (offerV2.Content.OfferThroughput < ru)
                  {
                      // Wait for completion so a failed replace is observed instead of silently lost.
                      CosmosClient.ReplaceOfferAsync(new OfferV2(offer, ru)).GetAwaiter().GetResult();
                  }
              }
              else
              {
                  DocumentCollection collectionDefinition = new DocumentCollection { Id = collectionName };
                  collectionDefinition.IndexingPolicy = new Microsoft.Azure.Documents.IndexingPolicy(new RangeIndex(DataType.String) { Precision = -1 });
                  if (!string.IsNullOrEmpty(partitionKeyName))
                  {
                      collectionDefinition.PartitionKey.Paths.Add("/" + partitionKeyName);
                  }
                  // NOTE(review): CollectionThroughput is raised to the largest RU seen so far and is then
                  // used for every later collection created here or in InitializeCollection - confirm intended.
                  if (ru > CollectionThroughput)
                  {
                      CollectionThroughput = ru;
                  }
                  DocumentCollection created = CosmosClient.CreateDocumentCollectionIfNotExistsAsync(
                      UriFactory.CreateDatabaseUri(Database), collectionDefinition, new Microsoft.Azure.Documents.Client.RequestOptions { OfferThroughput = CollectionThroughput }).GetAwaiter().GetResult();
                  DocumentCollectionDict.TryAdd(collectionName, created);
              }
          }
      }
      catch (DocumentClientException de)
      {
          // Initialization stays best-effort (no rethrow), but the failure is no longer invisible.
          LogHelper.Info(this, "AzureCosmosDBRepository initialization failed: " + de.StatusCode + " " + de.Message + " / " + de.GetBaseException().Message);
      }
      catch (Exception e)
      {
          LogHelper.Info(this, "AzureCosmosDBRepository initialization failed: " + e.Message + " / " + e.GetBaseException().Message);
      }
  }
  /// <summary>
  /// Resolves the collection that stores <typeparamref name="T"/>: the name comes from the type's
  /// <see cref="CosmosDBAttribute"/> (falling back to the type name) and the partition key from
  /// <c>GetPartitionKey</c>; the actual lookup/creation is delegated to the string-based overload.
  /// </summary>
  /// <returns>The cached or newly created collection.</returns>
  private async Task<DocumentCollection> InitializeCollection<T>()
  {
      Type modelType = typeof(T);
      // Resolved first so a model without a partition key fails before the attribute is inspected.
      string partitionKey = GetPartitionKey<T>();
      CosmosDBAttribute marker = modelType.GetCustomAttributes<CosmosDBAttribute>(true).First<CosmosDBAttribute>();
      string collectionName = string.IsNullOrEmpty(marker.Name) ? modelType.Name : marker.Name;
      return await InitializeCollection(collectionName, partitionKey);
  }
  /// <summary>
  /// Returns the collection named <paramref name="CollectionName"/> from the in-memory cache, or
  /// creates it (if it does not exist) with a string range index and the given partition key path,
  /// caches it and returns it.
  /// </summary>
  /// <param name="CollectionName">Collection id.</param>
  /// <param name="PartitionKey">Property name used as partition key path; no path is added when null or empty.</param>
  private async Task<DocumentCollection> InitializeCollection(string CollectionName, string PartitionKey)
  {
      // Already known in memory: nothing to create.
      if (DocumentCollectionDict.TryGetValue(CollectionName, out DocumentCollection cached))
      {
          return cached;
      }

      // Not cached: describe the collection and create it with default settings.
      DocumentCollection definition = new DocumentCollection { Id = CollectionName };
      definition.IndexingPolicy = new Microsoft.Azure.Documents.IndexingPolicy(new RangeIndex(DataType.String) { Precision = -1 });
      bool hasPartitionKey = !string.IsNullOrEmpty(PartitionKey);
      if (hasPartitionKey)
      {
          definition.PartitionKey.Paths.Add("/" + PartitionKey);
      }

      RequestOptions throughput = new RequestOptions { OfferThroughput = CollectionThroughput };
      DocumentCollection created = await this.CosmosClient.CreateDocumentCollectionIfNotExistsAsync(
          UriFactory.CreateDatabaseUri(Database), definition, throughput);
      DocumentCollectionDict.TryAdd(CollectionName, created);
      return created;
  }
  /// <summary>
  /// Generic convenience wrapper: resolves the partition key property name of <typeparamref name="T"/>.
  /// </summary>
  private string GetPartitionKey<T>() => GetPartitionKey(typeof(T));
  /// <summary>
  /// Finds the name of the single partition key property of <paramref name="type"/>. A property
  /// literally named "PartitionKey" is taken and ends the scan; otherwise every property carrying
  /// a <see cref="PartitionKeyAttribute"/> is a candidate.
  /// </summary>
  /// <param name="type">Model type to inspect.</param>
  /// <returns>The property name of the partition key.</returns>
  /// <exception cref="BizException">Thrown when no candidate or more than one candidate is found.</exception>
  private string GetPartitionKey(Type type)
  {
      var candidates = new List<PropertyInfo>();
      foreach (PropertyInfo member in type.GetProperties())
      {
          if (member.Name.Equals("PartitionKey"))
          {
              candidates.Add(member);
              break;
          }

          // One entry per PartitionKeyAttribute instance found on the property.
          int markers = member.GetCustomAttributes(true).Count(a => a is PartitionKeyAttribute);
          candidates.AddRange(Enumerable.Repeat(member, markers));
      }

      if (candidates.Count == 1)
      {
          return candidates[0].Name;
      }
      if (candidates.Count > 1)
      {
          throw new BizException("PartitionKey can only be single!");
      }
      throw new BizException(type.Name +"has no PartitionKey !!!!!!");
  }
  /// <summary>
  /// Creates <paramref name="entity"/> as a new document in the collection of <typeparamref name="T"/>.
  /// </summary>
  /// <param name="entity">Object to store.</param>
  /// <returns>The same <paramref name="entity"/> instance.</returns>
  /// <exception cref="BizException">Wraps the message of any failure raised while resolving the collection or creating the document.</exception>
  public async Task<T> Save<T>(T entity) //where T : object, new()
  {
      try
      {
          DocumentCollection target = await InitializeCollection<T>();
          Uri targetUri = UriFactory.CreateDocumentCollectionUri(Database, target.Id);
          await CosmosClient.CreateDocumentAsync(targetUri, entity);
          return entity;
      }
      catch (Exception failure)
      {
          throw new BizException(failure.Message);
      }
  }
  /// <summary>
  /// Upserts <paramref name="entity"/> into the collection of <typeparamref name="T"/>.
  /// </summary>
  /// <param name="entity">Object to insert or replace.</param>
  /// <returns>The same <paramref name="entity"/> instance.</returns>
  public async Task<T> Update<T>(T entity)
  {
      DocumentCollection target = await InitializeCollection<T>();
      Uri targetUri = UriFactory.CreateDocumentCollectionUri(Database, target.Id);
      await CosmosClient.UpsertDocumentAsync(targetUri, entity);
      return entity;
  }
  /// <summary>
  /// Replaces the document whose id is <paramref name="key"/> with <paramref name="entity"/>.
  /// </summary>
  /// <param name="entity">New document content.</param>
  /// <param name="key">Id of the document to replace.</param>
  /// <returns><paramref name="key"/> on success; null when the replace failed (the exception is written to the console).</returns>
  public async Task<string> ReplaceObject<T>(T entity, string key)
  {
      DocumentCollection target = await InitializeCollection<T>();
      string replacedKey = null;
      try
      {
          Uri documentUri = UriFactory.CreateDocumentUri(Database, target.Id, key);
          await CosmosClient.ReplaceDocumentAsync(documentUri, entity);
          replacedKey = key;
      }
      catch (Exception failure)
      {
          // Best-effort by design: report the failure and fall through to the null result.
          Console.WriteLine("{0} Exception caught.", failure);
      }
      return replacedKey;
  }
  /// <summary>
  /// Replaces the document with id <paramref name="key"/> inside the logical partition
  /// <paramref name="partitionKey"/> with <paramref name="entity"/>.
  /// </summary>
  /// <param name="entity">New document content.</param>
  /// <param name="key">Id of the document to replace.</param>
  /// <param name="partitionKey">Partition key value of the document.</param>
  /// <returns><paramref name="key"/>.</returns>
  /// <exception cref="BizException">Wraps the message of any failure raised by the replace call.</exception>
  public async Task<string> ReplaceObject<T>(T entity, string key, string partitionKey)
  {
      DocumentCollection target = await InitializeCollection<T>();
      try
      {
          Uri documentUri = UriFactory.CreateDocumentUri(Database, target.Id, key);
          RequestOptions scope = new RequestOptions { PartitionKey = new PartitionKey(partitionKey) };
          await CosmosClient.ReplaceDocumentAsync(documentUri, entity, scope);
          return key;
      }
      catch (Exception failure)
      {
          throw new BizException(failure.Message);
      }
  }
  /// <summary>
  /// Reads every document of the collection of <typeparamref name="T"/>, page by page, with
  /// cross-partition querying enabled.
  /// </summary>
  /// <returns>All documents converted to <typeparamref name="T"/>.</returns>
  public async Task<List<T>> FindAll<T>()
  {
      var results = new List<T>();
      DocumentCollection source = await InitializeCollection<T>();
      // MaxItemCount = -1 leaves the page size to the service.
      var feedOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = true };
      Uri sourceUri = UriFactory.CreateDocumentCollectionUri(Database, source.Id);
      var pager = CosmosClient.CreateDocumentQuery<T>(sourceUri, feedOptions).AsDocumentQuery();
      while (pager.HasMoreResults)
      {
          var page = await pager.ExecuteNextAsync();
          foreach (T item in page)
          {
              results.Add(item);
          }
      }
      return results;
  }
  /// <summary>
  /// Placeholder for a LINQ-driven query over the collection of <typeparamref name="T"/>.
  /// </summary>
  /// <param name="singleOrDefault">Not used by the current implementation.</param>
  /// <returns>Always an empty list: the query is built but never filtered or executed.</returns>
  public async Task<List<T>> FindLinq<T>(Func<IQueryable<object>, object> singleOrDefault)
  {
      DocumentCollection source = await InitializeCollection<T>();
      Uri sourceUri = UriFactory.CreateDocumentCollectionUri(Database, source.Id);
      // NOTE(review): the query object is created and then dropped - this method looks unfinished.
      var unfiltered = CosmosClient.CreateDocumentQuery<T>(sourceUri, new FeedOptions { MaxItemCount = -1 });
      return new List<T>();
  }
  /// <summary>
  /// Runs the raw SQL text <paramref name="sql"/> against the collection of <typeparamref name="T"/>
  /// with default feed options.
  /// </summary>
  /// <param name="sql">Cosmos DB SQL query text, passed to the service unchanged.</param>
  /// <returns>All result documents converted to <typeparamref name="T"/>.</returns>
  public async Task<List<T>> FindSQL<T>(string sql)
  {
      DocumentCollection collection = await InitializeCollection<T>();
      var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, collection.Id), sql).AsDocumentQuery();
      // Drain the result pages asynchronously; enumerating the IQueryable with ToList()
      // blocked a thread on network I/O inside an async method.
      List<T> results = new List<T>();
      while (query.HasMoreResults)
      {
          results.AddRange(await query.ExecuteNextAsync<T>());
      }
      return results;
  }
  /// <summary>
  /// Runs the raw SQL text <paramref name="sql"/> against the collection of <typeparamref name="T"/>.
  /// </summary>
  /// <param name="sql">Cosmos DB SQL query text, passed to the service unchanged.</param>
  /// <param name="IsPk">Passed straight through as <c>FeedOptions.EnableCrossPartitionQuery</c>.</param>
  /// <returns>All result documents converted to <typeparamref name="T"/>.</returns>
  public async Task<List<T>> FindSQL<T>(string sql, bool IsPk)
  {
      DocumentCollection collection = await InitializeCollection<T>();
      // MaxItemCount = -1 leaves the page size to the service.
      FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = IsPk };
      var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, collection.Id), sql, queryOptions).AsDocumentQuery();
      // Drain the result pages asynchronously; enumerating the IQueryable with ToList()
      // blocked a thread on network I/O inside an async method.
      List<T> results = new List<T>();
      while (query.HasMoreResults)
      {
          results.AddRange(await query.ExecuteNextAsync<T>());
      }
      return results;
  }
  /// <summary>
  /// Queries the collection of <typeparamref name="T"/> with one "and" filter per entry of
  /// <paramref name="dict"/>. Cross-partition querying is switched off when one of the keys is the
  /// partition key property of <typeparamref name="T"/>.
  /// </summary>
  /// <param name="dict">Property name to required value; values must not be null.</param>
  /// <returns>The matching documents.</returns>
  /// <exception cref="Exception">Thrown when a value in <paramref name="dict"/> is null.</exception>
  public async Task<List<T>> FindByParams<T>(Dictionary<string, object> dict)
  {
      DocumentCollection collection = await InitializeCollection<T>();
      // Use the same resolution rule as the rest of the class (a property named "PartitionKey" or one
      // marked with PartitionKeyAttribute); the previous attribute-only scan disagreed with it.
      // InitializeCollection has already validated that exactly one such property exists.
      string partitionKeyName = GetPartitionKey<T>();
      bool crossPartition = true;
      List<Filter> filters = new List<Filter>();
      foreach (KeyValuePair<string, object> pair in dict)
      {
          if (partitionKeyName.Equals(pair.Key))
          {
              crossPartition = false;
          }
          filters.Add(new Filter { Contrast = "and", Key = pair.Key, Value = pair.Value != null ? pair.Value.ToString() : throw new Exception("参数值不能为null") });
      }
      // MaxItemCount = -1 leaves the page size to the service.
      FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = crossPartition };
      var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, collection.Id), queryOptions);
      List<T> list = DynamicLinq.GenerateFilter<T>(query, filters).ToList();
      return list;
  }
  /// <summary>
  /// Deletes the document with the given <paramref name="id"/> from the collection of
  /// <typeparamref name="T"/>, without supplying a partition key.
  /// </summary>
  /// <param name="id">Document id.</param>
  /// <returns><paramref name="id"/>.</returns>
  public async Task<string> DeleteAsync<T>(string id)
  {
      DocumentCollection target = await InitializeCollection<T>();
      Uri documentUri = UriFactory.CreateDocumentUri(Database, target.Id, id);
      await CosmosClient.DeleteDocumentAsync(documentUri);
      return id;
  }
  /// <summary>
  /// Deletes the document that corresponds to <paramref name="entity"/>. The document id is read
  /// by reflection from the property named "id" and the partition key value from the partition key
  /// property of <typeparamref name="T"/>; the deletion is then written to the log.
  /// </summary>
  /// <param name="entity">Entity whose document is deleted.</param>
  /// <returns>The same <paramref name="entity"/> instance.</returns>
  public async Task<T> DeleteAsync<T>(T entity)
  {
      DocumentCollection target = await InitializeCollection<T>();
      string pkName = GetPartitionKey<T>();
      Type runtimeType = entity.GetType();

      // Scope the delete to one partition when a partition key property is known.
      RequestOptions scope = null;
      if (!string.IsNullOrEmpty(pkName))
      {
          string pkValue = runtimeType.GetProperty(pkName).GetValue(entity).ToString();
          scope = new RequestOptions { PartitionKey = new PartitionKey(pkValue) };
      }
      string idValue = runtimeType.GetProperty("id").GetValue(entity).ToString();
      await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database, target.Id, idValue), scope);

      // log4net record, bracketed front and back by a fresh correlation key.
      string logkey = "【" + Guid.NewGuid().ToString() + "】";
      LogHelper.Info(this, $"{logkey}【删除】【表】{typeof(T).Name}【数据】{entity.ToJson()}{logkey}");
      return entity;
  }
  /// <summary>
  /// Deletes the document with the given <paramref name="id"/> from the logical partition
  /// <paramref name="partitionKey"/> of the collection of <typeparamref name="T"/>.
  /// </summary>
  /// <param name="id">Document id.</param>
  /// <param name="partitionKey">Partition key value of the document.</param>
  /// <returns><paramref name="id"/>.</returns>
  public async Task<string> DeleteAsync<T>(string id, string partitionKey)
  {
      DocumentCollection target = await InitializeCollection<T>();
      Uri documentUri = UriFactory.CreateDocumentUri(Database, target.Id, id);
      RequestOptions scope = new RequestOptions { PartitionKey = new PartitionKey(partitionKey) };
      await CosmosClient.DeleteDocumentAsync(documentUri, scope);
      return id;
  }
  /// <summary>
  /// Bulk-upserts <paramref name="enyites"/> into the collection of <typeparamref name="T"/> in
  /// batches of 50 JSON documents using the bulk executor.
  /// </summary>
  /// <param name="enyites">Entities to import; each is serialized with <c>ToJson()</c>.</param>
  /// <returns>The same <paramref name="enyites"/> list.</returns>
  public async Task<List<T>> SaveAll<T>(List<T> enyites)
  {
      DocumentCollection dataCollection = await InitializeCollection<T>();
      // Set retry options high for initialization (default values).
      CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
      CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;
      IBulkExecutor bulkExecutor = new BulkExecutor(CosmosClient, dataCollection);
      await bulkExecutor.InitializeAsync();
      // Set retries to 0 to pass control to bulk executor.
      // NOTE(review): CosmosClient comes from a singleton factory, so these settings appear to stay
      // at 0 for every later operation on the shared client - confirm this is acceptable.
      CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
      CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;

      const int pageSize = 50;
      // The token source was previously never disposed.
      using (var tokenSource = new CancellationTokenSource())
      {
          CancellationToken token = tokenSource.Token;
          for (int start = 0; start < enyites.Count; start += pageSize)
          {
              List<string> documentsToImportInBatch = enyites.Skip(start).Take(pageSize).Select(item => item.ToJson()).ToList();
              BulkImportResponse bulkImportResponse;
              // Await the import directly: wrapping a naturally-async call in Task.Run + WhenAll only
              // burned a thread-pool hop. The batch is re-sent (as an upsert) until every document in
              // it is reported imported.
              // NOTE(review): a batch that can never be fully imported makes this loop spin forever.
              do
              {
                  bulkImportResponse = await bulkExecutor.BulkImportAsync(
                      documents: documentsToImportInBatch,
                      enableUpsert: true,
                      disableAutomaticIdGeneration: true,
                      maxConcurrencyPerPartitionKeyRange: null,
                      maxInMemorySortingBatchSize: null,
                      cancellationToken: token);
              } while (bulkImportResponse.NumberOfDocumentsImported < documentsToImportInBatch.Count);
          }
      }
      return enyites;
  }
  /// <summary>
  /// Finds the documents of <typeparamref name="T"/> matching <paramref name="dict"/> and applies
  /// the same set/unset operations to each of them through the bulk executor, 100 documents per batch.
  /// </summary>
  /// <param name="dict">Filter passed to <c>FindByParams</c> to select the documents to update.</param>
  /// <param name="updateFilters">Field name to new value; null or empty-string values are skipped.</param>
  /// <param name="deleteKeys">Field names to unset; optional.</param>
  /// <returns>The documents that were selected (as read before the update).</returns>
  /// <exception cref="BizException">Thrown when a value in <paramref name="updateFilters"/> has a type <c>SwitchType</c> cannot map.</exception>
  public async Task<List<T>> UpdateAll<T>(Dictionary<string, object> dict, Dictionary<string, object> updateFilters, List<string> deleteKeys = null)
  {
      DocumentCollection dataCollection = await InitializeCollection<T>();
      IBulkExecutor bulkExecutor = new BulkExecutor(CosmosClient, dataCollection);
      await bulkExecutor.InitializeAsync();

      // Generate update operations.
      List<UpdateOperation> updateOperations = new List<UpdateOperation>();
      if (null != updateFilters && updateFilters.Count > 0)
      {
          foreach (KeyValuePair<string, object> pair in updateFilters)
          {
              if (pair.Value != null && !string.IsNullOrEmpty(pair.Value.ToString()))
              {
                  // SwitchType yields null for unsupported value types; fail with a clear message
                  // instead of handing a null operation to the bulk executor.
                  UpdateOperation operation = SwitchType(pair.Key, pair.Value)
                      ?? throw new BizException("Unsupported value type for update key: " + pair.Key);
                  updateOperations.Add(operation);
              }
          }
      }
      if (deleteKeys.IsNotEmpty())
      {
          foreach (string key in deleteKeys)
          {
              updateOperations.Add(new UnsetUpdateOperation(key));
          }
      }

      List<T> list = await FindByParams<T>(dict);
      const int pageSize = 100;
      // Property NAME of the partition key. The previous code prefixed it with "/" (the collection
      // path form), so GetProperty returned null and every non-empty update threw NullReferenceException.
      string partitionKeyName = GetPartitionKey<T>();
      // The token source was previously never disposed.
      using (var tokenSource = new CancellationTokenSource())
      {
          CancellationToken token = tokenSource.Token;
          for (int start = 0; start < list.Count; start += pageSize)
          {
              List<UpdateItem> updateItemsInBatch = new List<UpdateItem>();
              foreach (T item in list.Skip(start).Take(pageSize))
              {
                  Type itemType = item.GetType();
                  // "+ \"\"" turns a null property value into an empty string.
                  string partitionKeyValue = itemType.GetProperty(partitionKeyName).GetValue(item) + "";
                  string id = itemType.GetProperty("id").GetValue(item) + "";
                  updateItemsInBatch.Add(new UpdateItem(id, partitionKeyValue, updateOperations));
              }

              BulkUpdateResponse bulkUpdateResponse;
              // Awaited directly; the former Task.Run + WhenAll wrapper added nothing.
              // NOTE(review): a batch that can never be fully updated makes this loop spin forever.
              do
              {
                  bulkUpdateResponse = await bulkExecutor.BulkUpdateAsync(
                      updateItems: updateItemsInBatch,
                      maxConcurrencyPerPartitionKeyRange: null,
                      cancellationToken: token);
              } while (bulkUpdateResponse.NumberOfDocumentsUpdated < updateItemsInBatch.Count);
          }
      }
      return list;
  }
  /// <summary>
  /// Finds the documents of <typeparamref name="T"/> matching <paramref name="dict"/>, logs each one,
  /// and removes them all with a single bulk delete.
  /// </summary>
  /// <param name="dict">Filter passed to <c>FindByDict</c> to select the documents to delete.</param>
  /// <returns>The documents that were selected; returned unchanged when nothing matched.</returns>
  /// <exception cref="BizException">Thrown when the bulk delete fails.</exception>
  public async Task<List<T>> DeleteAll<T>(Dictionary<string, object> dict)
  {
      DocumentCollection dataCollection = await InitializeCollection<T>();
      List<T> list = await FindByDict<T>(dict);
      if (!list.IsNotEmpty())
      {
          return list;
      }

      string partitionKeyName = GetPartitionKey<T>();
      List<Tuple<string, string>> pkIdTuplesToDelete = new List<Tuple<string, string>>();
      foreach (T t in list)
      {
          string pkValue = t.GetType().GetProperty(partitionKeyName).GetValue(t).ToString();
          string id = t.GetType().GetProperty("id").GetValue(t) + "";
          pkIdTuplesToDelete.Add(new Tuple<string, string>(pkValue, id));
          // log4net record, bracketed front and back by a fresh correlation key.
          // NOTE(review): this is written before the delete runs, so it is logged even if the delete fails.
          string uuidKey = Guid.NewGuid().ToString();
          string logkey = "【" + uuidKey + "】";
          LogHelper.Info(this,
              logkey
              + "【删除】"
              + "【表】"
              + t.GetType()
              + "【数据】"
              + t.ToJson()
              + logkey);
      }

      BulkExecutor bulkExecutor = new BulkExecutor(CosmosClient, dataCollection);
      await bulkExecutor.InitializeAsync();
      try
      {
          await bulkExecutor.BulkDeleteAsync(pkIdTuplesToDelete);
      }
      catch (DocumentClientException de)
      {
          // The message used to contain a literal "{0}" left over from a format string.
          throw new BizException("Document client exception: " + de);
      }
      catch (Exception e)
      {
          throw new BizException("Exception: "+ e);
      }
      return list;
  }
  /// <summary>
  /// Builds the typed "set" operation for <paramref name="key"/> from the runtime type code of
  /// <paramref name="obj"/>.
  /// </summary>
  /// <param name="key">Field to set.</param>
  /// <param name="obj">New value; must not be null.</param>
  /// <returns>The set operation, or null when the value's type code is not one of the handled ones.</returns>
  private static UpdateOperation SwitchType(string key, object obj)
  {
      switch (Type.GetTypeCode(obj.GetType()))
      {
          case TypeCode.String:
              return new SetUpdateOperation<string>(key, obj.ToString());
          case TypeCode.Int32:
              return new SetUpdateOperation<Int32>(key, (Int32)obj);
          case TypeCode.Double:
              return new SetUpdateOperation<Double>(key, (Double)obj);
          case TypeCode.Byte:
              return new SetUpdateOperation<Byte>(key, (Byte)obj);
          case TypeCode.Boolean:
              return new SetUpdateOperation<Boolean>(key, (Boolean)obj);
          case TypeCode.DateTime:
              return new SetUpdateOperation<DateTime>(key, (DateTime)obj);
          case TypeCode.Int64:
              return new SetUpdateOperation<Int64>(key, (Int64)obj);
          default:
              return null;
      }
  }
  /// <summary>
  /// Queries the collection of <typeparamref name="T"/> with "select value(c) from c" extended by
  /// whatever <c>SQLHelper.GetSQL</c> appends for <paramref name="dict"/>.
  /// </summary>
  /// <param name="dict">Query conditions, interpreted by <c>SQLHelper.GetSQL</c>.</param>
  /// <param name="IsPk">Passed straight through as <c>FeedOptions.EnableCrossPartitionQuery</c>.</param>
  /// <returns>All result documents converted to <typeparamref name="T"/>.</returns>
  public async Task<List<T>> FindByDict<T>(Dictionary<string, object> dict, bool IsPk=true)
  {
      DocumentCollection collection = await InitializeCollection<T>();
      StringBuilder sql = new StringBuilder("select value(c) from c");
      SQLHelper.GetSQL(dict, ref sql);
      // MaxItemCount = -1 leaves the page size to the service.
      FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = IsPk };
      var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, collection.Id), sql.ToString(), queryOptions).AsDocumentQuery();
      // Drain the result pages asynchronously; enumerating the IQueryable with ToList()
      // blocked a thread on network I/O inside an async method.
      List<T> results = new List<T>();
      while (query.HasMoreResults)
      {
          results.AddRange(await query.ExecuteNextAsync<T>());
      }
      return results;
  }
  /// <summary>
  /// Builds one " and ..." SQL condition for <paramref name="key"/>:
  /// a JArray becomes an IN list, a key starting with "$." becomes a Contains() test on the
  /// remainder of the key, anything else becomes an equality test.
  /// Values are emitted as JSON literals (escaped strings, invariant numbers, lowercase booleans,
  /// ISO dates) instead of raw concatenation, which produced injectable and culture-dependent SQL.
  /// </summary>
  /// <param name="obj">Value to compare against; must not be null.</param>
  /// <param name="key">Field name, or "$." + field name for a Contains test. It is inserted into the SQL as-is.</param>
  /// <returns>The condition text, or null when the value's type is not supported.</returns>
  private static string GenSql(object obj, string key)
  {
      if (obj is JArray array)
      {
          List<string> literals = new List<string>();
          foreach (JValue element in array)
          {
              object value = element.Value;
              // Same element types as before; anything else is skipped.
              if (value is string || value is int || value is double || value is bool || value is long || value is DateTime)
              {
                  literals.Add(Literal(value));
              }
          }
          if (literals.Count == 0)
          {
              // An empty IN list matches nothing; the old code emitted the malformed "in ) ".
              return " and 1=0 ";
          }
          return " and c." + key + " in (" + string.Join(",", literals) + ") ";
      }

      switch (Type.GetTypeCode(obj.GetType()))
      {
          case TypeCode.String:
          case TypeCode.Int32:
          case TypeCode.Double:
          case TypeCode.Boolean:
          case TypeCode.DateTime:
          case TypeCode.Int64:
              break;
          default:
              return null;
      }

      if (key.StartsWith("$.", StringComparison.Ordinal))
      {
          string keyLike = key.Replace("$.", "");
          // Contains() compares text, so the value is rendered to a string first and then quoted.
          string text = obj is DateTime
              ? Literal(obj).Trim('"')
              : Convert.ToString(obj, System.Globalization.CultureInfo.InvariantCulture);
          return " and Contains( c." + keyLike + " , " + Literal(text) + ") = true ";
      }

      return " and c." + key + "=" + Literal(obj);

      // JSON literal of a value; relies on Cosmos DB SQL accepting double-quoted strings.
      static string Literal(object value) => Newtonsoft.Json.JsonConvert.SerializeObject(value);
  }
  /// <summary>
  /// Builds (without executing) a dynamic query over the already-known collection
  /// <paramref name="CollectionName"/>: "select value(c) from c" extended by whatever
  /// <c>SQLHelper.GetSQL</c> appends for <paramref name="dict"/>. Cross-partition querying is enabled
  /// exactly when the cached collection definition has at least one partition key path.
  /// </summary>
  /// <param name="CollectionName">Id of a collection present in the in-memory cache.</param>
  /// <param name="dict">Query conditions, interpreted by <c>SQLHelper.GetSQL</c>.</param>
  /// <returns>The query object.</returns>
  /// <exception cref="BizException">Thrown when the collection is not in the cache.</exception>
  public IQueryable<dynamic> FindByDict(string CollectionName, Dictionary<string, object> dict)
  {
      if (!DocumentCollectionDict.TryGetValue(CollectionName, out DocumentCollection known))
      {
          throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
      }

      StringBuilder sql = new StringBuilder("select value(c) from c");
      SQLHelper.GetSQL(dict,ref sql);
      bool partitioned = known.PartitionKey.Paths.Count > 0;
      // MaxItemCount = -1 leaves the page size to the service.
      FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = partitioned };
      return CosmosClient.CreateDocumentQuery(UriFactory.CreateDocumentCollectionUri(Database, CollectionName), sql.ToString(), queryOptions);
  }
  /// <summary>
  /// Builds (without executing) a count query over the already-known collection
  /// <paramref name="CollectionName"/>: "select value count(c) from c" extended by whatever
  /// <c>SQLHelper.GetSQL</c> appends for <paramref name="dict"/>. Cross-partition querying is enabled
  /// exactly when the cached collection definition has at least one partition key path.
  /// </summary>
  /// <param name="CollectionName">Id of a collection present in the in-memory cache.</param>
  /// <param name="dict">Query conditions, interpreted by <c>SQLHelper.GetSQL</c>.</param>
  /// <returns>The query object.</returns>
  /// <exception cref="BizException">Thrown when the collection is not in the cache.</exception>
  public IQueryable<dynamic> FindCountByDict(string CollectionName, Dictionary<string, object> dict)
  {
      if (!DocumentCollectionDict.TryGetValue(CollectionName, out DocumentCollection known))
      {
          throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
      }

      StringBuilder sql = new StringBuilder("select value count(c) from c");
      SQLHelper.GetSQL(dict, ref sql);
      bool partitioned = known.PartitionKey.Paths.Count > 0;
      // MaxItemCount = -1 leaves the page size to the service.
      FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = partitioned };
      return CosmosClient.CreateDocumentQuery(UriFactory.CreateDocumentCollectionUri(Database, CollectionName), sql.ToString(), queryOptions);
  }
  /// <summary>
  /// Pulls the paging entries "OFFSET" and "LIMIT" out of <paramref name="dict"/> (both keys are
  /// removed from it) and, only when both carry a non-null value, switches paging on and parses
  /// the two numbers.
  /// </summary>
  /// <param name="dict">Query dictionary; modified in place.</param>
  /// <param name="offsetNum">Receives the parsed OFFSET; untouched when paging is not requested.</param>
  /// <param name="limitNum">Receives the parsed LIMIT; untouched when paging is not requested.</param>
  /// <param name="pageBool">Set to true when both entries are present; otherwise untouched.</param>
  private static void GetPageNum(Dictionary<string, object> dict, ref int offsetNum, ref int limitNum, ref bool pageBool)
  {
      // Reads an entry and removes it, whether or not it was present.
      object Extract(string name)
      {
          dict.TryGetValue(name, out object found);
          dict.Remove(name);
          return found;
      }

      object rawOffset = Extract("OFFSET");
      object rawLimit = Extract("LIMIT");
      if (rawOffset == null || rawLimit == null)
      {
          return;
      }

      pageBool = true;
      offsetNum = int.Parse(rawOffset.ToString());
      limitNum = int.Parse(rawLimit.ToString());
  }
  796. }
  797. }