using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Cosmos.Linq;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Linq.Expressions;
using System.Net;
using System.Reflection;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using TEAMModelOS.SDK.Context.Attributes.Azure;
using TEAMModelOS.SDK.Context.Exception;
using TEAMModelOS.SDK.Helper.Common.CollectionHelper;
using TEAMModelOS.SDK.Helper.Common.ReflectorExtensions;
using TEAMModelOS.SDK.Module.AzureCosmosDB.Configuration;
namespace TEAMModelOS.SDK.Module.AzureCosmosDBV3
{
/// <summary>
/// Pairs a Cosmos DB container handle with a flag that says whether its items
/// are also kept in the cache.
/// </summary>
internal class CosmosModelInfo
{
    /// <summary>Container handle used for item operations.</summary>
    public Container container { get; set; }

    /// <summary>True when reads and writes for this container also go through the cache.</summary>
    public bool cache { get; set; }
}
public class AzureCosmosDBV3Repository : IAzureCosmosDBV3Repository
{
// Cosmos client obtained from CosmosDBV3ClientSingleton in the constructors.
private CosmosClient CosmosClient { get; set; }
/// <summary>
/// Known containers, looked up by container id / collection name.
/// NOTE(review): the original comment called this a thread-safe dict type, but the
/// declaration is a plain Dictionary — confirm whether concurrent access is expected.
/// </summary>
private Dictionary DocumentCollectionDict { get; set; } = new Dictionary();
// Database id taken from options.Database.
private string DatabaseId { get; set; }
// Default throughput (RU) taken from options; InitializeDatabase may raise it.
private int CollectionThroughput { get; set; }
// Database handle; only InitializeDatabase assigns it.
private Database database { get; set; }
// Number of items handled per batch by the bulk methods.
int pageSize = 200;
// Prepended to the container id to build cache keys.
private const string CacheCosmosPrefix = "cosmos:";
// Taken from options.ScanModel and passed to the model scan in InitializeDatabase.
private string[] ScanModel { get; set; }
// Expiry passed to the cache calls, in seconds (86400 = 24 hours).
private const int timeoutSeconds = 86400;
/// <summary>
/// Builds the repository from configuration, handing the given serializer to the
/// shared Cosmos client singleton. The database itself is not opened here;
/// InitializeDatabase does that.
/// </summary>
/// <param name="options">Connection string, key, database id, default throughput and model scan list.</param>
/// <param name="cosmosSerializer">Serializer passed to the client singleton.</param>
/// <exception cref="BizException">
/// The connection string is empty, or a CosmosException was raised while obtaining the client.
/// </exception>
public AzureCosmosDBV3Repository(AzureCosmosDBOptions options, CosmosSerializer cosmosSerializer)
{
    try
    {
        // A BizException is not a CosmosException, so this passes straight through the catch below.
        if (string.IsNullOrEmpty(options.ConnectionString))
        {
            throw new BizException("请设置正确的AzureCosmosDB数据库配置信息!");
        }

        var clientHolder = CosmosDBV3ClientSingleton.getInstance(options.ConnectionString, options.ConnectionKey, cosmosSerializer);
        CosmosClient = clientHolder.GetCosmosDBClient();

        DatabaseId = options.Database;
        CollectionThroughput = options.CollectionThroughput;
        ScanModel = options.ScanModel;
    }
    catch (CosmosException cosmosError)
    {
        throw new BizException(cosmosError.Message, 500, cosmosError.StackTrace);
    }
}
/// <summary>
/// Builds the repository from configuration without a custom serializer:
/// null is passed to the client singleton in its place.
/// </summary>
/// <param name="options">Connection string, key, database id, default throughput and model scan list.</param>
/// <exception cref="BizException">
/// The connection string is empty, or a CosmosException was raised while obtaining the client.
/// </exception>
public AzureCosmosDBV3Repository(AzureCosmosDBOptions options)
    : this(options, null)
{
    // All work happens in the two-argument constructor.
}
/// <summary>
/// Creates the database if it is missing, records every existing container in
/// DocumentCollectionDict, then walks the scanned model types: a container that already
/// exists gets its cache flag set and its throughput raised when it is below the requested
/// RU; a container that does not exist is created.
/// </summary>
/// <exception cref="BizException">Wraps any CosmosException raised along the way (code 500).</exception>
public async Task InitializeDatabase()
{
try
{
database = await CosmosClient.CreateDatabaseIfNotExistsAsync(DatabaseId, CollectionThroughput);
FeedIterator resultSetIterator = database.GetContainerQueryIterator();
while (resultSetIterator.HasMoreResults)
{
foreach (ContainerProperties container in await resultSetIterator.ReadNextAsync())
{
// Containers found in the database start with caching off; the model scan below may switch it on.
DocumentCollectionDict.TryAdd(container.Id, new CosmosModelInfo { container= database.GetContainer(container.Id) ,cache=false});
}
}
// Original note: get all tables of the database. The types come from scanning ScanModel
// (presumably the model types carrying the collection attribute — confirm in ReflectorExtensions).
List types = ReflectorExtensions.GetAllTypeAsAttribute(ScanModel);
foreach (Type type in types)
{
string PartitionKey = GetPartitionKey(type);
string CollectionName = "";
int RU = 0;
bool cache = false;
IEnumerable attributes = type.GetCustomAttributes(true);
// Collection name: the attribute's Name when set, otherwise the type name.
if (!string.IsNullOrEmpty(attributes.First().Name))
{
CollectionName = attributes.First().Name;
}
else
{
CollectionName = type.Name;
}
if ( attributes.First().Cache)
{
cache = attributes.First().Cache;
}
//else
//{
// cache = false;
//}
// The attribute's RU is only honoured above 400; otherwise the configured default applies.
if (attributes.First().RU > 400)
{
RU = attributes.First().RU;
}
else
{
RU = CollectionThroughput;
}
// If the container already exists, check whether its RU needs to change; otherwise create it.
if (DocumentCollectionDict.TryGetValue(CollectionName, out CosmosModelInfo cosmosModelInfo))
{ // update RU
cosmosModelInfo.cache = cache;
int? throughputResponse = await CosmosClient.GetDatabase(DatabaseId).GetContainer(cosmosModelInfo.container.Id).ReadThroughputAsync();
// Throughput is only ever raised here, never lowered; a null reading makes the comparison false.
if (throughputResponse < RU)
{
await CosmosClient.GetDatabase(DatabaseId).GetContainer(cosmosModelInfo.container.Id).ReplaceThroughputAsync(RU);
}
}
else
{
ContainerProperties containerProperties = new ContainerProperties { Id = CollectionName };
if (!string.IsNullOrEmpty(PartitionKey))
{
containerProperties.PartitionKeyPath = "/" + PartitionKey;
}
// NOTE(review): this raises the instance-wide CollectionThroughput, so every container
// created afterwards (here and in InitializeCollection) uses the raised value — confirm this is intended.
if (RU > CollectionThroughput)
{
CollectionThroughput = RU;
}
Container containerWithConsistentIndexing = await database.CreateContainerIfNotExistsAsync(containerProperties, throughput: CollectionThroughput);
DocumentCollectionDict.TryAdd(CollectionName, new CosmosModelInfo { container= containerWithConsistentIndexing ,cache=cache});
}
}
}
catch (CosmosException e)
{
throw new BizException(e.Message, 500, e.StackTrace);
}
}
/// <summary>
/// Resolves the partition-key property name for the model type T by delegating
/// to the Type-based overload.
/// </summary>
private string GetPartitionKey()
{
    return GetPartitionKey(typeof(T));
}
/// <summary>
/// Finds the name of the single property that acts as the partition key for a model type.
/// A property counts when it is named "PartitionKey" or carries a PartitionKeyAttribute.
/// </summary>
/// <param name="type">Model type to inspect.</param>
/// <returns>The name of the one matching property.</returns>
/// <exception cref="BizException">No property matches, or more than one match was recorded.</exception>
private string GetPartitionKey(Type type)
{
    // Only the first match and the number of matches are needed, so no list is kept.
    PropertyInfo firstMatch = null;
    int matchCount = 0;

    foreach (PropertyInfo property in type.GetProperties())
    {
        if (property.Name.Equals("PartitionKey"))
        {
            // A property literally named PartitionKey ends the scan.
            if (firstMatch == null)
            {
                firstMatch = property;
            }
            matchCount++;
            break;
        }

        // Look through the attributes on the property for the partition-key marker.
        foreach (object attribute in property.GetCustomAttributes(true))
        {
            if (attribute is PartitionKeyAttribute)
            {
                if (firstMatch == null)
                {
                    firstMatch = property;
                }
                matchCount++;
            }
        }
    }

    if (matchCount <= 0)
    {
        // The separating space was missing, which glued the type name to the message.
        throw new BizException(type.Name + " has no PartitionKey !");
    }
    if (matchCount == 1)
    {
        return firstMatch.Name;
    }
    throw new BizException("PartitionKey can only be single!");
}
/// <summary>
/// Resolves the container entry for model type T: works out the partition key and the
/// collection name, then delegates to the name-based overload.
/// </summary>
private async Task InitializeCollection()
{
Type type = typeof(T);
string partitionKey = GetPartitionKey();
string CollectionName;
IEnumerable attributes = type.GetCustomAttributes(true);
// Collection name: the first attribute's Name when set, otherwise the type name.
if (!string.IsNullOrEmpty(attributes.First().Name))
{
CollectionName = attributes.First().Name;
}
else
{
CollectionName = type.Name;
}
return await InitializeCollection(CollectionName, partitionKey);
}
/// <summary>
/// Returns the entry for a collection from DocumentCollectionDict, creating the container
/// with default settings (and caching off) when it is not known yet.
/// </summary>
/// <param name="CollectionName">Container id to look up or create.</param>
/// <param name="PartitionKey">Property name used for the partition key path; skipped when empty.</param>
private async Task InitializeCollection(string CollectionName, string PartitionKey)
{
    // Already known in memory: return it directly.
    if (DocumentCollectionDict.TryGetValue(CollectionName, out CosmosModelInfo known))
    {
        return known;
    }

    // Not known yet: try to create it with defaults.
    ContainerProperties properties = new ContainerProperties { Id = CollectionName };
    if (!string.IsNullOrEmpty(PartitionKey))
    {
        properties.PartitionKeyPath = "/" + PartitionKey;
    }

    Container created = await database.CreateContainerIfNotExistsAsync(properties, throughput: CollectionThroughput);
    CosmosModelInfo info = new CosmosModelInfo { container = created, cache = false };
    DocumentCollectionDict.TryAdd(CollectionName, info);
    return info;
}
/// <summary>
/// Deletes items by (partition key, id) pairs in batches of pageSize and reports the
/// status code returned for each one. When the container is cached, the matching cache
/// fields are removed after each batch.
/// </summary>
/// <param name="ids">Pairs whose Key is the partition key value and whose Value is the item id.</param>
/// <returns>One IdPk per requested pair, in request order, carrying the delete status code.</returns>
public async Task> DeleteAll(List> ids) where T : ID
{
    CosmosModelInfo container = await InitializeCollection();
    List idPks = new List();
    int pages = (int)Math.Ceiling((double)ids.Count / pageSize);
    for (int i = 0; i < pages; i++)
    {
        var batch = ids.Skip(i * pageSize).Take(pageSize).ToList();

        // Each delete produces its own result; the shared list is only touched after all of them
        // finish, so nothing is added to it concurrently.
        var deleted = await Task.WhenAll(batch.Select(async item =>
        {
            using (ResponseMessage response = await container.container.DeleteItemStreamAsync(item.Value, new PartitionKey(item.Key)))
            {
                return new IdPk { id = item.Value, pk = item.Key.ToString(), StatusCode = response.StatusCode };
            }
        }));
        idPks.AddRange(deleted);

        if (container.cache && RedisHelper.Instance != null)
        {
            // Await the cache removals so failures surface here instead of in unobserved async-void lambdas.
            await Task.WhenAll(batch.Select(x => RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, x.Value)));
        }
    }
    return idPks;
}
/// <summary>
/// Deletes every item that matches the given filter dictionary: the matches are loaded
/// with FindByDict and then passed to the entity-based DeleteAll.
/// </summary>
/// <param name="dict">Filter passed unchanged to FindByDict.</param>
public async Task> DeleteAll(Dictionary dict) where T : ID
{
    var matches = await FindByDict(dict);
    return await DeleteAll(matches);
}
/// <summary>
/// Deletes the given entities in batches of pageSize and reports the status code returned
/// for each one. The partition key value is read from each entity by reflection. When the
/// container is cached, the matching cache fields are removed after each batch.
/// </summary>
/// <param name="enyites">Entities to delete.</param>
/// <returns>One IdPk per entity, in input order, carrying the delete status code.</returns>
public async Task> DeleteAll(List enyites) where T : ID
{
    List idPks = new List();
    CosmosModelInfo container = await InitializeCollection();
    // Resolve the partition-key property once instead of once per entity.
    PropertyInfo pkProperty = typeof(T).GetProperty(GetPartitionKey());
    int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
    for (int i = 0; i < pages; i++)
    {
        var batch = enyites.Skip(i * pageSize).Take(pageSize).ToList();

        // Build all partition keys first, so a missing key value fails before any delete of this batch is sent.
        var targets = batch
            .Select(x => new { x.id, key = new PartitionKey(pkProperty.GetValue(x, null).ToString()) })
            .ToList();

        // Each delete produces its own result; the shared list is only touched after all of them
        // finish, so nothing is added to it concurrently.
        var deleted = await Task.WhenAll(targets.Select(async target =>
        {
            using (ResponseMessage response = await container.container.DeleteItemStreamAsync(target.id, target.key))
            {
                return new IdPk { id = target.id, pk = target.key.ToString(), StatusCode = response.StatusCode };
            }
        }));
        idPks.AddRange(deleted);

        if (container.cache && RedisHelper.Instance != null)
        {
            // Await the cache removals so failures surface here instead of in unobserved async-void lambdas.
            await Task.WhenAll(batch.Select(x => RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, x.id)));
        }
    }
    return idPks;
}
/// <summary>
/// Deletes the item identified by an IdPk by forwarding its id and pk to the
/// (id, pk) overload.
/// </summary>
/// <param name="idPk">Id and partition key value of the item to delete.</param>
public async Task DeleteAsync(IdPk idPk) where T : ID
{
    var itemId = idPk.id;
    var partitionKeyValue = idPk.pk;
    return await DeleteAsync(itemId, partitionKeyValue);
}
/// <summary>
/// Deletes one item by id and partition key value and, when the container is cached,
/// removes the matching cache field.
/// </summary>
/// <param name="id">Item id.</param>
/// <param name="pk">Partition key value.</param>
/// <returns>An IdPk carrying the id, the partition key value and the delete status code.</returns>
public async Task DeleteAsync(string id, string pk) where T : ID
{
    CosmosModelInfo container = await InitializeCollection();
    HttpStatusCode status;
    // The response is disposable; read the status and release it right away.
    using (ResponseMessage response = await container.container.DeleteItemStreamAsync(id: id, partitionKey: new PartitionKey(pk)))
    {
        status = response.StatusCode;
    }
    if (container.cache && RedisHelper.Instance != null)
    {
        await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, id);
    }
    return new IdPk { id = id, pk = pk, StatusCode = status };
}
/// <summary>
/// Deletes one entity, reading its partition key value by reflection, and, when the
/// container is cached, removes the matching cache field.
/// </summary>
/// <param name="entity">Entity to delete; its id and partition key property are read.</param>
/// <returns>An IdPk carrying the id, the partition key value and the delete status code.</returns>
public async Task DeleteAsync(T entity) where T : ID
{
    CosmosModelInfo container = await InitializeCollection();
    string partitionKey = GetPartitionKey();
    object pkValue = typeof(T).GetProperty(partitionKey).GetValue(entity, null);
    HttpStatusCode status;
    // The response is disposable; read the status and release it right away.
    using (ResponseMessage response = await container.container.DeleteItemStreamAsync(id: entity.id, partitionKey: new PartitionKey(pkValue.ToString())))
    {
        status = response.StatusCode;
    }
    if (container.cache && RedisHelper.Instance != null)
    {
        await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, entity.id);
    }
    return new IdPk { id = entity.id, pk = pkValue.ToString(), StatusCode = status };
}
//public async Task DeleteAsync(string id) where T : ID
//{
// Container container = await InitializeCollection();
// ItemResponse response = await container.DeleteItemAsync(id: id, partitionKey: new PartitionKey(GetPartitionKey()));
// return response.Resource;
//}
/// <summary>
/// Reads every item of the container for T, using the SELECT text that
/// SQLHelperParametric builds for the requested properties.
/// </summary>
/// <param name="propertys">Properties handed to SQLHelperParametric.GetSQLSelect; may be null.</param>
public async Task> FindAll(List propertys = null) where T : ID
{
    CosmosModelInfo container = await InitializeCollection();
    string selectText = SQLHelperParametric.GetSQLSelect(propertys).ToString();
    CosmosDbQuery cosmosDbQuery = new CosmosDbQuery { QueryText = selectText };
    var iterator = container.container.GetItemQueryIterator(queryDefinition: cosmosDbQuery.CosmosQueryDefinition);
    return await ResultsFromFeedIterator(iterator);
}
/// <summary>
/// Drains a feed iterator into a list, stopping early as soon as the list holds
/// exactly maxItemCount items (when a value is given).
/// </summary>
/// <param name="query">Iterator to read page by page.</param>
/// <param name="maxItemCount">Optional number of items at which reading stops.</param>
private async Task> ResultsFromFeedIterator(FeedIterator query, int? maxItemCount = null)
{
    List results = new List();
    while (query.HasMoreResults)
    {
        var page = await query.ReadNextAsync();
        foreach (T item in page)
        {
            results.Add(item);
            // Equality check: the cap only triggers when the count lands exactly on the limit.
            if (maxItemCount.HasValue && results.Count == maxItemCount.Value)
            {
                return results;
            }
        }
    }
    return results;
}
/// <summary>
/// Drains a feed iterator, handing the buffered items to batchAction whenever the buffer
/// has reached itemsPerPage and once more for any remainder. The buffer is cleared after
/// every hand-off, so the list that is returned is always empty.
/// </summary>
/// <param name="query">Iterator to read page by page.</param>
/// <param name="batchAction">Callback that receives each buffered batch.</param>
/// <param name="itemsPerPage">Buffer size at which a batch is handed off.</param>
private async Task> ResultsFromFeedIterator(FeedIterator query, Func, Task> batchAction, int itemsPerPage)
{
    List results = new List();
    while (query.HasMoreResults)
    {
        // Flush before reading the next page once the buffer is full enough.
        if (results.Count >= itemsPerPage)
        {
            await batchAction(results);
            results.Clear();
        }
        var page = await query.ReadNextAsync();
        results.AddRange(page);
    }

    // Flush whatever is left over.
    if (results.Count > 0)
    {
        await batchAction(results);
        results.Clear();
    }
    return results;
}
/// <summary>
/// Queries a container by name with a filter dictionary. The container must already be
/// registered in DocumentCollectionDict.
/// </summary>
/// <param name="CollectionName">Container id to query.</param>
/// <param name="dict">Filter handed to SQLHelperParametric.GetSQL.</param>
/// <param name="itemsPerPage">Requested page size; -1 leaves the choice to the defaults.</param>
/// <param name="maxItemCount">Optional number of items at which reading stops.</param>
/// <param name="partitionKey">Not used by this method.</param>
/// <param name="propertys">Properties handed to SQLHelperParametric.GetSQLSelect; may be null.</param>
/// <exception cref="BizException">The container name is not registered.</exception>
public async Task> FindByDict(string CollectionName, Dictionary dict, int itemsPerPage = -1, int? maxItemCount = null, string partitionKey = null, List propertys = null)
{
    if (!DocumentCollectionDict.TryGetValue(CollectionName, out CosmosModelInfo container))
    {
        throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
    }

    StringBuilder selectClause = SQLHelperParametric.GetSQLSelect(propertys);
    CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, selectClause);
    int effectivePageSize = GetEffectivePageSize(itemsPerPage, maxItemCount);
    QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: effectivePageSize);
    var iterator = container.container.GetItemQueryIterator(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
    return await ResultsFromFeedIterator(iterator, maxItemCount);
}
/// <summary>
/// Runs a "select value count(c)" query against a container by name, using the filter
/// dictionary with the paging and ordering keys left out. The container must already be
/// registered in DocumentCollectionDict.
/// </summary>
/// <param name="CollectionName">Container id to query.</param>
/// <param name="dict">Filter; the keys @CURRPAGE, @PAGESIZE, @ASC and @DESC are ignored. The dictionary itself is not modified.</param>
/// <param name="itemsPerPage">Requested page size; -1 leaves the choice to the defaults.</param>
/// <param name="maxItemCount">Optional number of items at which reading stops.</param>
/// <param name="partitionKey">Not used by this method.</param>
/// <exception cref="BizException">The container name is not registered.</exception>
public async Task> FindCountByDict(string CollectionName, Dictionary dict, int itemsPerPage = -1, int? maxItemCount = null, string partitionKey = null)
{
    if (!DocumentCollectionDict.TryGetValue(CollectionName, out CosmosModelInfo container))
    {
        throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
    }

    // Work on a filtered copy: removing these keys from the caller's dictionary would silently
    // drop paging and ordering from any later query that reuses it.
    string[] pagingKeys = { "@CURRPAGE", "@PAGESIZE", "@ASC", "@DESC" };
    var countFilter = dict
        .Where(entry => !pagingKeys.Contains(entry.Key))
        .ToDictionary(entry => entry.Key, entry => entry.Value);

    StringBuilder sql = new StringBuilder("select value count(c) from c");
    CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(countFilter, sql);
    QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(itemsPerPage, maxItemCount));
    var iterator = container.container.GetItemQueryIterator(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
    return await ResultsFromFeedIterator(iterator, maxItemCount);
}
/// <summary>
/// Alias for the typed FindByDict: every argument is forwarded unchanged.
/// </summary>
public async Task> FindByParams(Dictionary dict, int itemsPerPage = -1, int? maxItemCount = null, string partitionKey = null, List propertys = null) where T : ID
{
    var found = await FindByDict(dict, itemsPerPage, maxItemCount, partitionKey, propertys);
    return found;
}
/// <summary>
/// Queries the container for T with a filter dictionary.
/// </summary>
/// <param name="dict">Filter handed to SQLHelperParametric.GetSQL.</param>
/// <param name="itemsPerPage">Requested page size; -1 leaves the choice to the defaults.</param>
/// <param name="maxItemCount">Optional number of items at which reading stops.</param>
/// <param name="partitionKey">Not used by this method.</param>
/// <param name="propertys">Properties handed to SQLHelperParametric.GetSQLSelect; may be null.</param>
public async Task> FindByDict(Dictionary dict, int itemsPerPage = -1, int? maxItemCount = null, string partitionKey = null,List propertys = null) where T : ID
{
    StringBuilder sql = SQLHelperParametric.GetSQLSelect(propertys);
    CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, sql);
    QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(itemsPerPage, maxItemCount));
    // Forward maxItemCount so it limits the result here as it does in the
    // CollectionName-based overload; it was previously only used to size pages.
    return await ResultsFromQueryAndOptions(cosmosDbQuery, queryRequestOptions, maxItemCount);
}
/// <summary>
/// Runs a prepared query against the container for T and collects the results,
/// optionally stopping at maxItemCount.
/// </summary>
/// <param name="cosmosDbQuery">Query whose CosmosQueryDefinition is executed.</param>
/// <param name="queryOptions">Request options for the query.</param>
/// <param name="maxItemCount">Optional number of items at which reading stops.</param>
private async Task> ResultsFromQueryAndOptions(CosmosDbQuery cosmosDbQuery, QueryRequestOptions queryOptions, int? maxItemCount = null)
{
    CosmosModelInfo container = await InitializeCollection();
    var iterator = container.container.GetItemQueryIterator(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryOptions);
    return await ResultsFromFeedIterator(iterator, maxItemCount);
}
/// <summary>
/// Combines the requested page size with the optional item cap. With itemsPerPage == -1
/// the cap is returned when present, otherwise -1; in every other case the smaller of the
/// cap (or the page size when there is no cap) and the page size is returned.
/// </summary>
/// <param name="itemsPerPage">Requested page size, or -1 for "not specified".</param>
/// <param name="maxItemCount">Optional item cap.</param>
private int GetEffectivePageSize(int itemsPerPage, int? maxItemCount)
{
    int cap = maxItemCount ?? itemsPerPage;
    if (itemsPerPage == -1)
    {
        return cap;
    }
    return Math.Min(cap, itemsPerPage);
}
/// <summary>
/// Builds request options with this repository's defaults: a page size of 1000 when
/// itemsPerPage is -1, a buffered item count of 100 and a concurrency of 50 unless
/// other values are given.
/// </summary>
/// <param name="itemsPerPage">Page size; -1 is replaced by 1000, null is passed through.</param>
/// <param name="maxBufferedItemCount">Buffered item count; defaults to 100.</param>
/// <param name="maxConcurrency">Concurrency; defaults to 50.</param>
private QueryRequestOptions GetDefaultQueryRequestOptions(int? itemsPerPage = null,
    int? maxBufferedItemCount = null,
    int? maxConcurrency = null)
{
    int? pageSize = itemsPerPage;
    if (itemsPerPage == -1)
    {
        pageSize = 1000;
    }

    QueryRequestOptions options = new QueryRequestOptions();
    options.MaxItemCount = pageSize;
    options.MaxBufferedItemCount = maxBufferedItemCount ?? 100;
    options.MaxConcurrency = maxConcurrency ?? 50;
    return options;
}
/// <summary>
/// Runs a prepared query against the container for T and feeds the results to
/// batchAction in batches sized by queryOptions.MaxItemCount (0 when it is not set).
/// </summary>
/// <param name="cosmosDbQuery">Query whose CosmosQueryDefinition is executed.</param>
/// <param name="batchAction">Callback that receives each batch.</param>
/// <param name="queryOptions">Request options for the query.</param>
private async Task> ResultsFromQueryAndOptions(CosmosDbQuery cosmosDbQuery, Func, Task> batchAction, QueryRequestOptions queryOptions)
{
    CosmosModelInfo container = await InitializeCollection();
    var iterator = container.container.GetItemQueryIterator(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryOptions);
    int batchSize = queryOptions.MaxItemCount ?? 0;
    return await ResultsFromFeedIterator(iterator, batchAction, batchSize);
}
/// <summary>
/// Builds request options that set only the page size; the value is passed through as given.
/// </summary>
/// <param name="itemsPerPage">Value assigned to MaxItemCount.</param>
private QueryRequestOptions GetQueryRequestOptions(int itemsPerPage)
{
    return new QueryRequestOptions { MaxItemCount = itemsPerPage };
}
/// <summary>
/// Queries the container for T through LINQ, with an optional filter and an optional
/// ascending or descending ordering.
/// </summary>
/// <param name="query">Filter expression; when null, no filter is applied.</param>
/// <param name="order">Ordering expression; when null, no ordering is applied.</param>
/// <param name="isDesc">True for descending order; only used when order is given.</param>
/// <param name="itemsPerPage">Requested page size; -1 leaves the choice to the defaults.</param>
/// <param name="maxItemCount">Optional number of items at which reading stops.</param>
public async Task> FindLinq(Expression> query = null, Expression> order = null, bool isDesc = false, int itemsPerPage = -1, int? maxItemCount = null) where T : ID
{
    QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(itemsPerPage, maxItemCount));
    FeedIterator feedIterator;
    CosmosModelInfo container = await InitializeCollection();
    if (query == null)
    {
        if (order == null)
        {
            feedIterator = container.container
                .GetItemLinqQueryable(requestOptions: queryRequestOptions)
                .ToFeedIterator();
        }
        else if (isDesc)
        {
            feedIterator = container.container
                .GetItemLinqQueryable(requestOptions: queryRequestOptions).OrderByDescending(order)
                .ToFeedIterator();
        }
        else
        {
            feedIterator = container.container
                .GetItemLinqQueryable(requestOptions: queryRequestOptions).OrderBy(order)
                .ToFeedIterator();
        }
    }
    else
    {
        if (order == null)
        {
            feedIterator = container.container
                .GetItemLinqQueryable(requestOptions: queryRequestOptions)
                .Where(query)
                .ToFeedIterator();
        }
        else if (isDesc)
        {
            feedIterator = container.container
                .GetItemLinqQueryable(requestOptions: queryRequestOptions)
                .Where(query).OrderByDescending(order)
                .ToFeedIterator();
        }
        else
        {
            feedIterator = container.container
                .GetItemLinqQueryable(requestOptions: queryRequestOptions)
                .Where(query).OrderBy(order)
                .ToFeedIterator();
        }
    }
    // Forward maxItemCount so it limits the result; it was previously only used to size pages.
    return await ResultsFromFeedIterator(feedIterator, maxItemCount);
}
/// <summary>
/// Runs a SQL query text against the container for T, with optional named parameters.
/// </summary>
/// <param name="sql">Query text.</param>
/// <param name="Parameters">Query parameters; when null, the text is executed as-is.</param>
/// <param name="itemsPerPage">Requested page size; passed through to the request options.</param>
/// <param name="maxItemCount">Optional number of items at which reading stops.</param>
public async Task> FindSQL(string sql, Dictionary Parameters = null, int itemsPerPage = -1, int? maxItemCount = null) where T : ID
{
    CosmosModelInfo container = await InitializeCollection();
    QueryRequestOptions queryOptions = GetQueryRequestOptions(GetEffectivePageSize(itemsPerPage, maxItemCount));
    if (Parameters != null)
    {
        CosmosDbQuery cosmosDbQuery = new CosmosDbQuery
        {
            QueryText = sql,
            Parameters = Parameters
        };
        var parameterized = container.container
            .GetItemQueryIterator(cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryOptions);
        // Forward maxItemCount so it limits the result; it was previously only used to size pages.
        return await ResultsFromFeedIterator(parameterized, maxItemCount);
    }

    // The request options used to be dropped on this path, so the page size was ignored
    // whenever no parameters were supplied.
    QueryDefinition queryDefinition = new QueryDefinition(sql);
    var plain = container.container.GetItemQueryIterator(queryDefinition, requestOptions: queryOptions);
    return await ResultsFromFeedIterator(plain, maxItemCount);
}
/// <summary>
/// Creates a new item in the container for T and, when the container is cached,
/// writes the entity to the cache hash as well.
/// </summary>
/// <param name="entity">Entity to create.</param>
/// <returns>The resource returned by the create call.</returns>
/// <exception cref="BizException">Wraps any exception raised, keeping only its message.</exception>
public async Task Save(T entity) where T : ID
{
    try
    {
        CosmosModelInfo container = await InitializeCollection();
        var response = await container.container.CreateItemAsync(entity);
        if (container.cache && RedisHelper.Instance != null)
        {
            string cacheKey = CacheCosmosPrefix + container.container.Id;
            // Check before writing: the expiry is only refreshed when the key already existed.
            bool keyExisted = RedisHelper.Exists(cacheKey);
            await RedisHelper.HSetAsync(cacheKey, entity.id, entity);
            if (keyExisted)
            {
                await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
            }
        }
        return response.Resource;
    }
    catch (Exception error)
    {
        throw new BizException(error.Message);
    }
}
/// <summary>
/// Creates the given entities in batches of pageSize, serializing each one to JSON
/// (null values ignored) and sending it through the stream API. When the container is
/// cached, each batch is also written to the cache hash, and the expiry is set at the end
/// if the cache key did not exist when the call started.
/// </summary>
/// <param name="enyites">Entities to create.</param>
/// <returns>The same list that was passed in.</returns>
public async Task> SaveAll(List enyites) where T : ID
{
    int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
    CosmosModelInfo container = await InitializeCollection();
    string cacheKey = CacheCosmosPrefix + container.container.Id;
    // Only talk to the cache when caching is enabled for this container.
    bool useCache = container.cache && RedisHelper.Instance != null;
    bool keyExisted = useCache && RedisHelper.Exists(cacheKey);
    // Resolve the partition-key property and the serializer options once per call, not per entity.
    PropertyInfo pkProperty = typeof(T).GetProperty(GetPartitionKey());
    JsonSerializerOptions serializerOptions = new JsonSerializerOptions { IgnoreNullValues = true };
    for (int i = 0; i < pages; i++)
    {
        var batch = enyites.Skip(i * pageSize).Take(pageSize).ToList();

        // One awaited task per entity; nothing runs as fire-and-forget, so failures surface here.
        await Task.WhenAll(batch.Select(async entity =>
        {
            MemoryStream stream = new MemoryStream();
            await JsonSerializer.SerializeAsync(stream, entity, serializerOptions);
            PartitionKey partitionKey = new PartitionKey(pkProperty.GetValue(entity, null).ToString());
            using (ResponseMessage response = await container.container.CreateItemStreamAsync(stream, partitionKey))
            {
                // NOTE(review): a non-success status is deliberately not acted on, as before;
                // decide whether failed creates should be reported or kept out of the cache.
            }
        }));

        if (useCache)
        {
            await Task.WhenAll(batch.Select(entity => RedisHelper.HSetAsync(cacheKey, entity.id, entity)));
        }
    }
    if (useCache && !keyExisted)
    {
        await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
    }
    return enyites;
}
/// <summary>
/// Upserts an entity into the container for T and, when the container is cached,
/// writes it to the cache hash as well.
/// </summary>
/// <param name="entity">Entity to upsert.</param>
/// <returns>The resource returned by the upsert call.</returns>
public async Task SaveOrUpdate(T entity) where T : ID
{
    CosmosModelInfo container = await InitializeCollection();
    var response = await container.container.UpsertItemAsync(item: entity);
    if (container.cache && RedisHelper.Instance != null)
    {
        string cacheKey = CacheCosmosPrefix + container.container.Id;
        // Check before writing: the expiry is only refreshed when the key already existed.
        bool keyExisted = RedisHelper.Exists(cacheKey);
        await RedisHelper.HSetAsync(cacheKey, entity.id, entity);
        if (keyExisted)
        {
            await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
        }
    }
    return response.Resource;
}
/// <summary>
/// Upserts the given entities in batches of pageSize, serializing each one to JSON
/// (null values ignored) and sending it through the stream API. When the container is
/// cached, each batch is also written to the cache hash, and the expiry is set at the end
/// if the cache key did not exist when the call started.
/// </summary>
/// <param name="enyites">Entities to upsert.</param>
/// <returns>The same list that was passed in.</returns>
public async Task> SaveOrUpdateAll(List enyites) where T : ID
{
    int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
    CosmosModelInfo container = await InitializeCollection();
    string cacheKey = CacheCosmosPrefix + container.container.Id;
    // Only talk to the cache when caching is enabled for this container.
    bool useCache = container.cache && RedisHelper.Instance != null;
    bool keyExisted = useCache && RedisHelper.Exists(cacheKey);
    // Resolve the partition-key property and the serializer options once per call, not per entity.
    PropertyInfo pkProperty = typeof(T).GetProperty(GetPartitionKey());
    JsonSerializerOptions serializerOptions = new JsonSerializerOptions { IgnoreNullValues = true };
    for (int i = 0; i < pages; i++)
    {
        var batch = enyites.Skip(i * pageSize).Take(pageSize).ToList();

        // One awaited task per entity; nothing runs as fire-and-forget, so failures surface here.
        await Task.WhenAll(batch.Select(async entity =>
        {
            MemoryStream stream = new MemoryStream();
            await JsonSerializer.SerializeAsync(stream, entity, serializerOptions);
            PartitionKey partitionKey = new PartitionKey(pkProperty.GetValue(entity, null).ToString());
            // The response is disposable and was never released before; its status is still not inspected.
            using (ResponseMessage response = await container.container.UpsertItemStreamAsync(stream, partitionKey))
            {
            }
        }));

        if (useCache)
        {
            await Task.WhenAll(batch.Select(entity => RedisHelper.HSetAsync(cacheKey, entity.id, entity)));
        }
    }
    if (useCache && !keyExisted)
    {
        await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
    }
    return enyites;
}
/// <summary>
/// Replaces an existing item in the container for T, reading the partition key value
/// from the entity by reflection, and, when the container is cached, writes the entity
/// to the cache hash as well.
/// </summary>
/// <param name="entity">Entity holding the new content; its id selects the item to replace.</param>
/// <returns>The resource returned by the replace call.</returns>
public async Task Update(T entity) where T : ID
{
    CosmosModelInfo container = await InitializeCollection();
    string pkName = GetPartitionKey();
    object pkValue = typeof(T).GetProperty(pkName).GetValue(entity, null);
    var response = await container.container.ReplaceItemAsync(entity, entity.id, new PartitionKey(pkValue.ToString()));
    if (container.cache && RedisHelper.Instance != null)
    {
        string cacheKey = CacheCosmosPrefix + container.container.Id;
        // Check before writing: the expiry is only refreshed when the key already existed.
        bool keyExisted = RedisHelper.Exists(cacheKey);
        await RedisHelper.HSetAsync(cacheKey, entity.id, entity);
        if (keyExisted)
        {
            await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
        }
    }
    return response.Resource;
}
/// <summary>
/// Simple holder for an item id, a partition key value and a memory stream.
/// </summary>
internal class Item
{
    /// <summary>Item id.</summary>
    public string id { get; set; }

