Procházet zdrojové kódy

消息推送以及接受底层方法调整

zhouj1203@hotmail.com před 4 roky
rodič
revize
c24a307900

+ 6 - 4
TEAMModelOS.SDK/Helper/Common/DateTimeHelper/DateTimeHelper.cs

@@ -7,15 +7,17 @@ namespace TEAMModelOS.SDK.Helper.Common.DateTimeHelper
     public static class DateTimeHelper
     {
         /// <summary>
-        /// 时间戳转换为日期(时间戳单位秒)
+        /// 时间戳转换为日期(时间戳单位秒)
         /// </summary>
         /// <param name="TimeStamp"></param>
         /// <returns></returns>
         public static DateTime ConvertToDateTime(long timeStamp)
         {
-            var dtStart = TimeZoneInfo.ConvertTimeToUtc(new DateTime(1970, 1, 1));
-            TimeSpan toNow = new TimeSpan(timeStamp);
-            return dtStart.Add(toNow);
+            DateTime dtStart = TimeZoneInfo.ConvertTime(new DateTime(1970, 1, 1, 0, 0, 0, 0, DateTimeKind.Utc), TimeZoneInfo.Local);
+            return dtStart.AddMilliseconds(timeStamp);
+            /* var dtStart = TimeZoneInfo.ConvertTimeToUtc(new DateTime(1970, 1, 1));
+             TimeSpan toNow = new TimeSpan(timeStamp);
+             return dtStart.Add(toNow);*/
             //var start = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
             //return start.AddMilliseconds(timeStamp).AddHours(8);
         }

+ 61 - 4
TEAMModelOS.SDK/Module/AzureCosmosDBV3/AzureCosmosDBV3Repository.cs

@@ -120,7 +120,7 @@ namespace TEAMModelOS.SDK.Module.AzureCosmosDBV3
             {
                 foreach (ContainerProperties container in await resultSetIterator.ReadNextAsync())
                 {
-                   CosmosDict.nameCosmos.TryAdd(container.Id, new CosmosModelInfo { container = database.GetContainer(container.Id), cache = false, monitor = false });
+                   CosmosDict.nameCosmos.TryAdd(container.Id, new CosmosModelInfo { container = database.GetContainer(container.Id), partitionKey= container.PartitionKeyPath.Replace("/",""), cache = false, monitor = false });
                 }
             }
             bool isMonitor = false;
@@ -172,7 +172,7 @@ namespace TEAMModelOS.SDK.Module.AzureCosmosDBV3
                     {
                         await CosmosClient.GetDatabase(DatabaseId).GetContainer(cosmosModelInfo.container.Id).ReplaceThroughputAsync(RU);
                     }
-                    CosmosModelInfo cosmos = new CosmosModelInfo { container = container, cache = cache, monitor = monitor, type = type };
+                    CosmosModelInfo cosmos = new CosmosModelInfo { container = container, cache = cache, monitor = monitor, type = type,partitionKey=PartitionKey };
                     CosmosDict.nameCosmos[CollectionName] = cosmos;
                     CosmosDict.typeCosmos.Add(type.Name, cosmos);
                 }
@@ -187,7 +187,7 @@ namespace TEAMModelOS.SDK.Module.AzureCosmosDBV3
                         CollectionThroughput = RU;
                     }
                     Container containerWithConsistentIndexing = await database.CreateContainerIfNotExistsAsync(containerProperties, throughput: CollectionThroughput);
-                    CosmosModelInfo cosmos = new CosmosModelInfo { container = containerWithConsistentIndexing, cache = cache, monitor = monitor, type = type };
+                    CosmosModelInfo cosmos = new CosmosModelInfo { container = containerWithConsistentIndexing, cache = cache, monitor = monitor, type = type, partitionKey = PartitionKey };
                     CosmosDict.nameCosmos[CollectionName] = cosmos;
                     CosmosDict.typeCosmos.Add(type.Name, cosmos);
                 }
@@ -196,7 +196,7 @@ namespace TEAMModelOS.SDK.Module.AzureCosmosDBV3
             {
                 ContainerProperties leaseProperties = new ContainerProperties { Id = leaseId, PartitionKeyPath = "/id", DefaultTimeToLive = -1 };
                 Container leaseContainer = await database.CreateContainerIfNotExistsAsync(leaseProperties, throughput: CollectionThroughput);
-                CosmosDict.nameCosmos.TryAdd(leaseId, new CosmosModelInfo { container = leaseContainer, cache = false, monitor = false });
+                CosmosDict.nameCosmos.TryAdd(leaseId, new CosmosModelInfo { container = leaseContainer, cache = false, monitor = false, partitionKey = "/id" });
             }
             return CosmosDict;
         }
