|
@@ -1,7 +1,9 @@
|
|
|
using DocumentFormat.OpenXml.Drawing;
|
|
|
using Microsoft.Azure.Cosmos;
|
|
|
+using Microsoft.Extensions.DependencyInjection;
|
|
|
using System;
|
|
|
using System.Collections.Generic;
|
|
|
+using System.Linq;
|
|
|
using System.Reflection;
|
|
|
using System.Text;
|
|
|
using System.Text.Json;
|
|
@@ -20,8 +22,16 @@ namespace TEAMModelOS.Service.Services.ChangeFeed
|
|
|
azureCosmosDBV3Repository = _IAzureCosmosDBV3Repository;
|
|
|
}
|
|
|
|
|
|
- public async Task MonitorChangeFeed() {
|
|
|
- Dictionary<string,CosmosModelInfo> dict = azureCosmosDBV3Repository.GetCosmosModelInfo();
|
|
|
+ public void MonitorChangeFeed(Dictionary<string , CosmosModelInfo> dict, IServiceCollection _services) {
|
|
|
+ var serviceCollection = new Microsoft.Extensions.DependencyInjection.ServiceCollection();
|
|
|
+ ServiceProvider serviceProvider = _services.BuildServiceProvider();
|
|
|
+ List<ServiceDescriptor> services = new List<ServiceDescriptor>();
|
|
|
+ Type type = typeof(IChangeFeedService<>);
|
|
|
+ foreach (ServiceDescriptor service in _services) {
|
|
|
+ if (type.Name.Equals(service.ServiceType.Name)) {
|
|
|
+ services.Add(service);
|
|
|
+ }
|
|
|
+ }
|
|
|
foreach (string CollectionName in dict.Keys)
|
|
|
{
|
|
|
if (CollectionName.Equals("AleaseContainer"))
|
|
@@ -31,51 +41,25 @@ namespace TEAMModelOS.Service.Services.ChangeFeed
|
|
|
if (dict[CollectionName].monitor) {
|
|
|
ChangeFeedProcessor changeFeedProcessor = dict[CollectionName]
|
|
|
.container
|
|
|
- .GetChangeFeedProcessorBuilder<object>(CollectionName, async (changes, token) => await ProcessChanges(changes, CollectionName, dict[CollectionName].type))
|
|
|
+ .GetChangeFeedProcessorBuilder<object>(CollectionName, async (changes, token) =>
|
|
|
+ await ProcessChanges(changes, dict[CollectionName].type , services, serviceProvider))
|
|
|
.WithInstanceName(CollectionName)
|
|
|
.WithLeaseContainer(dict["AleaseContainer"].container)
|
|
|
.Build();
|
|
|
- await changeFeedProcessor.StartAsync();
|
|
|
+ changeFeedProcessor.StartAsync();
|
|
|
}
|
|
|
-
|
|
|
}
|
|
|
}
|
|
|
- /// <summary>
|
|
|
- /// 获取到监听更改的数据
|
|
|
- /// </summary>
|
|
|
- /// <param name="changes"></param>
|
|
|
- /// <param name="type"></param>
|
|
|
- /// <param name="CollectionName"></param>
|
|
|
- /// <returns></returns>
|
|
|
- private async Task ProcessChanges(IReadOnlyCollection<object> changes, string CollectionName,Type type)
|
|
|
+ private async Task ProcessChanges(IReadOnlyCollection<object> changes, Type type,
|
|
|
+ List<ServiceDescriptor> services, ServiceProvider serviceProvider)
|
|
|
{
|
|
|
- Assembly assembly = Assembly.GetAssembly(typeof(IChangeFeedService<>));
|
|
|
- Type[] types = assembly.GetTypes();
|
|
|
- List<Type> list = new List<Type>();
|
|
|
- foreach (Type item in types)
|
|
|
- {
|
|
|
- if (item.IsInterface) continue;//判断是否是接口
|
|
|
- Type[] ins = item.GetInterfaces();
|
|
|
- foreach (Type ty in ins)
|
|
|
- {
|
|
|
- Type t = typeof(IChangeFeedService<>);
|
|
|
- t = t.MakeGenericType(type);
|
|
|
- if (ty == t)
|
|
|
- {
|
|
|
- list.Add(item);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- foreach (Type item in list) {
|
|
|
- object obj = Activator.CreateInstance(item);//创建一个obj对象
|
|
|
- MethodInfo mi = item.GetMethod("Processor");
|
|
|
+ foreach (ServiceDescriptor service in services) {
|
|
|
+ var obj = serviceProvider.GetService(service.ServiceType);
|
|
|
+ MethodInfo mi = service.ServiceType.GetMethod("Processor");
|
|
|
Type t = typeof(IReadOnlyCollection<>);
|
|
|
t = t.MakeGenericType(type);
|
|
|
- // object objt = Activator.CreateInstance(t);
|
|
|
- object bjt= JsonSerializer.Deserialize(changes.ToApiJson(), t);
|
|
|
- mi.Invoke(obj, new object []{ bjt });//调用方法
|
|
|
- // mi = item.GetMethod("BaseClass_VoidPublic");
|
|
|
- // string s = mi.Invoke(obj, null) as string;//调用有返回值的方法,使用 as 关键字 转换返回的类型
|
|
|
+ object bjt = JsonSerializer.Deserialize(changes.ToApiJson(), t);
|
|
|
+ mi.Invoke(obj, new object[] { bjt });//调用方法
|
|
|
}
|
|
|
}
|
|
|
}
|