Android使用MQTT订阅及发布消息((一)初步了解Mqtt以及实现Android操作mqtt服务)

关于MQTT介绍MQTT协议实现方式MQTT服务器MQTT协议中的订阅、主题、会话MQTT协议中的方法MQTT服务质量 (QoS)MQTT服务端(Broker)MQTT客户端

mqtt服务选取Android连接mqtt配置修改mainActivity文件订阅 topic取消订阅 topic发布消息publish断开 MQTT 连接

关于

  可能有很多小伙伴和我一样是初次知道mqtt,然后它是啥,用来干什么那就更不清楚了,前段时间公司要求调研这方面,所以今天这篇文章就来介绍mqtt是啥,以及Android可以用它来干啥。

MQTT介绍

MQTT协议实现方式

  实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。 MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:

Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);payload,可以理解为消息的内容,是指订阅者具体要使用的内容。 MQTT会构建底层网络传输:它将建立客户端到服务器的连接,提供两者之间的一个有序的、无损的、基于字节流的双向传输。 当应用数据通过MQTT网络发送时,MQTT会把与之相关的服务质量(QoS)和主题名(Topic)相关连。

MQTT服务器

  MQTT服务器以称为"消息代理"(Broker),可以是一个应用程序或一台设备。它是位于消息发布者和订阅者之间,它可以:

(1)接受来自客户的网络连接;(2)接受客户发布的应用信息;(3)处理来自客户端的订阅和退订请求;(4)向订阅的客户转发应用程序消息。

MQTT协议中的订阅、主题、会话

一、订阅(Subscription)   订阅包含主题筛选器(Topic Filter)和最大服务质量(QoS)。订阅会与一个会话(Session)关联。一个会话可以包含多个订阅。每一个会话中的每个订阅都有一个不同的主题筛选器。 二、会话(Session)   每个客户端与服务器建立连接后就是一个会话,客户端和服务器之间有状态交互。会话存在于一个网络之间,也可能在客户端和服务器之间跨越多个连续的网络连接。 三、主题名(Topic Name)   连接到一个应用程序消息的标签,该标签与服务器的订阅相匹配。服务器会将消息发送给订阅所匹配标签的每个客户端。 四、主题筛选器(Topic Filter)   一个对主题名通配符筛选器,在订阅表达式中使用,表示订阅所匹配到的多个主题。 五、负载(Payload)   消息订阅者所具体接收的内容。

MQTT协议中的方法

  MQTT协议中定义了一些方法(也被称为动作),来于表示对确定资源所进行操作。这个资源可以代表预先存在的数据或动态生成数据,这取决于服务器的实现。通常来说,资源指服务器上的文件或输出。主要方法有:

(1)Connect。等待与服务器建立连接。(2)Disconnect。等待MQTT客户端完成所做的工作,并与服务器断开TCP/IP会话。(3)Subscribe。等待完成订阅。(4)UnSubscribe。等待服务器取消客户端的一个或多个topics订阅。(5)Publish。MQTT客户端发送消息请求,发送完成后返回应用程序线程。

MQTT服务质量 (QoS)

  qos为0"至多一次",消息发布完全依赖底层TCP/IP网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。这一种方式主要普通APP的推送,倘若你的智能设备在消息推送时未联网,推送过去没收到,再次联网也就收不到了。 qos为1"至少一次",确保消息到达,但消息重复可能会发生。 qos为2"只有一次",确保消息到达一次。在一些要求比较严格的计费系统中,可以使用此级别。在计费系统中,消息重复或丢失会导致不正确的结果。这种最高质量的消息发布服务还可以用于即时通讯类的APP的推送,确保用户收到且只会收到一次。 可以在订阅/发布消息的时候设置服务质量。

MQTT服务端(Broker)

  网上有开源的服务端项目代码可以在电脑上进行搭建,也可以试用一些供应商的服务器。

MQTT客户端

经调研,Android开发mqtt客户端主流使用的是eclipse提供的paho.mqtt,项目引用:

implementation'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5'

implementation'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

mqtt服务选取

  因为是调研,所以就不考虑自己去打一个mqtt的服务,网上搜能搜到很多搭建的mqtt,这里我是用了一个三方可以免费试用14天的服务方EMQ,使用前需要注册一个账号,如果你有github账号的话可以直接提供使用。 然后我们选中试用的服务(基础版和专业版都可以试用14天,这边建议试用专业版的),我因为之前已经试用过了专业版,所以现在只能试用基础版的了,专业版的好处就是提供的ip:   然后我们等待它自动部署好项目即可:   完成之后我们需要添加一个认证用户,这个认证用户用来连接时候判断身份用的:   认证的用户及密码需要记一下。

