Java实现MQTT

依赖

```xml
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>
```

代码

下面的示例使用 Eclipse Paho 客户端连接 MQTT 服务器,并依次演示订阅主题、发布消息和取消订阅:

import java.nio.charset.StandardCharsets;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.boot.SpringApplication;

/**
 * Minimal MQTT demo built on the Eclipse Paho blocking client.
 *
 * <p>{@link #main(String[])} connects to a broker, subscribes to a topic, publishes a batch of
 * messages to it, unsubscribes again and finally hands control to Spring Boot. Every operation
 * reports failures on standard output / standard error instead of throwing, so the demo keeps
 * running even when the broker is unreachable.
 */
public class MessagingApp {

    public static void main(String[] args) {
        String serviceUrl = "tcp://192.168.14.103:1883";
        String userName = "test-mqtt:maxzhao";
        String password = "maxzhao";
        String clientId = "maxzhao_amdin";
        // A single topic is used for subscribe, publish and unsubscribe.
        String topic = "/key1";

        MessagingApp mqttClient = new MessagingApp();
        // Build the connection options and connect to the broker.
        mqttClient.init(serviceUrl, userName, password, clientId);
        // Subscribe before publishing so this client receives its own messages.
        mqttClient.subscribeTopic(topic, 0);
        // Publish 50 messages with a short pause between them.
        for (int i = 0; i < 50; i++) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop publishing instead of swallowing the signal.
                Thread.currentThread().interrupt();
                break;
            }
            mqttClient.publishMessage(topic, "消息 " + i, 1);
        }
        // Unsubscribe from the same topic that was subscribed above.
        mqttClient.cleanTopic(topic);
        // NOTE(review): this class carries no Spring Boot annotations in the visible source;
        // confirm that starting a Spring context from it is intended.
        SpringApplication.run(MessagingApp.class, args);
    }

    /**
     * Underlying Paho client; stays {@code null} until {@link #init} manages to create it.
     */
    private MqttClient mqttClient;

    /**
     * Creates the MQTT client, installs the message callback and connects to the broker.
     *
     * <p>Failures are reported but not thrown. When the client cannot be created the method
     * returns early and leaves the instance unconnected; the other methods detect that state.
     *
     * @param serviceUrl broker URI, for example {@code tcp://host:1883}
     * @param userName user name sent to the broker
     * @param password password sent to the broker; skipped when {@code null}
     * @param clientId client identifier; when {@code null} no client is created
     */
    public void init(String serviceUrl, String userName, String password, String clientId) {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        // true: start with a fresh session on every connect; false would ask the broker to
        // keep this client's session state between connections.
        mqttConnectOptions.setCleanSession(true);
        // Connection timeout in seconds.
        mqttConnectOptions.setConnectionTimeout(30);
        mqttConnectOptions.setUserName(userName);
        if (password != null) {
            mqttConnectOptions.setPassword(password.toCharArray());
        }
        // Keep-alive interval in seconds. Keep-alive only detects a dead connection;
        // it does not reconnect by itself.
        mqttConnectOptions.setKeepAliveInterval(45);

        if (clientId == null) {
            System.out.println("mqttConnectOptions == null || clientId == null");
        } else {
            try {
                mqttClient = new MqttClient(serviceUrl, clientId, new MemoryPersistence());
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }

        // Everything below dereferences the client, so bail out when creation failed.
        if (mqttClient == null) {
            System.out.println("mqttClient对象为空,连接失败...");
            return;
        }

        System.out.println("MqttClient是否连接?" + mqttClient.isConnected());
        // The callback receives messages for the topics this client has subscribed to.
        mqttClient.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable throwable) {
                // Surface the loss instead of ignoring it; no automatic reconnect is attempted.
                System.out.println("MQTT 连接已断开: " + throwable);
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                System.out.println("Client 接收消息主题 : " + topic);
                System.out.println("Client 接收消息Qos : " + message.getQos());
                // Decode with an explicit charset so output does not depend on the platform default.
                System.out.println("Client 接收消息内容 : "
                        + new String(message.getPayload(), StandardCharsets.UTF_8));
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                // Delivery is confirmed synchronously in publishMessage; nothing to do here.
            }
        });

        System.out.println("创建连接......");
        try {
            mqttClient.connect(mqttConnectOptions);
        } catch (MqttException e) {
            e.printStackTrace();
        }
        System.out.println("mqttClient是否连接?" + mqttClient.isConnected());
    }

    /**
     * Subscribes to a topic.
     *
     * @param topic topic filter to subscribe to
     * @param qos maximum quality of service for messages delivered on this subscription
     */
    public void subscribeTopic(String topic, int qos) {
        if (mqttClient == null || !mqttClient.isConnected() || topic == null) {
            System.out.println("订阅失败 mqttClient 无法连接");
            return;
        }
        try {
            mqttClient.subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * Unsubscribes from a topic.
     *
     * @param topic topic filter to unsubscribe from
     */
    public void cleanTopic(String topic) {
        if (mqttClient == null || !mqttClient.isConnected()) {
            System.out.println("取消订阅失败 mqttClient 无法连接");
            return;
        }
        try {
            mqttClient.unsubscribe(topic);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * Publishes a UTF-8 encoded text message and waits until its delivery has completed.
     *
     * @param topic topic to publish to
     * @param messageContent message text
     * @param qos quality of service for the message
     */
    public void publishMessage(String topic, String messageContent, int qos) {
        if (mqttClient == null || !mqttClient.isConnected()) {
            System.out.println("mqttClient == null || mqttClient.isConnected() == false");
            return;
        }
        System.out.println("发布消息.......");
        System.out.println("发布消息人的clientId:" + mqttClient.getClientId());

        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(qos);
        mqttMessage.setPayload(messageContent.getBytes(StandardCharsets.UTF_8));

        MqttTopic mqttTopic = mqttClient.getTopic(topic);
        if (mqttTopic == null) {
            System.out.println("mqttTopic == null");
            return;
        }
        try {
            MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
            // MqttTopic.publish returns before delivery finishes; block until it completes so
            // success is only reported for a message that was actually delivered.
            token.waitForCompletion();
            System.out.println("消息发布成功!");
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}

本文地址: https://github.com/maxzhao-it/blog/post/48712fed/