HarmonyOS实现MQTT消息监听展示

想了解更多内容,请访问:

和华为官方合作共建的鸿蒙技术社区

https://harmonyos.51cto.com

思路

因为HarmonyOS暂时没有发现现成的mqtt的js包,所以使用Java进行Mqtt消息的接收,使用JS去定时调用Java接收到的消息并展示

首先是JS调用Java,JS FA(Feature Ability)调用Java PA(Particle Ability)有两种方式,Ability和Internal Ability,这里使用的是第一种Ability

然后是Java端的Mqtt消息接收,使用paho的第三方库进行消息接收,页面启动时JS端调用Java端实现Mqtt消息接收开始,使用异步挂起,接收消息并缓存,随后JS端每次调用Java端拿到的都是最新缓存的信息

具体代码

hml页面:

<div class="container">
    <div>
        <text class="title">
            {{ title }}
        </text>
    </div>
    <div>
        <text class="title" onclick="mqttMessage">
            开始mqtt
        </text>
    </div>
    <div>
        <text class="title" onclick="stopMqtt">
            停止mqtt
        </text>
    </div>
</div>

JS代码:

// Parameters passed to FeatureAbility.callAbility when the JS FA talks to the Java PA.
const ABILITY_TYPE_EXTERNAL = 0;
const ACTION_SYNC = 0;
// Message codes; they must match START_MQTT / MQTT_MESSAGE on the Java side.
const ACTION_MESSAGE_CODE_START_MQTT = 1001;
const ACTION_MESSAGE_CODE_MQTT_MESSAGE = 1002;
const BUNDLE_NAME = 'com.example.mqttapplication';
const ABILITY_NAME = 'com.example.mqttapplication.PlayAbility';

/**
 * Thin wrapper around the Java PlayAbility.
 */
export const playAbility = {
    /**
     * Asks the Java side to start receiving MQTT messages.
     */
    startMqtt: async function() {
        FeatureAbility.callAbility({
            messageCode: ACTION_MESSAGE_CODE_START_MQTT,
            abilityType: ABILITY_TYPE_EXTERNAL,
            syncOption: ACTION_SYNC,
            bundleName: BUNDLE_NAME,
            abilityName: ABILITY_NAME
        });
    },

    /**
     * Fetches the latest cached MQTT message from the Java side and writes it
     * into the `title` property of the given page instance.
     *
     * @param that page instance whose `title` is updated on success
     */
    mqttMessage: async function(that) {
        var result = await FeatureAbility.callAbility({
            messageCode: ACTION_MESSAGE_CODE_MQTT_MESSAGE,
            abilityType: ABILITY_TYPE_EXTERNAL,
            syncOption: ACTION_SYNC,
            bundleName: BUNDLE_NAME,
            abilityName: ABILITY_NAME
        });
        var ret = JSON.parse(result);
        if (ret.code == 0) {
            console.info('mqtt is:' + JSON.stringify(ret.abilityResult));
            that.title = 'mqtt is:' + JSON.stringify(ret.abilityResult);
        } else {
            console.error('mqtt error code:' + JSON.stringify(ret.code));
        }
    }
}

export default {
    data: {
        title: "",
        timer: null
    },

    /** Single poll: pull the latest message into this page. */
    task() {
        playAbility.mqttMessage(this);
    },

    /** Click handler for "开始mqtt": poll immediately, then every 200 ms. */
    mqttMessage() {
        this.title = "开始获取MQTT消息";
        // Drop any previous interval first; otherwise repeated taps stack up
        // timers and stopMqtt() could only cancel the most recent one.
        if (this.timer !== null) {
            clearInterval(this.timer);
        }
        this.task();
        // Arrow function keeps `this` pointing at the page when the timer fires.
        this.timer = setInterval(() => this.task(), 200);
    },

    /** Click handler for "停止mqtt": stop polling. */
    stopMqtt() {
        clearInterval(this.timer);
        this.timer = null;
    }
}

// Kick off MQTT reception on the Java side as soon as the page module loads.
playAbility.startMqtt()

Java端代码(接收Mqtt消息,异步)