Android连接mqtt

配置

  首先要在项目的build文件里添加如下引用:

implementation'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5'

implementation'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

其实这里有个问题,target为Android 12的(31),需要去做很多修改,包括去掉引用,当然这部分我准备放到第二篇里面来讲如何适配Android12版本手机实现mqtt使用。   修改Androidmanifest.xml文件,添加权限和mqttservice的的注册:

修改mainActivity文件

private val TAG = "MqttClient"

private lateinit var mqttClient: MqttAndroidClient

override fun onCreate(savedInstanceState: Bundle?) {

//....

val serverURI = "tcp://pc1c6e1c.cn-shenzhen.emqx.cloud:11838"

mqttClient = MqttAndroidClient(this, serverURI, "kotlin_mqtt_test1") //"kotlin_mqtt_test1"是作为连接客户端的名称来使用,所以要注意避免重复

}

fun connect() {

mqttClient.setCallback(object : MqttCallback {

override fun messageArrived(topic: String?, message: MqttMessage?) {

Log.d(TAG, "Receive message: ${message.toString()} from topic: $topic")

}

override fun connectionLost(cause: Throwable?) {

Log.d(TAG, "Connection lost ${cause.toString()}")

}

override fun deliveryComplete(token: IMqttDeliveryToken?) {

}

})

val options = MqttConnectOptions()

options.apply {

userName = "tobeyr1"

this.password = "1234".toCharArray()

connectionTimeout = 12

this.keepAliveInterval = 0

this.isAutomaticReconnect = false

this.isCleanSession = true

}

try {

mqttClient.connect(options, null, object : IMqttActionListener {

override fun onSuccess(asyncActionToken: IMqttToken?) {

Log.d(TAG, "Connection success")

}

override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {

Log.d(TAG, "Connection failure")

}

})

} catch (e: MqttException) {

e.printStackTrace()

}

}

  其中我们的serverurl可以在橄榄中看到:

  然后我们可以在调试台的监控里面看到已经连接到了mqtt服务:

订阅 topic

private fun subscribe(topic: String, qos: Int = 1) {

try {

mqttClient.subscribe(topic, qos, null, object : IMqttActionListener {

override fun onSuccess(asyncActionToken: IMqttToken?) {

Log.d(TAG, "Subscribed to $topic")

}

override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {

Log.d(TAG, "Failed to subscribe $topic")

}

})

} catch (e: MqttException) {

e.printStackTrace()

}

}

  订阅成功之后也可以在控制台看到订阅的主题:

取消订阅 topic

private fun unsubscribe(topic: String) {

try {

mqttClient.unsubscribe(topic, null, object : IMqttActionListener {

override fun onSuccess(asyncActionToken: IMqttToken?) {

Log.d(TAG, "Unsubscribed to $topic")

}

override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {

Log.d(TAG, "Failed to unsubscribe $topic")

}

})

} catch (e: MqttException) {

e.printStackTrace()

}

}

发布消息publish

private fun publish(topic: String, msg: String, qos: Int = 1, retained: Boolean = false) {

try {

val message = MqttMessage()

message.payload = msg.toByteArray()

message.qos = qos

message.isRetained = retained

mqttClient.publish(topic, message, null, object : IMqttActionListener {

override fun onSuccess(asyncActionToken: IMqttToken?) {

Log.d(TAG, "$msg published to $topic")

}

override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {

Log.d(TAG, "Failed to publish $msg to $topic")

}

})

} catch (e: MqttException) {

e.printStackTrace()

}

}

断开 MQTT 连接

private fun disconnect() {

try {

mqttClient.disconnect(null, object : IMqttActionListener {

override fun onSuccess(asyncActionToken: IMqttToken?) {

Log.d(TAG, "Disconnected")

}

override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {

Log.d(TAG, "Failed to disconnect")

}

})

} catch (e: MqttException) {

e.printStackTrace()

}

}

  好了,本篇简单介绍Android(11及以下版本)连接mqtt服务就到此结束了,下篇《Android使用MQTT订阅及发布消息((二)兼容Android12 封装Mqtt客户端service)》将会介绍兼容Android12版本调用mqtt服务。有问题欢迎批评指正,觉得不错的也请点个赞,多谢。

相关文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: