瀏覽代碼

消息推送

zhouj1203@hotmail.com 4 年之前
父節點
當前提交
45ae88ef5e

+ 94 - 0
TEAMModelOS.Service/Services/Learn/Implements/ServiceBusService.cs

@@ -0,0 +1,94 @@
+using Microsoft.Azure.ServiceBus;
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using TEAMModelOS.SDK.Helper.Common.DateTimeHelper;
+using TEAMModelOS.SDK.Helper.Common.JsonHelper;
+using TEAMModelOS.SDK.Module.AzureServiceBus;
+using TEAMModelOS.Service.Services.Learn.Interfaces;
+
+namespace TEAMModelOS.Service.Services.Learn.Implements
+{
+    
+    public class ServiceBusService : IServiceBusService
+    {
+        private readonly IAzureServiceBusService _serviceBus;
+        private static ISubscriptionClient subscriptionClient;
+        public ServiceBusService(IAzureServiceBusService azureServiceBus)
+        {
+            _serviceBus = azureServiceBus;
+        }
+
+        public async Task ReciveMessageAsync<T>(string TopicName)
+        {
+            subscriptionClient = _serviceBus.GetSubClient(TopicName).subscriptionClient;
+            RegisterOnMessageHandlerAndReceiveMessages();
+            await subscriptionClient.CloseAsync();
+            //return "";
+        }
+
+        public static void RegisterOnMessageHandlerAndReceiveMessages() {
+            // Configure the message handler options in terms of exception handling, number of concurrent messages to deliver, etc.
+            var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
+            {
+                // Maximum number of concurrent calls to the callback ProcessMessagesAsync(), set to 1 for simplicity.
+                // Set it according to how many messages the application wants to process in parallel.
+                MaxConcurrentCalls = 1,
+
+                // Indicates whether MessagePump should automatically complete the messages after returning from User Callback.
+                // False below indicates the Complete will be handled by the User Callback as in `ProcessMessagesAsync` below.
+                AutoComplete = false
+            };
+            subscriptionClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
+
+        }
+        public static async Task ProcessMessagesAsync(Message message, CancellationToken token)
+        {
+            // Process the message.
+            Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");
+
+            // Complete the message so that it is not received again.
+            // This can be done only if the subscriptionClient is created in ReceiveMode.PeekLock mode (which is the default).
+            await subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
+            //return message;
+
+            // Note: Use the cancellationToken passed as necessary to determine if the subscriptionClient has already been closed.
+            // If subscriptionClient has already been closed, you can choose to not call CompleteAsync() or AbandonAsync() etc.
+            // to avoid unnecessary exceptions.
+        }
+
+        public static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
+        {
+            Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
+            var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
+            Console.WriteLine("Exception context for troubleshooting:");
+            Console.WriteLine($"- Endpoint: {context.Endpoint}");
+            Console.WriteLine($"- Entity Path: {context.EntityPath}");
+            Console.WriteLine($"- Executing Action: {context.Action}");
+            return Task.CompletedTask;
+        }
+
+        public async Task<long> SendMessage<T>(string TopicName, string info, long startTime)
+        {
+            ITopicClient topicClient = _serviceBus.GetTopClient(TopicName).topicClient;
+            //设定开始时间
+            Dictionary<string, object> dict = new Dictionary<string, object>() {
+                    { "name",typeof(T).Name},
+                    { "info",info},
+                    { "status",200}
+                };
+            var message = new Message(Encoding.UTF8.GetBytes(dict.ToApiJson()));
+            long time = startTime - new DateTimeOffset(DateTime.UtcNow).ToUnixTimeMilliseconds();
+            if (time <= 0)
+            {
+                return -1;
+            }
+            long SequenceNumber = await topicClient.ScheduleMessageAsync(message, new DateTimeOffset(DateTimeHelper.ConvertToDateTime(startTime)));
+            await topicClient.SendAsync(message);
+            await topicClient.CloseAsync();
+            return SequenceNumber;
+        }
+    }
+}

+ 16 - 0
TEAMModelOS.Service/Services/Learn/Interfaces/IServiceBusService.cs

