123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119 |
- using Microsoft.Azure.ServiceBus;
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Reflection;
- using System.Text;
- using System.Text.Json;
- using System.Threading;
- using System.Threading.Tasks;
- using TEAMModelOS.SDK.Context.Configuration;
- using TEAMModelOS.SDK.Helper.Common.CollectionHelper;
- using TEAMModelOS.SDK.Helper.Common.JsonHelper;
- using TEAMModelOS.SDK.DI;
- using TEAMModelOS.SDK.Module.AzureServiceBus;
- using TEAMModelOS.Service.Models;
- using TEAMModelOS.Service.Services.Learn.Interfaces;
namespace TEAMModelOS.Service.Services.Learn.Implements
{
    /// <summary>
    /// Listens on a Service Bus subscription for status-change messages and writes the
    /// new status onto the matching Cosmos document (<see cref="Homework"/>,
    /// <see cref="Vote"/>, <see cref="Survey"/>, <see cref="ExamInfo"/> or
    /// <see cref="LearnTask"/>).
    /// </summary>
    /// <remarks>
    /// The message body is expected to be a JSON object carrying the keys
    /// <c>id</c>, <c>pk</c>, <c>name</c> (the document type name) and <c>status</c>.
    /// </remarks>
    public class ServiceBusReviceService : IServiceBusReviceService
    {
        /// <summary>Name passed to <c>GetSubClient</c> to obtain the subscription client.</summary>
        private const string SubscriptionName = "test_topic_ReciveTask";

        private readonly IAzureServiceBusService _serviceBus;
        private readonly AzureCosmosFactory _cosmos;

        // Kept static (as in the original) so that the public registration method keeps
        // working regardless of which instance obtained the client.
        private static ISubscriptionClient subscriptionClient;

        /// <summary>Creates the service with its Service Bus and Cosmos dependencies.</summary>
        /// <param name="azureServiceBus">Factory used to obtain the subscription client.</param>
        /// <param name="cosmos">Cosmos access used to load and update the target documents.</param>
        public ServiceBusReviceService(IAzureServiceBusService azureServiceBus, AzureCosmosFactory cosmos)
        {
            _serviceBus = azureServiceBus;
            _cosmos = cosmos;
        }

        /// <summary>
        /// Obtains the subscription client and registers the message handler.
        /// Despite the <c>Async</c> suffix this method is synchronous and returns no task;
        /// messages are delivered to <see cref="ProcessMessagesAsync"/> by the client's message pump.
        /// </summary>
        public void ReciveMessageAsync()
        {
            subscriptionClient = _serviceBus.GetSubClient(SubscriptionName).subscriptionClient;
            RegisterOnMessageHandlerAndReceiveMessages();
        }

        /// <summary>
        /// Registers <see cref="ProcessMessagesAsync"/> as the message handler on the
        /// current subscription client.
        /// </summary>
        /// <exception cref="InvalidOperationException">
        /// No subscription client has been obtained yet (call <see cref="ReciveMessageAsync"/> first).
        /// </exception>
        public void RegisterOnMessageHandlerAndReceiveMessages()
        {
            ISubscriptionClient client = subscriptionClient;
            if (client == null)
            {
                throw new InvalidOperationException(
                    $"No subscription client is available; call {nameof(ReciveMessageAsync)} before registering the handler.");
            }

            var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
            {
                // One message at a time; raise this to process messages in parallel.
                MaxConcurrentCalls = 1,
                // The handler settles the message itself, so it is only completed
                // once the status update has actually been applied.
                AutoComplete = false
            };

            client.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
        }

        /// <summary>
        /// Handles one message: parses the payload, applies the status change to the
        /// referenced document, then completes the message.
        /// </summary>
        /// <param name="message">The received Service Bus message.</param>
        /// <param name="token">
        /// Cancellation token supplied by the message pump. It is not consulted here;
        /// it can be used to detect that the subscription client has been closed.
        /// </param>
        /// <returns>A task that finishes when the message has been processed and completed.</returns>
        /// <remarks>
        /// The message is completed only after processing succeeds. If the Cosmos lookup or
        /// update throws, the exception propagates to <see cref="ExceptionReceivedHandler"/>
        /// and the message stays unsettled so it can be delivered again instead of being lost.
        /// Messages whose payload is unusable (missing fields, non-numeric status, unknown
        /// type, missing document) are logged and completed, because retrying them cannot succeed.
        /// </remarks>
        public async Task ProcessMessagesAsync(Message message, CancellationToken token)
        {
            string body = message.Body == null ? string.Empty : Encoding.UTF8.GetString(message.Body);
            Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{body}");

            Dictionary<string, object> payload = string.IsNullOrWhiteSpace(body)
                ? null
                : body.FromApiJson<Dictionary<string, object>>();

            if (TryReadStatusChange(payload, out string typeName, out string id, out string pk, out int status))
            {
                await ApplyStatusAsync(typeName, id, pk, status);
            }
            else
            {
                Console.WriteLine($"Skipping message {message.SystemProperties.SequenceNumber}: payload lacks a usable id, pk, name or status.");
            }

            // Completing requires ReceiveMode.PeekLock (the default). Done last so that a
            // failure above does not discard the message.
            await subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
        }

