using Microsoft.Azure.ServiceBus;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using TEAMModelOS.SDK.Context.Configuration;
using TEAMModelOS.SDK.Helper.Common.CollectionHelper;
using TEAMModelOS.SDK.Helper.Common.JsonHelper;
using TEAMModelOS.SDK.Module.AzureCosmosDBV3;
using TEAMModelOS.SDK.Module.AzureServiceBus;
using TEAMModelOS.Service.Services.Learn.Interfaces;

namespace TEAMModelOS.Service.Services.Learn.Implements
{
    /// <summary>
    /// Receives messages from a Service Bus subscription and copies the "status" value
    /// carried by each message onto the matching Cosmos DB documents.
    /// </summary>
    public class ServiceBusReviceService : IServiceBusReviceService
    {
        private readonly IAzureServiceBusService _serviceBus;
        private readonly IAzureCosmosDBV3Repository _cosmos;

        // Static: the client assigned by the most recent ReciveMessageAsync call is shared by every instance.
        private static ISubscriptionClient subscriptionClient;

        //private readonly string Topic = BaseConfigModel.Configuration["HaBookAuth:ServiceBus:Topics"];

        /// <summary>
        /// Creates the service with its Service Bus and Cosmos DB collaborators.
        /// </summary>
        /// <param name="azureServiceBus">Service used to obtain the subscription client.</param>
        /// <param name="cosmos">Repository used to look up and update the affected documents.</param>
        public ServiceBusReviceService(IAzureServiceBusService azureServiceBus, IAzureCosmosDBV3Repository cosmos)
        {
            _serviceBus = azureServiceBus;
            _cosmos = cosmos;
        }

        /// <summary>
        /// Obtains the client for the "test_topic_ReciveTask" subscription, registers the
        /// message handler on it and then closes the client.
        /// </summary>
        /// <returns>A task that completes once the client has been closed.</returns>
        public async Task ReciveMessageAsync()
        {
            string SubName = "test_topic_ReciveTask";
            subscriptionClient = _serviceBus.GetSubClient(SubName).subscriptionClient;
            RegisterOnMessageHandlerAndReceiveMessages();

            // NOTE(review): the client is closed immediately after the handler is registered, which
            // presumably stops the message pump before it can deliver much. Confirm the intended
            // lifetime of the receiver before changing this.
            await subscriptionClient.CloseAsync();
        }

        /// <summary>
        /// Registers <see cref="ProcessMessagesAsync"/> as the message callback and
        /// <see cref="ExceptionReceivedHandler"/> as the error callback on the current subscription client.
        /// </summary>
        public void RegisterOnMessageHandlerAndReceiveMessages()
        {
            // Configure the message handler options in terms of exception handling,
            // number of concurrent messages to deliver, etc.
            var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
            {
                // Maximum number of concurrent calls to the callback ProcessMessagesAsync(), set to 1 for simplicity.
                // Set it according to how many messages the application wants to process in parallel.
                MaxConcurrentCalls = 1,

                // False: the message is completed explicitly inside ProcessMessagesAsync
                // instead of automatically when the callback returns.
                AutoComplete = false
            };

            subscriptionClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
        }

        /// <summary>
        /// Handles one message: reads "id", "name" and "status" from the JSON body, finds the
        /// documents in the collection called <c>name</c> whose id matches, sets their "status"
        /// property and saves them, then completes the message.
        /// </summary>
        /// <param name="message">The received message; its body is decoded as UTF-8 JSON.</param>
        /// <param name="token">Cancellation token supplied by the message pump (currently not consulted).</param>
        /// <returns>A task that completes when the message has been processed and completed.</returns>
        public async Task ProcessMessagesAsync(Message message, CancellationToken token)
        {
            string body = Encoding.UTF8.GetString(message.Body);
            Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{body}");

            Dictionary<string, object> dict = body.FromApiJson<Dictionary<string, object>>();

            object info = null;
            object name = null;
            object status = null;
            if (dict != null)
            {
                dict.TryGetValue("id", out info);
                dict.TryGetValue("name", out name);
                dict.TryGetValue("status", out status);
            }

            string collectionName = name?.ToString();
            if (string.IsNullOrEmpty(collectionName))
            {
                // Without a collection name there is nothing to update and a retry cannot help,
                // so the message is still completed (as before) instead of failing with a null reference.
                Console.WriteLine($"Message {message.SystemProperties.SequenceNumber} has no \"name\" entry; skipping update.");
                await subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
                return;
            }

            Dictionary<string, object> obj = new Dictionary<string, object>();
            obj.Add("id", info);

            var bus = await _cosmos.FindByDict(collectionName, obj);
            if (bus.IsNotEmpty())
            {
                // The property is resolved from the first item's runtime type and reused for every item.
                PropertyInfo propertyInfo = bus[0].GetType().GetProperty("status");
                if (propertyInfo != null)
                {
                    for (int i = 0; i < bus.Count; i++)
                    {
                        propertyInfo.SetValue(bus[i], status);
                    }

                    await _cosmos.UpdateAll(collectionName, bus);
                }
                else
                {
                    Console.WriteLine($"Type {bus[0].GetType()} has no \"status\" property; skipping update.");
                }
            }

            // Complete the message only after the update has been attempted, so that an exception
            // thrown above leaves the message uncompleted rather than silently dropping it.
            // This can be done only if the subscriptionClient is created in ReceiveMode.PeekLock mode.
            await subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
        }

        /// <summary>
        /// Writes the exception reported by the message pump, together with its endpoint,
        /// entity path and action, to the console.
        /// </summary>
        /// <param name="exceptionReceivedEventArgs">Details of the exception and the context in which it occurred.</param>
        /// <returns>An already completed task.</returns>
        public static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
        {
            Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
            var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
            Console.WriteLine("Exception context for troubleshooting:");
            Console.WriteLine($"- Endpoint: {context.Endpoint}");
            Console.WriteLine($"- Entity Path: {context.EntityPath}");
            Console.WriteLine($"- Executing Action: {context.Action}");
            return Task.CompletedTask;
        }
    }
}