123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939 |
- using System;
- using System.Collections.Generic;
- using System.Threading.Tasks;
- using System.Linq;
- using TEAMModelOS.SDK.Module.AzureCosmosDB.Configuration;
- using TEAMModelOS.SDK.Module.AzureCosmosDB.Interfaces;
- using Microsoft.Azure.Documents.Client;
- using Microsoft.Azure.Documents;
- using TEAMModelOS.SDK.Helper.Security.AESCrypt;
- using TEAMModelOS.SDK.Context.Exception;
- using Microsoft.Azure.Documents.Linq;
- using TEAMModelOS.SDK.Helper.Query.LinqHelper;
- using System.Reflection;
- using Microsoft.Azure.Cosmos;
- using Microsoft.Azure.CosmosDB.BulkExecutor;
- using Microsoft.Azure.CosmosDB.BulkExecutor.BulkImport;
- using System.Threading;
- using TEAMModelOS.SDK.Helper.Common.JsonHelper;
- using Microsoft.Azure.CosmosDB.BulkExecutor.BulkUpdate;
- using TEAMModelOS.SDK.Helper.Common.CollectionHelper;
- using Microsoft.Azure.CosmosDB.BulkExecutor.BulkDelete;
- using TEAMModelOS.SDK.Context.Attributes.Azure;
- using System.Text;
- using TEAMModelOS.SDK.Helper.Common.ReflectorExtensions;
- using Microsoft.AspNetCore.Hosting;
- using System.Collections.Concurrent;
- using DataType = Microsoft.Azure.Documents.DataType;
- using RequestOptions = Microsoft.Azure.Documents.Client.RequestOptions;
- using PartitionKey = Microsoft.Azure.Documents.PartitionKey;
- using Newtonsoft.Json.Linq;
- namespace TEAMModelOS.SDK.Module.AzureCosmosDB.Implements
- { /// <summary>
- /// sdk 文档https://github.com/Azure/azure-cosmos-dotnet-v2/tree/master/samples
- /// https://github.com/Azure/azure-cosmos-dotnet-v2/blob/530c8d9cf7c99df7300246da05206c57ce654233/samples/code-samples/DatabaseManagement/Program.cs#L72-L121
- /// </summary>
- public class AzureCosmosDBRepository : IAzureCosmosDBRepository
- {
- /// <summary>
- /// sdk 文档https://github.com/Azure/azure-cosmos-dotnet-v2/tree/master/samples
- /// https://github.com/Azure/azure-cosmos-dotnet-v2/blob/530c8d9cf7c99df7300246da05206c57ce654233/samples/code-samples/DatabaseManagement/Program.cs#L72-L121
- /// </summary>
- private DocumentClient CosmosClient { get; set; }
- /// <summary>
- /// 线程安全的dict类型
- /// </summary>
- private ConcurrentDictionary<string, DocumentCollection> DocumentCollectionDict { get; set; } = new ConcurrentDictionary<string, DocumentCollection>();
- // private DocumentCollection CosmosCollection { get; set; }
- private string Database { get; set; }
- private int CollectionThroughput { get; set; }
- public AzureCosmosDBRepository(AzureCosmosDBOptions options)
- {
- try
- {
- if (!string.IsNullOrEmpty(options.ConnectionString))
- {
- CosmosClient = CosmosDBClientSingleton.getInstance(options.ConnectionString, options.ConnectionKey).GetCosmosDBClient();
- }
- else
- {
- throw new BizException("请设置正确的AzureCosmosDB数据库配置信息!");
- }
- Database = options.Database;
- CollectionThroughput = options.CollectionThroughput;
- CosmosClient.CreateDatabaseIfNotExistsAsync(new Microsoft.Azure.Documents.Database { Id = Database });
- // _connectionString = options.ConnectionString;
- // CosmosSerializer
- //获取数据库所有的表
- Microsoft.Azure.Documents.Client.FeedResponse<DocumentCollection> collections = CosmosClient.ReadDocumentCollectionFeedAsync(UriFactory.CreateDatabaseUri(Database)).GetAwaiter().GetResult();
- foreach (IGrouping<string, DocumentCollection> group in collections.GroupBy(c => c.Id))
- {
- DocumentCollectionDict.TryAdd(group.Key, group.First());
- }
- //collections
- List<Type> types = ReflectorExtensions.GetAllTypeAsAttribute<CosmosDBAttribute>(options.ScanModel);
- foreach (Type type in types)
- {
- string PartitionKey = GetPartitionKey(type);
- string CollectionName = "";
- int RU = 0;
- IEnumerable<CosmosDBAttribute> attributes = type.GetCustomAttributes<CosmosDBAttribute>(true);
- if (!string.IsNullOrEmpty(attributes.First<CosmosDBAttribute>().Name))
- {
- CollectionName = attributes.First<CosmosDBAttribute>().Name;
- }
- else
- {
- CollectionName = type.Name;
- }
- if (attributes.First<CosmosDBAttribute>().RU > 400)
- {
- RU = attributes.First<CosmosDBAttribute>().RU;
- }
- else
- {
- RU = options.CollectionThroughput;
- }
- //如果表存在于数据则检查RU是否变动,如果不存在则执行创建DocumentCollection
- if (DocumentCollectionDict.TryGetValue(CollectionName, out DocumentCollection collection))
- {
- Offer offer = CosmosClient.CreateOfferQuery().Where(o => o.ResourceLink == collection.SelfLink).AsEnumerable().Single();
- OfferV2 offerV2 = (OfferV2)offer;
- //更新RU
- if (offerV2.Content.OfferThroughput < RU)
- {
- CosmosClient.ReplaceOfferAsync(new OfferV2(offer, RU));
- }
- }
- else
- {
- DocumentCollection collectionDefinition = new DocumentCollection { Id = CollectionName };
- collectionDefinition.IndexingPolicy = new Microsoft.Azure.Documents.IndexingPolicy(new RangeIndex(DataType.String) { Precision = -1 });
- // collectionDefinition.PartitionKey = new PartitionKeyDefinition { Paths = new System.Collections.ObjectModel.Collection<string>() };
- if (!string.IsNullOrEmpty(PartitionKey))
- {
- collectionDefinition.PartitionKey.Paths.Add("/" + PartitionKey);
- }
- // CosmosCollection = await this.CosmosClient.CreateDocumentCollectionIfNotExistsAsync(UriFactory.CreateDatabaseUri(Database), collectionDefinition);
- if (RU > CollectionThroughput)
- {
- CollectionThroughput = RU;
- }
- DocumentCollection DocumentCollection = CosmosClient.CreateDocumentCollectionIfNotExistsAsync(
- UriFactory.CreateDatabaseUri(Database), collectionDefinition, new Microsoft.Azure.Documents.Client.RequestOptions { OfferThroughput = CollectionThroughput }).GetAwaiter().GetResult();
- DocumentCollectionDict.TryAdd(CollectionName, DocumentCollection);
- }
- }
- }
- catch (DocumentClientException de)
- {
- Exception baseException = de.GetBaseException();
- //Console.WriteLine("{0} error occurred: {1}, Message: {2}", de.StatusCode, de.Message, baseException.Message);
- }
- catch (Exception e)
- {
- Exception baseException = e.GetBaseException();
- //Console.WriteLine("Error: {0}, Message: {1}", e.Message, baseException.Message);
- }
- finally
- {
- // Console.WriteLine("End of demo, press any key to exit.");
- // Console.ReadKey();
- }
- }
- private async Task<DocumentCollection> InitializeCollection<T>()
- {
- Type type = typeof(T);
- string partitionKey = GetPartitionKey<T>();
- string CollectionName;
- IEnumerable<CosmosDBAttribute> attributes = type.GetCustomAttributes<CosmosDBAttribute>(true);
- if (!string.IsNullOrEmpty(attributes.First<CosmosDBAttribute>().Name))
- {
- CollectionName = attributes.First<CosmosDBAttribute>().Name;
- }
- else
- {
- CollectionName = type.Name;
- }
- return await InitializeCollection(CollectionName, partitionKey);
- }
- private async Task<DocumentCollection> InitializeCollection(string CollectionName, string PartitionKey)
- {
- /////内存中已经存在这个表则直接返回
- if (DocumentCollectionDict.TryGetValue(CollectionName, out DocumentCollection DocumentCollection))
- {
- return DocumentCollection;
- }///如果没有则尝试默认创建
- else
- {
- DocumentCollection documentCollection = new DocumentCollection { Id = CollectionName };
- documentCollection.IndexingPolicy = new Microsoft.Azure.Documents.IndexingPolicy(new RangeIndex(DataType.String) { Precision = -1 });
- // collectionDefinition.PartitionKey = new PartitionKeyDefinition { Paths = new System.Collections.ObjectModel.Collection<string>() };
- if (!string.IsNullOrEmpty(PartitionKey))
- {
- documentCollection.PartitionKey.Paths.Add("/" + PartitionKey);
- }
- // CosmosCollection = await this.CosmosClient.CreateDocumentCollectionIfNotExistsAsync(UriFactory.CreateDatabaseUri(Database), collectionDefinition);
- documentCollection = await this.CosmosClient.CreateDocumentCollectionIfNotExistsAsync(
- UriFactory.CreateDatabaseUri(Database), documentCollection, new RequestOptions { OfferThroughput = CollectionThroughput });
- DocumentCollectionDict.TryAdd(CollectionName, documentCollection);
- return documentCollection;
- }
- }
- private string GetPartitionKey<T>()
- {
- Type type = typeof(T);
- return GetPartitionKey(type);
- }
- private string GetPartitionKey(Type type)
- {
- PropertyInfo[] properties = type.GetProperties();
- List<PropertyInfo> attrProperties = new List<PropertyInfo>();
- foreach (PropertyInfo property in properties)
- {
- if (property.Name.Equals("PartitionKey"))
- {
- attrProperties.Add(property);
- break;
- }
- object[] attributes = property.GetCustomAttributes(true);
- foreach (object attribute in attributes) //2.通过映射,找到成员属性上关联的特性类实例,
- {
- if (attribute is PartitionKeyAttribute)
- {
- attrProperties.Add(property);
- }
- }
- }
- if (attrProperties.Count <= 0)
- {
- throw new BizException(type.Name +"has no PartitionKey !!!!!!");
- }
- else
- {
- if (attrProperties.Count == 1)
- {
- return attrProperties[0].Name;
- }
- else { throw new BizException("PartitionKey can only be single!"); }
- }
- }
- public async Task<T> Save<T>(T entity) //where T : object, new()
- {
- try
- {
- Type t = typeof(T);
- DocumentCollection collection= await InitializeCollection<T>();
- ResourceResponse<Document> doc =
- await CosmosClient.CreateDocumentAsync(UriFactory.CreateDocumentCollectionUri(Database, collection.Id), entity);
- //Console.WriteLine(doc.ActivityId);
- return entity;
- }
- catch (Exception e)
- {
- throw new BizException(e.Message);
- }
- }
- public async Task<T> Update<T>(T entity)
- {
- Type t = typeof(T);
- DocumentCollection collection = await InitializeCollection<T>();
- ResourceResponse<Document> doc =
- await CosmosClient.UpsertDocumentAsync(UriFactory.CreateDocumentCollectionUri(Database, collection.Id), entity);
- return entity;
- }
- public async Task<string> ReplaceObject<T>(T entity, string key)
- {
- Type t = typeof(T);
- DocumentCollection collection = await InitializeCollection<T>();
- try
- {
- ResourceResponse<Document> doc =
- await CosmosClient.ReplaceDocumentAsync(UriFactory.CreateDocumentUri(Database,collection.Id, key), entity);
- return key;
- }
- catch (Exception e)
- {
- Console.WriteLine("{0} Exception caught.", e);
- //return false;
- }
- return null;
- }
- public async Task<string> ReplaceObject<T>(T entity, string key, string partitionKey)
- {
- Type t = typeof(T);
- DocumentCollection collection = await InitializeCollection<T>();
- try
- {
- ResourceResponse<Document> doc =
- await CosmosClient.ReplaceDocumentAsync(UriFactory.CreateDocumentUri(Database,collection.Id, key),
- entity,
- new RequestOptions { PartitionKey = new PartitionKey(partitionKey) });
- return key;
- }
- catch (Exception e)
- {
- throw new BizException(e.Message);
- //Console.WriteLine("{0} Exception caught.", e);
- //return false;
- }
- }
- public async Task<List<T>> FindAll<T>()
- {
- Type t = typeof(T);
- Boolean open = true;
- List<T> objs = new List<T>();
- DocumentCollection collection = await InitializeCollection<T>();
- //查询条数 -1是全部
- FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = open };
- var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database,collection.Id), queryOptions).AsDocumentQuery();
- while (query.HasMoreResults)
- {
- foreach (T obj in await query.ExecuteNextAsync())
- {
- objs.Add(obj);
- }
- }
- return objs;
- //return CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name),sql);
- }
- public async Task<List<T>> FindLinq<T>(Func<IQueryable<object>, object> singleOrDefault)
- {
- Type t = typeof(T);
- List<T> objs = new List<T>();
- DocumentCollection collection = await InitializeCollection<T>();
- //查询条数 -1是全部
- FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1 };
- var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, collection.Id), queryOptions);
- // query.Where();
- return objs;
- //return CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name),sql);
- }
- public async Task<List<T>> FindSQL<T>(string sql)
- {
- Type t = typeof(T);
- //List<T> objs = new List<T>();
- DocumentCollection collection = await InitializeCollection<T>();
- var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database,collection.Id), sql);
- //foreach (var item in query)
- //{
- // objs.Add(item);
- //}
- return query.ToList<T>();
- }
- public async Task<List<T>> FindSQL<T>(string sql, bool IsPk)
- {
- Type t = typeof(T);
- //List<T> objs = new List<T>();
- // Boolean open = IsPk;
- DocumentCollection collection = await InitializeCollection<T>();
- //查询条数 -1是全部
- FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = IsPk };
- var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, collection.Id), sql, queryOptions);
- //foreach (var item in query)
- //{
- // objs.Add(item);
- //}
- return query.ToList<T>();
- }
- public async Task<List<T>> FindByParams<T>(Dictionary<string, object> dict)
- {
- DocumentCollection collection = await InitializeCollection<T>();
- Type t = typeof(T);
- Boolean open = true;
- List<Filter> filters = new List<Filter>();
- string PKname = "";
- PropertyInfo[] properties = t.GetProperties();
- foreach (PropertyInfo property in properties)
- {
- object[] attributes = property.GetCustomAttributes(true);
- foreach (object attribute in attributes) //2.通过映射,找到成员属性上关联的特性类实例,
- {
- if (attribute is PartitionKeyAttribute)
- {
- PKname = property.Name;
- break;
- }
- }
- }
- foreach (string key in dict.Keys)
- {
- //if (t.Name.Equals(key)) {
- // open = false;
- //}
- if (PKname.Equals(key))
- {
- open = false;
- }
- filters.Add(new Filter { Contrast = "and", Key = key, Value = dict[key] != null ? dict[key].ToString() : throw new Exception("参数值不能为null") });
- }
- //List<T> objs = new List<T>();
- await InitializeCollection<T>();
- //查询条数 -1是全部
- FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = open };
- var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, collection.Id), queryOptions);
- List<T> list = DynamicLinq.GenerateFilter<T>(query, filters).ToList();
- return list;
- //return CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, t.Name),sql);
- }
- public async Task<string> DeleteAsync<T>(string id)
- {
- Type t = typeof(T);
- DocumentCollection collection = await InitializeCollection<T>();
- ResourceResponse<Document> doc =
- await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database,collection.Id, id));
- //Console.WriteLine(doc.ActivityId);
- return id;
- }
- public async Task<T> DeleteAsync<T>(T entity)
- {
- DocumentCollection collection = await InitializeCollection<T>();
- Type t = typeof(T);
- string PartitionKey = GetPartitionKey<T>();
- if (!string.IsNullOrEmpty(PartitionKey))
- {
- string pkValue = entity.GetType().GetProperty(PartitionKey).GetValue(entity).ToString();
- string idValue = entity.GetType().GetProperty("id").GetValue(entity).ToString();
- ResourceResponse<Document> doc =
- await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database, collection.Id, idValue), new RequestOptions { PartitionKey = new PartitionKey(pkValue) });
- }
- else
- {
- string idValue = entity.GetType().GetProperty("id").GetValue(entity).ToString();
- ResourceResponse<Document> doc =
- await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database, collection.Id, idValue));
- }
- //Console.WriteLine(doc.ActivityId);
- return entity;
- }
- public async Task<string> DeleteAsync<T>(string id, string partitionKey)
- {
- Type t = typeof(T);
- DocumentCollection collection = await InitializeCollection<T>();
- ResourceResponse<Document> doc =
- await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database, collection.Id, id), new RequestOptions { PartitionKey = new PartitionKey(partitionKey) });
- //Console.WriteLine(doc.ActivityId);
- return id;
- }
- public async Task<List<T>> SaveAll<T>(List<T> enyites)
- {
- DocumentCollection dataCollection = await InitializeCollection<T>();
- // Set retry options high for initialization (default values).
- CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
- CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;
- IBulkExecutor bulkExecutor = new BulkExecutor(CosmosClient, dataCollection);
- await bulkExecutor.InitializeAsync();
- // Set retries to 0 to pass control to bulk executor.
- CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
- CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;
- BulkImportResponse bulkImportResponse = null;
- long totalNumberOfDocumentsInserted = 0;
- double totalRequestUnitsConsumed = 0;
- double totalTimeTakenSec = 0;
- var tokenSource = new CancellationTokenSource();
- var token = tokenSource.Token;
- int pageSize = 100;
- int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
- for (int i = 0; i < pages; i++)
- {
- List<string> documentsToImportInBatch = new List<string>();
- List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
- for (int j = 0; j < lists.Count; j++)
- {
- documentsToImportInBatch.Add(lists[j].ToJson());
- }
- var tasks = new List<Task>
- { Task.Run(async () =>
- {
- do
- {
- //try
- //{
- bulkImportResponse = await bulkExecutor.BulkImportAsync(
- documents: documentsToImportInBatch,
- enableUpsert: true,
- disableAutomaticIdGeneration: true,
- maxConcurrencyPerPartitionKeyRange: null,
- maxInMemorySortingBatchSize: null,
- cancellationToken: token);
- //}
- //catch (DocumentClientException de)
- //{
- // break;
- //}
- //catch (Exception e)
- //{
- // break;
- //}
- } while (bulkImportResponse.NumberOfDocumentsImported < documentsToImportInBatch.Count);
- totalNumberOfDocumentsInserted += bulkImportResponse.NumberOfDocumentsImported;
- totalRequestUnitsConsumed += bulkImportResponse.TotalRequestUnitsConsumed;
- totalTimeTakenSec += bulkImportResponse.TotalTimeTaken.TotalSeconds;
- },
- token)
- };
- await Task.WhenAll(tasks);
- }
- return enyites;
- }
- public async Task<List<T>> UpdateAll<T>(Dictionary<string, object> dict, Dictionary<string, object> updateFilters, List<string> deleteKeys = null)
- {
- DocumentCollection dataCollection = await InitializeCollection<T>();
- IBulkExecutor bulkExecutor = new BulkExecutor(CosmosClient, dataCollection);
- await bulkExecutor.InitializeAsync();
- BulkUpdateResponse bulkUpdateResponse = null;
- long totalNumberOfDocumentsUpdated = 0;
- double totalRequestUnitsConsumed = 0;
- double totalTimeTakenSec = 0;
- var tokenSource = new CancellationTokenSource();
- var token = tokenSource.Token;
- // Generate update operations.
- List<UpdateOperation> updateOperations = new List<UpdateOperation>();
- // Unset the description field.
- if (null != updateFilters && updateFilters.Count > 0)
- {
- var keys = updateFilters.Keys;
- foreach (string key in keys)
- {
- // updateOperations.Add(new SetUpdateOperation<string>())
- if (updateFilters[key] != null && !string.IsNullOrEmpty(updateFilters[key].ToString()))
- {
- updateOperations.Add(SwitchType(key, updateFilters[key]));
- }
- }
- }
- if (deleteKeys.IsNotEmpty())
- {
- foreach (string key in deleteKeys)
- {
- updateOperations.Add(new UnsetUpdateOperation(key));
- }
- }
- List<T> list = await FindByParams<T>(dict);
- int pageSize = 100;
- int pages = (int)Math.Ceiling((double)list.Count / pageSize);
- string partitionKey = "/" + GetPartitionKey<T>();
- Type type = typeof(T);
- for (int i = 0; i < pages; i++)
- {
- List<UpdateItem> updateItemsInBatch = new List<UpdateItem>();
- List<T> lists = list.Skip((i) * pageSize).Take(pageSize).ToList();
- for (int j = 0; j < lists.Count; j++)
- {
- string partitionKeyValue = lists[j].GetType().GetProperty(partitionKey).GetValue(lists[j]) + "";
- string id = lists[j].GetType().GetProperty("id").GetValue(lists[j]) + "";
- updateItemsInBatch.Add(new UpdateItem(id, partitionKeyValue, updateOperations));
- }
- var tasks = new List<Task>
- { Task.Run(async () =>
- {
- do
- {
- //try
- //{
- bulkUpdateResponse = await bulkExecutor.BulkUpdateAsync(
- updateItems: updateItemsInBatch,
- maxConcurrencyPerPartitionKeyRange: null,
- cancellationToken: token);
- //}
- //catch (DocumentClientException de)
- //{
- // break;
- //}
- //catch (Exception e)
- //{
- // break;
- //}
- } while (bulkUpdateResponse.NumberOfDocumentsUpdated < updateItemsInBatch.Count);
- totalNumberOfDocumentsUpdated += bulkUpdateResponse.NumberOfDocumentsUpdated;
- totalRequestUnitsConsumed += bulkUpdateResponse.TotalRequestUnitsConsumed;
- totalTimeTakenSec += bulkUpdateResponse.TotalTimeTaken.TotalSeconds;
- },
- token)
- };
- await Task.WhenAll(tasks);
- }
- return list;
- }
- public async Task<List<T>> DeleteAll<T>(Dictionary<string, object> dict)
- {
- DocumentCollection dataCollection = await InitializeCollection<T>();
- List<T> list = await FindByParams<T>(dict);
- List<Tuple<string, string>> pkIdTuplesToDelete = new List<Tuple<string, string>>();
- if (list.IsNotEmpty())
- {
- foreach (T t in list)
- {
- string id = t.GetType().GetProperty("id").GetValue(t) + "";
- pkIdTuplesToDelete.Add(new Tuple<string, string>(id, id));
- }
- }
- else
- {
- return null;
- }
- long totalNumberOfDocumentsDeleted = 0;
- double totalRequestUnitsConsumed = 0;
- double totalTimeTakenSec = 0;
- BulkDeleteResponse bulkDeleteResponse = null;
- BulkExecutor bulkExecutor = new BulkExecutor(CosmosClient, dataCollection);
- await bulkExecutor.InitializeAsync();
- bulkDeleteResponse = await bulkExecutor.BulkDeleteAsync(pkIdTuplesToDelete);
- totalNumberOfDocumentsDeleted = bulkDeleteResponse.NumberOfDocumentsDeleted;
- totalRequestUnitsConsumed = bulkDeleteResponse.TotalRequestUnitsConsumed;
- totalTimeTakenSec = bulkDeleteResponse.TotalTimeTaken.TotalSeconds;
- return list;
- }
- private static UpdateOperation SwitchType(string key, object obj)
- {
- Type s = obj.GetType();
- TypeCode typeCode = Type.GetTypeCode(s);
- return typeCode switch
- {
- TypeCode.String => new SetUpdateOperation<string>(key, obj.ToString()),
- TypeCode.Int32 => new SetUpdateOperation<Int32>(key, (Int32)obj),
- TypeCode.Double => new SetUpdateOperation<Double>(key, (Double)obj),
- TypeCode.Byte => new SetUpdateOperation<Byte>(key, (Byte)obj),
- TypeCode.Boolean => new SetUpdateOperation<Boolean>(key, (Boolean)obj),
- TypeCode.DateTime => new SetUpdateOperation<DateTime>(key, (DateTime)obj),
- TypeCode.Int64 => new SetUpdateOperation<Int64>(key, (Int64)obj),
- _ => null,
- };
- }
- public async Task<List<T>> FindByDict<T>(Dictionary<string, object> dict, bool IsPk=true)
- {
- Type t = typeof(T);
- // List<T> objs = new List<T>();
- DocumentCollection collection= await InitializeCollection<T>();
- StringBuilder sql = new StringBuilder("select * from c where 1=1 ");
- if (dict != null)
- {
- foreach (string key in dict.Keys)
- {
- sql.Append(GenSql(dict[key], key));
- }
- }
- //查询条数 -1是全部
- FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = IsPk };
- var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, collection.Id), sql.ToString(), queryOptions);
- //foreach (var item in query)
- //{
- // objs.Add(item);
- //}
- return query.ToList<T>();
- }
- private static string GenSql(object obj, string key)
- {
- if (obj is JArray array)
- {
- StringBuilder sql = new StringBuilder(" and c." + key + " in (");
- foreach (JValue obja in array) {
- if (obja.Value is string a) {
- sql.Append("'" + a + "',");
- }
- if (obja.Value is int b)
- {
- sql.Append(b + ",");
- }
- if (obja.Value is double c)
- {
- sql.Append(c + ",");
- }
- if (obja.Value is bool d)
- {
- sql.Append(d + ",");
- }
- if (obja.Value is long e)
- {
- sql.Append(e + ",");
- }
- if (obja.Value is DateTime f)
- {
- sql.Append(f + ",");
- }
- }
- string sqls= sql.ToString().Substring(0, sql.Length - 1);
- sqls += ") ";
- return sqls;
- }
- // if(va)
- ///处理JsonElement类型
- else {
- Type s = obj.GetType();
- TypeCode typeCode = Type.GetTypeCode(s);
- return typeCode switch
- {
- TypeCode.String => " and c." + key + "=" + "'" + obj.ToString() + "'",
- TypeCode.Int32 => " and c." + key + "=" + int.Parse(obj.ToString()),
- TypeCode.Double => " and c." + key + "=" + double.Parse(obj.ToString()),
- //case TypeCode.Byte: return "and c." + key + "=" + (Byte)obj ;
- TypeCode.Boolean => " and c." + key + "=" + bool.Parse(obj.ToString()),
- TypeCode.DateTime => " and c." + key + "=" + (DateTime)obj,
- TypeCode.Int64 => " and c." + key + "=" + long.Parse(obj.ToString()),
- _ => null,
- };
- }
-
- }
- public IQueryable<dynamic> FindByDict(string CollectionName, Dictionary<string, object> dict)
- {
- if (DocumentCollectionDict.TryGetValue(CollectionName, out DocumentCollection collection))
- {
- // collection = await InitializeCollection(CollectionName, "");
- /* StringBuilder sql = new StringBuilder("select * from " + CollectionName + " c where 1=1 ");
- if (dict != null)
- {
- foreach (string key in dict.Keys)
- {
- sql.Append(GenSql(dict[key], key));
- }
- }*/
- StringBuilder sql = new StringBuilder("select value(c) from c");
- if (dict != null)
- {
- string Join = " join ";
- string instring = "in ";
- Dictionary<string, string> keyValues = new Dictionary<string, string>();
- StringBuilder ps = new StringBuilder();
- int heada = 0;
- string[] sqlHead = new string[] { "A", "B", "C", "D", "E", "F" };
- int kslength = 0;
- foreach (KeyValuePair<string, object> item in dict)
- {
- int index = 0;
- string[] ks = item.Key.Split("[*]");
- if (ks.Length > 1)
- {
- kslength += ks.Length;
- if (kslength < (7 + heada))
- {
- StringBuilder sqlitem = new StringBuilder();
- for (int i = 0; i < ks.Length - 1; i++)
- {
- //Console.WriteLine(ks[i]);
- if (i == 0)
- {
- sqlitem.Append(Join);
- string a = sqlHead[heada] + index;
- sqlitem.Append(a + " ");
- //keyValues.Add(ks[i], a);
- keyValues[ks[i]] = a;
- sqlitem.Append(instring);
- sqlitem.Append("c.");
- sqlitem.Append(ks[i]);
- }
- else
- {
- sqlitem.Append(Join + " ");
- string a = sqlHead[heada] + index;
- sqlitem.Append(a + " ");
- //keyValues.Add(ks[i], a);
- keyValues[ks[i]] = a;
- sqlitem.Append(instring);
- sqlitem.Append(keyValues[ks[i - 1]]);
- sqlitem.Append(ks[i]);
- }
- index += 1;
- }
- // Console.WriteLine(sqlitem);
- sql.Append(sqlitem);
- string s;
- s = "and " + sqlHead[heada] + (ks.Length - 2) + ks[index] + " = " + "\"" + item.Value.ToString() + "\"";
- ps.Append(s);
- }
- else
- {
- throw new BizException("数组总共深度不能超过5层",ResponseCode.PARAMS_ERROR);
-
- }
- }
- else
- {
- ps.Append(GenSql(dict[item.Key], item.Key));
- }
- heada += 1;
- }
- sql.Append(" where 1=1 ").Append(ps);
- }
- FeedOptions queryOptions;
- if (collection.PartitionKey.Paths.Count > 0)
- {
- queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = true };
- }
- else
- {
- queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = false };
- }
- //查询条数 -1是全部
- var query = CosmosClient.CreateDocumentQuery(UriFactory.CreateDocumentCollectionUri(Database, CollectionName), sql.ToString(), queryOptions);
- return query;
- }
- else {
- throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
- }
-
- }
- public IQueryable<dynamic> FindCountByDict(string CollectionName, Dictionary<string, object> dict)
- {
- if (DocumentCollectionDict.TryGetValue(CollectionName, out DocumentCollection collection))
- {
- // collection = await InitializeCollection(CollectionName, "");
- /* StringBuilder sql = new StringBuilder("select * from " + CollectionName + " c where 1=1 ");
- if (dict != null)
- {
- foreach (string key in dict.Keys)
- {
- sql.Append(GenSql(dict[key], key));
- }
- }*/
- StringBuilder sql = new StringBuilder("select value count(c) from c");
- if (dict != null)
- {
- string Join = " join ";
- string instring = "in ";
- Dictionary<string, string> keyValues = new Dictionary<string, string>();
- StringBuilder ps = new StringBuilder();
- int heada = 0;
- string[] sqlHead = new string[] { "A", "B", "C", "D", "E", "F" };
- int kslength = 0;
- foreach (KeyValuePair<string, object> item in dict)
- {
- int index = 0;
- string[] ks = item.Key.Split("[*]");
- if (ks.Length > 1)
- {
- kslength += ks.Length;
- if (kslength < (7 + heada))
- {
- StringBuilder sqlitem = new StringBuilder();
- for (int i = 0; i < ks.Length - 1; i++)
- {
- //Console.WriteLine(ks[i]);
- if (i == 0)
- {
- sqlitem.Append(Join);
- string a = sqlHead[heada] + index;
- sqlitem.Append(a + " ");
- //keyValues.Add(ks[i], a);
- keyValues[ks[i]] = a;
- sqlitem.Append(instring);
- sqlitem.Append("c.");
- sqlitem.Append(ks[i]);
- }
- else
- {
- sqlitem.Append(Join + " ");
- string a = sqlHead[heada] + index;
- sqlitem.Append(a + " ");
- //keyValues.Add(ks[i], a);
- keyValues[ks[i]] = a;
- sqlitem.Append(instring);
- sqlitem.Append(keyValues[ks[i - 1]]);
- sqlitem.Append(ks[i]);
- }
- index += 1;
- }
- // Console.WriteLine(sqlitem);
- sql.Append(sqlitem);
- string s;
- s = "and " + sqlHead[heada] + (ks.Length - 2) + ks[index] + " = " + "\"" + item.Value.ToString() + "\"";
- ps.Append(s);
- }
- else
- {
- throw new BizException("数组总共深度不能超过5层", ResponseCode.PARAMS_ERROR);
- }
- }
- else
- {
- ps.Append(GenSql(dict[item.Key], item.Key));
- }
- heada += 1;
- }
- sql.Append(" where 1=1 ").Append(ps);
- }
- FeedOptions queryOptions;
- if (collection.PartitionKey.Paths.Count > 0)
- {
- queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = true };
- }
- else
- {
- queryOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = false };
- }
- //查询条数 -1是全部
- var query = CosmosClient.CreateDocumentQuery(UriFactory.CreateDocumentCollectionUri(Database, CollectionName), sql.ToString(), queryOptions);
- return query;
- }
- else
- {
- throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
- }
- }
- }
- }
|