using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using TEAMModelOS.SDK;
using TEAMModelOS.SDK.Extension;

namespace TEAMModelOS.SDK.DI
{
    /// <summary>
    /// Extension methods on <see cref="ServiceBusClient"/> for sending, scheduling and
    /// cancelling messages on a queue or topic, reusing one sender per client and entity name.
    /// </summary>
    public static class AzureServiceBusExtensions
    {
        // Senders are cached per client instance, then per queue/topic name. Keying on the
        // name alone would hand a sender created by one client to every other client that
        // targets an entity with the same name. The weak table lets a client's cached senders
        // be collected together with the client instead of being pinned for the process lifetime.
        private static readonly ConditionalWeakTable<ServiceBusClient, ConcurrentDictionary<string, ServiceBusSender>> SendersByClient = new();

        /// <summary>
        /// Returns the cached sender for <paramref name="name"/> on <paramref name="client"/>,
        /// creating it on first use.
        /// </summary>
        private static ServiceBusSender GetSender(ServiceBusClient client, string name)
        {
            ConcurrentDictionary<string, ServiceBusSender> senders =
                SendersByClient.GetValue(client, _ => new ConcurrentDictionary<string, ServiceBusSender>());
            return senders.GetOrAdd(name, entity => client.CreateSender(entity));
        }

        /// <summary>
        /// Sends a single message to a queue or topic.
        /// </summary>
        /// <param name="client">Client used to create (or look up) the sender.</param>
        /// <param name="name">Queue name or topic name.</param>
        /// <param name="message">Message to send.</param>
        /// <remarks>Exceptions raised while sending propagate to the caller.</remarks>
        public static async Task SendMessageAsync(this ServiceBusClient client, string name, ServiceBusMessage message)
        {
            ServiceBusSender sender = GetSender(client, name);
            await sender.SendMessageAsync(message);
        }

        /// <summary>
        /// Sends a set of messages to a queue or topic as one batch.
        /// </summary>
        /// <param name="client">Client used to create (or look up) the sender.</param>
        /// <param name="name">Queue name or topic name.</param>
        /// <param name="messages">Messages to send.</param>
        /// <returns>
        /// <c>true</c> when the batch was sent; <c>false</c> when a message could not be added
        /// to the batch (nothing is sent in that case) or when any exception occurred.
        /// </returns>
        public static async Task<bool> SendBatchMessageAsync(this ServiceBusClient client, string name, IList<ServiceBusMessage> messages)
        {
            try
            {
                ServiceBusSender sender = GetSender(client, name);

                // The batch is disposable; release it on every exit path.
                using ServiceBusMessageBatch messageBatch = await sender.CreateMessageBatchAsync();
                foreach (ServiceBusMessage msg in messages)
                {
                    if (!messageBatch.TryAddMessage(msg))
                    {
                        return false;
                    }
                }

                await sender.SendMessagesAsync(messageBatch);
                return true;
            }
            catch
            {
                // Documented contract: any failure is reported as false rather than thrown.
                return false;
            }
        }

        /// <summary>
        /// Schedules a single message for delivery to a queue or topic at a given time.
        /// </summary>
        /// <param name="client">Client used to create (or look up) the sender.</param>
        /// <param name="name">Queue name or topic name.</param>
        /// <param name="message">Message to schedule.</param>
        /// <param name="scheduleTime">Time passed to the sender as the scheduled enqueue time.</param>
        /// <returns>Sequence number of the scheduled message.</returns>
        /// <remarks>Exceptions raised while scheduling propagate to the caller.</remarks>
        public static async Task<long> SendScheduleMessageAsync(this ServiceBusClient client, string name, ServiceBusMessage message, DateTimeOffset scheduleTime)
        {
            ServiceBusSender sender = GetSender(client, name);
            return await sender.ScheduleMessageAsync(message, scheduleTime);
        }

        /// <summary>
        /// Schedules a set of messages for delivery to a queue or topic at a given time.
        /// </summary>
        /// <param name="client">Client used to create (or look up) the sender.</param>
        /// <param name="name">Queue name or topic name.</param>
        /// <param name="messages">Messages to schedule.</param>
        /// <param name="scheduleTime">Time passed to the sender as the scheduled enqueue time.</param>
        /// <returns>
        /// Sequence numbers of the scheduled messages, or <c>null</c> when any exception occurred.
        /// </returns>
        public static async Task<IReadOnlyList<long>> SendScheduleMessagesAsync(this ServiceBusClient client, string name, IList<ServiceBusMessage> messages, DateTimeOffset scheduleTime)
        {
            try
            {
                ServiceBusSender sender = GetSender(client, name);
                return await sender.ScheduleMessagesAsync(messages, scheduleTime);
            }
            catch
            {
                // Existing contract: failure is signalled with null, so callers must null-check.
                return null;
            }
        }

        /// <summary>
        /// Schedules a JSON message carrying the name of <typeparamref name="T"/>, an id and a
        /// code, to be delivered at <paramref name="startTime"/>.
        /// </summary>
        /// <typeparam name="T">Type whose simple name is written to the payload's "name" field.</typeparam>
        /// <param name="client">Client used to create (or look up) the sender.</param>
        /// <param name="name">Queue name or topic name.</param>
        /// <param name="id">Value written to the payload's "id" field.</param>
        /// <param name="pk">Value written to the payload's "code" field.</param>
        /// <param name="startTime">Scheduled time as Unix time in milliseconds.</param>
        /// <returns>Sequence number of the scheduled message.</returns>
        public static async Task<long> SendLeamMessage<T>(this ServiceBusClient client, string name, string id, string pk, long startTime)
        {
            DateTimeOffset scheduledAt = DateTimeOffset.FromUnixTimeMilliseconds(startTime);

            Dictionary<string, object> payload = new()
            {
                { "name", typeof(T).Name },
                { "id", id },
                { "code", pk }
            };

            // ToJsonString is a project extension; presumably it serializes the dictionary to JSON.
            return await client.SendScheduleMessageAsync(name, new ServiceBusMessage(payload.ToJsonString()), scheduledAt);
        }

        /// <summary>
        /// Cancels a previously scheduled message.
        /// </summary>
        /// <param name="client">Client used to create (or look up) the sender.</param>
        /// <param name="name">Queue name or topic name the message was scheduled on.</param>
        /// <param name="number">Sequence number returned when the message was scheduled.</param>
        /// <remarks>Exceptions raised while cancelling propagate to the caller.</remarks>
        public static async Task CancelMessageAsync(this ServiceBusClient client, string name, long number)
        {
            ServiceBusSender sender = GetSender(client, name);
            await sender.CancelScheduledMessageAsync(number);
        }
    }
}