@@ -970,6 +970,63 @@ namespace TEAMModelOS.SDK.Module.AzureCosmosDBV3
             stopwatch.Stop();
             return enyites;
         }
+        public async Task<List<dynamic>> UpdateAll (string typeName,List<dynamic> enyites)
+        {
+            int pages = (int)Math.Ceiling((double)enyites.Count / pageSize);
+            if (CosmosDict.typeCosmos.TryGetValue(typeName, out CosmosModelInfo container)) { 
+            }
+                bool flag = false;
+            if (RedisHelper.Exists(CacheCosmosPrefix + container.container.Id))
+            {
+                flag = true;
+            }
+            string partitionKey = container.partitionKey;
+            
+            Stopwatch stopwatch = Stopwatch.StartNew();
+            for (int i = 0; i < pages; i++)
+            {
+                List<dynamic> lists = enyites.Skip((i) * pageSize).Take(pageSize).ToList();
+                List<Item> itemsToInsert = new List<Item>();
+                lists.ForEach(async x =>
+                {
+                    x.pk = typeName;
+                    x.ttl = null;
+                    MemoryStream stream = new MemoryStream();
+                    await JsonSerializer.SerializeAsync(stream, x, new JsonSerializerOptions { IgnoreNullValues = true });
+                    object o = x[partitionKey];
+                    Item keyValue = new Item { id = x.id, pk = o.ToString(), stream = stream };
+                    itemsToInsert.Add(keyValue);
+                });
+                List<Task> tasks = new List<Task>(lists.Count);
+                itemsToInsert.ForEach(item =>
+                {
+                    tasks.Add(container.container.ReplaceItemStreamAsync(item.stream, item.id, new PartitionKey(item.pk))
+                        .ContinueWith((Task<ResponseMessage> task) =>
+                        {
+                            //using (ResponseMessage response = task.Result)
+                            //{
+                            //    if (!response.IsSuccessStatusCode)
+                            //    {
+                            //    }
+                            //}
+                        }
+                        ));
+                });
+                await Task.WhenAll(tasks);
+                if (container.cache && RedisHelper.Instance != null)
+                {
+                    lists.ForEach(async x => {
+                        await RedisHelper.HSetAsync(CacheCosmosPrefix + container.container.Id, x.id, x);
+                    });
+                }
+            }
+            if (container.cache && RedisHelper.Instance != null && !flag)
+            {
+                await RedisHelper.ExpireAsync(CacheCosmosPrefix + container.container.Id, timeoutSeconds);
+            }
+            stopwatch.Stop();
+            return enyites;
+        }
         public async Task<T> Update<T>(T entity) where T : ID
         {
             CosmosModelInfo container = await InitializeCollection<T>();

+ 1 - 0
TEAMModelOS.SDK/Module/AzureCosmosDBV3/CosmosModelInfo.cs

@@ -11,5 +11,6 @@ namespace TEAMModelOS.SDK.Module.AzureCosmosDBV3
         public bool cache { get; set; }
         public bool monitor { get; set; } = false;
         public Type type { get; set; }
+        public string partitionKey { get; set; }
     }
 }

+ 1 - 0
TEAMModelOS.SDK/Module/AzureCosmosDBV3/IAzureCosmosDBV3Repository.cs

@@ -34,6 +34,7 @@ namespace TEAMModelOS.SDK.Module.AzureCosmosDBV3
         Task<List<IdPk>> DeleteAll<T>(Dictionary<string, object> dict) where T : ID;
         Task<T> Update<T>(T entity) where T : ID;
         Task<List<T>> UpdateAll<T>(List<T> entities) where T : ID;
+        Task<List<dynamic>> UpdateAll(string typeName,List<dynamic> entities) ;
         Task<T> SaveOrUpdate<T>(T entity) where T : ID;
         Task<List<T>> SaveOrUpdateAll<T>(List<T> entities) where T : ID;
      //   Task<T> FindById<T>(string id) where T : ID;

+ 4 - 1
TEAMModelOS.Service/Models/TeacherInfo/Homework.cs

@@ -61,8 +61,11 @@ namespace TEAMModelOS.Service.Models
         /// </summary>
         [ProtoMember(7)]
         public long endTime { get; set; }
+        //记录ServiceBus每条消息的编号
+
+        public long sequenceNumber { get; set; }
+
 
-        
 
 
 

+ 96 - 0
TEAMModelOS.Service/Services/Learn/Implements/ServiceBusReviceService.cs

