Просмотр исходного кода

CosmosDB批量保存缓慢的问题处理为Task,Stream模式。
并解决 Operation will NOT be retried. Current attempt 9 maxAttempts 9 Cumulative delay ,CosmosDB重试次数问题。

CrazyIter 5 лет назад
Родитель
Сommit
88d4b18673

+ 5 - 5
TEAMModelOS.SDK/Helper/Common/JsonHelper/JsonPatchHelper.cs

@@ -7,35 +7,35 @@ namespace System
 {
     public static class JsonPatchHelper
     {
-        public static T Add<T>(this T t, string path, object value) where T : class
+        public static T add<T>(this T t, string path, object value) where T : class
         {
             JsonPatchDocument<T> jsonPatch = new JsonPatchDocument<T>();
             jsonPatch.Operations.Add(new Operation<T>("add", path, null, value));
             jsonPatch.ApplyTo(t);
             return t;
         }
-        public static T Remove<T>(this T t, string path) where T : class
+        public static T remove<T>(this T t, string path) where T : class
         {
             JsonPatchDocument<T> jsonPatch = new JsonPatchDocument<T>();
             jsonPatch.Operations.Add(new Operation<T>("remove", path, null, null));
             jsonPatch.ApplyTo(t);
             return t;
         }
-        public static T Replace<T>(this T t, string path, object value) where T : class
+        public static T replace<T>(this T t, string path, object value) where T : class
         {
             JsonPatchDocument<T> jsonPatch = new JsonPatchDocument<T>();
             jsonPatch.Operations.Add(new Operation<T>("replace", path, null, value));
             jsonPatch.ApplyTo(t);
             return t;
         }
-        public static T Move<T>(this T t, string from, string path) where T : class
+        public static T move<T>(this T t, string from, string path) where T : class
         {
             JsonPatchDocument<T> jsonPatch = new JsonPatchDocument<T>();
             jsonPatch.Operations.Add(new Operation<T>("move", path, from));
             jsonPatch.ApplyTo(t);
             return t;
         }
-        public static T Copy<T>(this T t, string from, string path) where T : class
+        public static T copy<T>(this T t, string from, string path) where T : class
         {
             JsonPatchDocument<T> jsonPatch = new JsonPatchDocument<T>();
             jsonPatch.Operations.Add(new Operation<T>("copy", path, from));

+ 1 - 11
TEAMModelOS.SDK/Module/AzureCosmosDB/Implements/AzureCosmosDBRepository.cs

@@ -501,8 +501,6 @@ namespace TEAMModelOS.SDK.Module.AzureCosmosDB.Implements
                     {
                         do
                         {
-                            //try
-                            //{
                             bulkImportResponse = await bulkExecutor.BulkImportAsync(
                                 documents: documentsToImportInBatch,
                                 enableUpsert: true,
@@ -510,15 +508,7 @@ namespace TEAMModelOS.SDK.Module.AzureCosmosDB.Implements
                                 maxConcurrencyPerPartitionKeyRange: null,
                                 maxInMemorySortingBatchSize: null,
                                 cancellationToken: token);
-                            //}
-                            //catch (DocumentClientException de)
-                            //{
-                            //    break;
-                            //}
-                            //catch (Exception e)
-                            //{
-                            //    break;
-                            //}
+                            
                         } while (bulkImportResponse.NumberOfDocumentsImported < documentsToImportInBatch.Count);
                         totalNumberOfDocumentsInserted += bulkImportResponse.NumberOfDocumentsImported;
                         totalRequestUnitsConsumed += bulkImportResponse.TotalRequestUnitsConsumed;

+ 140 - 20
TEAMModelOS.SDK/Module/AzureCosmosDBV3/AzureCosmosDBV3Repository.cs

@@ -3,11 +3,14 @@ using Microsoft.Azure.Cosmos.Linq;
 using System;
 using System.Collections.Concurrent;
 using System.Collections.Generic;
+using System.Diagnostics;
+using System.IO;
 using System.Linq;
 using System.Linq.Expressions;
 using System.Net;
 using System.Reflection;
 using System.Text;
+using System.Text.Json;
 using System.Threading.Tasks;
 using TEAMModelOS.SDK.Context.Attributes.Azure;
 using TEAMModelOS.SDK.Context.Exception;
@@ -233,22 +236,79 @@ namespace TEAMModelOS.SDK.Module.AzureCosmosDBV3
 
         public async Task DeleteAll<T>(List<KeyValuePair<string, string>> ids) where T : ID
         {
-            string partitionKey = GetPartitionKey<T>();
-            await Task.Run(() => Parallel.ForEach(ids, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (item) =>
-            {
-                Task.WaitAll(DeleteAsync<T>(item.Value, item.Key));
-            }));
+            Container container = await InitializeCollection<T>();
+            //string partitionKey = GetPartitionKey<T>();
+            //await Task.Run(() => Parallel.ForEach(ids, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (item) =>
+            //{
+            //    Task.WaitAll(DeleteAsync<T>(item.Value, item.Key));
+            //}));
+
+
+            Stopwatch stopwatch = Stopwatch.StartNew();
+            List<Task> tasks = new List<Task>(ids.Count);
+            ids.ForEach(item => {
+                tasks.Add(container.DeleteItemStreamAsync(item.Value,new PartitionKey(item.Key))
+                    .ContinueWith((Task<ResponseMessage> task) =>
+                    {
+                        //using (ResponseMessage response = task.Result)
+                        //{
+                        //    if (!response.IsSuccessStatusCode)
+                        //    {
+                        //    }
+                        //}
+                    }
+                    ));
+            });
+            //await Task.Run(() => Parallel.ForEach(enyites, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (item) =>
+            //{
+            //    Task.WaitAll(Save(item));
+            //}));
+            await Task.WhenAll(tasks);
+            stopwatch.Stop();
         }
 
         public async Task DeleteAll<T>(List<T> entities) where T : ID
         {
-            string partitionKey = GetPartitionKey<T>();
+            //string partitionKey = GetPartitionKey<T>();
+            //Type type = typeof(T);
+            //await Task.Run(() => Parallel.ForEach(entities, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (item) =>
+            //{
+            //    object o = type.GetProperty(partitionKey).GetValue(item, null);
+            //    Task.WaitAll(DeleteAsync<T>(item.id, o.ToString()));
+            //}));
+
+            Container container = await InitializeCollection<T>();
+            string pk = GetPartitionKey<T>();
             Type type = typeof(T);
-            await Task.Run(() => Parallel.ForEach(entities, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (item) =>
-            {
-                object o = type.GetProperty(partitionKey).GetValue(item, null);
-                Task.WaitAll(DeleteAsync<T>(item.id, o.ToString()));
-            }));
+            List<KeyValuePair<PartitionKey, string>> itemsToInsert = new List<KeyValuePair<PartitionKey, string>>();
+            entities.ForEach( x => {
+                //MemoryStream stream = new MemoryStream();
+                //await JsonSerializer.SerializeAsync(stream, x);
+                object o = type.GetProperty(pk).GetValue(x, null);
+                KeyValuePair<PartitionKey, string> keyValue = new KeyValuePair<PartitionKey, string>(new PartitionKey(o.ToString()), x.id);
+                itemsToInsert.Add(keyValue);
+            });
+            Stopwatch stopwatch = Stopwatch.StartNew();
+            List<Task> tasks = new List<Task>(entities.Count);
+            itemsToInsert.ForEach(item => {
+                tasks.Add(container.DeleteItemStreamAsync(item.Value, item.Key)
+                    .ContinueWith((Task<ResponseMessage> task) =>
+                    {
+                    //using (ResponseMessage response = task.Result)
+                    //{
+                    //    if (!response.IsSuccessStatusCode)
+                    //    {
+                    //    }
+                    //}
+                    }
+                    ));
+            });
+            //await Task.Run(() => Parallel.ForEach(enyites, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (item) =>
+            //{
+            //    Task.WaitAll(Save(item));
+            //}));
+            await Task.WhenAll(tasks);
+            stopwatch.Stop();
         }
         public async Task<T> DeleteAsync<T>(string id, string pk) where T : ID
         {
@@ -555,11 +615,38 @@ namespace TEAMModelOS.SDK.Module.AzureCosmosDBV3
 
         public async Task<List<T>> SaveAll<T>(List<T> enyites) where T : ID
         {
-
-            await Task.Run(() => Parallel.ForEach(enyites, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (item) =>
-            {
-                Task.WaitAll(Save(item));
-            }));
+            Container container = await InitializeCollection<T>();
+            string pk= GetPartitionKey<T>();
+            Type type = typeof(T);
+            List<KeyValuePair<PartitionKey, Stream>> itemsToInsert = new List<KeyValuePair<PartitionKey, Stream>>();
+            enyites.ForEach(async x => {
+                MemoryStream stream = new MemoryStream();
+                await JsonSerializer.SerializeAsync(stream, x);
+                object o = type.GetProperty(pk).GetValue(x, null);
+                KeyValuePair<PartitionKey, Stream> keyValue = new KeyValuePair<PartitionKey, Stream>(new PartitionKey(o.ToString()), stream);
+                itemsToInsert.Add(keyValue);
+            });
+            int count=0;
+            Stopwatch stopwatch = Stopwatch.StartNew();
+            List<Task> tasks = new List<Task>(enyites.Count);
+            itemsToInsert.ForEach(item => {
+            tasks.Add(container.CreateItemStreamAsync(item.Value, item.Key)
+                .ContinueWith((Task<ResponseMessage> task) =>
+                {
+                    using (ResponseMessage response = task.Result)
+                    {
+                        if (!response.IsSuccessStatusCode)
+                        {
+                            count++;
+                            Console.WriteLine(response.RequestMessage);
+                        }
+                    }
+                }
+                ));
+            });
+            await Task.WhenAll(tasks);
+            stopwatch.Stop();
+            Console.WriteLine(count);
             return enyites;
         }
 
@@ -572,10 +659,43 @@ namespace TEAMModelOS.SDK.Module.AzureCosmosDBV3
 
         public async Task<List<T>> UpdateAll<T>(List<T> entities) where T : ID
         {
-            await Task.Run(() => Parallel.ForEach(entities, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (item) =>
-            {
-                Task.WaitAll(Update(item));
-            }));
+            //await Task.Run(() => Parallel.ForEach(entities, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (item) =>
+            //{
+            //    Task.WaitAll(Update(item));
+            //}));
+
+            Container container = await InitializeCollection<T>();
+            string pk = GetPartitionKey<T>();
+            Type type = typeof(T);
+            List<KeyValuePair<PartitionKey, Stream>> itemsToInsert = new List<KeyValuePair<PartitionKey, Stream>>();
+            entities.ForEach(async x => {
+                MemoryStream stream = new MemoryStream();
+                await JsonSerializer.SerializeAsync(stream, x);
+                object o = type.GetProperty(pk).GetValue(x, null);
+                KeyValuePair<PartitionKey, Stream> keyValue = new KeyValuePair<PartitionKey, Stream>(new PartitionKey(o.ToString()), stream);
+                itemsToInsert.Add(keyValue);
+            });
+            Stopwatch stopwatch = Stopwatch.StartNew();
+            List<Task> tasks = new List<Task>(entities.Count);
+            itemsToInsert.ForEach(item => {
+                tasks.Add(container.UpsertItemStreamAsync(item.Value, item.Key)
+                    .ContinueWith((Task<ResponseMessage> task) =>
+                    {
+                    //using (ResponseMessage response = task.Result)
+                    //{
+                    //    if (!response.IsSuccessStatusCode)
+                    //    {
+                    //    }
+                    //}
+                }
+                    ));
+            });
+            //await Task.Run(() => Parallel.ForEach(enyites, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (item) =>
+            //{
+            //    Task.WaitAll(Save(item));
+            //}));
+            await Task.WhenAll(tasks);
+            stopwatch.Stop();
             return entities;
         }
 

+ 28 - 4
TEAMModelOS/Controllers/Evaluation/ExamController.cs

@@ -115,9 +115,10 @@ namespace TEAMModelOS.Controllers.Evaluation
         /// <param name="request"></param>
         /// <returns></returns>
         [HttpPost("Manual")]
-        public async Task<BaseJosnRPCResponse> Manual(JosnRPCRequest<Compose> request) {
+        public async Task<BaseJosnRPCResponse> Manual(JosnRPCRequest<Dictionary<string,object>> request) {
             JsonRPCResponseBuilder builder = JsonRPCResponseBuilder.custom();
-            return builder.Data(null).build();
+            List<ItemInfo> items = await cosmosDBV3Repository.FindByDict<ItemInfo>(request.@params);
+            return builder.Data(items).build();
         }
         /// <summary>
         /// 自动组题
@@ -237,7 +238,7 @@ namespace TEAMModelOS.Controllers.Evaluation
                 }
             }
             itemInfos = itemInfos.OrderBy(x=>Guid.NewGuid()).ToList();
-            tempItems =tempItems.OrderBy(x => Guid.NewGuid()).ToList();
+            tempItems = tempItems.OrderBy(x => Guid.NewGuid()).ToList();
             foreach (TempItem temp in tempItems) {
                 ItemInfo itemInfo= itemInfos.Where(x => x.level == temp.level && x.type==temp.type).OrderBy(x => Guid.NewGuid()).FirstOrDefault();
                 if (itemInfo != null) {
@@ -245,7 +246,30 @@ namespace TEAMModelOS.Controllers.Evaluation
                     itemInfos.Remove(itemInfo);
                 }
             }
-            return builder.Data(retnInfos).build();
+            List<List<ItemInfo>>   listInfo = new  List<List<ItemInfo>> ();
+            foreach (IGrouping<string, ItemInfo> group in retnInfos.GroupBy(c => c.type))
+            {
+                Dictionary<string, object>  dictInfo = new   Dictionary<string, object>();
+                listInfo.Add(group.ToList());
+            }
+            List<Dictionary<string, object>> list = new List<Dictionary<string, object>>();
+            foreach (List<ItemInfo> infos in listInfo) {
+                List<Dictionary<string, object>> dict = new List<Dictionary<string, object>>();
+                foreach (IGrouping<int, ItemInfo> group in infos.GroupBy(c => c.level))
+                {
+                    Dictionary<string, object> dictInfo = new Dictionary<string, object>();
+                    dictInfo.Add("level", group.Key);
+                    dictInfo.Add("count", group.Count());
+                    dict.Add(dictInfo);
+                }
+                Dictionary<string, object> typeDict = new Dictionary<string, object>();
+                typeDict.Add("info", dict);
+                typeDict.Add("item", infos);
+                typeDict.Add("count", infos.Count);
+                typeDict.Add("type", infos.FirstOrDefault().type);
+                list.Add(typeDict);
+            }
+            return builder.Data(list).build();
         }
     }
 

+ 1 - 1
TEAMModelOS/Startup.cs

@@ -112,7 +112,7 @@ namespace TEAMModelOS
             services.AddAzureBlobStorage().AddConnection(Configuration.GetSection("Azure:Blob").Get<AzureBlobOptions>());
             //使用CosmosDB
             services.AddAzureCosmosDBV3().AddCosmosDBV3Connection(Configuration.GetSection("Azure:CosmosDB").Get<AzureCosmosDBOptions>())
-                .AddCosmosSerializer(new SystemTextJsonCosmosSerializer(new JsonSerializerOptions()));
+                .AddCosmosSerializer(new SystemTextJsonCosmosSerializer(new JsonSerializerOptions() { IgnoreNullValues=true}));
             //HttpContextAccessor,并用来访问HttpContext。
             services.AddSingleton<IHttpContextAccessor, HttpContextAccessor>();
             //引入Jwt配置

+ 2 - 2
TEAMModelOS/appsettings.Development.json

@@ -16,8 +16,8 @@
       "Container": "teammodelos"
     },
     "CosmosDB": {
-      "ConnectionString": "https://192.168.8.128:8081",
-      "ConnectionKey": "ddwAeGSf8Lsf1kxPXmdqnyzzi3CkJ0KW2BTPZ7Zq1N7qbJic5j7AaQ+WbF86F3rnzuDgGM1yg8O7BUFo93iA8w==",
+      "ConnectionString": "https://teammodelos.documents.azure.cn:443/",
+      "ConnectionKey": "clF73GwPECfP1lKZTCvs8gLMMyCZig1HODFbhDUsarsAURO7TcOjVz6ZFfPqr1HzYrfjCXpMuVD5TlEG5bFGGg==",
       "Database": "TEAMModelOS",
       "CollectionThroughput": 400,
       "ScanModel": [ "TEAMModelOS.Service" ]