AzureTableDBRepository.cs 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757
  1. using TEAMModelOS.SDK.Module.AzureTable.Configuration;
  2. using TEAMModelOS.SDK.Module.AzureTable.Interfaces;
  3. using TEAMModelOS.SDK.Extension.DataResult.PageToken;
  4. using System;
  5. using System.Collections.Concurrent;
  6. using System.Collections.Generic;
  7. using System.Linq;
  8. using System.Text;
  9. using System.Threading.Tasks;
  10. using TEAMModelOS.SDK.Helper.Security.AESCrypt;
  11. using TEAMModelOS.SDK.Context.Exception;
  12. using System.Reflection;
  13. using TEAMModelOS.SDK.Context.Attributes.Azure;
  14. using TEAMModelOS.SDK.Helper.Common.CollectionHelper;
  15. using TEAMModelOS.SDK.Context.Configuration;
  16. using Microsoft.Extensions.Configuration;
  17. using Microsoft.Azure.Cosmos.Table;
  18. namespace TEAMModelOS.SDK.Module.AzureTable.Implements
  19. { //Linq全部查詢有1000限制
  20. /*
  21. public async Task<List<T>> GetByPartitionKey(string partitionKey)
  22. {
  23. var theQuery = table.CreateQuery<T>().Where(ent => ent.PartitionKey == partitionKey);
  24. TableQuerySegment<T> querySegment = null;
  25. var returnList = new List<T>();
  26. while (querySegment == null || querySegment.ContinuationToken != null)
  27. {
  28. querySegment = await theQuery.AsTableQuery()
  29. .ExecuteSegmentedAsync(querySegment != null ?
  30. querySegment.ContinuationToken : null);
  31. returnList.AddRange(querySegment);
  32. }
  33. return returnList;
  34. //Linq全部查詢沒有1000限制(要驗證)
  35. public Task<List<T>> GetByPartitionKey(string partitionKey)
  36. {
  37. return Task.Run(() => table.CreateQuery<T>()
  38. .Where(ent => ent.PartitionKey == partitionKey)
  39. .ToList());
  40. }
  41. }*/
  42. /// <summary>
  43. /// 带处理问题
  44. /// 1.使用 Microsoft.Azure.Cosmos.Table
  45. /// 2.AddSingleton 可能不適合CloudTableClient 线程不安全
  46. /// 3.能否使用 Efcore
  47. /// 4.有CreateQuery可以用,也有AsTableQuery轉換 還有Microsoft.Azure.Cosmos.Table.Queryable 支持 Linq lambda語法
  48. /// 5. 另外可以驗證一下最低查詢投影處理Count時 var query = from entity in table.CreateQuery<T>(tableName) select new { entity.PartitionKey };
  49. ///
  50. /// 另外你有優化的話,順便驗證一下
  51. /// </summary>
  52. public class AzureTableDBRepository : IAzureTableDBRepository
  53. {
  54. private CloudTableClient CloudTableClient { get; set; }
  55. private CloudTable cloudTable { get; set; }
  56. private AzureTableOptions options { get; set; }
  57. public AzureTableDBRepository()
  58. {
  59. options = BaseConfigModel.Configuration.GetSection("Azure:Table").Get<AzureTableOptions>();
  60. CloudTableClient = CloudStorageAccount.Parse(options.ConnectionString).CreateCloudTableClient();
  61. }
  62. private string GetTableSpace<T>()
  63. {
  64. Type type = typeof(T);
  65. string Name = type.Name;
  66. object[] attributes = type.GetCustomAttributes(true);
  67. foreach (object attribute in attributes) //2.通过映射,找到成员属性上关联的特性类实例,
  68. {
  69. if (attribute is TableNameAttribute tableSpace)
  70. {
  71. Name = tableSpace.Name;
  72. }
  73. }
  74. return Name;
  75. }
  76. private async Task<string> InitializeTable<T>()
  77. {
  78. string TableName = GetTableSpace<T>();
  79. if (cloudTable == null || !cloudTable.Name.Equals(TableName))
  80. {
  81. cloudTable = CloudTableClient.GetTableReference(TableName);
  82. await cloudTable.CreateIfNotExistsAsync();
  83. }
  84. return TableName;
  85. }
  86. //Linq全部查詢有1000限制
  87. //public async Task<List<T>> GetByPartitionKey<T>(string partitionKey) where T : TableEntity, new()
  88. //{
  89. // var theQuery = cloudTable.CreateQuery<T>().Where(ent => ent.PartitionKey == partitionKey);
  90. // TableQuerySegment<T> querySegment = null;
  91. // var returnList = new List<T>();
  92. // while (querySegment == null || querySegment.ContinuationToken != null)
  93. // {
  94. // querySegment = await theQuery.AsTableQuery()
  95. // .ExecuteSegmentedAsync(querySegment != null ?
  96. // querySegment.ContinuationToken : null);
  97. // returnList.AddRange(querySegment);
  98. // }
  99. // return returnList;
  100. //}
  101. //Linq全部查詢沒有1000限制(要驗證)
  102. /// <summary>
  103. /// Linq全部查詢沒有1000限制(要驗證)
  104. /// 就是最低消耗查詢處理Count var query = from entity in table.CreateQuery<T>(tableName) select new { entity.PartitionKey };
  105. /// </summary>
  106. /// <typeparam name="T"></typeparam>
  107. /// <param name="partitionKey"></param>
  108. /// <returns></returns>
  109. public Task<List<T>> GetByPartitionKey<T>(string partitionKey) where T : TableEntity, new()
  110. {
  111. return Task.Run(() => cloudTable.CreateQuery<T>()
  112. .Where(ent => ent.PartitionKey == partitionKey)
  113. .ToList());
  114. }
  115. public async Task<List<T>> FindAll<T>() where T : TableEntity, new()
  116. {
  117. string TableName = await InitializeTable<T>();
  118. var exQuery = new TableQuery<T>();
  119. return await QueryList<T>(exQuery, TableName);
  120. }
  121. private async Task<List<T>> QueryList<T>(TableQuery<T> exQuery, string TableName) where T : TableEntity, new()
  122. {
  123. TableContinuationToken continuationToken = null;
  124. List<T> entitys = new List<T>();
  125. do
  126. {
  127. var result = await CloudTableClient.GetTableReference(TableName).ExecuteQuerySegmentedAsync(exQuery, continuationToken);
  128. if (result.Results.Count > 0)
  129. {
  130. entitys.AddRange(result.ToList());
  131. }
  132. continuationToken = result.ContinuationToken;
  133. } while (continuationToken != null);
  134. return entitys;
  135. }
  136. private async Task<T> QueryObject<T>(TableQuery<T> exQuery, string TableName) where T : TableEntity, new()
  137. {
  138. TableContinuationToken continuationToken = null;
  139. T entity = new T();
  140. do
  141. {
  142. var result = await CloudTableClient.GetTableReference(TableName).ExecuteQuerySegmentedAsync(exQuery, continuationToken);
  143. if (result.Results.Count > 0)
  144. {
  145. entity = result.ToList().Single();
  146. }
  147. continuationToken = result.ContinuationToken;
  148. } while (continuationToken != null);
  149. return entity;
  150. }
  151. public async Task<int> Count<T>(TableContinuationToken continuationToken) where T : TableEntity, new()
  152. {
  153. string TableName = await InitializeTable<T>();
  154. var exQuery = new TableQuery<T>();
  155. List<T> entitys = new List<T>();
  156. do
  157. {
  158. var result = await CloudTableClient.GetTableReference(TableName).ExecuteQuerySegmentedAsync(exQuery, continuationToken);
  159. if (result.Results.Count > 0)
  160. {
  161. entitys.AddRange(result.ToList());
  162. }
  163. continuationToken = result.ContinuationToken;
  164. } while (continuationToken != null);
  165. return entitys.Count;
  166. }
  167. public async Task<int> Count<T>() where T : TableEntity, new()
  168. {
  169. string TableName = await InitializeTable<T>();
  170. TableContinuationToken continuationToken = null;
  171. var exQuery = new TableQuery<T>();
  172. List<T> entitys = new List<T>();
  173. do
  174. {
  175. var result = await CloudTableClient.GetTableReference(TableName).ExecuteQuerySegmentedAsync(exQuery, continuationToken);
  176. if (result.Results.Count > 0)
  177. {
  178. entitys.AddRange(result.ToList());
  179. }
  180. continuationToken = result.ContinuationToken;
  181. } while (continuationToken != null);
  182. return entitys.Count;
  183. }
  184. public async Task<T> FindByRowKey<T>(string id) where T : TableEntity, new()
  185. {
  186. string TableName = await InitializeTable<T>();
  187. var exQuery = new TableQuery<T>();
  188. if (!string.IsNullOrEmpty(id))
  189. {
  190. string typeStr = SwitchType<T>(id, "RowKey");
  191. if (string.IsNullOrEmpty(typeStr))
  192. {
  193. return null;
  194. }
  195. exQuery.Where(typeStr);
  196. // exQuery.Where(TableQuery.GenerateFilterCondition("RowKey", QueryComparisons.Equal, id));
  197. return await QueryObject<T>(exQuery, TableName);
  198. }
  199. else
  200. {
  201. return null;
  202. }
  203. }
  204. public async Task<List<T>> FindListByDict<T>(Dictionary<string, object> dict) where T : TableEntity, new()
  205. {
  206. string TableName = await InitializeTable<T>();
  207. var exQuery = new TableQuery<T>();
  208. StringBuilder builder = new StringBuilder();
  209. if (null != dict && dict.Count > 0)
  210. {
  211. var keys = dict.Keys;
  212. int index = 1;
  213. foreach (string key in keys)
  214. {
  215. if (dict[key] != null && !string.IsNullOrEmpty(dict[key].ToString()))
  216. {
  217. string typeStr = SwitchType<T>(dict[key], key);
  218. if (string.IsNullOrEmpty(typeStr))
  219. {
  220. continue;
  221. }
  222. if (index == 1)
  223. {
  224. //builder.Append(TableQuery.GenerateFilterCondition(key, QueryComparisons.Equal, dict[key].ToString()));
  225. builder.Append(typeStr);
  226. }
  227. else
  228. {
  229. //builder.Append(" " + TableOperators.And + " " + TableQuery.GenerateFilterCondition(key, QueryComparisons.Equal, dict[key].ToString()));
  230. builder.Append(" " + TableOperators.And + " " + typeStr);
  231. }
  232. index++;
  233. }
  234. else
  235. {
  236. throw new Exception("The parameter must have value!");
  237. }
  238. }
  239. exQuery.Where(builder.ToString());
  240. return await QueryList<T>(exQuery, TableName);
  241. }
  242. else
  243. {
  244. return null;
  245. }
  246. }
  247. public async Task<List<T>> FindListByKey<T>(string key, object value) where T : TableEntity, new()
  248. {
  249. string TableName = await InitializeTable<T>();
  250. var exQuery = new TableQuery<T>();
  251. if (!string.IsNullOrEmpty(key) && value != null && !string.IsNullOrEmpty(value.ToString()))
  252. {
  253. string typeStr = SwitchType<T>(value, key);
  254. if (string.IsNullOrEmpty(typeStr))
  255. {
  256. return null;
  257. }
  258. exQuery.Where(typeStr);
  259. //exQuery.Where(TableQuery.GenerateFilterCondition(key, QueryComparisons.Equal, value));
  260. return await QueryList<T>(exQuery, TableName);
  261. }
  262. else
  263. {
  264. return null;
  265. }
  266. }
  267. public async Task<T> FindOneByDict<T>(IDictionary<string, object> dict) where T : TableEntity, new()
  268. {
  269. string TableName = await InitializeTable<T>();
  270. var exQuery = new TableQuery<T>();
  271. StringBuilder builder = new StringBuilder();
  272. if (null != dict && dict.Count > 0)
  273. {
  274. var keys = dict.Keys;
  275. int index = 1;
  276. foreach (string key in keys)
  277. {
  278. if (dict[key] != null && !string.IsNullOrEmpty(dict[key].ToString()))
  279. {
  280. string typeStr = SwitchType<T>(dict[key], key);
  281. if (string.IsNullOrEmpty(typeStr))
  282. {
  283. continue;
  284. }
  285. if (index == 1)
  286. {
  287. //builder.Append(TableQuery.GenerateFilterCondition(key, QueryComparisons.Equal, dict[key].ToString()));
  288. builder.Append(typeStr);
  289. }
  290. else
  291. {
  292. // builder.Append(" " + TableOperators.And + " " + TableQuery.GenerateFilterCondition(key, QueryComparisons.Equal, dict[key].ToString()));
  293. builder.Append(" " + TableOperators.And + " " + typeStr);
  294. }
  295. index++;
  296. }
  297. else
  298. {
  299. throw new Exception("The parameter must have value!");
  300. }
  301. }
  302. exQuery.Where(builder.ToString());
  303. return await QueryObject<T>(exQuery, TableName);
  304. }
  305. else
  306. {
  307. return null;
  308. }
  309. }
  310. private static string SwitchType<T>(object obj, string key)
  311. {
  312. Type objType = typeof(T);
  313. PropertyInfo property = objType.GetProperty(key);
  314. //Type s = obj.GetType();
  315. //TypeCode typeCode = Type.GetTypeCode(s);
  316. if (property == null)
  317. {
  318. //return null;
  319. throw new Exception(objType.FullName + " PropertyInfo doesn't include this parameter :" + key);
  320. }
  321. TypeCode typeCode = Type.GetTypeCode(property.PropertyType);
  322. switch (typeCode)
  323. {
  324. case TypeCode.String: return TableQuery.GenerateFilterCondition(key, QueryComparisons.Equal, obj.ToString());
  325. case TypeCode.Int32: return TableQuery.GenerateFilterConditionForInt(key, QueryComparisons.Equal, int.Parse(obj.ToString()));
  326. case TypeCode.Double: return TableQuery.GenerateFilterConditionForDouble(key, QueryComparisons.Equal, (double)obj);
  327. case TypeCode.Byte: return TableQuery.GenerateFilterConditionForBinary(key, QueryComparisons.Equal, (byte[])obj);
  328. case TypeCode.Boolean: return TableQuery.GenerateFilterConditionForBool(key, QueryComparisons.Equal, (bool)obj);
  329. case TypeCode.DateTime: return TableQuery.GenerateFilterConditionForDate(key, QueryComparisons.Equal, (DateTimeOffset)obj);
  330. case TypeCode.Int64: return TableQuery.GenerateFilterConditionForLong(key, QueryComparisons.Equal, long.Parse(obj.ToString()));
  331. default: return null;
  332. }
  333. }
  334. public async Task<T> FindOneByKey<T>(string key, object value) where T : TableEntity, new()
  335. {
  336. string TableName = await InitializeTable<T>();
  337. var exQuery = new TableQuery<T>();
  338. if (!string.IsNullOrEmpty(key) && value != null && !string.IsNullOrEmpty(value.ToString()))
  339. {
  340. string typeStr = SwitchType<T>(value, key);
  341. if (string.IsNullOrEmpty(typeStr))
  342. {
  343. return null;
  344. }
  345. exQuery.Where(typeStr);
  346. //exQuery.Where(TableQuery.GenerateFilterCondition(key, QueryComparisons.Equal,
  347. // value));
  348. return await QueryObject<T>(exQuery, TableName);
  349. }
  350. else
  351. {
  352. return null;
  353. }
  354. }
  355. public async Task<List<T>> GetEntities<T>(IDictionary<string, object> dict) where T : TableEntity, new()
  356. {
  357. string TableName = await InitializeTable<T>();
  358. var exQuery = new TableQuery<T>();
  359. StringBuilder builder = new StringBuilder();
  360. if (null != dict && dict.Count > 0)
  361. {
  362. var keys = dict.Keys;
  363. int index = 1;
  364. foreach (string key in keys)
  365. {
  366. if (dict[key] != null && !string.IsNullOrEmpty(dict[key].ToString()))
  367. {
  368. string typeStr = SwitchType<T>(dict, key);
  369. if (string.IsNullOrEmpty(typeStr))
  370. {
  371. continue;
  372. }
  373. if (index == 1)
  374. {
  375. //builder.Append(TableQuery.GenerateFilterCondition(key, QueryComparisons.Equal, dict[key].ToString()));
  376. builder.Append(typeStr);
  377. }
  378. else
  379. {
  380. // builder.Append(" " + TableOperators.And + " " + TableQuery.GenerateFilterCondition(key, QueryComparisons.Equal, dict[key].ToString()));
  381. builder.Append(" " + TableOperators.And + " " + typeStr);
  382. }
  383. index++;
  384. }
  385. else
  386. {
  387. throw new Exception("The parameter must have value!");
  388. }
  389. }
  390. exQuery.Where(builder.ToString());
  391. return await QueryList<T>(exQuery, TableName);
  392. }
  393. else
  394. {
  395. return null;
  396. }
  397. }
  398. public async Task<List<T>> SaveAll<T>(List<T> entitys) where T : TableEntity, new()
  399. {
  400. if (entitys.IsEmpty())
  401. {
  402. return null;
  403. }
  404. string TableName = await InitializeTable<T>();
  405. IList<Dictionary<string, List<T>>> listInfo = new List<Dictionary<string, List<T>>>();
  406. foreach (IGrouping<string, T> group in entitys.GroupBy(c => c.PartitionKey))
  407. {
  408. Dictionary<string, List<T>> dictInfo = new Dictionary<string, List<T>>
  409. {
  410. { group.Key, group.ToList() }
  411. };
  412. listInfo.Add(dictInfo);
  413. }
  414. foreach (Dictionary<string, List<T>> dict in listInfo)
  415. {
  416. IList<TableResult> result = null;
  417. foreach (string key in dict.Keys)
  418. {
  419. List<T> values = dict[key];
  420. //Parallel.ForEach(Partitioner.Create(0, values.Count, 100),
  421. // async range =>
  422. // {
  423. // TableBatchOperation batchOperation = new TableBatchOperation();
  424. // for (Int32 i = range.Item1; i < range.Item2; i++)
  425. // batchOperation.Insert(values[i]);
  426. // result = await CloudTableClient.GetTableReference(TableName).ExecuteBatchAsync(batchOperation);
  427. // });
  428. int pageSize = 100;
  429. int pages = (int)Math.Ceiling((double)values.Count / pageSize);
  430. for (int i = 0; i < pages; i++)
  431. {
  432. List<T> lists = values.Skip((i) * pageSize).Take(pageSize).ToList();
  433. TableBatchOperation batchOperation = new TableBatchOperation();
  434. for (int j = 0; j < lists.Count; j++)
  435. {
  436. batchOperation.Insert(lists[j]);
  437. }
  438. result = await CloudTableClient.GetTableReference(TableName).ExecuteBatchAsync(batchOperation);
  439. }
  440. }
  441. }
  442. return entitys;
  443. }
  444. public async Task<List<T>> UpdateAll<T>(List<T> entitys) where T : TableEntity, new()
  445. {
  446. if (entitys.IsEmpty())
  447. {
  448. return null;
  449. }
  450. string TableName = await InitializeTable<T>();
  451. IList<Dictionary<string, List<T>>> listInfo = new List<Dictionary<string, List<T>>>();
  452. foreach (IGrouping<string, T> group in entitys.GroupBy(c => c.PartitionKey))
  453. {
  454. Dictionary<string, List<T>> dictInfo = new Dictionary<string, List<T>>
  455. {
  456. { group.Key, group.ToList() }
  457. };
  458. listInfo.Add(dictInfo);
  459. }
  460. foreach (Dictionary<string, List<T>> dict in listInfo)
  461. {
  462. IList<TableResult> result = null;
  463. foreach (string key in dict.Keys)
  464. {
  465. List<T> values = dict[key];
  466. //Parallel.ForEach(Partitioner.Create(0, values.Count, 100),
  467. // async range =>
  468. // {
  469. // TableBatchOperation batchOperation = new TableBatchOperation();
  470. // for (Int32 i = range.Item1; i < range.Item2; i++)
  471. // batchOperation.Replace(values[i]);
  472. // result = await CloudTableClient.GetTableReference(TableName).ExecuteBatchAsync(batchOperation);
  473. // });
  474. int pageSize = 100;
  475. int pages = (int)Math.Ceiling((double)values.Count / pageSize);
  476. for (int i = 0; i < pages; i++)
  477. {
  478. List<T> lists = values.Skip((i) * pageSize).Take(pageSize).ToList();
  479. TableBatchOperation batchOperation = new TableBatchOperation();
  480. for (int j = 0; j < lists.Count; j++)
  481. {
  482. batchOperation.Replace(lists[j]);
  483. }
  484. result = await CloudTableClient.GetTableReference(TableName).ExecuteBatchAsync(batchOperation);
  485. }
  486. }
  487. }
  488. return entitys;
  489. }
  490. public async Task<List<T>> SaveOrUpdateAll<T>(List<T> entitys) where T : TableEntity, new()
  491. {
  492. if (entitys.IsEmpty())
  493. {
  494. return null;
  495. }
  496. string TableName = await InitializeTable<T>();
  497. IList<Dictionary<string, List<T>>> listInfo = new List<Dictionary<string, List<T>>>();
  498. foreach (IGrouping<string, T> group in entitys.GroupBy(c => c.PartitionKey))
  499. {
  500. Dictionary<string, List<T>> dictInfo = new Dictionary<string, List<T>>
  501. {
  502. { group.Key, group.ToList() }
  503. };
  504. listInfo.Add(dictInfo);
  505. }
  506. foreach (Dictionary<string, List<T>> dict in listInfo)
  507. {
  508. IList<TableResult> result = null;
  509. foreach (string key in dict.Keys)
  510. {
  511. List<T> values = dict[key];
  512. //Parallel.ForEach(Partitioner.Create(0, values.Count, 50),
  513. // async range =>
  514. // {
  515. // TableBatchOperation batchOperation = new TableBatchOperation();
  516. // for (Int32 i = range.Item1; i < range.Item2; i++)
  517. // batchOperation.InsertOrReplace(values[i]);
  518. // result = await CloudTableClient.GetTableReference(TableName).ExecuteBatchAsync(batchOperation);
  519. // });
  520. int pageSize = 100;
  521. int pages = (int)Math.Ceiling((double)values.Count / pageSize);
  522. for (int i = 0; i < pages; i++)
  523. {
  524. List<T> lists = values.Skip((i) * pageSize).Take(pageSize).ToList();
  525. TableBatchOperation batchOperation = new TableBatchOperation();
  526. for (int j = 0; j < lists.Count; j++)
  527. {
  528. batchOperation.InsertOrReplace(lists[j]);
  529. }
  530. result = await CloudTableClient.GetTableReference(TableName).ExecuteBatchAsync(batchOperation);
  531. }
  532. }
  533. }
  534. return entitys;
  535. }
  536. public async Task<List<T>> DeleteAll<T>(List<T> entitys) where T : TableEntity, new()
  537. {
  538. if (entitys.IsEmpty())
  539. {
  540. return null;
  541. }
  542. string TableName = await InitializeTable<T>();
  543. IList<Dictionary<string, List<T>>> listInfo = new List<Dictionary<string, List<T>>>();
  544. foreach (IGrouping<string, T> group in entitys.GroupBy(c => c.PartitionKey))
  545. {
  546. Dictionary<string, List<T>> dictInfo = new Dictionary<string, List<T>>
  547. {
  548. { group.Key, group.ToList() }
  549. };
  550. listInfo.Add(dictInfo);
  551. }
  552. foreach (Dictionary<string, List<T>> dict in listInfo)
  553. {
  554. IList<TableResult> result = null;
  555. foreach (string key in dict.Keys)
  556. {
  557. List<T> values = dict[key];
  558. //Parallel.ForEach(Partitioner.Create(0, values.Count, 100),
  559. // async range =>
  560. // {
  561. // TableBatchOperation batchOperation = new TableBatchOperation();
  562. // for (Int32 i = range.Item1; i < range.Item2; i++)
  563. // batchOperation.Delete(values[i]);
  564. // result = await CloudTableClient.GetTableReference(TableName).ExecuteBatchAsync(batchOperation);
  565. // });
  566. int pageSize = 100;
  567. int pages = (int)Math.Ceiling((double)values.Count / pageSize);
  568. for (int i = 0; i < pages; i++)
  569. {
  570. List<T> lists = values.Skip((i) * pageSize).Take(pageSize).ToList();
  571. TableBatchOperation batchOperation = new TableBatchOperation();
  572. for (int j = 0; j < lists.Count; j++)
  573. {
  574. batchOperation.Delete(lists[j]);
  575. }
  576. result = await CloudTableClient.GetTableReference(TableName).ExecuteBatchAsync(batchOperation);
  577. }
  578. }
  579. }
  580. return entitys;
  581. }
  582. public async Task<T> Save<T>(TableEntity entity) where T : TableEntity, new()
  583. {
  584. string TableName = await InitializeTable<T>();
  585. TableOperation operation = TableOperation.Insert(entity);
  586. TableResult result = await CloudTableClient.GetTableReference(TableName).ExecuteAsync(operation);
  587. return (T)result.Result;
  588. }
  589. public async Task<T> SaveOrUpdate<T>(TableEntity entity) where T : TableEntity, new()
  590. {
  591. string TableName = await InitializeTable<T>();
  592. TableOperation operation = TableOperation.InsertOrReplace(entity);
  593. TableResult result = await CloudTableClient.GetTableReference(TableName).ExecuteAsync(operation);
  594. return (T)result.Result;
  595. }
  596. public async Task<T> Update<T>(TableEntity entity) where T : TableEntity, new()
  597. {
  598. string TableName = await InitializeTable<T>();
  599. TableOperation operation = TableOperation.Replace(entity);
  600. TableResult result = await CloudTableClient.GetTableReference(TableName).ExecuteAsync(operation);
  601. return (T)result.Result;
  602. }
  603. public async Task<T> Delete<T>(TableEntity entity) where T : TableEntity, new()
  604. {
  605. string TableName = await InitializeTable<T>();
  606. TableOperation operation = TableOperation.Delete(entity);
  607. TableResult result = await CloudTableClient.GetTableReference(TableName).ExecuteAsync(operation);
  608. return (T)result.Result;
  609. }
  610. //public async Task<List<T>> FindListByDictAndLike<T>(Dictionary<string, object> dict, Dictionary<string, object> likeDict) where T : TableEntity, new()
  611. //{
  612. // throw new NotImplementedException();
  613. //}
  614. //public async Task<List<T>> FindListByDictAndLikeAndNotEQ<T>(Dictionary<string, object> dict, Dictionary<string, object> likeDict, Dictionary<string, object> notEQDict) where T : TableEntity, new()
  615. //{
  616. // throw new NotImplementedException();
  617. //}
  618. //public async Task<List<T>> FindListByDictAndLikeAndStartWith<T>(Dictionary<string, object> dict, Dictionary<string, object> likeDict, Dictionary<string, object> startDict) where T : TableEntity, new()
  619. //{
  620. // throw new NotImplementedException();
  621. //}
  622. public async Task<AzurePagination<T>> FindListByDict<T>(Dictionary<string, object> dict, AzureTableToken azureTableToken) where T : TableEntity, new()
  623. {
  624. string TableName = await InitializeTable<T>();
  625. var exQuery = new TableQuery<T>();
  626. StringBuilder builder = new StringBuilder();
  627. if (null != dict && dict.Count > 0)
  628. {
  629. var keys = dict.Keys;
  630. int index = 1;
  631. foreach (string key in keys)
  632. {
  633. if (dict[key] != null && !string.IsNullOrEmpty(dict[key].ToString()))
  634. {
  635. string typeStr = SwitchType<T>(dict, key);
  636. if (string.IsNullOrEmpty(typeStr))
  637. {
  638. continue;
  639. }
  640. if (index == 1)
  641. {
  642. // builder.Append(TableQuery.GenerateFilterCondition(key, QueryComparisons.Equal, dict[key].ToString()));
  643. builder.Append(typeStr);
  644. }
  645. else
  646. {
  647. builder.Append(" " + TableOperators.And + " " + typeStr);
  648. //builder.Append(" " + TableOperators.And + " " + TableQuery.GenerateFilterCondition(key, QueryComparisons.Equal, dict[key].ToString()));
  649. }
  650. index++;
  651. }
  652. else
  653. {
  654. throw new Exception("The parameter must have value!");
  655. }
  656. }
  657. exQuery.Where(builder.ToString());
  658. return await QueryList<T>(azureTableToken, exQuery, TableName);
  659. }
  660. else
  661. {
  662. return null;
  663. }
  664. }
  665. private async Task<AzurePagination<T>> QueryList<T>(AzureTableToken azureTableToken, TableQuery<T> exQuery, string TableName) where T : TableEntity, new()
  666. {
  667. TableContinuationToken tableToken = new HaBookTableContinuationToken(azureTableToken).GetContinuationToken();
  668. List<T> entitys = new List<T>();
  669. var result = await CloudTableClient.GetTableReference(TableName).ExecuteQuerySegmentedAsync(exQuery, tableToken);
  670. if (result.Results.Count > 0)
  671. {
  672. entitys.AddRange(result.ToList());
  673. }
  674. tableToken = result.ContinuationToken;
  675. AzurePagination<T> pagination = new AzurePagination<T>
  676. {
  677. token = new HaBookTableContinuationToken(tableToken).GetAzureTableToken(),
  678. data = entitys
  679. };
  680. return pagination;
  681. }
  682. }
  683. }