@@ -0,0 +1,96 @@
+using Microsoft.Azure.ServiceBus;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reflection;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using TEAMModelOS.SDK.Context.Configuration;
+using TEAMModelOS.SDK.Helper.Common.CollectionHelper;
+using TEAMModelOS.SDK.Helper.Common.JsonHelper;
+using TEAMModelOS.SDK.Module.AzureCosmosDBV3;
+using TEAMModelOS.SDK.Module.AzureServiceBus;
+using TEAMModelOS.Service.Services.Learn.Interfaces;
+
+namespace TEAMModelOS.Service.Services.Learn.Implements
+{
+    public class ServiceBusReviceService : IServiceBusReviceService
+    {
+        private readonly IAzureServiceBusService _serviceBus;
+        private readonly IAzureCosmosDBV3Repository _cosmos;
+        private static ISubscriptionClient subscriptionClient;
+        //private readonly string Topic = BaseConfigModel.Configuration["HaBookAuth:ServiceBus:Topics"];
+        public ServiceBusReviceService(IAzureServiceBusService azureServiceBus, IAzureCosmosDBV3Repository cosmos)
+        {
+            _serviceBus = azureServiceBus;
+            _cosmos = cosmos;
+        }
+
+        public async Task ReciveMessageAsync()
+        {
+            string SubName = "test_topic_ReciveTask";
+            subscriptionClient = _serviceBus.GetSubClient(SubName).subscriptionClient;
+            RegisterOnMessageHandlerAndReceiveMessages();
+            await subscriptionClient.CloseAsync();
+            //return "";
+        }
+        public void RegisterOnMessageHandlerAndReceiveMessages()
+        {
+            // Configure the message handler options in terms of exception handling, number of concurrent messages to deliver, etc.
+            var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
+            {
+                // Maximum number of concurrent calls to the callback ProcessMessagesAsync(), set to 1 for simplicity.
+                // Set it according to how many messages the application wants to process in parallel.
+                MaxConcurrentCalls = 1,
+
+                // Indicates whether MessagePump should automatically complete the messages after returning from User Callback.
+                // False below indicates the Complete will be handled by the User Callback as in `ProcessMessagesAsync` below.
+                AutoComplete = false
+            };
+            subscriptionClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
+
+        }
+        public async Task ProcessMessagesAsync(Message message, CancellationToken token)
+        {
+            // Process the message.
+            Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");
+
+            // Complete the message so that it is not received again.
+            // This can be done only if the subscriptionClient is created in ReceiveMode.PeekLock mode (which is the default).
+            await subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
+            Dictionary<string, object> dict = Encoding.UTF8.GetString(message.Body).FromApiJson<Dictionary<string, object>>();
+            Dictionary<string, object> obj = new Dictionary<string, object>();
+            dict.TryGetValue("id", out object info);
+            dict.TryGetValue("name", out object name);
+            dict.TryGetValue("status", out object status);
+            obj.Add("id", info);
+
+            var bus = await _cosmos.FindByDict(name.ToString(), obj);
+            if (bus.IsNotEmpty())
+            {
+                PropertyInfo propertyInfo = bus[0].GetType().GetProperty("status");
+                for (int i = 0; i < bus.Count; i++)
+                    propertyInfo.SetValue(bus[i], status);
+                await _cosmos.UpdateAll(name.ToString(), bus);
+
+            }
+            //return message;
+
+            // Note: Use the cancellationToken passed as necessary to determine if the subscriptionClient has already been closed.
+            // If subscriptionClient has already been closed, you can choose to not call CompleteAsync() or AbandonAsync() etc.
+            // to avoid unnecessary exceptions.
+        }
+
+        public static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
+        {
+            Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
+            var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
+            Console.WriteLine("Exception context for troubleshooting:");
+            Console.WriteLine($"- Endpoint: {context.Endpoint}");
+            Console.WriteLine($"- Entity Path: {context.EntityPath}");
+            Console.WriteLine($"- Executing Action: {context.Action}");
+            return Task.CompletedTask;
+        }
+    }
+}

+ 10 - 55
TEAMModelOS.Service/Services/Learn/Implements/ServiceBusService.cs

@@ -1,11 +1,14 @@
 using Microsoft.Azure.ServiceBus;
 using System;
 using System.Collections.Generic;
+using System.Reflection;
 using System.Text;
 using System.Threading;
 using System.Threading.Tasks;
+using TEAMModelOS.SDK.Helper.Common.CollectionHelper;
 using TEAMModelOS.SDK.Helper.Common.DateTimeHelper;
 using TEAMModelOS.SDK.Helper.Common.JsonHelper;
