跳至主要內容

MQTT

HeChuangJun约 956 字大约 3 分钟

MQTT协议

1. 注意事项

  • 使用同一个clientId重复连接时，Broker会断开先前已建立的连接，因此每个客户端必须使用唯一的clientId
  • 监听客户端上线/下线事件时，可订阅以下系统主题：
    • $SYS/brokers/+/clients/+/connected
    • $SYS/brokers/+/clients/+/disconnected
    • $SYS/brokers/+/clients/#
    • $SYS/#

2. MQTT客户端

import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.cert.CertificateException;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * Minimal Paho example: connects to a broker, publishes a single message
 * and disconnects again.
 */
public class MqttPublishSample {
	/**
	 * Runs the sample.
	 *
	 * <p>The {@code throws} clause is kept as in the original sample so existing callers
	 * stay source compatible; MQTT failures are reported on standard output
	 * instead of being propagated.
	 *
	 * @param args ignored
	 */
	public static void main(String[] args) throws KeyManagementException, CertificateException, FileNotFoundException, IOException, KeyStoreException {
		String topic        = "MQTT Examples";
		String content      = "Message from MqttPublishSample";
		int qos             = 2;
		// Exactly one broker URI may be declared in this scope. Plain TCP is used here;
		// for a TLS endpoint replace the value with e.g. "ssl://10.110.111.251:8883".
		String broker       = "tcp://127.0.0.1:11883";
		String clientId     = "JavaSample";
		MemoryPersistence persistence = new MemoryPersistence();

		try {
			MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
			MqttConnectOptions connOpts = new MqttConnectOptions();
			connOpts.setCleanSession(true);
			System.out.println("Connecting to broker: "+broker);
			sampleClient.connect(connOpts);
			System.out.println("Connected");
			System.out.println("Publishing message: "+content);
			// Encode explicitly so the payload does not depend on the platform default charset.
			MqttMessage message = new MqttMessage(content.getBytes(java.nio.charset.StandardCharsets.UTF_8));
			message.setQos(qos);
			sampleClient.publish(topic, message);
			System.out.println("Message published");
			sampleClient.disconnect();
			System.out.println("Disconnected");
			System.exit(0);
		} catch(MqttException me) {
			System.out.println("reason "+me.getReasonCode());
			System.out.println("msg "+me.getMessage());
			System.out.println("loc "+me.getLocalizedMessage());
			System.out.println("cause "+me.getCause());
			System.out.println("excep "+me);
			me.printStackTrace();
		}
	}
}

3. spring整合MQTT

# 定义mqtt的配置类

package com.junye.test;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

/**
 * Connection settings for the MQTT client, registered as a Spring component.
 *
 * <p>NOTE(review): {@code @Data} is presumably Lombok's annotation (the factory and
 * subscriber classes call getters/setters on this type), but no Lombok import
 * is visible in this snippet - confirm it is on the classpath and imported.
 */
@Component
@Data
public class ClientConfig {
	// Broker URI; the @Value literal contains no ${...} placeholder, so it is used as-is.
	@Value("tcp://127.0.0.1:11883")
	private String broker;
	// User name passed to the broker when connecting.
	@Value("admin")
	private String user;
	// Password passed to the broker when connecting.
	@Value("public")
	private String password;
	// QoS level handed to the client created from this configuration; defaults to 2.
	private int qos = 2;
	// Optional client identifier; when left null the client keeps its own generated id.
	private String clientId;
}

# 定义RemoteClient接口

package com.junye.test;
import org.eclipse.paho.client.mqttv3.MqttException;
/**
 * Abstraction over an MQTT connection: connect, publish, subscribe,
 * unsubscribe and disconnect. Every operation reports failures as
 * {@link MqttException}.
 */
public interface RemoteClient {
	/**
	 * Connects to the broker.
	 *
	 * @param broker   broker URI
	 * @param user     user name used for the connection
	 * @param password password used for the connection
	 * @throws MqttException if the connection cannot be established
	 */
	void init(String broker, String user, String password) throws MqttException;
	/**
	 * Publishes a text message.
	 *
	 * @param topic   topic to publish to
	 * @param message message text
	 * @param qos     quality-of-service level for this message
	 * @throws MqttException if publishing fails
	 */
	void publish(String topic, String message, int qos) throws MqttException;
	/**
	 * Subscribes to a topic and hands incoming messages to the listener.
	 *
	 * @param topic    topic filter to subscribe to
	 * @param listener receiver of incoming messages
	 * @throws MqttException if the subscription fails
	 */
	void subscribe(String topic, OnMessageListener listener) throws MqttException;
	/**
	 * Cancels a subscription.
	 *
	 * @param topic topic filter to unsubscribe from
	 * @throws MqttException if unsubscribing fails
	 */
	void unSubscribe(String topic) throws MqttException;
	/**
	 * Disconnects from the broker.
	 *
	 * @throws MqttException if disconnecting fails
	 */
	void close() throws MqttException;
}

