AzureServiceBusExtensions.cs 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. using System;
  2. using System.Collections;
  3. using System.Collections.Generic;
  4. using System.Text;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. using System.Linq;
  8. using Azure.Messaging.ServiceBus;
  9. using TEAMModelOS.SDK;
  10. using TEAMModelOS.SDK.Extension;
  11. using System.Collections.Concurrent;
  12. namespace TEAMModelOS.SDK.DI
  13. {
  14. public static class AzureServiceBusExtensions
  15. {
  16. private static ConcurrentDictionary<string, ServiceBusSender> ServiceBusSenders { get; } = new ConcurrentDictionary<string, ServiceBusSender>();
  17. /// <summary>
  18. /// 發送信息至對列或主題
  19. /// </summary>
  20. /// <param name="name">QueueName or TopicName</param>
  21. /// <param name="message">訊息</param>
  22. /// <returns></returns>
  23. public static async Task SendMessageAsync(this ServiceBusClient client, string name, ServiceBusMessage message)
  24. {
  25. try
  26. {
  27. ServiceBusSender sender = ServiceBusSenders.GetOrAdd(name, x => client.CreateSender(name));
  28. await sender.SendMessageAsync(message);
  29. }
  30. catch
  31. {
  32. throw;
  33. }
  34. }
  35. /// <summary>
  36. /// 批量發送訊息至對列或主題,如果批量失敗返回False
  37. /// </summary>
  38. /// <param name="name">QueueName or TopicName</param>
  39. /// <param name="messages">批量訊息</param>
  40. /// <returns></returns>
  41. public static async Task<bool> SendBatchMessageAsync(this ServiceBusClient client, string name, IList<ServiceBusMessage> messages)
  42. {
  43. try
  44. {
  45. ServiceBusSender sender = ServiceBusSenders.GetOrAdd(name, x => client.CreateSender(name));
  46. ServiceBusMessageBatch messageBatch = await sender.CreateMessageBatchAsync();
  47. foreach (var msg in messages)
  48. {
  49. if (!messageBatch.TryAddMessage(msg)) return false;
  50. }
  51. await sender.SendMessagesAsync(messageBatch);
  52. return true;
  53. }
  54. catch
  55. {
  56. return false;
  57. }
  58. }
  59. /// <summary>
  60. /// 發送信息至對列或主題(指定時間排程)
  61. /// </summary>
  62. /// <param name="name">QueueName or TopicName</param>
  63. /// <param name="message">訊息</param>
  64. /// <returns>排程訊息的序列號。</returns>
  65. public static async Task<long> SendScheduleMessageAsync(this ServiceBusClient client, string name, ServiceBusMessage message, DateTimeOffset scheduleTime)
  66. {
  67. try
  68. {
  69. ServiceBusSender sender = ServiceBusSenders.GetOrAdd(name, x => client.CreateSender(name));
  70. long num= await sender.ScheduleMessageAsync(message, scheduleTime);
  71. return num;
  72. }
  73. catch
  74. {
  75. throw;
  76. }
  77. }
  78. /// <summary>
  79. /// 批量發送訊息至對列或主題(指定時間排程),如果批量失敗返回False
  80. /// </summary>
  81. /// <param name="name">QueueName or TopicName</param>
  82. /// <param name="messages">批量訊息</param>
  83. /// <returns>排程訊息的序列號</returns>
  84. public static async Task<IReadOnlyList<long>> SendScheduleMessagesAsync(this ServiceBusClient client, string name, IList<ServiceBusMessage> messages, DateTimeOffset scheduleTime)
  85. {
  86. try
  87. {
  88. ServiceBusSender sender = ServiceBusSenders.GetOrAdd(name, x => client.CreateSender(name));
  89. List<ServiceBusMessage> msgs = new() { };
  90. foreach (var msg in messages)
  91. {
  92. msgs.Add(msg);
  93. }
  94. var nums= await sender.ScheduleMessagesAsync(msgs, scheduleTime);
  95. return nums;
  96. }
  97. catch
  98. {
  99. return null;
  100. }
  101. }
  102. public static async Task<long> SendLeamMessage<T>(this ServiceBusClient client, string name, string id, string pk, long startTime)
  103. {
  104. //微調代碼
  105. var timer = DateTimeOffset.FromUnixTimeMilliseconds(startTime);
  106. /*if (type.Equals("start"))
  107. {
  108. if (timer.CompareTo(DateTimeOffset.UtcNow) < 0)
  109. {
  110. progress = "going";
  111. }
  112. }
  113. else if(type.Equals("end"))
  114. {
  115. if (timer.CompareTo(DateTimeOffset.UtcNow) < 0)
  116. {
  117. progress = "finish";
  118. }
  119. }*/
  120. //设定开始时间
  121. Dictionary<string, object> dict = new() {
  122. { "name",typeof(T).Name},
  123. { "id",id},
  124. { "code",pk}
  125. };
  126. //var msgId = "1";
  127. string messageBody = $"Message {dict}";
  128. long SequenceNumber = await client.SendScheduleMessageAsync(name, new ServiceBusMessage(dict.ToJsonString()), timer);
  129. return SequenceNumber;
  130. }
  131. public static async Task CancelMessageAsync(this ServiceBusClient client, string name, long number)
  132. {
  133. try
  134. {
  135. ServiceBusSender sender = ServiceBusSenders.GetOrAdd(name, x => client.CreateSender(name));
  136. await sender.CancelScheduledMessageAsync(number);
  137. }
  138. catch
  139. {
  140. throw;
  141. }
  142. }
  143. }
  144. }