配置说明
mqtt:
clientId: ${spring.application.name} # 非空,客户端id,系统会在该id的基础上追加一串随机数,防止重复
host: tcp://192.168.0.100:1883 # 非空,MQTT-服务器连接地址,支持集群,用逗号隔开,如:tcp://127.0.0.1:1883,tcp://192.168.2.133:1885
username: gjzz # 非空,用户名
password: gjzz2020 # 非空,密码
timeout: 3000 # 连接超时时长,单位:毫秒;可空,默认值:0
keeplive: 100 # 心跳检测时间,单位:秒;可空,默认值:100
default: # 启动时默认订阅主题
topic: gjzz.device-001 # 主题,支持多个,用","分开;如果mqtt.default.handle不为空,那么将以该Bean实例作为处理方法;否则,系统会获取ReceiveInterface的Bean实例作为处理方法
handle: cn.cw.app.base.uc.app.config.ReceiveImpl # 可空,处理订阅主题的 cn.cw.common.client.mqtt.listener.ReceiveInterface 实现类,
公共组件:cn.cw.common.client.mqtt.utils.MqttUtil
订阅
参数说明:
topic: 订阅主题
以 $share/<group-name> 为前缀的共享订阅是带群组的共享订阅,每个组会收到一个;样例:$share/groupName/topic,发送的时候只需发送:topic
以 $queue/ 为前缀的共享订阅是不带群组的共享订阅。它是 $share 订阅的一种特例,相当于所有订阅者都在一个订阅组里面;样例:$queue/topic,发送的时候只需发送:topic
receiveBeanName: 处理消息的实例名称,没有将获取ReceiveInterface的Bean实例,没有将不会订阅
receiveImpl: 处理消息的实例,没有将获取ReceiveInterface的Bean实例,没有将不会订阅
qos: QOS级别, 默认值:0;0:不可靠,消息基本上仅传送一次,如果当时客户端不可用,则会丢失该消息; 1:消息应传送至少 1 次; 2:消息仅传送一次
方法:
(1) public void subscribe(String topic)
(2) public void subscribe(String topic, int qos)
(3) public void subscribe(String topic, String receiveBeanName)
(4) public void subscribe(String topic, ReceiveInterface receive)
(5) public void subscribe(String topic, String receiveBeanName, int qos)
(6) public void subscribe(String topic, ReceiveInterface receiveImpl, int qos)
取消订阅
参数说明:
topic: 主题
receiveBeanName: 如果该值不为空,那么只取消这个订阅的Bean实例
receiveImpl: 如果该值不为空,那么只取消这个订阅的Bean实例
方法:
(1) public void unsubscribe(String topic)
(2) public void unsubscribe(String topic, String receiveBeanName)
(3) public void unsubscribe(String topic, ReceiveInterface receiveImpl)
发布消息
参数说明:
topic: 主题,不需要携带 $share/<group-name>/ 和 $queue/
data: 数据
qos: QOS级别,默认值:0
方法:
(1) public void send(String topic, Object data)
(2) public void send(String topic, Object data, int qos)
消息订阅
方便客户端发送mqtt消息来订阅或取消订阅主题
"$$"开头的主题为系统主题,为MQTT系统关键字,业务开发不得使用
1,$$subscribe:
{
topic: "$$subscribe",
data: {
topic: "topic1", // 非空,通知系统要订阅的主题
handle: "handle1", // 可空,主题消息的处理方法,如果不为空,那么将以该Bean实例作为处理方法;否则,系统会获取ReceiveInterface的Bean实例作为处理方法
qos: 0 // 可空
}
}
2,$$unsubscribe:
{
topic: "$$subscribe",
data: {
topic: "topc1", // 非空,通知系统要取消订阅的主题
handle: "handle1", // 可空,主题消息的处理方法,如果不为空,那么只会取消该主题处理的这个Handle,不会取消主题订阅;否则,取消整个主题的订阅
}
}
接口
1, 发送:/api/mqtt/send
{
topic: "topic1", // 非空
data: Object // 可空
}
2, 订阅:/api/mqtt/subscribe
{
topic: "topic1", // 非空
qos: 0, // 可空
handle: "cn.cw.app.base.ppe.app.config.ReceiveImpl", // 可空,为空会获取ReceiveInterface的Bean实例作为处理方法
}
3, 取消订阅:/api/mqtt/unsubscribe
{
topic: "topic1", // 非空
handle: "cn.cw.app.base.ppe.app.config.ReceiveImpl", // 可空,为空会取消整个主题订阅,否则只是删除该订阅处理方法,其他的订阅还可以继续执行
}
代码样例
1,启动时订阅
(1) 代码方式订阅:
@Component
@Order(10)
public class BindingConfig implements CommandLineRunner {
@Autowired
MqttUtil mqttUtil;
@Override
public void run(String... args) {
ReceiveImpl listener = new ReceiveImpl(); // 创建接收消息方法
this.mqttUtil.subscribe("topic1", listener);
this.mqttUtil.subscribe("$queue/topic1", listener);
this.mqttUtil.subscribe("$share/groupName/topic1", listener);
}
}
(2) 配置方式订阅
配置application.yml
mqtt:
default: # 启动时默认订阅主题
topic: topic1, $queue/topic1, $share/groupName/topic1
handle: cn.cw.app.base.uc.app.config.ReceiveImpl
添加订阅处理方法:ReceiveImpl.java
@Component
public class ReceiveImpl implements ReceiveInterface {
@Override
public void message(String topic, String data){
System.err.println("Topic: "+ topic + " Data: " + data);
}
}
2,业务代码订阅:
订阅处理方法:ReceiveImpl.java
public class ReceiveImpl implements ReceiveInterface {
@Override
public void message(String topic, String data){
System.err.println("Topic: "+ topic + " Data: " + data);
}
}
在业务代码中订阅
private void subscribe() {
ReceiveImpl listener = new ReceiveImpl(); // 创建接收消息方法
this.mqttUtil.subscribe("topic1", listener);
this.mqttUtil.subscribe("$queue/topic1", listener);
this.mqttUtil.subscribe("$share/groupName/topic1", listener);
}
3,消息订阅:
MqttClient.send({
topic: "$$subscribe",
data: {
topic: "topic1", // 非空,通知系统要订阅的主题
handle: "handle1", // 可空,主题消息的处理方法,如果不为空,那么将以该Bean实例作为处理方法;否则,系统会获取ReceiveInterface的Bean实例作为处理方法
qos: 0 // 可空
}
});