# 定义RemoteClient接口的实现类RemoteClientImpl

package com.junye.test;
import java.util.UUID;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
@Data
public class RemoteClientImpl implements RemoteClient {  
	private String broker;
	private String user;
	private String password;
	private int qos = 2;
	private String clientId;  
	private MqttClient client;  

	public RemoteClientImpl() {  
		this(UUID.randomUUID().toString());  
	}  

	public BasicRemoteClient(String clientId) {  
		this.clientId = clientId;  
	}  

	public void init(String broker, String user, String password) throws MqttException {  
		MemoryPersistence persistence = new MemoryPersistence();  
		client = new MqttClient(broker, clientId, persistence);  
		MqttConnectOptions options = new MqttConnectOptions();  

		options.setCleanSession(true);  
		options.setUserName(user);  
		options.setPassword(password.toCharArray());  
		client.connect(options);  
	}  

	public void init() throws MqttException {  
		MemoryPersistence persistence = new MemoryPersistence();  
		client = new MqttClient(broker, clientId, persistence);  
		MqttConnectOptions options = new MqttConnectOptions();  
		options.setCleanSession(true);  
		options.setUserName(user);  
		options.setPassword(password.toCharArray());  
		client.connect(options);  
	}  


	public void publish(String topic, String message, int qos) throws MqttException {  
		MqttMessage mqttMessage = new MqttMessage(message.getBytes());  
		mqttMessage.setQos(qos);  
		client.publish(topic, mqttMessage);  
	}  

	public void subscribe(String topic, final OnMessageListener listener) throws MqttException {  
		try {  
			client.subscribe(topic, this.qos);
			client.setCallback(new MqttCallback(){
				@Override
				public void connectionLost(Throwable cause) {
					try {
						init();
						System.out.println("重连");
					} catch (MqttException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}

				@Override
				public void messageArrived(String topic2, MqttMessage message) throws Exception {
					listener.handleMessage(topic2, message.toString());  
				}

				@Override
				public void deliveryComplete(IMqttDeliveryToken token) {
					System.out.println("成功");
					logger.info("deliveryComplete {}", token);
				}
			});
		} catch (MqttException e) {  
			throw e;  
		}  
	}  

	public void unSubscribe(String topic) throws MqttException {  
		client.unsubscribe(topic);  
	}  

	@Override  
	public void close() throws MqttException {  
		client.disconnect();  
	}  

}

# 定义监听接口OnMessageListener用于传入RemoteClient处理信息
/**
 * Callback passed to {@code RemoteClient.subscribe} to process incoming messages.
 */
public interface OnMessageListener {
	/**
	 * Handles one incoming message.
	 *
	 * @param topic   topic the message arrived on
	 * @param message message content as text
	 */
	void handleMessage(String topic, String message);
}
# 自定义Bean注入
// Factory bean for RemoteClient: it is registered like any ordinary bean,
// only the way the instance is initialised is customised.
/**
 * Spring {@link FactoryBean} that builds a connected {@link RemoteClient} from a
 * {@link ClientConfig}.
 *
 * <p>The product is not a singleton: every call to {@link #getObject()} opens a
 * new broker connection, so callers should keep and reuse the returned client.
 */
@Component
public class BasicRemoteClientFactory implements FactoryBean<RemoteClient>{

	@Autowired
	private ClientConfig clientConfig;

	/** Allows the factory to be used outside the container with an explicit configuration. */
	public void setClientConfig(ClientConfig clientConfig) {
		this.clientConfig = clientConfig;
	}

	/**
	 * Creates a client, applies client id and QoS from the configuration and
	 * connects it to the configured broker.
	 *
	 * @return a connected client
	 * @throws Exception if the connection cannot be established
	 */
	@Override
	public RemoteClient getObject() throws Exception {
		// RemoteClientImpl is the only RemoteClient implementation defined alongside this factory.
		RemoteClientImpl client = new RemoteClientImpl();
		if(clientConfig.getClientId()!=null){
			client.setClientId(clientConfig.getClientId());
		}
		// Configure everything before connecting so the client is complete once init returns.
		client.setQos(clientConfig.getQos());
		client.init(clientConfig.getBroker(), clientConfig.getUser(), clientConfig.getPassword());
		return client;
	}

	@Override
	public Class<?> getObjectType() {
		return RemoteClientImpl.class;
	}

	@Override
	public boolean isSingleton() {
		return false;
	}
}
# 定义抽象类BaseMqttSubscriber用于多个Controller继承(可选)
	/**
	 * Base class for components that need an MQTT client.
	 *
	 * <p>The client is created lazily on first use and then reused. Creating a new
	 * connection per call would leak connections and, because every connection
	 * uses the same client id, make the broker drop the previously connected
	 * client (for example the one holding the subscription).
	 */
	public abstract class BaseMqttSubscriber {
		protected Logger logger =LoggerFactory.getLogger(this.getClass());
		@Value("TEST")
		private String clientId;

