using Microsoft.Azure.Cosmos.Table;
using Microsoft.Azure.Cosmos.Table.Queryable;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Linq;
using Azure.Messaging.ServiceBus;
using TEAMModelOS.SDK.Helper.Common.CollectionHelper;
using Grpc.Extension.Common;
using TEAMModelOS.SDK.Extension;
namespace TEAMModelOS.SDK.DI
{
public static class AzureServiceBusExtensions
{
    /// <summary>
    /// Sends a single message to a queue or topic.
    /// </summary>
    /// <param name="client">Service Bus client used to create the sender.</param>
    /// <param name="pathName">Queue name or topic name.</param>
    /// <param name="message">The message to send.</param>
    /// <remarks>Any exception raised while sending propagates to the caller.</remarks>
    public static async Task SendMessageAsync(this ServiceBusClient client, string pathName, ServiceBusMessage message)
    {
        ServiceBusSender sender = client.CreateSender(pathName);
        try
        {
            await sender.SendMessageAsync(message);
        }
        finally
        {
            // A sender is created on every call, so release it here rather than
            // leaving it open until the client itself is disposed.
            await sender.DisposeAsync();
        }
    }

    /// <summary>
    /// Sends a set of messages to a queue or topic as one batch.
    /// </summary>
    /// <param name="client">Service Bus client used to create the sender.</param>
    /// <param name="pathName">Queue name or topic name.</param>
    /// <param name="messages">The messages to send together.</param>
    /// <returns>
    /// <c>true</c> when the batch was sent; <c>false</c> when a message could not be
    /// added to the batch or when any exception occurred.
    /// </returns>
    public static async Task<bool> SendBatchMessageAsync(this ServiceBusClient client, string pathName, IList<ServiceBusMessage> messages)
    {
        try
        {
            ServiceBusSender sender = client.CreateSender(pathName);
            try
            {
                // The batch is disposable; the using block releases it on every exit path.
                using (ServiceBusMessageBatch messageBatch = await sender.CreateMessageBatchAsync())
                {
                    foreach (ServiceBusMessage msg in messages)
                    {
                        // All-or-nothing: if one message is rejected by the batch, nothing is sent.
                        if (!messageBatch.TryAddMessage(msg))
                        {
                            return false;
                        }
                    }

                    await sender.SendMessagesAsync(messageBatch);
                    return true;
                }
            }
            finally
            {
                await sender.DisposeAsync();
            }
        }
        catch
        {
            // Documented contract of this method: failures are reported as false, not thrown.
            return false;
        }
    }

    /// <summary>
    /// Schedules a single message for delivery to a queue or topic at the given time.
    /// </summary>
    /// <param name="client">Service Bus client used to create the sender.</param>
    /// <param name="pathName">Queue name or topic name.</param>
    /// <param name="message">The message to schedule.</param>
    /// <param name="scheduleTime">The time at which the message should be enqueued.</param>
    /// <returns>The sequence number of the scheduled message.</returns>
    /// <remarks>Any exception raised while scheduling propagates to the caller.</remarks>
    public static async Task<long> SendScheduleMessageAsync(this ServiceBusClient client, string pathName, ServiceBusMessage message, DateTimeOffset scheduleTime)
    {
        ServiceBusSender sender = client.CreateSender(pathName);
        try
        {
            return await sender.ScheduleMessageAsync(message, scheduleTime);
        }
        finally
        {
            await sender.DisposeAsync();
        }
    }

    /// <summary>
    /// Schedules a set of messages for delivery to a queue or topic at the given time.
    /// </summary>
    /// <param name="client">Service Bus client used to create the sender.</param>
    /// <param name="pathName">Queue name or topic name.</param>
    /// <param name="messages">The messages to schedule.</param>
    /// <param name="scheduleTime">The time at which the messages should be enqueued.</param>
    /// <returns>
    /// The sequence numbers of the scheduled messages, or <c>null</c> when any exception occurred.
    /// </returns>
    public static async Task<IReadOnlyList<long>> SendScheduleMessagesAsync(this ServiceBusClient client, string pathName, IList<ServiceBusMessage> messages, DateTimeOffset scheduleTime)
    {
        try
        {
            ServiceBusSender sender = client.CreateSender(pathName);
            try
            {
                // The list is passed straight through; no intermediate copy is needed.
                return await sender.ScheduleMessagesAsync(messages, scheduleTime);
            }
            finally
            {
                await sender.DisposeAsync();
            }
        }
        catch
        {
            // Callers of this method test for null to detect failure, so keep that contract.
            return null;
        }
    }

    /// <summary>
    /// Schedules a JSON message on a topic. The payload carries the keys
    /// <c>name</c> (the simple name of <typeparamref name="T"/>), <c>id</c> and <c>code</c>.
    /// </summary>
    /// <typeparam name="T">Type whose name is written to the <c>name</c> field of the payload.</typeparam>
    /// <param name="client">Service Bus client used to schedule the message.</param>
    /// <param name="TopicName">Topic (or queue) name the message is scheduled on.</param>
    /// <param name="id">Value written to the <c>id</c> field of the payload.</param>
    /// <param name="pk">Value written to the <c>code</c> field of the payload.</param>
    /// <param name="startTime">Scheduled enqueue time, in Unix epoch milliseconds.</param>
    /// <returns>The sequence number of the scheduled message.</returns>
    public static async Task<long> SendLeamMessage<T>(this ServiceBusClient client, string TopicName, string id, string pk, long startTime)
    {
        // Convert the Unix-millisecond start time into the scheduled enqueue time.
        DateTimeOffset timer = DateTimeOffset.FromUnixTimeMilliseconds(startTime);

        Dictionary<string, object> dict = new Dictionary<string, object>()
        {
            { "name", typeof(T).Name },
            { "id", id },
            { "code", pk }
        };

        // Serialize once so the logged text is exactly what gets sent. Interpolating the
        // dictionary object itself would only print its type name.
        string payload = dict.ToJsonString();
        string messageBody = $"Message {payload}";
        Console.WriteLine($"Sending message: {messageBody}");

        long SequenceNumber = await client.SendScheduleMessageAsync(TopicName, new ServiceBusMessage(payload), timer);
        return SequenceNumber;
    }

    /// <summary>
    /// Cancels a previously scheduled message.
    /// </summary>
    /// <param name="client">Service Bus client used to create the sender.</param>
    /// <param name="TopicName">Topic (or queue) name the message was scheduled on.</param>
    /// <param name="number">Sequence number of the scheduled message to cancel.</param>
    /// <remarks>Any exception raised while cancelling propagates to the caller.</remarks>
    public static async Task cancelMessage(this ServiceBusClient client, string TopicName, long number)
    {
        ServiceBusSender sender = client.CreateSender(TopicName);
        try
        {
            await sender.CancelScheduledMessageAsync(number);
        }
        finally
        {
            await sender.DisposeAsync();
        }
    }
}
}