【Mongoose笔记】MQTT 客户端

简介

Mongoose 笔记系列用于记录学习 Mongoose 的一些内容。

Mongoose 是一个 C/C++ 的网络库。它为 TCP、UDP、HTTP、WebSocket、MQTT 实现了事件驱动的、非阻塞的 API。

项目地址:

https://github.com/cesanta/mongoose

学习

下面通过学习 Mongoose 项目代码中的 mqtt-client 示例程序 ,来学习如何使用 Mongoose 实现一个简单的 MQTT 客户端。使用树莓派平台进行开发验证。

mqtt-client 的示例程序很简洁,代码如下:

// Copyright (c) 2020 Cesanta Software Limited

// All rights reserved

//

// Example MQTT client. It performs the following steps:

// 1. Connects to the MQTT server specified by `s_url` variable

// 2. When connected, subscribes to the topic `s_sub_topic`

// 3. Publishes message `hello` to the `s_pub_topic`

// 4. Receives that message back from the subscribed topic and closes

// 5. Timer-based reconnection logic revives the connection when it is down

//

// To enable SSL/TLS, make SSL=OPENSSL or make SSL=MBEDTLS

#include "mongoose.h"

static const char *s_url = "mqtt://broker.hivemq.com:1883";

static const char *s_sub_topic = "mg/+/test";

static const char *s_pub_topic = "mg/clnt/test";

static int s_qos = 1;

static struct mg_connection *s_conn;

// Handle interrupts, like Ctrl-C

static int s_signo;

static void signal_handler(int signo) {

s_signo = signo;

}

static void fn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {

if (ev == MG_EV_OPEN) {

MG_INFO(("CREATED"));

// c->is_hexdumping = 1;

} else if (ev == MG_EV_ERROR) {

// On error, log error message

MG_ERROR(("%p %s", c->fd, (char *) ev_data));

} else if (ev == MG_EV_CONNECT) {

// If target URL is SSL/TLS, command client connection to use TLS

if (mg_url_is_ssl(s_url)) {

struct mg_tls_opts opts = {.ca = "ca.pem"};

mg_tls_init(c, &opts);

}

} else if (ev == MG_EV_MQTT_OPEN) {

// MQTT connect is successful

struct mg_str subt = mg_str(s_sub_topic);

struct mg_str pubt = mg_str(s_pub_topic), data = mg_str("hello");

MG_INFO(("CONNECTED to %s", s_url));

mg_mqtt_sub(c, subt, s_qos);

MG_INFO(("SUBSCRIBED to %.*s", (int) subt.len, subt.ptr));

mg_mqtt_pub(c, pubt, data, s_qos, false);

MG_INFO(("PUBLISHED %.*s -> %.*s", (int) data.len, data.ptr,

(int) pubt.len, pubt.ptr));

} else if (ev == MG_EV_MQTT_MSG) {

// When we get echo response, print it

struct mg_mqtt_message *mm = (struct mg_mqtt_message *) ev_data;

MG_INFO(("RECEIVED %.*s <- %.*s", (int) mm->data.len, mm->data.ptr,

(int) mm->topic.len, mm->topic.ptr));

c->is_closing = 1;

} else if (ev == MG_EV_CLOSE) {

MG_INFO(("CLOSED"));

s_conn = NULL; // Mark that we're closed

}

(void) fn_data;

}

// Timer function - recreate client connection if it is closed

static void timer_fn(void *arg) {

struct mg_mgr *mgr = (struct mg_mgr *) arg;

struct mg_mqtt_opts opts = {.clean = true,

.will_qos = s_qos,

.will_topic = mg_str(s_pub_topic),

.will_message = mg_str("goodbye")};

if (s_conn == NULL) s_conn = mg_mqtt_connect(mgr, s_url, &opts, fn, NULL);

}

int main(void) {

struct mg_mgr mgr;

int topts = MG_TIMER_REPEAT | MG_TIMER_RUN_NOW;

signal(SIGINT, signal_handler); // Setup signal handlers - exist event

signal(SIGTERM, signal_handler); // manager loop on SIGINT and SIGTERM

mg_mgr_init(&mgr); // Init event manager

mg_timer_add(&mgr, 3000, topts, timer_fn, &mgr); // Init timer

while (s_signo == 0) mg_mgr_poll(&mgr, 1000); // Event loop, 1s timeout

mg_mgr_free(&mgr); // Finished, cleanup

return 0;

}

下面先从main函数开始分析代码。

首先是变量定义。struct mg_mgr是用于保存所有活动连接的事件管理器。

struct mg_mgr mgr;

设置定时器标志,用于下面的mg_timer_add函数。其中MG_TIMER_REPEAT表示定时重复调用函数,MG_TIMER_RUN_NOW表示设置定时器后立即调用。

int topts = MG_TIMER_REPEAT | MG_TIMER_RUN_NOW;

设置 signal 函数捕获 SIGINT 信号和 SIGTERM 信号。

signal(SIGINT, signal_handler); // Setup signal handlers - exist event

signal(SIGTERM, signal_handler); // manager loop on SIGINT and SIGTERM

