本帖最后由 mzwhhwj 于 2017-9-11 22:57 编辑
在上一篇文章中,只是讲了MQTT
的主要内容,至于怎么移植到STM32上,怎么使用才是最重要的关键。这里使用的平台是RT8711
的WIFI SOC,使用的LWIP跟FreeRTOS,移植使用跟STM32+LWIP是没什么区别的。 先在Github
上找到Eclipse的开源MQTT客户端程序https://github.com/eclipse/paho.mqtt.embedded-c.git,并把源码下载起来
解压源码,再进入MQTTPacket
文件夹,里面有三个文件夹
把src
里面的所有文件和samples下的transport.c、transport.h两个文件复制到工程目录下。这里我们主要的移植工作就在transport里面。打开transport.c文件,这个是MQTT连接,发送,接收的接口,源码是Linux跟Windows平台,用的标准的Socket接口函数,我们这里的移植工作量很小,因为LWIP也是支持标准的Socket接口函数,只不过里面有些函数接口是LWIP不支持的,主要就是transport_open这个连接函数有区别。把原来的transport_open函数注释掉,重新写一个。[mw_shl_code=c,true]int transport_open(char* addr, int port)
{
int* sock = &mysock;
struct hostent *server;
struct sockaddr_in serv_addr;
static struct timeval tv;
int timeout = 1000;
fd_set readset;
fd_set writeset;
*sock = socket(AF_INET, SOCK_STREAM, 0);
if(*sock < 0)
DiagPrintf("[ERROR] Create socket failed
");
server = gethostbyname(addr);
if(server == NULL)
DiagPrintf("[ERROR] Get host ip failed
");
memset(&serv_addr,0,sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(port);
memcpy(&serv_addr.sin_addr.s_addr,server->h_addr,server->h_length);
if (connect(*sock,(struct sockaddr *)&serv_addr,sizeof(serv_addr)) < 0){
DiagPrintf("[ERROR] connect failed
");
return -1;
}
tv.tv_sec = 10; /* 1 second Timeout */
tv.tv_usec = 0;
setsockopt(mysock, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout,sizeof(timeout));
return mysock;
}[/mw_shl_code]
到这里其实移植工作就已经完成了,就是这么的简单,剩下就是怎么使用MQTT
。直接上代码,这里用FreeRTOS新建一个MQTT的任务。[mw_shl_code=c,true]#define HOST_NAME "m2m.eclipse.org"#define HOST_PORT 1883
void mqtt_thread( void *arg)
{
MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
MQTTString receivedTopic;
int rc = 0;
char buf[200];
int buflen = sizeof(buf);
int mysock = 0;
MQTTString topicString = MQTTString_initializer;
int payloadlen_in;
unsigned char* payload_in;
unsigned short msgid = 1;
int subcount;
int granted_qos =0;
unsigned char sessionPresent, connack_rc;
unsigned short submsgid;
int len = 0;
int req_qos = 1;
unsigned char dup;
int qos;
unsigned char retained;
char *host = "m2m.eclipse.org";
int port = 1883;
uint8_t msgtypes = CONNECT;
uint32_t curtick = xTaskGetTickCount();
log_info("socket connect to server");
mysock = transport_open(host,port);
if(mysock < 0)
return mysock;
log_notice("Sending to hostname %s port %d
", host, port);
data.clientID.cstring = "me";
data.keepAliveInterval = 50;
data.cleansession = 1;
data.username.cstring = "";
data.password.cstring = "";
data.MQTTVersion = 4;
while(1)
{
if((xTaskGetTickCount() - curtick) >(data.keepAliveInterval/2*1000))
{
if(msgtypes == 0)
{
curtick = xTaskGetTickCount();
msgtypes = PINGREQ;
}
}
switch(msgtypes)
{
case CONNECT: len = MQTTSerialize_connect(buf, buflen, &data);
rc = transport_sendPacketBuffer(mysock, (unsigned char*)buf, len);
if (rc == len)
log_info("send CONNECT Successfully");
else
log_debug("send CONNECT failed");
log_info("MQTT concet to server!");
msgtypes = 0;
break;
case CONNACK: if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) != 1 || connack_rc != 0)
{
log_info("Unable to connect, return code %d
", connack_rc);
}
else log_info("MQTT is concet OK!");
msgtypes = SUBSCRIBE;
break;
case SUBSCRIBE: topicString.cstring = "ledtest";
len = MQTTSerialize_subscribe(buf, buflen, 0, msgid, 1, &topicString, &req_qos);
rc = transport_sendPacketBuffer(mysock, (unsigned char*)buf, len);
if (rc == len)
log_info("send SUBSCRIBE Successfully
");
else
log_debug("send SUBSCRIBE failed
");
log_info("client subscribe:[%s]",topicString.cstring);
msgtypes = 0;
break;
case SUBACK: rc = MQTTDeserialize_suback(&submsgid, 1, &subcount, &granted_qos, buf, buflen);
log_info("granted qos is %d
", granted_qos);
msgtypes = 0;
break;
case PUBLISH: rc = MQTTDeserialize_publish(&dup, &qos, &retained, &msgid, &receivedTopic, &payload_in, &payloadlen_in, buf, buflen);
log_info("message arrived : %s
", payload_in);
if(strstr(payload_in,"on"))
{
log_notice("LED on!!");
}
else if(strstr(payload_in,"off"))
{
log_notice("LED off!!");
}
if(qos == 1)
{
log_info("publish qos is 1,send publish ack.");
memset(buf,0,buflen);
len = MQTTSerialize_ack(buf,buflen,PUBACK,dup,msgid); //publish ack
rc = transport_sendPacketBuffer(mysock, (unsigned char*)buf, len);
if (rc == len)
log_info("send PUBACK Successfully");
else
log_debug("send PUBACK failed");
}
msgtypes = 0;
break;
case PUBACK: log_info("PUBACK!");
msgtypes = 0;
break;
case PUBREC: log_info("PUBREC!"); //just for qos2
break;
case PUBREL: log_info("PUBREL!"); //just for qos2
break;
case PUBCOMP: log_info("PUBCOMP!"); //just for qos2
break;
case PINGREQ: len = MQTTSerialize_pingreq(buf, buflen);
rc = transport_sendPacketBuffer(mysock, (unsigned char*)buf, len);
if (rc == len)
log_info("send PINGREQ Successfully
");
else
log_debug("send PINGREQ failed
");
log_info("time to ping mqtt server to take alive!");
msgtypes = 0;
break;
case PINGRESP: log_info("mqtt server Pong");
msgtypes = 0;
break;
}
memset(buf,0,buflen);
rc=MQTTPacket_read(buf, buflen, transport_getdata);
if(rc >0)
{
msgtypes = rc;
log_info("MQTT is get recv:");
}
gpio_write(&gpio_led, !gpio_read(&gpio_led));
}
exit:
transport_close(mysock);
log_info("mqtt thread exit.");
vTaskDelete(NULL);
}[/mw_shl_code]
MQTT
服务器仍然是上篇文章用的m2m.eclipse.org,一开始就是调用 前面移植的transport_open(host,port)去连接MQTT服务器,并返回套接字。然后就是前面说的登录、订阅、发布、心跳等操作,这里在While里用状态来实现整个连接。首先就是CONNECT
登录,MQTTPacket_connectData data = MQTTPacket_connectData_initializer初始化登录数据结构体,然后再对data进行初始化,data.clientID.cstring = "me";data.keepAliveInterval = 50;data.cleansession = 1;data.username.cstring = "";data.password.cstring = "";data.MQTTVersion = 4;
这里表示cilentID
为 me,心跳时间为50,用户名跟密码都为空,结构初始化后,就是要对数据进去打包,调用MQTTSerialize_connect函数。打包后就是调用transport_sendPacketBuffer发送数据包。数据发送完后是调用MQTTPacket_read去接收服务器返回来的数据,并得到返回的数据包类型,根据数据包类型进行不同的逻辑处理。发送CONNECT后服务器对相应的返回CONNACK数据包表示登录成功。 登录成功后,开始订阅我们想要Topics
,这里订阅一个”ledtest”
的Tipics
。数据包的初始化、打包、发送跟登录是相似的,就不详述,直接看代码就可以了,服务器会相应的返回SUBACK。 接下来是心跳,通过获取系统的Tick
来判断是否要发PINGREQ,如果的话,让状态机状态为PINGREQ后去发送心跳包,相应服务器会返回PINGRESP。 最后是PUBLISH
,这是MQTT的最主要的通信协议,这里我只实现的客户端接收PUBLISH,其它的使用,其实都可以在源码的MQTTPacket里面sample可以找到例程。PUBLISH实现也很简单,定阅了Topics之后,只要其它客户端向这个Topics推送数据,服务器就会转发到订阅者上。MQTTPacket_read接收到服务器的推送,解析成PUBLISH数据包,然后状态机改变状态到PUBLISH再调用MQTTDeserialize_publish进行解包,得到推送的内容,最后根据推送的内容执行相应的动作,就实现远程控制。 整个代码的效果如下:
可以看到MQTT
的登录,订阅,还有在PC上通过MQTT.fx推送信息给ledtest,RT8711上收到推送的内容,并执行打开LED的动作。 也许有人会问,如果STM32
或者其它单片机是用WIFI模块或者GPRS模块,没有LWIP的怎么办。其实只要理解的MQTT的源码,就不难用GPRS或者WiFi模块去实现。MQTT的源码里都是对协议包进行打包解包,数据传输都是在tranport.c里面,我们完全不用transport,可以自己写通信接口,然后把打包的数据包通过模块发出去,写接收接口,把模块接收到服务器数据调用MQTT解包接口解析就可以了。
要在opt.h打开超时
/**
* LWIP_SO_RCVTIMEO==1: Enable receive timeout for sockets/netconns and
* SO_RCVTIMEO processing.
*/
#if !defined LWIP_SO_RCVTIMEO || defined __DOXYGEN__
#define LWIP_SO_RCVTIMEO 1
#endif
一周热门 更多>