引言
Hello 大家好,这里是Anyin。
前段时间因为工作涉及到和硬件设备打交道,做了一些MQTT相关的工作。今天在这里也做下简单的分享。
基础概念
在做相关开发工作之前,我们先需要了解下什么是MQTT。
MQTT是一个轻量的发布订阅模式消息传输协议,专门针对低带宽和不稳定网络环境的物联网应用设计。
• MQTT官网[1]
• MQTT V3.1.1协议规范[2]
MQTT协议具有如下特点:
•开放消息协议,简单易实现
•发布订阅模式,一对多消息发布
•基于TCP/IP网络连接
•1字节固定报头,2字节心跳报文,报文结构紧凑
•消息QoS支持,可靠传输保证
MQTT主要的应用场景:
•物联网M2M通信,物联网大数据采集
•Android消息推送,WEB消息推送
•移动即时消息,例如Facebook Messenger
•智能硬件、智能家具、智能电器
•车联网通信,电动车站桩采集
•智慧城市、远程医疗、远程教育
•电力、石油与能源等行业市场
更多详细信息可以查看官网。这里就不复述了。
对于MQTT服务端的安装,我们这里使用 EMQX , 其官网地址:
实现一个MQTT客户端
当我们 EMQ 服务端安装之后,我们就可以进行编码我们的MQTT客户端,用于接收设备端的消息或者给设备端发送消息,整个过程都是 异步 的。
1. pom.xml 添加依赖
org.eclipse.paho
org.eclipse.paho.client.mqttv3
${mqtt.version}
2.封装 MqttClient 实例 在步骤1,我们依赖了一个mqtt的第三方库,为了以防后续可能替换其他第三方库,我们需要对其 MqttClient 进行一个简单的封装。
新增一个 MQTTClient 类。
/**
* 实例化MQTT 实例
* @param properties 配置信息
* @param factory 扩展点工厂
*/
public MQTTClient(MQTTProperties properties,
IExtensionHandlerFactory factory,
List subscribeTopics) throws MqttException {
if(CollectionUtils.isEmpty(subscribeTopics)){
throw new CommonBusinessException("-1","订阅列表不能为空");
}
this.subscribeTopics = subscribeTopics;
this.properties = properties;
this.factory = factory;
this.clientId = "SERVER_" + RandomUtil.randomString(8);
this.init();
}
• MQTTProperties 是MQTT的相关配置熟悉
• IExtensionHandlerFactory 扩展点工厂组件,在接收消息的时候,需要根据不同的指令进行业务处理,所以需要这个组件
• List
接着,我们看看 init 方法。
/**
* 初始化MQTT Client 实例
*/
public void init() throws MqttException {
String broker = "tcp://" + properties.getHost() + ":" + properties.getPort();
MemoryPersistence persistence = new MemoryPersistence();
try {
if(client == null){
client = new MqttClient(broker, clientId, persistence);
}
MqttConnectOptions connOpts = this.getOptions();
if (client.isConnected()) {
client.disconnect();
}
client.connect(connOpts);
client.setCallback(new MQTTSubscribe(this, factory, subscribeTopics));
// 订阅路径
client.subscribe(this.getSubscribeTopicList());
}catch (MqttException ex) {
log.error("mqtt服务初始化失败: {}", ex.getMessage(), ex);
throw ex;
}
log.info("mqtt服务连接成功");
}
这里主要处理了客户端实例连接服务器的一些操作,主要有设置参数 connOpts 、设置接收消息的回调 setCallback ,设置订阅设备端的消息主题 subscribe 。
这里有地方需要特别注意下,在进行连接的时候之前,做了一个 client.isConnected() 的判断,如果连接的状态,则需要手动的断开连接 client.disconnect() 。这里主要是为了做重连的时候,能够确保客户端是断开连接的状态,然后再进行重连。
客户端连接的逻辑处理了,我们还需要处理下发送消息的逻辑,简单的封装下即可。
/**
* 发送消息
* @param path path
* @param deviceId 设备ID
* @param content 发送内容
*/
public void publish(String path, String deviceId, byte[] content){
try {
MqttMessage message = new MqttMessage(content);
message.setQos(properties.getQos());
String topic = path + deviceId;
client.publish( topic, message);
}catch (Exception ex){
log.error("mqtt服务发送消息失败: deviceId: {} {}",deviceId, ex.getMessage(), ex);
}
}
3.处理订阅的消息
基本的客户端实例化我们已经处理完了,接着需要处理上行的消息(就是订阅的消息)。
对于不同厂商上来的业务消息可能不一样,有可能是 MQTT协议 包含着JSON的字符串的业务数据,也有可能是MQTT协议包含的是 二进制的私有协议 。
为了抽象上行的消息,我们定义了2个接口,分别抽象上行消息的整包对象和上行消息的某个指令。上行消息的整包对象就是从订阅接口返回的完整的 byte[] 数据包;而上行消息的某个指令是指在这个完整的数据包内肯定会有某个字段指明本次消息是属于什么业务的,可能是心跳、可能是状态等等。
分别新增一个 MQTTProtocol 和 Cmd 类。
@Data
public abstract class MQTTProtocol {
/**
* 设备ID
*/
private String deviceId;
/**
* 消息唯一序号
*/
private String msgId;
/**
* 具体某个业务的指令
*/
private Cmd cmd;
}
public interface Cmd {
/**
* 指令类型,上行的指令或者是下行的指令
*/
CmdTypeEnum getCmdType();
/**
* 指令发送的目标topic
*/
Topic getTopic();
}
接着,我们再新增一个协议的处理器接口: MQTTProtocolHandler
public interface MQTTProtocolHandler extends IExtensionHandler {
String getDeviceId(String topic, byte[] payload);
/**
* 解码
* @param payload 原始数据
* @return 协议
*/
T decode(byte[] payload);
/**
* 校验
* @param protocol 解析出来的基础协议
* @param payload 原始数据
* @return true 通过 false 不通过
*/
boolean check(T protocol, byte[] payload);
/**
* 编码
* @param protocol 协议
* @param data 业务数据
* @return 编码数据
*/
byte[] encode(T protocol, byte[] data);
/**
* 业务处理
* @param protocol 协议
*/
byte[] handle(T protocol);
/**
* 错误响应
* @param protocol 协议
*/
byte[] error(T protocol);
}
这个接口把整个消息的处理过程分为5个步骤:解码、校验、编码、业务处理、错误响应。该接口是一个扩展点,扩展点的枚举类是: BusinessType ,它表示业务类型,即使不同的业务,可能会不同的编解码和处理规则。例如:JSON的数据和二进制的私有协议,它们的编解码就不一样。
然后,我们再看看当接收到消息的时候,我们如何使用这个扩展点进行业务逻辑处理。
@Override
public void messageArrived(String subscribeTopic, MqttMessage message) throws Exception {
try {
// 根据topic解析不同的业务类型
BusinessType businessType = this.matchBusinessTypeBySubscribeTopic(subscribeTopic);
// 根据业务类型拿到具体的协议处理器
MQTTProtocolHandler protocolHandler = extensionHandlerFactory.getExtensionHandler(businessType, MQTTProtocolHandler.class);
// 获取设备ID
String deviceId = protocolHandler.getDeviceId(subscribeTopic, message.getPayload());
// 整包协议解码
MQTTProtocol protocol = protocolHandler.decode(message.getPayload());
if (protocol == null) {
log.error("协议解析异常,无法进行应答");
return;
}
// 指令
Cmd cmd = protocol.getCmd();
if(cmd == null){
log.error("解析后指令为空,无法进行应答");
return;
}
// 设置基础信息
protocol.setMsgId(String.valueOf(message.getId()));
protocol.setDeviceId(deviceId);
// 校验
boolean success = protocolHandler.check(protocol, message.getPayload());
if(!success){
this.errorHandle(protocolHandler, protocol, cmd.getTopic());
return;
}
try {
// 业务处理
byte[] result = protocolHandler.handle(protocol);
// 应答
if(CmdTypeEnum.DOWN == cmd.getCmdType()){
log.info("下行消息,无需应答");
return;
}
Topic topic = cmd.getTopic();
if(topic == null){
log.error("上行消息的发布Topic为空,无需进行应答");
return;
}
// 编码后进行应答
byte[] content = protocolHandler.encode(protocol, result);
client.publish(topic.getTopic(), deviceId, content);
} catch (Exception ex) {
log.error("业务逻辑处理异常: {}, 原始数据:{}", ex.getMessage(), ByteUtil.byte2Str(message.getPayload()), ex);
this.errorHandle(protocolHandler, protocol, cmd.getTopic());
}
}catch (Exception ex){
log.error("unknown error: {}, 原始数据:{}", ex.getMessage(), ByteUtil.byte2Str(message.getPayload()), ex);
}
}
4.处理需要发送的消息
该步骤3,我们处理的是上行的消息,会涉及到解码、业务处理、编码、应答等步骤。接着我们需要处理发送的消息,即下行的消息。
下行的消息处理会比较简单,只要拿到对应的 MQTTClient 实例和协议处理器即可编码之后,然后进行下发消息
@Slf4j
public class MQTTPublish {
private MQTTClient client;
private MQTTProtocolHandler protocolHandler;
public MQTTPublish(MQTTClient client, MQTTProtocolHandler protocolHandler) {
this.client = client;
this.protocolHandler = protocolHandler;
}
public void publish(MQTTProtocol protocol, byte[] data){
byte[] content = protocolHandler.encode(protocol, data);
String deviceId = protocol.getDeviceId();
String topic = protocol.getCmd().getTopic().getTopic();
client.publish(topic, deviceId, content);
}
}
以上的代码只能处理异步的下行协议,在某些场景下,下行协议还需要等待设备端的应答。那这个时候这段代码就无法满足需求。
所以,我们还需要对这段代码再封装。我们设计一个扩展点,不同的业务类型具有不同的发送逻辑
public interface MQTTPublishHandler extends IExtensionHandler {
T handle(C cmd, Class clazz);
}
接着处理其实现类。
@Override
public T handle(C cmd, Class clazz) {
CmdEnum cmdEnum = CmdEnum.get(cmd.getCmd());
// 编码
EncodeCmdHandler handler = factory.getExtensionHandler(cmdEnum, EncodeCmdHandler.class);
byte[] data = handler.encode(cmd);
// 根据业务类型,拿到具体的协议处理器
MQTTProtocolHandler protocolHandler = factory.getExtensionHandler(BusinessType.CHARGING, MQTTProtocolHandler.class);
MQTTPublish publish = new MQTTPublish(client, protocolHandler);
Long serial = this.getSerial(cmd.getDeviceId());
// TODO 这里是具体的实现类,需要具体业务实现
ChargingMQTTProtocol protocol = new ChargingMQTTProtocol();
protocol.setSerial(serial.shortValue());
protocol.setDeviceId(cmd.getDeviceId());
protocol.setVersion("10");
protocol.setMac(cmd.getDeviceId());
protocol.setCode(cmd.getCmd());
protocol.setCmd(cmd);
publish.publish(protocol, data);
// 阻塞应答
RedisMessageTask task = new RedisMessageTask();
RedisMessageListener listener = new RedisMessageListener(task);
try {
// 配置RedisKey
String key = MQTTRedisKeyUtil.callbackKey(cmd.getTopic().getTopic(), cmd.getDeviceId(), serial);
ChannelTopic topic = new ChannelTopic(key);
redisMessageListenerContainer.addMessageListener(listener, topic);
// 同步阻塞
Message message = (Message)task.getFuture().get(60000, TimeUnit.MILLISECONDS);
return JsonUtil.fromJson(message.toString(), clazz);
}catch (Exception ex){
log.error("消息获取失败: {}", ex.getMessage(), ex);
throw new CommonBusinessException("-1", "Redis应答失败: " + ex.getMessage());
} finally {
redisMessageListenerContainer.removeMessageListener(listener);
}
}
这里是使用Java的 CompletableFuture 类进行异步阻塞的。另外我们通过使用Redis的MQ机制,在Redis实现一个监听,当有上行的消息是作为下行的应答的时候,则通过 StringRedisTemplate#convertAndSend 发送消息,监听收到消息后,设置到 CompletableFuture 中进行应答。
RedisMessageTask 会持有一个CompletableFuture实例和RedisMessageListener的引用。它的代码如下:
@Data
public class RedisMessageTask{
private CompletableFuture future = new CompletableFuture<>();
// Redis的监听器
private RedisMessageListener listener;
}
RedisMessageListener 持有 RedisMessageTask 的引用,在收到消息的时候,把消息设置到 CompletableFuture 中,则 CompletableFuture 实例的阻塞就会收到应答。
public class RedisMessageListener implements MessageListener {
private RedisMessageTask task;
public RedisMessageListener(RedisMessageTask task) {
this.task = task;
}
@Override
public void onMessage(Message message, byte[] bytes) { ;
task.getFuture().complete(message);
}
}
最后,在上行应答的时候发送消息即可。
stringRedisTemplate.convertAndSend(key, JsonUtil.toJson(data));
最后
好了,以上就是前端时间接触MQTT相关内容的一些笔记。相关代码因为在实现的过程中,未做更细致的设计和解耦,部分还是和业务耦合。但是后续也会整理成一个Lib包放在 Anyin Cloud[3] 项目中,敬请期待。
References
[1] MQTT官网:
[2] MQTT V3.1.1协议规范:
[3] Anyin Cloud:
相关文章
本站已关闭游客评论,请登录或者注册后再评论吧~