using Microsoft.Azure.Cosmos.Table; using Microsoft.Azure.Cosmos.Table.Queryable; using System; using System.Collections; using System.Collections.Generic; using System.Text; using System.Threading; using System.Threading.Tasks; using System.Linq; using Azure.Messaging.ServiceBus; using TEAMModelOS.SDK.Helper.Common.CollectionHelper; using Grpc.Extension.Common; using TEAMModelOS.SDK.Extension; namespace TEAMModelOS.SDK.DI { public static class AzureServiceBusExtensions { /// /// 發送信息至對列或主題 /// /// QueueName or TopicName /// 訊息 /// public static async Task SendMessageAsync(this ServiceBusClient client, string pathName, ServiceBusMessage message) { try { ServiceBusSender sender = client.CreateSender(pathName); await sender.SendMessageAsync(message); } catch { throw; } } /// /// 批量發送訊息至對列或主題,如果批量失敗返回False /// /// QueueName or TopicName /// 批量訊息 /// public static async Task SendBatchMessageAsync(this ServiceBusClient client, string pathName, IList messages) { try { ServiceBusSender sender = client.CreateSender(pathName); ServiceBusMessageBatch messageBatch = await sender.CreateMessageBatchAsync(); foreach (var msg in messages) { if (!messageBatch.TryAddMessage(msg)) return false; } await sender.SendMessagesAsync(messageBatch); return true; } catch { return false; } } /// /// 發送信息至對列或主題(指定時間排程) /// /// QueueName or TopicName /// 訊息 /// 排程訊息的序列號。 public static async Task SendScheduleMessageAsync(this ServiceBusClient client, string pathName, ServiceBusMessage message, DateTimeOffset scheduleTime) { try { ServiceBusSender sender = client.CreateSender(pathName); return await sender.ScheduleMessageAsync(message, scheduleTime); } catch { throw; } } /// /// 批量發送訊息至對列或主題(指定時間排程),如果批量失敗返回False /// /// QueueName or TopicName /// 批量訊息 /// 排程訊息的序列號 public static async Task SendScheduleMessagesAsync(this ServiceBusClient client, string pathName, IList messages, DateTimeOffset scheduleTime) { try { ServiceBusSender sender = client.CreateSender(pathName); List msgs = new List() { }; foreach (var msg in messages) { msgs.Add(msg); } return await sender.ScheduleMessagesAsync(msgs, scheduleTime); } catch { return null; } } public static async Task SendLeamMessage(this ServiceBusClient client, string TopicName, string id, string pk, long startTime) { //微調代碼 var timer = DateTimeOffset.FromUnixTimeMilliseconds(startTime); /*if (type.Equals("start")) { if (timer.CompareTo(DateTimeOffset.UtcNow) < 0) { progress = "going"; } } else if(type.Equals("end")) { if (timer.CompareTo(DateTimeOffset.UtcNow) < 0) { progress = "finish"; } }*/ //设定开始时间 Dictionary dict = new Dictionary() { { "name",typeof(T).Name}, { "id",id}, { "code",pk} }; //var msgId = "1"; string messageBody = $"Message {dict}"; Console.WriteLine($"Sending message: {messageBody}"); long SequenceNumber = await client.SendScheduleMessageAsync(TopicName, new ServiceBusMessage(dict.ToJsonString()), timer); return SequenceNumber; } public static async Task cancelMessage(this ServiceBusClient client, string TopicName, long number) { try { ServiceBusSender sender = client.CreateSender(TopicName); await sender.CancelScheduledMessageAsync(number); } catch { throw; } } } }