jsy-app/uni_modules/zy-mqtt/utssdk/app-android/index.uts
2024-12-19 12:34:26 +08:00

504 lines
16 KiB
Plaintext
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import { UTSAndroid } from "io.dcloud.uts";
import MqttClient from 'org.eclipse.paho.client.mqttv3.MqttClient';
import MqttConnectOptions from 'org.eclipse.paho.client.mqttv3.MqttConnectOptions';
import IMqttDeliveryToken from 'org.eclipse.paho.client.mqttv3.IMqttDeliveryToken';
import MqttMessage from 'org.eclipse.paho.client.mqttv3.MqttMessage';
import MqttCallback from 'org.eclipse.paho.client.mqttv3.MqttCallback';
// import Throwable from 'java.lang.Throwable'
import Throwable from 'kotlin.Throwable';
import MqttException from 'org.eclipse.paho.client.mqttv3.MqttException';
import MqttCallbackExtended from 'org.eclipse.paho.client.mqttv3.MqttCallbackExtended'
import KotlinCharArray from 'kotlin.CharArray';
import KotlinChar from 'kotlin.Char';
import CharArrayWriter from 'java.io.CharArrayWriter';
import PowerManager from 'android.os.PowerManager';
import Context from 'android.content.Context';
import { startConnectedDeviceService, closeConnectedDeviceService } from './service'
import Charsets from 'kotlin.text.Charsets'
import JSONObject from 'com.alibaba.fastjson.JSONObject'
import Thread from 'java.lang.Thread'
import Runnable from 'java.lang.Runnable';
/**
 * Runnable that `connect` hands to a new Thread. When run, it fetches the
 * MqttManager singleton and calls its connectMqtt() routine.
 */
class myRunnable implements Runnable {
	constructor() {
	}
	override run() : void {
		// getInstance(null) returns the singleton that was already created.
		MqttManager.getInstance(null).connectMqtt()
	}
}
// Options accepted by `connect` and stored on the MqttManager singleton.
// Optional fields fall back to the defaults applied in the MqttManager
// constructor: cleanSession=false, automaticReconnect=true, heartBeat=60,
// timeOut=30.
type MqttManagerOption = {
host : string,
clientId : string,
userName : string,
password : string,
cleanSession ?: boolean, // false creates a persistent session, provided the clientId always stays the same
automaticReconnect ?: boolean,
heartBeat ?: Int,
timeOut ?: Int,
keepAlive ?: boolean, // whether to keep the connection alive (when true, `connect` starts the connected-device service)
notificationContentText ?: string, // notification text
notificationContentTitle ?: string // notification title
offPeriodMsg ?: callbackTypeOff // when cleanSession is false, reconnecting delivers the messages published while disconnected
}
// Explanation of cleanSession
// Persistent session: when the client connects to the broker with a persistent session enabled, the broker records the client's subscriptions and keeps them even after the client disconnects. When the client connects again, its previous subscriptions are restored and it receives every message it missed while disconnected.
// Clean session: when the client connects to the broker with clean session enabled, the broker does not store the client's subscriptions. When the client disconnects, the broker deletes all information related to that client.
// Result callback used by every exported API: receives one JSON object.
type callbackType = (res : UTSJSONObject) => void
// Callback for messages whose topic is not in the local subscribe list:
// receives the topic and the parsed payload.
type callbackTypeOff = (topic : string, res : UTSJSONObject) => void
// Entry stored in subscribeMap for each subscribed topic.
type subscribeItemType = {
value : Int, // number (1-5) of the subscribeCallback_N slot assigned to the topic
qos : Int // QoS that was requested when subscribing
}
// Module-level callback registry shared by MqttManager, myMqttCallback and
// the exported API functions.
let subscribeCallback_1 : callbackType | null = null // message callback slot 1 for a subscribed topic
let subscribeCallback_2 : callbackType | null = null // message callback slot 2 for a subscribed topic
let subscribeCallback_3 : callbackType | null = null // message callback slot 3 for a subscribed topic
let subscribeCallback_4 : callbackType | null = null // message callback slot 4 for a subscribed topic
let subscribeCallback_5 : callbackType | null = null // message callback slot 5 (also the fallback once slots 1-4 are taken)
let unubscribeCallback : callbackType | null = null // unsubscribe result callback
let subscribeMap = {} // topic -> subscribeItemType (slot number and qos)
let connectCallback : callbackType | null = null // connect callback, used for both success and failure
let publicCallback : callbackType | null = null // publish callback, used for both success and failure
let disConnectCallback : callbackType | null = null // callback fired after an explicit disconnect
let onDisconnectCallback : callbackType | null = null // listener for the connection dropping unexpectedly
let onReconnectCallback : callbackType | null = null // listener for automatic reconnects
let closeClient : Boolean = false // whether disconnect should also close the client
/**
 * Singleton wrapper around one Paho MqttClient.
 *
 * It stores the connection settings, tracks the locally subscribed topics
 * and reports the outcome of each operation through the module-level
 * callbacks (connectCallback, unubscribeCallback, disConnectCallback,
 * publicCallback).
 */
