AzureCosmosDBV3Repository.cs 49 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116
  1. using Microsoft.Azure.Cosmos;
  2. using Microsoft.Azure.Cosmos.Linq;
  3. using OpenXmlPowerTools;
  4. using System;
  5. using System.Collections.Concurrent;
  6. using System.Collections.Generic;
  7. using System.Diagnostics;
  8. using System.IO;
  9. using System.Linq;
  10. using System.Linq.Expressions;
  11. using System.Net;
  12. using System.Reflection;
  13. using System.Text;
  14. using System.Text.Json;
  15. using System.Threading;
  16. using System.Threading.Tasks;
  17. using TEAMModelOS.SDK.Context.Attributes.Azure;
  18. using TEAMModelOS.SDK.Context.Exception;
  19. using TEAMModelOS.SDK.Helper.Common.CollectionHelper;
  20. using TEAMModelOS.SDK.Helper.Common.ReflectorExtensions;
  21. using TEAMModelOS.SDK.Module.AzureCosmosDB.Configuration;
  22. namespace TEAMModelOS.SDK.Module.AzureCosmosDBV3
  23. {
  24. public class AzureCosmosDBV3Repository : IAzureCosmosDBV3Repository
  25. {
  26. private CosmosClient CosmosClient { get; set; }
  27. public CosmosDict CosmosDict { get; set; } = new CosmosDict();
  28. /// <summary>
  29. /// 数据库名
  30. /// </summary>
  31. private string DatabaseId { get; set; }
  32. /// <summary>
  33. /// RU
  34. /// </summary>
  35. private int CollectionThroughput { get; set; }
  36. /// <summary>
  37. /// 数据库对象
  38. /// </summary>
  39. private Database database { get; set; }
  40. /// <summary>
  41. /// 分页大小
  42. /// </summary>
  43. int pageSize = 200;
  44. /// <summary>
  45. /// 缓存前缀
  46. /// </summary>
  47. private const string CacheCosmosPrefix = "cosmos:";
  48. /// <summary>
  49. /// 扫描类
  50. /// </summary>
  51. private string[] ScanModel { get; set; }
  52. /// <summary>
  53. /// 超时时间
  54. /// </summary>
  55. private const int timeoutSeconds = 86400;
  56. /// <summary>
  57. /// ttl时长 1秒
  58. /// </summary>
  59. private const int ttl = 1;
  60. /// <summary>
  61. /// 更新源通知容器
  62. /// </summary>
  63. private string leaseId = "AleaseContainer";
  64. public AzureCosmosDBV3Repository(AzureCosmosDBOptions options, CosmosSerializer cosmosSerializer)
  65. {
  66. try
  67. {
  68. if (!string.IsNullOrEmpty(options.ConnectionString))
  69. {
  70. CosmosClient = CosmosDBV3ClientSingleton.getInstance(options.ConnectionString, options.ConnectionKey, cosmosSerializer).GetCosmosDBClient();
  71. }
  72. else
  73. {
  74. throw new BizException("请设置正确的AzureCosmosDB数据库配置信息!");
  75. }
  76. DatabaseId = options.Database;
  77. CollectionThroughput = options.CollectionThroughput;
  78. ScanModel = options.ScanModel;
  79. }
  80. catch (CosmosException e)
  81. {
  82. throw new BizException(e.Message, 500, e.StackTrace);
  83. }
  84. }
  85. public AzureCosmosDBV3Repository(AzureCosmosDBOptions options)
  86. {
  87. try
  88. {
  89. if (!string.IsNullOrEmpty(options.ConnectionString))
  90. {
  91. CosmosClient = CosmosDBV3ClientSingleton.getInstance(options.ConnectionString, options.ConnectionKey, null).GetCosmosDBClient();
  92. }
  93. else
  94. {
  95. throw new BizException("请设置正确的AzureCosmosDB数据库配置信息!");
  96. }
  97. DatabaseId = options.Database;
  98. CollectionThroughput = options.CollectionThroughput;
  99. ScanModel = options.ScanModel;
  100. }
  101. catch (CosmosException e)
  102. {
  103. throw new BizException(e.Message, 500, e.StackTrace);
  104. }
  105. }
  106. /// <summary>
  107. /// 初始化CosmosDB数据库
  108. /// </summary>
  109. /// <returns></returns>
  110. public async Task<CosmosDict> InitializeDatabase() {
  111. database = await CosmosClient.CreateDatabaseIfNotExistsAsync(DatabaseId, CollectionThroughput);
  112. FeedIterator<ContainerProperties> resultSetIterator = database.GetContainerQueryIterator<ContainerProperties>();
  113. while (resultSetIterator.HasMoreResults)
  114. {
  115. foreach (ContainerProperties container in await resultSetIterator.ReadNextAsync())
  116. {
  117. CosmosDict.nameCosmos.TryAdd(container.Id, new CosmosModelInfo { container = database.GetContainer(container.Id), cache = false, monitor = false });
  118. }
  119. }
  120. bool isMonitor = false;
  121. //获取数据库所有的表
  122. List<Type> types = ReflectorExtensions.GetAllTypeAsAttribute<CosmosDBAttribute>(ScanModel);
  123. foreach (Type type in types)
  124. {
  125. string PartitionKey = GetPartitionKey(type);
  126. string CollectionName = "";
  127. int RU = 0;
  128. bool cache = false;
  129. bool monitor = false;
  130. IEnumerable<CosmosDBAttribute> attributes = type.GetCustomAttributes<CosmosDBAttribute>(true);
  131. if (attributes != null && !string.IsNullOrEmpty(attributes.First<CosmosDBAttribute>().Name))
  132. {
  133. CollectionName = attributes.First<CosmosDBAttribute>().Name;
  134. }
  135. else {
  136. throw new BizException("必须指定容器名",ResponseCode.PARAMS_ERROR);
  137. }
  138. if (attributes.First<CosmosDBAttribute>().Cache)
  139. {
  140. cache = attributes.First<CosmosDBAttribute>().Cache;
  141. }
  142. if (attributes.First<CosmosDBAttribute>().Monitor)
  143. {
  144. monitor = attributes.First<CosmosDBAttribute>().Monitor;
  145. if (monitor)
  146. {
  147. isMonitor = true;
  148. }
  149. }
  150. if (attributes.First<CosmosDBAttribute>().RU > 400)
  151. {
  152. RU = attributes.First<CosmosDBAttribute>().RU;
  153. }
  154. else
  155. {
  156. RU = CollectionThroughput;
  157. }
  158. //如果表存在于数据则检查RU是否变动,如果不存在则执行创建DocumentCollection
  159. if (CosmosDict.nameCosmos.TryGetValue(CollectionName, out CosmosModelInfo cosmosModelInfo))
  160. { //更新RU
  161. cosmosModelInfo.cache = cache;
  162. Container container = CosmosClient.GetDatabase(DatabaseId).GetContainer(cosmosModelInfo.container.Id);
  163. int? throughputResponse = await container.ReadThroughputAsync();
  164. if (throughputResponse < RU)
  165. {
  166. await CosmosClient.GetDatabase(DatabaseId).GetContainer(cosmosModelInfo.container.Id).ReplaceThroughputAsync(RU);
  167. }
  168. CosmosModelInfo cosmos = new CosmosModelInfo { container = container, cache = cache, monitor = monitor, type = type };
  169. CosmosDict.nameCosmos[CollectionName] = cosmos;
  170. CosmosDict.typeCosmos.Add(type.Name, cosmos);
  171. }
  172. else {
  173. ContainerProperties containerProperties = new ContainerProperties { Id = CollectionName, DefaultTimeToLive = -1 };
  174. if (!string.IsNullOrEmpty(PartitionKey))
  175. {
  176. containerProperties.PartitionKeyPath = "/" + PartitionKey;
  177. }
  178. if (RU > CollectionThroughput)
  179. {
  180. CollectionThroughput = RU;
  181. }
  182. Container containerWithConsistentIndexing = await database.CreateContainerIfNotExistsAsync(containerProperties, throughput: CollectionThroughput);
  183. CosmosModelInfo cosmos = new CosmosModelInfo { container = containerWithConsistentIndexing, cache = cache, monitor = monitor, type = type };
  184. CosmosDict.nameCosmos[CollectionName] = cosmos;
  185. CosmosDict.typeCosmos.Add(type.Name, cosmos);
  186. }
  187. }
  188. if (isMonitor)
  189. {
  190. ContainerProperties leaseProperties = new ContainerProperties { Id = leaseId, PartitionKeyPath = "/id", DefaultTimeToLive = -1 };
  191. Container leaseContainer = await database.CreateContainerIfNotExistsAsync(leaseProperties, throughput: CollectionThroughput);
  192. CosmosDict.nameCosmos.TryAdd(leaseId, new CosmosModelInfo { container = leaseContainer, cache = false, monitor = false });
  193. }
  194. return CosmosDict;
  195. }
  196. private string GetPartitionKey<T>()
  197. {
  198. Type type = typeof(T);
  199. return GetPartitionKey(type);
  200. }
  201. private string GetPartitionKey(Type type)
  202. {
  203. PropertyInfo[] properties = type.GetProperties();
  204. List<PropertyInfo> attrProperties = new List<PropertyInfo>();
  205. foreach (PropertyInfo property in properties)
  206. {
  207. if (property.Name.Equals("PartitionKey"))
  208. {
  209. attrProperties.Add(property);
  210. break;
  211. }
  212. object[] attributes = property.GetCustomAttributes(true);
  213. foreach (object attribute in attributes) //2.通过映射,找到成员属性上关联的特性类实例,
  214. {
  215. if (attribute is PartitionKeyAttribute)
  216. {
  217. attrProperties.Add(property);
  218. }
  219. }
  220. }
  221. if (attrProperties.Count <= 0)
  222. {
  223. throw new BizException(type.Name + " has no PartitionKey !");
  224. }
  225. else
  226. {
  227. if (attrProperties.Count == 1)
  228. {
  229. return attrProperties[0].Name;
  230. }
  231. else { throw new BizException(type.Name+" PartitionKey can only be single!"); }
  232. }
  233. }
  234. private async Task<CosmosModelInfo> InitializeCollection<T>()
  235. {
  236. Type type = typeof(T);
  237. string partitionKey = GetPartitionKey<T>();
  238. CosmosDBAttribute cosmosDBAttribute = null;
  239. IEnumerable<CosmosDBAttribute> attributes = type.GetCustomAttributes<CosmosDBAttribute>(true);
  240. if (attributes != null && !string.IsNullOrEmpty(attributes.First<CosmosDBAttribute>().Name))
  241. {
  242. cosmosDBAttribute = attributes.First<CosmosDBAttribute>();
  243. }
  244. else {
  245. throw new BizException(type.Name+"未指定CosmosDB表名",ResponseCode.PARAMS_ERROR);
  246. }
  247. return await InitializeCollection(cosmosDBAttribute, type.Name, partitionKey);
  248. }
  249. private async Task<CosmosModelInfo> InitializeCollection(CosmosDBAttribute cosmosDBAttribute, string typeName, string PartitionKey)
  250. {
  251. /////内存中已经存在这个表则直接返回
  252. if (CosmosDict.typeCosmos.TryGetValue(typeName, out CosmosModelInfo cosmosModelInfo))
  253. {
  254. return cosmosModelInfo;
  255. }///如果没有则尝试默认创建
  256. else
  257. {
  258. ContainerProperties containerProperties = new ContainerProperties { Id = cosmosDBAttribute.Name };
  259. if (!string.IsNullOrEmpty(PartitionKey))
  260. {
  261. containerProperties.PartitionKeyPath = "/"+ PartitionKey;
  262. }
  263. Container containerWithConsistentIndexing = await database.CreateContainerIfNotExistsAsync(containerProperties, throughput: cosmosDBAttribute.RU);
  264. CosmosModelInfo cosmosModel = new CosmosModelInfo { container = containerWithConsistentIndexing, cache = cosmosDBAttribute.Cache,monitor= cosmosDBAttribute.Monitor };
  265. CosmosDict. nameCosmos.TryAdd(cosmosDBAttribute.Name, cosmosModel);
  266. CosmosDict.typeCosmos.TryAdd(typeName, cosmosModel);
  267. return cosmosModel;
  268. }
  269. }
  270. /// <summary>
  271. /// 按TTL删除
  272. /// </summary>
  273. /// <typeparam name="T"></typeparam>
  274. /// <param name="list"></param>
  275. /// <returns></returns>
  276. private async Task<List<T>> DeleteTTL<T>(List<T> list) where T : ID
  277. {
  278. CosmosModelInfo container = await InitializeCollection<T>();
  279. list.ForEach(x => { x.ttl = ttl; });
  280. list = await SaveOrUpdateAll(list);
  281. if (container.cache && RedisHelper.Instance != null)
  282. {
  283. list.ForEach(async x => {
  284. await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, x.id);
  285. });
  286. }
  287. return list;
  288. }
  289. public async Task<List<IdPk>> DeleteAll<T>(List<IdPk> ids) where T : ID
  290. {
  291. // string pk = GetPartitionKey<T>();
  292. CosmosModelInfo container = await InitializeCollection<T>();
  293. List<IdPk> idPks = new List<IdPk>();
  294. if (container.monitor)
  295. {
  296. List<T> list = await FindByDict<T>(new Dictionary<string, object>() { { "id", ids.Select(x => x.id).ToArray() } });
  297. list = await DeleteTTL(list);
  298. return ids;
  299. }
  300. else
  301. {
  302. int pages = (int)Math.Ceiling((double)ids.Count / pageSize);
  303. Stopwatch stopwatch = Stopwatch.StartNew();
  304. for (int i = 0; i < pages; i++)
  305. {
  306. List<IdPk> lists = ids.Skip((i) * pageSize).Take(pageSize).ToList();
  307. List<Task> tasks = new List<Task>(lists.Count);
  308. lists.ForEach(item =>
  309. {
  310. tasks.Add(container.container.DeleteItemStreamAsync(item.id, new PartitionKey(item.pk))
  311. .ContinueWith((Task<ResponseMessage> task) =>
  312. {
  313. using (ResponseMessage response = task.Result)
  314. {
  315. idPks.Add(new IdPk { id = item.id, pk = item.pk, StatusCode = response.StatusCode });
  316. // if (!response.IsSuccessStatusCode)
  317. // {
  318. // }
  319. }
  320. }
  321. ));
  322. });
  323. await Task.WhenAll(tasks);
  324. if (container.cache && RedisHelper.Instance != null)
  325. {
  326. lists.ForEach(async x => {
  327. await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, x.id);
  328. });
  329. }
  330. }
  331. stopwatch.Stop();
  332. return idPks;
  333. }
  334. }
  335. public async Task<List<IdPk>> DeleteAll<T>(Dictionary<string, object> dict) where T : ID
  336. {
  337. if (dict.Keys.Count > 0)
  338. {
  339. List<T> list = await FindByDict<T>(dict);
  340. return await DeleteAll(list);
  341. }
  342. else
  343. {
  344. throw new BizException("参数为空", 500);
  345. }
  346. }
  347. public async Task<List<IdPk>> DeleteAll<T>(List<T> enyites) where T : ID
  348. {
  349. Type type = typeof(T);
  350. string pk = GetPartitionKey<T>();
  351. CosmosModelInfo container = await InitializeCollection<T>();
  352. List<IdPk> idPks = new List<IdPk>();
  353. if (container.monitor)
  354. {
  355. enyites = await DeleteTTL(enyites);
  356. foreach (T t in enyites)
  357. {
  358. object o = type.GetProperty(pk).GetValue(t, null);
  359. idPks.Add(new IdPk { id = t.id, pk = o.ToString(), StatusCode = HttpStatusCode.NoContent });
  360. }
  361. return idPks;
  362. }
  363. else
  364. {
  365. int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
  366. Stopwatch stopwatch = Stopwatch.StartNew();
  367. for (int i = 0; i < pages; i++)
  368. {
  369. List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
  370. List<KeyValuePair<PartitionKey, string>> itemsToInsert = new List<KeyValuePair<PartitionKey, string>>();
  371. lists.ForEach(x =>
  372. {
  373. object o = type.GetProperty(pk).GetValue(x, null);
  374. KeyValuePair<PartitionKey, string> keyValue = new KeyValuePair<PartitionKey, string>(new PartitionKey(o.ToString()), x.id);
  375. itemsToInsert.Add(keyValue);
  376. });
  377. List<Task> tasks = new List<Task>(lists.Count);
  378. itemsToInsert.ForEach(item =>
  379. {
  380. tasks.Add(container.container.DeleteItemStreamAsync(item.Value, item.Key)
  381. .ContinueWith((Task<ResponseMessage> task) =>
  382. {
  383. using (ResponseMessage response = task.Result)
  384. {
  385. idPks.Add(new IdPk { id = item.Value, pk = item.Key.ToString(), StatusCode = response.StatusCode });
  386. }
  387. }
  388. ));
  389. });
  390. await Task.WhenAll(tasks); if (container.cache && RedisHelper.Instance != null)
  391. {
  392. lists.ForEach(async x => {
  393. await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, x.id);
  394. });
  395. }
  396. }
  397. stopwatch.Stop();
  398. return idPks;
  399. }
  400. }
  401. public async Task<IdPk> DeleteAsync<T>(IdPk idPk) where T : ID
  402. {
  403. return await DeleteAsync<T>(idPk.id, idPk.pk);
  404. }
  405. public async Task<IdPk> DeleteAsync<T>(string id, string pk) where T : ID
  406. {
  407. // pk = GetPartitionKey<T>();
  408. CosmosModelInfo container = await InitializeCollection<T>();
  409. if (container.monitor)
  410. {
  411. List<T> list = await FindByDict<T>(new Dictionary<string, object>() { { "id", id } });
  412. if (list.Count > 0)
  413. {
  414. await DeleteTTL<T>(list);
  415. return new IdPk { id = id, pk = pk, StatusCode = HttpStatusCode.NoContent };
  416. }
  417. else
  418. {
  419. throw new BizException("未找到ID匹配的数据,删除失败");
  420. }
  421. }
  422. else
  423. {
  424. ResponseMessage response = await container.container.DeleteItemStreamAsync(id: id, partitionKey: new PartitionKey(pk));
  425. if (container.cache && RedisHelper.Instance != null)
  426. {
  427. await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, id);
  428. }
  429. return new IdPk { id = id, pk = pk, StatusCode = response.StatusCode };
  430. }
  431. }
  432. public async Task<IdPk> DeleteAsync<T>(T entity) where T : ID
  433. {
  434. CosmosModelInfo container = await InitializeCollection<T>();
  435. string partitionKey = GetPartitionKey<T>();
  436. Type type = typeof(T);
  437. object o = type.GetProperty(partitionKey).GetValue(entity, null);
  438. if (container.monitor)
  439. {
  440. await DeleteTTL<T>(new List<T>() { entity });
  441. return new IdPk { id = entity.id, pk = o.ToString(), StatusCode = HttpStatusCode.NoContent };
  442. }
  443. else
  444. {
  445. ResponseMessage response = await container.container.DeleteItemStreamAsync(id: entity.id, partitionKey: new PartitionKey(o.ToString()));
  446. if (container.cache && RedisHelper.Instance != null)
  447. {
  448. await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, entity.id);
  449. }
  450. return new IdPk { id = entity.id, pk = partitionKey, StatusCode = response.StatusCode };
  451. }
  452. }
  453. public async Task<List<T>> FindAll<T>(List<string> propertys = null) where T : ID
  454. {
  455. CosmosModelInfo container = await InitializeCollection<T>();
  456. string pk = typeof(T).Name;
  457. StringBuilder sql;
  458. sql = SQLHelperParametric.GetSQLSelect(propertys);
  459. sql.Append(" where c.pk ='"+ pk + "'");
  460. CosmosDbQuery cosmosDbQuery = new CosmosDbQuery { QueryText = sql.ToString() };
  461. FeedIterator<T> query = container.container.GetItemQueryIterator<T>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition);
  462. return await ResultsFromFeedIterator(query);
  463. }
  464. private async Task<List<T>> ResultsFromFeedIterator<T>(FeedIterator<T> query, int? maxItemCount = null)
  465. {
  466. List<T> results = new List<T>();
  467. while (query.HasMoreResults)
  468. {
  469. foreach (T t in await query.ReadNextAsync())
  470. {
  471. results.Add(t);
  472. if (results.Count == maxItemCount)
  473. {
  474. return results;
  475. }
  476. }
  477. }
  478. return results;
  479. }
  480. private async Task<List<T>> ResultsFromFeedIterator<T>(FeedIterator<T> query, Func<List<T>, Task> batchAction, int itemsPerPage)
  481. {
  482. List<T> results = new List<T>();
  483. while (query.HasMoreResults)
  484. {
  485. if (results.Count() >= itemsPerPage)
  486. {
  487. await batchAction(results);
  488. results.Clear();
  489. }
  490. results.AddRange(await query.ReadNextAsync());
  491. }
  492. if (results.Count() > 0)
  493. {
  494. await batchAction(results);
  495. results.Clear();
  496. }
  497. return results;
  498. }
  499. public async Task<List<dynamic>> FindByDict(string CollectionName, Dictionary<string, object> dict, List<string> propertys = null)
  500. {
  501. if (CosmosDict.typeCosmos.TryGetValue(CollectionName, out CosmosModelInfo container))
  502. {
  503. //StringBuilder sql = new StringBuilder("select value(c) from c");
  504. //SQLHelper.GetSQL(dict, ref sql);
  505. //CosmosDbQuery cosmosDbQuery = new CosmosDbQuery, int itemsPerPage = -1, int?
  506. //{
  507. // QueryText = sql.ToString()
  508. //};
  509. string pk = container.type.Name;
  510. StringBuilder sql;
  511. sql = SQLHelperParametric.GetSQLSelect(propertys);
  512. CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, sql, pk);
  513. QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
  514. FeedIterator<dynamic> query = container.container.GetItemQueryIterator<dynamic>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
  515. return await ResultsFromFeedIterator(query);
  516. }
  517. else
  518. {
  519. throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
  520. }
  521. }
  522. public async Task<List<int>> FindCountByDict<T>(Dictionary<string, object> dict) {
  523. CosmosModelInfo container = await InitializeCollection<T>();
  524. string pk = typeof(T).Name;
  525. StringBuilder sql = new StringBuilder("select value count(c) from c");
  526. CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, sql, pk);
  527. if (cosmosDbQuery == null)
  528. {
  529. return new List<int> { 0 };
  530. }
  531. QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
  532. FeedIterator<int> query = container.container.GetItemQueryIterator<int>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
  533. return await ResultsFromFeedIterator(query);
  534. }
  535. public async Task<List<dynamic>> FindCountByDict(string CollectionName, Dictionary<string, object> dict)
  536. {
  537. if (CosmosDict.typeCosmos.TryGetValue(CollectionName, out CosmosModelInfo container))
  538. {
  539. string pk = container.type.Name;
  540. dict.Remove("@CURRPAGE");
  541. dict.Remove("@PAGESIZE");
  542. dict.Remove("@ASC");
  543. dict.Remove("@DESC");
  544. StringBuilder sql = new StringBuilder("select value count(c) from c");
  545. CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, sql, pk);
  546. if (cosmosDbQuery == null) {
  547. return new List<dynamic> {0 };
  548. }
  549. QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
  550. FeedIterator<dynamic> query = container.container.GetItemQueryIterator<dynamic>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
  551. return await ResultsFromFeedIterator(query);
  552. }
  553. else
  554. {
  555. throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
  556. }
  557. }
  558. public async Task<List<T>> FindByParams<T>(Dictionary<string, object> dict, List<string> propertys = null) where T : ID
  559. {
  560. return await FindByDict<T>(dict, propertys);
  561. }
  562. public async Task<List<T>> FindByDict<T>(Dictionary<string, object> dict, List<string> propertys = null) where T : ID
  563. {
  564. StringBuilder sql;
  565. sql = SQLHelperParametric.GetSQLSelect(propertys);
  566. string pk = typeof(T).Name;
  567. CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, sql, pk);
  568. QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
  569. return await ResultsFromQueryAndOptions<T>(cosmosDbQuery, queryRequestOptions);
  570. }
  571. private async Task<List<T>> ResultsFromQueryAndOptions<T>(CosmosDbQuery cosmosDbQuery, QueryRequestOptions queryOptions)
  572. {
  573. if (cosmosDbQuery == null) {
  574. return null;
  575. }
  576. CosmosModelInfo container = await InitializeCollection<T>();
  577. FeedIterator<T> query = container.container.GetItemQueryIterator<T>(
  578. queryDefinition: cosmosDbQuery.CosmosQueryDefinition,
  579. requestOptions: queryOptions);
  580. return await ResultsFromFeedIterator(query);
  581. }
  582. private int GetEffectivePageSize(int itemsPerPage, int? maxItemCount)
  583. {
  584. return itemsPerPage == -1 ? maxItemCount ?? itemsPerPage : Math.Min(maxItemCount ?? itemsPerPage, itemsPerPage);
  585. }
  586. private QueryRequestOptions GetDefaultQueryRequestOptions(int? itemsPerPage = null,
  587. int? maxBufferedItemCount = null,
  588. int? maxConcurrency = null)
  589. {
  590. QueryRequestOptions queryRequestOptions = new QueryRequestOptions
  591. {
  592. MaxItemCount = itemsPerPage == -1 ? 1000 : itemsPerPage,
  593. MaxBufferedItemCount = maxBufferedItemCount ?? 100,
  594. MaxConcurrency = maxConcurrency ?? 50
  595. };
  596. return queryRequestOptions;
  597. }
  598. private async Task<List<T>> ResultsFromQueryAndOptions<T>(CosmosDbQuery cosmosDbQuery, Func<List<T>, Task> batchAction, QueryRequestOptions queryOptions)
  599. {
  600. CosmosModelInfo container = await InitializeCollection<T>();
  601. FeedIterator<T> query = container.container.GetItemQueryIterator<T>(
  602. queryDefinition: cosmosDbQuery.CosmosQueryDefinition,
  603. requestOptions: queryOptions);
  604. return await ResultsFromFeedIterator(query, batchAction, queryOptions.MaxItemCount ?? 0);
  605. }
  606. private QueryRequestOptions GetQueryRequestOptions(int itemsPerPage)
  607. {
  608. QueryRequestOptions queryRequestOptions = new QueryRequestOptions
  609. {
  610. MaxItemCount = itemsPerPage
  611. };
  612. return queryRequestOptions;
  613. }
  614. public async Task<List<T>> FindLinq<T>(Expression<Func<T, bool>> query = null, Expression<Func<T, object>> order = null, bool isDesc = false) where T : ID
  615. {
  616. string pk = typeof(T).Name;
  617. query = query.And(x => x.pk == pk);
  618. //QueryRequestOptions queryRequestOptions = GetQueryRequestOptions(itemsPerPage);
  619. QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
  620. FeedIterator<T> feedIterator;
  621. CosmosModelInfo container = await InitializeCollection<T>();
  622. if (query == null)
  623. {
  624. if (order != null)
  625. {
  626. if (isDesc)
  627. {
  628. feedIterator = container.container
  629. .GetItemLinqQueryable<T>(requestOptions: queryRequestOptions).OrderByDescending(order)
  630. .ToFeedIterator();
  631. }
  632. else
  633. {
  634. feedIterator = container.container
  635. .GetItemLinqQueryable<T>(requestOptions: queryRequestOptions).OrderBy(order)
  636. .ToFeedIterator();
  637. }
  638. }
  639. else
  640. {
  641. feedIterator = container.container
  642. .GetItemLinqQueryable<T>(requestOptions: queryRequestOptions)
  643. .ToFeedIterator();
  644. }
  645. }
  646. else
  647. {
  648. if (order != null)
  649. {
  650. if (isDesc)
  651. {
  652. feedIterator = container.container
  653. .GetItemLinqQueryable<T>(requestOptions: queryRequestOptions)
  654. .Where(query).OrderByDescending(order)
  655. .ToFeedIterator();
  656. }
  657. else
  658. {
  659. feedIterator = container.container
  660. .GetItemLinqQueryable<T>(requestOptions: queryRequestOptions)
  661. .Where(query).OrderBy(order)
  662. .ToFeedIterator();
  663. }
  664. }
  665. else
  666. {
  667. feedIterator = container.container
  668. .GetItemLinqQueryable<T>(requestOptions: queryRequestOptions)
  669. .Where(query)
  670. .ToFeedIterator();
  671. }
  672. }
  673. return await ResultsFromFeedIterator<T>(feedIterator);
  674. }
  675. public async Task<List<T>> FindSQL<T>(string sql, Dictionary<string, object> Parameters = null) where T : ID
  676. {
  677. if (sql.Contains(".pk"))
  678. {
  679. CosmosModelInfo container = await InitializeCollection<T>();
  680. QueryRequestOptions queryOptions = GetQueryRequestOptions(GetEffectivePageSize(-1, null));
  681. if (Parameters != null)
  682. {
  683. CosmosDbQuery cosmosDbQuery = new CosmosDbQuery
  684. {
  685. QueryText = sql,
  686. Parameters = Parameters
  687. };
  688. FeedIterator<T> feedIterator = container.container
  689. .GetItemQueryIterator<T>(cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryOptions);
  690. return await ResultsFromFeedIterator(feedIterator);
  691. }
  692. else
  693. {
  694. QueryDefinition queryDefinition = new QueryDefinition(sql);
  695. return await ResultsFromFeedIterator<T>(container.container.GetItemQueryIterator<T>(queryDefinition));
  696. }
  697. }
  698. else
  699. {
  700. throw new BizException("查询参数必须设置 .pk ", ResponseCode.PARAMS_ERROR);
  701. }
  702. }
  703. public async Task<T> Save<T>(T entity) where T : ID
  704. {
  705. try
  706. {
  707. CosmosModelInfo container = await InitializeCollection<T>();
  708. entity.pk = container.type.Name;
  709. ItemResponse<T> response = await container.container.CreateItemAsync<T>(entity);
  710. if (container.cache && RedisHelper.Instance != null)
  711. {
  712. if (!RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
  713. {
  714. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
  715. }
  716. else
  717. {
  718. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
  719. await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
  720. }
  721. }
  722. return response.Resource;
  723. }
  724. catch (Exception e)
  725. {
  726. throw new BizException(e.Message);
  727. }
  728. }
  729. public async Task<List<T>> SaveAll<T>(List<T> enyites) where T : ID
  730. {
  731. int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
  732. CosmosModelInfo container = await InitializeCollection<T>();
  733. bool flag = false;
  734. if (RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
  735. {
  736. flag = true;
  737. }
  738. string partitionKey = GetPartitionKey<T>();
  739. Type type = typeof(T);
  740. Stopwatch stopwatch = Stopwatch.StartNew();
  741. for (int i = 0; i < pages; i++)
  742. {
  743. List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
  744. List<KeyValuePair<PartitionKey, Stream>> itemsToInsert = new List<KeyValuePair<PartitionKey, Stream>>();
  745. lists.ForEach(async x =>
  746. {
  747. x.pk = type.Name;
  748. MemoryStream stream = new MemoryStream();
  749. await JsonSerializer.SerializeAsync(stream, x, new JsonSerializerOptions { IgnoreNullValues = true });
  750. object o = type.GetProperty(partitionKey).GetValue(x, null);
  751. KeyValuePair<PartitionKey, Stream> keyValue = new KeyValuePair<PartitionKey, Stream>(new PartitionKey(o.ToString()), stream);
  752. itemsToInsert.Add(keyValue);
  753. });
  754. List<Task> tasks = new List<Task>(lists.Count);
  755. itemsToInsert.ForEach(item =>
  756. {
  757. tasks.Add(container.container.CreateItemStreamAsync(item.Value, item.Key)
  758. .ContinueWith((Task<ResponseMessage> task) =>
  759. {
  760. using (ResponseMessage response = task.Result)
  761. {
  762. if (!response.IsSuccessStatusCode)
  763. {
  764. }
  765. }
  766. }
  767. ));
  768. });
  769. await Task.WhenAll(tasks);
  770. if (container.cache && RedisHelper.Instance != null)
  771. {
  772. lists.ForEach(async x => {
  773. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, x.id, x);
  774. });
  775. }
  776. }
  777. if (container.cache && RedisHelper.Instance != null && !flag)
  778. {
  779. await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
  780. }
  781. stopwatch.Stop();
  782. return enyites;
  783. }
  784. public async Task<T> SaveOrUpdate<T>(T entity) where T : ID
  785. {
  786. CosmosModelInfo container = await InitializeCollection<T>();
  787. entity.pk = container.type.Name;
  788. ItemResponse<T> response = await container.container.UpsertItemAsync(item: entity);
  789. if (container.cache && RedisHelper.Instance != null)
  790. {
  791. if (!RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
  792. {
  793. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
  794. }
  795. else
  796. {
  797. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
  798. await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
  799. }
  800. }
  801. return response.Resource;
  802. }
  803. public async Task<List<T>> SaveOrUpdateAll<T>(List<T> enyites) where T : ID
  804. {
  805. //await Task.Run(() => Parallel.ForEach(entities, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (item) =>
  806. //{
  807. // Task.WaitAll(Update(item));
  808. //}));
  809. int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
  810. CosmosModelInfo container = await InitializeCollection<T>();
  811. bool flag = false;
  812. if (RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
  813. {
  814. flag = true;
  815. }
  816. string partitionKey = GetPartitionKey<T>();
  817. Type type = typeof(T);
  818. Stopwatch stopwatch = Stopwatch.StartNew();
  819. for (int i = 0; i < pages; i++)
  820. {
  821. List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
  822. List<KeyValuePair<PartitionKey, Stream>> itemsToInsert = new List<KeyValuePair<PartitionKey, Stream>>();
  823. lists.ForEach(async x =>
  824. {
  825. x.pk = type.Name;
  826. MemoryStream stream = new MemoryStream();
  827. await JsonSerializer.SerializeAsync(stream, x, new JsonSerializerOptions { IgnoreNullValues = true });
  828. object o = type.GetProperty(partitionKey).GetValue(x, null);
  829. KeyValuePair<PartitionKey, Stream> keyValue = new KeyValuePair<PartitionKey, Stream>(new PartitionKey(o.ToString()), stream);
  830. itemsToInsert.Add(keyValue);
  831. });
  832. List<Task> tasks = new List<Task>(lists.Count);
  833. itemsToInsert.ForEach(item =>
  834. {
  835. tasks.Add(container.container.UpsertItemStreamAsync(item.Value, item.Key)
  836. .ContinueWith((Task<ResponseMessage> task) =>
  837. {
  838. //using (ResponseMessage response = task.Result)
  839. //{
  840. // if (!response.IsSuccessStatusCode)
  841. // {
  842. // }
  843. //}
  844. }
  845. ));
  846. });
  847. await Task.WhenAll(tasks);
  848. if (container.cache && RedisHelper.Instance != null)
  849. {
  850. lists.ForEach(async x => {
  851. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, x.id, x);
  852. });
  853. }
  854. }
  855. if (container.cache && RedisHelper.Instance != null && !flag)
  856. {
  857. await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
  858. }
  859. stopwatch.Stop();
  860. return enyites;
  861. }
  862. public async Task<T> Update<T>(T entity) where T : ID
  863. {
  864. CosmosModelInfo container = await InitializeCollection<T>();
  865. string partitionKey = GetPartitionKey<T>();
  866. Type type = typeof(T);
  867. entity.pk = type.Name;
  868. object o = type.GetProperty(partitionKey).GetValue(entity, null);
  869. ItemResponse<T> response = await container.container.ReplaceItemAsync(entity, entity.id, new PartitionKey(o.ToString()));
  870. if (container.cache && RedisHelper.Instance != null)
  871. {
  872. if (!RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
  873. {
  874. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
  875. }
  876. else
  877. {
  878. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
  879. await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
  880. }
  881. }
  882. return response.Resource;
  883. }
  884. internal class Item
  885. {
  886. public string id { get; set; }
  887. public string pk { get; set; }
  888. public MemoryStream stream { get; set; }
  889. }
  890. public async Task<List<T>> UpdateAll<T>(List<T> enyites) where T : ID
  891. {
  892. //await Task.Run(() => Parallel.ForEach(entities, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (item) =>
  893. //{
  894. // Task.WaitAll(Update(item));
  895. //}));
  896. int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
  897. CosmosModelInfo container = await InitializeCollection<T>();
  898. bool flag = false;
  899. if (RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
  900. {
  901. flag = true;
  902. }
  903. string partitionKey = GetPartitionKey<T>();
  904. Type type = typeof(T);
  905. Stopwatch stopwatch = Stopwatch.StartNew();
  906. for (int i = 0; i < pages; i++)
  907. {
  908. List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
  909. List<Item> itemsToInsert = new List<Item>();
  910. lists.ForEach(async x =>
  911. {
  912. x.pk = type.Name;
  913. MemoryStream stream = new MemoryStream();
  914. await JsonSerializer.SerializeAsync(stream, x, new JsonSerializerOptions { IgnoreNullValues = true });
  915. object o = type.GetProperty(partitionKey).GetValue(x, null);
  916. Item keyValue = new Item { id = x.id, pk = o.ToString(), stream = stream };
  917. itemsToInsert.Add(keyValue);
  918. });
  919. List<Task> tasks = new List<Task>(lists.Count);
  920. itemsToInsert.ForEach(item =>
  921. {
  922. tasks.Add(container.container.ReplaceItemStreamAsync(item.stream, item.id, new PartitionKey(item.pk))
  923. .ContinueWith((Task<ResponseMessage> task) =>
  924. {
  925. //using (ResponseMessage response = task.Result)
  926. //{
  927. // if (!response.IsSuccessStatusCode)
  928. // {
  929. // }
  930. //}
  931. }
  932. ));
  933. });
  934. await Task.WhenAll(tasks);
  935. if (container.cache && RedisHelper.Instance != null)
  936. {
  937. lists.ForEach(async x => {
  938. await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, x.id, x);
  939. });
  940. }
  941. }
  942. if (container.cache && RedisHelper.Instance != null && !flag)
  943. {
  944. await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
  945. }
  946. stopwatch.Stop();
  947. return enyites;
  948. }
  949. private async Task<List<T>> FindByIdAsSql<T>(string id) where T : ID
  950. {
  951. CosmosModelInfo container = await InitializeCollection<T>();
  952. string pk = container.type.Name;
  953. CosmosDbQuery cosmosDbQuery = new CosmosDbQuery
  954. {
  955. QueryText = @"SELECT *
  956. FROM c
  957. WHERE c.pk='" + pk + "' and c.id = @id",
  958. Parameters = new Dictionary<string, object>
  959. {
  960. { "@id",id}
  961. }
  962. };
  963. FeedIterator<T> feedIterator = container.container
  964. .GetItemQueryIterator<T>(cosmosDbQuery.CosmosQueryDefinition);
  965. return (await ResultsFromFeedIterator(feedIterator)).ToList();
  966. }
  967. public async Task<T> FindByIdPk<T>(string id, string pk) where T : ID
  968. {
  969. CosmosModelInfo container = await InitializeCollection<T>();
  970. try {
  971. ItemResponse<T> response = await container.container.ReadItemAsync<T>(id: id, partitionKey: new PartitionKey(pk));
  972. return response.Resource;
  973. }
  974. catch (Exception ex) {
  975. return default;
  976. }
  977. }
  978. public async Task<List<T>> FindById<T>(string id, bool cache = true) where T : ID
  979. {
  980. CosmosModelInfo container = await InitializeCollection<T>();
  981. if (container.cache && RedisHelper.Instance != null && cache == true)
  982. {
  983. return await RedisHelper.CacheShellAsync(CacheCosmosPrefix + container.container.Id, id, timeoutSeconds, () => { return FindByIdAsSql<T>(id); });
  984. }
  985. else
  986. {
  987. return await FindByIdAsSql<T>(id);
  988. }
  989. }
  990. public async Task<List<T>> FindByIds<T>(List<string> ids) where T : ID
  991. {
  992. CosmosModelInfo container = await InitializeCollection<T>();
  993. if (container.cache && RedisHelper.Instance != null)
  994. {
  995. List<T> list = new List<T>();
  996. List<string> NotIn = new List<string>();
  997. foreach (string id in ids)
  998. {
  999. if (!await RedisHelper.HExistsAsync(CacheCosmosPrefix + container.container.Id, id))
  1000. {
  1001. NotIn.Add(id);
  1002. }
  1003. else
  1004. {
  1005. list.Add(await RedisHelper.HGetAsync<T>(CacheCosmosPrefix + container.container.Id, id));
  1006. }
  1007. }
  1008. if (NotIn.IsNotEmpty())
  1009. {
  1010. List<T> noInList = await FindByDict<T>(new Dictionary<string, object> { { "id", NotIn.ToArray() } });
  1011. noInList.ForEach(x => { RedisHelper.HSet(CacheCosmosPrefix + container.container.Id, x.id, x); RedisHelper.Expire(CacheCosmosPrefix + container.container.Id, timeoutSeconds); });
  1012. list.AddRange(noInList);
  1013. }
  1014. return list;
  1015. }
  1016. else
  1017. {
  1018. return await FindByDict<T>(new Dictionary<string, object> { { "id", ids.ToArray() } });
  1019. }
  1020. }
  1021. public async Task<dynamic> FindById(string typeName, string id)
  1022. {
  1023. if (CosmosDict.typeCosmos.TryGetValue(typeName, out CosmosModelInfo container))
  1024. {
  1025. if (container.cache && RedisHelper.Instance != null)
  1026. {
  1027. return await RedisHelper.CacheShellAsync(CacheCosmosPrefix + container.container.Id, id, timeoutSeconds, () => { return FindByDict(typeName, new Dictionary<string, object> { { "id", id } }); });
  1028. }
  1029. else
  1030. {
  1031. return await FindByDict(typeName, new Dictionary<string, object> { { "id", id } });
  1032. }
  1033. }
  1034. else
  1035. {
  1036. throw new BizException("CollectionName named:" + typeName + " dose not exsit in Database!");
  1037. }
  1038. }
  1039. public async Task<List<dynamic>> FindByIds(string typeName, List<string> ids)
  1040. {
  1041. if (CosmosDict. typeCosmos.TryGetValue(typeName, out CosmosModelInfo container))
  1042. {
  1043. if (container.cache && RedisHelper.Instance != null)
  1044. {
  1045. return await RedisHelper.CacheShellAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds, () => { return FindByDict(typeName, new Dictionary<string, object> { { "id", ids.ToArray() } }); });
  1046. }
  1047. else
  1048. {
  1049. return await FindByDict(typeName, new Dictionary<string, object> { { "id", ids.ToArray() } });
  1050. }
  1051. }
  1052. else
  1053. {
  1054. throw new BizException("CollectionName named:" + typeName + " dose not exsit in Database!");
  1055. }
  1056. }
  1057. }
  1058. }