AzureTableDBRepository.cs 23 KB


  1. using TEAMModelOS.SDK.Module.AzureTable.Configuration;
  2. using TEAMModelOS.SDK.Module.AzureTable.Interfaces;
  3. using TEAMModelOS.SDK.Extension.DataResult.PageToken;
  4. using Microsoft.WindowsAzure.Storage.Table;
  5. using System;
  6. using System.Collections.Concurrent;
  7. using System.Collections.Generic;
  8. using System.Linq;
  9. using System.Text;
  10. using System.Threading.Tasks;
  11. using TEAMModelOS.SDK.Helper.Security.AESCrypt;
  12. using TEAMModelOS.SDK.Context.Exception;
  13. using System.Reflection;
  14. using TEAMModelOS.SDK.Context.Attributes.Table;
  15. using TEAMModelOS.SDK.Helper.Common.CollectionHelper;
  16. namespace TEAMModelOS.SDK.Module.AzureTable.Implements
  17. {
  18. public class AzureTableDBRepository : IAzureTableDBRepository
  19. {
  20. private readonly string china = "417A7572654368696E612020202020202020202020202020202020202020202097EB27FCC1F03349787DCD35F4DE22BBDFEDC90F24738B1D7FB9167A2C191BE671B512E17D48B73A002FC98867345CD59D3250AF59FD5FDFFC67976108F9E3BC9E9F75EDE605B058C1821D16BD9EB753B8E7D39FF48163430C1B5F3B6150195B880C3FCB87D35BF3540432734B768EC28C77B4CF0D556E794DE57979C1E01C429E66F7B2794D9940CF287F2B22A22E5F266B949D5E523E709FF37229E45D1A8FC8C4341E0A8346BB976CCB3D91802FFE5A4A28577898B4E942B5BA3A4A7B796FA673782D405060E7F2CBA4F67DF59F47";
  21. private readonly string global = "417A757265476C6F62616C2020202020202020202020202020202020202020206956019D195ED330AFA660D369B9464FC5E90AB3A106FDDD7978A2772DB186CDAE21C6CBFDE2B6739F089E853B3171A27841026E61C51666347F63FDF63E4377448D493B05CF6CDB3791946B9145825DD7756392EB8EA36DBF42E5C1C0021CEC2CDB5F4EA57EBCFA98B17D7236FA2CDCA6E7FCBE1DDC45BEAF691A2462A8BC3C429CBC4BCCA3192E554D23758AA8EA5937F988C927534C70A4769ED33878BEC10E2550F121E4AEB5A2DA213F2902D602A758C7D93D5DED368544F8A86D2A0CAA7813D1D950EC81D544EE41A8EDC84173";
  22. private readonly CloudTableClient tableClient;
  23. private CloudTable Table { get; set; }
  24. public AzureTableDBRepository(AzureTableOptions options)
  25. {
  26. if (!string.IsNullOrEmpty(options.ConnectionString))
  27. {
  28. tableClient = TableClientSingleton.getInstance(options.ConnectionString).GetTableClient();
  29. }
  30. else if (AzureTableConfig.AZURE_CHINA.Equals(options.AzureTableDialect))
  31. {
  32. AESCrypt crypt = new AESCrypt();
  33. tableClient = TableClientSingleton.getInstance(crypt.Decrypt(china, options.AzureTableDialect)).GetTableClient();
  34. }
  35. else if (AzureTableConfig.AZURE_GLOBAL.Equals(options.AzureTableDialect))
  36. {
  37. AESCrypt crypt = new AESCrypt();
  38. tableClient = TableClientSingleton.getInstance(crypt.Decrypt(global, options.AzureTableDialect)).GetTableClient();
  39. }
  40. else { throw new BizException("请设置正确的AzureTable数据库配置信息!"); }
  41. }
  42. public AzureTableDBRepository()
  43. {
  44. }
  45. private async Task<string> InitializeTable<T>()
  46. {
  47. string TableName = GetTableSpace<T>();
  48. if (Table == null || !Table.Name.Equals(TableName))
  49. {
  50. Table = tableClient.GetTableReference(TableName);
  51. await Table.CreateIfNotExistsAsync();
  52. }
  53. return TableName;
  54. }
  55. private string GetTableSpace<T>()
  56. {
  57. Type type = typeof(T);
  58. string Name = type.Name;
  59. object[] attributes = type.GetCustomAttributes(true);
  60. foreach (object attribute in attributes) //2.通过映射,找到成员属性上关联的特性类实例,
  61. {
  62. if (attribute is TableSpaceAttribute)
  63. {
  64. TableSpaceAttribute tableSpace = (TableSpaceAttribute)attribute;
  65. Name = tableSpace.Name + Name;
  66. }
  67. }
  68. return Name;
  69. }
  70. public async Task<List<T>> FindAll<T>() where T : TableEntity, new()
  71. {
  72. string TableName = await InitializeTable<T>();
  73. var exQuery = new TableQuery<T>();
  74. return await QueryList<T>(exQuery, TableName);
  75. }
  76. private async Task<List<T>> QueryList<T>(TableQuery<T> exQuery, string TableName) where T : TableEntity, new()
  77. {
  78. TableContinuationToken continuationToken = null;
  79. List<T> entitys = new List<T>();
  80. do
  81. {
  82. var result = await tableClient.GetTableReference(TableName).ExecuteQuerySegmentedAsync(exQuery, continuationToken);
  83. if (result.Results.Count > 0)
  84. {
  85. entitys.AddRange(result.ToList());
  86. }
  87. continuationToken = result.ContinuationToken;
  88. } while (continuationToken != null);
  89. return entitys;
  90. }
  91. private async Task<T> QueryObject<T>(TableQuery<T> exQuery, string TableName) where T : TableEntity, new()
  92. {
  93. TableContinuationToken continuationToken = null;
  94. T entity = new T();
  95. do
  96. {
  97. var result = await tableClient.GetTableReference(TableName).ExecuteQuerySegmentedAsync(exQuery, continuationToken);
  98. if (result.Results.Count > 0)
  99. {
  100. entity = result.ToList().Single();
  101. }
  102. continuationToken = result.ContinuationToken;
  103. } while (continuationToken != null);
  104. return entity;
  105. }
  106. public async Task<int> Count<T>(TableContinuationToken continuationToken) where T : TableEntity, new()
  107. {
  108. string TableName = await InitializeTable<T>();
  109. var exQuery = new TableQuery<T>();
  110. List<T> entitys = new List<T>();
  111. do
  112. {
  113. var result = await tableClient.GetTableReference(TableName).ExecuteQuerySegmentedAsync(exQuery, continuationToken);
  114. if (result.Results.Count > 0)
  115. {
  116. entitys.AddRange(result.ToList());
  117. }
  118. continuationToken = result.ContinuationToken;
  119. } while (continuationToken != null);
  120. return entitys.Count;
  121. }
  122. public async Task<int> Count<T>() where T : TableEntity, new()
  123. {
  124. string TableName = await InitializeTable<T>();
  125. TableContinuationToken continuationToken = null;
  126. var exQuery = new TableQuery<T>();
  127. List<T> entitys = new List<T>();
  128. do
  129. {
  130. var result = await tableClient.GetTableReference(TableName).ExecuteQuerySegmentedAsync(exQuery, continuationToken);
  131. if (result.Results.Count > 0)
  132. {
  133. entitys.AddRange(result.ToList());
  134. }
  135. continuationToken = result.ContinuationToken;
  136. } while (continuationToken != null);
  137. return entitys.Count;
  138. }
  139. public async Task<T> FindByRowKey<T>(string id) where T : TableEntity, new()
  140. {
  141. string TableName = await InitializeTable<T>();
  142. var exQuery = new TableQuery<T>();
  143. if (!string.IsNullOrEmpty(id))
  144. {
  145. string typeStr = SwitchType(id, "RowKey");
  146. exQuery.Where(typeStr);
  147. // exQuery.Where(TableQuery.GenerateFilterCondition("RowKey", QueryComparisons.Equal, id));
  148. return await QueryObject<T>(exQuery, TableName);
  149. }
  150. else
  151. {
  152. return null;
  153. }
  154. }
  155. public async Task<List<T>> FindListByDict<T>(Dictionary<string, object> dict) where T : TableEntity, new()
  156. {
  157. string TableName = await InitializeTable<T>();
  158. var exQuery = new TableQuery<T>();
  159. StringBuilder builder = new StringBuilder();
  160. if (null != dict && dict.Count > 0)
  161. {
  162. var keys = dict.Keys;
  163. int index = 1;
  164. foreach (string key in keys)
  165. {
  166. if (dict[key] != null && !string.IsNullOrEmpty(dict[key].ToString()))
  167. {
  168. string typeStr = SwitchType(dict[key], key);
  169. if (index == 1)
  170. {
  171. //builder.Append(TableQuery.GenerateFilterCondition(key, QueryComparisons.Equal, dict[key].ToString()));
  172. builder.Append(typeStr);
  173. }
  174. else
  175. {
  176. //builder.Append(" " + TableOperators.And + " " + TableQuery.GenerateFilterCondition(key, QueryComparisons.Equal, dict[key].ToString()));
  177. builder.Append(" " + TableOperators.And + " " + typeStr);
  178. }
  179. index++;
  180. }
  181. }
  182. exQuery.Where(builder.ToString());
  183. return await QueryList<T>(exQuery, TableName);
  184. }
  185. else
  186. {
  187. return null;
  188. }
  189. }
  190. public async Task<List<T>> FindListByKey<T>(string key, object value) where T : TableEntity, new()
  191. {
  192. string TableName = await InitializeTable<T>();
  193. var exQuery = new TableQuery<T>();
  194. if (!string.IsNullOrEmpty(key) && value != null && !string.IsNullOrEmpty(value.ToString()))
  195. {
  196. string typeStr = SwitchType(value, key);
  197. exQuery.Where(typeStr);
  198. //exQuery.Where(TableQuery.GenerateFilterCondition(key, QueryComparisons.Equal, value));
  199. return await QueryList<T>(exQuery, TableName);
  200. }
  201. else
  202. {
  203. return null;
  204. }
  205. }
  206. public async Task<T> FindOneByDict<T>(IDictionary<string, object> dict) where T : TableEntity, new()
  207. {
  208. string TableName = await InitializeTable<T>();
  209. var exQuery = new TableQuery<T>();
  210. StringBuilder builder = new StringBuilder();
  211. if (null != dict && dict.Count > 0)
  212. {
  213. var keys = dict.Keys;
  214. int index = 1;
  215. foreach (string key in keys)
  216. {
  217. if (dict[key] != null && !string.IsNullOrEmpty(dict[key].ToString()))
  218. {
  219. string typeStr = SwitchType(dict[key], key);
  220. if (index == 1)
  221. {
  222. //builder.Append(TableQuery.GenerateFilterCondition(key, QueryComparisons.Equal, dict[key].ToString()));
  223. builder.Append(typeStr);
  224. }
  225. else
  226. {
  227. // builder.Append(" " + TableOperators.And + " " + TableQuery.GenerateFilterCondition(key, QueryComparisons.Equal, dict[key].ToString()));
  228. builder.Append(" " + TableOperators.And + " " + typeStr);
  229. }
  230. index++;
  231. }
  232. }
  233. exQuery.Where(builder.ToString());
  234. return await QueryObject<T>(exQuery, TableName);
  235. }
  236. else
  237. {
  238. return null;
  239. }
  240. }
  241. private static string SwitchType(object obj, string key)
  242. {
  243. Type s = obj.GetType();
  244. TypeCode typeCode = Type.GetTypeCode(s);
  245. switch (typeCode)
  246. {
  247. case TypeCode.String: return TableQuery.GenerateFilterCondition(key, QueryComparisons.Equal, obj.ToString());
  248. case TypeCode.Int32: return TableQuery.GenerateFilterConditionForInt(key, QueryComparisons.Equal, (int)obj);
  249. case TypeCode.Double: return TableQuery.GenerateFilterConditionForDouble(key, QueryComparisons.Equal, (double)obj);
  250. case TypeCode.Byte: return TableQuery.GenerateFilterConditionForBinary(key, QueryComparisons.Equal, (byte[])obj);
  251. case TypeCode.Boolean: return TableQuery.GenerateFilterConditionForBool(key, QueryComparisons.Equal, (bool)obj);
  252. case TypeCode.DateTime: return TableQuery.GenerateFilterConditionForDate(key, QueryComparisons.Equal, (DateTimeOffset)obj);
  253. case TypeCode.Int64: return TableQuery.GenerateFilterConditionForLong(key, QueryComparisons.Equal, (long)obj);
  254. default: return null;
  255. }
  256. }
  257. public async Task<T> FindOneByKey<T>(string key, object value) where T : TableEntity, new()
  258. {
  259. string TableName = await InitializeTable<T>();
  260. var exQuery = new TableQuery<T>();
  261. if (!string.IsNullOrEmpty(key) && value != null && !string.IsNullOrEmpty(value.ToString()))
  262. {
  263. string typeStr = SwitchType(value, key);
  264. exQuery.Where(typeStr);
  265. //exQuery.Where(TableQuery.GenerateFilterCondition(key, QueryComparisons.Equal,
  266. // value));
  267. return await QueryObject<T>(exQuery, TableName);
  268. }
  269. else
  270. {
  271. return null;
  272. }
  273. }
  274. public async Task<List<T>> GetEntities<T>(IDictionary<string, object> dict) where T : TableEntity, new()
  275. {
  276. string TableName = await InitializeTable<T>();
  277. var exQuery = new TableQuery<T>();
  278. StringBuilder builder = new StringBuilder();
  279. if (null != dict && dict.Count > 0)
  280. {
  281. var keys = dict.Keys;
  282. int index = 1;
  283. foreach (string key in keys)
  284. {
  285. if (dict[key] != null && !string.IsNullOrEmpty(dict[key].ToString()))
  286. {
  287. string typeStr = SwitchType(dict, key);
  288. if (index == 1)
  289. {
  290. //builder.Append(TableQuery.GenerateFilterCondition(key, QueryComparisons.Equal, dict[key].ToString()));
  291. builder.Append(typeStr);
  292. }
  293. else
  294. {
  295. // builder.Append(" " + TableOperators.And + " " + TableQuery.GenerateFilterCondition(key, QueryComparisons.Equal, dict[key].ToString()));
  296. builder.Append(" " + TableOperators.And + " " + typeStr);
  297. }
  298. index++;
  299. }
  300. }
  301. exQuery.Where(builder.ToString());
  302. return await QueryList<T>(exQuery, TableName);
  303. }
  304. else
  305. {
  306. return null;
  307. }
  308. }
  309. public async Task<List<T>> SaveAll<T>(List<T> entitys) where T : TableEntity, new()
  310. {
  311. if (entitys.IsEmpty())
  312. {
  313. return null;
  314. }
  315. string TableName = await InitializeTable<T>();
  316. IList<TableResult> result = null;
  317. Parallel.ForEach(Partitioner.Create(0, entitys.Count, 100),
  318. async range =>
  319. {
  320. TableBatchOperation batchOperation = new TableBatchOperation();
  321. for (Int32 i = range.Item1; i < range.Item2; i++)
  322. batchOperation.Insert(entitys[i]);
  323. result = await tableClient.GetTableReference(TableName).ExecuteBatchAsync(batchOperation);
  324. });
  325. return entitys;
  326. }
  327. public async Task<List<T>> UpdateAll<T>(List<T> entitys) where T : TableEntity, new()
  328. {
  329. if (entitys.IsEmpty()) {
  330. return null;
  331. }
  332. string TableName = await InitializeTable<T>();
  333. IList<TableResult> result = null;
  334. Parallel.ForEach(Partitioner.Create(0, entitys.Count, 100),
  335. async range =>
  336. {
  337. TableBatchOperation batchOperation = new TableBatchOperation();
  338. for (Int32 i = range.Item1; i < range.Item2; i++)
  339. batchOperation.Replace(entitys[i]);
  340. result = await tableClient.GetTableReference(TableName).ExecuteBatchAsync(batchOperation);
  341. });
  342. return entitys;
  343. }
  344. public async Task<List<T>> SaveOrUpdateAll<T>(List<T> entitys) where T : TableEntity, new()
  345. {
  346. if (entitys.IsEmpty())
  347. {
  348. return null;
  349. }
  350. string TableName = await InitializeTable<T>();
  351. IList<TableResult> result = null;
  352. Parallel.ForEach(Partitioner.Create(0, entitys.Count, 100),
  353. async range =>
  354. {
  355. TableBatchOperation batchOperation = new TableBatchOperation();
  356. for (Int32 i = range.Item1; i < range.Item2; i++)
  357. batchOperation.InsertOrReplace(entitys[i]);
  358. result = await tableClient.GetTableReference(TableName).ExecuteBatchAsync(batchOperation);
  359. });
  360. return entitys;
  361. }
  362. public async Task<List<T>> DeleteAll<T>(List<T> entitys) where T : TableEntity, new()
  363. {
  364. if (entitys.IsEmpty())
  365. {
  366. return null;
  367. }
  368. string TableName = await InitializeTable<T>();
  369. IList<TableResult> result = null;
  370. Parallel.ForEach(Partitioner.Create(0, entitys.Count, 100),
  371. async range =>
  372. {
  373. TableBatchOperation batchOperation = new TableBatchOperation();
  374. for (Int32 i = range.Item1; i < range.Item2; i++)
  375. batchOperation.Delete(entitys[i]);
  376. result = await tableClient.GetTableReference(TableName).ExecuteBatchAsync(batchOperation);
  377. });
  378. return (List<T>)result;
  379. }
  380. public async Task<T> Save<T>(TableEntity entity) where T : TableEntity, new()
  381. {
  382. string TableName = await InitializeTable<T>();
  383. TableOperation operation = TableOperation.Insert(entity);
  384. TableResult result = await tableClient.GetTableReference(TableName).ExecuteAsync(operation);
  385. return (T)result.Result;
  386. }
  387. public async Task<T> SaveOrUpdate<T>(TableEntity entity) where T : TableEntity, new()
  388. {
  389. string TableName = await InitializeTable<T>();
  390. TableOperation operation = TableOperation.InsertOrReplace(entity);
  391. TableResult result = await tableClient.GetTableReference(TableName).ExecuteAsync(operation);
  392. return (T)result.Result;
  393. }
  394. public async Task<T> Update<T>(TableEntity entity) where T : TableEntity, new()
  395. {
  396. string TableName = await InitializeTable<T>();
  397. TableOperation operation = TableOperation.Replace(entity);
  398. TableResult result = await tableClient.GetTableReference(TableName).ExecuteAsync(operation);
  399. return (T)result.Result;
  400. }
  401. public async Task<T> Delete<T>(TableEntity entity) where T : TableEntity, new()
  402. {
  403. string TableName = await InitializeTable<T>();
  404. TableOperation operation = TableOperation.Delete(entity);
  405. TableResult result = await tableClient.GetTableReference(TableName).ExecuteAsync(operation);
  406. return (T)result.Result;
  407. }
  408. //public async Task<List<T>> FindListByDictAndLike<T>(Dictionary<string, object> dict, Dictionary<string, object> likeDict) where T : TableEntity, new()
  409. //{
  410. // throw new NotImplementedException();
  411. //}
  412. //public async Task<List<T>> FindListByDictAndLikeAndNotEQ<T>(Dictionary<string, object> dict, Dictionary<string, object> likeDict, Dictionary<string, object> notEQDict) where T : TableEntity, new()
  413. //{
  414. // throw new NotImplementedException();
  415. //}
  416. //public async Task<List<T>> FindListByDictAndLikeAndStartWith<T>(Dictionary<string, object> dict, Dictionary<string, object> likeDict, Dictionary<string, object> startDict) where T : TableEntity, new()
  417. //{
  418. // throw new NotImplementedException();
  419. //}
  420. public async Task<AzurePagination<T>> FindListByDict<T>(Dictionary<string, object> dict, AzureTableToken azureTableToken) where T : TableEntity, new()
  421. {
  422. string TableName = await InitializeTable<T>();
  423. var exQuery = new TableQuery<T>();
  424. StringBuilder builder = new StringBuilder();
  425. if (null != dict && dict.Count > 0)
  426. {
  427. var keys = dict.Keys;
  428. int index = 1;
  429. foreach (string key in keys)
  430. {
  431. if (dict[key] != null && !string.IsNullOrEmpty(dict[key].ToString()))
  432. {
  433. string typeStr = SwitchType(dict, key);
  434. if (index == 1)
  435. {
  436. // builder.Append(TableQuery.GenerateFilterCondition(key, QueryComparisons.Equal, dict[key].ToString()));
  437. builder.Append(typeStr);
  438. }
  439. else
  440. {
  441. builder.Append(" " + TableOperators.And + " " + typeStr);
  442. //builder.Append(" " + TableOperators.And + " " + TableQuery.GenerateFilterCondition(key, QueryComparisons.Equal, dict[key].ToString()));
  443. }
  444. index++;
  445. }
  446. }
  447. exQuery.Where(builder.ToString());
  448. return await QueryList<T>(azureTableToken, exQuery, TableName);
  449. }
  450. else
  451. {
  452. return null;
  453. }
  454. }
  455. private async Task<AzurePagination<T>> QueryList<T>(AzureTableToken azureTableToken, TableQuery<T> exQuery, string TableName) where T : TableEntity, new()
  456. {
  457. TableContinuationToken tableToken = new HaBookTableContinuationToken(azureTableToken).GetContinuationToken();
  458. List<T> entitys = new List<T>();
  459. var result = await tableClient.GetTableReference(TableName).ExecuteQuerySegmentedAsync(exQuery, tableToken);
  460. if (result.Results.Count > 0)
  461. {
  462. entitys.AddRange(result.ToList());
  463. }
  464. tableToken = result.ContinuationToken;
  465. AzurePagination<T> pagination = new AzurePagination<T>
  466. {
  467. token = new HaBookTableContinuationToken(tableToken).GetAzureTableToken(),
  468. data = entitys
  469. };
  470. return pagination;
  471. }
  472. }
  473. }