class MqttManager {
	HOST : string; // broker URI handed to MqttClient
	CLIENT_ID : string; // client id
	USER_NAME : string; // user name
	PASSWAORD : string; // password (field name kept as-is; other code reads it)
	CLEAN_SESSION : boolean; // clean-session flag, defaults to false
	AUTO_MATICRECONNECT : boolean; // automatic reconnect, defaults to true
	HEART_BEAT : Int; // keep-alive interval, defaults to 60 seconds
	TIME_OUT : Int; // connection timeout, defaults to 30 seconds
	subscribeList : Array<string> = [] // topics subscribed through this manager
	mqttClient : MqttClient; // the single underlying client
	manageOption : MqttManagerOption; // options exactly as passed in by the caller
	static instance : MqttManager | null = null; // MqttManager singleton

	/**
	 * Returns the singleton, creating it from `option` on first use.
	 *
	 * @param option Connection options; only read when no instance exists yet.
	 *               Pass null to fetch an already-created instance.
	 * @throws Error when no instance exists and `option` is null.
	 */
	static getInstance(option : MqttManagerOption | null) : MqttManager {
		if (MqttManager.instance == null) {
			if (option == null) {
				// Fail with a clear message instead of an opaque cast failure.
				throw new Error('MqttManager is not initialised: call connect(option) first')
			}
			MqttManager.instance = new MqttManager(option as MqttManagerOption);
		}
		return MqttManager.instance as MqttManager;
	}

	/**
	 * Copies the options, applies defaults for the optional ones and creates
	 * the MqttClient (null is passed as the persistence argument).
	 */
	constructor(option : MqttManagerOption) {
		this.manageOption = option
		this.HOST = option.host
		this.CLIENT_ID = option.clientId
		this.USER_NAME = option.userName
		this.PASSWAORD = option.password
		// Apply defaults
		this.CLEAN_SESSION = option.cleanSession == null ? false : option.cleanSession as boolean
		this.AUTO_MATICRECONNECT = option.automaticReconnect == null ? true : option.automaticReconnect as boolean
		this.HEART_BEAT = option.heartBeat == null ? 60 : option.heartBeat as Int
		this.TIME_OUT = option.timeOut == null ? 30 : option.timeOut as Int
		this.mqttClient = new MqttClient(option.host, option.clientId, null);
	}

	/** Invokes `callback` with `res` when a callback has been registered. */
	private emitResult(callback : callbackType | null, res : UTSJSONObject) : void {
		if (callback != null) {
			const fn = callback as callbackType
			fn(res)
		}
	}

