1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171
| import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.boot.SpringApplication;
public class MessagingApp {
public static void main(String[] args) { String serviceUrl = "tcp://192.168.14.103:1883"; String userName = "test-mqtt:maxzhao"; String password = "maxzhao"; String clientId = "maxzhao_amdin"; MessagingApp mqttClient = new MessagingApp(); mqttClient.init(serviceUrl, userName, password, clientId); mqttClient.subscribeTopic("/key1", 0); for (int i = 0; i < 50; i++) { try { Thread.sleep(10); } catch (InterruptedException e) { } mqttClient.publishMessage("/key1", "消息 " + i, 1); } mqttClient.cleanTopic("/key"); SpringApplication.run(MessagingApp.class, args); }
private MqttClient mqttClient;
public void init(String serviceUrl, String userName, String password, String clientId) {
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); if (mqttConnectOptions != null) { mqttConnectOptions.setCleanSession(true); mqttConnectOptions.setConnectionTimeout(30); mqttConnectOptions.setUserName(userName); mqttConnectOptions.setPassword(password.toCharArray()); mqttConnectOptions.setKeepAliveInterval(45); MemoryPersistence memoryPersistence = new MemoryPersistence(); if (mqttConnectOptions != null && clientId != null) { try { mqttClient = new MqttClient(serviceUrl, clientId, memoryPersistence); } catch (MqttException e) { e.printStackTrace(); } } else { System.out.println("mqttConnectOptions == null || clientId == null"); } } else { System.out.println("mqttConnectOptions == null"); } System.out.println("MqttClient是否连接?" + mqttClient.isConnected()); if (mqttClient != null) { mqttClient.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable throwable) {
}
@Override public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println("Client 接收消息主题 : " + topic); System.out.println("Client 接收消息Qos : " + message.getQos()); System.out.println("Client 接收消息内容 : " + new String(message.getPayload())); }
@Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
} }); System.out.println("创建连接......"); try { mqttClient.connect(mqttConnectOptions); } catch (MqttException e) { e.printStackTrace(); } } else { System.out.println("mqttClient对象为空,连接失败..."); } System.out.println("mqttClient是否连接?" + mqttClient.isConnected()); }
public void subscribeTopic(String topic, int qos) { if (mqttClient != null && mqttClient.isConnected() && topic != null) { try { mqttClient.subscribe(topic, qos); } catch (MqttException e) { e.printStackTrace(); } } else { System.out.println("订阅失败 mqttClient 无法连接"); } }
public void cleanTopic(String topic) { if (mqttClient != null && mqttClient.isConnected()) { try { mqttClient.unsubscribe(topic); } catch (MqttException e) { e.printStackTrace(); } } else { System.out.println("取消订阅失败 mqttClient 无法连接"); } }
public void publishMessage(String topic, String messageContent, int qos) { if (mqttClient != null && mqttClient.isConnected()) { System.out.println("发布消息......."); System.out.println("发布消息人的clientId:" + mqttClient.getClientId()); MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(qos); mqttMessage.setPayload(messageContent.getBytes()); MqttTopic mqttTopic = mqttClient.getTopic(topic); if (mqttTopic != null) { try { MqttDeliveryToken publish = mqttTopic.publish(mqttMessage); if (!publish.isComplete()) { System.out.println("消息发布成功!"); } } catch (MqttException e) { e.printStackTrace(); } } else { System.out.println("mqttTopic == null"); } } else { System.out.println("mqttClient == null || mqttClient.isConnected() == false"); } } }
|