123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596 |
- using Microsoft.Azure.ServiceBus;
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Reflection;
- using System.Text;
- using System.Threading;
- using System.Threading.Tasks;
- using TEAMModelOS.SDK.Context.Configuration;
- using TEAMModelOS.SDK.Helper.Common.CollectionHelper;
- using TEAMModelOS.SDK.Helper.Common.JsonHelper;
- using TEAMModelOS.SDK.Module.AzureCosmosDBV3;
- using TEAMModelOS.SDK.Module.AzureServiceBus;
- using TEAMModelOS.Service.Services.Learn.Interfaces;
namespace TEAMModelOS.Service.Services.Learn.Implements
{
    /// <summary>
    /// Listens on an Azure Service Bus subscription for messages of the form
    /// <c>{ "id": ..., "name": ..., "status": ... }</c> and writes the received
    /// <c>status</c> onto the documents returned by
    /// <c>IAzureCosmosDBV3Repository.FindByDict(name, { id })</c>.
    /// </summary>
    public class ServiceBusReviceService : IServiceBusReviceService
    {
        // Name of the subscription whose messages this service consumes.
        private const string SubscriptionName = "test_topic_ReciveTask";

        private readonly IAzureServiceBusService _serviceBus;
        private readonly IAzureCosmosDBV3Repository _cosmos;

        // Static: shared by every instance of this service. The most recent call to
        // ReciveMessageAsync decides which client the handlers below talk to.
        private static ISubscriptionClient subscriptionClient;

        //private readonly string Topic = BaseConfigModel.Configuration["HaBookAuth:ServiceBus:Topics"];

        /// <summary>
        /// Creates the service.
        /// </summary>
        /// <param name="azureServiceBus">Factory used to obtain the subscription client.</param>
        /// <param name="cosmos">Repository used to look up and update the target documents.</param>
        public ServiceBusReviceService(IAzureServiceBusService azureServiceBus, IAzureCosmosDBV3Repository cosmos)
        {
            _serviceBus = azureServiceBus;
            _cosmos = cosmos;
        }

        /// <summary>
        /// Obtains the subscription client and starts the message pump.
        /// </summary>
        /// <remarks>
        /// The client is intentionally NOT closed here. Registering a handler only starts
        /// the pump and returns; closing the client right afterwards (as this method used
        /// to do) shuts the pump down before it can deliver anything.
        /// NOTE(review): the client now stays open until its owner closes it — confirm
        /// where the application shuts Service Bus clients down.
        /// </remarks>
        /// <returns>A task that completes once the handler has been registered.</returns>
        public async Task ReciveMessageAsync()
        {
            subscriptionClient = _serviceBus.GetSubClient(SubscriptionName).subscriptionClient;
            RegisterOnMessageHandlerAndReceiveMessages();

            // Keeps the method 'async' so failures above still surface through the returned
            // task exactly as they did before, rather than being thrown synchronously.
            await Task.CompletedTask;
        }

        /// <summary>
        /// Registers <see cref="ProcessMessagesAsync"/> as the message handler on the
        /// current subscription client.
        /// </summary>
        /// <exception cref="InvalidOperationException">
        /// No subscription client has been obtained yet (call <see cref="ReciveMessageAsync"/> first).
        /// </exception>
        public void RegisterOnMessageHandlerAndReceiveMessages()
        {
            if (subscriptionClient == null)
            {
                throw new InvalidOperationException(
                    $"No subscription client is available; call {nameof(ReciveMessageAsync)} before registering the message handler.");
            }

            // Configure the message handler options in terms of exception handling, number of concurrent messages to deliver, etc.
            var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
            {
                // Maximum number of concurrent calls to the callback ProcessMessagesAsync(), set to 1 for simplicity.
                // Set it according to how many messages the application wants to process in parallel.
                MaxConcurrentCalls = 1,
                // Indicates whether MessagePump should automatically complete the messages after returning from User Callback.
                // False below indicates the Complete will be handled by the User Callback as in `ProcessMessagesAsync` below.
                AutoComplete = false
            };

            subscriptionClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
        }

