using TEAMModelOS.SDK.Module.AzureTable.Configuration; using TEAMModelOS.SDK.Module.AzureTable.Interfaces; using TEAMModelOS.SDK; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using TEAMModelOS.SDK.Helper.Security.AESCrypt; using TEAMModelOS.SDK.Context.Exception; using System.Reflection; using TEAMModelOS.SDK.Context.Attributes.Azure; using TEAMModelOS.SDK.Helper.Common.CollectionHelper; using TEAMModelOS.SDK.Context.Configuration; using Microsoft.Extensions.Configuration; using Microsoft.Azure.Cosmos.Table; namespace TEAMModelOS.SDK.Module.AzureTable.Implements { //Linq全部查詢有1000限制 /* public async Task> GetByPartitionKey(string partitionKey) { var theQuery = table.CreateQuery().Where(ent => ent.PartitionKey == partitionKey); TableQuerySegment querySegment = null; var returnList = new List(); while (querySegment == null || querySegment.ContinuationToken != null) { querySegment = await theQuery.AsTableQuery() .ExecuteSegmentedAsync(querySegment != null ? querySegment.ContinuationToken : null); returnList.AddRange(querySegment); } return returnList; //Linq全部查詢沒有1000限制(要驗證) public Task> GetByPartitionKey(string partitionKey) { return Task.Run(() => table.CreateQuery() .Where(ent => ent.PartitionKey == partitionKey) .ToList()); } }*/ /// /// 带处理问题 /// 1.使用 Microsoft.Azure.Cosmos.Table /// 2.AddSingleton 可能不適合CloudTableClient 线程不安全 /// 3.能否使用 Efcore /// 4.有CreateQuery可以用,也有AsTableQuery轉換 還有Microsoft.Azure.Cosmos.Table.Queryable 支持 Linq lambda語法 /// 5. 
// 5. (continued from the note above) Also verify the cheapest projection when computing Count, e.g.:
    //      var query = from entity in table.CreateQuery<T>(tableName) select new { entity.PartitionKey };
    //    If you optimize this, verify the result along the way.

    /// <summary>
    /// Repository over Azure Table storage. Each entity type <c>T</c> is stored in the table named by its
    /// <see cref="TableNameAttribute"/>, or by the type name when the attribute is absent.
    /// </summary>
    /// <remarks>
    /// NOTE(review): generic type arguments in this class were reconstructed from usage; confirm the public
    /// signatures (in particular the dictionary value type and the return type of <c>GetEntities</c>)
    /// against <see cref="IAzureTableDBRepository"/>.
    /// </remarks>
    public class AzureTableDBRepository : IAzureTableDBRepository
    {
        /// <summary>Maximum number of operations submitted in one batch (all must share a partition key).</summary>
        private const int BatchSize = 100;

        private CloudTableClient CloudTableClient { get; set; }
        private CloudTable cloudTable { get; set; }
        private AzureTableOptions options { get; set; }

        /// <summary>
        /// Reads the "Azure:Table" configuration section and creates the table client from its connection string.
        /// </summary>
        /// <exception cref="InvalidOperationException">The "Azure:Table" section could not be bound.</exception>
        public AzureTableDBRepository()
        {
            options = BaseConfigModel.Configuration.GetSection("Azure:Table").Get<AzureTableOptions>();
            if (options == null)
            {
                throw new InvalidOperationException("Configuration section 'Azure:Table' is missing.");
            }
            CloudTableClient = CloudStorageAccount.Parse(options.ConnectionString).CreateCloudTableClient();
        }

        /// <summary>
        /// Resolves the table name for <typeparamref name="T"/>: the name given by a
        /// <see cref="TableNameAttribute"/> on the type (the last one found wins), otherwise the type name.
        /// </summary>
        private string GetTableSpace<T>()
        {
            Type type = typeof(T);
            string name = type.Name;
            foreach (object attribute in type.GetCustomAttributes(true))
            {
                if (attribute is TableNameAttribute tableSpace)
                {
                    name = tableSpace.Name;
                }
            }
            return name;
        }

        /// <summary>
        /// Makes sure the table for <typeparamref name="T"/> exists and returns its name.
        /// Creation is skipped when the most recently initialized table has the same name.
        /// </summary>
        private async Task<string> InitializeTable<T>()
        {
            string tableName = GetTableSpace<T>();
            if (cloudTable == null || !string.Equals(cloudTable.Name, tableName, StringComparison.Ordinal))
            {
                // Publish the reference only after the table exists, so a concurrent caller that sees
                // a matching cached name never queries a table that is still being created.
                CloudTable table = CloudTableClient.GetTableReference(tableName);
                await table.CreateIfNotExistsAsync();
                cloudTable = table;
            }
            return tableName;
        }

        /// <summary>
        /// Returns every entity stored under <paramref name="partitionKey"/>.
        /// Results are read segment by segment, so the per-request 1000-entity limit does not truncate them.
        /// </summary>
        /// <param name="partitionKey">Partition key to match exactly.</param>
        public async Task<List<T>> GetByPartitionKey<T>(string partitionKey) where T : TableEntity, new()
        {
            // The table is resolved per call instead of relying on the shared cached reference,
            // which may be null or may point at another entity type's table.
            string tableName = await InitializeTable<T>();
            TableQuery<T> query = new TableQuery<T>().Where(
                TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, partitionKey));
            return await QueryList(query, tableName);
        }

        /// <summary>Returns every entity in the table of <typeparamref name="T"/>.</summary>
        public async Task<List<T>> FindAll<T>() where T : TableEntity, new()
        {
            string tableName = await InitializeTable<T>();
            return await QueryList(new TableQuery<T>(), tableName);
        }

        /// <summary>Runs <paramref name="exQuery"/> to completion, following continuation tokens.</summary>
        private async Task<List<T>> QueryList<T>(TableQuery<T> exQuery, string TableName) where T : TableEntity, new()
        {
            CloudTable table = CloudTableClient.GetTableReference(TableName);
            List<T> entities = new List<T>();
            TableContinuationToken continuationToken = null;
            do
            {
                TableQuerySegment<T> segment = await table.ExecuteQuerySegmentedAsync(exQuery, continuationToken);
                entities.AddRange(segment.Results);
                continuationToken = segment.ContinuationToken;
            } while (continuationToken != null);
            return entities;
        }

        /// <summary>
        /// Returns the first entity matching <paramref name="exQuery"/>.
        /// When nothing matches, a freshly constructed <typeparamref name="T"/> is returned (not null).
        /// </summary>
        private async Task<T> QueryObject<T>(TableQuery<T> exQuery, string TableName) where T : TableEntity, new()
        {
            CloudTable table = CloudTableClient.GetTableReference(TableName);
            // Only one row is needed, so ask the service for one instead of a full segment.
            TableQuery<T> firstOnly = exQuery.Take(1);
            TableContinuationToken continuationToken = null;
            do
            {
                TableQuerySegment<T> segment = await table.ExecuteQuerySegmentedAsync(firstOnly, continuationToken);
                if (segment.Results.Count > 0)
                {
                    return segment.Results[0];
                }
                // A segment may be empty and still carry a continuation token; keep reading.
                continuationToken = segment.ContinuationToken;
            } while (continuationToken != null);
            return new T();
        }

        /// <summary>
        /// Counts the entities in the table of <typeparamref name="T"/>, starting at
        /// <paramref name="continuationToken"/> (null starts from the beginning).
        /// </summary>
        public async Task<int> Count<T>(TableContinuationToken continuationToken) where T : TableEntity, new()
        {
            string tableName = await InitializeTable<T>();
            CloudTable table = CloudTableClient.GetTableReference(tableName);
            // Project a single key column and add up segment sizes, rather than downloading
            // and keeping every full entity in memory just to count them.
            TableQuery<T> query = new TableQuery<T>().Select(new List<string> { "PartitionKey" });
            int total = 0;
            do
            {
                TableQuerySegment<T> segment = await table.ExecuteQuerySegmentedAsync(query, continuationToken);
                total += segment.Results.Count;
                continuationToken = segment.ContinuationToken;
            } while (continuationToken != null);
            return total;
        }

        /// <summary>Counts every entity in the table of <typeparamref name="T"/>.</summary>
        public async Task<int> Count<T>() where T : TableEntity, new()
        {
            return await Count<T>(null);
        }

        /// <summary>
        /// Finds the first entity whose RowKey equals <paramref name="id"/>.
        /// Returns null when <paramref name="id"/> is null or empty; see <c>QueryObject</c> for the no-match result.
        /// </summary>
        public async Task<T> FindByRowKey<T>(string id) where T : TableEntity, new()
        {
            if (string.IsNullOrEmpty(id))
            {
                return null;
            }
            string filter = SwitchType<T>(id, "RowKey");
            if (string.IsNullOrEmpty(filter))
            {
                return null;
            }
            string tableName = await InitializeTable<T>();
            return await QueryObject(new TableQuery<T>().Where(filter), tableName);
        }

        /// <summary>
        /// Returns all entities whose properties equal every key/value pair in <paramref name="dict"/>.
        /// Returns null when <paramref name="dict"/> is null or empty.
        /// </summary>
        /// <exception cref="ArgumentException">A value is null/empty, or a key is not a property of T.</exception>
        public async Task<List<T>> FindListByDict<T>(Dictionary<string, object> dict) where T : TableEntity, new()
        {
            if (dict == null || dict.Count == 0)
            {
                return null;
            }
            string filter = BuildFilter<T>(dict);
            string tableName = await InitializeTable<T>();
            return await QueryList(new TableQuery<T>().Where(filter), tableName);
        }

        /// <summary>
        /// Returns all entities whose property <paramref name="key"/> equals <paramref name="value"/>.
        /// Returns null when the key or value is null/empty or the property type is not filterable.
        /// </summary>
        public async Task<List<T>> FindListByKey<T>(string key, object value) where T : TableEntity, new()
        {
            if (string.IsNullOrEmpty(key) || value == null || string.IsNullOrEmpty(value.ToString()))
            {
                return null;
            }
            string filter = SwitchType<T>(value, key);
            if (string.IsNullOrEmpty(filter))
            {
                return null;
            }
            string tableName = await InitializeTable<T>();
            return await QueryList(new TableQuery<T>().Where(filter), tableName);
        }

        /// <summary>
        /// Returns the first entity matching every key/value pair in <paramref name="dict"/>.
        /// Returns null when <paramref name="dict"/> is null or empty; see <c>QueryObject</c> for the no-match result.
        /// </summary>
        /// <exception cref="ArgumentException">A value is null/empty, or a key is not a property of T.</exception>
        public async Task<T> FindOneByDict<T>(IDictionary<string, object> dict) where T : TableEntity, new()
        {
            if (dict == null || dict.Count == 0)
            {
                return null;
            }
            string filter = BuildFilter<T>(dict);
            string tableName = await InitializeTable<T>();
            return await QueryObject(new TableQuery<T>().Where(filter), tableName);
        }

        /// <summary>
        /// Combines one equality condition per dictionary entry with the AND operator.
        /// Entries whose property type is not filterable are skipped.
        /// </summary>
        /// <exception cref="ArgumentException">A value is null/empty, or a key is not a property of T.</exception>
        private static string BuildFilter<T>(IDictionary<string, object> dict)
        {
            List<string> conditions = new List<string>();
            foreach (KeyValuePair<string, object> pair in dict)
            {
                if (pair.Value == null || string.IsNullOrEmpty(pair.Value.ToString()))
                {
                    throw new ArgumentException("The parameter must have value!");
                }
                string condition = SwitchType<T>(pair.Value, pair.Key);
                if (!string.IsNullOrEmpty(condition))
                {
                    conditions.Add(condition);
                }
            }
            return string.Join(" " + TableOperators.And + " ", conditions);
        }

        /// <summary>
        /// Builds an equality filter for property <paramref name="key"/> of <typeparamref name="T"/>,
        /// choosing the filter generator from the declared property type (nullable types are unwrapped).
        /// </summary>
        /// <param name="obj">Value to compare against; converted to the property type where needed.</param>
        /// <param name="key">Name of a public property on <typeparamref name="T"/>.</param>
        /// <returns>The filter string, or null when the property type is not supported.</returns>
        /// <exception cref="ArgumentException"><typeparamref name="T"/> has no property named <paramref name="key"/>.</exception>
        private static string SwitchType<T>(object obj, string key)
        {
            Type objType = typeof(T);
            PropertyInfo property = objType.GetProperty(key);
            if (property == null)
            {
                throw new ArgumentException(objType.FullName + " PropertyInfo doesn't include this parameter :" + key);
            }

            System.Globalization.CultureInfo invariant = System.Globalization.CultureInfo.InvariantCulture;
            Type propertyType = Nullable.GetUnderlyingType(property.PropertyType) ?? property.PropertyType;

            // byte[], DateTimeOffset and Guid all report TypeCode.Object, so they are matched by type first.
            if (propertyType == typeof(byte[]))
            {
                return TableQuery.GenerateFilterConditionForBinary(key, QueryComparisons.Equal, (byte[])obj);
            }
            if (propertyType == typeof(DateTimeOffset) || propertyType == typeof(DateTime))
            {
                DateTimeOffset moment;
                if (obj is DateTimeOffset offsetValue)
                {
                    moment = offsetValue;
                }
                else if (obj is DateTime dateValue)
                {
                    moment = new DateTimeOffset(dateValue);
                }
                else
                {
                    moment = DateTimeOffset.Parse(Convert.ToString(obj, invariant), invariant);
                }
                return TableQuery.GenerateFilterConditionForDate(key, QueryComparisons.Equal, moment);
            }
            if (propertyType == typeof(Guid))
            {
                Guid guid = obj is Guid guidValue ? guidValue : Guid.Parse(Convert.ToString(obj, invariant));
                return TableQuery.GenerateFilterConditionForGuid(key, QueryComparisons.Equal, guid);
            }

            switch (Type.GetTypeCode(propertyType))
            {
                case TypeCode.String:
                    return TableQuery.GenerateFilterCondition(key, QueryComparisons.Equal, obj.ToString());
                case TypeCode.Int32:
                    int intValue = obj is int i ? i : int.Parse(Convert.ToString(obj, invariant), invariant);
                    return TableQuery.GenerateFilterConditionForInt(key, QueryComparisons.Equal, intValue);
                case TypeCode.Int64:
                    long longValue = obj is long l ? l : long.Parse(Convert.ToString(obj, invariant), invariant);
                    return TableQuery.GenerateFilterConditionForLong(key, QueryComparisons.Equal, longValue);
                case TypeCode.Double:
                    return TableQuery.GenerateFilterConditionForDouble(key, QueryComparisons.Equal, Convert.ToDouble(obj, invariant));
                case TypeCode.Boolean:
                    return TableQuery.GenerateFilterConditionForBool(key, QueryComparisons.Equal, Convert.ToBoolean(obj, invariant));
                default:
                    return null;
            }
        }

        /// <summary>
        /// Returns the first entity whose property <paramref name="key"/> equals <paramref name="value"/>.
        /// Returns null when the key or value is null/empty or the property type is not filterable;
        /// see <c>QueryObject</c> for the no-match result.
        /// </summary>
        public async Task<T> FindOneByKey<T>(string key, object value) where T : TableEntity, new()
        {
            if (string.IsNullOrEmpty(key) || value == null || string.IsNullOrEmpty(value.ToString()))
            {
                return null;
            }
            string filter = SwitchType<T>(value, key);
            if (string.IsNullOrEmpty(filter))
            {
                return null;
            }
            string tableName = await InitializeTable<T>();
            return await QueryObject(new TableQuery<T>().Where(filter), tableName);
        }

        /// <summary>
        /// Returns all entities matching every key/value pair in <paramref name="dict"/>.
        /// Returns null when <paramref name="dict"/> is null or empty.
        /// </summary>
        /// <remarks>NOTE(review): return type reconstructed as a list; confirm against the interface.</remarks>
        /// <exception cref="ArgumentException">A value is null/empty, or a key is not a property of T.</exception>
        public async Task<List<T>> GetEntities<T>(IDictionary<string, object> dict) where T : TableEntity, new()
        {
            if (dict == null || dict.Count == 0)
            {
                return null;
            }
            // Each condition is built from the entry's value (previously the dictionary itself was passed).
            string filter = BuildFilter<T>(dict);
            string tableName = await InitializeTable<T>();
            return await QueryList(new TableQuery<T>().Where(filter), tableName);
        }

        /// <summary>Inserts all entities in batches. Returns the input list, or null when it is empty.</summary>
        public async Task<List<T>> SaveAll<T>(List<T> entitys) where T : TableEntity, new()
        {
            return await ExecuteInBatches(entitys, (batch, entity) => batch.Insert(entity));
        }

        /// <summary>Replaces all entities in batches. Returns the input list, or null when it is empty.</summary>
        public async Task<List<T>> UpdateAll<T>(List<T> entitys) where T : TableEntity, new()
        {
            return await ExecuteInBatches(entitys, (batch, entity) => batch.Replace(entity));
        }

        /// <summary>Inserts or replaces all entities in batches. Returns the input list, or null when it is empty.</summary>
        public async Task<List<T>> SaveOrUpdateAll<T>(List<T> entitys) where T : TableEntity, new()
        {
            return await ExecuteInBatches(entitys, (batch, entity) => batch.InsertOrReplace(entity));
        }

        /// <summary>Deletes all entities in batches. Returns the input list, or null when it is empty.</summary>
        public async Task<List<T>> DeleteAll<T>(List<T> entitys) where T : TableEntity, new()
        {
            return await ExecuteInBatches(entitys, (batch, entity) => batch.Delete(entity));
        }

        /// <summary>
        /// Groups <paramref name="entitys"/> by partition key and submits them in batches of at most
        /// <see cref="BatchSize"/>, one batch after another.
        /// </summary>
        /// <param name="entitys">Entities to process.</param>
        /// <param name="addOperation">Adds the desired operation for one entity to the batch.</param>
        private async Task<List<T>> ExecuteInBatches<T>(List<T> entitys, Action<TableBatchOperation, T> addOperation) where T : TableEntity, new()
        {
            if (entitys.IsEmpty())
            {
                return null;
            }
            string tableName = await InitializeTable<T>();
            CloudTable table = CloudTableClient.GetTableReference(tableName);
            // A batch may only contain entities that share one partition key.
            foreach (IGrouping<string, T> partition in entitys.GroupBy(c => c.PartitionKey))
            {
                List<T> values = partition.ToList();
                for (int offset = 0; offset < values.Count; offset += BatchSize)
                {
                    TableBatchOperation batchOperation = new TableBatchOperation();
                    int size = Math.Min(BatchSize, values.Count - offset);
                    foreach (T entity in values.GetRange(offset, size))
                    {
                        addOperation(batchOperation, entity);
                    }
                    await table.ExecuteBatchAsync(batchOperation);
                }
            }
            return entitys;
        }

        /// <summary>Inserts <paramref name="entity"/> and returns the operation result cast to T.</summary>
        public async Task<T> Save<T>(TableEntity entity) where T : TableEntity, new()
        {
            return await ExecuteSingle<T>(TableOperation.Insert(entity));
        }

        /// <summary>Inserts or replaces <paramref name="entity"/> and returns the operation result cast to T.</summary>
        public async Task<T> SaveOrUpdate<T>(TableEntity entity) where T : TableEntity, new()
        {
            return await ExecuteSingle<T>(TableOperation.InsertOrReplace(entity));
        }

        /// <summary>Replaces <paramref name="entity"/> and returns the operation result cast to T.</summary>
        public async Task<T> Update<T>(TableEntity entity) where T : TableEntity, new()
        {
            return await ExecuteSingle<T>(TableOperation.Replace(entity));
        }

        /// <summary>Deletes <paramref name="entity"/> and returns the operation result cast to T.</summary>
        public async Task<T> Delete<T>(TableEntity entity) where T : TableEntity, new()
        {
            return await ExecuteSingle<T>(TableOperation.Delete(entity));
        }

        /// <summary>Executes one table operation against the table of <typeparamref name="T"/>.</summary>
        private async Task<T> ExecuteSingle<T>(TableOperation operation) where T : TableEntity, new()
        {
            string tableName = await InitializeTable<T>();
            TableResult result = await CloudTableClient.GetTableReference(tableName).ExecuteAsync(operation);
            return (T)result.Result;
        }

        // The LIKE / not-equal / starts-with variants of FindListByDict were only ever stubbed
        // (NotImplementedException) and are not part of this class.

        /// <summary>
        /// Returns one page of entities matching every key/value pair in <paramref name="dict"/>,
        /// together with the token for the next page. Returns null when <paramref name="dict"/> is null or empty.
        /// </summary>
        /// <param name="dict">Property name to required value.</param>
        /// <param name="azureTableToken">Token of the page to read.</param>
        /// <exception cref="ArgumentException">A value is null/empty, or a key is not a property of T.</exception>
        public async Task<AzurePagination<T>> FindListByDict<T>(Dictionary<string, object> dict, AzureTableToken azureTableToken) where T : TableEntity, new()
        {
            if (dict == null || dict.Count == 0)
            {
                return null;
            }
            // Each condition is built from the entry's value (previously the dictionary itself was passed).
            string filter = BuildFilter<T>(dict);
            string tableName = await InitializeTable<T>();
            return await QueryList(azureTableToken, new TableQuery<T>().Where(filter), tableName);
        }

        /// <summary>
        /// Reads a single segment of <paramref name="exQuery"/> starting at <paramref name="azureTableToken"/>
        /// and wraps the rows with the continuation token of the following segment.
        /// </summary>
        private async Task<AzurePagination<T>> QueryList<T>(AzureTableToken azureTableToken, TableQuery<T> exQuery, string TableName) where T : TableEntity, new()
        {
            TableContinuationToken tableToken = new HaBookTableContinuationToken(azureTableToken).GetContinuationToken();
            TableQuerySegment<T> segment = await CloudTableClient.GetTableReference(TableName).ExecuteQuerySegmentedAsync(exQuery, tableToken);
            List<T> entities = new List<T>(segment.Results);
            return new AzurePagination<T>
            {
                token = new HaBookTableContinuationToken(segment.ContinuationToken).GetAzureTableToken(),
                data = entities
            };
        }
    }
}