DSP

MQTT学习笔记

2019-07-13 20:30发布

一.工作原理:

  mqtt包括客户端、代理两部分, 上图中代理为移动的Onenet服务器. 举个例子: 春秋淹城的某一个垃圾桶满了, 它向移动的Onenet平台发送了一个满溢报警的消息. 然后云平台转发给景区里所有的环卫工人. 加一个智能算法, 择优推送给环卫工人. 背后的工作机制就是我们使用的MQTT协议. 客户端为每一个环卫工人的手机APP和所有垃圾桶上的传感器设备.  客户端首先向代理Onenet发起请求,代理Onenet收到后对客户端认证,认证通过后在客户端和代理之间建立一个TCP长连接通道(这时候平台上就会显示设备在线啦). 其中TOPIC(主题)是一个产品下的. 客户端可以订阅和发布这个TOPIC, 发布了这个TOPIC后, name所有订阅此TOPIC的客户端都会收到代理的推送. 再来一个智能家居系统的例子: 末端智能电器与手机为客户端,云中心为代理。客户端首先向代理发起请求,代理收到后对客户端认证,认证通过后在客户端和代理之间建立一个TCP长连接通道,客户端通过该通道订阅若干关注的主题(TOPIC),同时在自身状态发生变化时,向相应的主题发布消息,代理将该消息发给正在订阅该主题的所有客户端。与http不同,mqtt是一种多对多的通信协议,设备不直接相连,而是通过一个代理实现互相通信。他是一种天然的异步协议,可以很好地将请求端和响应端解耦。

二.MQTT优点(适合物联网)

MQTT 与 HTTP 不同,后者是基于请求/响应方式的,服务器端无法直接发送数据给客户端。而 MQTT 是基于发布/订阅模式的,所有的客户端均与服务端保持连接状态   MQTT协议是为大量计算能力有限,且工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议,它具有以下主要的几项特性: 1.使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合; 2.对负载内容屏蔽的消息传输; 3.使用 TCP/IP 提供网络连接; 4.有三种消息发布服务质量:
  • “至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
  • “至少一次”,确保消息到达,但消息重复可能会发生。
  • “只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
5.小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量; 6.使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制. 是不是很适合物联网呀生气

三.MQTT的c#简单实现:

