using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Cosmos.Linq;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Linq.Expressions;
using System.Net;
using System.Reflection;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using TEAMModelOS.SDK.Context.Attributes.Azure;
using TEAMModelOS.SDK.Context.Exception;
using TEAMModelOS.SDK.Helper.Common.ReflectorExtensions;
using TEAMModelOS.SDK.Module.AzureCosmosDB.Configuration;
namespace TEAMModelOS.SDK.Module.AzureCosmosDBV3
{
public class AzureCosmosDBV3Repository : IAzureCosmosDBV3Repository
{
/// <summary>Cosmos DB client obtained from the shared client singleton in the constructor.</summary>
private CosmosClient CosmosClient { get; set; }
/// <summary>
/// Thread-safe cache of container handles keyed by container name.
/// Entries are added both during InitializeDatabase and lazily by InitializeCollection,
/// so a concurrent dictionary is used to keep lookups safe while other callers add entries.
/// </summary>
private ConcurrentDictionary<string, Container> DocumentCollectionDict { get; set; } = new ConcurrentDictionary<string, Container>();
/// <summary>Id of the database, taken from options.Database.</summary>
private string DatabaseId { get; set; }
/// <summary>Default throughput (RU) used when creating the database and containers.</summary>
private int CollectionThroughput { get; set; }
/// <summary>Database handle; only assigned by InitializeDatabase.</summary>
private Database database { get; set; }
/// <summary>Values handed to ReflectorExtensions.GetAllTypeAsAttribute to discover model types.</summary>
private string[] ScanModel { get; set; }
/// <summary>
/// Builds a repository from <paramref name="options"/>, obtaining the Cosmos client from the
/// shared singleton with the supplied <paramref name="cosmosSerializer"/>.
/// </summary>
/// <param name="options">Connection string, key, database id, default throughput and scan model.</param>
/// <param name="cosmosSerializer">Serializer forwarded to the client singleton.</param>
/// <exception cref="BizException">
/// Thrown when the connection string is null or empty, or when a CosmosException occurs.
/// </exception>
public AzureCosmosDBV3Repository(AzureCosmosDBOptions options, CosmosSerializer cosmosSerializer)
{
    try
    {
        // Without a connection string there is nothing to connect to.
        if (string.IsNullOrEmpty(options.ConnectionString))
        {
            throw new BizException("请设置正确的AzureCosmosDB数据库配置信息!");
        }

        var clientHolder = CosmosDBV3ClientSingleton.getInstance(options.ConnectionString, options.ConnectionKey, cosmosSerializer);
        CosmosClient = clientHolder.GetCosmosDBClient();

        DatabaseId = options.Database;
        CollectionThroughput = options.CollectionThroughput;
        ScanModel = options.ScanModel;
        // The database itself is not initialized here; InitializeDatabase has to be called separately.
    }
    catch (CosmosException cosmosError)
    {
        throw new BizException(cosmosError.Message, 500, cosmosError.StackTrace);
    }
}
/// <summary>
/// Builds a repository from <paramref name="options"/> without a custom serializer
/// (a null serializer is passed to the client singleton).
/// </summary>
/// <param name="options">Connection string, key, database id, default throughput and scan model.</param>
/// <exception cref="BizException">
/// Thrown when the connection string is null or empty, or when a CosmosException occurs.
/// </exception>
public AzureCosmosDBV3Repository(AzureCosmosDBOptions options)
    : this(options, null)
{
}
/// <summary>
/// Creates the database when it does not exist, caches a handle for every existing container,
/// and then ensures each model type discovered through ScanModel has a container:
/// existing containers get their throughput raised when it is below the requested RU,
/// missing containers are created.
/// </summary>
/// <exception cref="BizException">Wraps any CosmosException (message, 500, stack trace).</exception>
public async Task InitializeDatabase()
{
try
{
database = await CosmosClient.CreateDatabaseIfNotExistsAsync(DatabaseId, CollectionThroughput);
// Cache a handle for every container that already exists in the database.
FeedIterator resultSetIterator = database.GetContainerQueryIterator();
while (resultSetIterator.HasMoreResults)
{
foreach (ContainerProperties container in await resultSetIterator.ReadNextAsync())
{
DocumentCollectionDict.TryAdd(container.Id, database.GetContainer(container.Id));
}
}
// Get all tables (model types) of the database.
List types = ReflectorExtensions.GetAllTypeAsAttribute(ScanModel);
foreach (Type type in types)
{
string PartitionKey = GetPartitionKey(type);
string CollectionName = "";
int RU = 0;
IEnumerable attributes = type.GetCustomAttributes(true);
// Container name: the first attribute's Name when set, otherwise the type name.
if (!string.IsNullOrEmpty(attributes.First().Name))
{
CollectionName = attributes.First().Name;
}
else
{
CollectionName = type.Name;
}
// Requested RU: the first attribute's RU when above 400, otherwise the configured default.
if (attributes.First().RU > 400)
{
RU = attributes.First().RU;
}
else
{
RU = CollectionThroughput;
}
// If the container already exists, check whether its RU changed; otherwise create the container.
if (DocumentCollectionDict.TryGetValue(CollectionName, out Container collection))
{ // Update RU
// A null throughput reading makes the comparison below false, so nothing is replaced.
int? throughputResponse = await CosmosClient.GetDatabase(DatabaseId).GetContainer(collection.Id).ReadThroughputAsync();
if (throughputResponse < RU)
{
await CosmosClient.GetDatabase(DatabaseId).GetContainer(collection.Id).ReplaceThroughputAsync(RU);
}
}
else
{
ContainerProperties containerProperties = new ContainerProperties { Id = CollectionName };
if (!string.IsNullOrEmpty(PartitionKey))
{
containerProperties.PartitionKeyPath = "/" + PartitionKey;
}
// NOTE(review): this overwrites the instance-wide CollectionThroughput, so every container created
// afterwards (here and in InitializeCollection) uses the raised value — confirm this is intended.
if (RU > CollectionThroughput)
{
CollectionThroughput = RU;
}
Container containerWithConsistentIndexing = await database.CreateContainerIfNotExistsAsync(containerProperties, throughput: CollectionThroughput);
DocumentCollectionDict.TryAdd(CollectionName, containerWithConsistentIndexing);
}
}
}
catch (CosmosException e)
{
throw new BizException(e.Message, 500, e.StackTrace);
}
}
/// <summary>
/// Returns the partition key property name for the type T by delegating to GetPartitionKey(Type).
/// </summary>
private string GetPartitionKey()
{
Type type = typeof(T);
return GetPartitionKey(type);
}
/// <summary>
/// Resolves the name of the property that acts as the partition key of <paramref name="type"/>.
/// </summary>
/// <remarks>
/// Properties are scanned in the order returned by reflection. A property literally named
/// "PartitionKey" is accepted and ends the scan; any other property is accepted when it carries
/// a PartitionKeyAttribute. Exactly one candidate must result.
/// </remarks>
/// <param name="type">Model type to inspect.</param>
/// <returns>The name of the single partition key property.</returns>
/// <exception cref="BizException">Thrown when no candidate or more than one candidate is found.</exception>
private string GetPartitionKey(Type type)
{
    List<PropertyInfo> candidates = new List<PropertyInfo>();
    foreach (PropertyInfo property in type.GetProperties())
    {
        if (property.Name.Equals("PartitionKey"))
        {
            candidates.Add(property);
            break;
        }

        // IsDefined counts a property once even if the attribute is applied several times,
        // so a repeated attribute is no longer misreported as "multiple partition keys".
        if (property.IsDefined(typeof(PartitionKeyAttribute), true))
        {
            candidates.Add(property);
        }
    }

    if (candidates.Count == 0)
    {
        throw new BizException(type.Name + " has no PartitionKey !");
    }

    if (candidates.Count > 1)
    {
        throw new BizException("PartitionKey can only be single!");
    }

    return candidates[0].Name;
}
/// <summary>
/// Resolves the container for the type T. The container name is the first attribute's Name
/// when it is set, otherwise the type name; the partition key comes from GetPartitionKey.
/// </summary>
/// <returns>The result of InitializeCollection(CollectionName, partitionKey).</returns>
private async Task InitializeCollection()
{
Type type = typeof(T);
string partitionKey = GetPartitionKey();
string CollectionName;
IEnumerable attributes = type.GetCustomAttributes(true);
if (!string.IsNullOrEmpty(attributes.First().Name))
{
CollectionName = attributes.First().Name;
}
else
{
CollectionName = type.Name;
}
return await InitializeCollection(CollectionName, partitionKey);
}
/// <summary>
/// Returns the cached container named CollectionName, or creates it (if it does not exist)
/// with the given partition key path and the current CollectionThroughput, then caches it.
/// </summary>
/// <param name="CollectionName">Container id to look up or create.</param>
/// <param name="PartitionKey">Property name used as "/" + PartitionKey; skipped when null or empty.</param>
private async Task InitializeCollection(string CollectionName, string PartitionKey)
{
///// Return immediately when this container is already cached in memory.
if (DocumentCollectionDict.TryGetValue(CollectionName, out Container DocumentCollection))
{
return DocumentCollection;
}/// Otherwise try to create it with the default settings.
else
{
ContainerProperties containerProperties = new ContainerProperties { Id = CollectionName };
if (!string.IsNullOrEmpty(PartitionKey))
{
containerProperties.PartitionKeyPath = "/" + PartitionKey;
}
// NOTE(review): database is only assigned inside InitializeDatabase; if that has not run yet
// this call dereferences null — confirm the startup order guarantees it.
Container containerWithConsistentIndexing = await database.CreateContainerIfNotExistsAsync(containerProperties, throughput: CollectionThroughput);
DocumentCollectionDict.TryAdd(CollectionName, containerWithConsistentIndexing);
return containerWithConsistentIndexing;
}
}
/// <summary>
/// Deletes several items from the container for T concurrently and waits for all deletes to finish.
/// </summary>
/// <param name="ids">Pairs where Value is used as the item id and Key as the partition key value.</param>
public async Task DeleteAll(List> ids) where T : ID
{
Container container = await InitializeCollection();
//string partitionKey = GetPartitionKey();
//await Task.Run(() => Parallel.ForEach(ids, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (item) =>
//{
// Task.WaitAll(DeleteAsync(item.Value, item.Key));
//}));
// The stopwatch is started and stopped, but its elapsed time is never read.
Stopwatch stopwatch = Stopwatch.StartNew();
List tasks = new List(ids.Count);
ids.ForEach(item => {
// NOTE(review): the continuation body is empty, so the outcome of each delete is never inspected
// and the ResponseMessage is never disposed — confirm whether failures should be reported.
tasks.Add(container.DeleteItemStreamAsync(item.Value,new PartitionKey(item.Key))
.ContinueWith((Task task) =>
{
//using (ResponseMessage response = task.Result)
//{
// if (!response.IsSuccessStatusCode)
// {
// }
//}
}
));
});
//await Task.Run(() => Parallel.ForEach(enyites, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (item) =>
//{
// Task.WaitAll(Save(item));
//}));
await Task.WhenAll(tasks);
stopwatch.Stop();
}
/// <summary>
/// Deletes every entity in <paramref name="entities"/> from the container for T, issuing the
/// deletes concurrently and waiting for all of them to finish.
/// </summary>
/// <remarks>
/// The partition key value of each entity is read through reflection from the property named by
/// GetPartitionKey and converted with ToString(). Non-success status codes are ignored (as before),
/// but each response is now disposed, and an exception thrown by a delete call propagates to the caller.
/// </remarks>
/// <param name="entities">Entities to delete; each supplies its id and partition key value.</param>
public async Task DeleteAll<T>(List<T> entities) where T : ID
{
    Container container = await InitializeCollection<T>();

    // Resolve the partition key property once instead of once per entity.
    PropertyInfo pkProperty = typeof(T).GetProperty(GetPartitionKey<T>());

    // Resolve every key before the first request goes out, so a bad entity fails before anything is deleted.
    List<KeyValuePair<PartitionKey, string>> targets = new List<KeyValuePair<PartitionKey, string>>(entities.Count);
    foreach (T entity in entities)
    {
        object pkValue = pkProperty.GetValue(entity, null);
        targets.Add(new KeyValuePair<PartitionKey, string>(new PartitionKey(pkValue.ToString()), entity.id));
    }

    List<Task> tasks = new List<Task>(targets.Count);
    foreach (KeyValuePair<PartitionKey, string> target in targets)
    {
        tasks.Add(DeleteOneAsync(target.Value, target.Key));
    }

    await Task.WhenAll(tasks);

    // Issues one delete and releases the response; the status code is intentionally not inspected.
    async Task DeleteOneAsync(string id, PartitionKey partitionKey)
    {
        using (ResponseMessage response = await container.DeleteItemStreamAsync(id, partitionKey))
        {
        }
    }
}
/// <summary>
/// Deletes the item with the given id, in the partition identified by pk, from the container for T.
/// </summary>
/// <param name="id">Id of the item to delete.</param>
/// <param name="pk">Partition key value of the item.</param>
/// <returns>The Resource of the delete response.</returns>
public async Task DeleteAsync(string id, string pk) where T : ID
{
Container container = await InitializeCollection();
ItemResponse response = await container.DeleteItemAsync(id: id, partitionKey: new PartitionKey(pk));
return response.Resource;
}
/// <summary>
/// Deletes the given entity from the container for T, using entity.id as the item id and the
/// value of the partition key property (read via reflection, converted with ToString()) as the partition key.
/// </summary>
/// <param name="entity">Entity to delete.</param>
/// <returns>The Resource of the delete response.</returns>
public async Task DeleteAsync(T entity) where T : ID
{
Container container = await InitializeCollection();
string partitionKey = GetPartitionKey();
Type type = typeof(T);
// Read the partition key value from the property named by GetPartitionKey.
object o = type.GetProperty(partitionKey).GetValue(entity, null);
ItemResponse response = await container.DeleteItemAsync(id: entity.id, partitionKey: new PartitionKey(o.ToString()));
return response.Resource;
}
//public async Task DeleteAsync(string id) where T : ID
//{
// Container container = await InitializeCollection();
// ItemResponse response = await container.DeleteItemAsync(id: id, partitionKey: new PartitionKey(GetPartitionKey()));
// return response.Resource;
//}
/// <summary>
/// Reads every item of the container for T by draining an unfiltered item query iterator.
/// </summary>
public async Task> FindAll() where T : ID
{
Container container = await InitializeCollection();
return await ResultsFromFeedIterator(container.GetItemQueryIterator());
}
/// <summary>
/// Drains the iterator page by page into a list.
/// </summary>
/// <param name="query">Iterator to read.</param>
/// <param name="maxItemCount">
/// When set, reading stops as soon as the list holds exactly this many items; when null, all results are read.
/// </param>
/// <returns>The collected items.</returns>
private async Task> ResultsFromFeedIterator(FeedIterator query, int? maxItemCount = null)
{
List results = new List();
while (query.HasMoreResults)
{
foreach (T t in await query.ReadNextAsync())
{
results.Add(t);
// Stop in the middle of a page once the cap is reached.
if (results.Count == maxItemCount)
{
return results;
}
}
}
return results;
}
/// <summary>
/// Drains <paramref name="query"/> page by page, handing the buffered items to
/// <paramref name="batchAction"/> whenever at least <paramref name="itemsPerPage"/> items are
/// buffered, and once more for any remainder.
/// </summary>
/// <param name="query">Iterator to read.</param>
/// <param name="batchAction">Callback invoked with each non-empty batch; the buffer is cleared after it returns.</param>
/// <param name="itemsPerPage">
/// Buffered item count that triggers a flush. Values below 1 cause a flush after every non-empty page.
/// </param>
/// <returns>
/// The buffer list, which is always empty on return because every batch is cleared after being handed over.
/// </returns>
private async Task<List<T>> ResultsFromFeedIterator<T>(FeedIterator<T> query, Func<List<T>, Task> batchAction, int itemsPerPage)
{
    List<T> buffer = new List<T>();
    while (query.HasMoreResults)
    {
        // Never hand an empty batch to the callback, even when itemsPerPage is 0 or negative.
        if (buffer.Count > 0 && buffer.Count >= itemsPerPage)
        {
            await batchAction(buffer);
            buffer.Clear();
        }

        buffer.AddRange(await query.ReadNextAsync());
    }

    // Flush whatever is left after the last page.
    if (buffer.Count > 0)
    {
        await batchAction(buffer);
        buffer.Clear();
    }

    return buffer;
}
/// <summary>
/// Queries the container named CollectionName with "select value(c) from c" extended by
/// SQLHelperParametric.GetSQL from the supplied dictionary.
/// The container is looked up in the in-memory cache only; it is not created here.
/// </summary>
/// <param name="CollectionName">Name of a container already present in the cache.</param>
/// <param name="dict">Conditions passed to SQLHelperParametric.GetSQL.</param>
/// <param name="itemsPerPage">Page size; combined with maxItemCount by GetEffectivePageSize.</param>
/// <param name="maxItemCount">Optional cap on the number of returned items.</param>
/// <param name="partitionKey">Not used by this method.</param>
/// <exception cref="BizException">Thrown when the container is not in the cache.</exception>
public async Task> FindByDict(string CollectionName, Dictionary dict, int itemsPerPage = -1, int? maxItemCount = null, string partitionKey = null)
{
if (DocumentCollectionDict.TryGetValue(CollectionName, out Container container))
{
//StringBuilder sql = new StringBuilder("select value(c) from c");
//SQLHelper.GetSQL(dict, ref sql);
//CosmosDbQuery cosmosDbQuery = new CosmosDbQuery
//{
// QueryText = sql.ToString()
//};
StringBuilder sql = new StringBuilder("select value(c) from c");
CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict,sql);
QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(itemsPerPage, maxItemCount));
FeedIterator query = container.GetItemQueryIterator(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
return await ResultsFromFeedIterator(query, maxItemCount);
}
else
{
throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
}
}
/// <summary>
/// Runs "select value count(c) from c", extended by SQLHelperParametric.GetSQL from the supplied
/// dictionary, against the container named CollectionName.
/// The container is looked up in the in-memory cache only; it is not created here.
/// </summary>
/// <param name="CollectionName">Name of a container already present in the cache.</param>
/// <param name="dict">Conditions passed to SQLHelperParametric.GetSQL.</param>
/// <param name="itemsPerPage">Page size; combined with maxItemCount by GetEffectivePageSize.</param>
/// <param name="maxItemCount">Optional cap on the number of returned results.</param>
/// <param name="partitionKey">Not used by this method.</param>
/// <exception cref="BizException">Thrown when the container is not in the cache.</exception>
public async Task> FindCountByDict(string CollectionName, Dictionary dict, int itemsPerPage = -1, int? maxItemCount = null, string partitionKey = null)
{
if (DocumentCollectionDict.TryGetValue(CollectionName, out Container container))
{
//StringBuilder sql = new StringBuilder("select value count(c) from c");
//SQLHelper.GetSQL(dict, ref sql);
//CosmosDbQuery cosmosDbQuery = new CosmosDbQuery
//{
// QueryText = sql.ToString()
//};
StringBuilder sql = new StringBuilder("select value count(c) from c");
CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, sql);
QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(itemsPerPage, maxItemCount));
FeedIterator query = container.GetItemQueryIterator(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
return await ResultsFromFeedIterator(query, maxItemCount);
}
else
{
throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
}
}
/// <summary>
/// Alias for FindByDict: forwards all arguments unchanged.
/// </summary>
public async Task> FindByParams(Dictionary dict, int itemsPerPage = -1, int? maxItemCount = null, string partitionKey = null) where T : ID
{
return await FindByDict(dict, itemsPerPage, maxItemCount, partitionKey);
}
/// <summary>
/// Queries the container for T with "select value(c) from c" extended by
/// SQLHelperParametric.GetSQL from the supplied dictionary.
/// </summary>
/// <param name="dict">Conditions passed to SQLHelperParametric.GetSQL.</param>
/// <param name="itemsPerPage">Page size; combined with maxItemCount by GetEffectivePageSize.</param>
/// <param name="maxItemCount">Only influences the page size here (see note in the body).</param>
/// <param name="partitionKey">Not used by this method.</param>
public async Task> FindByDict(Dictionary dict, int itemsPerPage = -1, int? maxItemCount = null, string partitionKey = null) where T : ID
{
StringBuilder sql = new StringBuilder("select value(c) from c");
CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, sql);
QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(itemsPerPage, maxItemCount));
// NOTE(review): maxItemCount is not forwarded to ResultsFromQueryAndOptions, so the result list is not
// capped, unlike the CollectionName overload of FindByDict — confirm whether that is intended.
return await ResultsFromQueryAndOptions(cosmosDbQuery, queryRequestOptions);
}
/// <summary>
/// Executes the query definition of cosmosDbQuery against the container for T with the given
/// request options and collects the results.
/// </summary>
/// <param name="cosmosDbQuery">Query whose CosmosQueryDefinition is executed.</param>
/// <param name="queryOptions">Request options passed to the iterator.</param>
/// <param name="maxItemCount">Optional cap forwarded to ResultsFromFeedIterator.</param>
private async Task> ResultsFromQueryAndOptions(CosmosDbQuery cosmosDbQuery, QueryRequestOptions queryOptions, int? maxItemCount = null)
{
Container container = await InitializeCollection();
FeedIterator query = container.GetItemQueryIterator(
queryDefinition: cosmosDbQuery.CosmosQueryDefinition,
requestOptions: queryOptions);
return await ResultsFromFeedIterator(query, maxItemCount);
}
/// <summary>
/// Combines the requested page size with the optional result cap.
/// </summary>
/// <param name="itemsPerPage">Requested page size; -1 means "not specified".</param>
/// <param name="maxItemCount">Optional cap; when null, itemsPerPage stands in for it.</param>
/// <returns>
/// The cap itself when itemsPerPage is -1, otherwise the smaller of the cap and itemsPerPage.
/// </returns>
private int GetEffectivePageSize(int itemsPerPage, int? maxItemCount)
{
    int cap = maxItemCount ?? itemsPerPage;
    if (itemsPerPage == -1)
    {
        return cap;
    }

    return Math.Min(cap, itemsPerPage);
}
/// <summary>
/// Builds query request options with this repository's defaults.
/// </summary>
/// <param name="itemsPerPage">Page size; -1 is replaced by 1000, any other value (including null) is used as is.</param>
/// <param name="maxBufferedItemCount">Buffered item limit; defaults to 100 when null.</param>
/// <param name="maxConcurrency">Concurrency limit; defaults to 50 when null.</param>
/// <returns>The populated options object.</returns>
private QueryRequestOptions GetDefaultQueryRequestOptions(int? itemsPerPage = null,
    int? maxBufferedItemCount = null,
    int? maxConcurrency = null)
{
    QueryRequestOptions options = new QueryRequestOptions();

    if (itemsPerPage == -1)
    {
        options.MaxItemCount = 1000;
    }
    else
    {
        options.MaxItemCount = itemsPerPage;
    }

    options.MaxBufferedItemCount = maxBufferedItemCount ?? 100;
    options.MaxConcurrency = maxConcurrency ?? 50;
    return options;
}
/// <summary>
/// Executes the query definition of cosmosDbQuery against the container for T and feeds the
/// results to batchAction through the batching overload of ResultsFromFeedIterator.
/// </summary>
/// <param name="cosmosDbQuery">Query whose CosmosQueryDefinition is executed.</param>
/// <param name="batchAction">Callback that receives each batch of results.</param>
/// <param name="queryOptions">Request options; MaxItemCount (or 0 when null) is used as the batch size.</param>
private async Task> ResultsFromQueryAndOptions(CosmosDbQuery cosmosDbQuery, Func, Task> batchAction, QueryRequestOptions queryOptions)
{
Container container = await InitializeCollection();
FeedIterator query = container.GetItemQueryIterator(
queryDefinition: cosmosDbQuery.CosmosQueryDefinition,
requestOptions: queryOptions);
return await ResultsFromFeedIterator(query, batchAction, queryOptions.MaxItemCount ?? 0);
}
/// <summary>
/// Builds query request options that only set the page size.
/// </summary>
/// <param name="itemsPerPage">Value assigned to MaxItemCount without any translation.</param>
/// <returns>The populated options object.</returns>
private QueryRequestOptions GetQueryRequestOptions(int itemsPerPage)
    => new QueryRequestOptions { MaxItemCount = itemsPerPage };
