jsy-app/uni_modules/zy-mqtt/utssdk/app-android/index.uts

504 lines
16 KiB
Plaintext
Raw Normal View History

2024-12-19 12:34:26 +08:00
import { UTSAndroid } from "io.dcloud.uts";
import MqttClient from 'org.eclipse.paho.client.mqttv3.MqttClient';
import MqttConnectOptions from 'org.eclipse.paho.client.mqttv3.MqttConnectOptions';
import IMqttDeliveryToken from 'org.eclipse.paho.client.mqttv3.IMqttDeliveryToken';
import MqttMessage from 'org.eclipse.paho.client.mqttv3.MqttMessage';
import MqttCallback from 'org.eclipse.paho.client.mqttv3.MqttCallback';
// import Throwable from 'java.lang.Throwable'
import Throwable from 'kotlin.Throwable';
import MqttException from 'org.eclipse.paho.client.mqttv3.MqttException';
import MqttCallbackExtended from 'org.eclipse.paho.client.mqttv3.MqttCallbackExtended'
import KotlinCharArray from 'kotlin.CharArray';
import KotlinChar from 'kotlin.Char';
import CharArrayWriter from 'java.io.CharArrayWriter';
import PowerManager from 'android.os.PowerManager';
import Context from 'android.content.Context';
import { startConnectedDeviceService, closeConnectedDeviceService } from './service'
import Charsets from 'kotlin.text.Charsets'
import JSONObject from 'com.alibaba.fastjson.JSONObject'
import Thread from 'java.lang.Thread'
import Runnable from 'java.lang.Runnable';
class myRunnable implements Runnable {
constructor() {
}
override run() : void {
const mqtt = MqttManager.getInstance(null)
mqtt.connectMqtt()
}
}
type MqttManagerOption = {
host : string,
clientId : string,
userName : string,
password : string,
cleanSession ?: boolean, // false 创建可持续会话前提是clientId一直保持不变
automaticReconnect ?: boolean,
heartBeat ?: Int,
timeOut ?: Int,
keepAlive ?: boolean, // 是否保活
notificationContentText ?: string, // 通知文本
notificationContentTitle ?: string // 通知title
offPeriodMsg ?: callbackTypeOff // cleanSession 为false 的时候,重新连接会接收到断开期间的消息
}
//cleanSession 解释
//持久会话:当客户端连接到代理服务器时,如果启用了持久会话,代理服务器会记录客户端的订阅信息,即使客户端断开连接,代理服务器也会保存这些订阅信息。当客户端再次连接到代理服务器时,它会恢复之前的订阅信息,并接收它断开连接时的所有未接收的消息。
//清除会话:当客户端连接到代理服务器时,如果启用了清除会话,代理服务器不会保存客户端的订阅信息。当客户端断开连接时,代理服务器会删除与该客户端相关的所有信息。
type callbackType = (res : UTSJSONObject) => void
type callbackTypeOff = (topic : string, res : UTSJSONObject) => void
type subscribeItemType = {
value : Int,
qos : Int
}
let subscribeCallback_1 : callbackType | null = null // 订阅成功回调
let subscribeCallback_2 : callbackType | null = null // 订阅成功回调
let subscribeCallback_3 : callbackType | null = null // 订阅成功回调
let subscribeCallback_4 : callbackType | null = null // 订阅成功回调
let subscribeCallback_5 : callbackType | null = null // 订阅成功回调
let unubscribeCallback : callbackType | null = null // 取消订阅回调
let subscribeMap = {}
let connectCallback : callbackType | null = null // 连接回调,包含成功和失败
let publicCallback : callbackType | null = null // 发布消息回调,包含成功和失败
let disConnectCallback : callbackType | null = null // 主动断开连接后的回调
let onDisconnectCallback : callbackType | null = null // 监听连接中途断开的回调
let onReconnectCallback : callbackType | null = null // 监听自动重新连接的回调
let closeClient : Boolean = false // 是否关闭客户端
class MqttManager {
HOST : string; // 主机ip
CLIENT_ID : string; // 客户端id
USER_NAME : string; // 用户名
PASSWAORD : string; // 密码
CLEAN_SESSION : boolean; // 是否清除会话默认是false
AUTO_MATICRECONNECT : boolean; // 是否自动重连默认是true
HEART_BEAT : Int; // 默认60秒
TIME_OUT : Int; // 默认30秒
subscribeList : Array<string> = [] // 订阅列表
mqttClient : MqttClient; // 唯一客户端
manageOption : MqttManagerOption; // 外部传进来的条件
static instance : MqttManager | null = null; // MqttManager单例
static getInstance(option : MqttManagerOption | null) : MqttManager {
if (MqttManager.instance == null) {
var temp = option as MqttManagerOption
MqttManager.instance = new MqttManager(temp);
}
return MqttManager.instance as MqttManager;
}
constructor(option : MqttManagerOption) {
this.manageOption = option
this.HOST = option.host
this.CLIENT_ID = option.clientId
this.USER_NAME = option.userName
this.PASSWAORD = option.password
// 设置默认
this.CLEAN_SESSION = option.cleanSession == null ? false : option.cleanSession as boolean
this.AUTO_MATICRECONNECT = option.automaticReconnect == null ? true : option.automaticReconnect as boolean
this.HEART_BEAT = option.heartBeat == null ? 60 : option.heartBeat as Int
this.TIME_OUT = option.timeOut == null ? 30 : option.timeOut as Int
this.mqttClient = new MqttClient(option.host, option.clientId, null);
}
connectMqtt() : void {
let callback = connectCallback as callbackType
let res : UTSJSONObject
try {
const connOpts : MqttConnectOptions = new MqttConnectOptions();
connOpts.setCleanSession(this.CLEAN_SESSION); // 原来是true
connOpts.setAutomaticReconnect(this.AUTO_MATICRECONNECT); // 自动重连
connOpts.setKeepAliveInterval(this.HEART_BEAT) // 心跳
connOpts.setConnectionTimeout(this.TIME_OUT) // 设置连接超时
connOpts.setMaxReconnectDelay(10000) // 设置最大重新连接间隔(在无法连接时等待的最大毫秒数)
connOpts.setUserName(this.USER_NAME);
connOpts.setPassword(this.PASSWAORD.toCharArray());
this.mqttClient.connect(connOpts);
this.mqttClient.setCallback(new myMqttCallback());
res = {
"code": 200,
"message": "连接成功",
"clientId": this.CLIENT_ID
}
} catch (e) {
console.log(e)
let callback = connectCallback as callbackType
res = {
"code": 500,
"message": '连接失败,具体原因请查看控制台'
}
}
callback(res)
}
subscribe(topic : string, qos : Int) : void {
// 订阅主题
try {
this.subscribeList.push(topic)
console.log(this.subscribeList)
this.mqttClient.subscribe(topic, qos);
} catch (e) {
console.log(e.message)
}
}
unsubscribe(topic : string) : void { // unbscribeCallback
let callback = unubscribeCallback as callbackType
let res : UTSJSONObject
try {
this.mqttClient.unsubscribe(topic);
// 删除订阅
for (let i = 0; i < this.subscribeList.length; i++) {
if (this.subscribeList[i] == topic) {
this.subscribeList.splice(i, 1)
}
}
res = {
"code": 200,
"message": `取消订阅成功`,
"topic": topic
}
} catch (e) {
res = {
"code": 500,
"message": e.message,
"topic": topic
}
}
callback(res)
}
isConnected() : boolean {
return this.mqttClient.isConnected()
}
disconnectMqtt() : void {
try {
this.mqttClient.disconnect();
if (closeClient == true) {
this.mqttClient.close();
}
// MqttManager.instance = null // 清空的目的是为了可以让再次重连
let callback = disConnectCallback as callbackType
let res : UTSJSONObject
res = {
"code": 200,
"message": '断开连接成功',
}
callback(res)
} catch (e) {
let callback = disConnectCallback as callbackType
let res : UTSJSONObject
res = {
"code": 500,
"message": e.message,
}
callback(res)
}
}
publishMessage(topic : string, message : string, qos : Int) : void {
try {
const bytes : ByteArray = message.toByteArray(Charsets.UTF_8)
const mqttMessage : MqttMessage = new MqttMessage(bytes);
mqttMessage.setQos(qos);
this.mqttClient.publish(topic, mqttMessage);
} catch (e) {
console.log(e)
// 消息发送失败
let callback = publicCallback as callbackType
let res : UTSJSONObject
res = {
"code": 500,
"message": e.message,
"clientId": this.CLIENT_ID
}
callback(res)
}
}
}
// 回调汇总
class myMqttCallback extends MqttCallback implements MqttCallbackExtended {
override connectionLost(cause : Throwable) : void {
// 连接丢失的处理(不包含自己主动断开)
console.log(cause)
// 可以在这里手动重新连接,暂时先用自动重新连接
if (onDisconnectCallback !== null) {
let callback = onDisconnectCallback as callbackType
let res : UTSJSONObject
res = {
"code": 500,
"message": cause.message
}
callback(res)
}
}
override messageArrived(topic : string, message : MqttMessage) : void {
// 接收到消息的处理
// 返回的topic 是以mobile/location
try {
// 只接收订阅返回的,如果有多余的,肯定是订阅过
// console.log('格式化之前topic', topic)
let res : UTSJSONObject
// 如果不是json,直接返回二进制
try {
res = JSON.parse(new String(message.getPayload())) as UTSJSONObject
} catch (e) {
res = {
data: JSONObject.toJSON(message.getPayload())
}
}
console.log(res, topic)
const mqtt = MqttManager.getInstance(null)
if (mqtt.subscribeList.includes(topic)) {
let topicItem = subscribeMap[topic] as subscribeItemType
let topicNo = topicItem.value
if (topicNo == 1) {
let callback = subscribeCallback_1 as callbackType
callback(res)
} else if (topicNo == 2) {
let callback = subscribeCallback_2 as callbackType
callback(res)
} else if (topicNo == 3) {
let callback = subscribeCallback_3 as callbackType
callback(res)
} else if (topicNo == 4) {
let callback = subscribeCallback_4 as callbackType
callback(res)
} else {
let callback = subscribeCallback_5 as callbackType
callback(res)
}
} else {
if (mqtt.manageOption.offPeriodMsg !== null && mqtt.CLEAN_SESSION == false) {
let callback = mqtt.manageOption.offPeriodMsg as callbackTypeOff
callback(topic, res)
}
}
} catch (e) {
console.log(e.message)
}
}
override deliveryComplete(token : IMqttDeliveryToken) : void {
// 消息发送成功的处理
try {
let callback = publicCallback as callbackType
let res : UTSJSONObject
res = {
"code": 200,
"message": '消息发送成功'
}
callback(res)
} catch (e) {
console.log('消息发送成功回调catch', e.message)
}
}
// 自动重连
override connectComplete(reconnect : boolean, serverURI : string) : void {
// 重连成功后把之前的重新订阅
console.log('reconnect', reconnect, serverURI)
let res : UTSJSONObject
if (reconnect) {
// const mqtt = MqttManager.getInstance(null)
// console.log('oldSubscribeList', mqtt.subscribeList)
// for (let i = 0; i < mqtt.subscribeList.length; i++) {
// let topicItem = subscribeMap[mqtt.subscribeList[i].split('/').join('.')] as subscribeItemType
// let qos = topicItem.qos
// mqtt.mqttClient.subscribe(mqtt.subscribeList[i], qos)
// }
if (onReconnectCallback !== null) {
let callback = onReconnectCallback as callbackType
res = {
"code": 200,
"message": '自动重新连接成功'
}
callback(res)
}
} else {
if (onReconnectCallback !== null) {
let callback = onReconnectCallback as callbackType
res = {
"code": 500,
"message": '自动重新连接失败,会持续进行重连...'
}
callback(res)
}
}
}
}
// 对外暴露的Api
@UTSJS.keepAlive
export function connect(option : MqttManagerOption, callback : callbackType) {
const mqtt = MqttManager.getInstance(option)
// 如果已经连接
if (!mqtt.isConnected()) {
connectCallback = callback
if (option.keepAlive == true) {
startConnectedDeviceService(option.notificationContentTitle, option.notificationContentText)
}
new Thread(new myRunnable()).start();
} else {
let res : UTSJSONObject = {
"code": 200,
"message": "连接成功(301)",
"clientId": mqtt.CLIENT_ID
}
callback(res)
}
}
@UTSJS.keepAlive
export function subscribe(topic : string, qos : Int, callback : callbackType) {
if (MqttManager.instance == null || MqttManager.instance!.isConnected() == false) {
console.log('请先连接')
return
}
const mqtt = MqttManager.instance as MqttManager
if (mqtt.subscribeList.includes(topic)) {
// 已经订阅了,如果有其他地方需要订阅相同主题,就覆盖掉之前的回调函数
let topicItem = subscribeMap[topic] as subscribeItemType
let topicNo = topicItem.value
if (topicNo == 1) {
subscribeCallback_1 = callback
} else if (topicNo == 2) {
subscribeCallback_2 = callback
} else if (topicNo == 3) {
subscribeCallback_3 = callback
} else if (topicNo == 4) {
subscribeCallback_4 = callback
} else {
subscribeCallback_5 = callback
}
return
}
let value : Int
if (subscribeCallback_1 == null) {
subscribeCallback_1 = callback
value = 1
} else if (subscribeCallback_2 == null) {
subscribeCallback_2 = callback
value = 2
} else if (subscribeCallback_3 == null) {
subscribeCallback_3 = callback
value = 3
} else if (subscribeCallback_4 == null) {
subscribeCallback_4 = callback
value = 4
} else {
subscribeCallback_5 = callback
value = 5
}
let temp : subscribeItemType = {
value: value,
qos: qos
}
subscribeMap[topic] = temp
mqtt.subscribe(topic, qos);
}
@UTSJS.keepAlive
export function unSubscribe(topic : string, callback : callbackType) {
if (MqttManager.instance == null || MqttManager.instance!.isConnected() == false) {
console.log('请先连接')
return
}
unubscribeCallback = callback
const mqtt = MqttManager.instance as MqttManager
if (mqtt.subscribeList.includes(topic)) {
// 确实订阅过, 去取消订阅
let temp = subscribeMap[topic] as subscribeItemType
let topicNo = temp.value
if (topicNo == 1) {
subscribeCallback_1 = null
} else if (topicNo == 2) {
subscribeCallback_2 = null
} else if (topicNo == 3) {
subscribeCallback_3 = null
} else if (topicNo == 4) {
subscribeCallback_4 = null
} else {
subscribeCallback_5 = null
}
mqtt.unsubscribe(topic)
} else {
console.log('您没有订阅过该主题')
}
}
@UTSJS.keepAlive
export function publishMessage(topic : string, qos : Int, message : string, callback : callbackType) {
if (MqttManager.instance == null || MqttManager.instance!.isConnected() == false) {
console.log('请先连接')
return
}
const mqtt = MqttManager.instance as MqttManager
publicCallback = callback
mqtt.publishMessage(topic, message, qos);
}
@UTSJS.keepAlive
export function disConnect(callback : callbackType, closeClient_ : Boolean) {
const mqtt = MqttManager.instance as MqttManager
closeClient = closeClient_
disConnectCallback = callback
// 关闭服务
closeConnectedDeviceService()
mqtt.disconnectMqtt();
}
@UTSJS.keepAlive
export function onConnectLost(callback : callbackType) {
onDisconnectCallback = callback
}
@UTSJS.keepAlive
export function onReconnect(callback : callbackType) {
onReconnectCallback = callback
}
export function isConnected() : Boolean {
if (MqttManager.instance == null || MqttManager.instance!.isConnected() == false) {
return false
} else {
return true
}
}
export function getConfig() {
if (MqttManager.instance == null || MqttManager.instance!.isConnected() == false) {
return {
code: 500,
message: '请先连接'
}
}
const mqtt = MqttManager.instance as MqttManager
return {
host: mqtt.HOST,
clientId: mqtt.CLIENT_ID,
userName: mqtt.USER_NAME,
password: mqtt.PASSWAORD,
cleanSession: mqtt.CLEAN_SESSION, // false 创建可持续会话前提是clientId一直保持不变
automaticReconnect: mqtt.AUTO_MATICRECONNECT,
heartBeat: mqtt.HEART_BEAT,
timeOut: mqtt.TIME_OUT,
subscribeList: mqtt.subscribeList
}
}