123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173 |
- using HiTeachCE.Models;
- using Microsoft.AspNetCore.Builder;
- using Microsoft.Extensions.Configuration;
- using Microsoft.Extensions.DependencyInjection;
- using MQTTnet.AspNetCore;
- using MQTTnet.Client.Receiving;
- using MQTTnet.Protocol;
- using MQTTnet.Server;
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using System.Threading.Tasks;
- namespace HiTeachCE.Extension.Mqtt
- {
/// <summary>
/// Extension methods that host an MQTTnet broker inside the ASP.NET Core application:
/// <c>MQTTConnection</c> registers the broker services and <c>UseMqtt</c> attaches the
/// server-side event handlers.
/// </summary>
public static class MQTTExtension
{
    /// <summary>TCP port of the broker's default endpoint.</summary>
    private const int DefaultEndpointPort = 3000;

    /// <summary>
    /// Registers the hosted MQTT server (default endpoint on port 3000) together with the
    /// TCP adapter, the WebSocket adapter and the MQTT connection handler.
    /// </summary>
    /// <param name="services">Service collection the broker is registered in.</param>
    /// <param name="enforceTopicAcl">
    /// When <c>true</c>, a publish or subscription is accepted only if the client's Redis
    /// record lists a topic filter matching the requested topic. Defaults to <c>false</c>,
    /// which accepts every publish and subscription exactly as this method did before the
    /// parameter existed, and skips the Redis lookup whose result was being discarded.
    /// </param>
    public static void MQTTConnection(this IServiceCollection services, bool enforceTopicAcl = false)
    {
        services.AddHostedMqttServerWithServices(builder =>
        {
            builder.WithDefaultEndpointPort(DefaultEndpointPort);
            builder
                .WithConnectionValidator(c =>
                {
                    // NOTE(review): every connection is accepted. A check of the user name and
                    // BCrypt password hash against the client's Redis record used to run here
                    // and has been disabled; restore it before exposing the broker to
                    // untrusted clients.
                    c.ReasonCode = MqttConnectReasonCode.Success;
                })
                // Publish interceptor.
                .WithApplicationMessageInterceptor(context =>
                {
                    context.AcceptPublish = !enforceTopicAcl
                        || IsTopicAllowed(context.ClientId, context.ApplicationMessage.Topic);
                })
                // Subscription interceptor.
                .WithSubscriptionInterceptor(context =>
                {
                    context.AcceptSubscription = !enforceTopicAcl
                        || IsTopicAllowed(context.ClientId, context.TopicFilter.Topic);
                });
        });

        services.AddMqttTcpServerAdapter();
        services.AddMqttWebSocketServerAdapter();
        services.AddMqttConnectionHandler().AddConnections();
    }

    /// <summary>
    /// Attaches the broker's event handlers, which write received messages, subscriptions,
    /// unsubscriptions, connections and disconnections to the console.
    /// </summary>
    /// <param name="app">Application builder the MQTT server is attached to.</param>
    public static void UseMqtt(this IApplicationBuilder app)
    {
        app.UseMqttServer(server =>
        {
            server.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(e =>
            {
                string payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? Array.Empty<byte>());

                // Use the single-argument overload on purpose: passing a second argument selects
                // WriteLine(format, arg0), which parses the message as a composite format string
                // and throws FormatException as soon as a payload contains '{' or '}' (e.g. JSON).
                Console.WriteLine($"'{e.ClientId}' reported '{e.ApplicationMessage.Topic}' > '{payload}'");
            });

            // Started / stopped notifications are intentionally no-ops.
            server.StartedHandler = new MqttServerStartedHandlerDelegate(e =>
            {
            });
            server.StoppedHandler = new MqttServerStoppedHandlerDelegate(e =>
            {
            });

            // Subscribe / unsubscribe notifications.
            server.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(args =>
            {
                Console.WriteLine("订阅" + args.ClientId + args.TopicFilter.Topic);
            });
            server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(args =>
            {
                Console.WriteLine("取消订阅" + args.ClientId + args.TopicFilter);
            });

            // Connect / disconnect notifications.
            server.UseClientConnectedHandler(x =>
            {
                Console.WriteLine(x.ClientId + "连接");
            });
            server.UseClientDisconnectedHandler(x =>
            {
                Console.WriteLine(x.ClientId + "断开连接");
            });
        });
    }

    /// <summary>
    /// Looks up the client's record in Redis (hash key <c>"mqtt:" + clientId</c>, field
    /// <c>clientId</c>) and reports whether any topic filter stored in it matches
    /// <paramref name="topic"/>.
    /// </summary>
    /// <param name="clientId">MQTT client identifier used to build the Redis key.</param>
    /// <param name="topic">Topic being published to, or topic filter being subscribed.</param>
    /// <returns>
    /// <c>true</c> when a stored filter matches; <c>false</c> when the client has no record,
    /// the record has no topic list, or nothing matches.
    /// </returns>
    private static bool IsTopicAllowed(string clientId, string topic)
    {
        MQTTInfo info = RedisHelper.HGet<MQTTInfo>("mqtt:" + clientId, clientId);
        if (info?.topic == null)
        {
            // Unknown client, or a record stored without a topic list: nothing is allowed.
            return false;
        }

        foreach (string allowedFilter in info.topic)
        {
            if (MqttTopicFilterComparer.IsMatch(topic, allowedFilter))
            {
                return true;
            }
        }

        return false;
    }
}
- }
|