// ServiceBusReviceService.cs
  using System;
  using System.Collections.Generic;
  using System.Globalization;
  using System.Linq;
  using System.Reflection;
  using System.Text;
  using System.Text.Json;
  using System.Threading;
  using System.Threading.Tasks;
  using Microsoft.Azure.ServiceBus;
  using TEAMModelOS.SDK.Context.Configuration;
  using TEAMModelOS.SDK.DI;
  using TEAMModelOS.SDK.Helper.Common.CollectionHelper;
  using TEAMModelOS.SDK.Helper.Common.JsonHelper;
  using TEAMModelOS.SDK.Module.AzureServiceBus;
  using TEAMModelOS.Service.Models;
  using TEAMModelOS.Service.Services.Learn.Interfaces;
  namespace TEAMModelOS.Service.Services.Learn.Implements
  {
      /// <summary>
      /// Receives status-change messages from a Service Bus subscription and writes the new
      /// status onto the matching <see cref="Homework"/>, <see cref="Vote"/>, <see cref="Survey"/>,
      /// <see cref="ExamInfo"/> or <see cref="LearnTask"/> document through the Cosmos factory.
      /// </summary>
      public class ServiceBusReviceService : IServiceBusReviceService
      {
          // Name handed to IAzureServiceBusService.GetSubClient to obtain the subscription client.
          private const string SubscriptionName = "test_topic_ReciveTask";

          private readonly IAzureServiceBusService _serviceBus;
          private readonly AzureCosmosFactory _cosmos;

          // Static: the most recently obtained client is shared by every instance of this service.
          private static ISubscriptionClient subscriptionClient;

          /// <summary>
          /// Creates the service with the Service Bus accessor and the Cosmos factory it depends on.
          /// </summary>
          /// <param name="azureServiceBus">Used to obtain the subscription client.</param>
          /// <param name="cosmos">Used to load and update the target documents.</param>
          public ServiceBusReviceService(IAzureServiceBusService azureServiceBus, AzureCosmosFactory cosmos)
          {
              _serviceBus = azureServiceBus;
              _cosmos = cosmos;
          }

          /// <summary>
          /// Obtains the subscription client for the configured subscription and registers the
          /// message handler on it. Despite the name, this method is synchronous and returns as
          /// soon as the handler has been registered.
          /// </summary>
          public void ReciveMessageAsync()
          {
              subscriptionClient = _serviceBus.GetSubClient(SubscriptionName).subscriptionClient;
              RegisterOnMessageHandlerAndReceiveMessages();
          }

          /// <summary>
          /// Registers <see cref="ProcessMessagesAsync"/> as the message callback and
          /// <see cref="ExceptionReceivedHandler"/> as the error callback on the current client.
          /// </summary>
          /// <exception cref="InvalidOperationException">
          /// The subscription client has not been obtained yet (call <see cref="ReciveMessageAsync"/> first).
          /// </exception>
          public void RegisterOnMessageHandlerAndReceiveMessages()
          {
              if (subscriptionClient is null)
              {
                  throw new InvalidOperationException(
                      $"The subscription client is not initialised; call {nameof(ReciveMessageAsync)} first.");
              }

              // Configure the message handler options in terms of exception handling,
              // number of concurrent messages to deliver, etc.
              var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
              {
                  // Maximum number of concurrent calls to ProcessMessagesAsync; 1 keeps processing sequential.
                  MaxConcurrentCalls = 1,
                  // False: the callback itself completes the message (see ProcessMessagesAsync).
                  AutoComplete = false
              };

              subscriptionClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
          }

          /// <summary>
          /// Handles one message: decodes the UTF-8 JSON body, applies the requested status change
          /// and only then completes the message.
          /// </summary>
          /// <remarks>
          /// The message is completed after the work is done, so an exception thrown while loading
          /// or updating the document leaves it uncompleted instead of silently losing it. Messages
          /// that can never be processed (missing fields, non-integer status, unknown type name,
          /// document not found) are logged and completed, because retrying them cannot succeed.
          /// </remarks>
          /// <param name="message">The received message; its body is expected to be a JSON object
          /// with the keys <c>id</c>, <c>pk</c>, <c>name</c> and <c>status</c>.</param>
          /// <param name="token">Cancellation token supplied by the message pump (currently unused).</param>
          public async Task ProcessMessagesAsync(Message message, CancellationToken token)
          {
              // Decode once; the original decoded the body twice.
              string body = message.Body is null ? string.Empty : Encoding.UTF8.GetString(message.Body);
              Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{body}");

              if (TryReadStatusChange(body, out string name, out string id, out string pk, out int status))
              {
                  await ApplyStatusAsync(name, id, pk, status);
              }

              // Complete the message so that it is not received again. This can be done only if the
              // subscriptionClient is created in ReceiveMode.PeekLock mode (which is the default).
              // Note: the cancellation token could be consulted here to skip completion once the
              // client has been closed, to avoid unnecessary exceptions.
              await subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
          }

          /// <summary>
          /// Logs the exception raised by the message handler together with its context.
          /// </summary>
          /// <param name="exceptionReceivedEventArgs">Exception and context supplied by the client.</param>
          /// <returns>A completed task.</returns>
          public static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
          {
              Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
              var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
              Console.WriteLine("Exception context for troubleshooting:");
              Console.WriteLine($"- Endpoint: {context.Endpoint}");
              Console.WriteLine($"- Entity Path: {context.EntityPath}");
              Console.WriteLine($"- Executing Action: {context.Action}");
              return Task.CompletedTask;
          }

          // Extracts id, pk, name and an integer status from the JSON body; false when any is missing or invalid.
          private static bool TryReadStatusChange(string body, out string name, out string id, out string pk, out int status)
          {
              name = null;
              id = null;
              pk = null;
              status = 0;

              if (string.IsNullOrWhiteSpace(body))
              {
                  return false;
              }

              Dictionary<string, object> fields = body.FromApiJson<Dictionary<string, object>>();
              if (fields is null)
              {
                  return false;
              }

              fields.TryGetValue("id", out object idValue);
              fields.TryGetValue("pk", out object pkValue);
              fields.TryGetValue("name", out object nameValue);
              fields.TryGetValue("status", out object statusValue);
              if (idValue is null || pkValue is null || nameValue is null || statusValue is null)
              {
                  return false;
              }

              // Machine-readable payload: parse with the invariant culture and never throw on bad input.
              if (!int.TryParse(statusValue.ToString(), NumberStyles.Integer, CultureInfo.InvariantCulture, out status))
              {
                  Console.WriteLine($"Ignoring message: status '{statusValue}' is not an integer.");
                  return false;
              }

              name = nameValue.ToString();
              id = idValue.ToString();
              pk = pkValue.ToString();
              return true;
          }

          // Loads the document of the named type by id/partition key and stores the new status on it.
          private async Task ApplyStatusAsync(string name, string id, string pk, int status)
          {
              switch (name)
              {
                  case nameof(Homework):
                  {
                      Homework data = await _cosmos.FindByIdPk<Homework>(id, pk);
                      if (data is null)
                      {
                          LogMissingDocument(name, id, pk);
                          break;
                      }

                      data.status = status;
                      await _cosmos.Update(data);
                      break;
                  }

                  case nameof(Vote):
                  {
                      Vote data = await _cosmos.FindByIdPk<Vote>(id, pk);
                      if (data is null)
                      {
                          LogMissingDocument(name, id, pk);
                          break;
                      }

                      data.status = status;
                      await _cosmos.Update(data);
                      break;
                  }

                  case nameof(Survey):
                  {
                      Survey data = await _cosmos.FindByIdPk<Survey>(id, pk);
                      if (data is null)
                      {
                          LogMissingDocument(name, id, pk);
                          break;
                      }

                      data.status = status;
                      await _cosmos.Update(data);
                      break;
                  }

                  case nameof(ExamInfo):
                  {
                      ExamInfo data = await _cosmos.FindByIdPk<ExamInfo>(id, pk);
                      if (data is null)
                      {
                          LogMissingDocument(name, id, pk);
                          break;
                      }

                      data.status = status;
                      await _cosmos.Update(data);
                      break;
                  }

                  case nameof(LearnTask):
                  {
                      LearnTask data = await _cosmos.FindByIdPk<LearnTask>(id, pk);
                      if (data is null)
                      {
                          LogMissingDocument(name, id, pk);
                          break;
                      }

                      data.status = status;
                      await _cosmos.Update(data);
                      break;
                  }

                  default:
                      Console.WriteLine($"Ignoring message: unsupported type name '{name}'.");
                      break;
              }
          }

          // Reports that the document addressed by a message could not be found.
          private static void LogMissingDocument(string name, string id, string pk)
          {
              Console.WriteLine($"Ignoring message: no {name} document found for id '{id}' and pk '{pk}'.");
          }
      }
  }