+using TEAMModelOS.SDK.Module.AzureCosmosDBV3;
 using TEAMModelOS.SDK.Module.AzureServiceBus;
 using TEAMModelOS.Service.Services.Learn.Interfaces;
 
@@ -15,69 +18,19 @@ namespace TEAMModelOS.Service.Services.Learn.Implements
     public class ServiceBusService : IServiceBusService
     {
         private readonly IAzureServiceBusService _serviceBus;
-        private static ISubscriptionClient subscriptionClient;
         public ServiceBusService(IAzureServiceBusService azureServiceBus)
         {
             _serviceBus = azureServiceBus;
-        }
-
-        public async Task ReciveMessageAsync<T>(string TopicName)
-        {
-            subscriptionClient = _serviceBus.GetSubClient(TopicName).subscriptionClient;
-            RegisterOnMessageHandlerAndReceiveMessages();
-            await subscriptionClient.CloseAsync();
-            //return "";
-        }
-
-        public static void RegisterOnMessageHandlerAndReceiveMessages() {
-            // Configure the message handler options in terms of exception handling, number of concurrent messages to deliver, etc.
-            var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
-            {
-                // Maximum number of concurrent calls to the callback ProcessMessagesAsync(), set to 1 for simplicity.
-                // Set it according to how many messages the application wants to process in parallel.
-                MaxConcurrentCalls = 1,
-
-                // Indicates whether MessagePump should automatically complete the messages after returning from User Callback.
-                // False below indicates the Complete will be handled by the User Callback as in `ProcessMessagesAsync` below.
-                AutoComplete = false
-            };
-            subscriptionClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
-
-        }
-        public static async Task ProcessMessagesAsync(Message message, CancellationToken token)
-        {
-            // Process the message.
-            Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");
-
-            // Complete the message so that it is not received again.
-            // This can be done only if the subscriptionClient is created in ReceiveMode.PeekLock mode (which is the default).
-            await subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
-            //return message;
-
-            // Note: Use the cancellationToken passed as necessary to determine if the subscriptionClient has already been closed.
-            // If subscriptionClient has already been closed, you can choose to not call CompleteAsync() or AbandonAsync() etc.
-            // to avoid unnecessary exceptions.
-        }
-
-        public static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
-        {
-            Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
-            var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
-            Console.WriteLine("Exception context for troubleshooting:");
-            Console.WriteLine($"- Endpoint: {context.Endpoint}");
-            Console.WriteLine($"- Entity Path: {context.EntityPath}");
-            Console.WriteLine($"- Executing Action: {context.Action}");
-            return Task.CompletedTask;
-        }
+        }        
 
-        public async Task<long> SendMessage<T>(string TopicName, string info, long startTime)
+        public async Task<long> SendMessage<T>(string TopicName, string info, long startTime, int status)
         {
             ITopicClient topicClient = _serviceBus.GetTopClient(TopicName).topicClient;
             //设定开始时间
             Dictionary<string, object> dict = new Dictionary<string, object>() {
                     { "name",typeof(T).Name},
                     { "info",info},
-                    { "status",200}
+                    { "status",status}
                 };
             var message = new Message(Encoding.UTF8.GetBytes(dict.ToApiJson()));
             long time = startTime - new DateTimeOffset(DateTime.UtcNow).ToUnixTimeMilliseconds();
@@ -85,9 +38,11 @@ namespace TEAMModelOS.Service.Services.Learn.Implements
             {
                 return -1;
             }
-            long SequenceNumber = await topicClient.ScheduleMessageAsync(message, new DateTimeOffset(DateTimeHelper.ConvertToDateTime(startTime)));
+            message.MessageId = Guid.NewGuid().ToString();
+            DateTime timer = DateTimeHelper.ConvertToDateTime(startTime);
+            long SequenceNumber = await topicClient.ScheduleMessageAsync(message, new DateTimeOffset(timer));
             await topicClient.SendAsync(message);
-            await topicClient.CloseAsync();
+            //await topicClient.CloseAsync();
             return SequenceNumber;
         }
     }

+ 13 - 0
TEAMModelOS.Service/Services/Learn/Interfaces/IServiceBusReviceService.cs

@@ -0,0 +1,13 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+using TEAMModelOS.SDK.Context.Configuration;
+
+namespace TEAMModelOS.Service.Services.Learn.Interfaces
+{
+    public interface IServiceBusReviceService
+    {
+        public Task ReciveMessageAsync();
+    }
+}

+ 2 - 2
TEAMModelOS.Service/Services/Learn/Interfaces/IServiceBusService.cs

