|
@@ -0,0 +1,196 @@
|
|
|
|
+using System;
|
|
|
|
+using System.Collections.Generic;
|
|
|
|
+using System.Linq;
|
|
|
|
+using System.Security.Authentication;
|
|
|
|
+using System.Text;
|
|
|
|
+using System.Threading.Tasks;
|
|
|
|
+using Microsoft.AspNetCore.Builder;
|
|
|
|
+using Microsoft.AspNetCore.Hosting;
|
|
|
|
+using Microsoft.AspNetCore.HttpsPolicy;
|
|
|
|
+using Microsoft.AspNetCore.Mvc;
|
|
|
|
+using Microsoft.AspNetCore.WebSockets;
|
|
|
|
+using Microsoft.Extensions.Configuration;
|
|
|
|
+using Microsoft.Extensions.DependencyInjection;
|
|
|
|
+using Microsoft.Extensions.Hosting;
|
|
|
|
+using Microsoft.Extensions.Logging;
|
|
|
|
+using MQTTnet;
|
|
|
|
+using MQTTnet.AspNetCore;
|
|
|
|
+using MQTTnet.Client.Receiving;
|
|
|
|
+using MQTTnet.Protocol;
|
|
|
|
+using MQTTnet.Server;
|
|
|
|
+namespace MQTT_Broker
|
|
|
|
+{
|
|
|
|
+ public class Startup
|
|
|
|
+ {
|
|
|
|
+ //private IServiceProvider _serviceProvider;
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ public Startup(IConfiguration configuration)
|
|
|
|
+ {
|
|
|
|
+ Configuration = configuration;
|
|
|
|
+ // _logger = logger;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public IConfiguration Configuration { get; }
|
|
|
|
+
|
|
|
|
+ // This method gets called by the runtime. Use this method to add services to the container.
|
|
|
|
+ public void ConfigureServices(IServiceCollection services)
|
|
|
|
+ {
|
|
|
|
+ services.AddControllers();
|
|
|
|
+ services.AddHostedMqttServerWithServices(
|
|
|
|
+ builder =>
|
|
|
|
+ {
|
|
|
|
+ //builder.WithDefaultEndpoint();
|
|
|
|
+
|
|
|
|
+ builder.WithDefaultEndpointPort(1883);
|
|
|
|
+ builder.WithConnectionValidator(c =>
|
|
|
|
+ {
|
|
|
|
+ //从IServiceCollection中构建 ServiceProvider, 用以使用注入访问数据库的服务
|
|
|
|
+ // var serprovider = services.BuildServiceProvider();
|
|
|
|
+
|
|
|
|
+ // _logger.LogInformation($" ClientId:{c.ClientId} Endpoint:{c.Endpoint} Username:{c.Username} Password:{c.Password} WillMessage:{c.WillMessage}");
|
|
|
|
+
|
|
|
|
+ //if (c.ClientId.Length < 5)
|
|
|
|
+ //{
|
|
|
|
+ // c.ReasonCode = MqttConnectReasonCode.ClientIdentifierNotValid;
|
|
|
|
+ // return;
|
|
|
|
+ //}
|
|
|
|
+
|
|
|
|
+ //if (c.Username != "admin")
|
|
|
|
+ //{
|
|
|
|
+ // c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
|
|
|
|
+ // return;
|
|
|
|
+ //}
|
|
|
|
+
|
|
|
|
+ //if (c.Password != "public")
|
|
|
|
+ //{
|
|
|
|
+ // c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
|
|
|
|
+ // return;
|
|
|
|
+ //}
|
|
|
|
+ c.ReasonCode = MqttConnectReasonCode.Success;
|
|
|
|
+ })
|
|
|
|
+ .WithApplicationMessageInterceptor(context =>
|
|
|
|
+ {
|
|
|
|
+ //if (MqttTopicFilterComparer.IsMatch(context.ApplicationMessage.Topic, "/myTopic/WithTimestamp/#"))
|
|
|
|
+ //{
|
|
|
|
+ // context.ApplicationMessage.Payload = Encoding.UTF8.GetBytes(DateTime.Now.ToString("O"));
|
|
|
|
+ //}
|
|
|
|
+
|
|
|
|
+ //if (context.ApplicationMessage.Topic == "not_allowed_topic")
|
|
|
|
+ //{
|
|
|
|
+ // context.AcceptPublish = false;
|
|
|
|
+ // context.CloseConnection = true;
|
|
|
|
+ //}
|
|
|
|
+
|
|
|
|
+ // _logger.Log(LogLevel.Information, $"clientId:{context.ClientId}, topic:{context.ApplicationMessage.Topic}");
|
|
|
|
+ // _logger.Log(LogLevel.Information, $"Payload:{Encoding.Default.GetString(context.ApplicationMessage.Payload)}");
|
|
|
|
+ })///订阅拦截验证
|
|
|
|
+ .WithSubscriptionInterceptor((context) =>
|
|
|
|
+ {
|
|
|
|
+ //if (context.TopicFilter.Topic.StartsWith("admin/foo/bar") && context.ClientId != "theAdmin")
|
|
|
|
+ //{
|
|
|
|
+ // context.AcceptSubscription = false;
|
|
|
|
+ //}
|
|
|
|
+
|
|
|
|
+ //if (context.TopicFilter.Topic.StartsWith("the/secret/stuff") && context.ClientId != "Imperator")
|
|
|
|
+ //{
|
|
|
|
+ // context.AcceptSubscription = false;
|
|
|
|
+ // context.CloseConnection = true;
|
|
|
|
+ //}
|
|
|
|
+
|
|
|
|
+ //context.TopicFilter.Topic.start
|
|
|
|
+ });
|
|
|
|
+ });
|
|
|
|
+ services.AddMqttTcpServerAdapter();
|
|
|
|
+ services.AddMqttWebSocketServerAdapter();
|
|
|
|
+ services.AddMqttConnectionHandler() .AddConnections();
|
|
|
|
+ //.AddMqttConnectionHandler().AddConnections().AddMqttTcpServerAdapter();
|
|
|
|
+ //var mqttServerOptions = new MqttServerOptionsBuilder().WithEncryptionSslProtocol(SslProtocols.None)
|
|
|
|
+ // .WithDefaultEndpointPort(9001).WithEncryptedEndpointPort(9002)
|
|
|
|
+ // .WithoutDefaultEndpoint()
|
|
|
|
+ // .Build();
|
|
|
|
+ //services
|
|
|
|
+ // .AddHostedMqttServer(mqttServerOptions)
|
|
|
|
+ // .AddMqttWebSocketServerAdapter()
|
|
|
|
+ // .AddMqttConnectionHandler()
|
|
|
|
+ // //.AddWebSockets(x => { x = new WebSocketOptions { KeepAliveInterval = new TimeSpan(100000000) }; })
|
|
|
|
+ // .AddConnections();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
|
|
|
|
+ public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
|
|
|
|
+ {
|
|
|
|
+ if (env.IsDevelopment())
|
|
|
|
+ {
|
|
|
|
+ app.UseDeveloperExceptionPage();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ app.UseRouting();
|
|
|
|
+ app.UseAuthorization();
|
|
|
|
+ //app.UseMqttEndpoint("/mqtt");
|
|
|
|
+ app.UseEndpoints(endpoints =>
|
|
|
|
+ {
|
|
|
|
+ endpoints.MapMqtt("/mqtt");
|
|
|
|
+ endpoints.MapControllers();
|
|
|
|
+ });
|
|
|
|
+ // app.UseConnections(c => c.MapMqtt("/mqtt"));
|
|
|
|
+ app.UseMqttServer(
|
|
|
|
+ server =>
|
|
|
|
+ {
|
|
|
|
+ server.UseApplicationMessageReceivedHandler(e =>
|
|
|
|
+ {
|
|
|
|
+
|
|
|
|
+ });
|
|
|
|
+
|
|
|
|
+ server.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(e =>
|
|
|
|
+ {
|
|
|
|
+ Console.WriteLine(
|
|
|
|
+ $"'{e.ClientId}' reported '{e.ApplicationMessage.Topic}' > '{Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? new byte[0])}'",
|
|
|
|
+ ConsoleColor.Magenta);
|
|
|
|
+ });
|
|
|
|
+ server.StartedHandler = new MqttServerStartedHandlerDelegate((
|
|
|
|
+ e =>
|
|
|
|
+ {
|
|
|
|
+
|
|
|
|
+ }));
|
|
|
|
+ server.StoppedHandler = new MqttServerStoppedHandlerDelegate(
|
|
|
|
+ e =>
|
|
|
|
+ {
|
|
|
|
+
|
|
|
|
+ });
|
|
|
|
+
|
|
|
|
+ server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(
|
|
|
|
+ e =>
|
|
|
|
+ {
|
|
|
|
+ // _logger.LogInformation($"{e.ClientId} is connectioned");
|
|
|
|
+ // _logger.LogInformation($"目前连接总数:{ server.GetClientStatusAsync().Result.Count}");
|
|
|
|
+ });
|
|
|
|
+ server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(
|
|
|
|
+ e =>
|
|
|
|
+ {
|
|
|
|
+
|
|
|
|
+ // _logger.LogInformation($"{e.ClientId} is disconnectioned");
|
|
|
|
+ // _logger.LogInformation($"目前连接总数:{ server.GetClientStatusAsync().Result.Count}");
|
|
|
|
+ });
|
|
|
|
+ //开启订阅以及取消订阅
|
|
|
|
+ server.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate((args) => {
|
|
|
|
+ Console.WriteLine("订阅" + args.ClientId + args.TopicFilter.Topic);
|
|
|
|
+ });
|
|
|
|
+ server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate((args) => {
|
|
|
|
+ Console.WriteLine("取消订阅" + args.ClientId + args.TopicFilter);
|
|
|
|
+ });
|
|
|
|
+ server.UseClientConnectedHandler(x => {
|
|
|
|
+ Console.WriteLine(x.ClientId + "连接");
|
|
|
|
+ });
|
|
|
|
+ server.UseClientDisconnectedHandler(x => {
|
|
|
|
+ Console.WriteLine(x.ClientId + "断开连接");
|
|
|
|
+ });
|
|
|
|
+ });
|
|
|
|
+ //app.UseMqttEndpoint("/mqtt");
|
|
|
|
+
|
|
|
|
+ app.UseHttpsRedirection();
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+}
|