mirror of
https://github.com/kerwincui/FastBee.git
synced 2025-10-13 03:44:41 +08:00
接收设备消息的异常处理
This commit is contained in:
@@ -33,8 +33,10 @@ public class EmqxService {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private IDeviceLogService deviceLogService;
|
private IDeviceLogService deviceLogService;
|
||||||
|
|
||||||
/** 订阅的主题 */
|
/**
|
||||||
private static final String prefix="/+/+/";
|
* 订阅的主题
|
||||||
|
*/
|
||||||
|
private static final String prefix = "/+/+/";
|
||||||
String sInfoTopic = prefix + "info/post";
|
String sInfoTopic = prefix + "info/post";
|
||||||
String sNtpTopic = prefix + "ntp/post";
|
String sNtpTopic = prefix + "ntp/post";
|
||||||
String sPropertyTopic = prefix + "property/post";
|
String sPropertyTopic = prefix + "property/post";
|
||||||
@@ -43,7 +45,9 @@ public class EmqxService {
|
|||||||
String sShadowPropertyTopic = prefix + "property-offline/post";
|
String sShadowPropertyTopic = prefix + "property-offline/post";
|
||||||
String sShadowFunctionTopic = prefix + "function-offline/post";
|
String sShadowFunctionTopic = prefix + "function-offline/post";
|
||||||
|
|
||||||
/** 发布的主题 */
|
/**
|
||||||
|
* 发布的主题
|
||||||
|
*/
|
||||||
String pStatusTopic = "/status/post";
|
String pStatusTopic = "/status/post";
|
||||||
String pNtpTopic = "/ntp/get";
|
String pNtpTopic = "/ntp/get";
|
||||||
String pPropertyTopic = "/property/get";
|
String pPropertyTopic = "/property/get";
|
||||||
@@ -55,50 +59,50 @@ public class EmqxService {
|
|||||||
// 订阅时钟同步
|
// 订阅时钟同步
|
||||||
client.subscribe(sNtpTopic, 1);
|
client.subscribe(sNtpTopic, 1);
|
||||||
// 订阅设备属性
|
// 订阅设备属性
|
||||||
client.subscribe(sPropertyTopic,1);
|
client.subscribe(sPropertyTopic, 1);
|
||||||
// 订阅设备功能
|
// 订阅设备功能
|
||||||
client.subscribe(sFunctionTopic,1);
|
client.subscribe(sFunctionTopic, 1);
|
||||||
// 订阅设备事件
|
// 订阅设备事件
|
||||||
client.subscribe(sEventTopic,1);
|
client.subscribe(sEventTopic, 1);
|
||||||
// 订阅属性(影子模式)
|
// 订阅属性(影子模式)
|
||||||
client.subscribe(sShadowPropertyTopic,1);
|
client.subscribe(sShadowPropertyTopic, 1);
|
||||||
// 订阅功能(影子模式)
|
// 订阅功能(影子模式)
|
||||||
client.subscribe(sShadowFunctionTopic,1);
|
client.subscribe(sShadowFunctionTopic, 1);
|
||||||
logger.info("mqtt订阅了设备信息和物模型主题");
|
logger.info("mqtt订阅了设备信息和物模型主题");
|
||||||
}
|
}
|
||||||
|
|
||||||
public void subscribeCallback(String topic, MqttMessage mqttMessage){
|
public void subscribeCallback(String topic, MqttMessage mqttMessage) {
|
||||||
// subscribe后得到的消息会执行到这里面
|
// subscribe后得到的消息会执行到这里面
|
||||||
String message=new String(mqttMessage.getPayload());
|
String message = new String(mqttMessage.getPayload());
|
||||||
logger.info("接收消息主题 : " + topic);
|
logger.info("接收消息主题 : " + topic);
|
||||||
logger.info("接收消息Qos : " + mqttMessage.getQos());
|
logger.info("接收消息Qos : " + mqttMessage.getQos());
|
||||||
logger.info("接收消息内容 : " + message);
|
logger.info("接收消息内容 : " + message);
|
||||||
|
|
||||||
String[] topicItem=topic.substring(1).split("/");
|
String[] topicItem = topic.substring(1).split("/");
|
||||||
Long productId= Long.valueOf(topicItem[0]);
|
Long productId = Long.valueOf(topicItem[0]);
|
||||||
String deviceNum=topicItem[1];
|
String deviceNum = topicItem[1];
|
||||||
String name=topicItem[2];
|
String name = topicItem[2];
|
||||||
switch (name){
|
switch (name) {
|
||||||
case "info":
|
case "info":
|
||||||
reportDevice(productId,deviceNum,message);
|
reportDevice(productId, deviceNum, message);
|
||||||
break;
|
break;
|
||||||
case "ntp":
|
case "ntp":
|
||||||
publishNtp(productId,deviceNum,message);
|
publishNtp(productId, deviceNum, message);
|
||||||
break;
|
break;
|
||||||
case "property":
|
case "property":
|
||||||
reportProperty(productId,deviceNum,message,false);
|
reportProperty(productId, deviceNum, message, false);
|
||||||
break;
|
break;
|
||||||
case "function":
|
case "function":
|
||||||
reportFunction(productId,deviceNum,message,false);
|
reportFunction(productId, deviceNum, message, false);
|
||||||
break;
|
break;
|
||||||
case "event":
|
case "event":
|
||||||
reportEvent(productId,deviceNum,message);
|
reportEvent(productId, deviceNum, message);
|
||||||
break;
|
break;
|
||||||
case "property-offline":
|
case "property-offline":
|
||||||
reportProperty(productId,deviceNum,message,true);
|
reportProperty(productId, deviceNum, message, true);
|
||||||
break;
|
break;
|
||||||
case "function-offline":
|
case "function-offline":
|
||||||
reportFunction(productId,deviceNum,message,true);
|
reportFunction(productId, deviceNum, message, true);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -106,110 +110,123 @@ public class EmqxService {
|
|||||||
/**
|
/**
|
||||||
* 上报设备信息
|
* 上报设备信息
|
||||||
*/
|
*/
|
||||||
private void reportDevice(Long productId,String deviceNum,String message){
|
private void reportDevice(Long productId, String deviceNum, String message) {
|
||||||
Device device=JSON.parseObject(message,Device.class);
|
try {
|
||||||
device.setProductId(productId);
|
Device device = JSON.parseObject(message, Device.class);
|
||||||
device.setSerialNumber(deviceNum);
|
device.setProductId(productId);
|
||||||
deviceService.reportDevice(device);
|
device.setSerialNumber(deviceNum);
|
||||||
|
deviceService.reportDevice(device);
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("接收设备信息,解析数据时异常 message={}", e.getMessage());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 上报属性
|
* 上报属性
|
||||||
|
*
|
||||||
* @param message
|
* @param message
|
||||||
*/
|
*/
|
||||||
private void reportProperty(Long productId,String deviceNum,String message,boolean isShadow){
|
private void reportProperty(Long productId, String deviceNum, String message, boolean isShadow) {
|
||||||
try {
|
try {
|
||||||
List<ThingsModelValueRemarkItem> thingsModelValueRemarkItems=JSON.parseArray(message, ThingsModelValueRemarkItem.class);
|
List<ThingsModelValueRemarkItem> thingsModelValueRemarkItems = JSON.parseArray(message, ThingsModelValueRemarkItem.class);
|
||||||
ThingsModelValuesInput input=new ThingsModelValuesInput();
|
ThingsModelValuesInput input = new ThingsModelValuesInput();
|
||||||
input.setProductId(productId);
|
input.setProductId(productId);
|
||||||
input.setDeviceNumber(deviceNum);
|
input.setDeviceNumber(deviceNum);
|
||||||
input.setThingsModelValueRemarkItem(thingsModelValueRemarkItems);
|
input.setThingsModelValueRemarkItem(thingsModelValueRemarkItems);
|
||||||
deviceService.reportDeviceThingsModelValue(input,1,isShadow);
|
deviceService.reportDeviceThingsModelValue(input, 1, isShadow);
|
||||||
}catch (Exception e){
|
} catch (Exception e) {
|
||||||
logger.error("接收属性数据,解析数据时异常 message={}",e.getMessage());
|
logger.error("接收属性数据,解析数据时异常 message={}", e.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 上报功能
|
* 上报功能
|
||||||
|
*
|
||||||
* @param message
|
* @param message
|
||||||
*/
|
*/
|
||||||
private void reportFunction(Long productId,String deviceNum,String message,boolean isShadow){
|
private void reportFunction(Long productId, String deviceNum, String message, boolean isShadow) {
|
||||||
List<ThingsModelValueRemarkItem> thingsModelValueRemarkItems=JSON.parseArray(message, ThingsModelValueRemarkItem.class);
|
try {
|
||||||
ThingsModelValuesInput input=new ThingsModelValuesInput();
|
List<ThingsModelValueRemarkItem> thingsModelValueRemarkItems = JSON.parseArray(message, ThingsModelValueRemarkItem.class);
|
||||||
input.setProductId(productId);
|
ThingsModelValuesInput input = new ThingsModelValuesInput();
|
||||||
input.setDeviceNumber(deviceNum);
|
input.setProductId(productId);
|
||||||
input.setThingsModelValueRemarkItem(thingsModelValueRemarkItems);
|
input.setDeviceNumber(deviceNum);
|
||||||
deviceService.reportDeviceThingsModelValue(input,2,isShadow);
|
input.setThingsModelValueRemarkItem(thingsModelValueRemarkItems);
|
||||||
|
deviceService.reportDeviceThingsModelValue(input, 2, isShadow);
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("接收功能,解析数据时异常 message={}", e.getMessage());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 上报事件
|
* 上报事件
|
||||||
|
*
|
||||||
* @param message
|
* @param message
|
||||||
*/
|
*/
|
||||||
private void reportEvent(Long productId,String deviceNum,String message){
|
private void reportEvent(Long productId, String deviceNum, String message) {
|
||||||
List<ThingsModelValueRemarkItem> thingsModelValueRemarkItems=JSON.parseArray(message, ThingsModelValueRemarkItem.class);
|
try {
|
||||||
Device device =deviceService.selectDeviceBySerialNumber(deviceNum);
|
List<ThingsModelValueRemarkItem> thingsModelValueRemarkItems = JSON.parseArray(message, ThingsModelValueRemarkItem.class);
|
||||||
for(int i=0;i<thingsModelValueRemarkItems.size();i++) {
|
Device device = deviceService.selectDeviceBySerialNumber(deviceNum);
|
||||||
// 添加到设备日志
|
for (int i = 0; i < thingsModelValueRemarkItems.size(); i++) {
|
||||||
DeviceLog deviceLog = new DeviceLog();
|
// 添加到设备日志
|
||||||
deviceLog.setDeviceId(device.getDeviceId());
|
DeviceLog deviceLog = new DeviceLog();
|
||||||
deviceLog.setDeviceName(device.getDeviceName());
|
deviceLog.setDeviceId(device.getDeviceId());
|
||||||
deviceLog.setLogValue(thingsModelValueRemarkItems.get(i).getValue());
|
deviceLog.setDeviceName(device.getDeviceName());
|
||||||
deviceLog.setRemark(thingsModelValueRemarkItems.get(i).getRemark());
|
deviceLog.setLogValue(thingsModelValueRemarkItems.get(i).getValue());
|
||||||
deviceLog.setSerialNumber(device.getSerialNumber());
|
deviceLog.setRemark(thingsModelValueRemarkItems.get(i).getRemark());
|
||||||
deviceLog.setIdentity(thingsModelValueRemarkItems.get(i).getId());
|
deviceLog.setSerialNumber(device.getSerialNumber());
|
||||||
deviceLog.setLogType(3);
|
deviceLog.setIdentity(thingsModelValueRemarkItems.get(i).getId());
|
||||||
deviceLog.setIsMonitor(0);
|
deviceLog.setLogType(3);
|
||||||
deviceLogService.insertDeviceLog(deviceLog);
|
deviceLog.setIsMonitor(0);
|
||||||
|
deviceLogService.insertDeviceLog(deviceLog);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("接收事件,解析数据时异常 message={}", e.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 1.发布设备状态
|
* 1.发布设备状态
|
||||||
*/
|
*/
|
||||||
public void publishStatus(Long productId,String deviceNum,int deviceStatus,int isShadow){
|
public void publishStatus(Long productId, String deviceNum, int deviceStatus, int isShadow) {
|
||||||
String message="{\"status\":"+deviceStatus+",\"isShadow\":"+isShadow+"}";
|
String message = "{\"status\":" + deviceStatus + ",\"isShadow\":" + isShadow + "}";
|
||||||
emqxClient.publish(1,false,"/"+productId+"/"+deviceNum+pStatusTopic, message);
|
emqxClient.publish(1, false, "/" + productId + "/" + deviceNum + pStatusTopic, message);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 2.发布时钟同步信息
|
* 2.发布时钟同步信息
|
||||||
|
*
|
||||||
* @param message
|
* @param message
|
||||||
*/
|
*/
|
||||||
private void publishNtp(Long productId,String deviceNum,String message){
|
private void publishNtp(Long productId, String deviceNum, String message) {
|
||||||
NtpModel ntpModel=JSON.parseObject(message,NtpModel.class);
|
NtpModel ntpModel = JSON.parseObject(message, NtpModel.class);
|
||||||
ntpModel.setServerRecvTime(System.currentTimeMillis());
|
ntpModel.setServerRecvTime(System.currentTimeMillis());
|
||||||
ntpModel.setServerSendTime(System.currentTimeMillis());
|
ntpModel.setServerSendTime(System.currentTimeMillis());
|
||||||
emqxClient.publish(1, false, "/"+productId+"/"+deviceNum+pNtpTopic, JSON.toJSONString(ntpModel));
|
emqxClient.publish(1, false, "/" + productId + "/" + deviceNum + pNtpTopic, JSON.toJSONString(ntpModel));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 3.发布属性
|
* 3.发布属性
|
||||||
*/
|
*/
|
||||||
public void publishProperty(Long productId,String deviceNum,List<IdentityAndName> thingsList){
|
public void publishProperty(Long productId, String deviceNum, List<IdentityAndName> thingsList) {
|
||||||
if(thingsList==null){
|
if (thingsList == null) {
|
||||||
emqxClient.publish(1,true,"/"+productId+"/"+deviceNum+pPropertyTopic, "");
|
emqxClient.publish(1, true, "/" + productId + "/" + deviceNum + pPropertyTopic, "");
|
||||||
}else{
|
} else {
|
||||||
emqxClient.publish(1,true,"/"+productId+"/"+deviceNum+pPropertyTopic, JSON.toJSONString(thingsList));
|
emqxClient.publish(1, true, "/" + productId + "/" + deviceNum + pPropertyTopic, JSON.toJSONString(thingsList));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 4.发布功能
|
* 4.发布功能
|
||||||
*/
|
*/
|
||||||
public void publishFunction(Long productId,String deviceNum,List<IdentityAndName> thingsList){
|
public void publishFunction(Long productId, String deviceNum, List<IdentityAndName> thingsList) {
|
||||||
if(thingsList==null){
|
if (thingsList == null) {
|
||||||
emqxClient.publish(1,true,"/"+productId+"/"+deviceNum+pFunctionTopic, "");
|
emqxClient.publish(1, true, "/" + productId + "/" + deviceNum + pFunctionTopic, "");
|
||||||
}else{
|
} else {
|
||||||
emqxClient.publish(1,true,"/"+productId+"/"+deviceNum+pFunctionTopic, JSON.toJSONString(thingsList));
|
emqxClient.publish(1, true, "/" + productId + "/" + deviceNum + pFunctionTopic, JSON.toJSONString(thingsList));
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user