|
@@ -45,7 +45,7 @@ namespace TEAMModelOS.SDK.Module.AzureCosmosDBV3
|
|
|
private const string CacheCosmosPrefix = "cosmos:";
|
|
|
private string[] ScanModel { get; set; }
|
|
|
private const int timeoutSeconds = 86400;
|
|
|
-
|
|
|
+ private const int ttl = 10;
|
|
|
private string leaseId = "AleaseContainer";
|
|
|
public AzureCosmosDBV3Repository(AzureCosmosDBOptions options, CosmosSerializer cosmosSerializer)
|
|
|
{
|
|
@@ -167,7 +167,7 @@ namespace TEAMModelOS.SDK.Module.AzureCosmosDBV3
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- ContainerProperties containerProperties = new ContainerProperties { Id = CollectionName };
|
|
|
+ ContainerProperties containerProperties = new ContainerProperties { Id = CollectionName ,DefaultTimeToLive=-1 };
|
|
|
|
|
|
if (!string.IsNullOrEmpty(PartitionKey))
|
|
|
{
|
|
@@ -183,7 +183,7 @@ namespace TEAMModelOS.SDK.Module.AzureCosmosDBV3
|
|
|
}
|
|
|
if (isMonitor)
|
|
|
{
|
|
|
- ContainerProperties leaseProperties = new ContainerProperties { Id = leaseId, PartitionKeyPath = "/id" };
|
|
|
+ ContainerProperties leaseProperties = new ContainerProperties { Id = leaseId, PartitionKeyPath = "/id", DefaultTimeToLive = -1 };
|
|
|
Container leaseContainer = await database.CreateContainerIfNotExistsAsync(leaseProperties, throughput: CollectionThroughput);
|
|
|
DocumentCollectionDict.TryAdd(leaseId, new CosmosModelInfo { container = leaseContainer, cache = false, monitor = false });
|
|
|
}
|
|
@@ -275,99 +275,138 @@ namespace TEAMModelOS.SDK.Module.AzureCosmosDBV3
|
|
|
return cosmosModel;
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
-
|
|
|
+ /// <summary>
|
|
|
+ /// 按TTL删除
|
|
|
+ /// </summary>
|
|
|
+ /// <typeparam name="T"></typeparam>
|
|
|
+ /// <param name="list"></param>
|
|
|
+ /// <returns></returns>
|
|
|
+ private async Task<List<T>> DeleteTTL<T>(List<T> list)where T:ID {
|
|
|
+ CosmosModelInfo container = await InitializeCollection<T>();
|
|
|
+ list.ForEach(x=> { x.ttl = ttl; });
|
|
|
+ list= await SaveOrUpdateAll(list);
|
|
|
+ if (container.cache && RedisHelper.Instance != null)
|
|
|
+ {
|
|
|
+ list.ForEach(async x => {
|
|
|
+ await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, x.id);
|
|
|
+ });
|
|
|
+ }
|
|
|
+ return list;
|
|
|
+ }
|
|
|
public async Task<List<IdPk>> DeleteAll<T>(List<IdPk> ids) where T : ID
|
|
|
{
|
|
|
CosmosModelInfo container = await InitializeCollection<T>();
|
|
|
- //string partitionKey = GetPartitionKey<T>();
|
|
|
- //await Task.Run(() => Parallel.ForEach(ids, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (item) =>
|
|
|
- //{
|
|
|
- // Task.WaitAll(DeleteAsync<T>(item.Value, item.Key));
|
|
|
- //}));
|
|
|
-
|
|
|
List<IdPk> idPks = new List<IdPk>();
|
|
|
- int pages = (int)Math.Ceiling((double)ids.Count / pageSize);
|
|
|
- Stopwatch stopwatch = Stopwatch.StartNew();
|
|
|
- for (int i = 0; i < pages; i++)
|
|
|
+ if (container.monitor)
|
|
|
{
|
|
|
- List<IdPk> lists = ids.Skip((i) * pageSize).Take(pageSize).ToList();
|
|
|
- List<Task> tasks = new List<Task>(lists.Count);
|
|
|
- lists.ForEach(item =>
|
|
|
+ List<T> list= await FindByDict<T>(new Dictionary<string, object>() { { "id", ids.Select(x => x.id).ToArray() } });
|
|
|
+ list= await DeleteTTL(list);
|
|
|
+ return ids;
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ int pages = (int)Math.Ceiling((double)ids.Count / pageSize);
|
|
|
+ Stopwatch stopwatch = Stopwatch.StartNew();
|
|
|
+ for (int i = 0; i < pages; i++)
|
|
|
{
|
|
|
- tasks.Add(container.container.DeleteItemStreamAsync(item.id, new PartitionKey(item.pk))
|
|
|
- .ContinueWith((Task<ResponseMessage> task) =>
|
|
|
- {
|
|
|
- using (ResponseMessage response = task.Result)
|
|
|
+ List<IdPk> lists = ids.Skip((i) * pageSize).Take(pageSize).ToList();
|
|
|
+ List<Task> tasks = new List<Task>(lists.Count);
|
|
|
+ lists.ForEach(item =>
|
|
|
+ {
|
|
|
+ tasks.Add(container.container.DeleteItemStreamAsync(item.id, new PartitionKey(item.pk))
|
|
|
+ .ContinueWith((Task<ResponseMessage> task) =>
|
|
|
{
|
|
|
- idPks.Add(new IdPk { id = item.id, pk = item.pk.ToString(), StatusCode = response.StatusCode });
|
|
|
+ using (ResponseMessage response = task.Result)
|
|
|
+ {
|
|
|
+ idPks.Add(new IdPk { id = item.id, pk = item.pk.ToString(), StatusCode = response.StatusCode });
|
|
|
// if (!response.IsSuccessStatusCode)
|
|
|
// {
|
|
|
// }
|
|
|
}
|
|
|
- }
|
|
|
- ));
|
|
|
- });
|
|
|
- await Task.WhenAll(tasks);
|
|
|
- if (container.cache && RedisHelper.Instance != null)
|
|
|
- {
|
|
|
- lists.ForEach(async x => {
|
|
|
- await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, x.id);
|
|
|
+ }
|
|
|
+ ));
|
|
|
});
|
|
|
+ await Task.WhenAll(tasks);
|
|
|
+ if (container.cache && RedisHelper.Instance != null)
|
|
|
+ {
|
|
|
+ lists.ForEach(async x => {
|
|
|
+ await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, x.id);
|
|
|
+ });
|
|
|
+ }
|
|
|
}
|
|
|
+ stopwatch.Stop();
|
|
|
+ return idPks;
|
|
|
}
|
|
|
- stopwatch.Stop();
|
|
|
- return idPks;
|
|
|
+
|
|
|
+
|
|
|
}
|
|
|
+
|
|
|
public async Task<List<IdPk>> DeleteAll<T>(Dictionary<string, object> dict) where T : ID
|
|
|
{
|
|
|
- List<T> list = await FindByDict<T>(dict);
|
|
|
- return await DeleteAll(list);
|
|
|
+ if (dict.Keys.Count > 0)
|
|
|
+ {
|
|
|
+ List<T> list = await FindByDict<T>(dict);
|
|
|
+ return await DeleteAll(list);
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ throw new BizException("参数为空", 500);
|
|
|
+ }
|
|
|
+
|
|
|
}
|
|
|
public async Task<List<IdPk>> DeleteAll<T>(List<T> enyites) where T : ID
|
|
|
{
|
|
|
-
|
|
|
- List<IdPk> idPks = new List<IdPk>();
|
|
|
- CosmosModelInfo container = await InitializeCollection<T>();
|
|
|
- string pk = GetPartitionKey<T>();
|
|
|
Type type = typeof(T);
|
|
|
- int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
|
|
|
- Stopwatch stopwatch = Stopwatch.StartNew();
|
|
|
- for (int i = 0; i < pages; i++)
|
|
|
+ string pk = GetPartitionKey<T>();
|
|
|
+ CosmosModelInfo container = await InitializeCollection<T>();
|
|
|
+ List<IdPk> idPks = new List<IdPk>();
|
|
|
+
|
|
|
+ if (container.monitor)
|
|
|
{
|
|
|
- List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
|
|
|
- List<KeyValuePair<PartitionKey, string>> itemsToInsert = new List<KeyValuePair<PartitionKey, string>>();
|
|
|
- lists.ForEach(x =>
|
|
|
+ enyites = await DeleteTTL(enyites);
|
|
|
+ foreach (T t in enyites) {
|
|
|
+ object o = type.GetProperty(pk).GetValue(t, null);
|
|
|
+ idPks.Add(new IdPk { id = t.id, pk =o.ToString(), StatusCode = HttpStatusCode .NoContent});
|
|
|
+ }
|
|
|
+ return idPks;
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
|
|
|
+ Stopwatch stopwatch = Stopwatch.StartNew();
|
|
|
+ for (int i = 0; i < pages; i++)
|
|
|
{
|
|
|
- object o = type.GetProperty(pk).GetValue(x, null);
|
|
|
- KeyValuePair<PartitionKey, string> keyValue = new KeyValuePair<PartitionKey, string>(new PartitionKey(o.ToString()), x.id);
|
|
|
- itemsToInsert.Add(keyValue);
|
|
|
- });
|
|
|
+ List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
|
|
|
+ List<KeyValuePair<PartitionKey, string>> itemsToInsert = new List<KeyValuePair<PartitionKey, string>>();
|
|
|
+ lists.ForEach(x =>
|
|
|
+ {
|
|
|
+ object o = type.GetProperty(pk).GetValue(x, null);
|
|
|
+ KeyValuePair<PartitionKey, string> keyValue = new KeyValuePair<PartitionKey, string>(new PartitionKey(o.ToString()), x.id);
|
|
|
+ itemsToInsert.Add(keyValue);
|
|
|
+ });
|
|
|
|
|
|
- List<Task> tasks = new List<Task>(lists.Count);
|
|
|
- itemsToInsert.ForEach(item =>
|
|
|
- {
|
|
|
- tasks.Add(container.container.DeleteItemStreamAsync(item.Value, item.Key)
|
|
|
- .ContinueWith((Task<ResponseMessage> task) =>
|
|
|
+ List<Task> tasks = new List<Task>(lists.Count);
|
|
|
+ itemsToInsert.ForEach(item =>
|
|
|
{
|
|
|
- using (ResponseMessage response = task.Result)
|
|
|
+ tasks.Add(container.container.DeleteItemStreamAsync(item.Value, item.Key)
|
|
|
+ .ContinueWith((Task<ResponseMessage> task) =>
|
|
|
{
|
|
|
+ using (ResponseMessage response = task.Result)
|
|
|
+ {
|
|
|
|
|
|
- idPks.Add(new IdPk { id = item.Value, pk = item.Key.ToString(), StatusCode = response.StatusCode });
|
|
|
+ idPks.Add(new IdPk { id = item.Value, pk = item.Key.ToString(), StatusCode = response.StatusCode });
|
|
|
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
- ));
|
|
|
- });
|
|
|
- await Task.WhenAll(tasks); if (container.cache && RedisHelper.Instance != null)
|
|
|
- {
|
|
|
- lists.ForEach(async x => {
|
|
|
- await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, x.id);
|
|
|
+ ));
|
|
|
});
|
|
|
+ await Task.WhenAll(tasks); if (container.cache && RedisHelper.Instance != null)
|
|
|
+ {
|
|
|
+ lists.ForEach(async x => {
|
|
|
+ await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, x.id);
|
|
|
+ });
|
|
|
+ }
|
|
|
}
|
|
|
+ stopwatch.Stop();
|
|
|
+ return idPks;
|
|
|
}
|
|
|
- stopwatch.Stop();
|
|
|
- return idPks;
|
|
|
}
|
|
|
|
|
|
public async Task<IdPk> DeleteAsync<T>(IdPk idPk) where T : ID
|
|
@@ -377,12 +416,27 @@ namespace TEAMModelOS.SDK.Module.AzureCosmosDBV3
|
|
|
public async Task<IdPk> DeleteAsync<T>(string id, string pk) where T : ID
|
|
|
{
|
|
|
CosmosModelInfo container = await InitializeCollection<T>();
|
|
|
- ResponseMessage response = await container.container.DeleteItemStreamAsync(id: id, partitionKey: new PartitionKey(pk));
|
|
|
- if (container.cache && RedisHelper.Instance != null)
|
|
|
+ if (container.monitor)
|
|
|
{
|
|
|
- await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, id);
|
|
|
+ List<T> list= await FindByDict<T>(new Dictionary<string, object>() { { "id", id } });
|
|
|
+ if (list.Count > 0)
|
|
|
+ {
|
|
|
+ await DeleteTTL<T>(list);
|
|
|
+ return new IdPk { id = id, pk = pk, StatusCode = HttpStatusCode.NoContent };
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ throw new BizException("未找到ID匹配的数据,删除失败");
|
|
|
+ }
|
|
|
}
|
|
|
- return new IdPk { id = id, pk = pk, StatusCode = response.StatusCode };
|
|
|
+ else {
|
|
|
+ ResponseMessage response = await container.container.DeleteItemStreamAsync(id: id, partitionKey: new PartitionKey(pk));
|
|
|
+ if (container.cache && RedisHelper.Instance != null)
|
|
|
+ {
|
|
|
+ await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, id);
|
|
|
+ }
|
|
|
+ return new IdPk { id = id, pk = pk, StatusCode = response.StatusCode };
|
|
|
+ }
|
|
|
+
|
|
|
}
|
|
|
|
|
|
public async Task<IdPk> DeleteAsync<T>(T entity) where T : ID
|
|
@@ -391,13 +445,19 @@ namespace TEAMModelOS.SDK.Module.AzureCosmosDBV3
|
|
|
string partitionKey = GetPartitionKey<T>();
|
|
|
Type type = typeof(T);
|
|
|
object o = type.GetProperty(partitionKey).GetValue(entity, null);
|
|
|
- ResponseMessage response = await container.container.DeleteItemStreamAsync(id: entity.id, partitionKey: new PartitionKey(o.ToString()));
|
|
|
- if (container.cache && RedisHelper.Instance != null)
|
|
|
+ if (container.monitor)
|
|
|
{
|
|
|
- await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, entity.id);
|
|
|
+ await DeleteTTL<T>(new List<T>() { entity});
|
|
|
+ return new IdPk { id = entity.id, pk = o.ToString(), StatusCode = HttpStatusCode.NoContent };
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ ResponseMessage response = await container.container.DeleteItemStreamAsync(id: entity.id, partitionKey: new PartitionKey(o.ToString()));
|
|
|
+ if (container.cache && RedisHelper.Instance != null)
|
|
|
+ {
|
|
|
+ await RedisHelper.HDelAsync(CacheCosmosPrefix + container.container.Id, entity.id);
|
|
|
+ }
|
|
|
+ return new IdPk { id = entity.id, pk = o.ToString(), StatusCode = response.StatusCode };
|
|
|
}
|
|
|
- return new IdPk { id = entity.id, pk = o.ToString(), StatusCode = response.StatusCode };
|
|
|
-
|
|
|
}
|
|
|
//public async Task<T> DeleteAsync<T>(string id) where T : ID
|
|
|
//{
|