Browse Source

優化 ServiceBusSender 依照不同名稱共享單一實例,移除關閉及釋放代碼,部分名稱調整

JAELYS 3 years ago
parent
commit
5790e1f8e5

+ 3 - 3
TEAMModelOS.FunctionV4/CosmosDB/TriggerCorrect.cs

@@ -34,7 +34,7 @@ namespace TEAMModelOS.FunctionV4
                 {
                     try
                     {
-                        await _serviceBus.GetServiceBusClient().cancelMessage(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), record.sequenceNumber);
+                        await _serviceBus.GetServiceBusClient().CancelMessageAsync(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), record.sequenceNumber);
                         await table_cancel.DeleteSingle<ChangeRecord>(record.PartitionKey, record.RowKey);
                     }
                     catch (Exception)
@@ -77,7 +77,7 @@ namespace TEAMModelOS.FunctionV4
                                 long start = await _serviceBus.GetServiceBusClient().SendScheduleMessageAsync(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), messageCorrect, DateTimeOffset.FromUnixTimeMilliseconds(tdata.startTime));
                                 try
                                 {
-                                    await _serviceBus.GetServiceBusClient().cancelMessage(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), correctRecords[0].sequenceNumber);
+                                    await _serviceBus.GetServiceBusClient().CancelMessageAsync(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), correctRecords[0].sequenceNumber);
                                 }
                                 catch (Exception)
                                 {
@@ -366,7 +366,7 @@ namespace TEAMModelOS.FunctionV4
                                 long end = await _serviceBus.GetServiceBusClient().SendScheduleMessageAsync(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), messageCorrectEnd, DateTimeOffset.FromUnixTimeMilliseconds(tdata.endTime));
                                 try
                                 {
-                                    await _serviceBus.GetServiceBusClient().cancelMessage(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), correctRecords[0].sequenceNumber);
+                                    await _serviceBus.GetServiceBusClient().CancelMessageAsync(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), correctRecords[0].sequenceNumber);
                                 }
                                 catch (Exception)
                                 {

+ 3 - 3
TEAMModelOS.FunctionV4/CosmosDB/TriggerExam.cs

@@ -38,7 +38,7 @@ namespace TEAMModelOS.FunctionV4
                     {
                         try
                         {
-                            await _serviceBus.GetServiceBusClient().cancelMessage(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), record.sequenceNumber);
+                            await _serviceBus.GetServiceBusClient().CancelMessageAsync(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), record.sequenceNumber);
                             await table_cancel.DeleteSingle<ChangeRecord>(record.PartitionKey, record.RowKey);
                         }
                         catch (Exception)
@@ -100,7 +100,7 @@ namespace TEAMModelOS.FunctionV4
                             {
                                 try
                                 {
-                                    await _serviceBus.GetServiceBusClient().cancelMessage(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), records[0].sequenceNumber);
+                                    await _serviceBus.GetServiceBusClient().CancelMessageAsync(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), records[0].sequenceNumber);
                                 }
                                 catch (Exception)
                                 {
@@ -362,7 +362,7 @@ namespace TEAMModelOS.FunctionV4
                                     long end = await _serviceBus.GetServiceBusClient().SendScheduleMessageAsync(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), messageEnd, DateTimeOffset.FromUnixTimeMilliseconds(data.endTime));
                                     try
                                     {
-                                        await _serviceBus.GetServiceBusClient().cancelMessage(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), records[0].sequenceNumber);
+                                        await _serviceBus.GetServiceBusClient().CancelMessageAsync(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), records[0].sequenceNumber);
                                     }
                                     catch (Exception)
                                     {

+ 3 - 3
TEAMModelOS.FunctionV4/CosmosDB/TriggerExamLite.cs

@@ -36,7 +36,7 @@ namespace TEAMModelOS.FunctionV4
                     {
                         try
                         {
-                            await _serviceBus.GetServiceBusClient().cancelMessage(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), record.sequenceNumber);
+                            await _serviceBus.GetServiceBusClient().CancelMessageAsync(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), record.sequenceNumber);
                             await table_cancel.DeleteSingle<ChangeRecord>(record.PartitionKey, record.RowKey);
                         }
                         catch (Exception)
