// ServiceBusReviceService.cs
  using System;
  using System.Collections.Generic;
  using System.Linq;
  using System.Reflection;
  using System.Text;
  using System.Threading;
  using System.Threading.Tasks;
  using Microsoft.Azure.ServiceBus;
  using TEAMModelOS.SDK.Context.Configuration;
  using TEAMModelOS.SDK.Helper.Common.CollectionHelper;
  using TEAMModelOS.SDK.Helper.Common.JsonHelper;
  using TEAMModelOS.SDK.Module.AzureCosmosDBV3;
  using TEAMModelOS.SDK.Module.AzureServiceBus;
  using TEAMModelOS.Service.Services.Learn.Interfaces;
  namespace TEAMModelOS.Service.Services.Learn.Implements
  {
      /// <summary>
      /// Receives messages from a Service Bus topic subscription and copies the
      /// <c>status</c> value carried by each message onto the Cosmos DB documents
      /// whose <c>id</c> matches the one in the message.
      /// </summary>
      public class ServiceBusReviceService : IServiceBusReviceService
      {
          // TODO(review): the subscription name is hard-coded; it looks like it was once meant
          // to come from configuration ("HaBookAuth:ServiceBus:Topics") - confirm and move it there.
          private const string SubscriptionName = "test_topic_ReciveTask";

          // Name of the property that is looked up by reflection on the documents to update.
          private const string StatusPropertyName = "status";

          private readonly IAzureServiceBusService _serviceBus;
          private readonly IAzureCosmosDBV3Repository _cosmos;

          // Static: a single subscription client is shared by every instance of this service.
          private static ISubscriptionClient subscriptionClient;

          /// <summary>Creates the service from its Service Bus and Cosmos DB dependencies.</summary>
          /// <param name="azureServiceBus">Factory used to obtain the subscription client.</param>
          /// <param name="cosmos">Repository used to find and update the documents.</param>
          /// <exception cref="ArgumentNullException">Either argument is <c>null</c>.</exception>
          public ServiceBusReviceService(IAzureServiceBusService azureServiceBus, IAzureCosmosDBV3Repository cosmos)
          {
              _serviceBus = azureServiceBus ?? throw new ArgumentNullException(nameof(azureServiceBus));
              _cosmos = cosmos ?? throw new ArgumentNullException(nameof(cosmos));
          }

          /// <summary>
          /// Starts receiving messages from the subscription. Calling it again while the
          /// client is still open does nothing.
          /// </summary>
          /// <remarks>
          /// The client is deliberately left open: registering a handler only starts a
          /// background pump and returns, so closing the client straight afterwards would stop
          /// the pump before any message could be delivered. Use
          /// <see cref="StopReceivingAsync"/> to shut the receiver down.
          /// </remarks>
          /// <returns>A completed task, or a faulted task when the client could not be started.</returns>
          public Task ReciveMessageAsync()
          {
              try
              {
                  if (subscriptionClient == null || subscriptionClient.IsClosedOrClosing)
                  {
                      subscriptionClient = _serviceBus.GetSubClient(SubscriptionName).subscriptionClient;
                      RegisterOnMessageHandlerAndReceiveMessages();
                  }

                  return Task.CompletedTask;
              }
              catch (Exception ex)
              {
                  // Forget the half-initialised client so that a later call can retry, and report
                  // the failure through the returned task (as an async method would) instead of
                  // throwing synchronously at the caller.
                  subscriptionClient = null;
                  return Task.FromException(ex);
              }
          }

          /// <summary>Closes the subscription client if it is open; otherwise does nothing.</summary>
          /// <returns>A task that completes once the client has been closed.</returns>
          public async Task StopReceivingAsync()
          {
              ISubscriptionClient client = subscriptionClient;
              if (client != null && !client.IsClosedOrClosing)
              {
                  await client.CloseAsync();
              }
          }

          /// <summary>
          /// Registers <see cref="ProcessMessagesAsync"/> as the message handler and
          /// <see cref="ExceptionReceivedHandler"/> as the exception handler on the
          /// subscription client.
          /// </summary>
          /// <exception cref="InvalidOperationException">The subscription client has not been created yet.</exception>
          public void RegisterOnMessageHandlerAndReceiveMessages()
          {
              if (subscriptionClient == null)
              {
                  throw new InvalidOperationException(
                      $"The subscription client has not been created; call {nameof(ReciveMessageAsync)} first.");
              }

              // Configure the message handler options in terms of exception handling, number of concurrent messages to deliver, etc.
              var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
              {
                  // Maximum number of concurrent calls to the callback ProcessMessagesAsync(), set to 1 for simplicity.
                  // Set it according to how many messages the application wants to process in parallel.
                  MaxConcurrentCalls = 1,
                  // Indicates whether MessagePump should automatically complete the messages after returning from User Callback.
                  // False below indicates the message is settled by the callback itself, see ProcessMessagesAsync.
                  AutoComplete = false
              };

              subscriptionClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
          }

          /// <summary>
          /// Handles one message: reads <c>id</c>, <c>name</c> and <c>status</c> from the JSON
          /// body, finds the documents with that <c>id</c> in the collection called <c>name</c>,
          /// sets their <c>status</c> property and saves them.
          /// </summary>
          /// <remarks>
          /// The message is completed only after the update has succeeded, so a failure while
          /// talking to Cosmos DB leaves it on the subscription instead of losing it. A message
          /// that can never be processed (missing fields, or documents without a
          /// <c>status</c> property) is dead-lettered.
          /// </remarks>
          /// <param name="message">The received message; its body is decoded as UTF-8 JSON.</param>
          /// <param name="token">Token observed before any work starts; when it is already cancelled the message is left unsettled.</param>
          /// <returns>A task that completes once the message has been settled.</returns>
          public async Task ProcessMessagesAsync(Message message, CancellationToken token)
          {
              if (token.IsCancellationRequested)
              {
                  // Settling the message at this point would only raise avoidable exceptions.
                  return;
              }

              string lockToken = message.SystemProperties.LockToken;
              string body = Encoding.UTF8.GetString(message.Body);
              Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{body}");

              Dictionary<string, object> payload = body.FromApiJson<Dictionary<string, object>>();
              if (payload == null
                  || !payload.TryGetValue("id", out object id) || id == null
                  || !payload.TryGetValue("name", out object name) || name == null
                  || !payload.TryGetValue("status", out object status))
              {
                  Console.WriteLine("Message is missing 'id', 'name' or 'status'; moving it to the dead-letter queue.");
                  await subscriptionClient.DeadLetterAsync(
                      lockToken,
                      "InvalidPayload",
                      "The message body must be a JSON object containing 'id', 'name' and 'status'.");
                  return;
              }

              string collectionName = name.ToString();
              var filter = new Dictionary<string, object> { ["id"] = id };
              var documents = await _cosmos.FindByDict(collectionName, filter);

              if (documents.IsNotEmpty())
              {
                  // The property is resolved once on the first document and reused for the others.
                  // NOTE(review): this assumes every returned document has the same runtime type - confirm.
                  PropertyInfo statusProperty = documents[0].GetType().GetProperty(StatusPropertyName);
                  if (statusProperty == null)
                  {
                      Console.WriteLine($"Documents in '{collectionName}' have no '{StatusPropertyName}' property; moving the message to the dead-letter queue.");
                      await subscriptionClient.DeadLetterAsync(
                          lockToken,
                          "StatusPropertyNotFound",
                          $"Documents in '{collectionName}' do not expose a '{StatusPropertyName}' property.");
                      return;
                  }

                  for (int i = 0; i < documents.Count; i++)
                  {
                      statusProperty.SetValue(documents[i], status);
                  }

                  await _cosmos.UpdateAll(collectionName, documents);
              }

              // Complete the message so that it is not received again.
              // This can be done only if the subscriptionClient is created in ReceiveMode.PeekLock mode (which is the default).
              await subscriptionClient.CompleteAsync(lockToken);
          }

          /// <summary>
          /// Writes an exception raised by the message handler, together with its
          /// troubleshooting context, to the console.
          /// </summary>
          /// <param name="exceptionReceivedEventArgs">The exception and the context it was raised in.</param>
          /// <returns>A completed task.</returns>
          public static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
          {
              Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
              var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
              Console.WriteLine("Exception context for troubleshooting:");
              Console.WriteLine($"- Endpoint: {context.Endpoint}");
              Console.WriteLine($"- Entity Path: {context.EntityPath}");
              Console.WriteLine($"- Executing Action: {context.Action}");
              return Task.CompletedTask;
          }
      }
  }