
| import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.boot.SpringApplication;
public class MessagingApp {
public static void main(String[] args) { String serviceUrl = "tcp://192.168.14.103:1883"; String userName = "test-mqtt:maxzhao"; String password = "maxzhao"; String clientId = "maxzhao_amdin"; MessagingApp mqttClient = new MessagingApp(); mqttClient.init(serviceUrl, userName, password, clientId); mqttClient.subscribeTopic("/key1", 0); for (int i = 0; i < 50; i++) { try { Thread.sleep(10); } catch (InterruptedException e) { } mqttClient.publishMessage("/key1", "消息 " + i, 1); } mqttClient.cleanTopic("/key"); SpringApplication.run(MessagingApp.class, args); }
private MqttClient mqttClient;
public void init(String serviceUrl, String userName, String password, String clientId) {
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); if (mqttConnectOptions != null) { mqttConnectOptions.setCleanSession(true); mqttConnectOptions.setConnectionTimeout(30); mqttConnectOptions.setUserName(userName); mqttConnectOptions.setPassword(password.toCharArray()); mqttConnectOptions.setKeepAliveInterval(45); MemoryPersistence memoryPersistence = new MemoryPersistence(); if (mqttConnectOptions != null && clientId != null) { try { mqttClient = new MqttClient(serviceUrl, clientId, memoryPersistence); } catch (MqttException e) { e.printStackTrace(); } } else { System.out.println("mqttConnectOptions == null || clientId == null"); } } else { System.out.println("mqttConnectOptions == null"); } System.out.println("MqttClient是否连接?" + mqttClient.isConnected()); if (mqttClient != null) { mqttClient.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable throwable) {
}
@Override public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println("Client 接收消息主题 : " + topic); System.out.println("Client 接收消息Qos : " + message.getQos()); System.out.println("Client 接收消息内容 : " + new String(message.getPayload())); }
@Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
} }); System.out.println("创建连接......"); try { mqttClient.connect(mqttConnectOptions); } catch (MqttException e) { e.printStackTrace(); } } else { System.out.println("mqttClient对象为空,连接失败..."); } System.out.println("mqttClient是否连接?" + mqttClient.isConnected()); }
public void subscribeTopic(String topic, int qos) { if (mqttClient != null && mqttClient.isConnected() && topic != null) { try { mqttClient.subscribe(topic, qos); } catch (MqttException e) { e.printStackTrace(); } } else { System.out.println("订阅失败 mqttClient 无法连接"); } }
public void cleanTopic(String topic) { if (mqttClient != null && mqttClient.isConnected()) { try { mqttClient.unsubscribe(topic); } catch (MqttException e) { e.printStackTrace(); } } else { System.out.println("取消订阅失败 mqttClient 无法连接"); } }
public void publishMessage(String topic, String messageContent, int qos) { if (mqttClient != null && mqttClient.isConnected()) { System.out.println("发布消息......."); System.out.println("发布消息人的clientId:" + mqttClient.getClientId()); MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(qos); mqttMessage.setPayload(messageContent.getBytes()); MqttTopic mqttTopic = mqttClient.getTopic(topic); if (mqttTopic != null) { try { MqttDeliveryToken publish = mqttTopic.publish(mqttMessage); if (!publish.isComplete()) { System.out.println("消息发布成功!"); } } catch (MqttException e) { e.printStackTrace(); } } else { System.out.println("mqttTopic == null"); } } else { System.out.println("mqttClient == null || mqttClient.isConnected() == false"); } } }
|