ChannelPool.cs 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. using Grpc.Core;
  2. using System;
  3. using System.Collections.Concurrent;
  4. using System.Collections.Generic;
  5. using System.Linq;
  6. using Microsoft.Extensions.Caching.Memory;
  7. using Grpc.Extension.Abstract.Model;
  8. using Grpc.Extension.Abstract.Discovery;
  9. using Grpc.Extension.Abstract;
  10. using Grpc.Extension.Client.Model;
  11. using Microsoft.Extensions.Options;
  12. namespace Grpc.Extension.Client.Internal
  13. {
  14. /// <summary>
  15. /// Channel统一管理
  16. /// </summary>
  17. internal class ChannelPool
  18. {
  19. private ConcurrentDictionary<string, ChannelInfo> _channels = new ConcurrentDictionary<string, ChannelInfo>();
  20. private IServiceDiscovery _serviceDiscovery;
  21. private ILoadBalancer _loadBalancer;
  22. private IMemoryCache _memoryCache;
  23. private GrpcClientOptions _grpcClientOptions;
  24. /// <summary>
  25. /// Channel统一管理
  26. /// </summary>
  27. /// <param name="serviceDiscovery"></param>
  28. /// <param name="loadBalancer"></param>
  29. /// <param name="memoryCache"></param>
  30. /// <param name="grpcClientOptions"></param>
  31. public ChannelPool(IServiceDiscovery serviceDiscovery, ILoadBalancer loadBalancer,IMemoryCache memoryCache, IOptions<GrpcClientOptions> grpcClientOptions)
  32. {
  33. this._serviceDiscovery = serviceDiscovery;
  34. this._loadBalancer = loadBalancer;
  35. this._memoryCache = memoryCache;
  36. this._grpcClientOptions = grpcClientOptions.Value;
  37. }
  38. internal static List<ChannelConfig> Configs { get; set; } = new List<ChannelConfig>();
  39. /// <summary>
  40. /// 根据客户端代理类型获取channel
  41. /// </summary>
  42. public Channel GetChannel(string grpcServiceName)
  43. {
  44. var config = Configs?.FirstOrDefault(q => q.GrpcServiceName == grpcServiceName?.Trim());
  45. if (config == null)
  46. {
  47. throw new InternalException(GrpcErrorCode.Internal, $"{grpcServiceName ?? ""} client has not config,please call AddGrpcClient method");
  48. }
  49. if (config.UseDirect)
  50. {
  51. return GetChannelCore(config.DirectEndpoint,config);
  52. }
  53. else//from discovery
  54. {
  55. var discoveryUrl = !string.IsNullOrWhiteSpace(config.DiscoveryUrl) ? config.DiscoveryUrl : _grpcClientOptions.DiscoveryUrl;
  56. var endPoint = GetEndpoint(config.DiscoveryServiceName, discoveryUrl, config.DiscoveryServiceTag);
  57. return GetChannelCore(endPoint,config);
  58. }
  59. }
  60. /// <summary>
  61. /// 根据服务名称返回服务地址
  62. /// </summary>
  63. private string GetEndpoint(string serviceName, string dicoveryUrl, string serviceTag)
  64. {
  65. //获取健康的endpoints
  66. var isCache = true;
  67. var healthEndpoints = _memoryCache.GetOrCreate(serviceName, cacheEntry =>
  68. {
  69. isCache = false;
  70. cacheEntry.SetAbsoluteExpiration(TimeSpan.FromSeconds(_grpcClientOptions.ServiceAddressCacheTime));
  71. return _serviceDiscovery.GetEndpoints(serviceName, dicoveryUrl, serviceTag);
  72. });
  73. if (healthEndpoints == null || healthEndpoints.Count == 0)
  74. {
  75. throw new InternalException(GrpcErrorCode.Internal,$"get endpoints from discovery of {serviceName} is null");
  76. }
  77. //只有重新拉取了健康结点才需要去关闭不健康的Channel
  78. if (isCache == false) ShutdownErrorChannel(healthEndpoints, serviceName);
  79. return _loadBalancer.SelectEndpoint(serviceName, healthEndpoints);
  80. }
  81. private Channel GetChannelCore(string endpoint,ChannelConfig config)
  82. {
  83. //获取channel,不存在就添加
  84. var channel = _channels.GetOrAdd(endpoint, (key) => CreateChannel(key,config)).Channel;
  85. //检查channel状态
  86. if (channel.State != ChannelState.Ready)
  87. {
  88. //状态异常就关闭后重建
  89. channel.ShutdownAsync();
  90. _channels.TryRemove(config.DiscoveryServiceName, out var tmp);
  91. //新增或者修改channel
  92. return _channels.AddOrUpdate(endpoint, (key) => CreateChannel(key, config), (key, value) => CreateChannel(key,config)).Channel;
  93. }
  94. else
  95. {
  96. return channel;
  97. }
  98. }
  99. private ChannelInfo CreateChannel(string endPoint, ChannelConfig config)
  100. {
  101. var channel = new Channel(endPoint, ChannelCredentials.Insecure, config.ChannelOptions);
  102. var tryCount = 0;//重试计数
  103. //检查channel状态
  104. while (channel.State != ChannelState.Ready)
  105. {
  106. try
  107. {
  108. channel.ConnectAsync(DateTime.UtcNow.AddSeconds(1)).Wait();
  109. }
  110. catch (Exception ex)
  111. {
  112. tryCount++;
  113. var exMsg = $"create channel for {config.DiscoveryServiceName} service failed {tryCount},status:{channel.State},endpoint:{endPoint}";
  114. var exeption = new InternalException(GrpcErrorCode.Internal, exMsg, ex);
  115. if (tryCount > 2)
  116. {
  117. throw exeption;
  118. }
  119. else
  120. {
  121. LoggerAccessor.Instance.OnLoggerError(exeption, LogType.ClientLog);
  122. }
  123. //重新获取Endpoint,故障转移
  124. if (!config.UseDirect)
  125. {
  126. endPoint = GetEndpoint(config.DiscoveryServiceName, config.DiscoveryUrl, config.DiscoveryServiceTag);
  127. channel = new Channel(endPoint, ChannelCredentials.Insecure);
  128. }
  129. }
  130. }
  131. return new ChannelInfo() { DiscoveryServiceName= config.DiscoveryServiceName,Channel = channel};
  132. }
  133. /// <summary>
  134. /// 关闭不健康Channel
  135. /// </summary>
  136. /// <param name="healthEndpoints"></param>
  137. /// <param name="serviceName"></param>
  138. private void ShutdownErrorChannel(List<string> healthEndpoints,string serviceName)
  139. {
  140. //获取错误的channel
  141. var errorChannel = _channels.Where(p => p.Value.DiscoveryServiceName == serviceName &&
  142. !healthEndpoints.Contains(p.Key)).ToList();
  143. //关闭并删除错误的channel
  144. foreach (var channel in errorChannel)
  145. {
  146. channel.Value.Channel.ShutdownAsync();
  147. _channels.TryRemove(channel.Key, out var tmp);
  148. }
  149. }
  150. /// <summary>
  151. /// 关闭所有Channel
  152. /// </summary>
  153. public void Shutdown()
  154. {
  155. _channels.Select(q => q.Value).ToList().ForEach(q => q.Channel.ShutdownAsync().Wait());
  156. _channels.Clear();
  157. }
  158. }
  159. }