@@ -75,7 +75,7 @@ namespace TEAMModelOS.FunctionV4
                             {
                                 try
                                 {
-                                    await _serviceBus.GetServiceBusClient().cancelMessage(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), changeRecords[0].sequenceNumber);
+                                    await _serviceBus.GetServiceBusClient().CancelMessageAsync(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), changeRecords[0].sequenceNumber);
                                 }
                                 catch (Exception)
                                 {
@@ -158,7 +158,7 @@ namespace TEAMModelOS.FunctionV4
                                 long end = await _serviceBus.GetServiceBusClient().SendScheduleMessageAsync(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), messageWorkEnd, DateTimeOffset.FromUnixTimeMilliseconds(tdata.endTime));
                                 try
                                 {
-                                    await _serviceBus.GetServiceBusClient().cancelMessage(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), changeRecords[0].sequenceNumber);
+                                    await _serviceBus.GetServiceBusClient().CancelMessageAsync(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), changeRecords[0].sequenceNumber);
                                 }
                                 catch (Exception)
                                 {

+ 3 - 3
TEAMModelOS.FunctionV4/CosmosDB/TriggerHomework.cs

@@ -36,7 +36,7 @@ namespace TEAMModelOS.FunctionV4
                     {
                         try
                         {
-                            await _serviceBus.GetServiceBusClient().cancelMessage(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), record.sequenceNumber);
+                            await _serviceBus.GetServiceBusClient().CancelMessageAsync(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), record.sequenceNumber);
                             await table_cancel.DeleteSingle<ChangeRecord>(record.PartitionKey, record.RowKey);
                         }
                         catch (Exception)
@@ -75,7 +75,7 @@ namespace TEAMModelOS.FunctionV4
                             {
                                 try
                                 {
-                                    await _serviceBus.GetServiceBusClient().cancelMessage(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), changeRecords[0].sequenceNumber);
+                                    await _serviceBus.GetServiceBusClient().CancelMessageAsync(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), changeRecords[0].sequenceNumber);
                                 }
                                 catch (Exception)
                                 {
@@ -107,7 +107,7 @@ namespace TEAMModelOS.FunctionV4
                                 long end = await _serviceBus.GetServiceBusClient().SendScheduleMessageAsync(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), messageWorkEnd, DateTimeOffset.FromUnixTimeMilliseconds(tdata.endTime));
                                 try
                                 {
-                                    await _serviceBus.GetServiceBusClient().cancelMessage(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), changeRecords[0].sequenceNumber);
+                                    await _serviceBus.GetServiceBusClient().CancelMessageAsync(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), changeRecords[0].sequenceNumber);
                                 }
                                 catch (Exception)
                                 {

+ 3 - 3
TEAMModelOS.FunctionV4/CosmosDB/TriggerStudy.cs

@@ -36,7 +36,7 @@ namespace TEAMModelOS.FunctionV4
                     {
                         try
                         {
-                            await _serviceBus.GetServiceBusClient().cancelMessage(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), record.sequenceNumber);
+                            await _serviceBus.GetServiceBusClient().CancelMessageAsync(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), record.sequenceNumber);
                             await table_cancel.DeleteSingle<ChangeRecord>(record.PartitionKey, record.RowKey);
                         }
                         catch (Exception ) {
@@ -75,7 +75,7 @@ namespace TEAMModelOS.FunctionV4
                             {
                                 try
                                 {
-                                    await _serviceBus.GetServiceBusClient().cancelMessage(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), changeRecords[0].sequenceNumber);
+                                    await _serviceBus.GetServiceBusClient().CancelMessageAsync(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), changeRecords[0].sequenceNumber);
                                 }
                                 catch (Exception)
                                 {
@@ -164,7 +164,7 @@ namespace TEAMModelOS.FunctionV4
                                     long end = await _serviceBus.GetServiceBusClient().SendScheduleMessageAsync(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), messageWorkEnd, DateTimeOffset.FromUnixTimeMilliseconds(tdata.endTime));
                                     try
                                     {
-                                        await _serviceBus.GetServiceBusClient().cancelMessage(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), changeRecords[0].sequenceNumber);
+                                        await _serviceBus.GetServiceBusClient().CancelMessageAsync(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), changeRecords[0].sequenceNumber);
                                     }
                                     catch (Exception)
                                     {

+ 3 - 3
TEAMModelOS.FunctionV4/CosmosDB/TriggerSurvey.cs

@@ -44,7 +44,7 @@ namespace TEAMModelOS.FunctionV4
                     {
                         try
                         {
-                            await _serviceBus.GetServiceBusClient().cancelMessage(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), record.sequenceNumber);
+                            await _serviceBus.GetServiceBusClient().CancelMessageAsync(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), record.sequenceNumber);
                             await table_cancel.DeleteSingle<ChangeRecord>(record.PartitionKey, record.RowKey);
                         }
                         catch (Exception)
