import { UTSAndroid } from "io.dcloud.uts";
import MqttClient from 'org.eclipse.paho.client.mqttv3.MqttClient';
import MqttConnectOptions from 'org.eclipse.paho.client.mqttv3.MqttConnectOptions';
import IMqttDeliveryToken from 'org.eclipse.paho.client.mqttv3.IMqttDeliveryToken';
import MqttMessage from 'org.eclipse.paho.client.mqttv3.MqttMessage';
import MqttCallback from 'org.eclipse.paho.client.mqttv3.MqttCallback';
// import Throwable from 'java.lang.Throwable'
import Throwable from 'kotlin.Throwable';
import MqttException from 'org.eclipse.paho.client.mqttv3.MqttException';
import MqttCallbackExtended from 'org.eclipse.paho.client.mqttv3.MqttCallbackExtended'
import KotlinCharArray from 'kotlin.CharArray';
import KotlinChar from 'kotlin.Char';
import CharArrayWriter from 'java.io.CharArrayWriter';
import PowerManager from 'android.os.PowerManager';
import Context from 'android.content.Context';
import { startConnectedDeviceService, closeConnectedDeviceService } from './service'
import Charsets from 'kotlin.text.Charsets'
import JSONObject from 'com.alibaba.fastjson.JSONObject'
import Thread from 'java.lang.Thread'
import Runnable from 'java.lang.Runnable';

/**
 * Runnable that performs the MQTT connect on a worker thread.
 * It is started by the exported `connect` function after the singleton exists.
 */
class myRunnable implements Runnable {
	constructor() { }
	override run() : void {
		// No option is passed: the singleton was already created by `connect`.
		const mqtt = MqttManager.getInstance(null)
		mqtt.connectMqtt()
	}
}

/** Options accepted by the exported `connect` function. */
type MqttManagerOption = {
	host : string,
	clientId : string,
	userName : string,
	password : string,
	cleanSession ?: boolean, // false creates a persistent session; requires the clientId to stay the same. Defaults to false.
	automaticReconnect ?: boolean, // defaults to true
	heartBeat ?: Int, // keep-alive interval, defaults to 60
	timeOut ?: Int, // connection timeout, defaults to 30
	keepAlive ?: boolean, // when true, `connect` starts the connected-device service
	notificationContentText ?: string, // notification text passed to the service
	notificationContentTitle ?: string, // notification title passed to the service
	offPeriodMsg ?: callbackTypeOff // with cleanSession == false, receives messages for topics that are not in the local subscribe list (e.g. delivered for the period the client was offline)
}
// About cleanSession:
// Persistent session: when enabled, the broker records the client's subscriptions and keeps them
// after the client disconnects. When the client connects again, the subscriptions are restored and
// the messages it missed while disconnected are delivered.
// Clean session: the broker does not keep the client's subscriptions. When the client disconnects,
// the broker removes everything related to that client.

type callbackType = (res : UTSJSONObject) => void
type callbackTypeOff = (topic : string, res : UTSJSONObject) => void
// value: index (1..5) of the callback slot used by the topic; qos: QoS the topic was subscribed with.
type subscribeItemType = { value : Int, qos : Int }

// Message callbacks live in five fixed slots. A sixth concurrently subscribed topic reuses slot 5.
let subscribeCallback_1 : callbackType | null = null // message callback, slot 1
let subscribeCallback_2 : callbackType | null = null // message callback, slot 2
let subscribeCallback_3 : callbackType | null = null // message callback, slot 3
let subscribeCallback_4 : callbackType | null = null // message callback, slot 4
let subscribeCallback_5 : callbackType | null = null // message callback, slot 5
let unubscribeCallback : callbackType | null = null // unsubscribe result callback
let subscribeMap = {} // topic -> subscribeItemType
let connectCallback : callbackType | null = null // connect result callback (success and failure)
let publicCallback : callbackType | null = null // publish result callback (success and failure)
let disConnectCallback : callbackType | null = null // result callback of an explicit disconnect
let onDisconnectCallback : callbackType | null = null // listener for an unexpected connection loss
let onReconnectCallback : callbackType | null = null // listener for a successful automatic reconnect
let closeClient : Boolean = false // whether disconnect should also close the client