转自: http://www.cnblogs.com/kuige/articles/7724786.html 服务器端的建立:    (基于MQTTnet包) using MQTTnet; using MQTTnet.Core.Adapter; using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Protocol; using MQTTnet.Core.Server; using System; using System.Text; using System.Threading; namespace MqttServerTest { class Program { private static MqttServer mqttServer = null; static void Main(string[] args) { MqttNetTrace.TraceMessagePublished += MqttNetTrace_TraceMessagePublished; new Thread(StartMqttServer).Start(); while (true) { var inputString = Console.ReadLine().ToLower().Trim(); if (inputString == "exit") { mqttServer?.StopAsync(); Console.WriteLine("MQTT服务已停止!"); break; } else if (inputString == "clients") { foreach (var item in mqttServer.GetConnectedClients()) { Console.WriteLine($"客户端标识:{item.ClientId},协议版本:{item.ProtocolVersion}"); } } else { Console.WriteLine($"命令[{inputString}]无效!"); } } } private static void StartMqttServer() { if (mqttServer == null) { try { var options = new MqttServerOptions { ConnectionValidator = p => { if (p.ClientId == "c001") { if (p.Username != "u001" || p.Password != "p001") { return MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword; } } return MqttConnectReturnCode.ConnectionAccepted; } }; mqttServer = new MqttServerFactory().CreateMqttServer(options) as MqttServer; mqttServer.ApplicationMessageReceived += MqttServer_ApplicationMessageReceived; mqttServer.ClientConnected += MqttServer_ClientConnected; mqttServer.ClientDisconnected += MqttServer_ClientDisconnected; } catch (Exception ex) { Console.WriteLine(ex.Message); return; } } mqttServer.StartAsync(); Console.WriteLine("MQTT服务启动成功!"); } private static void MqttServer_ClientConnected(object sender, MqttClientConnectedEventArgs e) { Console.WriteLine($"客户端[{e.Client.ClientId}]已连接,协议版本:{e.Client.ProtocolVersion}"); } private static void MqttServer_ClientDisconnected(object sender, MqttClientDisconnectedEventArgs e) { Console.WriteLine($"客户端[{e.Client.ClientId}]已断开连接!"); } private static void MqttServer_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e) { Console.WriteLine($"客户端[{e.ClientId}]>> 主题:{e.ApplicationMessage.Topic} 负荷:{Encoding.UTF8.GetString(e.ApplicationMessage.Payload)} Qos:{e.ApplicationMessage.QualityOfServiceLevel} 保留:{e.ApplicationMessage.Retain}"); } private static void MqttNetTrace_TraceMessagePublished(object sender, MqttNetTraceMessagePublishedEventArgs e) { /*Console.WriteLine($">> 线程ID:{e.ThreadId} 来源:{e.Source} 跟踪级别:{e.Level} 消息: {e.Message}"); if (e.Exception != null) { Console.WriteLine(e.Exception); }*/ } } } 客户端的建立: using MQTTnet; using MQTTnet.Core; using MQTTnet.Core.Client; using MQTTnet.Core.Packets; using MQTTnet.Core.Protocol; using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; using System.Windows.Forms; namespace MqttClientWin { public partial class FmMqttClient : Form { private MqttClient mqttClient = null; public FmMqttClient() { InitializeComponent(); Task.Run(async () => { await ConnectMqttServerAsync(); }); } private async Task ConnectMqttServerAsync() { if (mqttClient == null) { mqttClient = new MqttClientFactory().CreateMqttClient() as MqttClient; mqttClient.ApplicationMessageReceived += MqttClient_ApplicationMessageReceived; mqttClient.Connected += MqttClient_Connected; mqttClient.Disconnected += MqttClient_Disconnected; } try { var options = new MqttClientTcpOptions { Server = "127.0.0.1", ClientId = Guid.NewGuid().ToString().Substring(0, 5), UserName = "u001", Password = "p001", CleanSession = true }; await mqttClient.ConnectAsync(options); } catch (Exception ex) { Invoke((new Action(() => { txtReceiveMessage.AppendText($"连接到MQTT服务器失败!" + Environment.NewLine + ex.Message + Environment.NewLine); }))); } } private void MqttClient_Connected(object sender, EventArgs e) { Invoke((new Action(() => { txtReceiveMessage.AppendText("已连接到MQTT服务器!" + Environment.NewLine); }))); } private void MqttClient_Disconnected(object sender, EventArgs e) { Invoke((new Action(() => { txtReceiveMessage.AppendText("已断开MQTT连接!" + Environment.NewLine); }))); } private void MqttClient_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e) { Invoke((new Action(() => { txtReceiveMessage.AppendText($">> {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}{Environment.NewLine}"); }))); } private void BtnSubscribe_ClickAsync(object sender, EventArgs e) { string topic = txtSubTopic.Text.Trim(); if (string.IsNullOrEmpty(topic)) { MessageBox.Show("订阅主题不能为空!"); return; } if (!mqttClient.IsConnected) { MessageBox.Show("MQTT客户端尚未连接!"); return; } mqttClient.SubscribeAsync(new List { new TopicFilter(topic, MqttQualityOfServiceLevel.AtMostOnce) }); txtReceiveMessage.AppendText($"已订阅[{topic}]主题" + Environment.NewLine); txtSubTopic.Enabled = false; btnSubscribe.Enabled = false; } private void BtnPublish_Click(object sender, EventArgs e) { string topic = txtPubTopic.Text.Trim(); if (string.IsNullOrEmpty(topic)) { MessageBox.Show("发布主题不能为空!"); return; } string inputString = txtSendMessage.Text.Trim(); var appMsg = new MqttApplicationMessage(topic, Encoding.UTF8.GetBytes(inputString), MqttQualityOfServiceLevel.AtMostOnce, false); mqttClient.PublishAsync(appMsg); } } } 运行效果: 四.MQTT的安卓简单实现 https://github.com/SCFMVP/Mqtt_Android