AzureServiceBusExtensions.cs 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. using Microsoft.Azure.Cosmos.Table;
  2. using Microsoft.Azure.Cosmos.Table.Queryable;
  3. using System;
  4. using System.Collections;
  5. using System.Collections.Generic;
  6. using System.Text;
  7. using System.Threading;
  8. using System.Threading.Tasks;
  9. using System.Linq;
  10. using Azure.Messaging.ServiceBus;
  11. using TEAMModelOS.SDK;
  12. using TEAMModelOS.SDK.Extension;
  13. using System.Collections.Concurrent;
  14. namespace TEAMModelOS.SDK.DI
  15. {
  16. public static class AzureServiceBusExtensions
  17. {
  18. private static ConcurrentDictionary<string, ServiceBusSender> ServiceBusSenders { get; } = new ConcurrentDictionary<string, ServiceBusSender>();
  19. /// <summary>
  20. /// 發送信息至對列或主題
  21. /// </summary>
  22. /// <param name="name">QueueName or TopicName</param>
  23. /// <param name="message">訊息</param>
  24. /// <returns></returns>
  25. public static async Task SendMessageAsync(this ServiceBusClient client, string name, ServiceBusMessage message)
  26. {
  27. try
  28. {
  29. ServiceBusSender sender = ServiceBusSenders.GetOrAdd(name, x => client.CreateSender(name));
  30. await sender.SendMessageAsync(message);
  31. }
  32. catch
  33. {
  34. throw;
  35. }
  36. }
  37. /// <summary>
  38. /// 批量發送訊息至對列或主題,如果批量失敗返回False
  39. /// </summary>
  40. /// <param name="name">QueueName or TopicName</param>
  41. /// <param name="messages">批量訊息</param>
  42. /// <returns></returns>
  43. public static async Task<bool> SendBatchMessageAsync(this ServiceBusClient client, string name, IList<ServiceBusMessage> messages)
  44. {
  45. try
  46. {
  47. ServiceBusSender sender = ServiceBusSenders.GetOrAdd(name, x => client.CreateSender(name));
  48. ServiceBusMessageBatch messageBatch = await sender.CreateMessageBatchAsync();
  49. foreach (var msg in messages)
  50. {
  51. if (!messageBatch.TryAddMessage(msg)) return false;
  52. }
  53. await sender.SendMessagesAsync(messageBatch);
  54. return true;
  55. }
  56. catch
  57. {
  58. return false;
  59. }
  60. }
  61. /// <summary>
  62. /// 發送信息至對列或主題(指定時間排程)
  63. /// </summary>
  64. /// <param name="name">QueueName or TopicName</param>
  65. /// <param name="message">訊息</param>
  66. /// <returns>排程訊息的序列號。</returns>
  67. public static async Task<long> SendScheduleMessageAsync(this ServiceBusClient client, string name, ServiceBusMessage message, DateTimeOffset scheduleTime)
  68. {
  69. try
  70. {
  71. ServiceBusSender sender = ServiceBusSenders.GetOrAdd(name, x => client.CreateSender(name));
  72. long num= await sender.ScheduleMessageAsync(message, scheduleTime);
  73. return num;
  74. }
  75. catch
  76. {
  77. throw;
  78. }
  79. }
  80. /// <summary>
  81. /// 批量發送訊息至對列或主題(指定時間排程),如果批量失敗返回False
  82. /// </summary>
  83. /// <param name="name">QueueName or TopicName</param>
  84. /// <param name="messages">批量訊息</param>
  85. /// <returns>排程訊息的序列號</returns>
  86. public static async Task<IReadOnlyList<long>> SendScheduleMessagesAsync(this ServiceBusClient client, string name, IList<ServiceBusMessage> messages, DateTimeOffset scheduleTime)
  87. {
  88. try
  89. {
  90. ServiceBusSender sender = ServiceBusSenders.GetOrAdd(name, x => client.CreateSender(name));
  91. List<ServiceBusMessage> msgs = new() { };
  92. foreach (var msg in messages)
  93. {
  94. msgs.Add(msg);
  95. }
  96. var nums= await sender.ScheduleMessagesAsync(msgs, scheduleTime);
  97. return nums;
  98. }
  99. catch
  100. {
  101. return null;
  102. }
  103. }
  104. public static async Task<long> SendLeamMessage<T>(this ServiceBusClient client, string name, string id, string pk, long startTime)
  105. {
  106. //微調代碼
  107. var timer = DateTimeOffset.FromUnixTimeMilliseconds(startTime);
  108. /*if (type.Equals("start"))
  109. {
  110. if (timer.CompareTo(DateTimeOffset.UtcNow) < 0)
  111. {
  112. progress = "going";
  113. }
  114. }
  115. else if(type.Equals("end"))
  116. {
  117. if (timer.CompareTo(DateTimeOffset.UtcNow) < 0)
  118. {
  119. progress = "finish";
  120. }
  121. }*/
  122. //设定开始时间
  123. Dictionary<string, object> dict = new() {
  124. { "name",typeof(T).Name},
  125. { "id",id},
  126. { "code",pk}
  127. };
  128. //var msgId = "1";
  129. string messageBody = $"Message {dict}";
  130. long SequenceNumber = await client.SendScheduleMessageAsync(name, new ServiceBusMessage(dict.ToJsonString()), timer);
  131. return SequenceNumber;
  132. }
  133. public static async Task CancelMessageAsync(this ServiceBusClient client, string name, long number)
  134. {
  135. try
  136. {
  137. ServiceBusSender sender = ServiceBusSenders.GetOrAdd(name, x => client.CreateSender(name));
  138. await sender.CancelScheduledMessageAsync(number);
  139. }
  140. catch
  141. {
  142. throw;
  143. }
  144. }
  145. }
  146. }