import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import java.util.List; public class MqttThread implements Runnable {      /**地址*/     public static final String MQTT_BROKER_HOST = "tcp://xxx.xxx.xxx.xxx:1883";     /**客户端唯一标识*/     public static final String MQTT_CLIENT_ID = "client";     /**订阅标识*/     public static final String MQTT_TOPIC = "HarmonyTest";     /**客户端*/     private volatile static MqttClient mqttClient;     /**连接选项*/     private static MqttConnectOptions options;     /**消息*/     private final List<String> message;     public MqttThread(List<String> message) {          this.message = message;     }     public void run() {          try {              mqttClient = new MqttClient(MQTT_BROKER_HOST,实示 MQTT_CLIENT_ID, new MemoryPersistence());             options = new MqttConnectOptions();             options.setCleanSession(true);             options.setConnectionTimeout(20);             options.setKeepAliveInterval(20);             mqttClient.connect(options);             mqttClient.subscribe(MQTT_TOPIC);             mqttClient.setCallback(new MqttCallback() {                  @Override                 public void connectionLost(Throwable throwable) {  }                 @Override                 public void messageArrived(String s, MqttMessage mqttMessage) {                      message.clear();                     message.add(mqttMessage.toString());                     System.out.println("接收到mqtt消息:" + mqttMessage.toString());                 }                 @Override                 public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {  }             });         } catch (Exception e) {              e.printStackTrace();         }     } } 

Java端代码(Particle Ability)

import com.example.mqttapplication.mqtt.MqttThread; import ohos.aafwk.ability.Ability; import ohos.aafwk.content.Intent; import ohos.hiviewdfx.HiLog; import ohos.hiviewdfx.HiLogLabel; import ohos.rpc.*; import ohos.utils.zson.ZSONObject; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; public class PlayAbility extends Ability {      static final HiLogLabel label = new HiLogLabel(HiLog.LOG_APP, 1, "MY_TAG");     private static final int ERROR = -1;     private static final int SUCCESS = 0;     private static final int START_MQTT = 1001;     private static final int MQTT_MESSAGE = 1002;     @Override     protected void onStart(Intent intent) {          super.onStart(intent);     }     @Override     protected IRemoteObject onConnect(Intent intent) {          super.onConnect(intent);         PlayRemote remote = new PlayRemote();         return remote.asObject();     }     static class PlayRemote extends RemoteObject implements IRemoteBroker {          private List<String> message;         private Thread thread;         public PlayRemote() {              super("PlayRemote");         }         @Override         public boolean onRemoteRequest(int code, MessageParcel data, MessageParcel reply, MessageOption option) {              // 开始mqtt             else if (code == START_MQTT) {                  Map<String, Object> result = new HashMap<>();                 result.put("code", SUCCESS);                 result.put("abilityResult", "成功开始mqtt");                 try {                      message = new ArrayList<>();                     MqttThread mqttThread = new MqttThread(message);                     thread = new Thread(mqttThread);                     thread.start();                     System.out.println("mqtt启动成功");                 }                 catch (Exception e) {                      result.put("code", ERROR);                     result.put("abilityResult", "启动失败");                 }                 
reply.writeString(ZSONObject.toZSONString(result));             }             // 获取mqtt消息             else if (code == MQTT_MESSAGE) {                  Map<String, Object> result = new HashMap<>();                 result.put("code", SUCCESS);                 if (message.isEmpty()) {                      result.put("abilityResult", "未接收到MQTT消息");                 }                 else {                      ZSONObject zsonObject = ZSONObject.stringToZSON(message.get(0));                     result.put("abilityResult", zsonObject.getString("message"));                 }                 reply.writeString(ZSONObject.toZSONString(result));             }             else {                  Map<String, Object> result = new HashMap<>();                 result.put("abilityError", ERROR);                 reply.writeString(ZSONObject.toZSONString(result));                 return false;             }             return true;         }         @Override         public IRemoteObject asObject() {              return this;         }     } } 

另外启动网络连接还需要往config.json里加点东西获取权限

{    ...   "module": {      ...     "reqPermissions": [       {          "name": "ohos.permission.GET_NETWORK_INFO"       },       {          "name": "ohos.permission.INTERNET"       },       {          "name": "ohos.permission.SET_NETWORK_INFO"       },       {          "name": "ohos.permission.MANAGE_WIFI_CONNECTION"       },       {          "name": "ohos.permission.SET_WIFI_INFO"       },       {          "name": "ohos.permission.GET_WIFI_INFO"       }     ]   } } 

 最后写了个python的脚本用来发送mqtt消息,很简单就一行

import paho.mqtt.publish as publish

# Publish one JSON message to the topic the Java side subscribes to.
# The payload is sent as a JSON string; replace hostname with the broker address.
publish.single('HarmonyTest', '{"message":"BongShakalaka"}', hostname='xxx.xxx.xxx.xxx')

附:mqtt消息是要有mqtt服务器的,这个就自己搭或者买吧

想了解更多内容,请访问:

和华为官方合作共建的鸿蒙技术社区

https://harmonyos.51cto.com

滇ICP备2023000592号-31