1.安装依赖:
npm install mqtt
或者使用import mqtt from '../../utils/mqtt.js';//使用静态包
2.使用:
import mqtt from 'mqtt'
var client=null
const options={//默认配置
port:8083,//默认端口
connectTimeout: 4000, //超时4s
clientId: 'mqttjs_' + Math.random().toString(16).substr(2, 8),//随机id
username: 'admin', // 账户名密码没就不用写
password: 'admin',
clean: true,
reconnectPeriod: 3000, //1000毫秒,两次重新连接之间的间隔
resubscribe: true //如果连接断开并重新连接,则会再次自动订阅已订阅的主题(默认true)
}
client= mqtt.connect('ws://192.168.31.151/mqtt',options)
export default {
data() {
return {
id:'';//id
}
},
created(){
this.connecter()
},
methods:{
//连接
connecter(){
//监听连接成功
client.on('connect',(e)=>{
console.log('连接成功')
//订阅主题
client.subscribe('/test/helloworld',{qos:0},(error)=>{
console.log(error)
})
})
//消息监听
client.on('message',(topic,message)=>{
console.log('收到来自'+topic+'的消息',message.toString())
console.log('receiveMessage ' ,JSON.parse(message));
})
//重连监听
client.on('reconnect',(error)=>{
console.log('正在重接',error)
})
//错误监听
client.on('error',(error)=>{
console.log('连接失败',error)
})
// 连接断开后触发的回调
client.on("close", function () {
// console.log("已断开连接")
});
},
//订阅
subscribe: function () {
const that = this;
if (!tclient || client.connected != true) {// 判断是否已成功连接
that.$message.error({ content: '客户端未连接!' });
that.connecter()
return;
}
// 订阅主题(网页接收主题)
// mqtt/磅点ID+车道/ReceivedCarNumber 测试主题: let Topic = 'mqtt/1/10086/ledContent'; // mqtt/磅点ID+车道/ReceivedCarNumber
let Topic1 = 'mqtt/1/' + that.sendMassage.stationId + '/ledContent';
let Topic2 = 'mqtt/2/' + that.sendMassage.stationId + '/ledContent';
let Topic3 = 'mqtt/3/' + that.sendMassage.stationId + '/ledContent';
let Topic4 = 'mqtt/4/' + that.sendMassage.stationId + '/ledContent';
// 【1】[Topic1, Topic2]:订阅多个主题。
// 【2】QoS: (1)0:消息最多传递一次,如果当时客户端不可用,则会丢失该消息;(2)1:消息传递至少 1 次;(3)2:消息仅传送一次。
client.subscribe([Topic1, Topic2, Topic3, Topic4], { qos: 0 }, error => {
if (!error) {
that.$message.success({ content: '订阅成功' });
} else {
that.$message.error({ content: '订阅失败!' + error });
}
});
},
// 推送消息
publish: function () {
const that = this;
if (!client || client.connected != true) {// 判断是否已成功连接
that.$message.error({ content: '客户端未连接!' });
that.connecter()
return;
}
// mqtt/车道/指令/ReceivedCarNumber 网页推送(发布)主题
let Topic = `mqtt/${车道}/${指令}/ledContent`;
client.publish(Topic, JSON.stringify("发送的信息(这里对象格式)"), error => {
if (!error) {
that.$message.success("指令发送成功");
} else {
that.$message.error({ content: '指令发送失败!' + error });
}
});
},
},
destroyed() {
if(client) {
client.end() //离开页面的时候 关闭mqtt连接
client = null
}
}
}
qos:0 =>Sender 发送的一条消息,Receiver 最多能收到一次,即Sender 尽力向 Receiver 发送消息,如果发送失败,也就算了;
qos:1 =>Sender 发送的一条消息,Receiver 至少能收到一次,即Sender 向 Receiver 发送消息,如果发送失败,会继续重试,
直到 Receiver 收到消息为止,但是因为重传的原因,Receiver 有可能会收到重复的消息;
qos:2 =>Sender 发送的一条消息,Receiver 确保能收到而且只收到一次,即Sender 尽力向 Receiver 发送消息,如果发送失败,
会继续重试,直到 Receiver 收到消息为止,同时保证 Receiver 不会因为消息重传而收到重复的消息
3.相关链接:
mqtt.js包 EMQ GitHub=>MQTT.js
4.【案例】【MQTT】MQTT实现前端与软件交互
5.使用 WebSocket 客户端连接 MQTT 服务器
【1】安装 MQTT.js
如果读者机器上装有 Node.js 运行环境,可使用 npm 命令安装 MQTT.js 在当前目录安装
npm i mqtt
(1)全局安装 将注册 mqtt mqtt_pub mqtt_sub 命令到当前用户,此处借助 iot.eclipse.org 讲解一下命令行的使用
# 全局安装
npm i mqtt -g
# 使用命令行订阅
$ mqtt sub -t 'hello' -h 'iot.eclipse.org' -v
> hello 09860
# 成功连接到服务器并订阅了主题 hello, 命令行将阻塞等待消息
# 在另一个终端上使用命令行发布
mqtt pub -t 'hello' -h 'iot.eclipse.org' -m 'from MQTT.js'
# 命令行将进行 连接 -> 发布 -> 断开连接 操作,此时读者会到订阅命令行,应当收到来自 hello 主题的消息
> hello from MQTT.js
npm 在当前目录安装仍然可以使用 ./node_module/.bin/mqtt 命令来执行以上操作。
(2)CDN 引用
#MQTT.js 包可以通过 http://unpkg.com 获得
// 将在全局初始化一个 mqtt 变量
console.log(mqtt)
【2】连接至 MQTT 服务器
几个公共的用于 WebSocket 测试连接服务器:
test.mosquitto.org - 使用端口 8080 未加密,8081 用于 SSL 上的 WebSocket; iot.eclipse.org - 使用端口 80 未加密,443 用于 SSL 上的 WebSocket; broker.hivemq.com - 使用端口 8000 未加密,不支持 SSL 上的 WebSocket。 由于需要展示客户端认证部分内容,但上述服务器未提供客户端认证服务,笔者特通过 ActorCloud 平台注册了一个设备进行接入连接。
EMQ 使用 8083 端口用于普通连接,8084 用于 SSL 上的 WebSocket 连接。
//
// const mqtt = require('mqtt')
import mqtt from 'mqtt'
// 连接选项
const options = {
connectTimeout: 4000, // 超时时间
// 认证信息
clientId: 'emqx-connect-via-websocket',
username: 'emqx-connect-via-websocket',
password: 'emqx-connect-via-websocket',
}
const client = mqtt.connect('wss://iot.actorcloud.io:8084/mqtt', options)
client.on('reconnect', (error) => {
console.log('正在重连:', error)
})
client.on('error', (error) => {
console.log('连接失败:', error)
})
【3】连接地址
上文示范的连接地址可以拆分为: wss: // iot . actorcloud.io : 8084 /mqtt 即 协议 // 主机名 . 域名 : 端口 / 路径
初学者容易出现以下几个错误: (1)连接地址没有指明协议:WebSocket 作为一种通信协议,其使用 ws(非加密)、wss(SSL 加密) 作为协议标识。MQTT.js 客户端支持多种协议,连接地址需指明协议类型; (2) 连接地址没有指明端口:MQTT 并未对 WebSocket 接入端口做出规定,EMQ 上默认使用 8083 8084 分别作为非加密连接、加密连接端口。而 WebSocket 协议默认端口同 HTTP 保持一致 (80/443),不填写端口则表明使用 WebSocket 的默认端口连接;而使用标准 MQTT 连接时则无需指定端口,如 MQTT.js 在 Node.js 端可以使用 mqtt://localhost 连接至标准 MQTT 8083 端口,当连接地址是 mqtts://localhost 则连接到 8884 端口; (3)连接地址无路径:MQTT-WebSoket 统一使用 /path 作为连接路径,连接时需指明; (4)协议与端口不符:使用了 wss 连接却连接到 8083 端口; (5)在 HTTPS 下使用非加密的 WebSocket 连接: Google 等机构在推进 HTTPS 的同时也通过浏览器约束进行了安全限定,即 HTTPS 连接下浏览器会自动禁止使用非加密的 ws 协议发起连接请求; (6)证书与连接地址不符: 篇幅较长,详见下文 EMQ 启用 SSL/TLS 加密连接。
【4】连接选项
上面代码中, options 是客户端连接选项,以下是主要参数说明,其余参数详见https://www.npmjs.com/package/mqtt#connect。 keepalive:心跳时间,默认 60秒,设置 0 为禁用; clientId: 客户端 ID ,默认通过 ‘mqttjs_’ + Math.random().toString(16).substr(2, 8) 随机生成; username:连接用户名(如果有); password:连接密码(如果有); clean:true,设置为 false 以在离线时接收 QoS 1 和 2 消息; reconnectPeriod:默认 1000 毫秒,两次重新连接之间的间隔,客户端 ID 重复、认证失败等客户端会重新连接; connectTimeout:默认 30 * 1000毫秒,收到 CONNACK 之前等待的时间,即连接超时时间。
【5】订阅/取消订阅
连接成功之后才能订阅,且订阅的主题必须符合 MQTT 订阅主题规则; 注意 JavaScript 异步非阻塞特性,只有在 connect 事件后才能确保客户端已成功连接,或通过 client.connected 判断是否连接成功:
// 错误示例
client.on('connect', handleConnect)
client.subscribe('hello')
client.publish('hello', 'Hello EMQ')
// 正确示例
client.on('connect', (e) => {
console.log('成功连接服务器')
// 订阅一个主题
client.subscribe('hello', { qos: 1 }, (error) => {
if (!error) {
cosnole.log('订阅成功')
client.publish('hello', 'Hello EMQ', { qos: 1, rein: false }, (error) => {
cosnole.log(error || '发布成功')
})
}
})
// 订阅多个主题
client.subscribe(['hello', 'one/two/three/#', '#'], { qos: 1 }, onSubscribeSuccess)
// 订阅不同 qos 的不同主题
client.subscribe(
[
{ hello: 1 },
{ 'one/two/three': 2 },
{ '#': 0 }
],
onSubscribeSuccess,
)
})
// 取消订阅
client.unubscribe(
// topic, topic Array, topic Array-Onject
'hello',
onUnubscribeSuccess,
)
【6】发布/接收消息
发布消息到某主题,发布的主题必须符合 MQTT 发布主题规则,否则将断开连接。发布之前无需订阅该主题,但要确保客户端已成功连接:
// 监听接收消息事件
client.on('message', (topic, message) => {
console.log('收到来自', topic, '的消息', message.toString())
})
// 发布消息
if (!client.connected) {
console.log('客户端未连接')
return
}
client.publish('hello', 'hello EMQ', (error) => {
console.log(error || '消息发布成功')
})
【7】微信小程序
MQTT.js 库对微信小程序特殊处理,使用 wxs 协议标识符。注意小程序开发规范中要求必须使用加密连接,连接地址应类似为wxs://iot.actorcloud.io:8084/mqtt。
【8】EMQ 启用 SSL/TLS 加密连接
EMQ 内置自签名证书,默认已经启动了加密的 WebSocket 连接,但大部分浏览器会报证书无效错误如net::ERR_CERT_COMMON_NAME_INVALID (Chrome、360 等 webkit 内核浏览器在开发者模式下, Console 选项卡 可以查看大部分连接错误)。
【9】准备工作
这篇文章 https流程和原理 中对证书认证进行了详细的阐述,EMQ 君总结启用 SSL/TLS 证书需要具备的条件是: 将域名绑定到 EMQ 服务器公网地址:CA 机构签发的证书签名是针对域名的; 申请证书:向 CA 机构申请所用域名的证书,注意选择一个可靠的 CA 机构且证书要区分泛域名与主机名; 使用加密连接的时候选择 wss 协议,并使用域名连接:绑定域名-证书之后,必须使用域名而非 IP 地址进行连接,这样浏览器才会根据域名去校验证书以在通过校验后建立连接。
【10】在 EMQ 上配置
打开 etc/emqx.conf 配置文件,修改以下配置
# wss 监听地址
listener.wss.external = 8084
# 修改密钥文件地址
listener.wss.external.keyfile = etc/certs/cert.key
# 修改证书文件地址
listener.wss.external.certfile = etc/certs/cert.pem
重启 EMQ 即可。
可以使用你的证书与密钥文件直接替换到 etc/certs/ 下。
【11】在 nginx 上配置反向代理与证书
使用 nginx 来反向代理并加密 WebSocket 可以减轻 EMQ 服务器计算压力,同时实现域名复用,同时通过 nginx 的负载均衡可以分配多个后端服务实体。
# 建议 WebSocket 也绑定到 443 端口
listen 443, 8084;
server_name example.com;
ssl on;
ssl_certificate /etc/cert.crt; # 证书路径
ssl_certificate_key /etc/cert.key; # 密钥路径
# upstream 服务器列表
upstream emq_server {
server 10.10.1.1:8883 weight=1;
server 10.10.1.2:8883 weight=1;
server 10.10.1.3:8883 weight=1;
}
# 普通网站应用
location / {
root www;
index index.html;
}
# 反向代理到 EMQ 非加密 WebSocket
location / {
proxy_redirect off;
# upstream
proxy_pass http://emq_server;
proxy_set_header Host $host;
# 反向代理保留客户端地址
proxy_set_header X-Real_IP $remote_addr;
proxy_set_header X-Forwarded-For $remote_addr:$remote_port;
# WebSocket 额外请求头
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection “upgrade”;
}
精彩文章
发表评论