	/**
	 * Connects to the broker with the stored settings and reports the result
	 * (code 200 or 500) through connectCallback.
	 */
	connectMqtt() : void {
		let res : UTSJSONObject
		try {
			const connOpts : MqttConnectOptions = new MqttConnectOptions();
			connOpts.setCleanSession(this.CLEAN_SESSION); // was previously hard-coded to true
			connOpts.setAutomaticReconnect(this.AUTO_MATICRECONNECT); // automatic reconnect
			connOpts.setKeepAliveInterval(this.HEART_BEAT) // heartbeat
			connOpts.setConnectionTimeout(this.TIME_OUT) // connection timeout
			connOpts.setMaxReconnectDelay(10000) // maximum delay (ms) between reconnect attempts
			connOpts.setUserName(this.USER_NAME);
			connOpts.setPassword(this.PASSWAORD.toCharArray());
			this.mqttClient.connect(connOpts);
			this.mqttClient.setCallback(new myMqttCallback());
			res = {
				"code": 200,
				"message": "连接成功",
				"clientId": this.CLIENT_ID
			}
		} catch (e) {
			console.log(e)
			res = {
				"code": 500,
				"message": '连接失败,具体原因请查看控制台'
			}
		}
		this.emitResult(connectCallback, res)
	}

	/**
	 * Subscribes to `topic` with the given QoS and records it in subscribeList.
	 * Errors are only logged.
	 */
	subscribe(topic : string, qos : Int) : void {
		try {
			// NOTE(review): the topic is recorded before the broker call, so it
			// stays in subscribeList even when subscribe() throws. The exported
			// unSubscribe relies on the list entry to release its callback slot,
			// so the ordering is left unchanged here.
			this.subscribeList.push(topic)
			console.log(this.subscribeList)
			this.mqttClient.subscribe(topic, qos);
		} catch (e) {
			console.log(e.message)
		}
	}

	/**
	 * Unsubscribes from `topic`, removes it from subscribeList and reports
	 * the result through unubscribeCallback.
	 */
	unsubscribe(topic : string) : void {
		let res : UTSJSONObject
		try {
			this.mqttClient.unsubscribe(topic);
			// Iterate backwards: splice() shifts the following entries down, so a
			// forward loop would skip the element right after each removal.
			for (let i = this.subscribeList.length - 1; i >= 0; i--) {
				if (this.subscribeList[i] == topic) {
					this.subscribeList.splice(i, 1)
				}
			}
			res = {
				"code": 200,
				"message": `取消订阅成功`,
				"topic": topic
			}
		} catch (e) {
			res = {
				"code": 500,
				"message": e.message,
				"topic": topic
			}
		}
		this.emitResult(unubscribeCallback, res)
	}

	/** Returns whether the underlying client reports being connected. */
	isConnected() : boolean {
		return this.mqttClient.isConnected()
	}

	/**
	 * Disconnects from the broker and, when the module-level `closeClient`
	 * flag is true, also closes the client. The result is reported through
	 * disConnectCallback.
	 */
	disconnectMqtt() : void {
		let failed = false
		let res : UTSJSONObject = {
			"code": 200,
			"message": '断开连接成功',
		}
		try {
			this.mqttClient.disconnect();
		} catch (e) {
			failed = true
			res = {
				"code": 500,
				"message": e.message,
			}
		}
		if (closeClient == true) {
			// Still attempt close() when disconnect() failed (for example because
			// the client was already disconnected) so the request to release the
			// client is honoured.
			try {
				this.mqttClient.close();
			} catch (closeErr) {
				if (!failed) {
					res = {
						"code": 500,
						"message": closeErr.message,
					}
				}
			}
		}
		// MqttManager.instance = null // clearing the instance would allow a fresh reconnect
		// NOTE(review): after close() the singleton still holds the closed
		// client; confirm whether a later connect() is expected to work.
		this.emitResult(disConnectCallback, res)
	}

