using Grpc.Core;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using Microsoft.Extensions.Caching.Memory;
using Grpc.Extension.Abstract.Model;
using Grpc.Extension.Abstract.Discovery;
using Grpc.Extension.Abstract;
using Grpc.Extension.Client.Model;
using Microsoft.Extensions.Options;

namespace Grpc.Extension.Client.Internal
{
    /// <summary>
    /// Central manager for gRPC channels: creates, caches, replaces and shuts down
    /// channels, keyed by endpoint address.
    /// </summary>
    internal class ChannelPool
    {
        // Cached channels, keyed by the endpoint string they were requested for.
        private readonly ConcurrentDictionary<string, ChannelInfo> _channels = new ConcurrentDictionary<string, ChannelInfo>();
        private readonly IServiceDiscovery _serviceDiscovery;
        private readonly ILoadBalancer _loadBalancer;
        private readonly IMemoryCache _memoryCache;
        private readonly GrpcClientOptions _grpcClientOptions;

        /// <summary>
        /// Creates a channel pool.
        /// </summary>
        /// <param name="serviceDiscovery">Used to look up the endpoints of a discovered service.</param>
        /// <param name="loadBalancer">Used to pick one endpoint out of the discovered list.</param>
        /// <param name="memoryCache">Caches the discovered endpoint list per service name.</param>
        /// <param name="grpcClientOptions">Client-wide options (default discovery URL, endpoint cache time).</param>
        public ChannelPool(IServiceDiscovery serviceDiscovery, ILoadBalancer loadBalancer, IMemoryCache memoryCache, IOptions<GrpcClientOptions> grpcClientOptions)
        {
            this._serviceDiscovery = serviceDiscovery;
            this._loadBalancer = loadBalancer;
            this._memoryCache = memoryCache;
            this._grpcClientOptions = grpcClientOptions.Value;
        }

        /// <summary>
        /// Registered client channel configurations, searched by <see cref="GetChannel"/>.
        /// </summary>
        internal static List<ChannelConfig> Configs { get; set; } = new List<ChannelConfig>();

        /// <summary>
        /// Gets a channel for the given gRPC service name.
        /// </summary>
        /// <param name="grpcServiceName">Service name; surrounding whitespace is ignored when matching a config.</param>
        /// <returns>A channel connected either to the configured direct endpoint or to a discovered endpoint.</returns>
        /// <exception cref="InternalException">No configuration is registered for the service name.</exception>
        public Channel GetChannel(string grpcServiceName)
        {
            // Trim once instead of once per config entry.
            var trimmedName = grpcServiceName?.Trim();
            var config = Configs?.FirstOrDefault(q => q.GrpcServiceName == trimmedName);
            if (config == null)
            {
                throw new InternalException(GrpcErrorCode.Internal, $"{grpcServiceName ?? ""} client has not config,please call AddGrpcClient method");
            }

            if (config.UseDirect)
            {
                return GetChannelCore(config.DirectEndpoint, config);
            }

            // Otherwise resolve the endpoint through service discovery.
            var endPoint = GetEndpoint(config.DiscoveryServiceName, ResolveDiscoveryUrl(config), config.DiscoveryServiceTag);
            return GetChannelCore(endPoint, config);
        }

        /// <summary>
        /// Returns the per-client discovery URL when one is set, otherwise the client-wide default.
        /// </summary>
        private string ResolveDiscoveryUrl(ChannelConfig config)
        {
            return !string.IsNullOrWhiteSpace(config.DiscoveryUrl)
                ? config.DiscoveryUrl
                : _grpcClientOptions.DiscoveryUrl;
        }

        /// <summary>
        /// Resolves one endpoint address for a discovered service.
        /// </summary>
        /// <param name="serviceName">Discovery service name; also used as the cache key.</param>
        /// <param name="discoveryUrl">Discovery URL passed to the discovery client on a cache miss.</param>
        /// <param name="serviceTag">Service tag passed to the discovery client on a cache miss.</param>
        /// <returns>The endpoint chosen by the load balancer.</returns>
        /// <exception cref="InternalException">Discovery returned no endpoints.</exception>
        private string GetEndpoint(string serviceName, string discoveryUrl, string serviceTag)
        {
            // Fetch the healthy endpoints, cached per service name for ServiceAddressCacheTime seconds.
            // NOTE(review): the cache key is the service name only, so entries for the same
            // service with a different URL/tag share one cache slot - confirm this is intended.
            var fromCache = true;
            var healthEndpoints = _memoryCache.GetOrCreate(serviceName, cacheEntry =>
            {
                fromCache = false;
                cacheEntry.SetAbsoluteExpiration(TimeSpan.FromSeconds(_grpcClientOptions.ServiceAddressCacheTime));
                return _serviceDiscovery.GetEndpoints(serviceName, discoveryUrl, serviceTag);
            });

            if (healthEndpoints == null || healthEndpoints.Count == 0)
            {
                throw new InternalException(GrpcErrorCode.Internal, $"get endpoints from discovery of {serviceName} is null");
            }

            // Only a fresh pull from discovery can reveal endpoints that went away,
            // so stale channels are closed only in that case.
            if (!fromCache)
            {
                ShutdownErrorChannel(healthEndpoints, serviceName);
            }

            return _loadBalancer.SelectEndpoint(serviceName, healthEndpoints);
        }