        /// <summary>
        /// Logs an exception reported by the message pump together with its context.
        /// </summary>
        /// <param name="exceptionReceivedEventArgs">Exception and context supplied by the Service Bus client.</param>
        /// <returns>A completed task.</returns>
        public static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
        {
            Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
            var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
            Console.WriteLine("Exception context for troubleshooting:");
            Console.WriteLine($"- Endpoint: {context.Endpoint}");
            Console.WriteLine($"- Entity Path: {context.EntityPath}");
            Console.WriteLine($"- Executing Action: {context.Action}");
            return Task.CompletedTask;
        }

        /// <summary>
        /// Extracts the four required fields from the payload without throwing.
        /// </summary>
        /// <returns>
        /// <c>true</c> when <c>id</c>, <c>pk</c>, <c>name</c> and <c>status</c> are all present
        /// and <c>status</c> parses as an integer; otherwise <c>false</c>.
        /// </returns>
        private static bool TryReadStatusChange(
            Dictionary<string, object> payload,
            out string typeName,
            out string id,
            out string pk,
            out int status)
        {
            typeName = null;
            id = null;
            pk = null;
            status = 0;

            if (payload == null)
            {
                return false;
            }

            payload.TryGetValue("id", out object rawId);
            payload.TryGetValue("pk", out object rawPk);
            payload.TryGetValue("name", out object rawName);
            payload.TryGetValue("status", out object rawStatus);

            if (rawId == null || rawPk == null || rawName == null || rawStatus == null)
            {
                return false;
            }

            typeName = rawName.ToString();
            id = rawId.ToString();
            pk = rawPk.ToString();

            // The value comes from JSON, so parse it culture-independently.
            return int.TryParse(
                rawStatus.ToString(),
                System.Globalization.NumberStyles.Integer,
                System.Globalization.CultureInfo.InvariantCulture,
                out status);
        }

        /// <summary>
        /// Loads the document of the given type by id and partition key, sets its status
        /// and saves it. Unknown type names and missing documents are logged and ignored.
        /// </summary>
        private async Task ApplyStatusAsync(string typeName, string id, string pk, int status)
        {
            switch (typeName)
            {
                case nameof(Homework):
                {
                    Homework homework = await _cosmos.FindByIdPk<Homework>(id, pk);
                    if (homework == null)
                    {
                        LogMissingDocument(typeName, id, pk);
                        return;
                    }

                    homework.status = status;
                    await _cosmos.Update(homework);
                    break;
                }

                case nameof(Vote):
                {
                    Vote vote = await _cosmos.FindByIdPk<Vote>(id, pk);
                    if (vote == null)
                    {
                        LogMissingDocument(typeName, id, pk);
                        return;
                    }

                    vote.status = status;
                    await _cosmos.Update(vote);
                    break;
                }

                case nameof(Survey):
                {
                    Survey survey = await _cosmos.FindByIdPk<Survey>(id, pk);
                    if (survey == null)
                    {
                        LogMissingDocument(typeName, id, pk);
                        return;
                    }

                    survey.status = status;
                    await _cosmos.Update(survey);
                    break;
                }

                case nameof(ExamInfo):
                {
                    ExamInfo examInfo = await _cosmos.FindByIdPk<ExamInfo>(id, pk);
                    if (examInfo == null)
                    {
                        LogMissingDocument(typeName, id, pk);
                        return;
                    }

                    examInfo.status = status;
                    await _cosmos.Update(examInfo);
                    break;
                }

                case nameof(LearnTask):
                {
                    LearnTask learnTask = await _cosmos.FindByIdPk<LearnTask>(id, pk);
                    if (learnTask == null)
                    {
                        LogMissingDocument(typeName, id, pk);
                        return;
                    }

                    learnTask.status = status;
                    await _cosmos.Update(learnTask);
                    break;
                }

                default:
                    Console.WriteLine($"Ignoring status change for unsupported type '{typeName}'.");
                    break;
            }
        }

        /// <summary>Logs that the document referenced by a message could not be found.</summary>
        private static void LogMissingDocument(string typeName, string id, string pk)
        {
            Console.WriteLine($"No {typeName} document found for id '{id}' and pk '{pk}'; status not updated.");
        }
    }
}
|