/** Returns the callback stored in the given slot; any slot other than 1..4 maps to slot 5. */
function getSubscribeCallback(slot : Int) : callbackType | null {
	if (slot == 1) {
		return subscribeCallback_1
	} else if (slot == 2) {
		return subscribeCallback_2
	} else if (slot == 3) {
		return subscribeCallback_3
	} else if (slot == 4) {
		return subscribeCallback_4
	}
	return subscribeCallback_5
}

/** Stores `callback` (or null to free the slot) in the given slot; any slot other than 1..4 maps to slot 5. */
function setSubscribeCallback(slot : Int, callback : callbackType | null) : void {
	if (slot == 1) {
		subscribeCallback_1 = callback
	} else if (slot == 2) {
		subscribeCallback_2 = callback
	} else if (slot == 3) {
		subscribeCallback_3 = callback
	} else if (slot == 4) {
		subscribeCallback_4 = callback
	} else {
		subscribeCallback_5 = callback
	}
}

/** Frees every message-callback slot. */
function resetSubscribeCallbacks() : void {
	subscribeCallback_1 = null
	subscribeCallback_2 = null
	subscribeCallback_3 = null
	subscribeCallback_4 = null
	subscribeCallback_5 = null
}

/**
 * Singleton wrapper around one Paho `MqttClient`.
 * Results are reported through the module-level callbacks rather than return values.
 */
class MqttManager {
	HOST : string; // broker URI
	CLIENT_ID : string; // client id
	USER_NAME : string; // user name
	PASSWAORD : string; // password
	CLEAN_SESSION : boolean; // clean session flag, defaults to false
	AUTO_MATICRECONNECT : boolean; // automatic reconnect flag, defaults to true
	HEART_BEAT : Int; // keep-alive interval, defaults to 60
	TIME_OUT : Int; // connection timeout, defaults to 30
	subscribeList : Array<string> = [] // topics currently subscribed through this manager
	mqttClient : MqttClient; // the one underlying client
	manageOption : MqttManagerOption; // options supplied by the caller
	static instance : MqttManager | null = null; // singleton instance

	/**
	 * Returns the singleton, creating it from `option` on first use.
	 * When an instance already exists `option` is ignored.
	 * Throws when no instance exists and `option` is null.
	 */
	static getInstance(option : MqttManagerOption | null) : MqttManager {
		if (MqttManager.instance == null) {
			if (option == null) {
				// Fail with a readable message instead of a null cast error.
				throw new Error('MqttManager 尚未初始化,请先调用 connect')
			}
			MqttManager.instance = new MqttManager(option as MqttManagerOption);
		}
		return MqttManager.instance as MqttManager;
	}

	constructor(option : MqttManagerOption) {
		this.manageOption = option
		this.HOST = option.host
		this.CLIENT_ID = option.clientId
		this.USER_NAME = option.userName
		this.PASSWAORD = option.password
		// Apply defaults for the optional settings.
		this.CLEAN_SESSION = option.cleanSession == null ? false : option.cleanSession as boolean
		this.AUTO_MATICRECONNECT = option.automaticReconnect == null ? true : option.automaticReconnect as boolean
		this.HEART_BEAT = option.heartBeat == null ? 60 : option.heartBeat as Int
		this.TIME_OUT = option.timeOut == null ? 30 : option.timeOut as Int
		this.mqttClient = new MqttClient(option.host, option.clientId, null);
	}

	/** Connects to the broker and reports the outcome (code 200 or 500) through `connectCallback`. */
	connectMqtt() : void {
		let res : UTSJSONObject
		try {
			const connOpts : MqttConnectOptions = new MqttConnectOptions();
			connOpts.setCleanSession(this.CLEAN_SESSION);
			connOpts.setAutomaticReconnect(this.AUTO_MATICRECONNECT); // automatic reconnect
			connOpts.setKeepAliveInterval(this.HEART_BEAT) // heartbeat
			connOpts.setConnectionTimeout(this.TIME_OUT) // connection timeout
			connOpts.setMaxReconnectDelay(10000) // maximum wait (ms) between reconnect attempts
			connOpts.setUserName(this.USER_NAME);
			connOpts.setPassword(this.PASSWAORD.toCharArray());
			// Register the callback BEFORE connecting so that messages delivered as soon as the
			// connection is up (e.g. queued for a persistent session) are not missed.
			this.mqttClient.setCallback(new myMqttCallback());
			this.mqttClient.connect(connOpts);
			res = {
				"code": 200,
				"message": "连接成功",
				"clientId": this.CLIENT_ID
			}
		} catch (e) {
			console.log(e)
			res = {
				"code": 500,
				"message": '连接失败,具体原因请查看控制台'
			}
		}
		if (connectCallback != null) {
			const callback = connectCallback as callbackType
			callback(res)
		}
	}