    /// <summary>Partition key value as a string.</summary>
    public string pk { get; set; }

    /// <summary>Stream associated with the item.</summary>
    public MemoryStream stream { get; set; }
}
/// <summary>
/// Replaces the given entities in batches of pageSize, serializing each one to JSON
/// (null values ignored) and sending it through the stream API. When the container is
/// cached, each batch is also written to the cache hash, and the expiry is set at the end
/// if the cache key did not exist when the call started.
/// </summary>
/// <param name="enyites">Entities holding the new content; each id selects the item to replace.</param>
/// <returns>The same list that was passed in.</returns>
public async Task> UpdateAll(List enyites) where T : ID
{
    int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
    CosmosModelInfo container = await InitializeCollection();
    string cacheKey = CacheCosmosPrefix + container.container.Id;
    // Only talk to the cache when caching is enabled for this container.
    bool useCache = container.cache && RedisHelper.Instance != null;
    bool keyExisted = useCache && RedisHelper.Exists(cacheKey);
    // Resolve the partition-key property and the serializer options once per call, not per entity.
    PropertyInfo pkProperty = typeof(T).GetProperty(GetPartitionKey());
    JsonSerializerOptions serializerOptions = new JsonSerializerOptions { IgnoreNullValues = true };
    for (int i = 0; i < pages; i++)
    {
        var batch = enyites.Skip(i * pageSize).Take(pageSize).ToList();

        // One awaited task per entity; nothing runs as fire-and-forget, so failures surface here.
        await Task.WhenAll(batch.Select(async entity =>
        {
            MemoryStream stream = new MemoryStream();
            await JsonSerializer.SerializeAsync(stream, entity, serializerOptions);
            string pkValue = pkProperty.GetValue(entity, null).ToString();
            // The response is disposable and was never released before; its status is still not inspected.
            using (ResponseMessage response = await container.container.ReplaceItemStreamAsync(stream, entity.id, new PartitionKey(pkValue)))
            {
            }
        }));

        if (useCache)
        {
            await Task.WhenAll(batch.Select(entity => RedisHelper.HSetAsync(cacheKey, entity.id, entity)));
        }
    }
    if (useCache && !keyExisted)
    {
        await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
    }
    return enyites;
}
//public void Dispose()
//{
// Dispose(true);
//}
//protected virtual void Dispose(bool disposing)
//{
// if (disposing)
// {
// CosmosClient?.Dispose();
// }
//}
/// <summary>
/// Looks up an item of T by id with a parameterized SQL query and no partition key.
/// Returns the default value when nothing matches; SingleOrDefault throws when more
/// than one item comes back.
/// </summary>
/// <param name="id">Item id bound to the @id parameter.</param>
private async Task FindByIdAsSql(string id) where T : ID
{
CosmosModelInfo container = await InitializeCollection();
CosmosDbQuery cosmosDbQuery = new CosmosDbQuery
{
QueryText = @"SELECT *
FROM c
WHERE c.id = @id",
Parameters = new Dictionary
{
{ "@id",id}
}
};
FeedIterator feedIterator = container.container
.GetItemQueryIterator(cosmosDbQuery.CosmosQueryDefinition);
return (await ResultsFromFeedIterator(feedIterator)).SingleOrDefault();
}
/// <summary>
/// Reads a single item of T directly by id and partition key value. The cache is not consulted.
/// </summary>
/// <param name="id">Item id.</param>
/// <param name="pk">Partition key value.</param>
/// <returns>The resource returned by the read call.</returns>
public async Task FindByIdPk(string id, string pk) where T : ID
{
    CosmosModelInfo container = await InitializeCollection();
    PartitionKey partitionKey = new PartitionKey(pk);
    var response = await container.container.ReadItemAsync(id: id, partitionKey: partitionKey);
    return response.Resource;
}
/// <summary>
/// Looks up an item of T by id. When the container is cached, the lookup goes through
/// the cache shell (hash field = id), which calls the SQL lookup to load the value;
/// otherwise the SQL lookup is called directly.
/// </summary>
/// <param name="id">Item id.</param>
public async Task FindById(string id) where T : ID
{
    CosmosModelInfo container = await InitializeCollection();
    bool useCache = container.cache && RedisHelper.Instance != null;
    if (!useCache)
    {
        return await FindByIdAsSql(id);
    }

    string cacheKey = CacheCosmosPrefix + container.container.Id;
    return await RedisHelper.CacheShellAsync(cacheKey, id, timeoutSeconds, () => FindByIdAsSql(id));
}
/// <summary>
/// Looks up several items of T by id. When the container is cached, each id is first
/// checked in the cache hash; the ids that are missing are then queried in one FindByDict
/// call, written back to the cache and appended to the result. Without caching, all ids
/// are queried in one FindByDict call.
/// </summary>
/// <param name="ids">Item ids to look up.</param>
public async Task
> FindByIds(List ids) where T : ID
{
CosmosModelInfo container = await InitializeCollection();
if (container.cache && RedisHelper.Instance != null)
{
List list = new List();
List NotIn = new List();
// NOTE(review): two cache calls per id (exists, then get); the field could disappear
// between the two calls — confirm whether a null entry in the result is acceptable.
foreach (string id in ids) {
if (!await RedisHelper.HExistsAsync(CacheCosmosPrefix + container.container.Id, id))
{
NotIn.Add(id);
}
else {
list.Add(await RedisHelper.HGetAsync(CacheCosmosPrefix + container.container.Id, id));
}
}
if (NotIn.IsNotEmpty()) {
List noInList = await FindByDict(new Dictionary { { "id", NotIn.ToArray() } });
// NOTE(review): the write-back uses the synchronous HSet/Expire calls inside an async
// method, and the expiry is reset once per loaded item.
noInList.ForEach(x => { RedisHelper.HSet(CacheCosmosPrefix + container.container.Id, x.id, x); RedisHelper.Expire(CacheCosmosPrefix + container.container.Id, timeoutSeconds); });
list.AddRange(noInList);
}
return list;
}
else
{
return await FindByDict(new Dictionary { { "id", ids.ToArray() } });
}
}
/// <summary>
/// Looks up an item by id in a container given by name. The container must already be
/// registered in DocumentCollectionDict. When the container is cached, the FindByDict
/// result is loaded through the cache shell under hash field = id.
/// NOTE(review): the typed FindById stores the value returned by FindByIdAsSql under the
/// same key and field, while this method stores the FindByDict result — confirm the two
/// cached shapes are compatible.
/// </summary>
/// <param name="CollectionName">Container id to query.</param>
/// <param name="id">Item id.</param>
/// <exception cref="BizException">The container name is not registered.</exception>
public async Task FindById(string CollectionName, string id)
{
if (DocumentCollectionDict.TryGetValue(CollectionName, out CosmosModelInfo container))
{
if (container.cache && RedisHelper.Instance != null)
{
return await RedisHelper.CacheShellAsync(CacheCosmosPrefix + container.container.Id,id, timeoutSeconds, () => { return FindByDict(CollectionName, new Dictionary { { "id", id } }); });
}
else
{
return await FindByDict(CollectionName, new Dictionary { { "id", id } });
}
}
else
{
throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
}
}
/// <summary>
/// Looks up several items by id in a container given by name. The container must already
/// be registered in DocumentCollectionDict. The result always comes from the database.
/// </summary>
/// <param name="CollectionName">Container id to query.</param>
/// <param name="ids">Item ids to look up.</param>
/// <exception cref="BizException">The container name is not registered.</exception>
public async Task> FindByIds(string CollectionName, List ids)
{
    if (!DocumentCollectionDict.ContainsKey(CollectionName))
    {
        throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
    }

    // The cached path used a cache key built from the container id alone, so one cached list
    // was returned for every set of ids. That same key is also used as the per-item hash
    // elsewhere in this class. Until a per-id cache lookup exists here, query directly.
    return await FindByDict(CollectionName, new Dictionary { { "id", ids.ToArray() } });
}
}
}