MQTTExtension.cs 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. using HiTeachCE.Models;
  2. using Microsoft.AspNetCore.Builder;
  3. using Microsoft.Extensions.Configuration;
  4. using Microsoft.Extensions.DependencyInjection;
  5. using MQTTnet.AspNetCore;
  6. using MQTTnet.Client.Receiving;
  7. using MQTTnet.Protocol;
  8. using MQTTnet.Server;
  9. using System;
  10. using System.Collections.Generic;
  11. using System.Linq;
  12. using System.Text;
  13. using System.Threading.Tasks;
  14. namespace HiTeachCE.Extension.Mqtt
  15. {
  16. public static class MQTTExtension
  17. {
  18. public static void MQTTConnection(this IServiceCollection services) {
  19. services.AddHostedMqttServerWithServices(
  20. builder =>
  21. {
  22. //builder.WithDefaultEndpoint();
  23. builder.WithDefaultEndpointPort(3000);
  24. builder.WithConnectionValidator(c =>
  25. {
  26. //MQTTInfo info= RedisHelper.HGet<MQTTInfo>("mqtt:"+c.ClientId, c.ClientId);
  27. //if (info != null)
  28. //{
  29. // if (c.Username != info.username)
  30. // {
  31. // c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
  32. // return;
  33. // }
  34. // if (!BCrypt.Net.BCrypt.Verify(c.Password,info.password))
  35. // {
  36. // c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
  37. // return;
  38. // }
  39. //}
  40. //else {
  41. // c.ReasonCode = MqttConnectReasonCode.ClientIdentifierNotValid;
  42. // return;
  43. // }
  44. c.ReasonCode = MqttConnectReasonCode.Success;
  45. })
  46. .WithApplicationMessageInterceptor(context =>
  47. {
  48. ///发送消息拦截器
  49. ///
  50. MQTTInfo info = RedisHelper.HGet<MQTTInfo>("mqtt:" + context.ClientId, context.ClientId);
  51. if (info != null)
  52. {
  53. bool match = false;
  54. foreach (string topic in info.topic)
  55. {
  56. //if (context.TopicFilter.Topic .StartsWith(topic)) {
  57. if (MqttTopicFilterComparer.IsMatch(context.ApplicationMessage.Topic, topic))
  58. {
  59. match = true;
  60. break;
  61. }
  62. }
  63. if (match)
  64. {
  65. context.AcceptPublish = true;
  66. }
  67. else
  68. {///改变
  69. context.AcceptPublish = true;
  70. }
  71. }
  72. else {///改变
  73. context.AcceptPublish = true;
  74. }
  75. })///订阅拦截验证
  76. .WithSubscriptionInterceptor((context) =>
  77. {
  78. MQTTInfo info = RedisHelper.HGet<MQTTInfo>("mqtt:" + context.ClientId, context.ClientId);
  79. if (info != null)
  80. {
  81. bool match = false;
  82. foreach (string topic in info.topic)
  83. {
  84. //if (context.TopicFilter.Topic .StartsWith(topic)) {
  85. if (MqttTopicFilterComparer.IsMatch(context.TopicFilter.Topic, topic))
  86. {
  87. match = true;
  88. break;
  89. }
  90. }
  91. if (match)
  92. {
  93. context.AcceptSubscription = true;
  94. }
  95. else
  96. {///改变
  97. context.AcceptSubscription = true;
  98. }
  99. }
  100. else {///改变
  101. context.AcceptSubscription = true;
  102. }
  103. });
  104. });
  105. services.AddMqttTcpServerAdapter();
  106. services.AddMqttWebSocketServerAdapter();
  107. services.AddMqttConnectionHandler().AddConnections();
  108. }
  109. public static void UseMqtt(this IApplicationBuilder app) {
  110. app.UseMqttServer(
  111. server =>
  112. {
  113. server.UseApplicationMessageReceivedHandler(e =>
  114. {
  115. });
  116. server.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(e =>
  117. {
  118. Console.WriteLine(
  119. $"'{e.ClientId}' reported '{e.ApplicationMessage.Topic}' > '{Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? new byte[0])}'",
  120. ConsoleColor.Magenta);
  121. });
  122. server.StartedHandler = new MqttServerStartedHandlerDelegate((
  123. e =>
  124. {
  125. }));
  126. server.StoppedHandler = new MqttServerStoppedHandlerDelegate(
  127. e =>
  128. {
  129. });
  130. server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(
  131. e =>
  132. {
  133. // _logger.LogInformation($"{e.ClientId} is connectioned");
  134. // _logger.LogInformation($"目前连接总数:{ server.GetClientStatusAsync().Result.Count}");
  135. });
  136. server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(
  137. e =>
  138. {
  139. // _logger.LogInformation($"{e.ClientId} is disconnectioned");
  140. // _logger.LogInformation($"目前连接总数:{ server.GetClientStatusAsync().Result.Count}");
  141. });
  142. //开启订阅以及取消订阅
  143. server.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate((args) => {
  144. Console.WriteLine("订阅" + args.ClientId + args.TopicFilter.Topic);
  145. });
  146. server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate((args) => {
  147. Console.WriteLine("取消订阅" + args.ClientId + args.TopicFilter);
  148. });
  149. server.UseClientConnectedHandler(x => {
  150. Console.WriteLine(x.ClientId + "连接");
  151. });
  152. server.UseClientDisconnectedHandler(x => {
  153. Console.WriteLine(x.ClientId + "断开连接");
  154. });
  155. });
  156. }
  157. }
  158. }