	/**
	 * Subscribes to `topic` with the given QoS.
	 * Returns true on success; on failure the error is logged, the topic is removed from
	 * `subscribeList` again and false is returned.
	 */
	subscribe(topic : string, qos : Int) : boolean {
		try {
			// Recorded before the broker call so a message arriving immediately is already routable.
			this.subscribeList.push(topic)
			console.log(this.subscribeList)
			this.mqttClient.subscribe(topic, qos);
			return true
		} catch (e) {
			console.log(e.message)
			const index = this.subscribeList.lastIndexOf(topic)
			if (index >= 0) {
				this.subscribeList.splice(index, 1)
			}
			return false
		}
	}

	/**
	 * Unsubscribes from `topic` and reports the outcome through `unubscribeCallback`.
	 * Returns true when the broker call succeeded.
	 */
	unsubscribe(topic : string) : boolean {
		let res : UTSJSONObject
		let succeeded = false
		try {
			this.mqttClient.unsubscribe(topic);
			// Remove every occurrence; searching again after each splice avoids skipping entries.
			let index = this.subscribeList.indexOf(topic)
			while (index >= 0) {
				this.subscribeList.splice(index, 1)
				index = this.subscribeList.indexOf(topic)
			}
			res = {
				"code": 200,
				"message": `取消订阅成功`,
				"topic": topic
			}
			succeeded = true
		} catch (e) {
			res = {
				"code": 500,
				"message": e.message,
				"topic": topic
			}
		}
		if (unubscribeCallback != null) {
			const callback = unubscribeCallback as callbackType
			callback(res)
		}
		return succeeded
	}

	/** Whether the underlying client reports being connected. */
	isConnected() : boolean {
		return this.mqttClient.isConnected()
	}

	/**
	 * Disconnects from the broker and, when the module-level `closeClient` flag is true, also closes
	 * the client. The outcome (code 200 or 500) is reported through `disConnectCallback`.
	 */
	disconnectMqtt() : void {
		let res : UTSJSONObject
		try {
			this.mqttClient.disconnect();
			if (closeClient == true) {
				this.mqttClient.close();
				// A closed Paho client cannot be reused: drop the singleton so the next connect()
				// builds a fresh client, and free the callback slots that belonged to it.
				MqttManager.instance = null
				resetSubscribeCallbacks()
			}
			res = {
				"code": 200,
				"message": '断开连接成功',
			}
		} catch (e) {
			res = {
				"code": 500,
				"message": e.message,
			}
		}
		// Invoked outside the try block so an exception thrown by the callback itself is not
		// turned into a second, contradictory 500 callback.
		if (disConnectCallback != null) {
			const callback = disConnectCallback as callbackType
			callback(res)
		}
	}

	/**
	 * Publishes `message` (encoded as UTF-8) to `topic` with the given QoS.
	 * A failure is reported through `publicCallback` with code 500; success is reported by
	 * `myMqttCallback.deliveryComplete`.
	 */
	publishMessage(topic : string, message : string, qos : Int) : void {
		try {
			const bytes : ByteArray = message.toByteArray(Charsets.UTF_8)
			const mqttMessage : MqttMessage = new MqttMessage(bytes);
			mqttMessage.setQos(qos);
			this.mqttClient.publish(topic, mqttMessage);
		} catch (e) {
			console.log(e)
			// Publishing failed.
			let callback = publicCallback as callbackType
			let res : UTSJSONObject
			res = {
				"code": 500,
				"message": e.message,
				"clientId": this.CLIENT_ID
			}
			callback(res)
		}
	}
}

/** Paho callback that forwards client events to the module-level callbacks. */
class myMqttCallback extends MqttCallback implements MqttCallbackExtended {
	/** Connection lost; forwards code 500 and the cause message to `onDisconnectCallback`. */
	override connectionLost(cause : Throwable) : void {
		console.log(cause)
		// A manual reconnect could be done here; automatic reconnect is relied on for now.
		if (onDisconnectCallback !== null) {
			let callback = onDisconnectCallback as callbackType
			let res : UTSJSONObject
			res = {
				"code": 500,
				"message": cause.message
			}
			callback(res)
		}
	}

