AzureTableDBRepository.cs 20 KB


  1. using TEAMModelOS.SDK.Module.AzureTable.Configuration;
  2. using TEAMModelOS.SDK.Module.AzureTable.Interfaces;
  3. using TEAMModelOS.SDK.Extension.DataResult.PageToken;
  4. using Microsoft.WindowsAzure.Storage.Table;
  5. using System;
  6. using System.Collections.Concurrent;
  7. using System.Collections.Generic;
  8. using System.Linq;
  9. using System.Text;
  10. using System.Threading.Tasks;
  11. using TEAMModelOS.SDK.Helper.Security.AESCrypt;
  12. using TEAMModelOS.SDK.Context.Exception;
  13. using System.Reflection;
  14. using TEAMModelOS.SDK.Context.Attributes.Table;
  15. namespace TEAMModelOS.SDK.Module.AzureTable.Implements
  16. {
  17. public class AzureTableDBRepository : IAzureTableDBRepository
  18. {
  19. private readonly string china = "417A7572654368696E612020202020202020202020202020202020202020202097EB27FCC1F03349787DCD35F4DE22BBDFEDC90F24738B1D7FB9167A2C191BE671B512E17D48B73A002FC98867345CD59D3250AF59FD5FDFFC67976108F9E3BC9E9F75EDE605B058C1821D16BD9EB753B8E7D39FF48163430C1B5F3B6150195B880C3FCB87D35BF3540432734B768EC28C77B4CF0D556E794DE57979C1E01C429E66F7B2794D9940CF287F2B22A22E5F266B949D5E523E709FF37229E45D1A8FC8C4341E0A8346BB976CCB3D91802FFE5A4A28577898B4E942B5BA3A4A7B796FA673782D405060E7F2CBA4F67DF59F47";
  20. private readonly string global = "417A757265476C6F62616C2020202020202020202020202020202020202020206956019D195ED330AFA660D369B9464FC5E90AB3A106FDDD7978A2772DB186CDAE21C6CBFDE2B6739F089E853B3171A27841026E61C51666347F63FDF63E4377448D493B05CF6CDB3791946B9145825DD7756392EB8EA36DBF42E5C1C0021CEC2CDB5F4EA57EBCFA98B17D7236FA2CDCA6E7FCBE1DDC45BEAF691A2462A8BC3C429CBC4BCCA3192E554D23758AA8EA5937F988C927534C70A4769ED33878BEC10E2550F121E4AEB5A2DA213F2902D602A758C7D93D5DED368544F8A86D2A0CAA7813D1D950EC81D544EE41A8EDC84173";
  21. private readonly CloudTableClient tableClient;
  22. private CloudTable Table { get; set; }
  /// <summary>
  /// Creates a repository bound to the table client selected by <paramref name="options"/>.
  /// An explicit connection string takes precedence; otherwise the built-in encrypted
  /// connection string matching <c>options.AzureTableDialect</c> is decrypted and used.
  /// </summary>
  /// <param name="options">Connection string and/or dialect selecting the storage account.</param>
  /// <exception cref="BizException">
  /// Thrown when no connection string is supplied and the dialect matches neither
  /// <c>AzureTableConfig.AZURE_CHINA</c> nor <c>AzureTableConfig.AZURE_GLOBAL</c>.
  /// </exception>
  public AzureTableDBRepository(AzureTableOptions options)
  {
      // An explicitly configured connection string always wins.
      if (!string.IsNullOrEmpty(options.ConnectionString))
      {
          tableClient = TableClientSingleton.getInstance(options.ConnectionString).GetTableClient();
          return;
      }

      // Otherwise pick the embedded encrypted connection string for the requested dialect.
      string encryptedConnection;
      if (AzureTableConfig.AZURE_CHINA.Equals(options.AzureTableDialect))
      {
          encryptedConnection = china;
      }
      else if (AzureTableConfig.AZURE_GLOBAL.Equals(options.AzureTableDialect))
      {
          encryptedConnection = global;
      }
      else
      {
          throw new BizException("请设置正确的AzureTable数据库配置信息!");
      }

      // The dialect value is passed as the second argument to Decrypt, exactly as before.
      AESCrypt crypt = new AESCrypt();
      string connection = crypt.Decrypt(encryptedConnection, options.AzureTableDialect);
      tableClient = TableClientSingleton.getInstance(connection).GetTableClient();
  }

  /// <summary>
  /// Creates a repository without assigning a table client; <c>tableClient</c> stays null.
  /// </summary>
  public AzureTableDBRepository()
  {
  }
  /// <summary>
  /// Resolves the table name for <typeparamref name="T"/> and, when the cached
  /// <c>Table</c> reference is missing or points at a different table, rebinds it
  /// and creates the table if it does not exist yet.
  /// </summary>
  /// <typeparam name="T">Entity type whose table is required.</typeparam>
  /// <returns>The resolved table name.</returns>
  private async Task<string> InitializeTable<T>()
  {
      string resolvedName = GetTableSpace<T>();
      bool alreadyBound = Table != null && Table.Name.Equals(resolvedName);
      if (!alreadyBound)
      {
          Table = tableClient.GetTableReference(resolvedName);
          await Table.CreateIfNotExistsAsync();
      }
      return resolvedName;
  }
  /// <summary>
  /// Builds the table name for <typeparamref name="T"/>: the type's simple name,
  /// prefixed with the <c>Name</c> of every <see cref="TableSpaceAttribute"/> found
  /// on the type (inherited attributes included), applied in the order reflection
  /// returns them.
  /// </summary>
  /// <typeparam name="T">Entity type to resolve.</typeparam>
  /// <returns>The (possibly prefixed) table name.</returns>
  private string GetTableSpace<T>()
  {
      Type entityType = typeof(T);
      return entityType
          .GetCustomAttributes(true)
          .OfType<TableSpaceAttribute>()
          // Each attribute prepends its Name to what has been built so far.
          .Aggregate(entityType.Name, (current, space) => space.Name + current);
  }
  /// <summary>
  /// Returns every entity stored in the table resolved for <typeparamref name="T"/>.
  /// </summary>
  /// <typeparam name="T">Entity type to read.</typeparam>
  /// <returns>All entities, gathered across every result segment.</returns>
  public async Task<List<T>> FindAll<T>() where T : TableEntity, new()
  {
      string resolvedTable = await InitializeTable<T>();
      // An unfiltered query selects the whole table.
      return await QueryList<T>(new TableQuery<T>(), resolvedTable);
  }
  /// <summary>
  /// Runs <paramref name="exQuery"/> against <paramref name="TableName"/> and collects
  /// the results of every segment, following continuation tokens until none remains.
  /// </summary>
  /// <param name="exQuery">Query to execute.</param>
  /// <param name="TableName">Name of the table to query.</param>
  /// <returns>All matching entities; empty when nothing matches.</returns>
  private async Task<List<T>> QueryList<T>(TableQuery<T> exQuery, string TableName) where T : TableEntity, new()
  {
      List<T> collected = new List<T>();
      TableContinuationToken nextToken = null;
      do
      {
          var segment = await tableClient.GetTableReference(TableName).ExecuteQuerySegmentedAsync(exQuery, nextToken);
          if (segment.Results.Count > 0)
          {
              collected.AddRange(segment.Results);
          }
          nextToken = segment.ContinuationToken;
      }
      while (nextToken != null);
      return collected;
  }
  /// <summary>
  /// Runs <paramref name="exQuery"/> against <paramref name="TableName"/> and returns
  /// the single entity it yields.
  /// </summary>
  /// <remarks>
  /// Every segment is read. A non-empty segment must contain exactly one entity,
  /// otherwise <c>Single()</c> throws; when several segments are non-empty the last
  /// one wins. If no segment contains a result, a default-constructed
  /// <typeparamref name="T"/> is returned rather than null.
  /// </remarks>
  /// <param name="exQuery">Query to execute.</param>
  /// <param name="TableName">Name of the table to query.</param>
  /// <returns>The matching entity, or a new empty <typeparamref name="T"/>.</returns>
  /// <exception cref="InvalidOperationException">A segment contained more than one entity.</exception>
  private async Task<T> QueryObject<T>(TableQuery<T> exQuery, string TableName) where T : TableEntity, new()
  {
      T found = new T();
      TableContinuationToken nextToken = null;
      do
      {
          var segment = await tableClient.GetTableReference(TableName).ExecuteQuerySegmentedAsync(exQuery, nextToken);
          if (segment.Results.Count > 0)
          {
              found = segment.Results.Single();
          }
          nextToken = segment.ContinuationToken;
      }
      while (nextToken != null);
      return found;
  }
  /// <summary>
  /// Counts the entities in the table resolved for <typeparamref name="T"/>, starting
  /// from <paramref name="continuationToken"/> and following continuation tokens to the end.
  /// </summary>
  /// <remarks>
  /// Only a running total is kept; segments are not accumulated, so memory use does
  /// not grow with the size of the table.
  /// </remarks>
  /// <typeparam name="T">Entity type to count.</typeparam>
  /// <param name="continuationToken">Position to start from; null starts at the beginning.</param>
  /// <returns>Number of entities read from the starting position to the end of the table.</returns>
  public async Task<int> Count<T>(TableContinuationToken continuationToken) where T : TableEntity, new()
  {
      string tableName = await InitializeTable<T>();
      // The reference is loop-invariant, so resolve it once.
      CloudTable table = tableClient.GetTableReference(tableName);
      var query = new TableQuery<T>();
      int total = 0;
      do
      {
          var segment = await table.ExecuteQuerySegmentedAsync(query, continuationToken);
          total += segment.Results.Count;
          continuationToken = segment.ContinuationToken;
      }
      while (continuationToken != null);
      return total;
  }
  /// <summary>
  /// Counts every entity in the table resolved for <typeparamref name="T"/>.
  /// </summary>
  /// <remarks>
  /// Only a running total is kept; segments are not accumulated, so memory use does
  /// not grow with the size of the table.
  /// </remarks>
  /// <typeparam name="T">Entity type to count.</typeparam>
  /// <returns>Total number of entities in the table.</returns>
  public async Task<int> Count<T>() where T : TableEntity, new()
  {
      string tableName = await InitializeTable<T>();
      // The reference is loop-invariant, so resolve it once.
      CloudTable table = tableClient.GetTableReference(tableName);
      var query = new TableQuery<T>();
      TableContinuationToken nextToken = null;
      int total = 0;
      do
      {
          var segment = await table.ExecuteQuerySegmentedAsync(query, nextToken);
          total += segment.Results.Count;
          nextToken = segment.ContinuationToken;
      }
      while (nextToken != null);
      return total;
  }
  /// <summary>
  /// Looks up the entity whose <c>Id</c> property equals <paramref name="id"/>.
  /// </summary>
  /// <typeparam name="T">Entity type to read.</typeparam>
  /// <param name="id">Value to match against the <c>Id</c> property.</param>
  /// <returns>
  /// Null when <paramref name="id"/> is null or empty; otherwise the result of the
  /// single-entity query.
  /// </returns>
  public async Task<T> FindById<T>(string id) where T : TableEntity, new()
  {
      // The table is initialised even when the id turns out to be unusable.
      string resolvedTable = await InitializeTable<T>();
      if (string.IsNullOrEmpty(id))
      {
          return null;
      }

      string idFilter = TableQuery.GenerateFilterCondition("Id", QueryComparisons.Equal, id);
      TableQuery<T> byId = new TableQuery<T>().Where(idFilter);
      return await QueryObject<T>(byId, resolvedTable);
  }
  /// <summary>
  /// Finds all entities whose properties equal the values given in <paramref name="dict"/>.
  /// Entries whose value is null or has an empty string form are ignored; the remaining
  /// entries are combined with <c>and</c>.
  /// </summary>
  /// <typeparam name="T">Entity type to read.</typeparam>
  /// <param name="dict">Property name to required value (compared via <c>ToString()</c>).</param>
  /// <returns>
  /// Null when <paramref name="dict"/> is null or empty; otherwise all matching entities.
  /// </returns>
  public async Task<List<T>> FindListByDict<T>(Dictionary<string, object> dict) where T : TableEntity, new()
  {
      string resolvedTable = await InitializeTable<T>();
      if (dict == null || dict.Count == 0)
      {
          return null;
      }

      // One equality clause per usable entry, in dictionary order.
      IEnumerable<string> clauses = dict
          .Where(pair => pair.Value != null && !string.IsNullOrEmpty(pair.Value.ToString()))
          .Select(pair => TableQuery.GenerateFilterCondition(pair.Key, QueryComparisons.Equal, pair.Value.ToString()));
      string filter = string.Join(" " + TableOperators.And + " ", clauses);

      TableQuery<T> filtered = new TableQuery<T>().Where(filter);
      return await QueryList<T>(filtered, resolvedTable);
  }
  /// <summary>
  /// Finds all entities whose property <paramref name="key"/> equals <paramref name="value"/>.
  /// </summary>
  /// <typeparam name="T">Entity type to read.</typeparam>
  /// <param name="key">Property name to filter on.</param>
  /// <param name="value">Required property value.</param>
  /// <returns>
  /// Null when either argument is null or empty; otherwise all matching entities.
  /// </returns>
  public async Task<List<T>> FindListByKey<T>(string key, string value) where T : TableEntity, new()
  {
      string resolvedTable = await InitializeTable<T>();
      if (string.IsNullOrEmpty(key) || string.IsNullOrEmpty(value))
      {
          return null;
      }

      string filter = TableQuery.GenerateFilterCondition(key, QueryComparisons.Equal, value);
      TableQuery<T> filtered = new TableQuery<T>().Where(filter);
      return await QueryList<T>(filtered, resolvedTable);
  }
  /// <summary>
  /// Finds the single entity whose properties equal the values given in <paramref name="dict"/>.
  /// Entries whose value is null or has an empty string form are ignored; the remaining
  /// entries are combined with <c>and</c>.
  /// </summary>
  /// <typeparam name="T">Entity type to read.</typeparam>
  /// <param name="dict">Property name to required value (compared via <c>ToString()</c>).</param>
  /// <returns>
  /// Null when <paramref name="dict"/> is null or empty; otherwise the result of the
  /// single-entity query.
  /// </returns>
  public async Task<T> FindOneByDict<T>(IDictionary<string, object> dict) where T : TableEntity, new()
  {
      string resolvedTable = await InitializeTable<T>();
      if (dict == null || dict.Count == 0)
      {
          return null;
      }

      // One equality clause per usable entry, in dictionary order.
      IEnumerable<string> clauses = dict
          .Where(pair => pair.Value != null && !string.IsNullOrEmpty(pair.Value.ToString()))
          .Select(pair => TableQuery.GenerateFilterCondition(pair.Key, QueryComparisons.Equal, pair.Value.ToString()));
      string filter = string.Join(" " + TableOperators.And + " ", clauses);

      TableQuery<T> filtered = new TableQuery<T>().Where(filter);
      return await QueryObject<T>(filtered, resolvedTable);
  }
  /// <summary>
  /// Finds the single entity whose property <paramref name="key"/> equals <paramref name="value"/>.
  /// </summary>
  /// <typeparam name="T">Entity type to read.</typeparam>
  /// <param name="key">Property name to filter on.</param>
  /// <param name="value">Required property value.</param>
  /// <returns>
  /// Null when either argument is null or empty; otherwise the result of the
  /// single-entity query.
  /// </returns>
  public async Task<T> FindOneByKey<T>(string key, string value) where T : TableEntity, new()
  {
      string resolvedTable = await InitializeTable<T>();
      if (string.IsNullOrEmpty(key) || string.IsNullOrEmpty(value))
      {
          return null;
      }

      string filter = TableQuery.GenerateFilterCondition(key, QueryComparisons.Equal, value);
      TableQuery<T> filtered = new TableQuery<T>().Where(filter);
      return await QueryObject<T>(filtered, resolvedTable);
  }
  /// <summary>
  /// Returns all entities whose properties equal the values given in <paramref name="dict"/>.
  /// Entries whose value is null or has an empty string form are ignored; the remaining
  /// entries are combined with <c>and</c>.
  /// </summary>
  /// <typeparam name="T">Entity type to read.</typeparam>
  /// <param name="dict">Property name to required value (compared via <c>ToString()</c>).</param>
  /// <returns>
  /// Null when <paramref name="dict"/> is null or empty; otherwise all matching entities.
  /// </returns>
  public async Task<List<T>> GetEntities<T>(IDictionary<string, object> dict) where T : TableEntity, new()
  {
      string resolvedTable = await InitializeTable<T>();
      if (dict == null || dict.Count == 0)
      {
          return null;
      }

      // One equality clause per usable entry, in dictionary order.
      IEnumerable<string> clauses = dict
          .Where(pair => pair.Value != null && !string.IsNullOrEmpty(pair.Value.ToString()))
          .Select(pair => TableQuery.GenerateFilterCondition(pair.Key, QueryComparisons.Equal, pair.Value.ToString()));
      string filter = string.Join(" " + TableOperators.And + " ", clauses);

      TableQuery<T> filtered = new TableQuery<T>().Where(filter);
      return await QueryList<T>(filtered, resolvedTable);
  }
  // Upper bound on the number of operations submitted in one batch request.
  private const int MaxEntitiesPerBatch = 100;

  /// <summary>
  /// Inserts all <paramref name="entitys"/> using batch operations and completes only
  /// after every batch has finished.
  /// </summary>
  /// <typeparam name="T">Entity type to write.</typeparam>
  /// <param name="entitys">Entities to insert; an empty list is a no-op.</param>
  /// <returns>The same list that was passed in.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="entitys"/> is null.</exception>
  public async Task<List<T>> SaveAll<T>(List<T> entitys) where T : TableEntity, new()
  {
      if (entitys == null)
      {
          throw new ArgumentNullException(nameof(entitys));
      }

      string tableName = await InitializeTable<T>();
      await ExecuteBatchesAsync(tableName, entitys, (batch, entity) => batch.Insert(entity));
      return entitys;
  }

  /// <summary>
  /// Replaces all <paramref name="entitys"/> using batch operations and completes only
  /// after every batch has finished.
  /// </summary>
  /// <typeparam name="T">Entity type to write.</typeparam>
  /// <param name="entitys">Entities to replace; an empty list is a no-op.</param>
  /// <returns>The same list that was passed in.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="entitys"/> is null.</exception>
  public async Task<List<T>> UpdateAll<T>(List<T> entitys) where T : TableEntity, new()
  {
      if (entitys == null)
      {
          throw new ArgumentNullException(nameof(entitys));
      }

      string tableName = await InitializeTable<T>();
      await ExecuteBatchesAsync(tableName, entitys, (batch, entity) => batch.Replace(entity));
      return entitys;
  }

  /// <summary>
  /// Inserts or replaces all <paramref name="entitys"/> using batch operations and
  /// completes only after every batch has finished.
  /// </summary>
  /// <typeparam name="T">Entity type to write.</typeparam>
  /// <param name="entitys">Entities to upsert; an empty list is a no-op.</param>
  /// <returns>The same list that was passed in.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="entitys"/> is null.</exception>
  public async Task<List<T>> SaveOrUpdateAll<T>(List<T> entitys) where T : TableEntity, new()
  {
      if (entitys == null)
      {
          throw new ArgumentNullException(nameof(entitys));
      }

      string tableName = await InitializeTable<T>();
      await ExecuteBatchesAsync(tableName, entitys, (batch, entity) => batch.InsertOrReplace(entity));
      return entitys;
  }

  /// <summary>
  /// Deletes all <paramref name="entitys"/> using batch operations and completes only
  /// after every batch has finished.
  /// </summary>
  /// <typeparam name="T">Entity type to delete.</typeparam>
  /// <param name="entitys">Entities to delete; an empty list is a no-op.</param>
  /// <returns>
  /// The same list that was passed in. (The previous cast of the batch results to
  /// <c>List&lt;T&gt;</c> could never succeed, so the input list is returned like the
  /// sibling bulk methods do.)
  /// </returns>
  /// <exception cref="ArgumentNullException"><paramref name="entitys"/> is null.</exception>
  public async Task<List<T>> DeleteAll<T>(List<T> entitys) where T : TableEntity, new()
  {
      if (entitys == null)
      {
          throw new ArgumentNullException(nameof(entitys));
      }

      string tableName = await InitializeTable<T>();
      await ExecuteBatchesAsync(tableName, entitys, (batch, entity) => batch.Delete(entity));
      return entitys;
  }

  /// <summary>
  /// Splits <paramref name="entities"/> into batches and executes them concurrently,
  /// returning once all of them have completed (or rethrowing the first failure).
  /// </summary>
  /// <remarks>
  /// Azure Table batch requests must target a single partition and hold at most
  /// 100 operations, so entities are grouped by PartitionKey before being chunked.
  /// </remarks>
  /// <param name="tableName">Table to write to.</param>
  /// <param name="entities">Entities to process; may be empty.</param>
  /// <param name="enqueue">Adds the desired operation for one entity to a batch.</param>
  private async Task ExecuteBatchesAsync<T>(string tableName, List<T> entities, Action<TableBatchOperation, T> enqueue) where T : TableEntity
  {
      if (entities.Count == 0)
      {
          return;
      }

      CloudTable table = tableClient.GetTableReference(tableName);
      List<Task> pending = new List<Task>();

      foreach (IGrouping<string, T> partition in entities.GroupBy(entity => entity.PartitionKey))
      {
          TableBatchOperation batch = new TableBatchOperation();
          foreach (T entity in partition)
          {
              enqueue(batch, entity);
              if (batch.Count == MaxEntitiesPerBatch)
              {
                  // Batch is full: submit it and start a new one for the same partition.
                  pending.Add(table.ExecuteBatchAsync(batch));
                  batch = new TableBatchOperation();
              }
          }

          if (batch.Count > 0)
          {
              pending.Add(table.ExecuteBatchAsync(batch));
          }
      }

      // Awaiting here is what makes failures observable to the caller.
      await Task.WhenAll(pending);
  }
  /// <summary>
  /// Inserts <paramref name="entity"/> into the table resolved for <typeparamref name="T"/>.
  /// </summary>
  /// <typeparam name="T">Entity type that selects the table and the return type.</typeparam>
  /// <param name="entity">Entity to insert.</param>
  /// <returns>The operation's result object cast to <typeparamref name="T"/>.</returns>
  public async Task<T> Save<T>(TableEntity entity) where T : TableEntity, new()
  {
      string resolvedTable = await InitializeTable<T>();
      TableOperation insert = TableOperation.Insert(entity);
      CloudTable target = tableClient.GetTableReference(resolvedTable);
      TableResult outcome = await target.ExecuteAsync(insert);
      return (T)outcome.Result;
  }
  /// <summary>
  /// Inserts <paramref name="entity"/>, or replaces it when it already exists, in the
  /// table resolved for <typeparamref name="T"/>.
  /// </summary>
  /// <typeparam name="T">Entity type that selects the table and the return type.</typeparam>
  /// <param name="entity">Entity to insert or replace.</param>
  /// <returns>The operation's result object cast to <typeparamref name="T"/>.</returns>
  public async Task<T> SaveOrUpdate<T>(TableEntity entity) where T : TableEntity, new()
  {
      string resolvedTable = await InitializeTable<T>();
      TableOperation upsert = TableOperation.InsertOrReplace(entity);
      CloudTable target = tableClient.GetTableReference(resolvedTable);
      TableResult outcome = await target.ExecuteAsync(upsert);
      return (T)outcome.Result;
  }
  /// <summary>
  /// Replaces <paramref name="entity"/> in the table resolved for <typeparamref name="T"/>.
  /// </summary>
  /// <typeparam name="T">Entity type that selects the table and the return type.</typeparam>
  /// <param name="entity">Entity to replace.</param>
  /// <returns>The operation's result object cast to <typeparamref name="T"/>.</returns>
  public async Task<T> Update<T>(TableEntity entity) where T : TableEntity, new()
  {
      string resolvedTable = await InitializeTable<T>();
      TableOperation replace = TableOperation.Replace(entity);
      CloudTable target = tableClient.GetTableReference(resolvedTable);
      TableResult outcome = await target.ExecuteAsync(replace);
      return (T)outcome.Result;
  }
  /// <summary>
  /// Deletes <paramref name="entity"/> from the table resolved for <typeparamref name="T"/>.
  /// </summary>
  /// <typeparam name="T">Entity type that selects the table and the return type.</typeparam>
  /// <param name="entity">Entity to delete.</param>
  /// <returns>The operation's result object cast to <typeparamref name="T"/>.</returns>
  public async Task<T> Delete<T>(TableEntity entity) where T : TableEntity, new()
  {
      string resolvedTable = await InitializeTable<T>();
      TableOperation removal = TableOperation.Delete(entity);
      CloudTable target = tableClient.GetTableReference(resolvedTable);
      TableResult outcome = await target.ExecuteAsync(removal);
      return (T)outcome.Result;
  }
  357. //public async Task<List<T>> FindListByDictAndLike<T>(Dictionary<string, object> dict, Dictionary<string, object> likeDict) where T : TableEntity, new()
  358. //{
  359. // throw new NotImplementedException();
  360. //}
  361. //public async Task<List<T>> FindListByDictAndLikeAndNotEQ<T>(Dictionary<string, object> dict, Dictionary<string, object> likeDict, Dictionary<string, object> notEQDict) where T : TableEntity, new()
  362. //{
  363. // throw new NotImplementedException();
  364. //}
  365. //public async Task<List<T>> FindListByDictAndLikeAndStartWith<T>(Dictionary<string, object> dict, Dictionary<string, object> likeDict, Dictionary<string, object> startDict) where T : TableEntity, new()
  366. //{
  367. // throw new NotImplementedException();
  368. //}
  /// <summary>
  /// Returns one page of entities whose properties equal the values given in
  /// <paramref name="dict"/>, starting at the position described by
  /// <paramref name="azureTableToken"/>. Entries whose value is null or has an empty
  /// string form are ignored; the remaining entries are combined with <c>and</c>.
  /// </summary>
  /// <typeparam name="T">Entity type to read.</typeparam>
  /// <param name="dict">Property name to required value (compared via <c>ToString()</c>).</param>
  /// <param name="azureTableToken">Paging token identifying where to resume.</param>
  /// <returns>
  /// Null when <paramref name="dict"/> is null or empty; otherwise the page of matches
  /// together with the token for the next page.
  /// </returns>
  public async Task<AzurePagination<T>> FindListByDict<T>(Dictionary<string, object> dict, AzureTableToken azureTableToken) where T : TableEntity, new()
  {
      string resolvedTable = await InitializeTable<T>();
      if (dict == null || dict.Count == 0)
      {
          return null;
      }

      // One equality clause per usable entry, in dictionary order.
      IEnumerable<string> clauses = dict
          .Where(pair => pair.Value != null && !string.IsNullOrEmpty(pair.Value.ToString()))
          .Select(pair => TableQuery.GenerateFilterCondition(pair.Key, QueryComparisons.Equal, pair.Value.ToString()));
      string filter = string.Join(" " + TableOperators.And + " ", clauses);

      TableQuery<T> filtered = new TableQuery<T>().Where(filter);
      return await QueryList<T>(azureTableToken, filtered, resolvedTable);
  }
  /// <summary>
  /// Executes a single segment of <paramref name="exQuery"/> against
  /// <paramref name="TableName"/>, resuming from <paramref name="azureTableToken"/>,
  /// and wraps the results together with the next-page token.
  /// </summary>
  /// <param name="azureTableToken">Paging token converted to the starting continuation token.</param>
  /// <param name="exQuery">Query to execute.</param>
  /// <param name="TableName">Name of the table to query.</param>
  /// <returns>The entities of this segment and the token produced from its continuation token.</returns>
  private async Task<AzurePagination<T>> QueryList<T>(AzureTableToken azureTableToken, TableQuery<T> exQuery, string TableName) where T : TableEntity, new()
  {
      TableContinuationToken startToken = new HaBookTableContinuationToken(azureTableToken).GetContinuationToken();
      var segment = await tableClient.GetTableReference(TableName).ExecuteQuerySegmentedAsync(exQuery, startToken);

      List<T> page = new List<T>();
      if (segment.Results.Count > 0)
      {
          page.AddRange(segment.Results);
      }

      // Only one segment is read; the caller pages by passing the returned token back in.
      TableContinuationToken nextToken = segment.ContinuationToken;
      return new AzurePagination<T>
      {
          token = new HaBookTableContinuationToken(nextToken).GetAzureTableToken(),
          data = page
      };
  }
  416. }
  417. }