SpringBoot - 集成MQTT教程2(订阅消息)
作者:hangge | 2019-12-01 08:10
在之前的文章中我介绍了 SpringBoot 项目如何实现 MQTT 消息推送功能,本文接着介绍消息的订阅处理。其中 pom.xml 文件的依赖配置、以及 application.properties 文件的 MQTT 服务器配置同前文一样(点击查看),这里就不重复说明了。

(2)然后我们就可以在代码中通过这个 adapter 来添加或者删除 Topic:
三、实现 MQTT 消息的订阅
1,MqttReceiverConfig.java(MQTT 消息订阅配置类)
/**
* MQTT配置,消费者
*/
@Configuration
public class MqttReceiverConfig {
/**
* 订阅的bean名称
*/
public static final String CHANNEL_NAME_IN = "mqttInboundChannel";
// 客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息
private static final byte[] WILL_DATA;
static {
WILL_DATA = "offline".getBytes();
}
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
@Value("${mqtt.url}")
private String url;
@Value("${mqtt.receiver.clientId}")
private String clientId;
@Value("${mqtt.receiver.defaultTopic}")
private String defaultTopic;
/**
* MQTT连接器选项
*/
@Bean
public MqttConnectOptions getReceiverMqttConnectOptions(){
MqttConnectOptions options = new MqttConnectOptions();
// 设置连接的用户名
if(!username.trim().equals("")){
options.setUserName(username);
}
// 设置连接的密码
options.setPassword(password.toCharArray());
// 设置连接的地址
options.setServerURIs(StringUtils.split(url, ","));
// 设置超时时间 单位为秒
options.setConnectionTimeout(10);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
// 但这个方法并没有重连的机制
options.setKeepAliveInterval(20);
return options;
}
/**
* MQTT客户端
*/
@Bean
public MqttPahoClientFactory receiverMqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getReceiverMqttConnectOptions());
return factory;
}
/**
* MQTT信息通道(消费者)
*/
@Bean(name = CHANNEL_NAME_IN)
public MessageChannel mqttInboundChannel() {
return new DirectChannel();
}
/**
* MQTT消息订阅绑定(消费者)
*/
@Bean
public MessageProducer inbound() {
// 可以同时消费(订阅)多个Topic
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(
clientId, receiverMqttClientFactory(),
StringUtils.split(defaultTopic, ","));
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
// 设置订阅通道
adapter.setOutputChannel(mqttInboundChannel());
return adapter;
}
/**
* MQTT消息处理器(消费者)
*/
@Bean
@ServiceActivator(inputChannel = CHANNEL_NAME_IN)
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
String msg = message.getPayload().toString();
System.out.println("\n--------------------START-------------------\n" +
"接收到订阅消息:\ntopic:" + topic + "\nmessage:" + msg +
"\n---------------------END--------------------");
}
};
}
}
2,测试运行
(1)项目启动后,我们使用 MQTTBox 对“hangge”这个主题,发送一条消息:

(2)可以看到 SprinBoot 项目这边成功接收到消息并打印出来:
附:动态设置监听的 Topic
(1)上面的样例中我们代码在初始化时就配置好监听主题。如果希望在运行过程中能够动态地新增或者删除订阅的 Topic,可以对 MQTT 消息订阅配置类稍作修改,将MqttPahoMessageDrivenChannelAdapter 对象定义成一个全局变量:
/**
* MQTT配置,消费者
*/
@Configuration
public class MqttReceiverConfig {
/**
* 订阅的bean名称
*/
public static final String CHANNEL_NAME_IN = "mqttInboundChannel";
public MqttPahoMessageDrivenChannelAdapter adapter;
// 客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息
private static final byte[] WILL_DATA;
static {
WILL_DATA = "offline".getBytes();
}
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
@Value("${mqtt.url}")
private String url;
@Value("${mqtt.receiver.clientId}")
private String clientId;
@Value("${mqtt.receiver.defaultTopic}")
private String defaultTopic;
@Autowired
private ReceiveDataController receiveDataController;
/**
* MQTT连接器选项
*/
@Bean
public MqttConnectOptions getReceiverMqttConnectOptions(){
MqttConnectOptions options = new MqttConnectOptions();
// 设置连接的用户名
if(!username.trim().equals("")){
options.setUserName(username);
}
// 设置连接的密码
options.setPassword(password.toCharArray());
// 设置连接的地址
options.setServerURIs(url.split(","));
// 设置超时时间 单位为秒
options.setConnectionTimeout(10);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
// 但这个方法并没有重连的机制
options.setKeepAliveInterval(20);
return options;
}
/**
* MQTT客户端
*/
@Bean
public MqttPahoClientFactory receiverMqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getReceiverMqttConnectOptions());
return factory;
}
/**
* MQTT信息通道(消费者)
*/
@Bean(name = CHANNEL_NAME_IN)
public MessageChannel mqttInboundChannel() {
return new DirectChannel();
}
/**
* MQTT消息订阅绑定(消费者)
*/
@Bean
public MessageProducer inbound() {
// 可以同时消费(订阅)多个Topic
adapter = new MqttPahoMessageDrivenChannelAdapter(
clientId, receiverMqttClientFactory(),
defaultTopic.split(","));
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
// 设置订阅通道
adapter.setOutputChannel(mqttInboundChannel());
return adapter;
}
/**
* MQTT消息处理器(消费者)
*/
@Bean
@ServiceActivator(inputChannel = CHANNEL_NAME_IN)
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
String msg = message.getPayload().toString();
/**System.out.println("\n--------------------START-------------------\n" +
"接收到订阅消息:\ntopic:" + topic + "\nmessage:" + msg +
"\n---------------------END--------------------");
**/
EventBean event = JSON.parseObject(msg, EventBean.class);
receiveDataController.receive(event);
}
};
}
}
(2)然后我们就可以在代码中通过这个 adapter 来添加或者删除 Topic:
@RestController
public class TestController {
@Autowired
MqttReceiverConfig mqttReceiverConfig;
@GetMapping("/test")
public void test() {
// 添加一个或多个监听Topic
mqttReceiverConfig.adapter.addTopic("topic1"); // 默认qos为1
mqttReceiverConfig.adapter.addTopic("topic2", 1);
mqttReceiverConfig.adapter.addTopic("topic3", "topic4");
mqttReceiverConfig.adapter.addTopics(new String[]{"topic5", "topic6"},new int[]{1, 1});
// 删除一个或多个监听Topic
mqttReceiverConfig.adapter.removeTopic("topic1");
mqttReceiverConfig.adapter.removeTopic("topic2", "topic3");
}
}
全部评论(0)