		@Autowired
		private ClientConfig clientConfig;

		// Lazily created connection shared by all calls on this instance.
		private RemoteClient mqttClient;

		/** Sets the client id used when the client is first created. */
		public void setClientId(String clientId) {
			this.clientId = clientId;
		}

		public void setClientConfig(ClientConfig clientConfig) {
			this.clientConfig = clientConfig;
		}


		/**
		 * Returns the MQTT client of this component, connecting on first use.
		 *
		 * <p>Synchronized because the lazy creation may be triggered concurrently
		 * by several request threads.
		 *
		 * @return the shared, connected client
		 * @throws Exception if the first connection attempt fails; a later call retries
		 */
		public synchronized RemoteClient getMqttClient() throws Exception {
			if (mqttClient == null) {
				clientConfig.setClientId(clientId);//override the clientId;
				BasicRemoteClientFactory factory=new BasicRemoteClientFactory();
				factory.setClientConfig(clientConfig);
				mqttClient = factory.getObject();
			}
			return mqttClient;
		}
	}
# 定义HelloController测试
package com.junye.test;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.PostConstruct;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * Demo controller: subscribes to "/test" once the bean is constructed and
 * publishes a message to the same topic on every "/hello" request.
 */
@RestController
public class HelloController extends BaseMqttSubscriber{

	/**
	 * Subscribes to "/test" after dependency injection has finished.
	 * The listener only prints a marker line for each received message.
	 */
	@PostConstruct
	public void init() throws MqttException, Exception {
		System.out.println("qidong");
		OnMessageListener printer = new OnMessageListener() {
			@Override
			public void handleMessage(String topic, String message)  {
				System.out.println("test");
			}
		};
		this.getMqttClient().subscribe("/test", printer);
	}

	/**
	 * Handles "/hello": echoes the {@code test} parameter together with a demo
	 * object and publishes a fixed message to "/test" with QoS 2.
	 *
	 * @param test  value echoed back under the key "test"
	 * @param test2 accepted but not used
	 * @return map containing the echoed value and the demo object
	 */
	@RequestMapping("/hello")
	public Map<String, Object> index(String test,String test2) throws MqttException, Exception {
		Map<String, Object> body = new HashMap<String, Object>();
		body.put("test", test);
		body.put("Demo", new Demo("junye", "1"));
		this.getMqttClient().publish("/test", "hahah", 2);
		System.out.println("chenggong");
		return body;
	}
}
# 定义springboot入口
package com.junye.test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Spring Boot entry point.
 *
 * <p>{@code @SpringBootApplication} already enables auto-configuration, so no
 * separate {@code @EnableAutoConfiguration} is declared.
 */
@SpringBootApplication
public class MainApplication {
	/**
	 * Starts the application context.
	 *
	 * @param args command-line arguments forwarded to Spring Boot
	 */
	public static void main(String[] args) {
		SpringApplication.run(MainApplication.class, args);
	}
}

4. JavaScript连接emq

<!DOCTYPE html>
<html>
<head>
	<meta charset="UTF-8">
	<title>Insert title here</title>
	<!-- Scripts belong inside <head> (or <body>), not between the two elements. -->
	<script src="https://cdnjs.cloudflare.com/ajax/libs/paho-mqtt/1.0.1/mqttws31.js" type="text/javascript"></script>
	<script>
	// Connects to the broker's WebSocket listener on port 8083 of the current host,
	// subscribes to "/World", sends one message and disconnects once a message arrives.
	window.onload = function () {
		// Declared locally so the page does not leak implicit globals.
		// The client id must be unique per connection, otherwise the broker
		// drops the connection that previously used the same id.
		var client = new Paho.MQTT.Client(location.hostname, Number(8083), "clientId");
		client.onConnectionLost = onConnectionLost;
		client.onMessageArrived = onMessageArrived;
		client.connect({onSuccess: onConnect});
		//,onFailure : artmisMqttOnError

		// Once a connection has been made, make a subscription and send a message.
		function onConnect() {
			alert("connect");
			console.log("onConnect");
			client.subscribe("/World");
			var message = new Paho.MQTT.Message("Hello");
			message.destinationName = "/World";
			client.send(message);
		}

		// errorCode 0 means a normal disconnect, which is not reported.
		function onConnectionLost(responseObject) {
			if (responseObject.errorCode !== 0) {
				console.log("onConnectionLost:"+responseObject.errorMessage);
				alert("lost");
			}
		}

		// Shows the payload and ends the session after the first message.
		function onMessageArrived(message) {
			console.log("onMessageArrived:"+message.payloadString);
			alert(message.payloadString);
			client.disconnect();
		}
	};
	</script>
</head>
<body>
xxx
</body>
</html>