123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566 |
- using DocumentFormat.OpenXml.Drawing;
- using Microsoft.Azure.Cosmos;
- using Microsoft.Extensions.DependencyInjection;
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Reflection;
- using System.Text;
- using System.Text.Json;
- using System.Threading;
- using System.Threading.Tasks;
- using TEAMModelOS.SDK.Context.Configuration;
- using TEAMModelOS.SDK.Helper.Common.JsonHelper;
- using TEAMModelOS.SDK.Module.AzureCosmosDBV3;
- namespace TEAMModelOS.Service.Services.ChangeFeed
- {
- public class ChangeFeedInvoke : IChangeFeedInvoke
- {
- private readonly IAzureCosmosDBV3Repository azureCosmosDBV3Repository;
- public ChangeFeedInvoke(IAzureCosmosDBV3Repository _IAzureCosmosDBV3Repository) {
- azureCosmosDBV3Repository = _IAzureCosmosDBV3Repository;
- }
- public void MonitorChangeFeed(Dictionary<string , CosmosModelInfo> dict, IServiceCollection _services) {
- var serviceCollection = new Microsoft.Extensions.DependencyInjection.ServiceCollection();
- ServiceProvider serviceProvider = _services.BuildServiceProvider();
- List<ServiceDescriptor> services = new List<ServiceDescriptor>();
- Type type = typeof(IChangeFeedService<>);
- foreach (ServiceDescriptor service in _services) {
- if (type.Name.Equals(service.ServiceType.Name)) {
- services.Add(service);
- }
- }
- foreach (string CollectionName in dict.Keys)
- {
- if (CollectionName.Equals("AleaseContainer"))
- {
- continue;
- }
- if (dict[CollectionName].monitor) {
- ChangeFeedProcessor changeFeedProcessor = dict[CollectionName]
- .container
- .GetChangeFeedProcessorBuilder<object>(CollectionName, async (changes, token) =>
- await ProcessChanges(changes, dict[CollectionName].type , services, serviceProvider))
- .WithInstanceName(CollectionName)
- .WithLeaseContainer(dict["AleaseContainer"].container)
- .Build();
- changeFeedProcessor.StartAsync();
- }
- }
- }
- private async Task ProcessChanges(IReadOnlyCollection<object> changes, Type type,
- List<ServiceDescriptor> services, ServiceProvider serviceProvider)
- {
- foreach (ServiceDescriptor service in services) {
- var obj = serviceProvider.GetService(service.ServiceType);
- MethodInfo mi = service.ServiceType.GetMethod("Processor");
- Type t = typeof(IReadOnlyCollection<>);
- t = t.MakeGenericType(type);
- object bjt = JsonSerializer.Deserialize(changes.ToApiJson(), t);
- mi.Invoke(obj, new object[] { bjt });//调用方法
- }
- }
- }
- }
|