AzureTableDBRepository.cs 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758
  1. using TEAMModelOS.SDK.Module.AzureTable.Configuration;
  2. using TEAMModelOS.SDK.Module.AzureTable.Interfaces;
  3. using TEAMModelOS.SDK;
  4. using System;
  5. using System.Collections.Concurrent;
  6. using System.Collections.Generic;
  7. using System.Linq;
  8. using System.Text;
  9. using System.Threading.Tasks;
  10. using TEAMModelOS.SDK.Helper.Security.AESCrypt;
  11. using TEAMModelOS.SDK.Context.Exception;
  12. using System.Reflection;
  13. using TEAMModelOS.SDK.Context.Attributes.Azure;
  14. using TEAMModelOS.SDK.Helper.Common.CollectionHelper;
  15. using TEAMModelOS.SDK.Context.Configuration;
  16. using Microsoft.Extensions.Configuration;
  17. using Microsoft.Azure.Cosmos.Table;
  18. namespace TEAMModelOS.SDK.Module.AzureTable.Implements
  19. { //Linq全部查詢有1000限制
  20. /*
  21. public async Task<List<T>> GetByPartitionKey(string partitionKey)
  22. {
  23. var theQuery = table.CreateQuery<T>().Where(ent => ent.PartitionKey == partitionKey);
  24. TableQuerySegment<T> querySegment = null;
  25. var returnList = new List<T>();
  26. while (querySegment == null || querySegment.ContinuationToken != null)
  27. {
  28. querySegment = await theQuery.AsTableQuery()
  29. .ExecuteSegmentedAsync(querySegment != null ?
  30. querySegment.ContinuationToken : null);
  31. returnList.AddRange(querySegment);
  32. }
  33. return returnList;
  34. //Linq全部查詢沒有1000限制(要驗證)
  35. public Task<List<T>> GetByPartitionKey(string partitionKey)
  36. {
  37. return Task.Run(() => table.CreateQuery<T>()
  38. .Where(ent => ent.PartitionKey == partitionKey)
  39. .ToList());
  40. }
  41. }*/
  /// <summary>
  /// Generic repository over Azure Table storage, built on <c>Microsoft.Azure.Cosmos.Table</c>.
  /// Every entity type <c>T</c> maps to one table: the name comes from <see cref="TableNameAttribute"/>
  /// when the type carries one and from the CLR type name otherwise. The table is created on first use.
  /// </summary>
  /// <remarks>
  /// Open items carried over from the original author's notes (still to be verified):
  /// 1. The implementation uses Microsoft.Azure.Cosmos.Table.
  /// 2. Registering this class with AddSingleton may be unsuitable if CloudTableClient is not thread-safe.
  /// 3. Could EF Core be used instead?
  /// 4. CreateQuery, AsTableQuery and Microsoft.Azure.Cosmos.Table.Queryable are available for LINQ syntax.
  /// 5. Counting should use the cheapest possible projection (only PartitionKey); see <c>Count</c>.
  /// </remarks>
  public class AzureTableDBRepository : IAzureTableDBRepository
  {
      /// <summary>Number of operations sent per entity-group transaction (the original page size).</summary>
      private const int MaxBatchSize = 100;

      /// <summary>
      /// Names of tables for which CreateIfNotExists has already completed on this instance.
      /// A name is only recorded after creation finished, so no caller can observe a table
      /// as "ready" while it is still being created.
      /// </summary>
      private readonly ConcurrentDictionary<string, byte> _initializedTables = new ConcurrentDictionary<string, byte>();

      private CloudTableClient CloudTableClient { get; set; }
      private AzureTableOptions options { get; set; }

      /// <summary>
      /// Reads the "Azure:Table" configuration section and creates the table client from its connection string.
      /// </summary>
      /// <exception cref="InvalidOperationException">The section or its connection string is missing.</exception>
      public AzureTableDBRepository()
      {
          options = BaseConfigModel.Configuration.GetSection("Azure:Table").Get<AzureTableOptions>();
          if (options == null || string.IsNullOrEmpty(options.ConnectionString))
          {
              throw new InvalidOperationException("Configuration section 'Azure:Table' with a ConnectionString is required.");
          }
          CloudTableClient = CloudStorageAccount.Parse(options.ConnectionString).CreateCloudTableClient();
      }

      /// <summary>
      /// Resolves the table name for <typeparamref name="T"/>: the name of the last
      /// <see cref="TableNameAttribute"/> found on the type (inherited ones included), else the type name.
      /// </summary>
      private static string GetTableSpace<T>()
      {
          Type type = typeof(T);
          TableNameAttribute tableSpace = type.GetCustomAttributes(true).OfType<TableNameAttribute>().LastOrDefault();
          return tableSpace != null ? tableSpace.Name : type.Name;
      }

      /// <summary>
      /// Returns a reference to the table of <typeparamref name="T"/>, creating the table the first time
      /// this instance sees it. A fresh reference is handed out per call, as the original code did.
      /// </summary>
      private async Task<CloudTable> GetTableAsync<T>()
      {
          string tableName = GetTableSpace<T>();
          CloudTable table = CloudTableClient.GetTableReference(tableName);
          if (!_initializedTables.ContainsKey(tableName))
          {
              // Two concurrent first-time callers may both get here; CreateIfNotExists tolerates that.
              await table.CreateIfNotExistsAsync();
              _initializedTables.TryAdd(tableName, 0);
          }
          return table;
      }

      /// <summary>
      /// Runs <paramref name="query"/> to completion, following continuation tokens so that results
      /// are not cut off at the end of the first segment.
      /// </summary>
      private static async Task<List<T>> QueryList<T>(CloudTable table, TableQuery<T> query) where T : TableEntity, new()
      {
          List<T> entities = new List<T>();
          TableContinuationToken token = null;
          do
          {
              TableQuerySegment<T> segment = await table.ExecuteQuerySegmentedAsync(query, token);
              entities.AddRange(segment.Results);
              token = segment.ContinuationToken;
          } while (token != null);
          return entities;
      }

      /// <summary>
      /// Returns the first entity matching <paramref name="query"/>.
      /// When nothing matches, a default-constructed <typeparamref name="T"/> is returned (not null);
      /// this is the original contract and is kept for existing callers.
      /// </summary>
      private static async Task<T> QueryObject<T>(CloudTable table, TableQuery<T> query) where T : TableEntity, new()
      {
          // Only the first row is used, so do not ask the service for a full segment.
          query = query.Take(1);
          TableContinuationToken token = null;
          do
          {
              TableQuerySegment<T> segment = await table.ExecuteQuerySegmentedAsync(query, token);
              if (segment.Results.Count > 0)
              {
                  return segment.Results[0];
              }
              // A segment can be empty and still carry a continuation token; keep going.
              token = segment.ContinuationToken;
          } while (token != null);
          return new T();
      }

      /// <summary>
      /// Fetches a single page for <paramref name="query"/>, starting at <paramref name="azureTableToken"/>,
      /// and returns the rows together with the token of the next page.
      /// </summary>
      private static async Task<AzurePagination<T>> QueryPage<T>(CloudTable table, TableQuery<T> query, AzureTableToken azureTableToken) where T : TableEntity, new()
      {
          TableContinuationToken tableToken = new HaBookTableContinuationToken(azureTableToken).GetContinuationToken();
          TableQuerySegment<T> segment = await table.ExecuteQuerySegmentedAsync(query, tableToken);
          return new AzurePagination<T>
          {
              token = new HaBookTableContinuationToken(segment.ContinuationToken).GetAzureTableToken(),
              data = segment.Results.ToList()
          };
      }

      /// <summary>Converts a filter value to <see cref="DateTimeOffset"/> from a DateTimeOffset, a DateTime or its text form.</summary>
      private static DateTimeOffset ToDateTimeOffset(object value)
      {
          if (value is DateTimeOffset offset)
          {
              return offset;
          }
          if (value is DateTime dateTime)
          {
              return new DateTimeOffset(dateTime);
          }
          return DateTimeOffset.Parse(value.ToString(), System.Globalization.CultureInfo.InvariantCulture);
      }

      /// <summary>
      /// Builds the equality filter <c>key eq value</c>, typed according to the property
      /// <paramref name="key"/> of <typeparamref name="T"/>.
      /// </summary>
      /// <returns>The filter string, or null when the property type has no filter representation here.</returns>
      /// <exception cref="ArgumentException"><typeparamref name="T"/> has no public property named <paramref name="key"/>.</exception>
      private static string SwitchType<T>(object obj, string key)
      {
          Type objType = typeof(T);
          PropertyInfo property = objType.GetProperty(key);
          if (property == null)
          {
              throw new ArgumentException(objType.FullName + " PropertyInfo doesn't include this parameter :" + key);
          }

          // Nullable<X> is filtered like X.
          Type propertyType = Nullable.GetUnderlyingType(property.PropertyType) ?? property.PropertyType;
          IFormatProvider invariant = System.Globalization.CultureInfo.InvariantCulture;

          // These types all report TypeCode.Object (byte excepted), so they are matched by type before the switch.
          if (propertyType == typeof(byte[]) || propertyType == typeof(byte))
          {
              return TableQuery.GenerateFilterConditionForBinary(key, QueryComparisons.Equal, (byte[])obj);
          }
          if (propertyType == typeof(Guid))
          {
              Guid guid = obj is Guid g ? g : Guid.Parse(obj.ToString());
              return TableQuery.GenerateFilterConditionForGuid(key, QueryComparisons.Equal, guid);
          }
          if (propertyType == typeof(DateTimeOffset) || propertyType == typeof(DateTime))
          {
              return TableQuery.GenerateFilterConditionForDate(key, QueryComparisons.Equal, ToDateTimeOffset(obj));
          }

          switch (Type.GetTypeCode(propertyType))
          {
              case TypeCode.String:
                  return TableQuery.GenerateFilterCondition(key, QueryComparisons.Equal, obj.ToString());
              case TypeCode.Int32:
                  return TableQuery.GenerateFilterConditionForInt(key, QueryComparisons.Equal, int.Parse(Convert.ToString(obj, invariant), invariant));
              case TypeCode.Int64:
                  return TableQuery.GenerateFilterConditionForLong(key, QueryComparisons.Equal, long.Parse(Convert.ToString(obj, invariant), invariant));
              case TypeCode.Double:
                  // Convert accepts a boxed int/float/string as well, where an unboxing cast would throw.
                  return TableQuery.GenerateFilterConditionForDouble(key, QueryComparisons.Equal, Convert.ToDouble(obj, invariant));
              case TypeCode.Boolean:
                  return TableQuery.GenerateFilterConditionForBool(key, QueryComparisons.Equal, Convert.ToBoolean(obj, invariant));
              default:
                  return null;
          }
      }

      /// <summary>
      /// Combines one equality condition per dictionary entry with the "and" operator.
      /// Entries whose property type is not supported by <c>SwitchType</c> are skipped, as before.
      /// </summary>
      /// <exception cref="ArgumentException">An entry has a null or empty value, or names an unknown property.</exception>
      private static string BuildFilter<T>(IDictionary<string, object> dict)
      {
          List<string> conditions = new List<string>();
          foreach (KeyValuePair<string, object> pair in dict)
          {
              if (pair.Value == null || string.IsNullOrEmpty(pair.Value.ToString()))
              {
                  throw new ArgumentException("The parameter must have value!");
              }
              string condition = SwitchType<T>(pair.Value, pair.Key);
              if (!string.IsNullOrEmpty(condition))
              {
                  conditions.Add(condition);
              }
          }
          return string.Join(" " + TableOperators.And + " ", conditions);
      }

      /// <summary>Shared body of the dictionary-based list lookups; returns null for a null or empty dictionary.</summary>
      private async Task<List<T>> QueryListByDict<T>(IDictionary<string, object> dict) where T : TableEntity, new()
      {
          if (dict == null || dict.Count == 0)
          {
              return null;
          }
          string filter = BuildFilter<T>(dict);
          CloudTable table = await GetTableAsync<T>();
          return await QueryList(table, new TableQuery<T>().Where(filter));
      }

      /// <summary>
      /// Groups <paramref name="entitys"/> by partition key (a batch may only touch one partition) and
      /// submits each group in batches of at most <c>MaxBatchSize</c> operations.
      /// </summary>
      /// <param name="addOperation">Adds the wanted operation (insert, replace, ...) for one entity to a batch.</param>
      /// <returns>The input list, or null when it is empty.</returns>
      private async Task<List<T>> ExecuteInBatches<T>(List<T> entitys, Action<TableBatchOperation, T> addOperation) where T : TableEntity, new()
      {
          if (entitys.IsEmpty())
          {
              return null;
          }
          CloudTable table = await GetTableAsync<T>();
          foreach (IGrouping<string, T> partition in entitys.GroupBy(c => c.PartitionKey))
          {
              List<T> rows = partition.ToList();
              for (int offset = 0; offset < rows.Count; offset += MaxBatchSize)
              {
                  TableBatchOperation batchOperation = new TableBatchOperation();
                  int end = Math.Min(offset + MaxBatchSize, rows.Count);
                  for (int i = offset; i < end; i++)
                  {
                      addOperation(batchOperation, rows[i]);
                  }
                  await table.ExecuteBatchAsync(batchOperation);
              }
          }
          return entitys;
      }

      /// <summary>Executes one table operation against the table of <typeparamref name="T"/> and returns its result entity.</summary>
      private async Task<T> ExecuteSingle<T>(TableOperation operation) where T : TableEntity, new()
      {
          CloudTable table = await GetTableAsync<T>();
          TableResult result = await table.ExecuteAsync(operation);
          return (T)result.Result;
      }

      /// <summary>
      /// Returns every entity stored under <paramref name="partitionKey"/>. The query is segmented and
      /// all continuation tokens are followed, so the result is not truncated to a single segment.
      /// </summary>
      /// <typeparam name="T">Entity type; determines the table.</typeparam>
      /// <param name="partitionKey">Partition key to match exactly.</param>
      /// <returns>All matching entities (possibly an empty list).</returns>
      public async Task<List<T>> GetByPartitionKey<T>(string partitionKey) where T : TableEntity, new()
      {
          CloudTable table = await GetTableAsync<T>();
          string filter = TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, partitionKey);
          return await QueryList(table, new TableQuery<T>().Where(filter));
      }

      /// <summary>Returns every entity of the table of <typeparamref name="T"/>.</summary>
      public async Task<List<T>> FindAll<T>() where T : TableEntity, new()
      {
          CloudTable table = await GetTableAsync<T>();
          return await QueryList(table, new TableQuery<T>());
      }

      /// <summary>
      /// Counts the entities of the table of <typeparamref name="T"/>, starting at
      /// <paramref name="continuationToken"/> (null starts at the beginning).
      /// </summary>
      public async Task<int> Count<T>(TableContinuationToken continuationToken) where T : TableEntity, new()
      {
          CloudTable table = await GetTableAsync<T>();
          // Only the row count is needed: project a single key column and add up segment sizes
          // instead of keeping every entity in memory.
          TableQuery<T> exQuery = new TableQuery<T>().Select(new List<string> { "PartitionKey" });
          int total = 0;
          do
          {
              TableQuerySegment<T> segment = await table.ExecuteQuerySegmentedAsync(exQuery, continuationToken);
              total += segment.Results.Count;
              continuationToken = segment.ContinuationToken;
          } while (continuationToken != null);
          return total;
      }

      /// <summary>Counts all entities of the table of <typeparamref name="T"/>.</summary>
      public Task<int> Count<T>() where T : TableEntity, new()
      {
          return Count<T>((TableContinuationToken)null);
      }

      /// <summary>
      /// Finds the first entity whose RowKey equals <paramref name="id"/>.
      /// </summary>
      /// <returns>
      /// Null when <paramref name="id"/> is null or empty; a default-constructed entity when nothing matches.
      /// </returns>
      public async Task<T> FindByRowKey<T>(string id) where T : TableEntity, new()
      {
          return await FindOneByKey<T>("RowKey", id);
      }

      /// <summary>
      /// Returns all entities whose properties equal every key/value pair in <paramref name="dict"/>.
      /// </summary>
      /// <returns>The matches, or null when <paramref name="dict"/> is null or empty.</returns>
      /// <exception cref="ArgumentException">A value is null/empty or a key is not a property of <typeparamref name="T"/>.</exception>
      public async Task<List<T>> FindListByDict<T>(Dictionary<string, object> dict) where T : TableEntity, new()
      {
          return await QueryListByDict<T>(dict);
      }

      /// <summary>
      /// Returns all entities whose property <paramref name="key"/> equals <paramref name="value"/>.
      /// </summary>
      /// <returns>The matches, or null when the key/value is missing or the property type is unsupported.</returns>
      public async Task<List<T>> FindListByKey<T>(string key, object value) where T : TableEntity, new()
      {
          if (string.IsNullOrEmpty(key) || value == null || string.IsNullOrEmpty(value.ToString()))
          {
              return null;
          }
          string typeStr = SwitchType<T>(value, key);
          if (string.IsNullOrEmpty(typeStr))
          {
              return null;
          }
          CloudTable table = await GetTableAsync<T>();
          return await QueryList(table, new TableQuery<T>().Where(typeStr));
      }

      /// <summary>
      /// Returns the first entity whose properties equal every key/value pair in <paramref name="dict"/>.
      /// </summary>
      /// <returns>
      /// Null when <paramref name="dict"/> is null or empty; a default-constructed entity when nothing matches.
      /// </returns>
      /// <exception cref="ArgumentException">A value is null/empty or a key is not a property of <typeparamref name="T"/>.</exception>
      public async Task<T> FindOneByDict<T>(IDictionary<string, object> dict) where T : TableEntity, new()
      {
          if (dict == null || dict.Count == 0)
          {
              return null;
          }
          string filter = BuildFilter<T>(dict);
          CloudTable table = await GetTableAsync<T>();
          return await QueryObject(table, new TableQuery<T>().Where(filter));
      }

      /// <summary>
      /// Returns the first entity whose property <paramref name="key"/> equals <paramref name="value"/>.
      /// </summary>
      /// <returns>
      /// Null when the key/value is missing or the property type is unsupported;
      /// a default-constructed entity when nothing matches.
      /// </returns>
      public async Task<T> FindOneByKey<T>(string key, object value) where T : TableEntity, new()
      {
          if (string.IsNullOrEmpty(key) || value == null || string.IsNullOrEmpty(value.ToString()))
          {
              return null;
          }
          string typeStr = SwitchType<T>(value, key);
          if (string.IsNullOrEmpty(typeStr))
          {
              return null;
          }
          CloudTable table = await GetTableAsync<T>();
          return await QueryObject(table, new TableQuery<T>().Where(typeStr));
      }

      /// <summary>
      /// Same lookup as <c>FindListByDict</c>, accepting any <see cref="IDictionary{TKey,TValue}"/>.
      /// </summary>
      /// <returns>The matches, or null when <paramref name="dict"/> is null or empty.</returns>
      public async Task<List<T>> GetEntities<T>(IDictionary<string, object> dict) where T : TableEntity, new()
      {
          return await QueryListByDict<T>(dict);
      }

      /// <summary>Inserts all entities, batched per partition. Returns the input list, or null when it is empty.</summary>
      public async Task<List<T>> SaveAll<T>(List<T> entitys) where T : TableEntity, new()
      {
          return await ExecuteInBatches(entitys, (batch, entity) => batch.Insert(entity));
      }

      /// <summary>Replaces all entities, batched per partition. Returns the input list, or null when it is empty.</summary>
      public async Task<List<T>> UpdateAll<T>(List<T> entitys) where T : TableEntity, new()
      {
          return await ExecuteInBatches(entitys, (batch, entity) => batch.Replace(entity));
      }

      /// <summary>Inserts or replaces all entities, batched per partition. Returns the input list, or null when it is empty.</summary>
      public async Task<List<T>> SaveOrUpdateAll<T>(List<T> entitys) where T : TableEntity, new()
      {
          return await ExecuteInBatches(entitys, (batch, entity) => batch.InsertOrReplace(entity));
      }

      /// <summary>Deletes all entities, batched per partition. Returns the input list, or null when it is empty.</summary>
      public async Task<List<T>> DeleteAll<T>(List<T> entitys) where T : TableEntity, new()
      {
          return await ExecuteInBatches(entitys, (batch, entity) => batch.Delete(entity));
      }

      /// <summary>Inserts <paramref name="entity"/> into the table of <typeparamref name="T"/>.</summary>
      public async Task<T> Save<T>(TableEntity entity) where T : TableEntity, new()
      {
          return await ExecuteSingle<T>(TableOperation.Insert(entity));
      }

      /// <summary>Inserts <paramref name="entity"/>, or replaces the stored entity with the same keys.</summary>
      public async Task<T> SaveOrUpdate<T>(TableEntity entity) where T : TableEntity, new()
      {
          return await ExecuteSingle<T>(TableOperation.InsertOrReplace(entity));
      }

      /// <summary>Replaces the stored entity with <paramref name="entity"/>.</summary>
      public async Task<T> Update<T>(TableEntity entity) where T : TableEntity, new()
      {
          return await ExecuteSingle<T>(TableOperation.Replace(entity));
      }

      /// <summary>Deletes <paramref name="entity"/> from the table of <typeparamref name="T"/>.</summary>
      public async Task<T> Delete<T>(TableEntity entity) where T : TableEntity, new()
      {
          return await ExecuteSingle<T>(TableOperation.Delete(entity));
      }

      /// <summary>
      /// Paged variant of <c>FindListByDict</c>: returns one page of matches starting at
      /// <paramref name="azureTableToken"/>, plus the token for the following page.
      /// </summary>
      /// <returns>The page, or null when <paramref name="dict"/> is null or empty.</returns>
      /// <exception cref="ArgumentException">A value is null/empty or a key is not a property of <typeparamref name="T"/>.</exception>
      public async Task<AzurePagination<T>> FindListByDict<T>(Dictionary<string, object> dict, AzureTableToken azureTableToken) where T : TableEntity, new()
      {
          if (dict == null || dict.Count == 0)
          {
              return null;
          }
          string filter = BuildFilter<T>(dict);
          CloudTable table = await GetTableAsync<T>();
          return await QueryPage(table, new TableQuery<T>().Where(filter), azureTableToken);
      }
  }
  684. }