	/**
	 * Message received. Topics present in `subscribeList` are routed to their callback slot;
	 * any other topic goes to `offPeriodMsg` when it is set and cleanSession is false.
	 * Topics are matched by exact string equality against `subscribeList`.
	 */
	override messageArrived(topic : string, message : MqttMessage) : void {
		try {
			let res : UTSJSONObject
			// When the payload is not JSON, hand over the raw payload wrapped in `data`.
			try {
				res = JSON.parse(new String(message.getPayload())) as UTSJSONObject
			} catch (e) {
				res = {
					data: JSONObject.toJSON(message.getPayload())
				}
			}
			console.log(res, topic)
			const mqtt = MqttManager.getInstance(null)
			if (mqtt.subscribeList.includes(topic)) {
				let topicItem = subscribeMap[topic] as subscribeItemType
				const target = getSubscribeCallback(topicItem.value)
				// The slot can be empty; skip instead of failing on a null cast.
				if (target != null) {
					const callback = target as callbackType
					callback(res)
				}
			} else {
				if (mqtt.manageOption.offPeriodMsg !== null && mqtt.CLEAN_SESSION == false) {
					let callback = mqtt.manageOption.offPeriodMsg as callbackTypeOff
					callback(topic, res)
				}
			}
		} catch (e) {
			console.log(e.message)
		}
	}

	/** Delivery completed; reports code 200 through `publicCallback`. */
	override deliveryComplete(token : IMqttDeliveryToken) : void {
		try {
			let callback = publicCallback as callbackType
			let res : UTSJSONObject
			res = {
				"code": 200,
				"message": '消息发送成功'
			}
			callback(res)
		} catch (e) {
			console.log('消息发送成功回调catch', e.message)
		}
	}

	/**
	 * Connection completed. `reconnect` is true only when the connection is the result of an
	 * automatic reconnect; only that case is forwarded to `onReconnectCallback`. The initial
	 * connection is reported by `connectMqtt` through `connectCallback`.
	 */
	override connectComplete(reconnect : boolean, serverURI : string) : void {
		console.log('reconnect', reconnect, serverURI)
		if (!reconnect) {
			// Initial connect succeeded; this is not a reconnect failure, so nothing to report here.
			return
		}
		// Re-subscribing the previous topics after a reconnect is currently disabled:
		// const mqtt = MqttManager.getInstance(null)
		// console.log('oldSubscribeList', mqtt.subscribeList)
		// for (let i = 0; i < mqtt.subscribeList.length; i++) {
		// 	let topicItem = subscribeMap[mqtt.subscribeList[i].split('/').join('.')] as subscribeItemType
		// 	let qos = topicItem.qos
		// 	mqtt.mqttClient.subscribe(mqtt.subscribeList[i], qos)
		// }
		if (onReconnectCallback !== null) {
			let callback = onReconnectCallback as callbackType
			let res : UTSJSONObject = {
				"code": 200,
				"message": '自动重新连接成功'
			}
			callback(res)
		}
	}
}

// Public API

/**
 * Connects to the broker described by `option` and reports the result through `callback`.
 * The connect itself runs on a new thread. When the client is already connected, `callback`
 * is invoked immediately with code 200. If a manager already exists, `option` is not applied
 * to it (see `MqttManager.getInstance`).
 */
@UTSJS.keepAlive
export function connect(option : MqttManagerOption, callback : callbackType) {
	const mqtt = MqttManager.getInstance(option)
	if (!mqtt.isConnected()) {
		connectCallback = callback
		if (option.keepAlive == true) {
			startConnectedDeviceService(option.notificationContentTitle, option.notificationContentText)
		}
		new Thread(new myRunnable()).start();
	} else {
		// Already connected.
		let res : UTSJSONObject = {
			"code": 200,
			"message": "连接成功(301)",
			"clientId": mqtt.CLIENT_ID
		}
		callback(res)
	}
}

/**
 * Subscribes to `topic`; `callback` receives every message that arrives on it.
 * Subscribing again to an already subscribed topic only replaces its callback.
 * Only five callback slots exist: when all are taken, the new topic reuses slot 5.
 */
