|
@@ -564,36 +564,7 @@ namespace TEAMModelOS.SDK.Module.AzureCosmosDBV3
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- //public async Task<List<T>> FindSQL<T>(string sql, bool isPK) where T : ID
|
|
|
- //{
|
|
|
- // Container container = await InitializeCollection<T>();
|
|
|
- // QueryDefinition queryDefinition = new QueryDefinition(sql);
|
|
|
- // return await ResultsFromFeedIterator<T>(container.GetItemQueryIterator<T>(queryDefinition));
|
|
|
- //}
|
|
|
-
|
|
|
- public async Task<T> ReplaceObject<T>(T entity) where T : ID
|
|
|
- {
|
|
|
- Container container = await InitializeCollection<T>();
|
|
|
- ItemResponse<T> response = await container.ReplaceItemAsync(item: entity, id: entity.id);
|
|
|
- if (response.StatusCode.Equals(HttpStatusCode.OK))
|
|
|
- {
|
|
|
- return response.Resource;
|
|
|
- }
|
|
|
- else { throw new BizException("error"); }
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- //public async Task<T> ReplaceObject<T>(T entity, string key, string partitionKey) where T : ID
|
|
|
- //{
|
|
|
- // Container container = await InitializeCollection<T>();
|
|
|
- // ItemResponse<T> response = await container.ReplaceItemAsync(item: entity, id: entity.id);
|
|
|
- // if (response.StatusCode.Equals(HttpStatusCode.OK))
|
|
|
- // {
|
|
|
- // return response.Resource;
|
|
|
- // }
|
|
|
- // else { throw new BizException("error"); }
|
|
|
-
|
|
|
- //}
|
|
|
+
|
|
|
|
|
|
public async Task<T> Save<T>(T entity) where T : ID
|
|
|
{
|
|
@@ -649,14 +620,77 @@ namespace TEAMModelOS.SDK.Module.AzureCosmosDBV3
|
|
|
stopwatch.Stop();
|
|
|
return enyites;
|
|
|
}
|
|
|
+ public async Task<T> SaveOrUpdate<T>(T entity) where T : ID
|
|
|
+ {
|
|
|
+ Container container = await InitializeCollection<T>();
|
|
|
+ ItemResponse<T> response = await container.UpsertItemAsync(item: entity);
|
|
|
+ if (response.StatusCode.Equals(HttpStatusCode.OK))
|
|
|
+ {
|
|
|
+ return response.Resource;
|
|
|
+ }
|
|
|
+ else { throw new BizException("error"); }
|
|
|
+ }
|
|
|
+ public async Task<List<T>> SaveOrUpdateAll<T>(List<T> enyites) where T : ID
|
|
|
+ {
|
|
|
+ //await Task.Run(() => Parallel.ForEach(entities, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (item) =>
|
|
|
+ //{
|
|
|
+ // Task.WaitAll(Update(item));
|
|
|
+ //}));
|
|
|
+
|
|
|
+ int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
|
|
|
+ Container container = await InitializeCollection<T>();
|
|
|
+ string pk = GetPartitionKey<T>();
|
|
|
+ Type type = typeof(T);
|
|
|
+ Stopwatch stopwatch = Stopwatch.StartNew();
|
|
|
+ for (int i = 0; i < pages; i++)
|
|
|
+ {
|
|
|
+ List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
|
|
|
+ List<KeyValuePair<PartitionKey, Stream>> itemsToInsert = new List<KeyValuePair<PartitionKey, Stream>>();
|
|
|
+ lists.ForEach(async x =>
|
|
|
+ {
|
|
|
+ MemoryStream stream = new MemoryStream();
|
|
|
+ await JsonSerializer.SerializeAsync(stream, x);
|
|
|
+ object o = type.GetProperty(pk).GetValue(x, null);
|
|
|
+ KeyValuePair<PartitionKey, Stream> keyValue = new KeyValuePair<PartitionKey, Stream>(new PartitionKey(o.ToString()), stream);
|
|
|
+ itemsToInsert.Add(keyValue);
|
|
|
+ });
|
|
|
+ List<Task> tasks = new List<Task>(lists.Count);
|
|
|
+ itemsToInsert.ForEach(item =>
|
|
|
+ {
|
|
|
+ tasks.Add(container.UpsertItemStreamAsync(item.Value, item.Key)
|
|
|
+ .ContinueWith((Task<ResponseMessage> task) =>
|
|
|
+ {
|
|
|
+ //using (ResponseMessage response = task.Result)
|
|
|
+ //{
|
|
|
+ // if (!response.IsSuccessStatusCode)
|
|
|
+ // {
|
|
|
+ // }
|
|
|
+ //}
|
|
|
+ }
|
|
|
+ ));
|
|
|
+ });
|
|
|
+ await Task.WhenAll(tasks);
|
|
|
+ }
|
|
|
+ stopwatch.Stop();
|
|
|
+ return enyites;
|
|
|
+ }
|
|
|
|
|
|
public async Task<T> Update<T>(T entity) where T : ID
|
|
|
{
|
|
|
Container container = await InitializeCollection<T>();
|
|
|
- ItemResponse<T> response = await container.UpsertItemAsync(entity);
|
|
|
+ string pk = GetPartitionKey<T>();
|
|
|
+ object o = typeof(T).GetProperty(pk).GetValue(entity, null);
|
|
|
+ ItemResponse<T> response = await container.ReplaceItemAsync(entity,entity.id,new PartitionKey(o.ToString()));
|
|
|
return response.Resource;
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+ internal class Item {
|
|
|
+ public string id { get; set; }
|
|
|
+ public string pk { get; set; }
|
|
|
+ public MemoryStream stream { get; set; }
|
|
|
+ }
|
|
|
+
|
|
|
public async Task<List<T>> UpdateAll<T>(List<T> enyites) where T : ID
|
|
|
{
|
|
|
//await Task.Run(() => Parallel.ForEach(entities, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (item) =>
|
|
@@ -672,20 +706,19 @@ namespace TEAMModelOS.SDK.Module.AzureCosmosDBV3
|
|
|
for (int i = 0; i < pages; i++)
|
|
|
{
|
|
|
List<T> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
|
|
|
- List<KeyValuePair<PartitionKey, Stream>> itemsToInsert = new List<KeyValuePair<PartitionKey, Stream>>();
|
|
|
+ List<Item> itemsToInsert = new List<Item>();
|
|
|
lists.ForEach(async x =>
|
|
|
{
|
|
|
MemoryStream stream = new MemoryStream();
|
|
|
await JsonSerializer.SerializeAsync(stream, x);
|
|
|
object o = type.GetProperty(pk).GetValue(x, null);
|
|
|
- KeyValuePair<PartitionKey, Stream> keyValue = new KeyValuePair<PartitionKey, Stream>(new PartitionKey(o.ToString()), stream);
|
|
|
+ Item keyValue = new Item { id=x.id,pk=o.ToString(),stream=stream};
|
|
|
itemsToInsert.Add(keyValue);
|
|
|
});
|
|
|
-
|
|
|
List<Task> tasks = new List<Task>(lists.Count);
|
|
|
itemsToInsert.ForEach(item =>
|
|
|
{
|
|
|
- tasks.Add(container.UpsertItemStreamAsync(item.Value, item.Key)
|
|
|
+ tasks.Add(container.ReplaceItemStreamAsync(item.stream, item.id,new PartitionKey(item.pk))
|
|
|
.ContinueWith((Task<ResponseMessage> task) =>
|
|
|
{
|
|
|
//using (ResponseMessage response = task.Result)
|
|
@@ -694,7 +727,7 @@ namespace TEAMModelOS.SDK.Module.AzureCosmosDBV3
|
|
|
// {
|
|
|
// }
|
|
|
//}
|
|
|
- }
|
|
|
+ }
|
|
|
));
|
|
|
});
|
|
|
await Task.WhenAll(tasks);
|