123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804 |
- using Microsoft.Azure.Cosmos;
- using Microsoft.Azure.Cosmos.Linq;
- using System;
- using System.Collections.Concurrent;
- using System.Collections.Generic;
- using System.Diagnostics;
- using System.IO;
- using System.Linq;
- using System.Linq.Expressions;
- using System.Net;
- using System.Reflection;
- using System.Text;
- using System.Text.Json;
- using System.Threading.Tasks;
- using TEAMModelOS.SDK.Context.Attributes.Azure;
- using TEAMModelOS.SDK.Context.Exception;
- using TEAMModelOS.SDK.Helper.Common.CollectionHelper;
- using TEAMModelOS.SDK.Helper.Common.ReflectorExtensions;
- using TEAMModelOS.SDK.Module.AzureCosmosDB.Configuration;
- namespace TEAMModelOS.SDK.Module.AzureCosmosDBV3
- {
- public class AzureCosmosDBV3Repository : IAzureCosmosDBV3Repository
- {
- private CosmosClient CosmosClient { get; set; }
- /// <summary>
- /// 线程安全的dict类型
- /// </summary>
- private Dictionary<string, Container> DocumentCollectionDict { get; set; } = new Dictionary<string, Container>();
- private string DatabaseId { get; set; }
- private int CollectionThroughput { get; set; }
- private Database database { get; set; }
- int pageSize = 200;
- private string[] ScanModel { get; set; }
- public AzureCosmosDBV3Repository(AzureCosmosDBOptions options, CosmosSerializer cosmosSerializer)
- {
- try
- {
- if (!string.IsNullOrEmpty(options.ConnectionString))
- {
- CosmosClient = CosmosDBV3ClientSingleton.getInstance(options.ConnectionString, options.ConnectionKey, cosmosSerializer).GetCosmosDBClient();
- }
- else
- {
- throw new BizException("请设置正确的AzureCosmosDB数据库配置信息!");
- }
- DatabaseId = options.Database;
- CollectionThroughput = options.CollectionThroughput;
- ScanModel = options.ScanModel;
- // InitializeDatabase().GetAwaiter().GetResult();
- }
- catch (CosmosException e)
- {
- // Dispose(true);
- throw new BizException(e.Message, 500, e.StackTrace);
- }
- }
- public AzureCosmosDBV3Repository(AzureCosmosDBOptions options)
- {
- try
- {
- if (!string.IsNullOrEmpty(options.ConnectionString))
- {
- CosmosClient = CosmosDBV3ClientSingleton.getInstance(options.ConnectionString, options.ConnectionKey, null).GetCosmosDBClient();
- }
- else
- {
- throw new BizException("请设置正确的AzureCosmosDB数据库配置信息!");
- }
- DatabaseId = options.Database;
- CollectionThroughput = options.CollectionThroughput;
- ScanModel = options.ScanModel;
- // InitializeDatabase().GetAwaiter().GetResult();
- }
- catch (CosmosException e)
- {
- // Dispose(true);
- throw new BizException(e.Message, 500, e.StackTrace);
- }
- }
- public async Task InitializeDatabase()
- {
- try
- {
- database = await CosmosClient.CreateDatabaseIfNotExistsAsync(DatabaseId, CollectionThroughput);
- FeedIterator<ContainerProperties> resultSetIterator = database.GetContainerQueryIterator<ContainerProperties>();
- while (resultSetIterator.HasMoreResults)
- {
- foreach (ContainerProperties container in await resultSetIterator.ReadNextAsync())
- {
- DocumentCollectionDict.TryAdd(container.Id, database.GetContainer(container.Id));
- }
- }
- //获取数据库所有的表
- List<Type> types = ReflectorExtensions.GetAllTypeAsAttribute<CosmosDBAttribute>(ScanModel);
- foreach (Type type in types)
- {
- string PartitionKey = GetPartitionKey(type);
- string CollectionName = "";
- int RU = 0;
- IEnumerable<CosmosDBAttribute> attributes = type.GetCustomAttributes<CosmosDBAttribute>(true);
- if (!string.IsNullOrEmpty(attributes.First<CosmosDBAttribute>().Name))
- {
- CollectionName = attributes.First<CosmosDBAttribute>().Name;
- }
- else
- {
- CollectionName = type.Name;
- }
- if (attributes.First<CosmosDBAttribute>().RU > 400)
- {
- RU = attributes.First<CosmosDBAttribute>().RU;
- }
- else
- {
- RU = CollectionThroughput;
- }
- //如果表存在于数据则检查RU是否变动,如果不存在则执行创建DocumentCollection
- if (DocumentCollectionDict.TryGetValue(CollectionName, out Container collection))
- { //更新RU
- int? throughputResponse = await CosmosClient.GetDatabase(DatabaseId).GetContainer(collection.Id).ReadThroughputAsync();
- if (throughputResponse < RU)
- {
- await CosmosClient.GetDatabase(DatabaseId).GetContainer(collection.Id).ReplaceThroughputAsync(RU);
- }
- }
- else
- {
- ContainerProperties containerProperties = new ContainerProperties { Id = CollectionName };
- if (!string.IsNullOrEmpty(PartitionKey))
- {
- containerProperties.PartitionKeyPath = "/" + PartitionKey;
- }
- if (RU > CollectionThroughput)
- {
- CollectionThroughput = RU;
- }
- Container containerWithConsistentIndexing = await database.CreateContainerIfNotExistsAsync(containerProperties, throughput: CollectionThroughput);
- DocumentCollectionDict.TryAdd(CollectionName, containerWithConsistentIndexing);
- }
- }
- }
- catch (CosmosException e)
- {
- throw new BizException(e.Message, 500, e.StackTrace);
- }
- }
- private string GetPartitionKey<T>()
- {
- Type type = typeof(T);
- return GetPartitionKey(type);
- }
- private string GetPartitionKey(Type type)
- {
- PropertyInfo[] properties = type.GetProperties();
- List<PropertyInfo> attrProperties = new List<PropertyInfo>();
- foreach (PropertyInfo property in properties)
- {
- if (property.Name.Equals("PartitionKey"))
- {
- attrProperties.Add(property);
- break;
- }
- object[] attributes = property.GetCustomAttributes(true);
- foreach (object attribute in attributes) //2.通过映射,找到成员属性上关联的特性类实例,
- {
- if (attribute is PartitionKeyAttribute)
- {
- attrProperties.Add(property);
- }
- }
- }
- if (attrProperties.Count <= 0)
- {
- throw new BizException(type.Name + "has no PartitionKey !");
- }
- else
- {
- if (attrProperties.Count == 1)
- {
- return attrProperties[0].Name;
- }
- else { throw new BizException("PartitionKey can only be single!"); }
- }
- }
- private async Task<Container> InitializeCollection<T>()
- {
- Type type = typeof(T);
- string partitionKey = GetPartitionKey<T>();
- string CollectionName;
- IEnumerable<CosmosDBAttribute> attributes = type.GetCustomAttributes<CosmosDBAttribute>(true);
- if (!string.IsNullOrEmpty(attributes.First<CosmosDBAttribute>().Name))
- {
- CollectionName = attributes.First<CosmosDBAttribute>().Name;
- }
- else
- {
- CollectionName = type.Name;
- }
- return await InitializeCollection(CollectionName, partitionKey);
- }
- private async Task<Container> InitializeCollection(string CollectionName, string PartitionKey)
- {
- /////内存中已经存在这个表则直接返回
- if (DocumentCollectionDict.TryGetValue(CollectionName, out Container DocumentCollection))
- {
- return DocumentCollection;
- }///如果没有则尝试默认创建
- else
- {
- ContainerProperties containerProperties = new ContainerProperties { Id = CollectionName };
- if (!string.IsNullOrEmpty(PartitionKey))
- {
- containerProperties.PartitionKeyPath = "/" + PartitionKey;
- }
- Container containerWithConsistentIndexing = await database.CreateContainerIfNotExistsAsync(containerProperties, throughput: CollectionThroughput);
- DocumentCollectionDict.TryAdd(CollectionName, containerWithConsistentIndexing);
- return containerWithConsistentIndexing;
- }
- }
- public async Task<List<IdPk>> DeleteAll<T>(List<KeyValuePair<string, string>> ids) where T : ID
- {
- Container container = await InitializeCollection<T>();
- //string partitionKey = GetPartitionKey<T>();
- //await Task.Run(() => Parallel.ForEach(ids, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (item) =>
- //{
- // Task.WaitAll(DeleteAsync<T>(item.Value, item.Key));
- //}));
- List<IdPk> idPks = new List<IdPk>();
- int pages = (int)Math.Ceiling((double)ids.Count / pageSize);
- Stopwatch stopwatch = Stopwatch.StartNew();
- for (int i = 0; i < pages; i++)
- {
- List<KeyValuePair<string, string>> lists = ids.Skip((i) * pageSize).Take(pageSize).ToList();
- List<Task> tasks = new List<Task>(lists.Count);
- lists.ForEach(item =>
- {
- tasks.Add(container.DeleteItemStreamAsync(item.Value, new PartitionKey(item.Key))
- .ContinueWith((Task<ResponseMessage> task) =>
- {
- using (ResponseMessage response = task.Result)
- {
- idPks.Add(new IdPk { id = item.Value, pk = item.Key.ToString(), StatusCode = response.StatusCode });
- // if (!response.IsSuccessStatusCode)
- // {
- // }
- }
- }
- ));
- });
- await Task.WhenAll(tasks);
- }
- stopwatch.Stop();
- return idPks;
- }
- public async Task<List<IdPk>> DeleteAll<T>(Dictionary<string,object> dict) where T : ID
- {
- List<T> list= await FindByDict<T>(dict);
- return await DeleteAll(list);
- }
- public async Task<List<IdPk>> DeleteAll<T>(List<T> enyites) where T : ID
- {
- List<IdPk> idPks = new List<IdPk>();
- Container container = await InitializeCollection<T>();
- string pk = GetPartitionKey<T>();
- Type type = typeof(T);
- int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
- Stopwatch stopwatch = Stopwatch.StartNew();
- for (int i = 0; i < pages; i++)
- {
- List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
- List<KeyValuePair<PartitionKey, string>> itemsToInsert = new List<KeyValuePair<PartitionKey, string>>();
- lists.ForEach(x =>
- {
- object o = type.GetProperty(pk).GetValue(x, null);
- KeyValuePair<PartitionKey, string> keyValue = new KeyValuePair<PartitionKey, string>(new PartitionKey(o.ToString()), x.id);
- itemsToInsert.Add(keyValue);
- });
- List<Task> tasks = new List<Task>(lists.Count);
- itemsToInsert.ForEach(item =>
- {
- tasks.Add(container.DeleteItemStreamAsync(item.Value, item.Key)
- .ContinueWith((Task<ResponseMessage> task) =>
- {
- using (ResponseMessage response = task.Result)
- {
-
- idPks.Add(new IdPk { id = item.Value, pk = item.Key.ToString(), StatusCode = response.StatusCode });
-
- }
- }
- ));
- });
- await Task.WhenAll(tasks);
- }
- stopwatch.Stop();
- return idPks;
- }
- public async Task<IdPk> DeleteAsync<T>(IdPk idPk) where T : ID {
- return await DeleteAsync<T>(idPk.id, idPk.pk);
- }
- public async Task<IdPk> DeleteAsync<T>(string id, string pk) where T : ID
- {
- Container container = await InitializeCollection<T>();
- ResponseMessage response = await container.DeleteItemStreamAsync(id: id, partitionKey: new PartitionKey(pk));
-
- return new IdPk { id =id, pk = pk, StatusCode = response.StatusCode };
- }
- public async Task<IdPk> DeleteAsync<T>(T entity) where T : ID
- {
- Container container = await InitializeCollection<T>();
- string partitionKey = GetPartitionKey<T>();
- Type type = typeof(T);
- object o = type.GetProperty(partitionKey).GetValue(entity, null);
- ResponseMessage response = await container.DeleteItemStreamAsync (id: entity.id, partitionKey: new PartitionKey(o.ToString()));
- return new IdPk { id = entity.id, pk = o.ToString(), StatusCode = response.StatusCode };
- }
- //public async Task<T> DeleteAsync<T>(string id) where T : ID
- //{
- // Container container = await InitializeCollection<T>();
- // ItemResponse<T> response = await container.DeleteItemAsync<T>(id: id, partitionKey: new PartitionKey(GetPartitionKey<T>()));
- // return response.Resource;
- //}
- public async Task<List<T>> FindAll<T>(List<string> propertys = null) where T : ID
- {
- Container container = await InitializeCollection<T>();
- StringBuilder sql;
- sql = SQLHelperParametric.GetSQLSelect(propertys);
- CosmosDbQuery cosmosDbQuery = new CosmosDbQuery {QueryText = sql.ToString() };
- FeedIterator<T> query = container.GetItemQueryIterator<T>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition);
- return await ResultsFromFeedIterator(query);
- }
- private async Task<List<T>> ResultsFromFeedIterator<T>(FeedIterator<T> query, int? maxItemCount = null)
- {
- List<T> results = new List<T>();
- while (query.HasMoreResults)
- {
- foreach (T t in await query.ReadNextAsync())
- {
- results.Add(t);
- if (results.Count == maxItemCount)
- {
- return results;
- }
- }
- }
- return results;
- }
- private async Task<List<T>> ResultsFromFeedIterator<T>(FeedIterator<T> query, Func<List<T>, Task> batchAction, int itemsPerPage)
- {
- List<T> results = new List<T>();
- while (query.HasMoreResults)
- {
- if (results.Count() >= itemsPerPage)
- {
- await batchAction(results);
- results.Clear();
- }
- results.AddRange(await query.ReadNextAsync());
- }
- if (results.Count() > 0)
- {
- await batchAction(results);
- results.Clear();
- }
- return results;
- }
- public async Task<List<dynamic>> FindByDict(string CollectionName, Dictionary<string, object> dict, int itemsPerPage = -1, int? maxItemCount = null, string partitionKey = null, List<string> propertys = null)
- {
- if (DocumentCollectionDict.TryGetValue(CollectionName, out Container container))
- {
- //StringBuilder sql = new StringBuilder("select value(c) from c");
- //SQLHelper.GetSQL(dict, ref sql);
- //CosmosDbQuery cosmosDbQuery = new CosmosDbQuery
- //{
- // QueryText = sql.ToString()
- //};
- StringBuilder sql;
- sql = SQLHelperParametric.GetSQLSelect(propertys);
- CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, sql);
- QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(itemsPerPage, maxItemCount));
- FeedIterator<dynamic> query = container.GetItemQueryIterator<dynamic>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
- return await ResultsFromFeedIterator(query, maxItemCount);
- }
- else
- {
- throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
- }
- }
- public async Task<List<dynamic>> FindCountByDict(string CollectionName, Dictionary<string, object> dict, int itemsPerPage = -1, int? maxItemCount = null, string partitionKey = null)
- {
- if (DocumentCollectionDict.TryGetValue(CollectionName, out Container container))
- {
- //StringBuilder sql = new StringBuilder("select value count(c) from c");
- //SQLHelper.GetSQL(dict, ref sql);
- //CosmosDbQuery cosmosDbQuery = new CosmosDbQuery
- //{
- // QueryText = sql.ToString()
- //};
- dict.Remove("@CURRPAGE");
- dict.Remove("@PAGESIZE");
- dict.Remove("@ASC");
- dict.Remove("@DESC");
- StringBuilder sql = new StringBuilder("select value count(c) from c");
- CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, sql);
- QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(itemsPerPage, maxItemCount));
- FeedIterator<dynamic> query = container.GetItemQueryIterator<dynamic>(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
- return await ResultsFromFeedIterator(query, maxItemCount);
- }
- else
- {
- throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
- }
- }
- public async Task<List<T>> FindByParams<T>(Dictionary<string, object> dict, int itemsPerPage = -1, int? maxItemCount = null, string partitionKey = null, List<string> propertys = null) where T : ID
- {
- return await FindByDict<T>(dict, itemsPerPage, maxItemCount, partitionKey, propertys);
- }
- public async Task<List<T>> FindByDict<T>(Dictionary<string, object> dict, int itemsPerPage = -1, int? maxItemCount = null, string partitionKey = null,List<string> propertys = null) where T : ID
- {
- StringBuilder sql;
- sql = SQLHelperParametric.GetSQLSelect(propertys);
- CosmosDbQuery cosmosDbQuery = SQLHelperParametric.GetSQL(dict, sql);
- QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(itemsPerPage, maxItemCount));
- return await ResultsFromQueryAndOptions<T>(cosmosDbQuery, queryRequestOptions);
- }
-
- private async Task<List<T>> ResultsFromQueryAndOptions<T>(CosmosDbQuery cosmosDbQuery, QueryRequestOptions queryOptions, int? maxItemCount = null)
- {
- Container container = await InitializeCollection<T>();
- FeedIterator<T> query = container.GetItemQueryIterator<T>(
- queryDefinition: cosmosDbQuery.CosmosQueryDefinition,
- requestOptions: queryOptions);
- return await ResultsFromFeedIterator(query, maxItemCount);
- }
- private int GetEffectivePageSize(int itemsPerPage, int? maxItemCount)
- {
- return itemsPerPage == -1 ? maxItemCount ?? itemsPerPage : Math.Min(maxItemCount ?? itemsPerPage, itemsPerPage);
- }
- private QueryRequestOptions GetDefaultQueryRequestOptions(int? itemsPerPage = null,
- int? maxBufferedItemCount = null,
- int? maxConcurrency = null)
- {
- QueryRequestOptions queryRequestOptions = new QueryRequestOptions
- {
- MaxItemCount = itemsPerPage == -1 ? 1000 : itemsPerPage,
- MaxBufferedItemCount = maxBufferedItemCount ?? 100,
- MaxConcurrency = maxConcurrency ?? 50
- };
- return queryRequestOptions;
- }
- private async Task<List<T>> ResultsFromQueryAndOptions<T>(CosmosDbQuery cosmosDbQuery, Func<List<T>, Task> batchAction, QueryRequestOptions queryOptions)
- {
- Container container = await InitializeCollection<T>();
- FeedIterator<T> query = container.GetItemQueryIterator<T>(
- queryDefinition: cosmosDbQuery.CosmosQueryDefinition,
- requestOptions: queryOptions);
- return await ResultsFromFeedIterator(query, batchAction, queryOptions.MaxItemCount ?? 0);
- }
- private QueryRequestOptions GetQueryRequestOptions(int itemsPerPage)
- {
- QueryRequestOptions queryRequestOptions = new QueryRequestOptions
- {
- MaxItemCount = itemsPerPage
- };
- return queryRequestOptions;
- }
- public async Task<List<T>> FindLinq<T>(Expression<Func<T, bool>> query = null, Expression<Func<T, object>> order = null, bool isDesc = false, int itemsPerPage = -1, int? maxItemCount = null) where T : ID
- {
- //QueryRequestOptions queryRequestOptions = GetQueryRequestOptions(itemsPerPage);
- QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(itemsPerPage, maxItemCount));
- FeedIterator<T> feedIterator;
- Container container = await InitializeCollection<T>();
- if (query == null)
- {
- if (order != null)
- {
- if (isDesc)
- {
- feedIterator = container
- .GetItemLinqQueryable<T>(requestOptions: queryRequestOptions).OrderByDescending(order)
- .ToFeedIterator();
- }
- else
- {
- feedIterator = container
- .GetItemLinqQueryable<T>(requestOptions: queryRequestOptions).OrderBy(order)
- .ToFeedIterator();
- }
- }
- else
- {
- feedIterator = container
- .GetItemLinqQueryable<T>(requestOptions: queryRequestOptions)
- .ToFeedIterator();
- }
- }
- else
- {
- if (order != null)
- {
- if (isDesc)
- {
- feedIterator = container
- .GetItemLinqQueryable<T>(requestOptions: queryRequestOptions)
- .Where(query).OrderByDescending(order)
- .ToFeedIterator();
- }
- else
- {
- feedIterator = container
- .GetItemLinqQueryable<T>(requestOptions: queryRequestOptions)
- .Where(query).OrderBy(order)
- .ToFeedIterator();
- }
- }
- else
- {
- feedIterator = container
- .GetItemLinqQueryable<T>(requestOptions: queryRequestOptions)
- .Where(query)
- .ToFeedIterator();
- }
- }
- return await ResultsFromFeedIterator<T>(feedIterator);
- }
- public async Task<List<T>> FindSQL<T>(string sql, Dictionary<string, object> Parameters = null, int itemsPerPage = -1, int? maxItemCount = null) where T : ID
- {
- Container container = await InitializeCollection<T>();
- QueryRequestOptions queryOptions = GetQueryRequestOptions(GetEffectivePageSize(itemsPerPage, maxItemCount));
- if (Parameters != null)
- {
- CosmosDbQuery cosmosDbQuery = new CosmosDbQuery
- {
- QueryText = sql,
- Parameters = Parameters
- };
- FeedIterator<T> feedIterator = container
- .GetItemQueryIterator<T>(cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryOptions);
- return await ResultsFromFeedIterator(feedIterator);
- }
- else
- {
- QueryDefinition queryDefinition = new QueryDefinition(sql);
- return await ResultsFromFeedIterator<T>(container.GetItemQueryIterator<T>(queryDefinition));
- }
- }
- public async Task<T> Save<T>(T entity) where T : ID
- {
- try
- {
- Container container = await InitializeCollection<T>();
- ItemResponse<T> response = await container.CreateItemAsync<T>(entity);
- return response.Resource;
- }
- catch (Exception e)
- {
- throw new BizException(e.Message);
- }
- }
- public async Task<List<T>> SaveAll<T>(List<T> enyites) where T : ID
- {
- int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
- Container container = await InitializeCollection<T>();
- string pk = GetPartitionKey<T>();
- Type type = typeof(T);
- Stopwatch stopwatch = Stopwatch.StartNew();
- for (int i = 0; i < pages; i++)
- {
- List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
- List<KeyValuePair<PartitionKey, Stream>> itemsToInsert = new List<KeyValuePair<PartitionKey, Stream>>();
- lists.ForEach(async x =>
- {
- MemoryStream stream = new MemoryStream();
- await JsonSerializer.SerializeAsync(stream, x ,new JsonSerializerOptions { IgnoreNullValues=true});
- object o = type.GetProperty(pk).GetValue(x, null);
- KeyValuePair<PartitionKey, Stream> keyValue = new KeyValuePair<PartitionKey, Stream>(new PartitionKey(o.ToString()), stream);
- itemsToInsert.Add(keyValue);
- });
- List<Task> tasks = new List<Task>(lists.Count);
- itemsToInsert.ForEach(item =>
- {
- tasks.Add(container.CreateItemStreamAsync(item.Value, item.Key)
- .ContinueWith((Task<ResponseMessage> task) =>
- {
- using (ResponseMessage response = task.Result)
- {
- if (!response.IsSuccessStatusCode)
- {
- }
- }
- }
- ));
- });
- await Task.WhenAll(tasks);
- }
- stopwatch.Stop();
- return enyites;
- }
- public async Task<T> SaveOrUpdate<T>(T entity) where T : ID
- {
- Container container = await InitializeCollection<T>();
- ItemResponse<T> response = await container.UpsertItemAsync(item: entity);
- return response.Resource;
- }
- public async Task<List<T>> SaveOrUpdateAll<T>(List<T> enyites) where T : ID
- {
- //await Task.Run(() => Parallel.ForEach(entities, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (item) =>
- //{
- // Task.WaitAll(Update(item));
- //}));
- int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
- Container container = await InitializeCollection<T>();
- string pk = GetPartitionKey<T>();
- Type type = typeof(T);
- Stopwatch stopwatch = Stopwatch.StartNew();
- for (int i = 0; i < pages; i++)
- {
- List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
- List<KeyValuePair<PartitionKey, Stream>> itemsToInsert = new List<KeyValuePair<PartitionKey, Stream>>();
- lists.ForEach(async x =>
- {
- MemoryStream stream = new MemoryStream();
- await JsonSerializer.SerializeAsync(stream, x, new JsonSerializerOptions { IgnoreNullValues = true });
- object o = type.GetProperty(pk).GetValue(x, null);
- KeyValuePair<PartitionKey, Stream> keyValue = new KeyValuePair<PartitionKey, Stream>(new PartitionKey(o.ToString()), stream);
- itemsToInsert.Add(keyValue);
- });
- List<Task> tasks = new List<Task>(lists.Count);
- itemsToInsert.ForEach(item =>
- {
- tasks.Add(container.UpsertItemStreamAsync(item.Value, item.Key)
- .ContinueWith((Task<ResponseMessage> task) =>
- {
- //using (ResponseMessage response = task.Result)
- //{
- // if (!response.IsSuccessStatusCode)
- // {
- // }
- //}
- }
- ));
- });
- await Task.WhenAll(tasks);
- }
- stopwatch.Stop();
- return enyites;
- }
- public async Task<T> Update<T>(T entity) where T : ID
- {
- Container container = await InitializeCollection<T>();
- string pk = GetPartitionKey<T>();
- object o = typeof(T).GetProperty(pk).GetValue(entity, null);
- ItemResponse<T> response = await container.ReplaceItemAsync(entity, entity.id, new PartitionKey(o.ToString()));
- return response.Resource;
- }
- internal class Item
- {
- public string id { get; set; }
- public string pk { get; set; }
- public MemoryStream stream { get; set; }
- }
- public async Task<List<T>> UpdateAll<T>(List<T> enyites) where T : ID
- {
- //await Task.Run(() => Parallel.ForEach(entities, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (item) =>
- //{
- // Task.WaitAll(Update(item));
- //}));
- int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
- Container container = await InitializeCollection<T>();
- string pk = GetPartitionKey<T>();
- Type type = typeof(T);
- Stopwatch stopwatch = Stopwatch.StartNew();
- for (int i = 0; i < pages; i++)
- {
- List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
- List<Item> itemsToInsert = new List<Item>();
- lists.ForEach(async x =>
- {
- MemoryStream stream = new MemoryStream();
- await JsonSerializer.SerializeAsync(stream, x, new JsonSerializerOptions { IgnoreNullValues = true });
- object o = type.GetProperty(pk).GetValue(x, null);
- Item keyValue = new Item { id = x.id, pk = o.ToString(), stream = stream };
- itemsToInsert.Add(keyValue);
- });
- List<Task> tasks = new List<Task>(lists.Count);
- itemsToInsert.ForEach(item =>
- {
- tasks.Add(container.ReplaceItemStreamAsync(item.stream, item.id, new PartitionKey(item.pk))
- .ContinueWith((Task<ResponseMessage> task) =>
- {
- //using (ResponseMessage response = task.Result)
- //{
- // if (!response.IsSuccessStatusCode)
- // {
- // }
- //}
- }
- ));
- });
- await Task.WhenAll(tasks);
- }
- stopwatch.Stop();
- return enyites;
- }
- //public void Dispose()
- //{
- // Dispose(true);
- //}
- //protected virtual void Dispose(bool disposing)
- //{
- // if (disposing)
- // {
- // CosmosClient?.Dispose();
- // }
- //}
- public async Task<T> FindById<T>(string id) where T : ID
- {
- Container container = await InitializeCollection<T>();
- CosmosDbQuery cosmosDbQuery = new CosmosDbQuery
- {
- QueryText = @"SELECT *
- FROM c
- WHERE c.id = @id",
- Parameters = new Dictionary<string, object>
- {
- { "@id",id}
- }
- };
- FeedIterator<T> feedIterator = container
- .GetItemQueryIterator<T>(cosmosDbQuery.CosmosQueryDefinition);
- return (await ResultsFromFeedIterator(feedIterator)).SingleOrDefault();
- }
- public async Task<T> FindByIdPk<T>(string id, string pk) where T : ID
- {
- Container container = await InitializeCollection<T>();
- ItemResponse<T> response = await container.ReadItemAsync<T>(id: id, partitionKey: new PartitionKey(pk));
- return response.Resource;
- }
- }
- }
|