下面是对应的信号处理函数,当 SIGINT 信号和 SIGTERM 信号到达时,修改 s_signo 的值,使其值不为 0,然后会让主事件循环退出。当用户通过 Ctrl-C 结束进程是会发送 SIGINT 信号,通过 kill 命令不带参数时会发送 SIGTERM 信号。当通过以上两种操作时,都能让主事件循环正常退出。

// Handle interrupts, like Ctrl-C

static int s_signo;

static void signal_handler(int signo) {

s_signo = signo;

}

初始化一个事件管理器,也就是将最开始定义的struct mg_mgr变量 mgr 中的数据进行初始化。

mg_mgr_init(&mgr); // Init event manager

调用mg_timer_add设置一个定时器,这会将其添加到事件管理器的内部定时器列表中。其中的参数3000表示 3000 毫秒,topts是定时器标志,timer_fn是要调用的函数,&mgr是要传递的参数。事件管理器将以参数 3000 毫秒的时间间隔调用 timer_fn 函数,并将参数 &mgr 传递给它。

mg_timer_add(&mgr, 3000, topts, timer_fn, &mgr); // Init timer

其中timer_fn函数的作用是,如果客户端连接关闭,则重新创建该连接。

下面我们先看下timer_fn的实现:

// Timer function - recreate client connection if it is closed

static void timer_fn(void *arg) {

结构体struct mg_mqtt_opts用于指定 MQTT 连接选项。clean表示清理会话(clean session) 标志设置为 1,客户端和服务端必须丢弃之前的任何会话并开始一个新的会话。will_qos表示设置遗嘱消息的服务质量(Will message quality of service),设置的参数s_qos的值默认为 1,表示至少一次。will_topic表示设置遗嘱主题(Will topic),设置的参数s_pub_topic默认为mg/clnt/test。will_message表示设置遗嘱消息(Will message)。其中mg_str用于创建 Mongoose 字符串。

struct mg_mgr *mgr = (struct mg_mgr *) arg;

struct mg_mqtt_opts opts = {.clean = true,

.will_qos = s_qos,

.will_topic = mg_str(s_pub_topic),

.will_message = mg_str("goodbye")};

判断s_conn的值,如果s_conn为NULL则创建客户端 MQTT 连接。其中mg_mqtt_connect用于创建客户端 MQTT 连接,s_url是要连接的 URL,fn是事件处理函数。

if (s_conn == NULL) s_conn = mg_mqtt_connect(mgr, s_url, &opts, fn, NULL);

}

其中s_url是一个静态全局变量,默认参数如下:

static const char *s_url = "mqtt://broker.hivemq.com:1883";

broker.hivemq.com是一个免费的公共 MQTT 代理,可以用于 MQTT 测试,TCP 端口为 1883。

分析完timer_fn的实现,我们回到main函数中。

接下来进行事件循环,mg_mgr_poll 遍历所有连接,接受新连接,发送和接收数据,关闭连接,并为各个事件调用事件处理函数。

while (s_signo == 0) mg_mgr_poll(&mgr, 1000); // Event loop, 1s timeout

当 s_signo 不为 0 时,也就是接收到了退出信号,则结束无限循环,调用 mg_mgr_free 关闭所有连接,释放所有资源。

mg_mgr_free(&mgr); // Finished, cleanup

分析完main函数的实现后,我们看下事件处理函数fn的代码。

static void fn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {

判断是否接收到MG_EV_OPEN事件,收到MG_EV_OPEN 事件表示已创建连接。该事件在分配连接并将其添加到事件管理器之后立即发送。创建连接后打印创建的消息。

if (ev == MG_EV_OPEN) {

MG_INFO(("CREATED"));

// c->is_hexdumping = 1;

}

判断是否接收到MG_EV_ERROR事件,表示发生错误。如果发生错误了,会将错误信息打印出来。错误信息通过参数ev_data传递进来,可以直接以字符串形式打印。

} else if (ev == MG_EV_ERROR) {

// On error, log error message

MG_ERROR(("%p %s", c->fd, (char *) ev_data));

}

判断是否接收到MG_EV_CONNECT事件,表示连接已建立。

使用mg_url_is_ssl函数用于检查给定的 URL 是否使用加密方案,如果 MQTT 服务器的 URL s_url是mqtts://,则告诉客户端连接使用 TLS,调用mg_tls_init函数初始化 TLS。其中ca表示证书颁发机构(Certificate Authority),用于验证另一端发送过来的证书,如果为 NULL,则禁用证书检查。

} else if (ev == MG_EV_CONNECT) {

// If target URL is SSL/TLS, command client connection to use TLS

if (mg_url_is_ssl(s_url)) {

struct mg_tls_opts opts = {.ca = "ca.pem"};

mg_tls_init(c, &opts);

}

}