@UTSJS.keepAlive
export function subscribe(topic : string, qos : Int, callback : callbackType) {
	if (MqttManager.instance == null || MqttManager.instance!.isConnected() == false) {
		console.log('请先连接')
		return
	}
	const mqtt = MqttManager.instance as MqttManager
	if (mqtt.subscribeList.includes(topic)) {
		// Already subscribed: overwrite the callback registered for this topic.
		let topicItem = subscribeMap[topic] as subscribeItemType
		setSubscribeCallback(topicItem.value, callback)
		return
	}
	// Pick the first free slot, falling back to slot 5.
	let slot : Int
	if (subscribeCallback_1 == null) {
		slot = 1
	} else if (subscribeCallback_2 == null) {
		slot = 2
	} else if (subscribeCallback_3 == null) {
		slot = 3
	} else if (subscribeCallback_4 == null) {
		slot = 4
	} else {
		slot = 5
	}
	const previous = getSubscribeCallback(slot)
	setSubscribeCallback(slot, callback)
	let temp : subscribeItemType = {
		value: slot,
		qos: qos
	}
	subscribeMap[topic] = temp
	if (!mqtt.subscribe(topic, qos)) {
		// The broker call failed: give the slot back to whatever occupied it before.
		setSubscribeCallback(slot, previous)
	}
}

/**
 * Unsubscribes from `topic`; the result is reported through `callback`.
 * Nothing is reported when the client is not connected or the topic was never subscribed.
 */
@UTSJS.keepAlive
export function unSubscribe(topic : string, callback : callbackType) {
	if (MqttManager.instance == null || MqttManager.instance!.isConnected() == false) {
		console.log('请先连接')
		return
	}
	unubscribeCallback = callback
	const mqtt = MqttManager.instance as MqttManager
	if (mqtt.subscribeList.includes(topic)) {
		// The topic is subscribed: free its slot and unsubscribe.
		let temp = subscribeMap[topic] as subscribeItemType
		const slot = temp.value
		const previous = getSubscribeCallback(slot)
		setSubscribeCallback(slot, null)
		if (!mqtt.unsubscribe(topic)) {
			// Still subscribed on failure, so keep delivering to the original callback.
			setSubscribeCallback(slot, previous)
		}
	} else {
		console.log('您没有订阅过该主题')
	}
}

/** Publishes `message` to `topic`; success and failure are both reported through `callback`. */
@UTSJS.keepAlive
export function publishMessage(topic : string, qos : Int, message : string, callback : callbackType) {
	if (MqttManager.instance == null || MqttManager.instance!.isConnected() == false) {
		console.log('请先连接')
		return
	}
	const mqtt = MqttManager.instance as MqttManager
	publicCallback = callback
	mqtt.publishMessage(topic, message, qos);
}

/**
 * Stops the connected-device service and disconnects from the broker.
 * When `closeClient_` is true the client is closed as well.
 */
@UTSJS.keepAlive
export function disConnect(callback : callbackType, closeClient_ : Boolean) {
	if (MqttManager.instance == null) {
		// Nothing to disconnect; avoids a null cast failure.
		console.log('请先连接')
		return
	}
	const mqtt = MqttManager.instance as MqttManager
	closeClient = closeClient_
	disConnectCallback = callback
	// Stop the service.
	closeConnectedDeviceService()
	mqtt.disconnectMqtt();
}

/** Registers the listener invoked when the connection is lost. */
@UTSJS.keepAlive
export function onConnectLost(callback : callbackType) {
	onDisconnectCallback = callback
}

/** Registers the listener invoked after a successful automatic reconnect. */
@UTSJS.keepAlive
export function onReconnect(callback : callbackType) {
	onReconnectCallback = callback
}

/** Returns true when a manager exists and its client is connected. */
export function isConnected() : Boolean {
	if (MqttManager.instance == null || MqttManager.instance!.isConnected() == false) {
		return false
	} else {
		return true
	}
}

/**
 * Returns the effective configuration of the connected manager (including the password and the
 * current subscribe list), or `{ code: 500, message }` when not connected.
 */
export function getConfig() {
	if (MqttManager.instance == null || MqttManager.instance!.isConnected() == false) {
		return {
			code: 500,
			message: '请先连接'
		}
	}
	const mqtt = MqttManager.instance as MqttManager
	return {
		host: mqtt.HOST,
		clientId: mqtt.CLIENT_ID,
		userName: mqtt.USER_NAME,
		password: mqtt.PASSWAORD,
		cleanSession: mqtt.CLEAN_SESSION, // false creates a persistent session; requires the clientId to stay the same
		automaticReconnect: mqtt.AUTO_MATICRECONNECT,
		heartBeat: mqtt.HEART_BEAT,
		timeOut: mqtt.TIME_OUT,
		subscribeList: mqtt.subscribeList
	}
}