using Microsoft.Azure.Cosmos.Table;
using Microsoft.Azure.Cosmos.Table.Queryable;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Linq;
using Azure;
using TEAMModelOS.SDK.DI.AzureCosmos.Inner;
using System.IO;
using TEAMModelOS.SDK.DI;
using System.Diagnostics;
using Azure.Cosmos;
using System.Text.Json;
using System.Net;
using TEAMModelOS.SDK.Context.Exception;
using System.Linq.Expressions;
using TEAMModelOS.SDK.Helper.Common.CollectionHelper;
using TEAMModelOS.SDK.Helper.Common.LogHelper;
using TEAMModelOS.SDK.Helper.Common.JsonHelper;
namespace TEAMModelOS.SDK.DI
{
public static class AzureCosmosExtensions
{
/// <summary>
/// Key prefix of the Redis hashes used as item cache; the container id is appended to it.
/// </summary>
private const string CacheCosmosPrefix = "cosmos:";
/// <summary>
/// TTL value (1 second) written onto items by the TTL-based delete path (see DeleteTTL).
/// </summary>
private const int ttl = 1;
/// <summary>
/// Page size: number of entities handled per batch by the bulk operations.
/// </summary>
private const int pageSize = 200;
/// <summary>
/// Timeout in seconds (86400 = 24 hours) passed to the Redis Expire calls for cache hashes.
/// </summary>
private const int timeoutSeconds = 86400;
/// <summary>
/// Reads the request charge reported in the <c>x-ms-request-charge</c> response header.
/// </summary>
/// <param name="response">The response to inspect; may be null.</param>
/// <returns>
/// The charge rounded to the nearest whole number, or 0 when the response is null,
/// the header is missing, or its value is not a usable number.
/// </returns>
public static int RU(this Response response)
{
    string value = null;
    try
    {
        if (response == null || !response.Headers.TryGetValue("x-ms-request-charge", out value))
        {
            return 0;
        }
    }
    catch (Exception)
    {
        // Best effort, as before: a response whose headers cannot be read reports no charge.
        return 0;
    }

    // The charge is a fractional number (e.g. "2.83"). Parsing it as an integer threw a
    // FormatException, so every fractional charge used to be reported as 0.
    bool parsed = double.TryParse(
        value,
        System.Globalization.NumberStyles.Float,
        System.Globalization.CultureInfo.InvariantCulture,
        out double charge);
    if (!parsed || double.IsNaN(charge) || charge > int.MaxValue || charge < int.MinValue)
    {
        return 0;
    }
    return (int)Math.Round(charge, MidpointRounding.AwayFromZero);
}
/// <summary>
/// Reads a single item by id and partition key value from the container resolved for <c>T</c>.
/// </summary>
/// <param name="azureCosmosFactory">Factory used to resolve the container.</param>
/// <param name="id">Id of the item to read.</param>
/// <param name="pk">Partition key value of the item.</param>
/// <returns>The value carried by the item response.</returns>
public static async Task FindByIdPk(this AzureCosmosFactory azureCosmosFactory, string id, string pk) where T : ID
{
AzureCosmosModel container = await azureCosmosFactory.InitializeCollection();
// Point read; this path does not consult the Redis cache.
ItemResponse response = await container.container.ReadItemAsync(id: id, partitionKey: new PartitionKey(pk));
return response.Value;
}
/// <summary>
/// Loads the entities with the given ids. When caching is enabled for the container,
/// ids present in the Redis hash are served from it and the remaining ids are queried
/// from Cosmos and written back to the cache.
/// </summary>
/// <param name="azureCosmosFactory">Factory used to resolve the container for <c>T</c>.</param>
/// <param name="ids">Ids to look up.</param>
/// <returns>Cached entities first, followed by the entities that had to be queried.</returns>
public static async Task<List<T>> FindByIds<T>(this AzureCosmosFactory azureCosmosFactory, List<string> ids) where T : ID
{
    AzureCosmosModel container = await azureCosmosFactory.InitializeCollection<T>();
    if (!(container.cache && RedisHelper.Instance != null))
    {
        return await FindByDict<T>(azureCosmosFactory, new Dictionary<string, object> { { "id", ids.ToArray() } });
    }

    string cacheKey = CacheCosmosPrefix + container.container.Id;
    List<T> found = new List<T>();
    List<string> missing = new List<string>();
    foreach (string id in ids)
    {
        if (await RedisHelper.HExistsAsync(cacheKey, id))
        {
            found.Add(await RedisHelper.HGetAsync<T>(cacheKey, id));
        }
        else
        {
            missing.Add(id);
        }
    }

    if (missing.Count > 0)
    {
        List<T> loaded = await FindByDict<T>(azureCosmosFactory, new Dictionary<string, object> { { "id", missing.ToArray() } });
        // The query helper can yield null when no SQL could be built; treat that as "nothing found".
        if (loaded != null && loaded.Count > 0)
        {
            foreach (T item in loaded)
            {
                // Async variant instead of the blocking HSet that used to run inside this async method.
                await RedisHelper.HSetAsync(cacheKey, item.id, item);
            }
            // The expiry applies to the whole hash, so setting it once is enough.
            await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
            found.AddRange(loaded);
        }
    }
    return found;
}
/// <summary>
/// Creates the entity in the container resolved for <c>T</c> and, when caching is
/// enabled, stores it in the container's Redis hash.
/// </summary>
/// <param name="azureCosmosFactory">Factory used to resolve the container.</param>
/// <param name="entity">Entity to create; its <c>pk</c> is set to the container type name and its <c>ttl</c> is cleared.</param>
/// <returns>The value carried by the create response.</returns>
public static async Task<T> Save<T>(this AzureCosmosFactory azureCosmosFactory, T entity) where T : ID
{
    AzureCosmosModel container = await azureCosmosFactory.InitializeCollection<T>();
    entity.pk = container.type.Name;
    entity.ttl = null;
    ItemResponse<T> response = await container.container.CreateItemAsync(entity);
    if (container.cache && RedisHelper.Instance != null)
    {
        string cacheKey = CacheCosmosPrefix + container.container.Id;
        await RedisHelper.HSetAsync(cacheKey, entity.id, entity);
        // Always (re)apply the expiry. Previously a hash created by this call got no
        // expiry at all; only a later write to an existing hash set one.
        await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
    }
    return response.Value;
}
/// <summary>
/// Creates the given entities in batches of <c>pageSize</c>, issuing the writes of a
/// batch concurrently, and mirrors them into the Redis cache when caching is enabled.
/// </summary>
/// <param name="azureCosmosFactory">Factory used to resolve the container for <c>T</c>.</param>
/// <param name="enyites">Entities to create; each gets <c>pk</c> set to the type name and <c>ttl</c> cleared.</param>
/// <returns>The same list that was passed in.</returns>
public static async Task<List<T>> SaveAll<T>(this AzureCosmosFactory azureCosmosFactory, List<T> enyites) where T : ID
{
    int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
    AzureCosmosModel container = await azureCosmosFactory.InitializeCollection<T>();
    bool useCache = container.cache && RedisHelper.Instance != null;
    string cacheKey = CacheCosmosPrefix + container.container.Id;
    // Only touch Redis when caching is actually enabled (the check used to run unguarded).
    bool cacheExisted = useCache && RedisHelper.Exists(cacheKey);
    string partitionKey = AzureCosmosUtil.GetPartitionKey<T>();
    Type type = typeof(T);
    var partitionProperty = type.GetProperty(partitionKey);
    var serializerOptions = new JsonSerializerOptions { IgnoreNullValues = true };

    for (int i = 0; i < pages; i++)
    {
        List<T> batch = enyites.Skip(i * pageSize).Take(pageSize).ToList();
        List<Task> writes = new List<Task>(batch.Count);
        // A plain loop instead of List.ForEach(async ...): that lambda was async void, so
        // nothing guaranteed the payloads were ready before the writes were started.
        foreach (T entity in batch)
        {
            entity.pk = type.Name;
            entity.ttl = null;
            MemoryStream payload = new MemoryStream();
            await JsonSerializer.SerializeAsync(payload, entity, serializerOptions);
            object keyValue = partitionProperty.GetValue(entity, null);
            writes.Add(container.container.CreateItemStreamAsync(payload, new PartitionKey(keyValue.ToString()))
                .ContinueWith(task =>
                {
                    payload.Dispose();
                    // Reading Result rethrows a failed write so Task.WhenAll surfaces it, as before.
                    using (Response response = task.Result)
                    {
                    }
                }));
        }
        await Task.WhenAll(writes);

        if (useCache)
        {
            // Await the cache writes instead of firing them from async void lambdas.
            await Task.WhenAll(batch.Select(entity => RedisHelper.HSetAsync(cacheKey, entity.id, entity)));
        }
    }

    if (useCache && !cacheExisted)
    {
        // The hash was created by this call, so give it its expiry.
        await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
    }
    return enyites;
}
/// <summary>
/// Upserts the entity into the container resolved for <c>T</c> and, when caching is
/// enabled, stores it in the container's Redis hash.
/// </summary>
/// <param name="azureCosmosFactory">Factory used to resolve the container.</param>
/// <param name="entity">Entity to upsert; its <c>pk</c> is set to the container type name and its <c>ttl</c> is cleared.</param>
/// <returns>The value carried by the upsert response.</returns>
public static async Task<T> SaveOrUpdate<T>(this AzureCosmosFactory azureCosmosFactory, T entity) where T : ID
{
    AzureCosmosModel container = await azureCosmosFactory.InitializeCollection<T>();
    entity.pk = container.type.Name;
    entity.ttl = null;
    ItemResponse<T> response = await container.container.UpsertItemAsync(item: entity);
    if (container.cache && RedisHelper.Instance != null)
    {
        string cacheKey = CacheCosmosPrefix + container.container.Id;
        await RedisHelper.HSetAsync(cacheKey, entity.id, entity);
        // Always (re)apply the expiry. Previously a hash created by this call got no
        // expiry at all; only a later write to an existing hash set one.
        await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
    }
    return response.Value;
}
/// <summary>
/// Upserts the given entities in batches of <c>pageSize</c>, issuing the writes of a
/// batch concurrently, and mirrors them into the Redis cache when caching is enabled.
/// </summary>
/// <param name="azureCosmosFactory">Factory used to resolve the container for <c>T</c>.</param>
/// <param name="enyites">Entities to upsert; each gets <c>pk</c> set to the type name and <c>ttl</c> cleared.</param>
/// <returns>The same list that was passed in.</returns>
public static async Task<List<T>> SaveOrUpdateAll<T>(this AzureCosmosFactory azureCosmosFactory, List<T> enyites) where T : ID
{
    int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
    AzureCosmosModel container = await azureCosmosFactory.InitializeCollection<T>();
    bool useCache = container.cache && RedisHelper.Instance != null;
    string cacheKey = CacheCosmosPrefix + container.container.Id;
    // Only touch Redis when caching is actually enabled (the check used to run unguarded).
    bool cacheExisted = useCache && RedisHelper.Exists(cacheKey);
    string partitionKey = AzureCosmosUtil.GetPartitionKey<T>();
    Type type = typeof(T);
    var partitionProperty = type.GetProperty(partitionKey);
    var serializerOptions = new JsonSerializerOptions { IgnoreNullValues = true };

    for (int i = 0; i < pages; i++)
    {
        List<T> batch = enyites.Skip(i * pageSize).Take(pageSize).ToList();
        List<Task> writes = new List<Task>(batch.Count);
        // A plain loop instead of List.ForEach(async ...): that lambda was async void, so
        // nothing guaranteed the payloads were ready before the writes were started.
        foreach (T entity in batch)
        {
            entity.pk = type.Name;
            entity.ttl = null;
            MemoryStream payload = new MemoryStream();
            await JsonSerializer.SerializeAsync(payload, entity, serializerOptions);
            object keyValue = partitionProperty.GetValue(entity, null);
            writes.Add(container.container.UpsertItemStreamAsync(payload, new PartitionKey(keyValue.ToString()))
                .ContinueWith(task =>
                {
                    payload.Dispose();
                    // Individual write failures are still not propagated (unchanged behaviour);
                    // successful responses are now disposed instead of being leaked.
                    if (task.Status == TaskStatus.RanToCompletion)
                    {
                        task.Result?.Dispose();
                    }
                }));
        }
        await Task.WhenAll(writes);

        if (useCache)
        {
            // Await the cache writes instead of firing them from async void lambdas.
            await Task.WhenAll(batch.Select(entity => RedisHelper.HSetAsync(cacheKey, entity.id, entity)));
        }
    }

    if (useCache && !cacheExisted)
    {
        // The hash was created by this call, so give it its expiry.
        await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
    }
    return enyites;
}
/// <summary>
/// Replaces the given items in the container registered under <paramref name="typeName"/>,
/// in batches of <c>pageSize</c>, and mirrors them into the Redis cache when caching is enabled.
/// </summary>
/// <param name="azureCosmosFactory">Factory whose <c>CosmosDict.typeCosmos</c> map is searched for the container.</param>
/// <param name="typeName">Key used to look the container up.</param>
/// <param name="enyites">Items to replace; each is read through <c>x.id</c> and <c>x[partitionKey]</c>.</param>
/// <returns>The same list that was passed in.</returns>
public static async Task> UpdateAll(this AzureCosmosFactory azureCosmosFactory, string typeName, List enyites)
{
int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
// NOTE(review): the lookup result is ignored. When typeName is not registered,
// container stays null and the dereference just below throws.
if (azureCosmosFactory. CosmosDict.typeCosmos.TryGetValue(typeName, out AzureCosmosModel container))
{
}
// Remember whether the cache hash existed before this call; the expiry is only set when it did not.
// NOTE(review): this Redis call is not guarded by container.cache / RedisHelper.Instance like the later ones.
bool flag = false;
if (RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
{
flag = true;
}
string partitionKey = container.partitionKey;
Stopwatch stopwatch = Stopwatch.StartNew();
for (int i = 0; i < pages; i++)
{
List lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
List- itemsToInsert = new List
- ();
// NOTE(review): List.ForEach takes an Action, so this async lambda is async void and
// ForEach does not wait for it; itemsToInsert is only complete if every await finishes synchronously.
lists.ForEach(async x =>
{
/* x.pk = typeName;
x.ttl = null;*/
MemoryStream stream = new MemoryStream();
await JsonSerializer.SerializeAsync(stream, x, new JsonSerializerOptions { IgnoreNullValues = true });
object o = x[partitionKey];
Item keyValue = new Item { id = x.id, pk = o.ToString(), stream = stream };
itemsToInsert.Add(keyValue);
});
List tasks = new List(lists.Count);
itemsToInsert.ForEach(item =>
{
tasks.Add(container.container.ReplaceItemStreamAsync(item.stream, item.id, new PartitionKey(item.pk))
// The continuation is empty, so the outcome of the individual replace is not inspected here.
.ContinueWith((Task task) =>
{
//using (ResponseMessage response = task.Result)
//{
// if (!response.IsSuccessStatusCode)
// {
// }
//}
}
));
});
await Task.WhenAll(tasks);
if (container.cache && RedisHelper.Instance != null)
{
// NOTE(review): async void lambda again; the cache writes are not awaited.
lists.ForEach(async x => {
await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, x.id, x);
});
}
}
if (container.cache && RedisHelper.Instance != null && !flag)
{
await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
}
stopwatch.Stop();
return enyites;
}
/// <summary>
/// Counts the items matching <paramref name="dict"/> by running
/// <c>select value count(c) from c</c> with the conditions built by <c>SQLHelper.GetSQL</c>.
/// </summary>
/// <param name="azureCosmosFactory">Factory used to resolve the container for <c>T</c>.</param>
/// <param name="dict">Query conditions handed to <c>SQLHelper.GetSQL</c>.</param>
/// <returns>The values yielded by the count query, or a single 0 when no query could be built.</returns>
public static async Task
> FindCountByDict(this AzureCosmosFactory azureCosmosFactory, Dictionary dict)
{
AzureCosmosModel container = await azureCosmosFactory.InitializeCollection();
// The type name is passed to SQLHelper.GetSQL as the pk argument.
string pk = typeof(T).Name;
StringBuilder sql = new StringBuilder("select value count(c) from c");
AzureCosmosQuery cosmosDbQuery = SQLHelper.GetSQL(dict, sql, pk);
if (cosmosDbQuery == null)
{
return new List { 0 };
}
QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
AsyncPageable query = container.container.GetItemQueryIterator(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
return await ResultsFromFeedIterator(query);
}
/// <summary>
/// Counts the items matching <paramref name="dict"/> in the container registered under
/// <paramref name="CollectionName"/>.
/// </summary>
/// <param name="azureCosmosFactory">Factory whose <c>CosmosDict.typeCosmos</c> map is searched for the container.</param>
/// <param name="CollectionName">Key used to look the container up.</param>
/// <param name="dict">Query conditions. The paging and ordering keys are removed from this dictionary in place.</param>
/// <returns>The values yielded by the count query, or a single 0 when no query could be built.</returns>
/// <exception cref="BizException">Thrown when <paramref name="CollectionName"/> is not registered.</exception>
public static async Task> FindCountByDict(this AzureCosmosFactory azureCosmosFactory, string CollectionName, Dictionary dict)
{
if (azureCosmosFactory.CosmosDict.typeCosmos.TryGetValue(CollectionName, out AzureCosmosModel container))
{
string pk = container.type.Name;
// Paging and ordering do not apply to a count; note this mutates the caller's dictionary.
dict.Remove("@CURRPAGE");
dict.Remove("@PAGESIZE");
dict.Remove("@ASC");
dict.Remove("@DESC");
StringBuilder sql = new StringBuilder("select value count(c) from c");
AzureCosmosQuery cosmosDbQuery = SQLHelper.GetSQL(dict, sql, pk);
if (cosmosDbQuery == null)
{
return new List { 0 };
}
QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
AsyncPageable query = container.container.GetItemQueryIterator(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
return await ResultsFromFeedIterator(query);
}
else
{
throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
}
}
/// <summary>
/// Replaces the stored item that has the entity's id with the given entity and, when
/// caching is enabled, stores the entity in the container's Redis hash.
/// </summary>
/// <param name="azureCosmosFactory">Factory used to resolve the container for <c>T</c>.</param>
/// <param name="entity">Entity to write; its <c>pk</c> is set to the type name and its <c>ttl</c> is cleared.</param>
/// <returns>The value carried by the replace response.</returns>
public static async Task<T> Update<T>(this AzureCosmosFactory azureCosmosFactory, T entity) where T : ID
{
    AzureCosmosModel container = await azureCosmosFactory.InitializeCollection<T>();
    string partitionKey = AzureCosmosUtil.GetPartitionKey<T>();
    Type type = typeof(T);
    entity.pk = type.Name;
    entity.ttl = null;
    // The partition key value is read from the entity property named by GetPartitionKey.
    object o = type.GetProperty(partitionKey).GetValue(entity, null);
    ItemResponse<T> response = await container.container.ReplaceItemAsync(entity, entity.id, new PartitionKey(o.ToString()));
    if (container.cache && RedisHelper.Instance != null)
    {
        string cacheKey = CacheCosmosPrefix + container.container.Id;
        await RedisHelper.HSetAsync(cacheKey, entity.id, entity);
        // Always (re)apply the expiry. Previously a hash created by this call got no
        // expiry at all; only a later write to an existing hash set one.
        await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
    }
    return response.Value;
}
/// <summary>
/// One pending replace operation as assembled by the UpdateAll overloads:
/// the item id, its partition key value and the serialized payload.
/// </summary>
internal class Item
{
// Id of the item to replace.
public string id { get; set; }
// Partition key value of the item, as a string.
public string pk { get; set; }
// Serialized item body that is sent with the replace call.
public MemoryStream stream { get; set; }
}
/// <summary>
/// Replaces the given entities in batches of <c>pageSize</c>, issuing the writes of a
/// batch concurrently, and mirrors them into the Redis cache when caching is enabled.
/// </summary>
/// <param name="azureCosmosFactory">Factory used to resolve the container for <c>T</c>.</param>
/// <param name="enyites">Entities to replace; each gets <c>pk</c> set to the type name and <c>ttl</c> cleared.</param>
/// <returns>The same list that was passed in.</returns>
public static async Task<List<T>> UpdateAll<T>(this AzureCosmosFactory azureCosmosFactory, List<T> enyites) where T : ID
{
    int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
    AzureCosmosModel container = await azureCosmosFactory.InitializeCollection<T>();
    bool useCache = container.cache && RedisHelper.Instance != null;
    string cacheKey = CacheCosmosPrefix + container.container.Id;
    // Only touch Redis when caching is actually enabled (the check used to run unguarded).
    bool cacheExisted = useCache && RedisHelper.Exists(cacheKey);
    string partitionKey = AzureCosmosUtil.GetPartitionKey<T>();
    Type type = typeof(T);
    var partitionProperty = type.GetProperty(partitionKey);
    var serializerOptions = new JsonSerializerOptions { IgnoreNullValues = true };

    for (int i = 0; i < pages; i++)
    {
        List<T> batch = enyites.Skip(i * pageSize).Take(pageSize).ToList();
        List<Task> writes = new List<Task>(batch.Count);
        // A plain loop instead of List.ForEach(async ...): that lambda was async void, so
        // nothing guaranteed the payloads were ready before the writes were started.
        foreach (T entity in batch)
        {
            entity.pk = type.Name;
            entity.ttl = null;
            MemoryStream payload = new MemoryStream();
            await JsonSerializer.SerializeAsync(payload, entity, serializerOptions);
            string keyValue = partitionProperty.GetValue(entity, null).ToString();
            writes.Add(container.container.ReplaceItemStreamAsync(payload, entity.id, new PartitionKey(keyValue))
                .ContinueWith(task =>
                {
                    payload.Dispose();
                    // Individual write failures are still not propagated (unchanged behaviour);
                    // successful responses are now disposed instead of being leaked.
                    if (task.Status == TaskStatus.RanToCompletion)
                    {
                        task.Result?.Dispose();
                    }
                }));
        }
        await Task.WhenAll(writes);

        if (useCache)
        {
            // Await the cache writes instead of firing them from async void lambdas.
            await Task.WhenAll(batch.Select(entity => RedisHelper.HSetAsync(cacheKey, entity.id, entity)));
        }
    }

    if (useCache && !cacheExisted)
    {
        // The hash was created by this call, so give it its expiry.
        await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
    }
    return enyites;
}
/// <summary>
/// Queries the container resolved for <c>T</c> with the conditions in <paramref name="dict"/>.
/// </summary>
/// <param name="azureCosmosFactory">Factory used to resolve the container.</param>
/// <param name="dict">Query conditions handed to <c>SQLHelper.GetSQL</c>.</param>
/// <param name="propertys">Optional argument for <c>SQLHelper.GetSQLSelect</c>, which builds the select clause.</param>
/// <returns>
/// The matching items. The result is null when <c>SQLHelper.GetSQL</c> yields no query,
/// because <c>ResultsFromQueryAndOptions</c> returns null in that case.
/// </returns>
public static async Task
> FindByDict(this AzureCosmosFactory azureCosmosFactory, Dictionary dict, List propertys = null) where T : ID
{
StringBuilder sql;
sql = SQLHelper.GetSQLSelect(propertys);
// The type name is passed to SQLHelper.GetSQL as the pk argument.
string pk = typeof(T).Name;
AzureCosmosQuery cosmosDbQuery = SQLHelper.GetSQL(dict, sql, pk);
QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
return await ResultsFromQueryAndOptions(azureCosmosFactory,cosmosDbQuery, queryRequestOptions);
}
/// <summary>
/// Queries the container registered under <paramref name="CollectionName"/> with the
/// conditions in <paramref name="dict"/>.
/// </summary>
/// <param name="azureCosmosFactory">Factory whose <c>CosmosDict.typeCosmos</c> map is searched for the container.</param>
/// <param name="CollectionName">Key used to look the container up.</param>
/// <param name="dict">Query conditions handed to <c>SQLHelper.GetSQL</c>.</param>
/// <param name="propertys">Optional argument for <c>SQLHelper.GetSQLSelect</c>, which builds the select clause.</param>
/// <returns>The matching items.</returns>
/// <exception cref="BizException">Thrown when <paramref name="CollectionName"/> is not registered.</exception>
public static async Task> FindByDict(this AzureCosmosFactory azureCosmosFactory, string CollectionName, Dictionary dict, List propertys = null)
{
if (azureCosmosFactory.CosmosDict.typeCosmos.TryGetValue(CollectionName, out AzureCosmosModel container))
{
string pk = container.type.Name;
StringBuilder sql;
sql = SQLHelper.GetSQLSelect(propertys);
AzureCosmosQuery cosmosDbQuery = SQLHelper.GetSQL(dict, sql, pk);
QueryRequestOptions queryRequestOptions = GetDefaultQueryRequestOptions(itemsPerPage: GetEffectivePageSize(-1, null));
// NOTE(review): unlike FindCountByDict, cosmosDbQuery is not checked for null before it is dereferenced here.
AsyncPageable query = container.container.GetItemQueryIterator(queryDefinition: cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryRequestOptions);
return await ResultsFromFeedIterator(query);
}
else
{
throw new BizException("CollectionName named:" + CollectionName + " dose not exsit in Database!");
}
}
/// <summary>
/// Deletes the items identified by the given id / partition key pairs.
/// For monitored containers the items are looked up and removed through the TTL path;
/// otherwise they are deleted directly in batches of <c>pageSize</c>.
/// </summary>
/// <param name="azureCosmosFactory">Factory used to resolve the container for <c>T</c>.</param>
/// <param name="ids">Id and partition key value of every item to delete.</param>
/// <returns>
/// For monitored containers the list that was passed in; otherwise one entry per
/// delete call carrying the response status, in input order.
/// </returns>
public static async Task<List<IdPk>> DeleteAll<T>(this AzureCosmosFactory azureCosmosFactory, List<IdPk> ids) where T : ID
{
    AzureCosmosModel container = await azureCosmosFactory.InitializeCollection<T>();
    if (container.monitor)
    {
        List<T> list = await azureCosmosFactory.FindByDict<T>(new Dictionary<string, object>() { { "id", ids.Select(x => x.id).ToArray() } });
        // The query helper can yield null when no SQL could be built; nothing to delete then.
        if (list != null)
        {
            await azureCosmosFactory.DeleteTTL(list);
        }
        return ids;
    }

    List<IdPk> idPks = new List<IdPk>();
    bool useCache = container.cache && RedisHelper.Instance != null;
    string cacheKey = CacheCosmosPrefix + container.container.Id;
    int pages = (int)Math.Ceiling((double)ids.Count / pageSize);
    for (int i = 0; i < pages; i++)
    {
        List<IdPk> batch = ids.Skip(i * pageSize).Take(pageSize).ToList();
        List<Task<IdPk>> deletions = new List<Task<IdPk>>(batch.Count);
        foreach (IdPk item in batch)
        {
            deletions.Add(container.container.DeleteItemStreamAsync(item.id, new PartitionKey(item.pk))
                .ContinueWith(task =>
                {
                    // Reading Result rethrows a failed delete so Task.WhenAll surfaces it, as before.
                    using (Response response = task.Result)
                    {
                        return new IdPk { id = item.id, pk = item.pk, Status = response.Status };
                    }
                }));
        }
        // Results are gathered after the batch completes. The continuations used to add to
        // the shared list concurrently, which List<T> does not support.
        idPks.AddRange(await Task.WhenAll(deletions));

        if (useCache)
        {
            // Await the cache deletes instead of firing them from async void lambdas.
            await Task.WhenAll(batch.Select(item => RedisHelper.HDelAsync(cacheKey, item.id)));
        }
    }
    return idPks;
}
/// <summary>
/// Deletes every item matching the conditions in <paramref name="dict"/>: the items are
/// queried first and then handed to the entity-list overload of DeleteAll.
/// </summary>
/// <param name="azureCosmosFactory">Factory used to resolve the container for <c>T</c>.</param>
/// <param name="dict">Query conditions; must contain at least one key.</param>
/// <returns>The result of the entity-list overload of DeleteAll.</returns>
/// <exception cref="BizException">Thrown with code 500 when <paramref name="dict"/> has no keys.</exception>
public static async Task> DeleteAll(this AzureCosmosFactory azureCosmosFactory, Dictionary dict) where T : ID
{
// An empty condition set is rejected here rather than passed on to the query.
if (dict.Keys.Count > 0)
{
List list = await azureCosmosFactory.FindByDict(dict);
return await azureCosmosFactory.DeleteAll(list);
}
else
{
throw new BizException("参数为空", 500);
}
}
/// <summary>
/// Deletes the given entities and logs what is being deleted.
/// For monitored containers the entities are removed through the TTL path;
/// otherwise they are deleted directly in batches of <c>pageSize</c>.
/// </summary>
/// <param name="azureCosmosFactory">Factory used to resolve the container for <c>T</c>.</param>
/// <param name="enyites">Entities to delete.</param>
/// <returns>
/// One entry per entity with its id and partition key value; <c>StatusCode</c> is
/// NoContent on the TTL path, <c>Status</c> is the delete response status otherwise.
/// </returns>
public static async Task<List<IdPk>> DeleteAll<T>(this AzureCosmosFactory azureCosmosFactory, List<T> enyites) where T : ID
{
    Type type = typeof(T);
    string pk = AzureCosmosUtil.GetPartitionKey<T>();
    AzureCosmosModel container = await azureCosmosFactory.InitializeCollection<T>();
    List<IdPk> idPks = new List<IdPk>();
    // Log record of the data that is about to be deleted.
    string uuidKey = Guid.NewGuid().ToString();
    string logkey = "\r\n【" + uuidKey + "】\r\n";
    LogHelper.Info(default(object),
        logkey
        + "删除------->>\r\n"
        + "表:"
        + type.Name + "\r\n"
        + "数据:"
        + enyites.ToApiJson()
        + "\r\n" + logkey);
    var partitionProperty = type.GetProperty(pk);

    if (container.monitor)
    {
        enyites = await azureCosmosFactory.DeleteTTL(enyites);
        foreach (T t in enyites)
        {
            object o = partitionProperty.GetValue(t, null);
            idPks.Add(new IdPk { id = t.id, pk = o.ToString(), StatusCode = HttpStatusCode.NoContent });
        }
        return idPks;
    }

    bool useCache = container.cache && RedisHelper.Instance != null;
    string cacheKey = CacheCosmosPrefix + container.container.Id;
    int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
    for (int i = 0; i < pages; i++)
    {
        List<T> batch = enyites.Skip(i * pageSize).Take(pageSize).ToList();
        List<Task<IdPk>> deletions = new List<Task<IdPk>>(batch.Count);
        foreach (T entity in batch)
        {
            string id = entity.id;
            string pkValue = partitionProperty.GetValue(entity, null).ToString();
            deletions.Add(container.container.DeleteItemStreamAsync(id, new PartitionKey(pkValue))
                .ContinueWith(task =>
                {
                    // Reading Result rethrows a failed delete so Task.WhenAll surfaces it, as before.
                    using (Response response = task.Result)
                    {
                        // Report the raw partition key value, matching the TTL branch above,
                        // rather than the string form of the PartitionKey wrapper.
                        return new IdPk { id = id, pk = pkValue, Status = response.Status };
                    }
                }));
        }
        // Results are gathered after the batch completes. The continuations used to add to
        // the shared list concurrently, which List<T> does not support.
        idPks.AddRange(await Task.WhenAll(deletions));

        if (useCache)
        {
            // Await the cache deletes instead of firing them from async void lambdas.
            await Task.WhenAll(batch.Select(entity => RedisHelper.HDelAsync(cacheKey, entity.id)));
        }
    }
    return idPks;
}
/// <summary>
/// Deletes the item identified by <paramref name="idPk"/> by forwarding its id and
/// partition key value to the (id, pk) overload.
/// </summary>
/// <param name="azureCosmosFactory">Factory used to resolve the container for <c>T</c>.</param>
/// <param name="idPk">Id and partition key value of the item to delete.</param>
/// <returns>The result of the (id, pk) overload.</returns>
public static async Task DeleteAsync(this AzureCosmosFactory azureCosmosFactory, IdPk idPk) where T : ID
{
return await DeleteAsync(azureCosmosFactory, idPk.id, idPk.pk);
}
/// <summary>
/// Deletes the item with the given id and partition key value.
/// For monitored containers the item is looked up by id and removed through the TTL
/// path; otherwise it is deleted directly and removed from the Redis cache.
/// </summary>
/// <param name="azureCosmosFactory">Factory used to resolve the container for <c>T</c>.</param>
/// <param name="id">Id of the item to delete.</param>
/// <param name="pk">Partition key value of the item.</param>
/// <returns>
/// An <c>IdPk</c> carrying <c>StatusCode</c> NoContent on the TTL path, or the delete
/// response's <c>Status</c> otherwise.
/// </returns>
/// <exception cref="BizException">Thrown on the TTL path when no item with the given id is found.</exception>
public static async Task DeleteAsync(this AzureCosmosFactory azureCosmosFactory, string id, string pk) where T : ID
{
// pk =AzureCosmosUtil.GetPartitionKey();
AzureCosmosModel container = await azureCosmosFactory.InitializeCollection();
if (container.monitor)
{
// The TTL path needs the full entity, so it is fetched by id first.
List list = await FindByDict(azureCosmosFactory,new Dictionary() { { "id", id } });
if (list.Count > 0)
{
await DeleteTTL(azureCosmosFactory, list);
return new IdPk { id = id, pk = pk, StatusCode = HttpStatusCode.NoContent };
}
else
{
throw new BizException("未找到ID匹配的数据,删除失败");
}
}
else
{
Response response = await container.container.DeleteItemStreamAsync(id: id, partitionKey: new PartitionKey(pk));
if (container.cache && RedisHelper.Instance != null)
{
await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, id);
}
return new IdPk { id = id, pk = pk, Status = response.Status };
}
}
/// <summary>
/// Deletes the given entity. For monitored containers it is removed through the TTL
/// path; otherwise it is deleted directly and removed from the Redis cache.
/// </summary>
/// <param name="azureCosmosFactory">Factory used to resolve the container for <c>T</c>.</param>
/// <param name="entity">Entity to delete; its partition key value is read from the property named by <c>GetPartitionKey</c>.</param>
/// <returns>
/// An <c>IdPk</c> with the entity id and partition key value; <c>StatusCode</c> is
/// NoContent on the TTL path, <c>Status</c> is the delete response status otherwise.
/// </returns>
public static async Task<IdPk> DeleteAsync<T>(this AzureCosmosFactory azureCosmosFactory, T entity) where T : ID
{
    AzureCosmosModel container = await azureCosmosFactory.InitializeCollection<T>();
    string partitionKey = AzureCosmosUtil.GetPartitionKey<T>();
    Type type = typeof(T);
    string pkValue = type.GetProperty(partitionKey).GetValue(entity, null).ToString();
    if (container.monitor)
    {
        await DeleteTTL(azureCosmosFactory, new List<T>() { entity });
        return new IdPk { id = entity.id, pk = pkValue, StatusCode = HttpStatusCode.NoContent };
    }

    Response response = await container.container.DeleteItemStreamAsync(id: entity.id, partitionKey: new PartitionKey(pkValue));
    if (container.cache && RedisHelper.Instance != null)
    {
        await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, entity.id);
    }
    // Report the partition key value. This branch used to return the name of the
    // partition key property instead, unlike the TTL branch above.
    return new IdPk { id = entity.id, pk = pkValue, Status = response.Status };
}
/// <summary>
/// Runs a caller-supplied SQL query against the container resolved for <c>T</c>.
/// The query text must contain <c>.pk</c>, otherwise it is rejected.
/// </summary>
/// <param name="azureCosmosFactory">Factory used to resolve the container.</param>
/// <param name="sql">Query text.</param>
/// <param name="Parameters">Optional query parameters; when given they are attached through <c>AzureCosmosQuery</c>.</param>
/// <returns>All items yielded by the query.</returns>
/// <exception cref="BizException">Thrown with <c>ResponseCode.PARAMS_ERROR</c> when the query text does not contain <c>.pk</c>.</exception>
public static async Task> FindSQL(this AzureCosmosFactory azureCosmosFactory, string sql, Dictionary Parameters = null) where T : ID
{
// Plain substring check on the query text; it does not verify how pk is used in the query.
if (sql.Contains(".pk"))
{
AzureCosmosModel container = await azureCosmosFactory.InitializeCollection();
QueryRequestOptions queryOptions = GetQueryRequestOptions(GetEffectivePageSize(-1, null));
if (Parameters != null)
{
AzureCosmosQuery cosmosDbQuery = new AzureCosmosQuery
{
QueryText = sql,
Parameters = Parameters
};
AsyncPageable feedIterator = container.container
.GetItemQueryIterator(cosmosDbQuery.CosmosQueryDefinition, requestOptions: queryOptions);
return await ResultsFromFeedIterator(feedIterator);
}
else
{
// NOTE(review): queryOptions is not passed on this branch, unlike the parameterized branch above.
QueryDefinition queryDefinition = new QueryDefinition(sql);
return await ResultsFromFeedIterator(container.container.GetItemQueryIterator(queryDefinition));
}
}
else
{
throw new BizException("查询参数必须设置 .pk ", ResponseCode.PARAMS_ERROR);
}
}
/// <summary>
/// Deletes by TTL: stamps every entity with the class-level <c>ttl</c> value, upserts
/// the entities through <c>DeleteTTlALL</c>, and removes them from the Redis cache.
/// </summary>
/// <param name="azureCosmosFactory">Factory used to resolve the container for <c>T</c>.</param>
/// <param name="list">Entities to delete; their <c>ttl</c> property is overwritten.</param>
/// <returns>The list returned by <c>DeleteTTlALL</c>.</returns>
private static async Task<List<T>> DeleteTTL<T>(this AzureCosmosFactory azureCosmosFactory, List<T> list) where T : ID
{
    AzureCosmosModel container = await azureCosmosFactory.InitializeCollection<T>();
    list.ForEach(x => { x.ttl = ttl; });
    list = await DeleteTTlALL(azureCosmosFactory, list);
    if (container.cache && RedisHelper.Instance != null)
    {
        string cacheKey = CacheCosmosPrefix + container.container.Id;
        // Await the cache deletes. They used to be started from async void lambdas, so
        // the method could return before they finished and their failures were unobservable.
        await Task.WhenAll(list.Select(x => RedisHelper.HDelAsync(cacheKey, x.id)));
    }
    return list;
}
/// <summary>
/// Upserts the given entities in batches of <c>pageSize</c> without clearing their
/// <c>ttl</c>, so the value set by the caller is written with each item. The entities are
/// also written to the Redis cache when caching is enabled.
/// </summary>
/// <param name="azureCosmosFactory">Factory used to resolve the container for <c>T</c>.</param>
/// <param name="enyites">Entities to upsert; each gets <c>pk</c> set to the type name.</param>
/// <returns>The same list that was passed in.</returns>
private static async Task<List<T>> DeleteTTlALL<T>(this AzureCosmosFactory azureCosmosFactory, List<T> enyites) where T : ID
{
    int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
    AzureCosmosModel container = await azureCosmosFactory.InitializeCollection<T>();
    bool useCache = container.cache && RedisHelper.Instance != null;
    string cacheKey = CacheCosmosPrefix + container.container.Id;
    // Only touch Redis when caching is actually enabled (the check used to run unguarded).
    bool cacheExisted = useCache && RedisHelper.Exists(cacheKey);
    string partitionKey = AzureCosmosUtil.GetPartitionKey<T>();
    Type type = typeof(T);
    var partitionProperty = type.GetProperty(partitionKey);
    var serializerOptions = new JsonSerializerOptions { IgnoreNullValues = true };

    for (int i = 0; i < pages; i++)
    {
        List<T> batch = enyites.Skip(i * pageSize).Take(pageSize).ToList();
        List<Task> writes = new List<Task>(batch.Count);
        // A plain loop instead of List.ForEach(async ...): that lambda was async void, so
        // nothing guaranteed the payloads were ready before the writes were started.
        foreach (T entity in batch)
        {
            entity.pk = type.Name;
            // ttl is intentionally left as set by the caller.
            MemoryStream payload = new MemoryStream();
            await JsonSerializer.SerializeAsync(payload, entity, serializerOptions);
            object keyValue = partitionProperty.GetValue(entity, null);
            writes.Add(container.container.UpsertItemStreamAsync(payload, new PartitionKey(keyValue.ToString()))
                .ContinueWith(task =>
                {
                    payload.Dispose();
                    // Individual write failures are still not propagated (unchanged behaviour);
                    // successful responses are now disposed instead of being leaked.
                    if (task.Status == TaskStatus.RanToCompletion)
                    {
                        task.Result?.Dispose();
                    }
                }));
        }
        await Task.WhenAll(writes);

        if (useCache)
        {
            // Await the cache writes instead of firing them from async void lambdas.
            await Task.WhenAll(batch.Select(entity => RedisHelper.HSetAsync(cacheKey, entity.id, entity)));
        }
    }

    if (useCache && !cacheExisted)
    {
        // The hash was created by this call, so give it its expiry.
        await RedisHelper.ExpireAsync(cacheKey, timeoutSeconds);
    }
    return enyites;
}
/// <summary>
/// Builds the query options used by the dictionary-based queries.
/// </summary>
/// <param name="itemsPerPage">Page size; the value -1 is replaced by 1000, any other value (including null) is used as is.</param>
/// <param name="maxBufferedItemCount">Buffered item limit; 100 when not given.</param>
/// <param name="maxConcurrency">Concurrency limit; 50 when not given.</param>
/// <returns>A new options object with the three limits applied.</returns>
private static QueryRequestOptions GetDefaultQueryRequestOptions(int? itemsPerPage = null, int? maxBufferedItemCount = null, int? maxConcurrency = null)
{
    int? pageLimit = itemsPerPage;
    if (itemsPerPage == -1)
    {
        pageLimit = 1000;
    }

    QueryRequestOptions options = new QueryRequestOptions();
    options.MaxItemCount = pageLimit;
    options.MaxBufferedItemCount = maxBufferedItemCount.HasValue ? maxBufferedItemCount.Value : 100;
    options.MaxConcurrency = maxConcurrency.HasValue ? maxConcurrency.Value : 50;
    return options;
}
/// <summary>
/// Combines a requested page size with an optional upper limit.
/// </summary>
/// <param name="itemsPerPage">Requested page size; -1 means "no explicit request".</param>
/// <param name="maxItemCount">Optional limit; when null the requested size stands in for it.</param>
/// <returns>
/// The limit when <paramref name="itemsPerPage"/> is -1 (or -1 itself when there is no
/// limit either); otherwise the smaller of the limit and the requested size.
/// </returns>
private static int GetEffectivePageSize(int itemsPerPage, int? maxItemCount)
{
    int limit = maxItemCount.HasValue ? maxItemCount.Value : itemsPerPage;
    if (itemsPerPage == -1)
    {
        return limit;
    }
    return Math.Min(limit, itemsPerPage);
}
/// <summary>
/// Runs the given query against the container resolved for <c>T</c> and collects all results.
/// </summary>
/// <param name="azureCosmosFactory">Factory used to resolve the container.</param>
/// <param name="cosmosDbQuery">Query to run; may be null when no query could be built.</param>
/// <param name="queryOptions">Request options passed to the query iterator.</param>
/// <returns>The matching items; an empty list when <paramref name="cosmosDbQuery"/> is null.</returns>
private static async Task<List<T>> ResultsFromQueryAndOptions<T>(this AzureCosmosFactory azureCosmosFactory, AzureCosmosQuery cosmosDbQuery, QueryRequestOptions queryOptions)
{
    if (cosmosDbQuery == null)
    {
        // Return an empty list rather than null: the callers in this class use the
        // result (Count, ForEach, AddRange) without a null check.
        return new List<T>();
    }
    AzureCosmosModel container = await azureCosmosFactory.InitializeCollection<T>();
    AsyncPageable<T> query = container.container.GetItemQueryIterator<T>(
        queryDefinition: cosmosDbQuery.CosmosQueryDefinition,
        requestOptions: queryOptions);
    return await ResultsFromFeedIterator(query);
}
/// <summary>
/// Enumerates the query asynchronously and collects the items into a list.
/// </summary>
/// <param name="query">Asynchronous result sequence to drain.</param>
/// <param name="maxItemCount">Optional cap; enumeration stops once this many items were collected. Null means no cap.</param>
/// <returns>The collected items.</returns>
private static async Task> ResultsFromFeedIterator(AsyncPageable query, int? maxItemCount = null)
{
List results = new List();
await foreach (T t in query)
{
results.Add(t);
// With a null cap this comparison is never true, so the whole sequence is read.
if (results.Count == maxItemCount)
{
return results;
}
}
return results;
}
/// <summary>
/// Builds query options that carry only a page size.
/// </summary>
/// <param name="itemsPerPage">Value assigned to <c>MaxItemCount</c>, unchanged.</param>
/// <returns>A new options object with <c>MaxItemCount</c> set.</returns>
private static QueryRequestOptions GetQueryRequestOptions(int itemsPerPage)
{
    QueryRequestOptions options = new QueryRequestOptions();
    options.MaxItemCount = itemsPerPage;
    return options;
}
}
}