	/**
	 * Publishes `message` (encoded as UTF-8) to `topic` with the given QoS.
	 * Only failures are reported here, through publicCallback with code 500;
	 * success is reported by myMqttCallback.deliveryComplete.
	 */
	publishMessage(topic : string, message : string, qos : Int) : void {
		try {
			const bytes : ByteArray = message.toByteArray(Charsets.UTF_8)
			const mqttMessage : MqttMessage = new MqttMessage(bytes);
			mqttMessage.setQos(qos);
			this.mqttClient.publish(topic, mqttMessage);
		} catch (e) {
			console.log(e)
			// Publishing failed
			const res : UTSJSONObject = {
				"code": 500,
				"message": e.message,
				"clientId": this.CLIENT_ID
			}
			this.emitResult(publicCallback, res)
		}
	}
}
// Paho callback hub
/**
 * Callback object registered on the MqttClient by MqttManager.connectMqtt.
 * It forwards connection-lost, message, delivery and connect-complete events
 * to the module-level callbacks.
 */
class myMqttCallback extends MqttCallback implements MqttCallbackExtended {
	/** Returns the message callback stored in the given slot (any value other than 1-4 maps to slot 5). */
	private callbackForSlot(slot : Int) : callbackType | null {
		if (slot == 1) {
			return subscribeCallback_1
		} else if (slot == 2) {
			return subscribeCallback_2
		} else if (slot == 3) {
			return subscribeCallback_3
		} else if (slot == 4) {
			return subscribeCallback_4
		}
		return subscribeCallback_5
	}

	/**
	 * Called when the connection is lost (not for an explicit disconnect).
	 * Forwards the cause message to onDisconnectCallback with code 500.
	 */
	override connectionLost(cause : Throwable) : void {
		console.log(cause)
		// A manual reconnect could be done here; automatic reconnect is used for now.
		if (onDisconnectCallback !== null) {
			let callback = onDisconnectCallback as callbackType
			let res : UTSJSONObject = {
				"code": 500,
				"message": cause.message
			}
			callback(res)
		}
	}

	/**
	 * Called for each incoming message. The payload is parsed as JSON; when
	 * that fails it is wrapped as `{ data: ... }`. Messages for topics in
	 * subscribeList go to the topic's callback slot; others go to
	 * offPeriodMsg when cleanSession is false.
	 */
	override messageArrived(topic : string, message : MqttMessage) : void {
		try {
			let res : UTSJSONObject
			// If the payload is not JSON, return the raw bytes instead
			try {
				res = JSON.parse(new String(message.getPayload())) as UTSJSONObject
			} catch (e) {
				res = {
					data: JSONObject.toJSON(message.getPayload())
				}
			}
			console.log(res, topic)
			const mqtt = MqttManager.getInstance(null)
			if (mqtt.subscribeList.includes(topic)) {
				let topicItem = subscribeMap[topic] as subscribeItemType
				const handler = this.callbackForSlot(topicItem.value)
				if (handler != null) {
					const fn = handler as callbackType
					fn(res)
				} else {
					console.log('no callback registered for topic', topic)
				}
			} else {
				// Not subscribed locally: with a persistent session this is a
				// message for a subscription the broker retained.
				if (mqtt.manageOption.offPeriodMsg !== null && mqtt.CLEAN_SESSION == false) {
					let callback = mqtt.manageOption.offPeriodMsg as callbackTypeOff
					callback(topic, res)
				}
			}
		} catch (e) {
			console.log(e.message)
		}
	}

	/** Called when a published message has been delivered; reports code 200 through publicCallback. */
	override deliveryComplete(token : IMqttDeliveryToken) : void {
		try {
			let callback = publicCallback as callbackType
			let res : UTSJSONObject = {
				"code": 200,
				"message": '消息发送成功'
			}
			callback(res)
		} catch (e) {
			console.log('消息发送成功回调catch', e.message)
		}
	}