@@ -0,0 +1,16 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+using TEAMModelOS.SDK.Context.Configuration;
+
+namespace TEAMModelOS.Service.Services.Learn.Interfaces
+{
+    public interface IServiceBusService : IBusinessService
+    {
+        public Task<long> SendMessage<T>(string TopicName, string info, long startTime);
+
+        public Task ReciveMessageAsync<T>(string TopicName);
+
+    }
+}

+ 8 - 17
TEAMModelOS/Controllers/Task/HomeworkController.cs

@@ -17,6 +17,7 @@ using TEAMModelOS.SDK.Helper.Common.CollectionHelper;
 using TEAMModelOS.SDK.Helper.Common.DateTimeHelper;
 using TEAMModelOS.SDK.Helper.Common.JsonHelper;
 using TEAMModelOS.SDK.Module.AzureCosmosDBV3;
+using TEAMModelOS.SDK.Module.AzureServiceBus;
 using TEAMModelOS.Service.Models;
 using TEAMModelOS.Service.Services.Learn;
 using TEAMModelOS.Service.Services.Learn.Interfaces;
@@ -32,13 +33,15 @@ namespace TEAMModelOS.Controllers.Learn
     {
         private readonly IAzureCosmosDBV3Repository _cosmos;
         private readonly ITimerWorkService _timerWorkService;
-        private static ITopicClient topicClient;
+        private readonly IServiceBusService _serviceBus;
+        //private static ITopicClient topicClient;
         readonly string ServiceBusConnectionString = BaseConfigModel.Configuration["HaBookAuth:ServiceBus"];
 
-        public HomeworkController(IAzureCosmosDBV3Repository cosmos, ITimerWorkService timerWorkService)
+        public HomeworkController(IAzureCosmosDBV3Repository cosmos, ITimerWorkService timerWorkService, IServiceBusService serviceBus)
         {
             _cosmos = cosmos;
             _timerWorkService = timerWorkService;
+            _serviceBus = serviceBus;
         }
 
 
@@ -96,8 +99,7 @@ namespace TEAMModelOS.Controllers.Learn
         [HttpPost("upsert")]
         public async Task<BaseJosnRPCResponse> Upsert(JosnRPCRequest<HomeworkDto> request)
         {
-            JsonRPCResponseBuilder builder = JsonRPCResponseBuilder.custom();           
-            topicClient = new TopicClient(ServiceBusConnectionString, Constants.TopicName);
+            JsonRPCResponseBuilder builder = JsonRPCResponseBuilder.custom();
             //新增
             if (string.IsNullOrEmpty(request.@params.homeWork.id))
 
@@ -117,19 +119,8 @@ namespace TEAMModelOS.Controllers.Learn
             else if (request.@params.homeWork.publishModel.Equals("1"))
             {
                 //TimerWork<HomeWork>(request.@params.homeWork.startTime,new Dictionary<string, object> { { "id", request.@params.homeWork.id } });
-                //设定开始时间
-                Dictionary<string, object> dict = new Dictionary<string, object>() {
-                    { "name",typeof(Homework).Name},
-                    { "id",request.@params.homeWork.id},
-                    { "status",200}
-                };
-                var message = new Message(Encoding.UTF8.GetBytes(dict.ToApiJson()));
-                long time = request.@params.homeWork.startTime - new DateTimeOffset(DateTime.UtcNow).ToUnixTimeMilliseconds();
-                if (time <= 0) {
-                    return builder.Data("请重新设置时间").build();
-                }
-                await topicClient.ScheduleMessageAsync(message, new DateTimeOffset(DateTimeHelper.ConvertToDateTime(request.@params.homeWork.startTime)));
-                await topicClient.SendAsync(message);
+                //设定开始时间               
+                long SequenceNumber =  await _serviceBus.SendMessage<Homework>(Constants.TopicName, request.@params.homeWork.id, request.@params.homeWork.startTime);
                 _timerWorkService.TimerWork<Homework>(request.@params.homeWork.startTime,200, new Dictionary<string, object> { { "id", request.@params.homeWork.id } });
             }
 

+ 1 - 4
TEAMModelOS/Models/Constants.cs

@@ -1,7 +1,4 @@
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Threading.Tasks;
+
 
 namespace TEAMModelOS.Models
 {