/// <summary>
/// Queries the container for T through LINQ, optionally filtering with query and ordering by order.
/// </summary>
/// <param name="query">Optional filter; when null no Where clause is applied.</param>
/// <param name="order">Optional ordering key; when null no ordering is applied.</param>
/// <param name="isDesc">When true and order is given, orders descending; otherwise ascending.</param>
/// <param name="itemsPerPage">Page size; combined with maxItemCount by GetEffectivePageSize.</param>
/// <param name="maxItemCount">Only influences the page size here (see note before the return).</param>
public async Task> FindLinq(Expression> query = null, Expression> order = null, bool isDesc = false, int itemsPerPage = -1, int? maxItemCount = null) where T : ID
{
//QueryRequestOptions queryRequestOptions = GetQueryRequestOptions(itemsPerPage);
QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(itemsPerPage, maxItemCount));
FeedIterator feedIterator;
Container container = await InitializeCollection();
// No filter: only the ordering (if any) shapes the query.
if (query == null)
{
if (order != null)
{
if (isDesc)
{
feedIterator = container
.GetItemLinqQueryable(requestOptions: queryRequestOptions).OrderByDescending(order)
.ToFeedIterator();
}
else
{
feedIterator = container
.GetItemLinqQueryable(requestOptions: queryRequestOptions).OrderBy(order)
.ToFeedIterator();
}
}
else
{
feedIterator = container
.GetItemLinqQueryable(requestOptions: queryRequestOptions)
.ToFeedIterator();
}
}
// Filter present: apply Where first, then the ordering (if any).
else
{
if (order != null)
{
if (isDesc)
{
feedIterator = container
.GetItemLinqQueryable(requestOptions: queryRequestOptions)
.Where(query).OrderByDescending(order)
.ToFeedIterator();
}
else
{
feedIterator = container
.GetItemLinqQueryable(requestOptions: queryRequestOptions)
.Where(query).OrderBy(order)
.ToFeedIterator();
}
}
else
{
feedIterator = container
.GetItemLinqQueryable(requestOptions: queryRequestOptions)
.Where(query)
.ToFeedIterator();
}
}
// NOTE(review): maxItemCount is not passed to ResultsFromFeedIterator, so the result list is not capped — confirm intent.
return await ResultsFromFeedIterator(feedIterator);
}
/// <summary>
/// Runs the given SQL text against the container for T, with query parameters when supplied.
/// </summary>
/// <param name="sql">Query text.</param>
/// <param name="Parameters">Optional query parameters; when null the SQL is run as is.</param>
/// <param name="itemsPerPage">Page size; combined with maxItemCount by GetEffectivePageSize.</param>
/// <param name="maxItemCount">Only influences the page size here; the result list is not capped.</param>
public async Task> FindSQL(string sql, Dictionary Parameters = null, int itemsPerPage = -1, int? maxItemCount = null) where T : ID
{
Container container = await InitializeCollection();
QueryRequestOptions queryOptions = GetQueryRequestOptions(GetEffectivePageSize(itemsPerPage, maxItemCount));
if (Parameters != null)
{
CosmosDbQuery cosmosDbQuery = new CosmosDbQuery
{
QueryText = sql,
Parameters = Parameters
};
FeedIterator feedIterator = container
.GetItemQueryIterator(cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryOptions);
return await ResultsFromFeedIterator(feedIterator);
}
else
{
// NOTE(review): queryOptions is not passed on this path, so itemsPerPage/maxItemCount have no effect
// when Parameters is null — confirm whether that is intended.
QueryDefinition queryDefinition = new QueryDefinition(sql);
return await ResultsFromFeedIterator(container.GetItemQueryIterator(queryDefinition));
}
}
//public async Task> FindSQL(string sql, bool isPK) where T : ID
//{
// Container container = await InitializeCollection();
// QueryDefinition queryDefinition = new QueryDefinition(sql);
// return await ResultsFromFeedIterator(container.GetItemQueryIterator(queryDefinition));
//}
/// <summary>
/// Replaces the stored item whose id equals entity.id with the given entity in the container for T.
/// </summary>
/// <param name="entity">New content; its id selects the item to replace.</param>
/// <returns>The Resource of the response when the status code is OK.</returns>
/// <exception cref="BizException">Thrown with the message "error" when the status code is not OK.</exception>
public async Task ReplaceObject(T entity) where T : ID
{
Container container = await InitializeCollection();
ItemResponse response = await container.ReplaceItemAsync(item: entity, id: entity.id);
if (response.StatusCode.Equals(HttpStatusCode.OK))
{
return response.Resource;
}
else { throw new BizException("error"); }
}
//public async Task ReplaceObject(T entity, string key, string partitionKey) where T : ID
//{
// Container container = await InitializeCollection();
// ItemResponse response = await container.ReplaceItemAsync(item: entity, id: entity.id);
// if (response.StatusCode.Equals(HttpStatusCode.OK))
// {
// return response.Resource;
// }
// else { throw new BizException("error"); }
//}
/// <summary>
/// Creates the given entity as a new item in the container for T.
/// </summary>
/// <param name="entity">Entity to create.</param>
/// <returns>The Resource of the create response.</returns>
/// <exception cref="BizException">
/// Thrown for any exception raised while resolving the container or creating the item;
/// only the original message is carried over.
/// </exception>
public async Task Save(T entity) where T : ID
{
try
{
Container container = await InitializeCollection();
ItemResponse response = await container.CreateItemAsync(entity);
return response.Resource;
}
catch (Exception e)
{
throw new BizException(e.Message);
}
}
/// <summary>
/// Creates every entity in <paramref name="enyites"/> as a new item in the container for T,
/// issuing the create calls concurrently and waiting for all of them to finish.
/// </summary>
/// <remarks>
/// Each entity is serialized with System.Text.Json into its own stream, and its partition key value
/// is read through reflection from the property named by GetPartitionKey (converted with ToString()).
/// Serialization is awaited before the create call is issued, so a request can never start
/// with a half-written or missing payload. Failed responses are counted and written to the console,
/// as before; an exception thrown by a create call propagates to the caller.
/// </remarks>
/// <param name="enyites">Entities to create.</param>
/// <returns>The same list that was passed in.</returns>
public async Task<List<T>> SaveAll<T>(List<T> enyites) where T : ID
{
    Container container = await InitializeCollection<T>();

    // Resolve the partition key property once instead of once per entity.
    PropertyInfo pkProperty = typeof(T).GetProperty(GetPartitionKey<T>());

    int count = 0;
    List<Task> tasks = new List<Task>(enyites.Count);
    foreach (T entity in enyites)
    {
        tasks.Add(CreateOneAsync(entity));
    }

    await Task.WhenAll(tasks);
    Console.WriteLine(count);
    return enyites;

    // Serializes one entity, sends it, and records a failure when the response is not successful.
    async Task CreateOneAsync(T entity)
    {
        using (MemoryStream stream = new MemoryStream())
        {
            await JsonSerializer.SerializeAsync(stream, entity);
            // Rewind so the payload is read from its first byte.
            stream.Position = 0;

            object pkValue = pkProperty.GetValue(entity, null);
            using (ResponseMessage response = await container.CreateItemStreamAsync(stream, new PartitionKey(pkValue.ToString())))
            {
                if (!response.IsSuccessStatusCode)
                {
                    // Responses complete on arbitrary threads, so the counter must be updated atomically.
                    System.Threading.Interlocked.Increment(ref count);
                    Console.WriteLine(response.RequestMessage);
                }
            }
        }
    }
}
/// <summary>
/// Upserts the given entity into the container for T (UpsertItemAsync).
/// </summary>
/// <param name="entity">Entity to upsert.</param>
/// <returns>The Resource of the upsert response.</returns>
public async Task Update(T entity) where T : ID
{
Container container = await InitializeCollection();
ItemResponse response = await container.UpsertItemAsync(entity);
return response.Resource;
}
/// <summary>
/// Upserts every entity in <paramref name="entities"/> into the container for T, issuing the
/// upsert calls concurrently and waiting for all of them to finish.
/// </summary>
/// <remarks>
/// Each entity is serialized with System.Text.Json into its own stream, and its partition key value
/// is read through reflection from the property named by GetPartitionKey (converted with ToString()).
/// Serialization is awaited before the upsert call is issued, so a request can never start
/// with a half-written or missing payload. Non-success status codes are ignored, as before, but every
/// response and stream is disposed; an exception thrown by an upsert call propagates to the caller.
/// </remarks>
/// <param name="entities">Entities to upsert.</param>
/// <returns>The same list that was passed in.</returns>
public async Task<List<T>> UpdateAll<T>(List<T> entities) where T : ID
{
    Container container = await InitializeCollection<T>();

    // Resolve the partition key property once instead of once per entity.
    PropertyInfo pkProperty = typeof(T).GetProperty(GetPartitionKey<T>());

    List<Task> tasks = new List<Task>(entities.Count);
    foreach (T entity in entities)
    {
        tasks.Add(UpsertOneAsync(entity));
    }

    await Task.WhenAll(tasks);
    return entities;

    // Serializes one entity and sends it; the status code is intentionally not inspected.
    async Task UpsertOneAsync(T entity)
    {
        using (MemoryStream stream = new MemoryStream())
        {
            await JsonSerializer.SerializeAsync(stream, entity);
            // Rewind so the payload is read from its first byte.
            stream.Position = 0;

            object pkValue = pkProperty.GetValue(entity, null);
            using (ResponseMessage response = await container.UpsertItemStreamAsync(stream, new PartitionKey(pkValue.ToString())))
            {
            }
        }
    }
}
//public void Dispose()
//{
// Dispose(true);
//}
//protected virtual void Dispose(bool disposing)
//{
// if (disposing)
// {
// CosmosClient?.Dispose();
// }
//}
/// <summary>
/// Looks up an item by id in the container for T using a parameterized query
/// (no partition key is supplied to the query).
/// </summary>
/// <param name="id">Value bound to the @id query parameter.</param>
/// <returns>The single matching item, or the default value when nothing matches.</returns>
public async Task FindById(string id) where T : ID
{
Container container = await InitializeCollection();
CosmosDbQuery cosmosDbQuery = new CosmosDbQuery
{
QueryText = @"SELECT *
FROM c
WHERE c.id = @id",
Parameters = new Dictionary
{
{ "@id",id}
}
};
FeedIterator feedIterator = container
.GetItemQueryIterator(cosmosDbQuery.CosmosQueryDefinition);
// SingleOrDefault throws when the query yields more than one item.
// NOTE(review): confirm ids are unique across partitions for the containers this is used with.
return (await ResultsFromFeedIterator(feedIterator)).SingleOrDefault();
}
/// <summary>
/// Reads the item with the given id from the partition identified by pk in the container for T.
/// </summary>
/// <param name="id">Id of the item to read.</param>
/// <param name="pk">Partition key value of the item.</param>
/// <returns>The Resource of the read response.</returns>
public async Task FindByIdPk(string id, string pk) where T : ID
{
Container container = await InitializeCollection();
ItemResponse response = await container.ReadItemAsync(id: id, partitionKey: new PartitionKey(pk));
return response.Resource;
}
}
}