        /// <summary>
        /// Returns the cached channel for <paramref name="endpoint"/>, creating it if absent and
        /// replacing it when it is not in the <see cref="ChannelState.Ready"/> state.
        /// </summary>
        private Channel GetChannelCore(string endpoint, ChannelConfig config)
        {
            // Get the channel, adding it if it does not exist yet.
            var channel = _channels.GetOrAdd(endpoint, key => CreateChannel(key, config)).Channel;
            if (channel.State == ChannelState.Ready)
            {
                return channel;
            }

            // Not ready: shut the channel down (fire-and-forget) and rebuild it.
            channel.ShutdownAsync();

            // The dictionary is keyed by endpoint, so the stale entry must be evicted by
            // endpoint (the original used the discovery service name, which never matched).
            _channels.TryRemove(endpoint, out _);

            // Add a new channel, or overwrite one added concurrently.
            return _channels.AddOrUpdate(
                endpoint,
                key => CreateChannel(key, config),
                (key, value) => CreateChannel(key, config)).Channel;
        }

        /// <summary>
        /// Creates a channel and waits until it is connected, retrying on failure.
        /// </summary>
        /// <remarks>
        /// Each connect attempt has a one second deadline. After the third failed attempt the
        /// failure is thrown; earlier failures are logged. For discovered (non-direct) services
        /// every failed attempt picks an endpoint again and connects to that instead.
        /// </remarks>
        /// <exception cref="InternalException">The channel could not be connected after three attempts.</exception>
        private ChannelInfo CreateChannel(string endPoint, ChannelConfig config)
        {
            var channel = new Channel(endPoint, ChannelCredentials.Insecure, config.ChannelOptions);
            var tryCount = 0; // number of failed connect attempts

            while (channel.State != ChannelState.Ready)
            {
                try
                {
                    channel.ConnectAsync(DateTime.UtcNow.AddSeconds(1)).Wait();
                }
                catch (Exception ex)
                {
                    tryCount++;
                    var exMsg = $"create channel for {config.DiscoveryServiceName} service failed {tryCount},status:{channel.State},endpoint:{endPoint}";
                    var exception = new InternalException(GrpcErrorCode.Internal, exMsg, ex);
                    if (tryCount > 2)
                    {
                        throw exception;
                    }

                    LoggerAccessor.Instance.OnLoggerError(exception, LogType.ClientLog);

                    // Failover: pick an endpoint again and connect to it instead.
                    if (!config.UseDirect)
                    {
                        // Release the channel that failed to connect before replacing it.
                        channel.ShutdownAsync();

                        // Same URL fallback as GetChannel, and keep the configured channel options.
                        endPoint = GetEndpoint(config.DiscoveryServiceName, ResolveDiscoveryUrl(config), config.DiscoveryServiceTag);
                        channel = new Channel(endPoint, ChannelCredentials.Insecure, config.ChannelOptions);
                    }
                }
            }

            // NOTE(review): after a failover the caller still stores this channel under the
            // endpoint it originally asked for, not the endpoint actually connected to.
            return new ChannelInfo() { DiscoveryServiceName = config.DiscoveryServiceName, Channel = channel };
        }

        /// <summary>
        /// Shuts down and removes the cached channels of a service whose endpoint key is no
        /// longer in the healthy endpoint list.
        /// </summary>
        /// <param name="healthEndpoints">Endpoints currently reported by discovery.</param>
        /// <param name="serviceName">Discovery service name the channels belong to.</param>
        private void ShutdownErrorChannel(List<string> healthEndpoints, string serviceName)
        {
            // Snapshot the stale entries first so the dictionary is not mutated mid-query.
            var staleChannels = _channels
                .Where(p => p.Value.DiscoveryServiceName == serviceName && !healthEndpoints.Contains(p.Key))
                .ToList();

            // Shut down (fire-and-forget) and remove each stale channel.
            foreach (var stale in staleChannels)
            {
                stale.Value.Channel.ShutdownAsync();
                _channels.TryRemove(stale.Key, out _);
            }
        }

        /// <summary>
        /// Shuts down every cached channel, waiting for each shutdown to complete,
        /// then empties the pool.
        /// </summary>
        public void Shutdown()
        {
            foreach (var info in _channels.Select(q => q.Value).ToList())
            {
                info.Channel.ShutdownAsync().Wait();
            }

            _channels.Clear();
        }
    }
}