using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Cosmos.Linq;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Net;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
using TEAMModelOS.SDK.Context.Attributes.Azure;
using TEAMModelOS.SDK.Context.Exception;
using TEAMModelOS.SDK.Helper.Common.ReflectorExtensions;
using TEAMModelOS.SDK.Module.AzureCosmosDB.Configuration;
namespace TEAMModelOS.SDK.Module.AzureCosmosDBV3
{
public class AzureCosmosDBV3Repository : IAzureCosmosDBV3Repository, IDisposable
{
private CosmosClient CosmosClient { get; set; }
///
/// 线程安全的dict类型
///
private Dictionary DocumentCollectionDict { get; set; } = new Dictionary();
private string DatabaseId { get; set; }
private int CollectionThroughput { get; set; }
private Database database = null;
private string[] ScanModel { get; set; }
public AzureCosmosDBV3Repository(AzureCosmosDBOptions options, CosmosSerializer cosmosSerializer)
{
try
{
if (!string.IsNullOrEmpty(options.ConnectionString))
{
CosmosClient = CosmosDBV3ClientSingleton.getInstance(options.ConnectionString, options.ConnectionKey, cosmosSerializer).GetCosmosDBClient();
}
else
{
throw new BizException("请设置正确的AzureCosmosDB数据库配置信息!");
}
DatabaseId = options.Database;
CollectionThroughput = options.CollectionThroughput;
ScanModel = options.ScanModel;
// InitializeDatabase().GetAwaiter().GetResult();
}
catch (CosmosException e)
{
Dispose(true);
throw new BizException(e.Message, 500, e.StackTrace);
}
}
public AzureCosmosDBV3Repository(AzureCosmosDBOptions options)
{
try
{
if (!string.IsNullOrEmpty(options.ConnectionString))
{
CosmosClient = CosmosDBV3ClientSingleton.getInstance(options.ConnectionString, options.ConnectionKey, null).GetCosmosDBClient();
}
else
{
throw new BizException("请设置正确的AzureCosmosDB数据库配置信息!");
}
DatabaseId = options.Database;
CollectionThroughput = options.CollectionThroughput;
ScanModel = options.ScanModel;
// InitializeDatabase().GetAwaiter().GetResult();
}
catch (CosmosException e)
{
Dispose(true);
throw new BizException(e.Message, 500, e.StackTrace);
}
}
public async Task InitializeDatabase()
{
try
{
database = await CosmosClient.CreateDatabaseIfNotExistsAsync(DatabaseId, CollectionThroughput);
FeedIterator resultSetIterator = database.GetContainerQueryIterator();
while (resultSetIterator.HasMoreResults)
{
foreach (ContainerProperties container in await resultSetIterator.ReadNextAsync())
{
DocumentCollectionDict.TryAdd(container.Id, database.GetContainer(container.Id));
}
}
//获取数据库所有的表
List types = ReflectorExtensions.GetAllTypeAsAttribute(ScanModel);
foreach (Type type in types)
{
string PartitionKey = GetPartitionKey(type);
string CollectionName = "";
int RU = 0;
IEnumerable attributes = type.GetCustomAttributes(true);
if (!string.IsNullOrEmpty(attributes.First().Name))
{
CollectionName = attributes.First().Name;
}
else
{
CollectionName = type.Name;
}
if (attributes.First().RU > 400)
{
RU = attributes.First().RU;
}
else
{
RU = CollectionThroughput;
}
//如果表存在于数据则检查RU是否变动,如果不存在则执行创建DocumentCollection
if (DocumentCollectionDict.TryGetValue(CollectionName, out Container collection))
{ //更新RU
int? throughputResponse = await CosmosClient.GetDatabase(DatabaseId).GetContainer(collection.Id).ReadThroughputAsync();
if (throughputResponse < RU)
{
await CosmosClient.GetDatabase(DatabaseId).GetContainer(collection.Id).ReplaceThroughputAsync(RU);
}
}
else
{
ContainerProperties containerProperties = new ContainerProperties { Id = CollectionName };
if (!string.IsNullOrEmpty(PartitionKey))
{
containerProperties.PartitionKeyPath = "/" + PartitionKey;
}
if (RU > CollectionThroughput)
{
CollectionThroughput = RU;
}
Container containerWithConsistentIndexing = await database.CreateContainerIfNotExistsAsync(containerProperties, throughput: CollectionThroughput);
DocumentCollectionDict.TryAdd(CollectionName, containerWithConsistentIndexing);
}
}
}
catch (CosmosException e)
{
throw new BizException(e.Message, 500, e.StackTrace);
}
}
private string GetPartitionKey()
{
Type type = typeof(T);
return GetPartitionKey(type);
}
private string GetPartitionKey(Type type)
{
PropertyInfo[] properties = type.GetProperties();
List attrProperties = new List();
foreach (PropertyInfo property in properties)
{
if (property.Name.Equals("PartitionKey"))
{
attrProperties.Add(property);
break;
}
object[] attributes = property.GetCustomAttributes(true);
foreach (object attribute in attributes) //2.通过映射,找到成员属性上关联的特性类实例,
{
if (attribute is PartitionKeyAttribute)
{
attrProperties.Add(property);
}
}
}
if (attrProperties.Count <= 0)
{
throw new BizException(type.Name + "has no PartitionKey !");
}
else
{
if (attrProperties.Count == 1)
{
return attrProperties[0].Name;
}
else { throw new BizException("PartitionKey can only be single!"); }
}
}
private async Task InitializeCollection()
{
Type type = typeof(T);
string partitionKey = GetPartitionKey();
string CollectionName;
IEnumerable attributes = type.GetCustomAttributes(true);
if (!string.IsNullOrEmpty(attributes.First().Name))
{
CollectionName = attributes.First().Name;
}
else
{
CollectionName = type.Name;
}
return await InitializeCollection(CollectionName, partitionKey);
}
private async Task InitializeCollection(string CollectionName, string PartitionKey)
{
/////内存中已经存在这个表则直接返回
if (DocumentCollectionDict.TryGetValue(CollectionName, out Container DocumentCollection))
{
return DocumentCollection;
}///如果没有则尝试默认创建
else
{
ContainerProperties containerProperties = new ContainerProperties { Id = CollectionName };
if (!string.IsNullOrEmpty(PartitionKey))
{
containerProperties.PartitionKeyPath = "/" + PartitionKey;
}
Container containerWithConsistentIndexing = await database.CreateContainerIfNotExistsAsync(containerProperties, throughput: CollectionThroughput);
DocumentCollectionDict.TryAdd(CollectionName, containerWithConsistentIndexing);
return containerWithConsistentIndexing;
}
}
public async Task DeleteAll(List> ids) where T : ID
{
string partitionKey = GetPartitionKey();
await Task.Run(() => Parallel.ForEach(ids, (item) =>
{
Task.WaitAll(DeleteAsync(item.Value, item.Key));
}));
}
public async Task DeleteAll(List entities) where T : ID
{
string partitionKey = GetPartitionKey();
Type type = typeof(T);
await Task.Run(() => Parallel.ForEach(entities, (item) =>
{
object o = type.GetProperty(partitionKey).GetValue(item, null);
Task.WaitAll(DeleteAsync(item.id, o.ToString()));
}));
}
public async Task DeleteAsync(string id, string pk) where T : ID
{
Container container = await InitializeCollection();
ItemResponse response = await container.DeleteItemAsync(id: id, partitionKey: new PartitionKey(pk));
return response.Resource;
}
public async Task DeleteAsync(T entity, string pk) where T : ID
{
Container container = await InitializeCollection();
string partitionKey = GetPartitionKey();
Type type = typeof(T);
object o = type.GetProperty(partitionKey).GetValue(entity, null);
ItemResponse response = await container.DeleteItemAsync(id: entity.id, partitionKey: new PartitionKey(o.ToString()));
return response.Resource;
}
//public async Task DeleteAsync(string id) where T : ID
//{
// Container container = await InitializeCollection();
// ItemResponse response = await container.DeleteItemAsync(id: id, partitionKey: new PartitionKey(GetPartitionKey()));
// return response.Resource;
//}
public async Task> FindAll() where T : ID
{
Container container = await InitializeCollection();
return await ResultsFromFeedIterator(container.GetItemQueryIterator());
}
private async Task> ResultsFromFeedIterator(FeedIterator query, int? maxItemCount = null)
{
List results = new List();
while (query.HasMoreResults)
{
foreach (T t in await query.ReadNextAsync())
{
results.Add(t);
if (results.Count == maxItemCount)
{
return results;
}
}
}
return results;
}
private async Task> ResultsFromFeedIterator(FeedIterator query, Func, Task> batchAction, int itemsPerPage)
{
List results = new List();
while (query.HasMoreResults)
{
if (results.Count() >= itemsPerPage)
{
await batchAction(results);
results.Clear();
}
results.AddRange(await query.ReadNextAsync());
}
if (results.Count() > 0)
{
await batchAction(results);
results.Clear();
}
return results;
}
public async Task> FindByDict(string CollectionName, Dictionary dict, int itemsPerPage = -1, int? maxItemCount = null, string partitionKey = null)
{
if (DocumentCollectionDict.TryGetValue(CollectionName, out Container container))
{
//StringBuilder sql = new StringBuilder("select value(c) from c");
//SQLHelper.GetSQL(dict, ref sql);
//CosmosDbQuery cosmosDbQuery = new CosmosDbQuery
//{
// QueryText = sql.ToString()
//};
StringBuilder sql = new StringBuilder("select value(c) from c");
CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, sql);
QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(itemsPerPage, maxItemCount));
FeedIterator query = container.GetItemQueryIterator(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
return await ResultsFromFeedIterator(query, maxItemCount);
}
else
{
throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
}
}
public async Task> FindCountByDict(string CollectionName, Dictionary dict, int itemsPerPage = -1, int? maxItemCount = null, string partitionKey = null)
{
if (DocumentCollectionDict.TryGetValue(CollectionName, out Container container))
{
//StringBuilder sql = new StringBuilder("select value count(c) from c");
//SQLHelper.GetSQL(dict, ref sql);
//CosmosDbQuery cosmosDbQuery = new CosmosDbQuery
//{
// QueryText = sql.ToString()
//};
StringBuilder sql = new StringBuilder("select value count(c) from c");
CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, sql);
QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(itemsPerPage, maxItemCount));
FeedIterator query = container.GetItemQueryIterator(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
return await ResultsFromFeedIterator(query, maxItemCount);
}
else
{
throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
}
}
public async Task> FindByParams(Dictionary dict, int itemsPerPage = -1, int? maxItemCount = null, string partitionKey = null) where T : ID
{
return await FindByDict(dict, itemsPerPage, maxItemCount, partitionKey);
}
public async Task> FindByDict(Dictionary dict, int itemsPerPage = -1, int? maxItemCount = null, string partitionKey = null) where T : ID
{
StringBuilder sql = new StringBuilder("select value(c) from c");
SQLHelper.GetSQL(dict, ref sql);
CosmosDbQuery cosmosDbQuery = new CosmosDbQuery
{
QueryText = sql.ToString()
};
QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(itemsPerPage, maxItemCount));
return await ResultsFromQueryAndOptions(cosmosDbQuery, queryRequestOptions);
}
private async Task> ResultsFromQueryAndOptions(CosmosDbQuery cosmosDbQuery, QueryRequestOptions queryOptions, int? maxItemCount = null)
{
Container container = await InitializeCollection();
FeedIterator query = container.GetItemQueryIterator(
queryDefinition: cosmosDbQuery.CosmosQueryDefinition,
requestOptions: queryOptions);
return await ResultsFromFeedIterator(query, maxItemCount);
}
private int GetEffectivePageSize(int itemsPerPage, int? maxItemCount)
{
return itemsPerPage == -1 ? maxItemCount ?? itemsPerPage : Math.Min(maxItemCount ?? itemsPerPage, itemsPerPage);
}
private QueryRequestOptions GetDefaultQueryRequestOptions(int? itemsPerPage = null,
int? maxBufferedItemCount = null,
int? maxConcurrency = null)
{
QueryRequestOptions queryRequestOptions = new QueryRequestOptions
{
MaxItemCount = itemsPerPage == -1 ? 1000 : itemsPerPage,
MaxBufferedItemCount = maxBufferedItemCount ?? 100,
MaxConcurrency = maxConcurrency ?? 50
};
return queryRequestOptions;
}
private async Task> ResultsFromQueryAndOptions(CosmosDbQuery cosmosDbQuery, Func, Task> batchAction, QueryRequestOptions queryOptions)
{
Container container = await InitializeCollection();
FeedIterator query = container.GetItemQueryIterator(
queryDefinition: cosmosDbQuery.CosmosQueryDefinition,
requestOptions: queryOptions);
return await ResultsFromFeedIterator(query, batchAction, queryOptions.MaxItemCount ?? 0);
}
private QueryRequestOptions GetQueryRequestOptions(int itemsPerPage)
{
QueryRequestOptions queryRequestOptions = new QueryRequestOptions
{
MaxItemCount = itemsPerPage
};
return queryRequestOptions;
}
public async Task> FindLinq(Expression> query = null, Expression> order = null, bool isDesc = false, int itemsPerPage = -1, int? maxItemCount = null) where T : ID
{
//QueryRequestOptions queryRequestOptions = GetQueryRequestOptions(itemsPerPage);
QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(itemsPerPage, maxItemCount));
FeedIterator feedIterator;
Container container = await InitializeCollection();
if (query == null)
{
if (order != null)
{
if (isDesc)
{
feedIterator = container
.GetItemLinqQueryable(requestOptions: queryRequestOptions).OrderByDescending(order)
.ToFeedIterator();
}
else
{
feedIterator = container
.GetItemLinqQueryable(requestOptions: queryRequestOptions).OrderBy(order)
.ToFeedIterator();
}
}
else
{
feedIterator = container
.GetItemLinqQueryable(requestOptions: queryRequestOptions)
.ToFeedIterator();
}
}
else
{
if (order != null)
{
if (isDesc)
{
feedIterator = container
.GetItemLinqQueryable(requestOptions: queryRequestOptions)
.Where(query).OrderByDescending(order)
.ToFeedIterator();
}
else
{
feedIterator = container
.GetItemLinqQueryable(requestOptions: queryRequestOptions)
.Where(query).OrderBy(order)
.ToFeedIterator();
}
}
else
{
feedIterator = container
.GetItemLinqQueryable(requestOptions: queryRequestOptions)
.Where(query)
.ToFeedIterator();
}
}
return await ResultsFromFeedIterator(feedIterator);
}
public async Task> FindSQL(string sql, Dictionary Parameters = null, int itemsPerPage = -1, int? maxItemCount = null) where T : ID
{
Container container = await InitializeCollection();
QueryRequestOptions queryOptions = GetQueryRequestOptions(GetEffectivePageSize(itemsPerPage, maxItemCount));
if (Parameters != null)
{
CosmosDbQuery cosmosDbQuery = new CosmosDbQuery
{
QueryText = sql,
Parameters = Parameters
};
FeedIterator feedIterator = container
.GetItemQueryIterator(cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryOptions);
return await ResultsFromFeedIterator(feedIterator);
}
else
{
QueryDefinition queryDefinition = new QueryDefinition(sql);
return await ResultsFromFeedIterator(container.GetItemQueryIterator(queryDefinition));
}
}
//public async Task> FindSQL(string sql, bool isPK) where T : ID
//{
// Container container = await InitializeCollection();
// QueryDefinition queryDefinition = new QueryDefinition(sql);
// return await ResultsFromFeedIterator(container.GetItemQueryIterator(queryDefinition));
//}
public async Task ReplaceObject(T entity) where T : ID
{
Container container = await InitializeCollection();
ItemResponse response = await container.ReplaceItemAsync(item: entity, id: entity.id);
if (response.StatusCode.Equals(HttpStatusCode.OK))
{
return response.Resource;
}
else { throw new BizException("error"); }
}
//public async Task ReplaceObject(T entity, string key, string partitionKey) where T : ID
//{
// Container container = await InitializeCollection();
// ItemResponse response = await container.ReplaceItemAsync(item: entity, id: entity.id);
// if (response.StatusCode.Equals(HttpStatusCode.OK))
// {
// return response.Resource;
// }
// else { throw new BizException("error"); }
//}
public async Task Save(T entity) where T : ID
{
try
{
Container container = await InitializeCollection();
ItemResponse response = await container.CreateItemAsync(entity);
return response.Resource;
}
catch (Exception e)
{
throw new BizException(e.Message);
}
}
public async Task> SaveAll(List enyites) where T : ID
{
await Task.Run(() => Parallel.ForEach(enyites, (item) =>
{
Task.WaitAll(Save(item));
}));
return enyites;
}
public async Task Update(T entity) where T : ID
{
Container container = await InitializeCollection();
ItemResponse response = await container.UpsertItemAsync(entity);
return response.Resource;
}
public async Task> UpdateAll(List entities) where T : ID
{
await Task.Run(() => Parallel.ForEach(entities, (item) =>
{
Task.WaitAll(Update(item));
}));
return entities;
}
public void Dispose()
{
Dispose(true);
}
protected virtual void Dispose(bool disposing)
{
if (disposing)
{
CosmosClient?.Dispose();
}
}
public async Task FindById(string id) where T : ID
{
Container container = await InitializeCollection();
CosmosDbQuery cosmosDbQuery = new CosmosDbQuery
{
QueryText = @"SELECT *
FROM c
WHERE c.id = @id",
Parameters = new Dictionary
{
{ "@id",id}
}
};
FeedIterator feedIterator = container
.GetItemQueryIterator(cosmosDbQuery.CosmosQueryDefinition);
return (await ResultsFromFeedIterator(feedIterator)).SingleOrDefault();
}
public async Task FindByIdPk(string id, string pk) where T : ID
{
Container container = await InitializeCollection();
ItemResponse response = await container.ReadItemAsync(id: id, partitionKey: new PartitionKey(pk));
return response.Resource;
}
public async Task> FindByDictTest(Dictionary dict, int itemsPerPage = 1, int? maxItemCount = 1, string partitionKey = null) where T : ID
{
//Container container = await InitializeCollection();
StringBuilder sql = new StringBuilder("select value(c) from c");
CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, sql);
QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(itemsPerPage, maxItemCount));
//FeedIterator feedIterator = container
// .GetItemQueryIterator(cosmosDbQuery.CosmosQueryDefinition);
// return (await ResultsFromFeedIterator(feedIterator)).SingleOrDefault();
return await ResultsFromQueryAndOptions(cosmosDbQuery, queryRequestOptions, maxItemCount);
}
}
}