@@ -83,7 +83,7 @@ namespace TEAMModelOS.FunctionV4
                             {
                                 try
                                 {
-                                    await _serviceBus.GetServiceBusClient().cancelMessage(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), changeRecords[0].sequenceNumber);
+                                    await _serviceBus.GetServiceBusClient().CancelMessageAsync(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), changeRecords[0].sequenceNumber);
                                 }
                                 catch (Exception)
                                 {
@@ -278,7 +278,7 @@ namespace TEAMModelOS.FunctionV4
                                 long end = await _serviceBus.GetServiceBusClient().SendScheduleMessageAsync(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), messageSurveyEnd, DateTimeOffset.FromUnixTimeMilliseconds(tdata.endTime));
                                 try
                                 {
-                                    await _serviceBus.GetServiceBusClient().cancelMessage(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), changeRecords[0].sequenceNumber);
+                                    await _serviceBus.GetServiceBusClient().CancelMessageAsync(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), changeRecords[0].sequenceNumber);
                                 }
                                 catch (Exception)
                                 {

+ 3 - 3
TEAMModelOS.FunctionV4/CosmosDB/TriggerVote.cs

@@ -41,7 +41,7 @@ namespace TEAMModelOS.FunctionV4
                     {
                         try
                         {
-                            await _serviceBus.GetServiceBusClient().cancelMessage(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), record.sequenceNumber);
+                            await _serviceBus.GetServiceBusClient().CancelMessageAsync(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), record.sequenceNumber);
                             await table_cancel.DeleteSingle<ChangeRecord>(record.PartitionKey, record.RowKey);
                         }
                         catch (Exception)
@@ -81,7 +81,7 @@ namespace TEAMModelOS.FunctionV4
                             {
                                 long start = await _serviceBus.GetServiceBusClient().SendScheduleMessageAsync(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), messageVote, DateTimeOffset.FromUnixTimeMilliseconds(tdata.startTime));
                                 try {
-                                    await _serviceBus.GetServiceBusClient().cancelMessage(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), voteRecords[0].sequenceNumber);
+                                    await _serviceBus.GetServiceBusClient().CancelMessageAsync(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), voteRecords[0].sequenceNumber);
                                 } catch (Exception) { 
                                 }
                                 voteRecords[0].sequenceNumber = start;