@@ -8,9 +8,9 @@ namespace TEAMModelOS.Service.Services.Learn.Interfaces
 {
     public interface IServiceBusService : IBusinessService
     {
-        public Task<long> SendMessage<T>(string TopicName, string info, long startTime);
+        public Task<long> SendMessage<T>(string TopicName, string info, long startTime,int status);
 
-        public Task ReciveMessageAsync<T>(string TopicName);
+  
 
     }
 }

+ 5 - 0
TEAMModelOS.Service/TEAMModelOS.Model.xml

@@ -1015,6 +1015,11 @@
             助教
             </summary>
         </member>
+        <member name="P:TEAMModelOS.Service.Models.CoursePlan.semesterCode">
+            <summary>
+            学期代码
+            </summary>
+        </member>
         <member name="P:TEAMModelOS.Service.Models.Assistant.TEAMModelId">
             <summary>
             助教 醍摩豆id

+ 6 - 3
TEAMModelOS/Controllers/Task/HomeworkController.cs

@@ -120,8 +120,10 @@ namespace TEAMModelOS.Controllers.Learn
             {
                 //TimerWork<HomeWork>(request.@params.homeWork.startTime,new Dictionary<string, object> { { "id", request.@params.homeWork.id } });
                 //设定开始时间               
-                long SequenceNumber =  await _serviceBus.SendMessage<Homework>(Constants.TopicName, request.@params.homeWork.id, request.@params.homeWork.startTime);
-                _timerWorkService.TimerWork<Homework>(request.@params.homeWork.startTime,200, new Dictionary<string, object> { { "id", request.@params.homeWork.id } });
+                long SequenceNumber =  await _serviceBus.SendMessage<Homework>(Constants.TopicName, request.@params.homeWork.id, request.@params.homeWork.startTime,200);
+                request.@params.homeWork.sequenceNumber = SequenceNumber;
+                //await _serviceBus.ReciveMessageAsync<Homework>(Constants.SubName);
+                //_timerWorkService.TimerWork<Homework>(request.@params.homeWork.startTime,200, new Dictionary<string, object> { { "id", request.@params.homeWork.id } });
             }
 
             if (request.@params.homeWork.status == 0) {
@@ -133,7 +135,8 @@ namespace TEAMModelOS.Controllers.Learn
             Homework homeWork = await _cosmos.SaveOrUpdate<Homework>(request.@params.homeWork);
 
             //设定结束时间
-            _timerWorkService.TimerWork<Homework>(request.@params.homeWork.endTime, 300, new Dictionary<string, object> { { "id", request.@params.homeWork.id } });
+            await _serviceBus.SendMessage<Homework>(Constants.TopicName, request.@params.homeWork.id, request.@params.homeWork.endTime,300);
+            //_timerWorkService.TimerWork<Homework>(request.@params.homeWork.endTime, 300, new Dictionary<string, object> { { "id", request.@params.homeWork.id } });
             //清除作业
             if (request.@params.reset)
             {

+ 1 - 0
TEAMModelOS/Models/Constants.cs

@@ -5,5 +5,6 @@ namespace TEAMModelOS.Models
     public static class Constants
     {
         public const string TopicName = "test_topic_ActiveTask";
+        public const string SubName = "test_topic_ReciveTask";
     }
 }

+ 4 - 1
TEAMModelOS/Startup.cs

@@ -28,6 +28,8 @@ using TEAMModelOS.SDK.Module.AzureServiceBus;
 using TEAMModelOS.SDK.Module.AzureTable.Implements;
 using TEAMModelOS.SDK.Module.AzureTable.Interfaces;
 using TEAMModelOS.Service.Services.ChangeFeed;
+using TEAMModelOS.Service.Services.Learn.Implements;
+using TEAMModelOS.Service.Services.Learn.Interfaces;
 using VueCliMiddleware;
 
 namespace TEAMModelOS
@@ -122,8 +124,9 @@ namespace TEAMModelOS
             services.AddServiceBus().AddServiceBusOptions(Configuration.GetSection("HaBookAuth:ServiceBus").Get<AzureServiceBusOptions>());
             //HttpContextAccessor,并用来访问HttpContext。
             services.AddSingleton<IHttpContextAccessor, HttpContextAccessor>();
+            services.AddSingleton<IServiceBusReviceService, ServiceBusReviceService>();
             //引入Jwt配置
-           //services.JwtAuth(Configuration.GetSection("JwtSetting"));
+            //services.JwtAuth(Configuration.GetSection("JwtSetting"));
             //注入CSRedis
             var csredis = new CSRedis.CSRedisClient(Configuration.GetSection("Azure:Redis:ConnectionString").Get<string>());
            // CSRedis.CSRedisClient.Serialize = obj =>System.Text.Json.JsonSerializer.Serialize(obj);