MQTT
约 956 字大约 3 分钟
MQTT协议
1. 注意事项
- 同一个clientId重复连接会导致连接断开
- MQTT订阅上下线的主题为
- $SYS/brokers/+/clients/+/connected
- $SYS/brokers/+/clients/+/disconnected
- $SYS/brokers/+/clients/#
- $SYS/#
2. MQTT客户端
import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.cert.CertificateException;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MqttPublishSample {
public static void main(String[] args) throws KeyManagementException, CertificateException, FileNotFoundException, IOException, KeyStoreException {
String topic = "MQTT Examples";
String content = "Message from MqttPublishSample";
int qos = 2;
String broker = "tcp://127.0.0.1:11883";
String broker = "ssl://10.110.111.251:8883";
String clientId = "JavaSample";
MemoryPersistence persistence = new MemoryPersistence();
try {
MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
System.out.println("Connecting to broker: "+broker);
sampleClient.connect(connOpts);
System.out.println("Connected");
System.out.println("Publishing message: "+content);
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
sampleClient.publish(topic, message);
System.out.println("Message published");
sampleClient.disconnect();
System.out.println("Disconnected");
System.exit(0);
} catch(MqttException me) {
System.out.println("reason "+me.getReasonCode());
System.out.println("msg "+me.getMessage());
System.out.println("loc "+me.getLocalizedMessage());
System.out.println("cause "+me.getCause());
System.out.println("excep "+me);
me.printStackTrace();
}
}
}
3. spring整合MQTT
# 定义mqtt的配置类
package com.junye.test;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
@Component
@Data
public class ClientConfig {
@Value("tcp://127.0.0.1:11883")
private String broker;
@Value("admin")
private String user;
@Value("public")
private String password;
private int qos = 2;
private String clientId;
}
# 定义RemoteClient接口
package com.junye.test;
import org.eclipse.paho.client.mqttv3.MqttException;
public interface RemoteClient {
void init(String broker, String user, String password) throws MqttException;
void publish(String topic, String message, int qos) throws MqttException;
void subscribe(String topic, OnMessageListener listener) throws MqttException;
void unSubscribe(String topic) throws MqttException;
void close() throws MqttException;
}
# 实现RemoteClient接口RemoteClientImpl类
package com.junye.test;
import java.util.UUID;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
@Data
public class RemoteClientImpl implements RemoteClient {
private String broker;
private String user;
private String password;
private int qos = 2;
private String clientId;
private MqttClient client;
public RemoteClientImpl() {
this(UUID.randomUUID().toString());
}
public BasicRemoteClient(String clientId) {
this.clientId = clientId;
}
public void init(String broker, String user, String password) throws MqttException {
MemoryPersistence persistence = new MemoryPersistence();
client = new MqttClient(broker, clientId, persistence);
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setUserName(user);
options.setPassword(password.toCharArray());
client.connect(options);
}
public void init() throws MqttException {
MemoryPersistence persistence = new MemoryPersistence();
client = new MqttClient(broker, clientId, persistence);
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setUserName(user);
options.setPassword(password.toCharArray());
client.connect(options);
}
public void publish(String topic, String message, int qos) throws MqttException {
MqttMessage mqttMessage = new MqttMessage(message.getBytes());
mqttMessage.setQos(qos);
client.publish(topic, mqttMessage);
}
public void subscribe(String topic, final OnMessageListener listener) throws MqttException {
try {
client.subscribe(topic, this.qos);
client.setCallback(new MqttCallback(){
@Override
public void connectionLost(Throwable cause) {
try {
init();
System.out.println("重连");
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void messageArrived(String topic2, MqttMessage message) throws Exception {
listener.handleMessage(topic2, message.toString());
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("成功");
logger.info("deliveryComplete {}", token);
}
});
} catch (MqttException e) {
throw e;
}
}
public void unSubscribe(String topic) throws MqttException {
client.unsubscribe(topic);
}
@Override
public void close() throws MqttException {
client.disconnect();
}
}
# 定义监听接口OnMessageListener用于传入RemoteClient处理信息
public interface OnMessageListener {
void handleMessage(String topic, String message);
}
# 自定义Bean注入
//自定义Basic的注入,直接当成一个普通的Bean即可,只是初始化的方法自定义了
@Component
public class BasicRemoteClientFactory implements FactoryBean<RemoteClient>{
@Autowired
private ClientConfig clientConfig;
public void setClientConfig(ClientConfig clientConfig) {
this.clientConfig = clientConfig;
}
@Override
public RemoteClient getObject() throws Exception {
BasicRemoteClient client = new BasicRemoteClient();
if(clientConfig.getClientId()!=null){
client.setClientId(clientConfig.getClientId());
}
client.init(clientConfig.getBroker(), clientConfig.getUser(), clientConfig.getPassword());
client.setQos(clientConfig.getQos());
return (RemoteClient) client;
}
@Override
public Class<?> getObjectType() {
return BasicRemoteClient.class;
}
@Override
public boolean isSingleton() {
return false;
}
}
# 定义抽象类BaseMqttSubscriber用于多个Controller继承(可选)
public abstract class BaseMqttSubscriber {
protected Logger logger =LoggerFactory.getLogger(this.getClass());
@Value("TEST")
private String clientId;
public void setClientId(String clientId) {
this.clientId = clientId;
}
@Autowired
private ClientConfig clientConfig;
public void setClientConfig(ClientConfig clientConfig) {
this.clientConfig = clientConfig;
}
public RemoteClient getMqttClient() throws Exception {
clientConfig.setClientId(clientId);//override the clientId;
BasicRemoteClientFactory factory=new BasicRemoteClientFactory();
factory.setClientConfig(clientConfig);
return factory.getObject();
}
}
# 定义HelloController测试
package com.junye.test;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.PostConstruct;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class HelloController extends BaseMqttSubscriber{
@RequestMapping("/hello")
public Map<String, Object> index(String test,String test2) throws MqttException, Exception {
Map<String , Object> result=new HashMap<String,Object>();
result.put("test", test);
Demo demo=new Demo("junye", "1");
result.put("Demo", demo);
this.getMqttClient().publish("/test", "hahah", 2);
System.out.println("chenggong");
return result;
}
@PostConstruct
public void init() throws MqttException, Exception {
System.out.println("qidong");
this.getMqttClient().subscribe("/test", new OnMessageListener() {
@Override
public void handleMessage(String topic, String message) {
System.out.println("test");
}
});
}
}
# 定义springboot入口
package com.junye.test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@EnableAutoConfiguration
public class MainApplication {
public static void main(String[] args) {
SpringApplication.run(MainApplication.class, args);
}
}
4. JavaScript连接emq
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Insert title here</title>
</head>
<script src="https://cdnjs.cloudflare.com/ajax/libs/paho-mqtt/1.0.1/mqttws31.js" type="text/javascript"></script>
<script>
window.onload=function(){
client = new Paho.MQTT.Client(location.hostname, Number(8083), "clientId");
client.onConnectionLost = onConnectionLost;
client.onMessageArrived = onMessageArrived;
client.connect({onSuccess:onConnect});
//,onFailure : artmisMqttOnError
function onConnect() {
// Once a connection has been made, make a subscription and send a message.
alert("connect");
console.log("onConnect");
client.subscribe("/World");
message = new Paho.MQTT.Message("Hello");
message.destinationName = "/World";
client.send(message);
};
function onConnectionLost(responseObject) {
if (responseObject.errorCode !== 0){
console.log("onConnectionLost:"+responseObject.errorMessage);
alter("lost");
}
};
function onMessageArrived(message) {
console.log("onMessageArrived:"+message.payloadString);
alert(message.payloadString);
client.disconnect();
};
}
</script>
<body>
xxx
</body>
</html>