using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Cosmos.Linq;
using OpenXmlPowerTools;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Linq.Expressions;
using System.Net;
using System.Reflection;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using TEAMModelOS.SDK.Context.Attributes.Azure;
using TEAMModelOS.SDK.Context.Exception;
using TEAMModelOS.SDK.Helper.Common.CollectionHelper;
using TEAMModelOS.SDK.Helper.Common.ReflectorExtensions;
using TEAMModelOS.SDK.Module.AzureCosmosDB.Configuration;
namespace TEAMModelOS.SDK.Module.AzureCosmosDBV3
{
public class AzureCosmosDBV3Repository : IAzureCosmosDBV3Repository
{
private CosmosClient CosmosClient { get; set; }
public CosmosDict CosmosDict { get; set; } = new CosmosDict();
///
/// 数据库名
///
private string DatabaseId { get; set; }
///
/// RU
///
private int CollectionThroughput { get; set; }
///
/// 数据库对象
///
private Database database { get; set; }
///
/// 分页大小
///
int pageSize = 200;
///
/// 缓存前缀
///
private const string CacheCosmosPrefix = "cosmos:";
///
/// 扫描类
///
private string[] ScanModel { get; set; }
///
/// 超时时间
///
private const int timeoutSeconds = 86400;
///
/// ttl时长 1秒
///
private const int ttl = 1;
///
/// 更新源通知容器
///
private string leaseId = "AleaseContainer";
public AzureCosmosDBV3Repository(AzureCosmosDBOptions options, CosmosSerializer cosmosSerializer)
{
try
{
if (!string.IsNullOrEmpty(options.ConnectionString))
{
CosmosClient = CosmosDBV3ClientSingleton.getInstance(options.ConnectionString, options.ConnectionKey, cosmosSerializer).GetCosmosDBClient();
}
else
{
throw new BizException("请设置正确的AzureCosmosDB数据库配置信息!");
}
DatabaseId = options.Database;
CollectionThroughput = options.CollectionThroughput;
ScanModel = options.ScanModel;
}
catch (CosmosException e)
{
throw new BizException(e.Message, 500, e.StackTrace);
}
}
public AzureCosmosDBV3Repository(AzureCosmosDBOptions options)
{
try
{
if (!string.IsNullOrEmpty(options.ConnectionString))
{
CosmosClient = CosmosDBV3ClientSingleton.getInstance(options.ConnectionString, options.ConnectionKey, null).GetCosmosDBClient();
}
else
{
throw new BizException("请设置正确的AzureCosmosDB数据库配置信息!");
}
DatabaseId = options.Database;
CollectionThroughput = options.CollectionThroughput;
ScanModel = options.ScanModel;
}
catch (CosmosException e)
{
throw new BizException(e.Message, 500, e.StackTrace);
}
}
///
/// 初始化CosmosDB数据库
///
///
public async Task InitializeDatabase() {
database = await CosmosClient.CreateDatabaseIfNotExistsAsync(DatabaseId, CollectionThroughput);
FeedIterator resultSetIterator = database.GetContainerQueryIterator();
while (resultSetIterator.HasMoreResults)
{
foreach (ContainerProperties container in await resultSetIterator.ReadNextAsync())
{
CosmosDict.nameCosmos.TryAdd(container.Id, new CosmosModelInfo { container = database.GetContainer(container.Id), cache = false, monitor = false });
}
}
bool isMonitor = false;
//获取数据库所有的表
List types = ReflectorExtensions.GetAllTypeAsAttribute(ScanModel);
foreach (Type type in types)
{
string PartitionKey = GetPartitionKey(type);
string CollectionName = "";
int RU = 0;
bool cache = false;
bool monitor = false;
IEnumerable attributes = type.GetCustomAttributes(true);
if (attributes != null && !string.IsNullOrEmpty(attributes.First().Name))
{
CollectionName = attributes.First().Name;
}
else {
throw new BizException("必须指定容器名",ResponseCode.PARAMS_ERROR);
}
if (attributes.First().Cache)
{
cache = attributes.First().Cache;
}
if (attributes.First().Monitor)
{
monitor = attributes.First().Monitor;
if (monitor)
{
isMonitor = true;
}
}
if (attributes.First().RU > 400)
{
RU = attributes.First().RU;
}
else
{
RU = CollectionThroughput;
}
//如果表存在于数据则检查RU是否变动,如果不存在则执行创建DocumentCollection
if (CosmosDict.nameCosmos.TryGetValue(CollectionName, out CosmosModelInfo cosmosModelInfo))
{ //更新RU
cosmosModelInfo.cache = cache;
Container container = CosmosClient.GetDatabase(DatabaseId).GetContainer(cosmosModelInfo.container.Id);
int? throughputResponse = await container.ReadThroughputAsync();
if (throughputResponse < RU)
{
await CosmosClient.GetDatabase(DatabaseId).GetContainer(cosmosModelInfo.container.Id).ReplaceThroughputAsync(RU);
}
CosmosModelInfo cosmos = new CosmosModelInfo { container = container, cache = cache, monitor = monitor, type = type };
CosmosDict.nameCosmos[CollectionName] = cosmos;
CosmosDict.typeCosmos.Add(type.Name, cosmos);
}
else {
ContainerProperties containerProperties = new ContainerProperties { Id = CollectionName, DefaultTimeToLive = -1 };
if (!string.IsNullOrEmpty(PartitionKey))
{
containerProperties.PartitionKeyPath = "/" + PartitionKey;
}
if (RU > CollectionThroughput)
{
CollectionThroughput = RU;
}
Container containerWithConsistentIndexing = await database.CreateContainerIfNotExistsAsync(containerProperties, throughput: CollectionThroughput);
CosmosModelInfo cosmos = new CosmosModelInfo { container = containerWithConsistentIndexing, cache = cache, monitor = monitor, type = type };
CosmosDict.nameCosmos[CollectionName] = cosmos;
CosmosDict.typeCosmos.Add(type.Name, cosmos);
}
}
if (isMonitor)
{
ContainerProperties leaseProperties = new ContainerProperties { Id = leaseId, PartitionKeyPath = "/id", DefaultTimeToLive = -1 };
Container leaseContainer = await database.CreateContainerIfNotExistsAsync(leaseProperties, throughput: CollectionThroughput);
CosmosDict.nameCosmos.TryAdd(leaseId, new CosmosModelInfo { container = leaseContainer, cache = false, monitor = false });
}
return CosmosDict;
}
private string GetPartitionKey()
{
Type type = typeof(T);
return GetPartitionKey(type);
}
private string GetPartitionKey(Type type)
{
PropertyInfo[] properties = type.GetProperties();
List attrProperties = new List();
foreach (PropertyInfo property in properties)
{
if (property.Name.Equals("PartitionKey"))
{
attrProperties.Add(property);
break;
}
object[] attributes = property.GetCustomAttributes(true);
foreach (object attribute in attributes) //2.通过映射,找到成员属性上关联的特性类实例,
{
if (attribute is PartitionKeyAttribute)
{
attrProperties.Add(property);
}
}
}
if (attrProperties.Count <= 0)
{
throw new BizException(type.Name + " has no PartitionKey !");
}
else
{
if (attrProperties.Count == 1)
{
return attrProperties[0].Name;
}
else { throw new BizException(type.Name+" PartitionKey can only be single!"); }
}
}
private async Task InitializeCollection()
{
Type type = typeof(T);
string partitionKey = GetPartitionKey();
CosmosDBAttribute cosmosDBAttribute = null;
IEnumerable attributes = type.GetCustomAttributes(true);
if (attributes != null && !string.IsNullOrEmpty(attributes.First().Name))
{
cosmosDBAttribute = attributes.First();
}
else {
throw new BizException(type.Name+"未指定CosmosDB表名",ResponseCode.PARAMS_ERROR);
}
return await InitializeCollection(cosmosDBAttribute, type.Name, partitionKey);
}
private async Task InitializeCollection(CosmosDBAttribute cosmosDBAttribute, string typeName, string PartitionKey)
{
/////内存中已经存在这个表则直接返回
if (CosmosDict.typeCosmos.TryGetValue(typeName, out CosmosModelInfo cosmosModelInfo))
{
return cosmosModelInfo;
}///如果没有则尝试默认创建
else
{
ContainerProperties containerProperties = new ContainerProperties { Id = cosmosDBAttribute.Name };
if (!string.IsNullOrEmpty(PartitionKey))
{
containerProperties.PartitionKeyPath = "/"+ PartitionKey;
}
Container containerWithConsistentIndexing = await database.CreateContainerIfNotExistsAsync(containerProperties, throughput: cosmosDBAttribute.RU);
CosmosModelInfo cosmosModel = new CosmosModelInfo { container = containerWithConsistentIndexing, cache = cosmosDBAttribute.Cache,monitor= cosmosDBAttribute.Monitor };
CosmosDict. nameCosmos.TryAdd(cosmosDBAttribute.Name, cosmosModel);
CosmosDict.typeCosmos.TryAdd(typeName, cosmosModel);
return cosmosModel;
}
}
///
/// 按TTL删除
///
///
///
///
private async Task> DeleteTTL(List list) where T : ID
{
CosmosModelInfo container = await InitializeCollection();
list.ForEach(x => { x.ttl = ttl; });
list = await DeleteTTlALL(list);
if (container.cache && RedisHelper.Instance != null)
{
list.ForEach(async x => {
await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, x.id);
});
}
return list;
}
private async Task> DeleteTTlALL(List enyites) where T : ID
{
//await Task.Run(() => Parallel.ForEach(entities, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (item) =>
//{
// Task.WaitAll(Update(item));
//}));
int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
CosmosModelInfo container = await InitializeCollection();
bool flag = false;
if (RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
{
flag = true;
}
string partitionKey = GetPartitionKey();
Type type = typeof(T);
Stopwatch stopwatch = Stopwatch.StartNew();
for (int i = 0; i < pages; i++)
{
List lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
List> itemsToInsert = new List>();
lists.ForEach(async x =>
{
x.pk = type.Name;
//x.ttl = null;
MemoryStream stream = new MemoryStream();
await JsonSerializer.SerializeAsync(stream, x, new JsonSerializerOptions { IgnoreNullValues = true });
object o = type.GetProperty(partitionKey).GetValue(x, null);
KeyValuePair keyValue = new KeyValuePair(new PartitionKey(o.ToString()), stream);
itemsToInsert.Add(keyValue);
});
List tasks = new List(lists.Count);
itemsToInsert.ForEach(item =>
{
tasks.Add(container.container.UpsertItemStreamAsync(item.Value, item.Key)
.ContinueWith((Task task) =>
{
//using (ResponseMessage response = task.Result)
//{
// if (!response.IsSuccessStatusCode)
// {
// }
//}
}
));
});
await Task.WhenAll(tasks);
if (container.cache && RedisHelper.Instance != null)
{
lists.ForEach(async x => {
await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, x.id, x);
});
}
}
if (container.cache && RedisHelper.Instance != null && !flag)
{
await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
}
stopwatch.Stop();
return enyites;
}
public async Task> DeleteAll(List ids) where T : ID
{
// string pk = GetPartitionKey();
CosmosModelInfo container = await InitializeCollection();
List idPks = new List();
if (container.monitor)
{
List list = await FindByDict(new Dictionary() { { "id", ids.Select(x => x.id).ToArray() } });
list = await DeleteTTL(list);
return ids;
}
else
{
int pages = (int)Math.Ceiling((double)ids.Count / pageSize);
Stopwatch stopwatch = Stopwatch.StartNew();
for (int i = 0; i < pages; i++)
{
List lists = ids.Skip((i) * pageSize).Take(pageSize).ToList();
List tasks = new List(lists.Count);
lists.ForEach(item =>
{
tasks.Add(container.container.DeleteItemStreamAsync(item.id, new PartitionKey(item.pk))
.ContinueWith((Task task) =>
{
using (ResponseMessage response = task.Result)
{
idPks.Add(new IdPk { id = item.id, pk = item.pk, StatusCode = response.StatusCode });
// if (!response.IsSuccessStatusCode)
// {
// }
}
}
));
});
await Task.WhenAll(tasks);
if (container.cache && RedisHelper.Instance != null)
{
lists.ForEach(async x => {
await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, x.id);
});
}
}
stopwatch.Stop();
return idPks;
}
}
public async Task> DeleteAll(Dictionary dict) where T : ID
{
if (dict.Keys.Count > 0)
{
List list = await FindByDict(dict);
return await DeleteAll(list);
}
else
{
throw new BizException("参数为空", 500);
}
}
public async Task> DeleteAll(List enyites) where T : ID
{
Type type = typeof(T);
string pk = GetPartitionKey();
CosmosModelInfo container = await InitializeCollection();
List idPks = new List();
if (container.monitor)
{
enyites = await DeleteTTL(enyites);
foreach (T t in enyites)
{
object o = type.GetProperty(pk).GetValue(t, null);
idPks.Add(new IdPk { id = t.id, pk = o.ToString(), StatusCode = HttpStatusCode.NoContent });
}
return idPks;
}
else
{
int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
Stopwatch stopwatch = Stopwatch.StartNew();
for (int i = 0; i < pages; i++)
{
List lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
List> itemsToInsert = new List>();
lists.ForEach(x =>
{
object o = type.GetProperty(pk).GetValue(x, null);
KeyValuePair keyValue = new KeyValuePair(new PartitionKey(o.ToString()), x.id);
itemsToInsert.Add(keyValue);
});
List tasks = new List(lists.Count);
itemsToInsert.ForEach(item =>
{
tasks.Add(container.container.DeleteItemStreamAsync(item.Value, item.Key)
.ContinueWith((Task task) =>
{
using (ResponseMessage response = task.Result)
{
idPks.Add(new IdPk { id = item.Value, pk = item.Key.ToString(), StatusCode = response.StatusCode });
}
}
));
});
await Task.WhenAll(tasks); if (container.cache && RedisHelper.Instance != null)
{
lists.ForEach(async x => {
await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, x.id);
});
}
}
stopwatch.Stop();
return idPks;
}
}
public async Task DeleteAsync(IdPk idPk) where T : ID
{
return await DeleteAsync(idPk.id, idPk.pk);
}
public async Task DeleteAsync(string id, string pk) where T : ID
{
// pk = GetPartitionKey();
CosmosModelInfo container = await InitializeCollection();
if (container.monitor)
{
List list = await FindByDict(new Dictionary() { { "id", id } });
if (list.Count > 0)
{
await DeleteTTL(list);
return new IdPk { id = id, pk = pk, StatusCode = HttpStatusCode.NoContent };
}
else
{
throw new BizException("未找到ID匹配的数据,删除失败");
}
}
else
{
ResponseMessage response = await container.container.DeleteItemStreamAsync(id: id, partitionKey: new PartitionKey(pk));
if (container.cache && RedisHelper.Instance != null)
{
await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, id);
}
return new IdPk { id = id, pk = pk, StatusCode = response.StatusCode };
}
}
public async Task DeleteAsync(T entity) where T : ID
{
CosmosModelInfo container = await InitializeCollection();
string partitionKey = GetPartitionKey();
Type type = typeof(T);
object o = type.GetProperty(partitionKey).GetValue(entity, null);
if (container.monitor)
{
await DeleteTTL(new List() { entity });
return new IdPk { id = entity.id, pk = o.ToString(), StatusCode = HttpStatusCode.NoContent };
}
else
{
ResponseMessage response = await container.container.DeleteItemStreamAsync(id: entity.id, partitionKey: new PartitionKey(o.ToString()));
if (container.cache && RedisHelper.Instance != null)
{
await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, entity.id);
}
return new IdPk { id = entity.id, pk = partitionKey, StatusCode = response.StatusCode };
}
}
public async Task> FindAll(List propertys = null) where T : ID
{
CosmosModelInfo container = await InitializeCollection();
string pk = typeof(T).Name;
StringBuilder sql;
sql = SQLHelperParametric.GetSQLSelect(propertys);
sql.Append(" where c.pk ='"+ pk + "'");
CosmosDbQuery cosmosDbQuery = new CosmosDbQuery { QueryText = sql.ToString() };
FeedIterator query = container.container.GetItemQueryIterator(queryDefinition: cosmosDbQuery.CosmosQueryDefinition);
return await ResultsFromFeedIterator(query);
}
private async Task> ResultsFromFeedIterator(FeedIterator query, int? maxItemCount = null)
{
List results = new List();
while (query.HasMoreResults)
{
foreach (T t in await query.ReadNextAsync())
{
results.Add(t);
if (results.Count == maxItemCount)
{
return results;
}
}
}
return results;
}
private async Task> ResultsFromFeedIterator(FeedIterator query, Func, Task> batchAction, int itemsPerPage)
{
List results = new List();
while (query.HasMoreResults)
{
if (results.Count() >= itemsPerPage)
{
await batchAction(results);
results.Clear();
}
results.AddRange(await query.ReadNextAsync());
}
if (results.Count() > 0)
{
await batchAction(results);
results.Clear();
}
return results;
}
public async Task> FindByDict(string CollectionName, Dictionary dict, List propertys = null)
{
if (CosmosDict.typeCosmos.TryGetValue(CollectionName, out CosmosModelInfo container))
{
//StringBuilder sql = new StringBuilder("select value(c) from c");
//SQLHelper.GetSQL(dict, ref sql);
//CosmosDbQuery cosmosDbQuery = new CosmosDbQuery, int itemsPerPage = -1, int?
//{
// QueryText = sql.ToString()
//};
string pk = container.type.Name;
StringBuilder sql;
sql = SQLHelperParametric.GetSQLSelect(propertys);
CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, sql, pk);
QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
FeedIterator query = container.container.GetItemQueryIterator(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
return await ResultsFromFeedIterator(query);
}
else
{
throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
}
}
public async Task> FindCountByDict(Dictionary dict) {
CosmosModelInfo container = await InitializeCollection();
string pk = typeof(T).Name;
StringBuilder sql = new StringBuilder("select value count(c) from c");
CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, sql, pk);
if (cosmosDbQuery == null)
{
return new List { 0 };
}
QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
FeedIterator query = container.container.GetItemQueryIterator(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
return await ResultsFromFeedIterator(query);
}
public async Task> FindCountByDict(string CollectionName, Dictionary dict)
{
if (CosmosDict.typeCosmos.TryGetValue(CollectionName, out CosmosModelInfo container))
{
string pk = container.type.Name;
dict.Remove("@CURRPAGE");
dict.Remove("@PAGESIZE");
dict.Remove("@ASC");
dict.Remove("@DESC");
StringBuilder sql = new StringBuilder("select value count(c) from c");
CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, sql, pk);
if (cosmosDbQuery == null) {
return new List {0 };
}
QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
FeedIterator query = container.container.GetItemQueryIterator(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
return await ResultsFromFeedIterator(query);
}
else
{
throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
}
}
public async Task> FindByParams(Dictionary dict, List propertys = null) where T : ID
{
return await FindByDict(dict, propertys);
}
public async Task> FindByDict(Dictionary dict, List propertys = null) where T : ID
{
StringBuilder sql;
sql = SQLHelperParametric.GetSQLSelect(propertys);
string pk = typeof(T).Name;
CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, sql, pk);
QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
return await ResultsFromQueryAndOptions(cosmosDbQuery, queryRequestOptions);
}
private async Task> ResultsFromQueryAndOptions(CosmosDbQuery cosmosDbQuery, QueryRequestOptions queryOptions)
{
if (cosmosDbQuery == null) {
return null;
}
CosmosModelInfo container = await InitializeCollection();
FeedIterator query = container.container.GetItemQueryIterator(
queryDefinition: cosmosDbQuery.CosmosQueryDefinition,
requestOptions: queryOptions);
return await ResultsFromFeedIterator(query);
}
private int GetEffectivePageSize(int itemsPerPage, int? maxItemCount)
{
return itemsPerPage == -1 ? maxItemCount ?? itemsPerPage : Math.Min(maxItemCount ?? itemsPerPage, itemsPerPage);
}
private QueryRequestOptions GetDefaultQueryRequestOptions(int? itemsPerPage = null,
int? maxBufferedItemCount = null,
int? maxConcurrency = null)
{
QueryRequestOptions queryRequestOptions = new QueryRequestOptions
{
MaxItemCount = itemsPerPage == -1 ? 1000 : itemsPerPage,
MaxBufferedItemCount = maxBufferedItemCount ?? 100,
MaxConcurrency = maxConcurrency ?? 50
};
return queryRequestOptions;
}
private async Task> ResultsFromQueryAndOptions(CosmosDbQuery cosmosDbQuery, Func, Task> batchAction, QueryRequestOptions queryOptions)
{
CosmosModelInfo container = await InitializeCollection();
FeedIterator query = container.container.GetItemQueryIterator(
queryDefinition: cosmosDbQuery.CosmosQueryDefinition,
requestOptions: queryOptions);
return await ResultsFromFeedIterator(query, batchAction, queryOptions.MaxItemCount ?? 0);
}
private QueryRequestOptions GetQueryRequestOptions(int itemsPerPage)
{
QueryRequestOptions queryRequestOptions = new QueryRequestOptions
{
MaxItemCount = itemsPerPage
};
return queryRequestOptions;
}
public async Task> FindLinq(Expression> query = null, Expression> order = null, bool isDesc = false) where T : ID
{
string pk = typeof(T).Name;
query = query.And(x => x.pk == pk);
//QueryRequestOptions queryRequestOptions = GetQueryRequestOptions(itemsPerPage);
QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
FeedIterator feedIterator;
CosmosModelInfo container = await InitializeCollection();
if (query == null)
{
if (order != null)
{
if (isDesc)
{
feedIterator = container.container
.GetItemLinqQueryable(requestOptions: queryRequestOptions).OrderByDescending(order)
.ToFeedIterator();
}
else
{
feedIterator = container.container
.GetItemLinqQueryable(requestOptions: queryRequestOptions).OrderBy(order)
.ToFeedIterator();
}
}
else
{
feedIterator = container.container
.GetItemLinqQueryable(requestOptions: queryRequestOptions)
.ToFeedIterator();
}
}
else
{
if (order != null)
{
if (isDesc)
{
feedIterator = container.container
.GetItemLinqQueryable(requestOptions: queryRequestOptions)
.Where(query).OrderByDescending(order)
.ToFeedIterator();
}
else
{
feedIterator = container.container
.GetItemLinqQueryable(requestOptions: queryRequestOptions)
.Where(query).OrderBy(order)
.ToFeedIterator();
}
}
else
{
feedIterator = container.container
.GetItemLinqQueryable(requestOptions: queryRequestOptions)
.Where(query)
.ToFeedIterator();
}
}
return await ResultsFromFeedIterator(feedIterator);
}
public async Task> FindSQL(string sql, Dictionary Parameters = null) where T : ID
{
if (sql.Contains(".pk"))
{
CosmosModelInfo container = await InitializeCollection();
QueryRequestOptions queryOptions = GetQueryRequestOptions(GetEffectivePageSize(-1, null));
if (Parameters != null)
{
CosmosDbQuery cosmosDbQuery = new CosmosDbQuery
{
QueryText = sql,
Parameters = Parameters
};
FeedIterator feedIterator = container.container
.GetItemQueryIterator(cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryOptions);
return await ResultsFromFeedIterator(feedIterator);
}
else
{
QueryDefinition queryDefinition = new QueryDefinition(sql);
return await ResultsFromFeedIterator(container.container.GetItemQueryIterator(queryDefinition));
}
}
else
{
throw new BizException("查询参数必须设置 .pk ", ResponseCode.PARAMS_ERROR);
}
}
public async Task Save(T entity) where T : ID
{
try
{
CosmosModelInfo container = await InitializeCollection();
entity.pk = container.type.Name;
entity.ttl = null;
ItemResponse response = await container.container.CreateItemAsync(entity);
if (container.cache && RedisHelper.Instance != null)
{
if (!RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
{
await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
}
else
{
await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
}
}
return response.Resource;
}
catch (Exception e)
{
throw new BizException(e.Message);
}
}
public async Task> SaveAll(List enyites) where T : ID
{
int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
CosmosModelInfo container = await InitializeCollection();
bool flag = false;
if (RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
{
flag = true;
}
string partitionKey = GetPartitionKey();
Type type = typeof(T);
Stopwatch stopwatch = Stopwatch.StartNew();
for (int i = 0; i < pages; i++)
{
List lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
List> itemsToInsert = new List>();
lists.ForEach(async x =>
{
x.pk = type.Name;
x.ttl = null;
MemoryStream stream = new MemoryStream();
await JsonSerializer.SerializeAsync(stream, x, new JsonSerializerOptions { IgnoreNullValues = true });
object o = type.GetProperty(partitionKey).GetValue(x, null);
KeyValuePair keyValue = new KeyValuePair(new PartitionKey(o.ToString()), stream);
itemsToInsert.Add(keyValue);
});
List tasks = new List(lists.Count);
itemsToInsert.ForEach(item =>
{
tasks.Add(container.container.CreateItemStreamAsync(item.Value, item.Key)
.ContinueWith((Task task) =>
{
using (ResponseMessage response = task.Result)
{
if (!response.IsSuccessStatusCode)
{
}
}
}
));
});
await Task.WhenAll(tasks);
if (container.cache && RedisHelper.Instance != null)
{
lists.ForEach(async x => {
await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, x.id, x);
});
}
}
if (container.cache && RedisHelper.Instance != null && !flag)
{
await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
}
stopwatch.Stop();
return enyites;
}
public async Task SaveOrUpdate(T entity) where T : ID
{
CosmosModelInfo container = await InitializeCollection();
entity.pk = container.type.Name;
entity.ttl = null;
ItemResponse response = await container.container.UpsertItemAsync(item: entity);
if (container.cache && RedisHelper.Instance != null)
{
if (!RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
{
await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
}
else
{
await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
}
}
return response.Resource;
}
public async Task> SaveOrUpdateAll(List enyites) where T : ID
{
//await Task.Run(() => Parallel.ForEach(entities, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (item) =>
//{
// Task.WaitAll(Update(item));
//}));
int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
CosmosModelInfo container = await InitializeCollection();
bool flag = false;
if (RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
{
flag = true;
}
string partitionKey = GetPartitionKey();
Type type = typeof(T);
Stopwatch stopwatch = Stopwatch.StartNew();
for (int i = 0; i < pages; i++)
{
List lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
List> itemsToInsert = new List>();
lists.ForEach(async x =>
{
x.pk = type.Name;
x.ttl = null;
MemoryStream stream = new MemoryStream();
await JsonSerializer.SerializeAsync(stream, x, new JsonSerializerOptions { IgnoreNullValues = true });
object o = type.GetProperty(partitionKey).GetValue(x, null);
KeyValuePair keyValue = new KeyValuePair(new PartitionKey(o.ToString()), stream);
itemsToInsert.Add(keyValue);
});
List tasks = new List(lists.Count);
itemsToInsert.ForEach(item =>
{
tasks.Add(container.container.UpsertItemStreamAsync(item.Value, item.Key)
.ContinueWith((Task task) =>
{
//using (ResponseMessage response = task.Result)
//{
// if (!response.IsSuccessStatusCode)
// {
// }
//}
}
));
});
await Task.WhenAll(tasks);
if (container.cache && RedisHelper.Instance != null)
{
lists.ForEach(async x => {
await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, x.id, x);
});
}
}
if (container.cache && RedisHelper.Instance != null && !flag)
{
await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
}
stopwatch.Stop();
return enyites;
}
public async Task Update(T entity) where T : ID
{
CosmosModelInfo container = await InitializeCollection();
string partitionKey = GetPartitionKey();
Type type = typeof(T);
entity.pk = type.Name;
entity.ttl = null;
object o = type.GetProperty(partitionKey).GetValue(entity, null);
ItemResponse response = await container.container.ReplaceItemAsync(entity, entity.id, new PartitionKey(o.ToString()));
if (container.cache && RedisHelper.Instance != null)
{
if (!RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
{
await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
}
else
{
await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, entity.id, entity);
await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
}
}
return response.Resource;
}
internal class Item
{
public string id { get; set; }
public string pk { get; set; }
public MemoryStream stream { get; set; }
}
public async Task> UpdateAll(List enyites) where T : ID
{
//await Task.Run(() => Parallel.ForEach(entities, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (item) =>
//{
// Task.WaitAll(Update(item));
//}));
int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
CosmosModelInfo container = await InitializeCollection();
bool flag = false;
if (RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
{
flag = true;
}
string partitionKey = GetPartitionKey();
Type type = typeof(T);
Stopwatch stopwatch = Stopwatch.StartNew();
for (int i = 0; i < pages; i++)
{
List lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
List- itemsToInsert = new List
- ();
lists.ForEach(async x =>
{
x.pk = type.Name;
x.ttl = null;
MemoryStream stream = new MemoryStream();
await JsonSerializer.SerializeAsync(stream, x, new JsonSerializerOptions { IgnoreNullValues = true });
object o = type.GetProperty(partitionKey).GetValue(x, null);
Item keyValue = new Item { id = x.id, pk = o.ToString(), stream = stream };
itemsToInsert.Add(keyValue);
});
List tasks = new List(lists.Count);
itemsToInsert.ForEach(item =>
{
tasks.Add(container.container.ReplaceItemStreamAsync(item.stream, item.id, new PartitionKey(item.pk))
.ContinueWith((Task task) =>
{
//using (ResponseMessage response = task.Result)
//{
// if (!response.IsSuccessStatusCode)
// {
// }
//}
}
));
});
await Task.WhenAll(tasks);
if (container.cache && RedisHelper.Instance != null)
{
lists.ForEach(async x => {
await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, x.id, x);
});
}
}
if (container.cache && RedisHelper.Instance != null && !flag)
{
await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
}
stopwatch.Stop();
return enyites;
}
private async Task
> FindByIdAsSql(string id) where T : ID
{
CosmosModelInfo container = await InitializeCollection();
string pk = container.type.Name;
CosmosDbQuery cosmosDbQuery = new CosmosDbQuery
{
QueryText = @"SELECT *
FROM c
WHERE c.pk='" + pk + "' and c.id = @id",
Parameters = new Dictionary
{
{ "@id",id}
}
};
FeedIterator feedIterator = container.container
.GetItemQueryIterator(cosmosDbQuery.CosmosQueryDefinition);
return (await ResultsFromFeedIterator(feedIterator)).ToList();
}
public async Task FindByIdPk(string id, string pk) where T : ID
{
CosmosModelInfo container = await InitializeCollection();
try {
ItemResponse response = await container.container.ReadItemAsync(id: id, partitionKey: new PartitionKey(pk));
return response.Resource;
}
catch (Exception ex) {
return default;
}
}
public async Task> FindById(string id, bool cache = true) where T : ID
{
CosmosModelInfo container = await InitializeCollection();
if (container.cache && RedisHelper.Instance != null && cache == true)
{
return await RedisHelper.CacheShellAsync(CacheCosmosPrefix + container.container.Id, id, timeoutSeconds, () => { return FindByIdAsSql(id); });
}
else
{
return await FindByIdAsSql(id);
}
}
public async Task> FindByIds(List ids) where T : ID
{
CosmosModelInfo container = await InitializeCollection();
if (container.cache && RedisHelper.Instance != null)
{
List list = new List();
List NotIn = new List();
foreach (string id in ids)
{
if (!await RedisHelper.HExistsAsync(CacheCosmosPrefix + container.container.Id, id))
{
NotIn.Add(id);
}
else
{
list.Add(await RedisHelper.HGetAsync(CacheCosmosPrefix + container.container.Id, id));
}
}
if (NotIn.IsNotEmpty())
{
List noInList = await FindByDict(new Dictionary { { "id", NotIn.ToArray() } });
noInList.ForEach(x => { RedisHelper.HSet(CacheCosmosPrefix + container.container.Id, x.id, x); RedisHelper.Expire(CacheCosmosPrefix + container.container.Id, timeoutSeconds); });
list.AddRange(noInList);
}
return list;
}
else
{
return await FindByDict(new Dictionary { { "id", ids.ToArray() } });
}
}
public async Task FindById(string typeName, string id)
{
if (CosmosDict.typeCosmos.TryGetValue(typeName, out CosmosModelInfo container))
{
if (container.cache && RedisHelper.Instance != null)
{
return await RedisHelper.CacheShellAsync(CacheCosmosPrefix + container.container.Id, id, timeoutSeconds, () => { return FindByDict(typeName, new Dictionary { { "id", id } }); });
}
else
{
return await FindByDict(typeName, new Dictionary { { "id", id } });
}
}
else
{
throw new BizException("CollectionName named:" + typeName + " dose not exsit in Database!");
}
}
public async Task