Startup.cs 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Security.Authentication;
  5. using System.Text;
  6. using System.Threading.Tasks;
  7. using Microsoft.AspNetCore.Builder;
  8. using Microsoft.AspNetCore.Hosting;
  9. using Microsoft.AspNetCore.HttpsPolicy;
  10. using Microsoft.AspNetCore.Mvc;
  11. using Microsoft.AspNetCore.WebSockets;
  12. using Microsoft.Extensions.Configuration;
  13. using Microsoft.Extensions.DependencyInjection;
  14. using Microsoft.Extensions.Hosting;
  15. using Microsoft.Extensions.Logging;
  16. using MQTTnet;
  17. using MQTTnet.AspNetCore;
  18. using MQTTnet.Client.Receiving;
  19. using MQTTnet.Protocol;
  20. using MQTTnet.Server;
  21. namespace MQTT_Broker
  22. {
  /// <summary>
  /// ASP.NET Core startup class that registers a hosted MQTTnet server next to the MVC
  /// controllers and wires console logging for the broker's events.
  /// </summary>
  public class Startup
  {
      /// <summary>Port passed to <c>WithDefaultEndpointPort</c> for the broker's default endpoint.</summary>
      private const int MqttTcpPort = 1883;

      /// <summary>Route pattern passed to <c>MapMqtt</c> in the endpoint routing table.</summary>
      private const string MqttEndpointPath = "/mqtt";

      /// <summary>Creates the startup object and keeps the supplied configuration.</summary>
      /// <param name="configuration">Application configuration provided by the host.</param>
      public Startup(IConfiguration configuration)
      {
          Configuration = configuration;
      }

      /// <summary>Application configuration received in the constructor.</summary>
      public IConfiguration Configuration { get; }

      /// <summary>
      /// Called by the runtime. Adds the controllers, the hosted MQTT server and its
      /// TCP / WebSocket adapters to the service container.
      /// </summary>
      /// <param name="services">Service collection to populate.</param>
      public void ConfigureServices(IServiceCollection services)
      {
          services.AddControllers();

          services.AddHostedMqttServerWithServices(builder =>
          {
              builder.WithDefaultEndpointPort(MqttTcpPort);

              builder
                  .WithConnectionValidator(c =>
                  {
                      // NOTE(review): every connection is accepted unconditionally. Checks on
                      // client id length (ClientIdentifierNotValid) and on username/password
                      // (BadUserNameOrPassword) were sketched here earlier but are not active;
                      // confirm that an open broker is intended before deploying.
                      c.ReasonCode = MqttConnectReasonCode.Success;
                  })
                  .WithApplicationMessageInterceptor(context =>
                  {
                      // Intentionally empty: the publish context is left untouched.
                      // Payload rewriting or rejecting topics (AcceptPublish = false) would go here.
                  })
                  // Subscription interception / validation.
                  .WithSubscriptionInterceptor(context =>
                  {
                      // Intentionally empty: the subscription context is left untouched.
                      // Per-topic access rules (AcceptSubscription = false) would go here.
                  });
          });

          services.AddMqttTcpServerAdapter();
          services.AddMqttWebSocketServerAdapter();
          services.AddMqttConnectionHandler().AddConnections();
      }

      /// <summary>
      /// Called by the runtime. Builds the HTTP request pipeline, maps the MQTT and
      /// controller endpoints, and attaches console-logging handlers to the MQTT server.
      /// </summary>
      /// <param name="app">Application pipeline builder.</param>
      /// <param name="env">Hosting environment; the developer exception page is enabled in Development.</param>
      public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
      {
          if (env.IsDevelopment())
          {
              app.UseDeveloperExceptionPage();
          }

          app.UseRouting();
          app.UseAuthorization();

          app.UseEndpoints(endpoints =>
          {
              endpoints.MapMqtt(MqttEndpointPath);
              endpoints.MapControllers();
          });

          app.UseMqttServer(server =>
          {
              // Log every received application message. The text is passed as a plain string,
              // never as a composite format string, so payloads containing '{' or '}'
              // (for example JSON) cannot raise FormatException.
              server.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(e =>
              {
                  Console.WriteLine(
                      FormatReceivedMessage(e.ClientId, e.ApplicationMessage.Topic, e.ApplicationMessage.Payload));
              });

              // Placeholder hooks for server start / stop; currently no-ops.
              server.StartedHandler = new MqttServerStartedHandlerDelegate(e => { });
              server.StoppedHandler = new MqttServerStoppedHandlerDelegate(e => { });

              // Subscribe / unsubscribe notifications.
              server.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(args =>
              {
                  Console.WriteLine("订阅" + args.ClientId + args.TopicFilter.Topic);
              });
              server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(args =>
              {
                  Console.WriteLine("取消订阅" + args.ClientId + args.TopicFilter);
              });

              // Connect / disconnect notifications.
              server.UseClientConnectedHandler(x =>
              {
                  Console.WriteLine(x.ClientId + "连接");
              });
              server.UseClientDisconnectedHandler(x =>
              {
                  Console.WriteLine(x.ClientId + "断开连接");
              });
          });

          // NOTE(review): this is registered after UseEndpoints, so it only sees requests that
          // no endpoint handled. Order kept as in the original; confirm whether the redirect
          // was meant to run earlier in the pipeline.
          app.UseHttpsRedirection();
      }

      /// <summary>Builds the console line for a received MQTT message.</summary>
      /// <param name="clientId">Id of the publishing client; may be null.</param>
      /// <param name="topic">Topic of the message.</param>
      /// <param name="payload">Raw payload; null is treated as empty. Decoded as UTF-8.</param>
      /// <returns>Text of the form <c>'client' reported 'topic' &gt; 'payload'</c>.</returns>
      private static string FormatReceivedMessage(string clientId, string topic, byte[] payload)
      {
          var text = Encoding.UTF8.GetString(payload ?? Array.Empty<byte>());
          return $"'{clientId}' reported '{topic}' > '{text}'";
      }
  }
  172. }