@@ -282,7 +282,7 @@ namespace TEAMModelOS.FunctionV4
                                 long end = await _serviceBus.GetServiceBusClient().SendScheduleMessageAsync(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), messageVoteEnd, DateTimeOffset.FromUnixTimeMilliseconds(tdata.endTime));
                                 try
                                 {
-                                    await _serviceBus.GetServiceBusClient().cancelMessage(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), voteRecords[0].sequenceNumber);
+                                    await _serviceBus.GetServiceBusClient().CancelMessageAsync(Environment.GetEnvironmentVariable("Azure:ServiceBus:ActiveTask"), voteRecords[0].sequenceNumber);
                                 }
                                 catch (Exception)
                                 {

+ 26 - 48
TEAMModelOS.SDK/DI/AzureServiceBus/AzureServiceBusExtensions.cs

@@ -10,29 +10,26 @@ using System.Linq;
 using Azure.Messaging.ServiceBus;
 using TEAMModelOS.SDK;
 using TEAMModelOS.SDK.Extension;
+using System.Collections.Concurrent;
 
 namespace TEAMModelOS.SDK.DI
 {
     public static class AzureServiceBusExtensions
     {
-
+        private static ConcurrentDictionary<string, ServiceBusSender> ServiceBusSenders { get; } = new ConcurrentDictionary<string, ServiceBusSender>();
+        
         /// <summary>
         /// 發送信息至對列或主題
         /// </summary>       
-        /// <param name="pathName">QueueName or TopicName</param>
+        /// <param name="name">QueueName or TopicName</param>
         /// <param name="message">訊息</param>
         /// <returns></returns>
-        public static async Task SendMessageAsync(this ServiceBusClient client, string pathName, ServiceBusMessage message)
+        public static async Task SendMessageAsync(this ServiceBusClient client, string name, ServiceBusMessage message)
         {
             try
             {
-                ServiceBusSender sender = client.CreateSender(pathName);
-                await sender.SendMessageAsync(message);
-                if (!sender.IsClosed)
-                {
-                    await sender.CloseAsync();
-                    await sender.DisposeAsync();
-                }
+                ServiceBusSender sender = ServiceBusSenders.GetOrAdd(name, x => client.CreateSender(name));                
+                await sender.SendMessageAsync(message);                
             }
             catch
             {
@@ -43,25 +40,20 @@ namespace TEAMModelOS.SDK.DI
         /// <summary>
         /// 批量發送訊息至對列或主題,如果批量失敗返回False
         /// </summary>       
-        /// <param name="pathName">QueueName or TopicName</param>
+        /// <param name="name">QueueName or TopicName</param>
         /// <param name="messages">批量訊息</param>
         /// <returns></returns>
-        public static async Task<bool> SendBatchMessageAsync(this ServiceBusClient client, string pathName, IList<ServiceBusMessage> messages)
+        public static async Task<bool> SendBatchMessageAsync(this ServiceBusClient client, string name, IList<ServiceBusMessage> messages)
         {
             try
             {
-                ServiceBusSender sender = client.CreateSender(pathName);
+                ServiceBusSender sender = ServiceBusSenders.GetOrAdd(name, x => client.CreateSender(name));
                 ServiceBusMessageBatch messageBatch = await sender.CreateMessageBatchAsync();
                 foreach (var msg in messages)
                 {
                     if (!messageBatch.TryAddMessage(msg)) return false;
                 }
-                await sender.SendMessagesAsync(messageBatch);
-                if (!sender.IsClosed)
-                {
-                    await sender.CloseAsync();
-                    await sender.DisposeAsync();
-                }
+                await sender.SendMessagesAsync(messageBatch);                
                 return true;
             }
             catch
@@ -73,19 +65,15 @@ namespace TEAMModelOS.SDK.DI
         /// <summary>
         /// 發送信息至對列或主題(指定時間排程)
         /// </summary>       
-        /// <param name="pathName">QueueName or TopicName</param>
+        /// <param name="name">QueueName or TopicName</param>
         /// <param name="message">訊息</param>
         /// <returns>排程訊息的序列號。</returns>
-        public static async Task<long> SendScheduleMessageAsync(this ServiceBusClient client, string pathName, ServiceBusMessage message, DateTimeOffset scheduleTime)
+        public static async Task<long> SendScheduleMessageAsync(this ServiceBusClient client, string name, ServiceBusMessage message, DateTimeOffset scheduleTime)
         {
             try
             {
-                ServiceBusSender sender = client.CreateSender(pathName);
-                long num= await sender.ScheduleMessageAsync(message, scheduleTime);
-                if (!sender.IsClosed) {
-                    await sender.CloseAsync();
-                    await sender.DisposeAsync();
-                }
+                ServiceBusSender sender = ServiceBusSenders.GetOrAdd(name, x => client.CreateSender(name));
+                long num= await sender.ScheduleMessageAsync(message, scheduleTime);                
                 return num;
             }
             catch
@@ -97,25 +85,20 @@ namespace TEAMModelOS.SDK.DI
         /// <summary>
         /// 批量發送訊息至對列或主題(指定時間排程),如果批量失敗返回False
         /// </summary>       
-        /// <param name="pathName">QueueName or TopicName</param>
+        /// <param name="name">QueueName or TopicName</param>
         /// <param name="messages">批量訊息</param>
         /// <returns>排程訊息的序列號</returns>
-        public static async Task<IReadOnlyList<long>> SendScheduleMessagesAsync(this ServiceBusClient client, string pathName, IList<ServiceBusMessage> messages, DateTimeOffset scheduleTime)
+        public static async Task<IReadOnlyList<long>> SendScheduleMessagesAsync(this ServiceBusClient client, string name, IList<ServiceBusMessage> messages, DateTimeOffset scheduleTime)
         {
             try
             {
-                ServiceBusSender sender = client.CreateSender(pathName);
-                List<ServiceBusMessage> msgs = new List<ServiceBusMessage>() { };
+                ServiceBusSender sender = ServiceBusSenders.GetOrAdd(name, x => client.CreateSender(name));
+                List<ServiceBusMessage> msgs = new() { };
                 foreach (var msg in messages)
                 {
                     msgs.Add(msg);
                 }
-                var nums= await sender.ScheduleMessagesAsync(msgs, scheduleTime);
-                if (!sender.IsClosed)
-                {
-                    await sender.CloseAsync();
-                    await sender.DisposeAsync();
-                }
+                var nums= await sender.ScheduleMessagesAsync(msgs, scheduleTime);                
                 return nums;
             }
             catch
@@ -125,7 +108,7 @@ namespace TEAMModelOS.SDK.DI
         }
 
 
-        public static async Task<long> SendLeamMessage<T>(this ServiceBusClient client, string TopicName, string id, string pk, long startTime)
+        public static async Task<long> SendLeamMessage<T>(this ServiceBusClient client, string name, string id, string pk, long startTime)
         {
             //微調代碼
             var timer = DateTimeOffset.FromUnixTimeMilliseconds(startTime);
@@ -146,7 +129,7 @@ namespace TEAMModelOS.SDK.DI
             
             
             //设定开始时间
-            Dictionary<string, object> dict = new Dictionary<string, object>() {
+            Dictionary<string, object> dict = new() {
                     { "name",typeof(T).Name},
                     { "id",id},
                     { "code",pk}
@@ -154,22 +137,17 @@ namespace TEAMModelOS.SDK.DI
             //var msgId = "1";
             string messageBody = $"Message {dict}";
 
-            long SequenceNumber = await client.SendScheduleMessageAsync(TopicName, new ServiceBusMessage(dict.ToJsonString()), timer);
+            long SequenceNumber = await client.SendScheduleMessageAsync(name, new ServiceBusMessage(dict.ToJsonString()), timer);
 
             return SequenceNumber;
         }
 
-        public static async Task cancelMessage(this ServiceBusClient client, string TopicName, long number)
+        public static async Task CancelMessageAsync(this ServiceBusClient client, string name, long number)
         {
             try
             {
-                ServiceBusSender sender = client.CreateSender(TopicName);
-                await sender.CancelScheduledMessageAsync(number);
-                if (!sender.IsClosed)
-                {
-                    await sender.CloseAsync();
-                    await sender.DisposeAsync();
-                }
+                ServiceBusSender sender = ServiceBusSenders.GetOrAdd(name, x => client.CreateSender(name));
+                await sender.CancelScheduledMessageAsync(number);                
             }
             catch
             {