	/**
	 * Called by Paho after a connection completes successfully.
	 *
	 * @param reconnect true when the connection came from automatic reconnect.
	 * @param serverURI URI of the server the connection was made to.
	 */
	override connectComplete(reconnect : boolean, serverURI : string) : void {
		console.log('reconnect', reconnect, serverURI)
		if (!reconnect) {
			// This callback only fires on success; reconnect == false means a
			// regular connect, not a failed reconnect. MqttManager.connectMqtt
			// already reports that outcome through connectCallback, so nothing
			// is sent to the reconnect listener here.
			return
		}
		// NOTE(review): re-subscribing the topics in subscribeList after a
		// reconnect was previously attempted here and is disabled.
		if (onReconnectCallback !== null) {
			let callback = onReconnectCallback as callbackType
			let res : UTSJSONObject = {
				"code": 200,
				"message": '自动重新连接成功'
			}
			callback(res)
		}
	}
}
// Public API
/**
 * Connects to the broker described by `option`.
 *
 * If the singleton is already connected, `callback` is invoked immediately
 * with code 200. Otherwise `callback` is stored as the connect callback, the
 * connected-device service is started when `option.keepAlive` is true, and
 * the connection is made on a new thread.
 */
@UTSJS.keepAlive
export function connect(option : MqttManagerOption, callback : callbackType) {
	const manager = MqttManager.getInstance(option)
	if (manager.isConnected()) {
		// Already connected: answer straight away.
		const alreadyConnected : UTSJSONObject = {
			"code": 200,
			"message": "连接成功(301)",
			"clientId": manager.CLIENT_ID
		}
		callback(alreadyConnected)
		return
	}
	connectCallback = callback
	if (option.keepAlive == true) {
		startConnectedDeviceService(option.notificationContentTitle, option.notificationContentText)
	}
	new Thread(new myRunnable()).start();
}
/** Stores `callback` in the numbered subscribe slot (any number other than 1-4 selects slot 5). */
function storeSubscribeCallback(slot : Int, callback : callbackType) : void {
	if (slot == 1) {
		subscribeCallback_1 = callback
	} else if (slot == 2) {
		subscribeCallback_2 = callback
	} else if (slot == 3) {
		subscribeCallback_3 = callback
	} else if (slot == 4) {
		subscribeCallback_4 = callback
	} else {
		subscribeCallback_5 = callback
	}
}
/**
 * Subscribes to `topic` and registers `callback` for its messages.
 *
 * Requires an existing, connected manager; otherwise it only logs. A topic
 * that is already subscribed just has its callback replaced. A new topic
 * takes the first empty slot (slot 5 when slots 1-4 are occupied), is
 * recorded in subscribeMap and is then subscribed on the client.
 */
@UTSJS.keepAlive
export function subscribe(topic : string, qos : Int, callback : callbackType) {
	if (MqttManager.instance == null || MqttManager.instance!.isConnected() == false) {
		console.log('请先连接')
		return
	}
	const manager = MqttManager.instance as MqttManager
	if (manager.subscribeList.includes(topic)) {
		// Already subscribed: a later subscriber overrides the previous callback.
		const existing = subscribeMap[topic] as subscribeItemType
		storeSubscribeCallback(existing.value, callback)
		return
	}
	// Pick the first free slot; slot 5 is the fallback.
	let slot : Int = 5
	if (subscribeCallback_1 == null) {
		slot = 1
	} else if (subscribeCallback_2 == null) {
		slot = 2
	} else if (subscribeCallback_3 == null) {
		slot = 3
	} else if (subscribeCallback_4 == null) {
		slot = 4
	}
	storeSubscribeCallback(slot, callback)
	const entry : subscribeItemType = {
		value: slot,
		qos: qos
	}
	subscribeMap[topic] = entry
	manager.subscribe(topic, qos);
}
/**
 * Unsubscribes from `topic`.
 *
 * Requires an existing, connected manager; otherwise it only logs. `callback`
 * is stored as the unsubscribe callback. When the topic is in the local
 * subscribe list its callback slot is cleared and the manager unsubscribes;
 * otherwise a message is logged.
 */
