AzureServiceBusExtensions.cs 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. using Microsoft.Azure.Cosmos.Table;
  2. using Microsoft.Azure.Cosmos.Table.Queryable;
  3. using System;
  4. using System.Collections;
  5. using System.Collections.Generic;
  6. using System.Text;
  7. using System.Threading;
  8. using System.Threading.Tasks;
  9. using System.Linq;
  10. using Azure.Messaging.ServiceBus;
  11. using TEAMModelOS.SDK;
  12. using TEAMModelOS.SDK.Extension;
  13. namespace TEAMModelOS.SDK.DI
  14. {
  15. public static class AzureServiceBusExtensions
  16. {
  /// <summary>
  /// Sends a single message to a queue or topic.
  /// </summary>
  /// <param name="client">Service Bus client used to create the sender.</param>
  /// <param name="pathName">QueueName or TopicName</param>
  /// <param name="message">The message to send.</param>
  /// <returns>A task that completes once the message has been sent and the sender released.</returns>
  /// <remarks>Any exception raised while creating the sender or sending is propagated to the caller.</remarks>
  public static async Task SendMessageAsync(this ServiceBusClient client, string pathName, ServiceBusMessage message)
  {
      ServiceBusSender sender = client.CreateSender(pathName);
      try
      {
          await sender.SendMessageAsync(message);
      }
      finally
      {
          // Release the sender on every path, including when the send throws.
          // Previously it was only closed after a successful send.
          // DisposeAsync closes the sender, so a separate CloseAsync is not needed.
          await sender.DisposeAsync();
      }
  }
  /// <summary>
  /// Sends a list of messages to a queue or topic as a single batch.
  /// </summary>
  /// <param name="client">Service Bus client used to create the sender.</param>
  /// <param name="pathName">QueueName or TopicName</param>
  /// <param name="messages">The messages to place in the batch.</param>
  /// <returns>
  /// <c>true</c> when the whole batch was sent; <c>false</c> when a message could not be
  /// added to the batch or when any exception occurred (exceptions are not propagated).
  /// </returns>
  public static async Task<bool> SendBatchMessageAsync(this ServiceBusClient client, string pathName, IList<ServiceBusMessage> messages)
  {
      try
      {
          ServiceBusSender sender = client.CreateSender(pathName);
          try
          {
              // The batch is disposable; release it on every exit path.
              using (ServiceBusMessageBatch messageBatch = await sender.CreateMessageBatchAsync())
              {
                  foreach (var msg in messages)
                  {
                      // All-or-nothing: abort without sending if any message is rejected by the batch.
                      if (!messageBatch.TryAddMessage(msg))
                      {
                          return false;
                      }
                  }

                  await sender.SendMessagesAsync(messageBatch);
              }

              return true;
          }
          finally
          {
              // Previously the sender leaked on the early return and on exceptions.
              // DisposeAsync closes the sender, so a separate CloseAsync is not needed.
              await sender.DisposeAsync();
          }
      }
      catch
      {
          // Preserve the existing contract: every failure is reported as false.
          return false;
      }
  }
  /// <summary>
  /// Schedules a single message for delivery to a queue or topic at a given time.
  /// </summary>
  /// <param name="client">Service Bus client used to create the sender.</param>
  /// <param name="pathName">QueueName or TopicName</param>
  /// <param name="message">The message to schedule.</param>
  /// <param name="scheduleTime">The time passed to the sender as the scheduled enqueue time.</param>
  /// <returns>The sequence number of the scheduled message.</returns>
  /// <remarks>Any exception raised while creating the sender or scheduling is propagated to the caller.</remarks>
  public static async Task<long> SendScheduleMessageAsync(this ServiceBusClient client, string pathName, ServiceBusMessage message, DateTimeOffset scheduleTime)
  {
      ServiceBusSender sender = client.CreateSender(pathName);
      try
      {
          return await sender.ScheduleMessageAsync(message, scheduleTime);
      }
      finally
      {
          // Release the sender on every path, including when scheduling throws.
          // Previously it was only closed after a successful call.
          // DisposeAsync closes the sender, so a separate CloseAsync is not needed.
          await sender.DisposeAsync();
      }
  }
  /// <summary>
  /// Schedules a list of messages for delivery to a queue or topic at a given time.
  /// </summary>
  /// <param name="client">Service Bus client used to create the sender.</param>
  /// <param name="pathName">QueueName or TopicName</param>
  /// <param name="messages">The messages to schedule.</param>
  /// <param name="scheduleTime">The time passed to the sender as the scheduled enqueue time.</param>
  /// <returns>
  /// The sequence numbers of the scheduled messages, or <c>null</c> when any exception
  /// occurred (exceptions are not propagated). Callers must check for <c>null</c>.
  /// </returns>
  public static async Task<IReadOnlyList<long>> SendScheduleMessagesAsync(this ServiceBusClient client, string pathName, IList<ServiceBusMessage> messages, DateTimeOffset scheduleTime)
  {
      try
      {
          ServiceBusSender sender = client.CreateSender(pathName);
          try
          {
              // The list is passed straight through; the sender accepts any
              // enumerable of messages, so no intermediate copy is required.
              return await sender.ScheduleMessagesAsync(messages, scheduleTime);
          }
          finally
          {
              // Previously the sender leaked when scheduling threw.
              // DisposeAsync closes the sender, so a separate CloseAsync is not needed.
              await sender.DisposeAsync();
          }
      }
      catch
      {
          // Preserve the existing contract: every failure is reported as null.
          return null;
      }
  }
  /// <summary>
  /// Schedules a JSON message on a topic that identifies an item by type name, id and code.
  /// </summary>
  /// <typeparam name="T">Type whose <c>Name</c> is written to the "name" field of the message.</typeparam>
  /// <param name="client">Service Bus client used to send the message.</param>
  /// <param name="TopicName">Name of the topic (or queue) the message is scheduled on.</param>
  /// <param name="id">Value written to the "id" field of the message.</param>
  /// <param name="pk">Value written to the "code" field of the message.</param>
  /// <param name="startTime">Scheduled time as Unix time in milliseconds.</param>
  /// <returns>The sequence number of the scheduled message.</returns>
  public static async Task<long> SendLeamMessage<T>(this ServiceBusClient client, string TopicName, string id, string pk, long startTime)
  {
      // Convert the Unix-millisecond timestamp into the scheduled delivery time.
      var timer = DateTimeOffset.FromUnixTimeMilliseconds(startTime);

      // Message payload: type name, id and code, serialized to JSON below.
      Dictionary<string, object> dict = new Dictionary<string, object>() {
          { "name", typeof(T).Name },
          { "id", id },
          { "code", pk }
      };

      return await client.SendScheduleMessageAsync(TopicName, new ServiceBusMessage(dict.ToJsonString()), timer);
  }
  /// <summary>
  /// Cancels a previously scheduled message on a queue or topic.
  /// </summary>
  /// <param name="client">Service Bus client used to create the sender.</param>
  /// <param name="TopicName">Name of the topic (or queue) the message was scheduled on.</param>
  /// <param name="number">Sequence number of the scheduled message to cancel.</param>
  /// <returns>A task that completes once the cancellation has been requested and the sender released.</returns>
  /// <remarks>Any exception raised while creating the sender or cancelling is propagated to the caller.</remarks>
  public static async Task cancelMessage(this ServiceBusClient client, string TopicName, long number)
  {
      ServiceBusSender sender = client.CreateSender(TopicName);
      try
      {
          await sender.CancelScheduledMessageAsync(number);
      }
      finally
      {
          // Release the sender on every path, including when the cancel throws.
          // Previously it was only closed after a successful call.
          // DisposeAsync closes the sender, so a separate CloseAsync is not needed.
          await sender.DisposeAsync();
      }
  }
  167. }
  168. }