|
@@ -1,851 +0,0 @@
|
|
|
-using System;
|
|
|
-using System.Collections.Generic;
|
|
|
-using System.Threading.Tasks;
|
|
|
-using System.Linq;
|
|
|
-using TEAMModelOS.SDK.Module.AzureCosmosDB.Configuration;
|
|
|
-using TEAMModelOS.SDK.Module.AzureCosmosDB.Interfaces;
|
|
|
-using Microsoft.Azure.Documents.Client;
|
|
|
-using Microsoft.Azure.Documents;
|
|
|
-using TEAMModelOS.SDK.Helper.Security.AESCrypt;
|
|
|
-using TEAMModelOS.SDK.Context.Exception;
|
|
|
-using Microsoft.Azure.Documents.Linq;
|
|
|
-using TEAMModelOS.SDK.Helper.Query.LinqHelper;
|
|
|
-using System.Reflection;
|
|
|
-using Microsoft.Azure.Cosmos;
|
|
|
-using Microsoft.Azure.CosmosDB.BulkExecutor;
|
|
|
-using Microsoft.Azure.CosmosDB.BulkExecutor.BulkImport;
|
|
|
-using System.Threading;
|
|
|
-using TEAMModelOS.SDK.Helper.Common.JsonHelper;
|
|
|
-using Microsoft.Azure.CosmosDB.BulkExecutor.BulkUpdate;
|
|
|
-using TEAMModelOS.SDK.Helper.Common.CollectionHelper;
|
|
|
-using Microsoft.Azure.CosmosDB.BulkExecutor.BulkDelete;
|
|
|
-using TEAMModelOS.SDK.Context.Attributes.Azure;
|
|
|
-using System.Text;
|
|
|
-using TEAMModelOS.SDK.Helper.Common.ReflectorExtensions;
|
|
|
-using Microsoft.AspNetCore.Hosting;
|
|
|
-using System.Collections.Concurrent;
|
|
|
-using DataType = Microsoft.Azure.Documents.DataType;
|
|
|
-using RequestOptions = Microsoft.Azure.Documents.Client.RequestOptions;
|
|
|
-using PartitionKey = Microsoft.Azure.Documents.PartitionKey;
|
|
|
-using Newtonsoft.Json.Linq;
|
|
|
-using TEAMModelOS.SDK.Helper.Common.LogHelper;
|
|
|
-using System.Text.Json;
|
|
|
-using System.Collections;
|
|
|
-
|
|
|
-namespace TEAMModelOS.SDK.Module.AzureCosmosDB.Implements
|
|
|
-{ /// <summary>
|
|
|
- /// sdk 文档https://github.com/Azure/azure-cosmos-dotnet-v2/tree/master/samples
|
|
|
- /// https://github.com/Azure/azure-cosmos-dotnet-v2/blob/530c8d9cf7c99df7300246da05206c57ce654233/samples/code-samples/DatabaseManagement/Program.cs#L72-L121
|
|
|
- /// </summary>
|
|
|
- public class AzureCosmosDBRepository : IAzureCosmosDBRepository
|
|
|
- {
|
|
|
- /// <summary>
|
|
|
- /// sdk 文档https://github.com/Azure/azure-cosmos-dotnet-v2/tree/master/samples
|
|
|
- /// https://github.com/Azure/azure-cosmos-dotnet-v2/blob/530c8d9cf7c99df7300246da05206c57ce654233/samples/code-samples/DatabaseManagement/Program.cs#L72-L121
|
|
|
- /// </summary>
|
|
|
-
|
|
|
-
|
|
|
- private DocumentClient CosmosClient { get; set; }
|
|
|
- /// <summary>
|
|
|
- /// 线程安全的dict类型
|
|
|
- /// </summary>
|
|
|
- private ConcurrentDictionary<string, DocumentCollection> DocumentCollectionDict { get; set; } = new ConcurrentDictionary<string, DocumentCollection>();
|
|
|
-
|
|
|
- // private DocumentCollection CosmosCollection { get; set; }
|
|
|
- private string Database { get; set; }
|
|
|
- private int CollectionThroughput { get; set; }
|
|
|
- public AzureCosmosDBRepository(AzureCosmosDBOptions options)
|
|
|
- {
|
|
|
- try
|
|
|
- {
|
|
|
- if (!string.IsNullOrEmpty(options.ConnectionString))
|
|
|
- {
|
|
|
- CosmosClient = CosmosDBClientSingleton.getInstance(options.ConnectionString, options.ConnectionKey).GetCosmosDBClient();
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- throw new BizException("请设置正确的AzureCosmosDB数据库配置信息!");
|
|
|
- }
|
|
|
- Database = options.Database;
|
|
|
- CollectionThroughput = options.CollectionThroughput;
|
|
|
- CosmosClient.CreateDatabaseIfNotExistsAsync(new Microsoft.Azure.Documents.Database { Id = Database });
|
|
|
- // _connectionString = options.ConnectionString;
|
|
|
- // CosmosSerializer
|
|
|
-
|
|
|
- //获取数据库所有的表
|
|
|
- Microsoft.Azure.Documents.Client.FeedResponse<DocumentCollection> collections = CosmosClient.ReadDocumentCollectionFeedAsync(UriFactory.CreateDatabaseUri(Database)).GetAwaiter().GetResult();
|
|
|
- foreach (IGrouping<string, DocumentCollection> group in collections.GroupBy(c => c.Id))
|
|
|
- {
|
|
|
- DocumentCollectionDict.TryAdd(group.Key, group.First());
|
|
|
- }
|
|
|
- //collections
|
|
|
- List<Type> types = ReflectorExtensions.GetAllTypeAsAttribute<CosmosDBAttribute>(options.ScanModel);
|
|
|
- foreach (Type type in types)
|
|
|
- {
|
|
|
- string PartitionKey = GetPartitionKey(type);
|
|
|
- string CollectionName = "";
|
|
|
- int RU = 0;
|
|
|
- IEnumerable<CosmosDBAttribute> attributes = type.GetCustomAttributes<CosmosDBAttribute>(true);
|
|
|
- if (!string.IsNullOrEmpty(attributes.First<CosmosDBAttribute>().Name))
|
|
|
- {
|
|
|
- CollectionName = attributes.First<CosmosDBAttribute>().Name;
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- CollectionName = type.Name;
|
|
|
- }
|
|
|
- if (attributes.First<CosmosDBAttribute>().RU > 400)
|
|
|
- {
|
|
|
- RU = attributes.First<CosmosDBAttribute>().RU;
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- RU = options.CollectionThroughput;
|
|
|
- }
|
|
|
- //如果表存在于数据则检查RU是否变动,如果不存在则执行创建DocumentCollection
|
|
|
- if (DocumentCollectionDict.TryGetValue(CollectionName, out DocumentCollection collection))
|
|
|
- {
|
|
|
- Offer offer = CosmosClient.CreateOfferQuery().Where(o => o.ResourceLink == collection.SelfLink).AsEnumerable().Single();
|
|
|
- OfferV2 offerV2 = (OfferV2)offer;
|
|
|
- //更新RU
|
|
|
- if (offerV2.Content.OfferThroughput < RU)
|
|
|
- {
|
|
|
- CosmosClient.ReplaceOfferAsync(new OfferV2(offer, RU));
|
|
|
- }
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- DocumentCollection collectionDefinition = new DocumentCollection { Id = CollectionName };
|
|
|
- collectionDefinition.IndexingPolicy = new Microsoft.Azure.Documents.IndexingPolicy(new RangeIndex(DataType.String) { Precision = -1 });
|
|
|
-
|
|
|
- // collectionDefinition.PartitionKey = new PartitionKeyDefinition { Paths = new System.Collections.ObjectModel.Collection<string>() };
|
|
|
- if (!string.IsNullOrEmpty(PartitionKey))
|
|
|
- {
|
|
|
- collectionDefinition.PartitionKey.Paths.Add("/" + PartitionKey);
|
|
|
- }
|
|
|
- // CosmosCollection = await this.CosmosClient.CreateDocumentCollectionIfNotExistsAsync(UriFactory.CreateDatabaseUri(Database), collectionDefinition);
|
|
|
- if (RU > CollectionThroughput)
|
|
|
- {
|
|
|
- CollectionThroughput = RU;
|
|
|
- }
|
|
|
- DocumentCollection DocumentCollection = CosmosClient.CreateDocumentCollectionIfNotExistsAsync(
|
|
|
- UriFactory.CreateDatabaseUri(Database), collectionDefinition, new Microsoft.Azure.Documents.Client.RequestOptions { OfferThroughput = CollectionThroughput }).GetAwaiter().GetResult();
|
|
|
- DocumentCollectionDict.TryAdd(CollectionName, DocumentCollection);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- catch (DocumentClientException de)
|
|
|
- {
|
|
|
- Exception baseException = de.GetBaseException();
|
|
|
- //Console.WriteLine("{0} error occurred: {1}, Message: {2}", de.StatusCode, de.Message, baseException.Message);
|
|
|
- }
|
|
|
- catch (Exception e)
|
|
|
- {
|
|
|
- Exception baseException = e.GetBaseException();
|
|
|
- //Console.WriteLine("Error: {0}, Message: {1}", e.Message, baseException.Message);
|
|
|
- }
|
|
|
- finally
|
|
|
- {
|
|
|
- // Console.WriteLine("End of demo, press any key to exit.");
|
|
|
- // Console.ReadKey();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private async Task<DocumentCollection> InitializeCollection<T>()
|
|
|
- {
|
|
|
- Type type = typeof(T);
|
|
|
- string partitionKey = GetPartitionKey<T>();
|
|
|
- string CollectionName;
|
|
|
- IEnumerable<CosmosDBAttribute> attributes = type.GetCustomAttributes<CosmosDBAttribute>(true);
|
|
|
- if (!string.IsNullOrEmpty(attributes.First<CosmosDBAttribute>().Name))
|
|
|
- {
|
|
|
- CollectionName = attributes.First<CosmosDBAttribute>().Name;
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- CollectionName = type.Name;
|
|
|
- }
|
|
|
- return await InitializeCollection(CollectionName, partitionKey);
|
|
|
- }
|
|
|
-
|
|
|
- private async Task<DocumentCollection> InitializeCollection(string CollectionName, string PartitionKey)
|
|
|
- {
|
|
|
- /////内存中已经存在这个表则直接返回
|
|
|
- if (DocumentCollectionDict.TryGetValue(CollectionName, out DocumentCollection DocumentCollection))
|
|
|
- {
|
|
|
- return DocumentCollection;
|
|
|
- }///如果没有则尝试默认创建
|
|
|
- else
|
|
|
- {
|
|
|
- DocumentCollection documentCollection = new DocumentCollection { Id = CollectionName };
|
|
|
- documentCollection.IndexingPolicy = new Microsoft.Azure.Documents.IndexingPolicy(new RangeIndex(DataType.String) { Precision = -1 });
|
|
|
-
|
|
|
- // collectionDefinition.PartitionKey = new PartitionKeyDefinition { Paths = new System.Collections.ObjectModel.Collection<string>() };
|
|
|
- if (!string.IsNullOrEmpty(PartitionKey))
|
|
|
- {
|
|
|
- documentCollection.PartitionKey.Paths.Add("/" + PartitionKey);
|
|
|
- }
|
|
|
- // CosmosCollection = await this.CosmosClient.CreateDocumentCollectionIfNotExistsAsync(UriFactory.CreateDatabaseUri(Database), collectionDefinition);
|
|
|
- documentCollection = await this.CosmosClient.CreateDocumentCollectionIfNotExistsAsync(
|
|
|
- UriFactory.CreateDatabaseUri(Database), documentCollection, new RequestOptions { OfferThroughput = CollectionThroughput });
|
|
|
- DocumentCollectionDict.TryAdd(CollectionName, documentCollection);
|
|
|
- return documentCollection;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private string GetPartitionKey<T>()
|
|
|
- {
|
|
|
- Type type = typeof(T);
|
|
|
- return GetPartitionKey(type);
|
|
|
- }
|
|
|
- private string GetPartitionKey(Type type)
|
|
|
- {
|
|
|
- PropertyInfo[] properties = type.GetProperties();
|
|
|
- List<PropertyInfo> attrProperties = new List<PropertyInfo>();
|
|
|
- foreach (PropertyInfo property in properties)
|
|
|
- {
|
|
|
- if (property.Name.Equals("PartitionKey"))
|
|
|
- {
|
|
|
- attrProperties.Add(property);
|
|
|
- break;
|
|
|
- }
|
|
|
- object[] attributes = property.GetCustomAttributes(true);
|
|
|
- foreach (object attribute in attributes) //2.通过映射,找到成员属性上关联的特性类实例,
|
|
|
- {
|
|
|
- if (attribute is PartitionKeyAttribute)
|
|
|
- {
|
|
|
- attrProperties.Add(property);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- if (attrProperties.Count <= 0)
|
|
|
- {
|
|
|
- throw new BizException(type.Name +"has no PartitionKey !!!!!!");
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- if (attrProperties.Count == 1)
|
|
|
- {
|
|
|
- return attrProperties[0].Name;
|
|
|
- }
|
|
|
- else { throw new BizException("PartitionKey can only be single!"); }
|
|
|
- }
|
|
|
- }
|
|
|
- public async Task<T> Save<T>(T entity) //where T : object, new()
|
|
|
- {
|
|
|
- try
|
|
|
- {
|
|
|
- Type t = typeof(T);
|
|
|
- DocumentCollection collection= await InitializeCollection<T>();
|
|
|
- ResourceResponse<Document> doc =
|
|
|
- await CosmosClient.CreateDocumentAsync(UriFactory.CreateDocumentCollectionUri(Database, collection.Id), entity);
|
|
|
- //Console.WriteLine(doc.ActivityId);
|
|
|
- return entity;
|
|
|
- }
|
|
|
- catch (Exception e)
|
|
|
- {
|
|
|
- throw new BizException(e.Message);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public async Task<T> Update<T>(T entity)
|
|
|
- {
|
|
|
- Type t = typeof(T);
|
|
|
- DocumentCollection collection = await InitializeCollection<T>();
|
|
|
- ResourceResponse<Document> doc =
|
|
|
- await CosmosClient.UpsertDocumentAsync(UriFactory.CreateDocumentCollectionUri(Database, collection.Id), entity);
|
|
|
- return entity;
|
|
|
- }
|
|
|
- public async Task<string> ReplaceObject<T>(T entity, string key)
|
|
|
- {
|
|
|
- Type t = typeof(T);
|
|
|
- DocumentCollection collection = await InitializeCollection<T>();
|
|
|
- try
|
|
|
- {
|
|
|
- ResourceResponse<Document> doc =
|
|
|
- await CosmosClient.ReplaceDocumentAsync(UriFactory.CreateDocumentUri(Database,collection.Id, key), entity);
|
|
|
- return key;
|
|
|
- }
|
|
|
- catch (Exception e)
|
|
|
- {
|
|
|
- Console.WriteLine("{0} Exception caught.", e);
|
|
|
- //return false;
|
|
|
- }
|
|
|
- return null;
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- public async Task<string> ReplaceObject<T>(T entity, string key, string partitionKey)
|
|
|
- {
|
|
|
- Type t = typeof(T);
|
|
|
- DocumentCollection collection = await InitializeCollection<T>();
|
|
|
- try
|
|
|
- {
|
|
|
- ResourceResponse<Document> doc =
|
|
|
- await CosmosClient.ReplaceDocumentAsync(UriFactory.CreateDocumentUri(Database,collection.Id, key),
|
|
|
- entity,
|
|
|
- new RequestOptions { PartitionKey = new PartitionKey(partitionKey) });
|
|
|
- return key;
|
|
|
- }
|
|
|
- catch (Exception e)
|
|
|
- {
|
|
|
- throw new BizException(e.Message);
|
|
|
- //Console.WriteLine("{0} Exception caught.", e);
|
|
|
- //return false;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public async Task<List<T>> FindAll<T>()
|
|
|
- {
|
|
|
- Type t = typeof(T);
|
|
|
- Boolean open = true;
|
|
|
- List<T> objs = new List<T>();
|
|
|
-
|
|
|
- DocumentCollection collection = await InitializeCollection<T>();
|
|
|
- //查询条数 -1是全部
|
|
|
- FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = open };
|
|
|
- var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database,collection.Id), queryOptions).AsDocumentQuery();
|
|
|
- while (query.HasMoreResults)
|
|
|
- {
|
|
|
- foreach (T obj in await query.ExecuteNextAsync())
|
|
|
- {
|
|
|
- objs.Add(obj);
|
|
|
- }
|
|
|
- }
|
|
|
- return objs;
|
|
|
- //return CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name),sql);
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- public async Task<List<T>> FindLinq<T>(Func<IQueryable<object>, object> singleOrDefault)
|
|
|
- {
|
|
|
- Type t = typeof(T);
|
|
|
-
|
|
|
- List<T> objs = new List<T>();
|
|
|
- DocumentCollection collection = await InitializeCollection<T>();
|
|
|
- //查询条数 -1是全部
|
|
|
- FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1 };
|
|
|
- var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, collection.Id), queryOptions);
|
|
|
-
|
|
|
- // query.Where();
|
|
|
- return objs;
|
|
|
- //return CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name),sql);
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- public async Task<List<T>> FindSQL<T>(string sql)
|
|
|
- {
|
|
|
- Type t = typeof(T);
|
|
|
- //List<T> objs = new List<T>();
|
|
|
- DocumentCollection collection = await InitializeCollection<T>();
|
|
|
- var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database,collection.Id), sql);
|
|
|
- //foreach (var item in query)
|
|
|
- //{
|
|
|
- // objs.Add(item);
|
|
|
- //}
|
|
|
- return query.ToList<T>();
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- public async Task<List<T>> FindSQL<T>(string sql, bool IsPk)
|
|
|
- {
|
|
|
- Type t = typeof(T);
|
|
|
- //List<T> objs = new List<T>();
|
|
|
- // Boolean open = IsPk;
|
|
|
- DocumentCollection collection = await InitializeCollection<T>();
|
|
|
- //查询条数 -1是全部
|
|
|
- FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = IsPk };
|
|
|
- var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, collection.Id), sql, queryOptions);
|
|
|
- //foreach (var item in query)
|
|
|
- //{
|
|
|
- // objs.Add(item);
|
|
|
- //}
|
|
|
- return query.ToList<T>();
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- public async Task<List<T>> FindByParams<T>(Dictionary<string, object> dict)
|
|
|
- {
|
|
|
- DocumentCollection collection = await InitializeCollection<T>();
|
|
|
- Type t = typeof(T);
|
|
|
- Boolean open = true;
|
|
|
- List<Filter> filters = new List<Filter>();
|
|
|
-
|
|
|
- string PKname = "";
|
|
|
- PropertyInfo[] properties = t.GetProperties();
|
|
|
- foreach (PropertyInfo property in properties)
|
|
|
- {
|
|
|
- object[] attributes = property.GetCustomAttributes(true);
|
|
|
- foreach (object attribute in attributes) //2.通过映射,找到成员属性上关联的特性类实例,
|
|
|
- {
|
|
|
- if (attribute is PartitionKeyAttribute)
|
|
|
- {
|
|
|
- PKname = property.Name;
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- foreach (string key in dict.Keys)
|
|
|
- {
|
|
|
- //if (t.Name.Equals(key)) {
|
|
|
- // open = false;
|
|
|
- //}
|
|
|
-
|
|
|
- if (PKname.Equals(key))
|
|
|
- {
|
|
|
- open = false;
|
|
|
- }
|
|
|
- filters.Add(new Filter { Contrast = "and", Key = key, Value = dict[key] != null ? dict[key].ToString() : throw new Exception("参数值不能为null") });
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- //List<T> objs = new List<T>();
|
|
|
- await InitializeCollection<T>();
|
|
|
- //查询条数 -1是全部
|
|
|
- FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = open };
|
|
|
- var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, collection.Id), queryOptions);
|
|
|
- List<T> list = DynamicLinq.GenerateFilter<T>(query, filters).ToList();
|
|
|
- return list;
|
|
|
- //return CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name),sql);
|
|
|
-
|
|
|
- }
|
|
|
- public async Task<string> DeleteAsync<T>(string id)
|
|
|
- {
|
|
|
- Type t = typeof(T);
|
|
|
- DocumentCollection collection = await InitializeCollection<T>();
|
|
|
- ResourceResponse<Document> doc =
|
|
|
- await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database,collection.Id, id));
|
|
|
- //Console.WriteLine(doc.ActivityId);
|
|
|
- return id;
|
|
|
- }
|
|
|
- public async Task<T> DeleteAsync<T>(T entity)
|
|
|
- {
|
|
|
- DocumentCollection collection = await InitializeCollection<T>();
|
|
|
- Type t = typeof(T);
|
|
|
- string PartitionKey = GetPartitionKey<T>();
|
|
|
- if (!string.IsNullOrEmpty(PartitionKey))
|
|
|
- {
|
|
|
- string pkValue = entity.GetType().GetProperty(PartitionKey).GetValue(entity).ToString();
|
|
|
- string idValue = entity.GetType().GetProperty("id").GetValue(entity).ToString();
|
|
|
- ResourceResponse<Document> doc =
|
|
|
- await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database, collection.Id, idValue), new RequestOptions { PartitionKey = new PartitionKey(pkValue) });
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- string idValue = entity.GetType().GetProperty("id").GetValue(entity).ToString();
|
|
|
- ResourceResponse<Document> doc =
|
|
|
- await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database, collection.Id, idValue));
|
|
|
- }
|
|
|
- //log4net 日志記錄
|
|
|
- string uuidKey = Guid.NewGuid().ToString();
|
|
|
- string logkey = "【" + uuidKey + "】";
|
|
|
- LogHelper.Info(this,
|
|
|
- logkey
|
|
|
- +"【删除】"
|
|
|
- + "【表】"
|
|
|
- + t.Name
|
|
|
- + "【数据】"
|
|
|
- + entity.ToJson()
|
|
|
- + logkey);
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- //Console.WriteLine(doc.ActivityId);
|
|
|
- return entity;
|
|
|
-
|
|
|
- }
|
|
|
- public async Task<string> DeleteAsync<T>(string id, string partitionKey)
|
|
|
- {
|
|
|
- Type t = typeof(T);
|
|
|
-
|
|
|
- DocumentCollection collection = await InitializeCollection<T>();
|
|
|
- ResourceResponse<Document> doc =
|
|
|
- await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database, collection.Id, id), new RequestOptions { PartitionKey = new PartitionKey(partitionKey) });
|
|
|
- //Console.WriteLine(doc.ActivityId);
|
|
|
- return id;
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- public async Task<List<T>> SaveAll<T>(List<T> enyites)
|
|
|
- {
|
|
|
- DocumentCollection dataCollection = await InitializeCollection<T>();
|
|
|
- // Set retry options high for initialization (default values).
|
|
|
- CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
|
|
|
- CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;
|
|
|
- IBulkExecutor bulkExecutor = new BulkExecutor(CosmosClient, dataCollection);
|
|
|
- await bulkExecutor.InitializeAsync();
|
|
|
- // Set retries to 0 to pass control to bulk executor.
|
|
|
- CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
|
|
|
- CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;
|
|
|
- BulkImportResponse bulkImportResponse = null;
|
|
|
- long totalNumberOfDocumentsInserted = 0;
|
|
|
- double totalRequestUnitsConsumed = 0;
|
|
|
- double totalTimeTakenSec = 0;
|
|
|
- var tokenSource = new CancellationTokenSource();
|
|
|
- var token = tokenSource.Token;
|
|
|
- int pageSize = 50;
|
|
|
- int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
|
|
|
-
|
|
|
- for (int i = 0; i < pages; i++)
|
|
|
- {
|
|
|
- List<string> documentsToImportInBatch = new List<string>();
|
|
|
- List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
|
|
|
- for (int j = 0; j < lists.Count; j++)
|
|
|
- {
|
|
|
- documentsToImportInBatch.Add(lists[j].ToJson());
|
|
|
- }
|
|
|
- var tasks = new List<Task>
|
|
|
- { Task.Run(async () =>
|
|
|
- {
|
|
|
- do
|
|
|
- {
|
|
|
- bulkImportResponse = await bulkExecutor.BulkImportAsync(
|
|
|
- documents: documentsToImportInBatch,
|
|
|
- enableUpsert: true,
|
|
|
- disableAutomaticIdGeneration: true,
|
|
|
- maxConcurrencyPerPartitionKeyRange: null,
|
|
|
- maxInMemorySortingBatchSize: null,
|
|
|
- cancellationToken: token);
|
|
|
-
|
|
|
- } while (bulkImportResponse.NumberOfDocumentsImported < documentsToImportInBatch.Count);
|
|
|
- totalNumberOfDocumentsInserted += bulkImportResponse.NumberOfDocumentsImported;
|
|
|
- totalRequestUnitsConsumed += bulkImportResponse.TotalRequestUnitsConsumed;
|
|
|
- totalTimeTakenSec += bulkImportResponse.TotalTimeTaken.TotalSeconds;
|
|
|
- },
|
|
|
- token)
|
|
|
- };
|
|
|
- await Task.WhenAll(tasks);
|
|
|
- }
|
|
|
- return enyites;
|
|
|
- }
|
|
|
- public async Task<List<T>> UpdateAll<T>(Dictionary<string, object> dict, Dictionary<string, object> updateFilters, List<string> deleteKeys = null)
|
|
|
- {
|
|
|
-
|
|
|
- DocumentCollection dataCollection = await InitializeCollection<T>();
|
|
|
- IBulkExecutor bulkExecutor = new BulkExecutor(CosmosClient, dataCollection);
|
|
|
- await bulkExecutor.InitializeAsync();
|
|
|
- BulkUpdateResponse bulkUpdateResponse = null;
|
|
|
- long totalNumberOfDocumentsUpdated = 0;
|
|
|
-
|
|
|
- double totalRequestUnitsConsumed = 0;
|
|
|
- double totalTimeTakenSec = 0;
|
|
|
-
|
|
|
- var tokenSource = new CancellationTokenSource();
|
|
|
- var token = tokenSource.Token;
|
|
|
- // Generate update operations.
|
|
|
- List<UpdateOperation> updateOperations = new List<UpdateOperation>();
|
|
|
- // Unset the description field.
|
|
|
- if (null != updateFilters && updateFilters.Count > 0)
|
|
|
- {
|
|
|
- var keys = updateFilters.Keys;
|
|
|
- foreach (string key in keys)
|
|
|
- {
|
|
|
- // updateOperations.Add(new SetUpdateOperation<string>())
|
|
|
- if (updateFilters[key] != null && !string.IsNullOrEmpty(updateFilters[key].ToString()))
|
|
|
- {
|
|
|
- updateOperations.Add(SwitchType(key, updateFilters[key]));
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- if (deleteKeys.IsNotEmpty())
|
|
|
- {
|
|
|
- foreach (string key in deleteKeys)
|
|
|
- {
|
|
|
- updateOperations.Add(new UnsetUpdateOperation(key));
|
|
|
- }
|
|
|
- }
|
|
|
- List<T> list = await FindByParams<T>(dict);
|
|
|
- int pageSize = 100;
|
|
|
- int pages = (int)Math.Ceiling((double)list.Count / pageSize);
|
|
|
- string partitionKey = "/" + GetPartitionKey<T>();
|
|
|
- Type type = typeof(T);
|
|
|
- for (int i = 0; i < pages; i++)
|
|
|
- {
|
|
|
- List<UpdateItem> updateItemsInBatch = new List<UpdateItem>();
|
|
|
- List<T> lists = list.Skip((i) * pageSize).Take(pageSize).ToList();
|
|
|
- for (int j = 0; j < lists.Count; j++)
|
|
|
- {
|
|
|
- string partitionKeyValue = lists[j].GetType().GetProperty(partitionKey).GetValue(lists[j]) + "";
|
|
|
- string id = lists[j].GetType().GetProperty("id").GetValue(lists[j]) + "";
|
|
|
- updateItemsInBatch.Add(new UpdateItem(id, partitionKeyValue, updateOperations));
|
|
|
- }
|
|
|
- var tasks = new List<Task>
|
|
|
- { Task.Run(async () =>
|
|
|
- {
|
|
|
- do
|
|
|
- {
|
|
|
- //try
|
|
|
- //{
|
|
|
- bulkUpdateResponse = await bulkExecutor.BulkUpdateAsync(
|
|
|
- updateItems: updateItemsInBatch,
|
|
|
- maxConcurrencyPerPartitionKeyRange: null,
|
|
|
- cancellationToken: token);
|
|
|
- //}
|
|
|
- //catch (DocumentClientException de)
|
|
|
- //{
|
|
|
- // break;
|
|
|
- //}
|
|
|
- //catch (Exception e)
|
|
|
- //{
|
|
|
- // break;
|
|
|
- //}
|
|
|
- } while (bulkUpdateResponse.NumberOfDocumentsUpdated < updateItemsInBatch.Count);
|
|
|
- totalNumberOfDocumentsUpdated += bulkUpdateResponse.NumberOfDocumentsUpdated;
|
|
|
- totalRequestUnitsConsumed += bulkUpdateResponse.TotalRequestUnitsConsumed;
|
|
|
- totalTimeTakenSec += bulkUpdateResponse.TotalTimeTaken.TotalSeconds;
|
|
|
- },
|
|
|
- token)
|
|
|
- };
|
|
|
- await Task.WhenAll(tasks);
|
|
|
- }
|
|
|
- return list;
|
|
|
- }
|
|
|
- public async Task<List<T>> DeleteAll<T>(Dictionary<string, object> dict)
|
|
|
- {
|
|
|
- DocumentCollection dataCollection = await InitializeCollection<T>();
|
|
|
- List<T> list = await FindByDict<T>(dict);
|
|
|
- List<Tuple<string, string>> pkIdTuplesToDelete = new List<Tuple<string, string>>();
|
|
|
- string PartitionKey = GetPartitionKey<T>();
|
|
|
- if (list.IsNotEmpty())
|
|
|
- {
|
|
|
- foreach (T t in list)
|
|
|
- {
|
|
|
-
|
|
|
- string pkValue = t.GetType().GetProperty(PartitionKey).GetValue(t).ToString();
|
|
|
- string id = t.GetType().GetProperty("id").GetValue(t) + "";
|
|
|
- pkIdTuplesToDelete.Add(new Tuple<string, string>(pkValue, id));
|
|
|
-
|
|
|
- //log4net 日志記錄
|
|
|
- string uuidKey = Guid.NewGuid().ToString();
|
|
|
- string logkey = "【" + uuidKey + "】";
|
|
|
- LogHelper.Info(this,
|
|
|
- logkey
|
|
|
- + "【删除】"
|
|
|
- + "【表】"
|
|
|
- + t.GetType()
|
|
|
- + "【数据】"
|
|
|
- + t.ToJson()
|
|
|
- + logkey);
|
|
|
- }
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- return list;
|
|
|
- }
|
|
|
- long totalNumberOfDocumentsDeleted = 0;
|
|
|
- double totalRequestUnitsConsumed = 0;
|
|
|
- double totalTimeTakenSec = 0;
|
|
|
- BulkDeleteResponse bulkDeleteResponse = null;
|
|
|
-
|
|
|
- BulkExecutor bulkExecutor = new BulkExecutor(CosmosClient, dataCollection);
|
|
|
- await bulkExecutor.InitializeAsync();
|
|
|
-
|
|
|
- try
|
|
|
- {
|
|
|
- bulkDeleteResponse = await bulkExecutor.BulkDeleteAsync(pkIdTuplesToDelete);
|
|
|
- totalNumberOfDocumentsDeleted = bulkDeleteResponse.NumberOfDocumentsDeleted;
|
|
|
- totalRequestUnitsConsumed = bulkDeleteResponse.TotalRequestUnitsConsumed;
|
|
|
- totalTimeTakenSec = bulkDeleteResponse.TotalTimeTaken.TotalSeconds;
|
|
|
- }
|
|
|
- catch (DocumentClientException de)
|
|
|
- {
|
|
|
- throw new BizException("Document client exception: {0}"+ de);
|
|
|
- }
|
|
|
- catch (Exception e)
|
|
|
- {
|
|
|
- throw new BizException("Exception: "+ e);
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- return list;
|
|
|
- }
|
|
|
- private static UpdateOperation SwitchType(string key, object obj)
|
|
|
- {
|
|
|
- Type s = obj.GetType();
|
|
|
- TypeCode typeCode = Type.GetTypeCode(s);
|
|
|
- return typeCode switch
|
|
|
- {
|
|
|
- TypeCode.String => new SetUpdateOperation<string>(key, obj.ToString()),
|
|
|
- TypeCode.Int32 => new SetUpdateOperation<Int32>(key, (Int32)obj),
|
|
|
- TypeCode.Double => new SetUpdateOperation<Double>(key, (Double)obj),
|
|
|
- TypeCode.Byte => new SetUpdateOperation<Byte>(key, (Byte)obj),
|
|
|
- TypeCode.Boolean => new SetUpdateOperation<Boolean>(key, (Boolean)obj),
|
|
|
- TypeCode.DateTime => new SetUpdateOperation<DateTime>(key, (DateTime)obj),
|
|
|
- TypeCode.Int64 => new SetUpdateOperation<Int64>(key, (Int64)obj),
|
|
|
- _ => null,
|
|
|
- };
|
|
|
- }
|
|
|
-
|
|
|
- public async Task<List<T>> FindByDict<T>(Dictionary<string, object> dict, bool IsPk=true)
|
|
|
- {
|
|
|
- Type t = typeof(T);
|
|
|
- // List<T> objs = new List<T>();
|
|
|
- DocumentCollection collection= await InitializeCollection<T>();
|
|
|
- StringBuilder sql = new StringBuilder("select value(c) from c");
|
|
|
- SQLHelper.GetSQL(dict,ref sql);
|
|
|
- //查询条数 -1是全部
|
|
|
- FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = IsPk };
|
|
|
- var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, collection.Id), sql.ToString(), queryOptions);
|
|
|
- //foreach (var item in query)
|
|
|
- //{
|
|
|
- // objs.Add(item);
|
|
|
- //}
|
|
|
- return query.ToList<T>();
|
|
|
- }
|
|
|
-
|
|
|
- private static string GenSql(object obj, string key)
|
|
|
- {
|
|
|
-
|
|
|
- if (obj is JArray array)
|
|
|
- {
|
|
|
- StringBuilder sql = new StringBuilder(" and c." + key + " in (");
|
|
|
- foreach (JValue obja in array)
|
|
|
- {
|
|
|
- if (obja.Value is string a)
|
|
|
- {
|
|
|
- sql.Append("'" + a + "',");
|
|
|
- }
|
|
|
- if (obja.Value is int b)
|
|
|
- {
|
|
|
- sql.Append(b + ",");
|
|
|
-
|
|
|
- }
|
|
|
- if (obja.Value is double c)
|
|
|
- {
|
|
|
- sql.Append(c + ",");
|
|
|
- }
|
|
|
- if (obja.Value is bool d)
|
|
|
- {
|
|
|
- sql.Append(d + ",");
|
|
|
-
|
|
|
- }
|
|
|
- if (obja.Value is long e)
|
|
|
- {
|
|
|
- sql.Append(e + ",");
|
|
|
- }
|
|
|
- if (obja.Value is DateTime f)
|
|
|
- {
|
|
|
- sql.Append(f + ",");
|
|
|
- }
|
|
|
- }
|
|
|
- string sqls = sql.ToString().Substring(0, sql.Length - 1);
|
|
|
- sqls += ") ";
|
|
|
- return sqls;
|
|
|
- }
|
|
|
- else if (key.StartsWith("$."))
|
|
|
- {
|
|
|
- Type s = obj.GetType();
|
|
|
- TypeCode typeCode = Type.GetTypeCode(s);
|
|
|
- string keyLike = key.Replace("$.", "");
|
|
|
- return typeCode switch
|
|
|
- {
|
|
|
- TypeCode.String => "and " + "Contains( c." + keyLike + " , \'" + obj.ToString() + "\') = true ",
|
|
|
- TypeCode.Int32 => "and " + "Contains( c." + keyLike + " , \'" + int.Parse(obj.ToString()) + "\') = true ",
|
|
|
- TypeCode.Double => "and " + "Contains( c." + keyLike + " , \'" + double.Parse(obj.ToString()) + "\') = true ",
|
|
|
- //case TypeCode.Byte: return "and c." + key + "=" + (Byte)obj ;
|
|
|
- TypeCode.Boolean => "and " + "Contains( c." + keyLike + " , \'" + bool.Parse(obj.ToString()) + "\') = true ",
|
|
|
- TypeCode.DateTime => "and " + "Contains( c." + keyLike + " , \'" + (DateTime)obj + "\') = true ",
|
|
|
- TypeCode.Int64 => "and " + "Contains( c." + keyLike + " , \'" + long.Parse(obj.ToString()) + "\') = true ",
|
|
|
- _ => null,
|
|
|
- };
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
-
|
|
|
- Type s = obj.GetType();
|
|
|
- TypeCode typeCode = Type.GetTypeCode(s);
|
|
|
-
|
|
|
- return typeCode switch
|
|
|
- {
|
|
|
- TypeCode.String => " and c." + key + "=" + "'" + obj.ToString() + "'",
|
|
|
- TypeCode.Int32 => " and c." + key + "=" + int.Parse(obj.ToString()),
|
|
|
- TypeCode.Double => " and c." + key + "=" + double.Parse(obj.ToString()),
|
|
|
- //case TypeCode.Byte: return "and c." + key + "=" + (Byte)obj ;
|
|
|
- TypeCode.Boolean => " and c." + key + "=" + bool.Parse(obj.ToString()),
|
|
|
- TypeCode.DateTime => " and c." + key + "=" + (DateTime)obj,
|
|
|
- TypeCode.Int64 => " and c." + key + "=" + long.Parse(obj.ToString()),
|
|
|
- _ => null,
|
|
|
- };
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- public IQueryable<dynamic> FindByDict(string CollectionName, Dictionary<string, object> dict)
|
|
|
- {
|
|
|
- if (DocumentCollectionDict.TryGetValue(CollectionName, out DocumentCollection collection))
|
|
|
- {
|
|
|
- // collection = await InitializeCollection(CollectionName, "");
|
|
|
- /* StringBuilder sql = new StringBuilder("select * from " + CollectionName + " c where 1=1 ");
|
|
|
- if (dict != null)
|
|
|
- {
|
|
|
- foreach (string key in dict.Keys)
|
|
|
- {
|
|
|
- sql.Append(GenSql(dict[key], key));
|
|
|
- }
|
|
|
- }*/
|
|
|
- StringBuilder sql = new StringBuilder("select value(c) from c");
|
|
|
- SQLHelper.GetSQL(dict,ref sql);
|
|
|
- FeedOptions queryOptions;
|
|
|
- if (collection.PartitionKey.Paths.Count > 0)
|
|
|
- {
|
|
|
- queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = true };
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = false };
|
|
|
- }
|
|
|
- //查询条数 -1是全部
|
|
|
- var query = CosmosClient.CreateDocumentQuery(UriFactory.CreateDocumentCollectionUri(Database, CollectionName), sql.ToString(), queryOptions);
|
|
|
- return query;
|
|
|
- }
|
|
|
- else {
|
|
|
- throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- public IQueryable<dynamic> FindCountByDict(string CollectionName, Dictionary<string, object> dict)
|
|
|
- {
|
|
|
- if (DocumentCollectionDict.TryGetValue(CollectionName, out DocumentCollection collection))
|
|
|
- {
|
|
|
-
|
|
|
- StringBuilder sql = new StringBuilder("select value count(c) from c");
|
|
|
- SQLHelper.GetSQL(dict, ref sql);
|
|
|
- FeedOptions queryOptions;
|
|
|
- if (collection.PartitionKey.Paths.Count > 0)
|
|
|
- {
|
|
|
- queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = true };
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = false };
|
|
|
- }
|
|
|
- //查询条数 -1是全部
|
|
|
- var query = CosmosClient.CreateDocumentQuery(UriFactory.CreateDocumentCollectionUri(Database, CollectionName), sql.ToString(), queryOptions);
|
|
|
- return query;
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
- private static void GetPageNum(Dictionary<string, object> dict, ref int offsetNum, ref int limitNum, ref bool pageBool)
|
|
|
- {
|
|
|
- dict.TryGetValue("OFFSET", out object offset);
|
|
|
- dict.Remove("OFFSET");
|
|
|
- dict.TryGetValue("LIMIT", out object limit);
|
|
|
- dict.Remove("LIMIT");
|
|
|
- if (offset != null && limit != null)
|
|
|
- {
|
|
|
- pageBool = true;
|
|
|
- offsetNum = int.Parse(offset.ToString());
|
|
|
- limitNum = int.Parse(limit.ToString());
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-}
|