Java使用MQTT发送数据
1.建立Maven项目
点击文件->新建->项目
2.配置pom.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>send</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <!-- The Java sources contain non-ASCII string literals; pin the encoding so the
             build does not depend on the platform default. -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- Lombok (provides @Slf4j). Declared exactly once: the original file declared it
             twice (1.18.12 and 1.16.10), which Maven reports as a duplicate declaration. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>

        <!-- Eclipse Paho MQTT v3 client. -->
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.2</version>
        </dependency>

        <!-- SLF4J API. Version aligned with the binding below; the original mixed
             slf4j-api 1.6.1 with a 1.7.25 binding. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.16</version>
        </dependency>

        <!-- Alternative binding that prints log output to the console. Enable it in place of
             slf4j-nop below if the log messages should be visible.
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
            <scope>compile</scope>
        </dependency>-->
        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->

        <!-- No-operation binding: every log statement is silently discarded. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-nop</artifactId>
            <version>1.7.25</version>
        </dependency>
    </dependencies>
</project>
3.(方法一)MqttClient.java文件
import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttPersistenceException; import org.eclipse.paho.client.mqttv3.MqttTopic; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import java.util.*; @Slf4j public class MqttClient { public static org.eclipse.paho.client.mqttv3.MqttClient mqttClient = null; private static MemoryPersistence memoryPersistence = null; private static MqttConnectOptions mqttConnectOptions = null; private static MqttClient instance = null; public static MqttClient getInstance() throws Exception { if (instance == null) { synchronized (MqttClient.class) { if (instance == null) { instance = new MqttClient(); } } } return instance; } public MqttClient(){ init("mqttx_6b238cf4"); } public void init(String clientId) { //初始化连接设置对象 mqttConnectOptions = new MqttConnectOptions(); //初始化MqttClient if(null != mqttConnectOptions) { // true可以安全地使用内存持久性作为客户端断开连接时清除的所有状态 mqttConnectOptions.setCleanSession(true); // 设置连接超时 mqttConnectOptions.setConnectionTimeout(30); // 设置持久化方式 memoryPersistence = new MemoryPersistence(); if(null != memoryPersistence && null != clientId) { try { mqttClient = new org.eclipse.paho.client.mqttv3.MqttClient("tcp://123.57.159.13:1883", clientId,memoryPersistence); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } }else { } }else { log.error("mqttConnectOptions对象为空"); } //设置连接和回调 if(null != mqttClient) { if(!mqttClient.isConnected()) { try { log.info("创建连接:" + mqttClient.isConnected()); mqttClient.connect(mqttConnectOptions); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }else { log.error("mqttClient为空"); } } // 关闭连接 public void closeConnect() { //关闭存储方式 if(null != memoryPersistence) { try { 
memoryPersistence.close(); } catch (MqttPersistenceException e) { // TODO Auto-generated catch block e.printStackTrace(); } }else { log.error("memoryPersistence is null"); } // 关闭连接 if(null != mqttClient) { if(mqttClient.isConnected()) { try { mqttClient.disconnect(); mqttClient.close(); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } }else { log.error("mqttClient is not connect"); } }else { log.error("mqttClient is null"); } } // 发布消息 public void publishMessage(String pubTopic,String message,int qos) { if(null != mqttClient&& mqttClient.isConnected()) { MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(qos); mqttMessage.setPayload(message.getBytes()); MqttTopic topic = mqttClient.getTopic(pubTopic); if(null != topic) { try { MqttDeliveryToken publish = topic.publish(mqttMessage); if(!publish.isComplete()) { log.info("消息发布成功"); } } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }else { reConnect(); } } // 重新连接 public void reConnect() { if(null != mqttClient) { if(!mqttClient.isConnected()) { if(null != mqttConnectOptions) { try { mqttClient.connect(mqttConnectOptions); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } }else { log.error("mqttConnectOptions is null"); } }else { log.error("mqttClient is null or connect"); } }else { init("admin"); } } // 订阅主题 public void subTopic(String topic) { if(null != mqttClient&& mqttClient.isConnected()) { try { mqttClient.subscribe(topic, 1); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } }else { log.error("mqttClient is error"); } } // 清空主题 public void cleanTopic(String topic) { if(null != mqttClient&& !mqttClient.isConnected()) { try { mqttClient.unsubscribe(topic); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } }else { log.error("mqttClient is error"); } } public static void main(String [] args){ MqttClient mqttClient = new 
MqttClient(); for (int i=1;i<101;i++){ mqttClient.publishMessage("tztdatas", "{\n" + "\t\"InsChnName\": \"1-1-1\",\n" + "\t\"SampleDatas\": [{\n" + "\t\t\"SampleTime\": \"2022-06-13T10:25:41\",\n" + "\t\t\"SampleValue\": -152.588\n" + "\t}, {\n" + "\t\t\"SampleTime\": \"2022-06-13T10:26:05\",\n" + "\t\t\"SampleValue\": 152.588\n" + "\t}]\n" + "\t}", 1); try { Thread.sleep(1000); // 休眠1秒 } catch (Exception e) { } } } }
4.(方法二)MqttClient.java文件
import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttPersistenceException; import org.eclipse.paho.client.mqttv3.MqttTopic; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import java.util.*; @Slf4j public class MqttClient { public static org.eclipse.paho.client.mqttv3.MqttClient mqttClient = null; private static MemoryPersistence memoryPersistence = null; private static MqttConnectOptions mqttConnectOptions = null; private static MqttClient instance = null; public static MqttClient getInstance() throws Exception { if (instance == null) { synchronized (MqttClient.class) { if (instance == null) { instance = new MqttClient(); } } } return instance; } public MqttClient(){ init("mqttx_6b238cf4"); } public void init(String clientId) { //初始化连接设置对象 mqttConnectOptions = new MqttConnectOptions(); //初始化MqttClient if(null != mqttConnectOptions) { // true可以安全地使用内存持久性作为客户端断开连接时清除的所有状态 mqttConnectOptions.setCleanSession(true); // 设置连接超时 mqttConnectOptions.setConnectionTimeout(30); // 设置持久化方式 memoryPersistence = new MemoryPersistence(); if(null != memoryPersistence && null != clientId) { try { mqttClient = new org.eclipse.paho.client.mqttv3.MqttClient("tcp://192.168.0.112:1883", clientId,memoryPersistence); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } }else { } }else { log.error("mqttConnectOptions对象为空"); } //设置连接和回调 if(null != mqttClient) { if(!mqttClient.isConnected()) { try { log.info("创建连接:" + mqttClient.isConnected()); mqttClient.connect(mqttConnectOptions); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }else { log.error("mqttClient为空"); } } // 关闭连接 public void closeConnect() { //关闭存储方式 if(null != memoryPersistence) { try { 
memoryPersistence.close(); } catch (MqttPersistenceException e) { // TODO Auto-generated catch block e.printStackTrace(); } }else { log.error("memoryPersistence is null"); } // 关闭连接 if(null != mqttClient) { if(mqttClient.isConnected()) { try { mqttClient.disconnect(); mqttClient.close(); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } }else { log.error("mqttClient is not connect"); } }else { log.error("mqttClient is null"); } } // 发布消息 public void publishMessage(String pubTopic,String message,int qos) { if(null != mqttClient&& mqttClient.isConnected()) { MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(qos); mqttMessage.setPayload(message.getBytes()); MqttTopic topic = mqttClient.getTopic(pubTopic); if(null != topic) { try { MqttDeliveryToken publish = topic.publish(mqttMessage); if(!publish.isComplete()) { log.info("消息发布成功"); } } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }else { reConnect(); } } // 重新连接 public void reConnect() { if(null != mqttClient) { if(!mqttClient.isConnected()) { if(null != mqttConnectOptions) { try { mqttClient.connect(mqttConnectOptions); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } }else { log.error("mqttConnectOptions is null"); } }else { log.error("mqttClient is null or connect"); } }else { init("admin"); } } // 订阅主题 public void subTopic(String topic) { if(null != mqttClient&& mqttClient.isConnected()) { try { mqttClient.subscribe(topic, 1); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } }else { log.error("mqttClient is error"); } } // 清空主题 public void cleanTopic(String topic) { if(null != mqttClient&& !mqttClient.isConnected()) { try { mqttClient.unsubscribe(topic); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } }else { log.error("mqttClient is error"); } } public static void main(String [] args){ MqttClient mqttClient = new 
MqttClient(); for (int i=0;i<50;i++){ mqttClient.publishMessage("tztdatas", "{\n" + "\t\"InsChnName\": \"1-1-1\",\n" + "\t\"SampleDatas\": [{\n" + "\t\t\"SampleTime\": \"2022-06-13T10:25:41\",\n" + "\t\t\"SampleValue\": 152.588\n" + "\t}, {\n" + "\t\t\"SampleTime\": \"2022-06-13T10:26:05\",\n" + "\t\t\"SampleValue\": 152.588\n" + "\t}]\n" + "\t}", 1); try { Thread.sleep(1000); // 休眠1秒 } catch (Exception e) { } } } }
5.休眠函数
(1)sleep()使当前线程进入阻塞(停滞)状态并让出CPU的使用权,目的是不让当前线程独占该进程所获得的CPU资源,从而留出一定时间给其他线程执行。
(2)代码
import java.util.*;

/**
 * Demonstrates {@link Thread#sleep(long)}: prints the current time, pauses the current
 * thread for three seconds, then prints the time again.
 */
public class SleepDemo {

    /**
     * Runs the demo.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            System.out.println(new Date() + "\n");
            Thread.sleep(1000 * 3); // pause for 3 seconds
            System.out.println(new Date() + "\n");
        } catch (InterruptedException e) {
            System.out.println("Got an exception!");
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
        }
    }
}
6.进入EMQX Dashboard(管理控制台),可以查看客户端是否已连接。进入方式:在浏览器中访问 http://ip:端口号(Dashboard默认端口为18083)。
7.MQTTX 测试
(1)MQTTX下载网址:https://mqttx.app/ (MQTT X:跨平台 MQTT 5.0 桌面客户端工具)
(2)使用MQTTX订阅相应主题,可以测试消息是否发送成功。