        /// <summary>
        /// Handles one message: parses the JSON body, finds the documents whose <c>id</c>
        /// matches in the collection given by <c>name</c>, sets their <c>status</c>
        /// property, saves them, and only then completes the message.
        /// </summary>
        /// <param name="message">The received Service Bus message; its body is read as UTF-8 JSON.</param>
        /// <param name="token">Cancellation token supplied by the message pump.</param>
        /// <exception cref="InvalidOperationException">
        /// A matched document has no public <c>status</c> property to write to.
        /// </exception>
        public async Task ProcessMessagesAsync(Message message, CancellationToken token)
        {
            string body = message.Body == null ? string.Empty : Encoding.UTF8.GetString(message.Body);
            Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{body}");

            Dictionary<string, object> dict = string.IsNullOrWhiteSpace(body)
                ? null
                : body.FromApiJson<Dictionary<string, object>>();

            object id = null;
            object name = null;
            object status = null;
            bool hasTarget = dict != null
                && dict.TryGetValue("id", out id) && id != null
                && dict.TryGetValue("name", out name) && name != null;

            if (!hasTarget)
            {
                // Without both "id" and "name" there is nothing to look up, and retrying
                // can never succeed: park the message in the dead-letter queue instead of
                // dereferencing null or querying with a null id.
                Console.WriteLine($"Dead-lettering message {message.SystemProperties.SequenceNumber}: body is missing 'id' or 'name'.");
                await subscriptionClient.DeadLetterAsync(
                    message.SystemProperties.LockToken,
                    "MalformedPayload",
                    "Message body must be a JSON object containing non-null 'id' and 'name'.");
                return;
            }

            // "status" may be absent; in that case null is written, as before.
            dict.TryGetValue("status", out status);

            string collectionName = name.ToString();
            Dictionary<string, object> filter = new Dictionary<string, object> { { "id", id } };

            var bus = await _cosmos.FindByDict(collectionName, filter);
            if (bus.IsNotEmpty())
            {
                for (int i = 0; i < bus.Count; i++)
                {
                    // Resolve the property per element so a result set with mixed runtime
                    // types does not fail inside SetValue.
                    PropertyInfo statusProperty = bus[i].GetType().GetProperty("status");
                    if (statusProperty == null)
                    {
                        throw new InvalidOperationException(
                            $"Document type '{bus[i].GetType().FullName}' in '{collectionName}' has no public 'status' property.");
                    }

                    statusProperty.SetValue(bus[i], status);
                }

                await _cosmos.UpdateAll(collectionName, bus);
            }

            // Complete the message so that it is not received again.
            // This can be done only if the subscriptionClient is created in ReceiveMode.PeekLock mode (which is the default).
            // Done last on purpose: if anything above throws, the message is left
            // uncompleted and can be delivered again instead of being lost.
            await subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);

            // Note: Use the cancellationToken passed as necessary to determine if the subscriptionClient has already been closed.
            // If subscriptionClient has already been closed, you can choose to not call CompleteAsync() or AbandonAsync() etc.
            // to avoid unnecessary exceptions.
        }

        /// <summary>
        /// Writes the exception reported by the message pump, together with its
        /// endpoint, entity path and executing action, to the console.
        /// </summary>
        /// <param name="exceptionReceivedEventArgs">Details of the failure supplied by the message pump.</param>
        /// <returns>A completed task.</returns>
        public static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
        {
            Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
            var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
            Console.WriteLine("Exception context for troubleshooting:");
            Console.WriteLine($"- Endpoint: {context.Endpoint}");
            Console.WriteLine($"- Entity Path: {context.EntityPath}");
            Console.WriteLine($"- Executing Action: {context.Action}");
            return Task.CompletedTask;
        }
    }
}
|