using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Linq;
using TEAMModelOS.SDK.Module.AzureCosmosDB.Configuration;
using TEAMModelOS.SDK.Module.AzureCosmosDB.Interfaces;
using Microsoft.Azure.Documents.Client;
using Microsoft.Azure.Documents;
using TEAMModelOS.SDK.Helper.Security.AESCrypt;
using TEAMModelOS.SDK.Context.Exception;
using Microsoft.Azure.Documents.Linq;
using TEAMModelOS.SDK.Helper.Query.LinqHelper;
using System.Reflection;
using Microsoft.Azure.CosmosDB.BulkExecutor;
using Microsoft.Azure.CosmosDB.BulkExecutor.BulkImport;
using System.Threading;
using TEAMModelOS.SDK.Helper.Common.JsonHelper;
using Microsoft.Azure.CosmosDB.BulkExecutor.BulkUpdate;
using TEAMModelOS.SDK.Helper.Common.CollectionHelper;
using Microsoft.Azure.CosmosDB.BulkExecutor.BulkDelete;
using TEAMModelOS.SDK.Context.Attributes.Azure;
using System.Text;
namespace TEAMModelOS.SDK.Module.AzureCosmosDB.Implements
{ ///
/// sdk 文档https://github.com/Azure/azure-cosmos-dotnet-v2/tree/master/samples
/// https://github.com/Azure/azure-cosmos-dotnet-v2/blob/530c8d9cf7c99df7300246da05206c57ce654233/samples/code-samples/DatabaseManagement/Program.cs#L72-L121
///
public class AzureCosmosDBRepository : IAzureCosmosDBRepository
{
///
/// sdk 文档https://github.com/Azure/azure-cosmos-dotnet-v2/tree/master/samples
/// https://github.com/Azure/azure-cosmos-dotnet-v2/blob/530c8d9cf7c99df7300246da05206c57ce654233/samples/code-samples/DatabaseManagement/Program.cs#L72-L121
///
private readonly string china_con = "417A7572654368696E6120202020202020202020202020202020202020202020D63873D37F845F9DC7607B4DF4787EE26598454CE32FB5F2EE778A34A5015736196DF7940C67A034CDD4C4B44CD37C20";
private readonly string china_key = "417A7572654368696E61202020202020202020202020202020202020202020203CAA1DF7E3203F0ABCB2D60C1F3DCB6D90676C4D5467167F6E6A2CB3DDE975EA37B06BBAE6E012936BEDB6D5D60B28B13642F755CB25D1958BE5366EA20FA7C47E04A67B6A96111C61C3270CD0E5539CA45E3A77A6B483F47419BBAEDE75C0F6";
private readonly string global_con = "417A757265476C6F62616C2020202020202020202020202020202020202020200E357979CB69243DBF4E41BF5526830F89AB746007AC68A3DD3F9CFDA781509F1C48B2359120A5E58B8C7B1EDAA99DEA";
private readonly string global_key = "417A757265476C6F62616C2020202020202020202020202020202020202020209FF199D61813D1F4857D55CFB0A7D6A797FECF39A7F47553E9C1AF23674CB04BA95748A4A3C07B90F32E5EF26E0982DBF90001E066432075C434351D73FB387D27A50716D90F414F34A4579D846C27804F658705C05A7224EC4D695FD7A5EE23";
private DocumentClient CosmosClient { get; set; }
private DocumentCollection CosmosCollection { get; set; }
private string Database { get; set; }
private int CollectionThroughput { get; set; }
public AzureCosmosDBRepository(AzureCosmosDBOptions options)
{
try
{
if (!string.IsNullOrEmpty(options.ConnectionString))
{
CosmosClient = CosmosDBClientSingleton.getInstance(options.ConnectionString, options.ConnectionKey).GetCosmosDBClient();
}
else if (AzureCosmosDBConfig.AZURE_CHINA.Equals(options.AzureTableDialect))
{
AESCrypt crypt = new AESCrypt();
CosmosClient = CosmosDBClientSingleton.getInstance(crypt.Decrypt(china_con, options.AzureTableDialect), crypt.Decrypt(china_key, options.AzureTableDialect)).GetCosmosDBClient();
}
else if (AzureCosmosDBConfig.AZURE_GLOBAL.Equals(options.AzureTableDialect))
{
AESCrypt crypt = new AESCrypt();
CosmosClient = CosmosDBClientSingleton.getInstance(crypt.Decrypt(global_con, options.AzureTableDialect), crypt.Decrypt(global_key, options.AzureTableDialect)).GetCosmosDBClient();
}
else
{
throw new BizException("请设置正确的AzureCosmosDB数据库配置信息!");
}
Database = options.Database;
CollectionThroughput = options.CollectionThroughput;
CosmosClient.CreateDatabaseIfNotExistsAsync(new Database { Id = Database });
// _connectionString = options.ConnectionString;
}
catch (DocumentClientException de)
{
Exception baseException = de.GetBaseException();
Console.WriteLine("{0} error occurred: {1}, Message: {2}", de.StatusCode, de.Message, baseException.Message);
}
catch (Exception e)
{
Exception baseException = e.GetBaseException();
Console.WriteLine("Error: {0}, Message: {1}", e.Message, baseException.Message);
}
finally
{
Console.WriteLine("End of demo, press any key to exit.");
// Console.ReadKey();
}
}
private async Task InitializeCollection()
{
Type t = typeof(T);
if (CosmosCollection == null || !CosmosCollection.Id.Equals(t.Name))
{
DocumentCollection collectionDefinition = new DocumentCollection { Id = t.Name };
collectionDefinition.IndexingPolicy = new IndexingPolicy(new RangeIndex(DataType.String) { Precision = -1 });
string partitionKey = GetPartitionKey();
// collectionDefinition.PartitionKey = new PartitionKeyDefinition { Paths = new System.Collections.ObjectModel.Collection() };
if (!string.IsNullOrEmpty(partitionKey))
{
collectionDefinition.PartitionKey.Paths.Add("/" + partitionKey);
}
// CosmosCollection = await this.CosmosClient.CreateDocumentCollectionIfNotExistsAsync(UriFactory.CreateDatabaseUri(Database), collectionDefinition);
CosmosCollection = await this.CosmosClient.CreateDocumentCollectionIfNotExistsAsync(
UriFactory.CreateDatabaseUri(Database), collectionDefinition, new RequestOptions { OfferThroughput = CollectionThroughput }
);
}
return CosmosCollection;
}
private string GetPartitionKey()
{
Type type = typeof(T);
PropertyInfo[] properties = type.GetProperties();
List attrProperties = new List();
foreach (PropertyInfo property in properties)
{
if (property.Name.Equals("PartitionKey"))
{
attrProperties.Add(property);
break;
}
object[] attributes = property.GetCustomAttributes(true);
foreach (object attribute in attributes) //2.通过映射,找到成员属性上关联的特性类实例,
{
if (attribute is PartitionKeyAttribute)
{
attrProperties.Add(property);
}
}
}
if (attrProperties.Count <= 0)
{
return null;
}
else
{
if (attrProperties.Count == 1)
{
return attrProperties[0].Name;
}
else { throw new BizException("PartitionKey can only be single!"); }
}
}
public async Task Save(T entity) //where T : object, new()
{
try
{
Type t = typeof(T);
DocumentCollection documentCollection = await InitializeCollection();
ResourceResponse doc =
await CosmosClient.CreateDocumentAsync(UriFactory.CreateDocumentCollectionUri(Database, t.Name), entity);
//Console.WriteLine(doc.ActivityId);
return entity;
}
catch (Exception e)
{
throw new BizException(e.Message);
}
}
public async Task Update(T entity)
{
Type t = typeof(T);
await InitializeCollection();
ResourceResponse doc =
await CosmosClient.UpsertDocumentAsync(UriFactory.CreateDocumentCollectionUri(Database, t.Name), entity);
return entity;
}
public async Task ReplaceObject(T entity, string key)
{
Type t = typeof(T);
await InitializeCollection();
try
{
ResourceResponse doc =
await CosmosClient.ReplaceDocumentAsync(UriFactory.CreateDocumentUri(Database, t.Name, key), entity);
return key;
}
catch (Exception e)
{
Console.WriteLine("{0} Exception caught.", e);
//return false;
}
return null;
}
public async Task ReplaceObject(T entity, string key, string partitionKey)
{
Type t = typeof(T);
await InitializeCollection();
try
{
ResourceResponse doc =
await CosmosClient.ReplaceDocumentAsync(UriFactory.CreateDocumentUri(Database, t.Name, key),
entity,
new RequestOptions { PartitionKey = new PartitionKey(partitionKey) });
return key;
}
catch (Exception e)
{
throw new BizException(e.Message);
//Console.WriteLine("{0} Exception caught.", e);
//return false;
}
}
public async Task> FindAll()
{
Type t = typeof(T);
Boolean open = true;
List objs = new List();
//await InitializeCollection();
//查询条数 -1是全部
FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = open };
var query = CosmosClient.CreateDocumentQuery(UriFactory.CreateDocumentCollectionUri(Database, t.Name), queryOptions).AsDocumentQuery();
while (query.HasMoreResults)
{
foreach (T obj in await query.ExecuteNextAsync())
{
objs.Add(obj);
}
}
return objs;
//return CosmosClient.CreateDocumentQuery(UriFactory.CreateDocumentCollectionUri(Database, t.Name),sql);
}
public async Task> FindLinq(Func, object> singleOrDefault)
{
Type t = typeof(T);
List objs = new List();
await InitializeCollection();
//查询条数 -1是全部
FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1 };
var query = CosmosClient.CreateDocumentQuery(UriFactory.CreateDocumentCollectionUri(Database, t.Name), queryOptions);
// query.Where();
return objs;
//return CosmosClient.CreateDocumentQuery(UriFactory.CreateDocumentCollectionUri(Database, t.Name),sql);
}
public async Task> FindSQL(string sql)
{
Type t = typeof(T);
List objs = new List();
await InitializeCollection();
var query = CosmosClient.CreateDocumentQuery(UriFactory.CreateDocumentCollectionUri(Database, t.Name), sql);
foreach (var item in query)
{
objs.Add(item);
}
return objs;
}
public async Task> FindSQL(string sql, bool IsPk)
{
Type t = typeof(T);
List objs = new List();
Boolean open = IsPk;
await InitializeCollection();
//查询条数 -1是全部
FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = open };
var query = CosmosClient.CreateDocumentQuery(UriFactory.CreateDocumentCollectionUri(Database, t.Name), sql, queryOptions);
foreach (var item in query)
{
objs.Add(item);
}
return objs;
}
public async Task> FindByParams(Dictionary dict)
{
//await InitializeCollection();
Type t = typeof(T);
Boolean open = true;
List filters = new List();
string PKname = "";
PropertyInfo[] properties = t.GetProperties();
foreach (PropertyInfo property in properties)
{
object[] attributes = property.GetCustomAttributes(true);
foreach (object attribute in attributes) //2.通过映射,找到成员属性上关联的特性类实例,
{
if (attribute is PartitionKeyAttribute)
{
PKname = property.Name;
break;
}
}
}
foreach (string key in dict.Keys)
{
//if (t.Name.Equals(key)) {
// open = false;
//}
if (PKname.Equals(key))
{
open = false;
}
filters.Add(new Filter { Key = key, Value = dict[key] != null ? dict[key].ToString() : throw new Exception("参数值不能为null") });
}
//List objs = new List();
await InitializeCollection();
//查询条数 -1是全部
FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = open };
var query = CosmosClient.CreateDocumentQuery(UriFactory.CreateDocumentCollectionUri(Database, t.Name), queryOptions);
List list = DynamicLinq.GenerateFilter(query, filters).ToList();
return list;
//return CosmosClient.CreateDocumentQuery(UriFactory.CreateDocumentCollectionUri(Database, t.Name),sql);
}
public async Task DeleteAsync(string id)
{
Type t = typeof(T);
await InitializeCollection();
ResourceResponse doc =
await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database, t.Name, id));
//Console.WriteLine(doc.ActivityId);
return id;
}
public async Task DeleteAsync(T entity)
{
await InitializeCollection();
Type t = typeof(T);
string PartitionKey = GetPartitionKey();
if (!string.IsNullOrEmpty(PartitionKey))
{
string pkValue = entity.GetType().GetProperty(PartitionKey).GetValue(entity).ToString();
string idValue = entity.GetType().GetProperty("id").GetValue(entity).ToString();
ResourceResponse doc =
await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database, t.Name, idValue), new RequestOptions { PartitionKey = new PartitionKey(pkValue) });
}
else
{
string idValue = entity.GetType().GetProperty("id").GetValue(entity).ToString();
ResourceResponse doc =
await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database, t.Name, idValue));
}
//Console.WriteLine(doc.ActivityId);
return entity;
}
public async Task DeleteAsync(string id, string partitionKey)
{
Type t = typeof(T);
await InitializeCollection();
ResourceResponse doc =
await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database, t.Name, id), new RequestOptions { PartitionKey = new PartitionKey(partitionKey) });
//Console.WriteLine(doc.ActivityId);
return id;
}
public async Task> SaveAll(List enyites)
{
DocumentCollection dataCollection = await InitializeCollection();
// Set retry options high for initialization (default values).
CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;
IBulkExecutor bulkExecutor = new BulkExecutor(CosmosClient, dataCollection);
await bulkExecutor.InitializeAsync();
// Set retries to 0 to pass control to bulk executor.
CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;
BulkImportResponse bulkImportResponse = null;
long totalNumberOfDocumentsInserted = 0;
double totalRequestUnitsConsumed = 0;
double totalTimeTakenSec = 0;
var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;
int pageSize = 100;
int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
for (int i = 0; i < pages; i++)
{
List documentsToImportInBatch = new List();
List lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
for (int j = 0; j < lists.Count; j++)
{
documentsToImportInBatch.Add(lists[j].ToJson());
}
var tasks = new List
{ Task.Run(async () =>
{
do
{
//try
//{
bulkImportResponse = await bulkExecutor.BulkImportAsync(
documents: documentsToImportInBatch,
enableUpsert: true,
disableAutomaticIdGeneration: true,
maxConcurrencyPerPartitionKeyRange: null,
maxInMemorySortingBatchSize: null,
cancellationToken: token);
//}
//catch (DocumentClientException de)
//{
// break;
//}
//catch (Exception e)
//{
// break;
//}
} while (bulkImportResponse.NumberOfDocumentsImported < documentsToImportInBatch.Count);
totalNumberOfDocumentsInserted += bulkImportResponse.NumberOfDocumentsImported;
totalRequestUnitsConsumed += bulkImportResponse.TotalRequestUnitsConsumed;
totalTimeTakenSec += bulkImportResponse.TotalTimeTaken.TotalSeconds;
},
token)
};
await Task.WhenAll(tasks);
}
return enyites;
}
public async Task> UpdateAll(Dictionary dict, Dictionary updateFilters, List deleteKeys = null)
{
DocumentCollection dataCollection = await InitializeCollection();
IBulkExecutor bulkExecutor = new BulkExecutor(CosmosClient, dataCollection);
await bulkExecutor.InitializeAsync();
BulkUpdateResponse bulkUpdateResponse = null;
long totalNumberOfDocumentsUpdated = 0;
double totalRequestUnitsConsumed = 0;
double totalTimeTakenSec = 0;
var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;
// Generate update operations.
List updateOperations = new List();
// Unset the description field.
if (null != updateFilters && updateFilters.Count > 0)
{
var keys = updateFilters.Keys;
foreach (string key in keys)
{
// updateOperations.Add(new SetUpdateOperation())
if (updateFilters[key] != null && !string.IsNullOrEmpty(updateFilters[key].ToString()))
{
updateOperations.Add(SwitchType(key, updateFilters[key]));
}
}
}
if (deleteKeys.IsNotEmpty())
{
foreach (string key in deleteKeys)
{
updateOperations.Add(new UnsetUpdateOperation(key));
}
}
List list = await FindByParams(dict);
int pageSize = 100;
int pages = (int)Math.Ceiling((double)list.Count / pageSize);
string partitionKey = "/" + GetPartitionKey();
Type type = typeof(T);
for (int i = 0; i < pages; i++)
{
List updateItemsInBatch = new List();
List lists = list.Skip((i) * pageSize).Take(pageSize).ToList();
for (int j = 0; j < lists.Count; j++)
{
string partitionKeyValue = lists[j].GetType().GetProperty(partitionKey).GetValue(lists[j]) + "";
string id = lists[j].GetType().GetProperty("id").GetValue(lists[j]) + "";
updateItemsInBatch.Add(new UpdateItem(id, partitionKeyValue, updateOperations));
}
var tasks = new List
{ Task.Run(async () =>
{
do
{
//try
//{
bulkUpdateResponse = await bulkExecutor.BulkUpdateAsync(
updateItems: updateItemsInBatch,
maxConcurrencyPerPartitionKeyRange: null,
cancellationToken: token);
//}
//catch (DocumentClientException de)
//{
// break;
//}
//catch (Exception e)
//{
// break;
//}
} while (bulkUpdateResponse.NumberOfDocumentsUpdated < updateItemsInBatch.Count);
totalNumberOfDocumentsUpdated += bulkUpdateResponse.NumberOfDocumentsUpdated;
totalRequestUnitsConsumed += bulkUpdateResponse.TotalRequestUnitsConsumed;
totalTimeTakenSec += bulkUpdateResponse.TotalTimeTaken.TotalSeconds;
},
token)
};
await Task.WhenAll(tasks);
}
return list;
}
public async Task> DeleteAll(Dictionary dict)
{
DocumentCollection dataCollection = await InitializeCollection();
List list = await FindByParams(dict);
List> pkIdTuplesToDelete = new List>();
if (list.IsNotEmpty())
{
foreach (T t in list)
{
string id = t.GetType().GetProperty("id").GetValue(t) + "";
pkIdTuplesToDelete.Add(new Tuple(id, id));
}
}
else
{
return null;
}
long totalNumberOfDocumentsDeleted = 0;
double totalRequestUnitsConsumed = 0;
double totalTimeTakenSec = 0;
BulkDeleteResponse bulkDeleteResponse = null;
BulkExecutor bulkExecutor = new BulkExecutor(CosmosClient, dataCollection);
await bulkExecutor.InitializeAsync();
bulkDeleteResponse = await bulkExecutor.BulkDeleteAsync(pkIdTuplesToDelete);
totalNumberOfDocumentsDeleted = bulkDeleteResponse.NumberOfDocumentsDeleted;
totalRequestUnitsConsumed = bulkDeleteResponse.TotalRequestUnitsConsumed;
totalTimeTakenSec = bulkDeleteResponse.TotalTimeTaken.TotalSeconds;
return list;
}
private static UpdateOperation SwitchType(string key, object obj)
{
Type s = obj.GetType();
TypeCode typeCode = Type.GetTypeCode(s);
switch (typeCode)
{
case TypeCode.String: return new SetUpdateOperation(key, obj.ToString());
case TypeCode.Int32: return new SetUpdateOperation(key, (Int32)obj);
case TypeCode.Double: return new SetUpdateOperation(key, (Double)obj);
case TypeCode.Byte: return new SetUpdateOperation(key, (Byte)obj);
case TypeCode.Boolean: return new SetUpdateOperation(key, (Boolean)obj);
case TypeCode.DateTime: return new SetUpdateOperation(key, (DateTime)obj);
case TypeCode.Int64: return new SetUpdateOperation(key, (Int64)obj);
default: return null;
}
}
public async Task> FindByDict(Dictionary dict, bool IsPk)
{
Type t = typeof(T);
List objs = new List();
Boolean open = IsPk;
await InitializeCollection();
StringBuilder sql = new StringBuilder("select * from c where 1=1 ");
if (dict != null)
{
foreach (string key in dict.Keys)
{
sql.Append(GenSql(dict[key], key));
}
}
//查询条数 -1是全部
FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = IsPk };
var query = CosmosClient.CreateDocumentQuery(UriFactory.CreateDocumentCollectionUri(Database, t.Name), sql.ToString(), queryOptions);
foreach (var item in query)
{
objs.Add(item);
}
return objs;
}
private static string GenSql(object obj, string key)
{
Type s = obj.GetType();
TypeCode typeCode = Type.GetTypeCode(s);
switch (typeCode)
{
case TypeCode.String: return "and c." + key + "=" + "'" + obj.ToString() + "'";
case TypeCode.Int32: return "and c." + key + "=" + int.Parse(obj.ToString());
case TypeCode.Double: return "and c." + key + "=" + double.Parse(obj.ToString());
//case TypeCode.Byte: return "and c." + key + "=" + (Byte)obj ;
case TypeCode.Boolean: return "and c." + key + "=" + bool.Parse(obj.ToString());
case TypeCode.DateTime: return "and c." + key + "=" + (DateTime)obj;
case TypeCode.Int64: return "and c." + key + "=" + long.Parse(obj.ToString());
default: return null;
}
}
}
}