@UTSJS.keepAlive
export function unSubscribe(topic : string, callback : callbackType) {
	if (MqttManager.instance == null || MqttManager.instance!.isConnected() == false) {
		console.log('请先连接')
		return
	}
	unubscribeCallback = callback
	const manager = MqttManager.instance as MqttManager
	if (!manager.subscribeList.includes(topic)) {
		console.log('您没有订阅过该主题')
		return
	}
	// The topic was subscribed: release its callback slot, then unsubscribe.
	const entry = subscribeMap[topic] as subscribeItemType
	const slot = entry.value
	if (slot == 1) {
		subscribeCallback_1 = null
	} else if (slot == 2) {
		subscribeCallback_2 = null
	} else if (slot == 3) {
		subscribeCallback_3 = null
	} else if (slot == 4) {
		subscribeCallback_4 = null
	} else {
		subscribeCallback_5 = null
	}
	manager.unsubscribe(topic)
}
/**
 * Publishes `message` to `topic` with the given QoS.
 *
 * Requires an existing, connected manager; otherwise it only logs.
 * `callback` is stored as the publish callback before the manager publishes.
 */
@UTSJS.keepAlive
export function publishMessage(topic : string, qos : Int, message : string, callback : callbackType) {
	const notConnected = MqttManager.instance == null || MqttManager.instance!.isConnected() == false
	if (notConnected) {
		console.log('请先连接')
		return
	}
	publicCallback = callback
	const manager = MqttManager.instance as MqttManager
	manager.publishMessage(topic, message, qos);
}
/**
 * Disconnects from the broker.
 *
 * @param callback Receives the result: code 200 on success, code 500 on
 *                 failure or when no manager has been created yet.
 * @param closeClient_ When true, the client is also closed after disconnecting.
 */
@UTSJS.keepAlive
export function disConnect(callback : callbackType, closeClient_ : Boolean) {
	if (MqttManager.instance == null) {
		// connect() was never called: report it instead of failing on the cast.
		console.log('请先连接')
		const res : UTSJSONObject = {
			"code": 500,
			"message": '请先连接'
		}
		callback(res)
		return
	}
	const mqtt = MqttManager.instance as MqttManager
	closeClient = closeClient_
	disConnectCallback = callback
	// Stop the connected-device service
	closeConnectedDeviceService()
	mqtt.disconnectMqtt();
}
// Registers the listener that myMqttCallback.connectionLost invokes when the
// connection is lost.
@UTSJS.keepAlive
export function onConnectLost(callback : callbackType) {
onDisconnectCallback = callback
}
// Registers the listener that myMqttCallback.connectComplete invokes.
@UTSJS.keepAlive
export function onReconnect(callback : callbackType) {
onReconnectCallback = callback
}
/** Returns true only when a manager exists and its client reports being connected. */
export function isConnected() : Boolean {
	return MqttManager.instance != null && MqttManager.instance!.isConnected()
}
/**
 * Returns the current connection settings and the subscribed topics, or
 * `{ code: 500, message }` when there is no connected manager.
 * NOTE(review): the result includes the password in plain text.
 */
export function getConfig() {
	if (MqttManager.instance != null && MqttManager.instance!.isConnected()) {
		const manager = MqttManager.instance as MqttManager
		return {
			host: manager.HOST,
			clientId: manager.CLIENT_ID,
			userName: manager.USER_NAME,
			password: manager.PASSWAORD,
			cleanSession: manager.CLEAN_SESSION, // false creates a persistent session, provided the clientId always stays the same
			automaticReconnect: manager.AUTO_MATICRECONNECT,
			heartBeat: manager.HEART_BEAT,
			timeOut: manager.TIME_OUT,
			subscribeList: manager.subscribeList
		}
	}
	return {
		code: 500,
		message: '请先连接'
	}
}