判断是否接收到MG_EV_MQTT_OPEN事件,表示收到 MQTT CONNACK。这个事件是在 MQTT 服务器接受我们作为客户端时发送的,服务端发送 CONNACK 报文响应从客户端收到的 CONNECT 报文。

} else if (ev == MG_EV_MQTT_OPEN) {

// MQTT connect is successful

定义订阅和发布主题、发布的消息的 Mongoose 字符串变量。mg_str用于创建 Mongoose 字符串。

struct mg_str subt = mg_str(s_sub_topic);

struct mg_str pubt = mg_str(s_pub_topic), data = mg_str("hello");

将连接的 url 打印出来。

MG_INFO(("CONNECTED to %s", s_url));

mg_mqtt_sub用于订阅主题。subt表示要订阅的主题,默认参数为mg/+/test。s_qos表示要求的服务质量,默认为 1 ,表示至少一次。然后将订阅信息打印出来。

mg_mqtt_sub(c, subt, s_qos);

MG_INFO(("SUBSCRIBED to %.*s", (int) subt.len, subt.ptr));

mg_mqtt_pub用于发布消息。pubt表示发布数据的主题,默认参数为mg/clnt/test。data表示要发布的数据。s_qos表示要求的服务质量,默认为 1 ,表示至少一次。然后将发布的主题和数据打印出来。

mg_mqtt_pub(c, pubt, data, s_qos, false);

MG_INFO(("PUBLISHED %.*s -> %.*s", (int) data.len, data.ptr,

(int) pubt.len, pubt.ptr));

}

判断是否接收到MG_EV_MQTT_MSG事件,表示收到 MQTT PUBLISH 。接收到了我们订阅的主题所发布的消息。

将接收到的数据和主题打印出来,然后将is_closing设置为 1 ,表示立即关闭并释放连接。

} else if (ev == MG_EV_MQTT_MSG) {

// When we get echo response, print it

struct mg_mqtt_message *mm = (struct mg_mqtt_message *) ev_data;

MG_INFO(("RECEIVED %.*s <- %.*s", (int) mm->data.len, mm->data.ptr,

(int) mm->topic.len, mm->topic.ptr));

c->is_closing = 1;

}

判断是否接收到MG_EV_CLOSE事件,表示连接关闭。将s_conn设置为NULL,用于表示我们的连接已经关闭了,以便于之后在定时器的调用函数timer_fn中判断是否创建客户端 MQTT 连接。

} else if (ev == MG_EV_CLOSE) {

MG_INFO(("CLOSED"));

s_conn = NULL; // Mark that we're closed

}

mqtt-client 的示例程序代码就都解析完了,下面实际运行一下 mqtt-client 程序。

打开示例程序,编译并运行:

pi@raspberrypi:~ $ cd Desktop/study/mongoose/examples/mqtt-client

pi@raspberrypi:~/Desktop/study/mongoose/examples/mqtt-client $ make

cc ../../mongoose.c -I../.. -W -Wall -o example main.c

./example

9b4a0 2 main.c:29:fn CREATED

9b6b3 2 main.c:44:fn CONNECTED to mqtt://broker.hivemq.com:1883

9b6b3 2 main.c:46:fn SUBSCRIBED to mg/+/test

9b6b3 2 main.c:50:fn PUBLISHED hello -> mg/clnt/test

9b7a5 2 main.c:55:fn RECEIVED hello <- mg/clnt/test

9b7a5 2 main.c:58:fn CLOSED

查看日志可以看到,这个 mqtt-client 的示例程序完成了 MQTT 客户端创建,连接,订阅主题,发布数据,收到所订阅主题的数据,关闭连接。

如果将这个程序放置在那里,会看到每 3 秒都会重复一边上述的过程:

e465e 2 main.c:29:fn CREATED

e4844 2 main.c:44:fn CONNECTED to mqtt://broker.hivemq.com:1883

e4844 2 main.c:46:fn SUBSCRIBED to mg/+/test

e4844 2 main.c:50:fn PUBLISHED hello -> mg/clnt/test

e492e 2 main.c:55:fn RECEIVED hello <- mg/clnt/test

e492e 2 main.c:58:fn CLOSED

e54e9 2 main.c:29:fn CREATED

e56d6 2 main.c:44:fn CONNECTED to mqtt://broker.hivemq.com:1883

e56d6 2 main.c:46:fn SUBSCRIBED to mg/+/test

e56d6 2 main.c:50:fn PUBLISHED hello -> mg/clnt/test

e57b7 2 main.c:55:fn RECEIVED hello <- mg/clnt/test

e57b7 2 main.c:58:fn CLOSED

e5f89 2 main.c:29:fn CREATED

e618d 2 main.c:44:fn CONNECTED to mqtt://broker.hivemq.com:1883

e618d 2 main.c:46:fn SUBSCRIBED to mg/+/test

e618d 2 main.c:50:fn PUBLISHED hello -> mg/clnt/test

e6286 2 main.c:55:fn RECEIVED hello <- mg/clnt/test

e6286 2 main.c:58:fn CLOSED

【参考资料】

examples/mqtt-client

Documentation

MQTT协议中文版

MQTT Version 3.1.1

The Free Public MQTT Broker by HiveMQ

本文链接:https://blog.csdn.net/u012028275/article/details/128783308

参考链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: