【同步】BOOT 和 CLOUD 的功能(iot)

pull/248/MERGE
YunaiV 2026-05-03 23:25:40 +08:00
parent 6b91b4169d
commit 996ac02c0b
47 changed files with 1898 additions and 155 deletions

View File

@ -1,5 +1,6 @@
package cn.iocoder.yudao.module.iot.enums.alert;
import cn.hutool.core.util.ArrayUtil;
import cn.iocoder.yudao.framework.common.core.ArrayValuable;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
@ -15,12 +16,21 @@ import java.util.Arrays;
@Getter
public enum IotAlertReceiveTypeEnum implements ArrayValuable<Integer> {
SMS(1), // 短信
MAIL(2), // 邮箱
NOTIFY(3); // 站内信
SMS(1, "iot_alert_sms"), // 短信
MAIL(2, "iot_alert_mail"), // 邮箱
NOTIFY(3, "iot_alert_notify"); // 站内信
// TODO 待实现(欢迎 pull requestwebhook 4
/**
*
*/
private final Integer type;
/**
*
*
* SmsTemplateDO / MailTemplateDO / NotifyTemplateDO code
*/
private final String templateCode;
public static final Integer[] ARRAYS = Arrays.stream(values()).map(IotAlertReceiveTypeEnum::getType).toArray(Integer[]::new);
@ -29,4 +39,8 @@ public enum IotAlertReceiveTypeEnum implements ArrayValuable<Integer> {
return ARRAYS;
}
public static IotAlertReceiveTypeEnum of(Integer type) {
return ArrayUtil.firstMatch(item -> item.getType().equals(type), values());
}
}

View File

@ -19,9 +19,9 @@ public enum IotDataSinkTypeEnum implements ArrayValuable<Integer> {
TCP(2, "TCP"),
WEBSOCKET(3, "WebSocket"),
MQTT(10, "MQTT"), // TODO @puhui999待实现
MQTT(10, "MQTT"),
DATABASE(20, "Database"), // TODO @puhui999待实现
DATABASE(20, "Database"),
REDIS(21, "Redis"),
ROCKETMQ(30, "RocketMQ"),

View File

@ -36,13 +36,11 @@ public enum IotSceneRuleConditionOperatorEnum implements ArrayValuable<String> {
// ========== 特殊:不放在字典里 ==========
// TODO @puhui999@芋艿:需要测试下
DATE_TIME_GREATER_THAN("date_time_>", "#source > #value"), // 在时间之后:时间戳
DATE_TIME_LESS_THAN("date_time_<", "#source < #value"), // 在时间之前:时间戳
DATE_TIME_BETWEEN("date_time_between", // 在时间之间:时间戳
"(#source >= #values.get(0)) && (#source <= #values.get(1))"),
// TODO @puhui999@芋艿:需要测试下
TIME_GREATER_THAN("time_>", "#source.isAfter(#value)"), // 在当日时间之后HH:mm:ss
TIME_LESS_THAN("time_<", "#source.isBefore(#value)"), // 在当日时间之前HH:mm:ss
TIME_BETWEEN("time_between", // 在当日时间之间HH:mm:ss

View File

@ -1,15 +1,17 @@
package cn.iocoder.yudao.module.iot.core.util;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.ReflectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.system.SystemUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
/**
* IoT
@ -54,22 +56,59 @@ public class IotDeviceMessageUtils {
* @param message
* @return
*/
@SuppressWarnings("unchecked")
public static String getIdentifier(IotDeviceMessage message) {
if (message.getParams() == null) {
if (message == null || message.getParams() == null) {
return null;
}
Object params = message.getParams();
if (StrUtil.equalsAny(message.getMethod(), IotDeviceMessageMethodEnum.EVENT_POST.getMethod(),
IotDeviceMessageMethodEnum.SERVICE_INVOKE.getMethod())) {
Map<String, Object> params = (Map<String, Object>) message.getParams();
return MapUtil.getStr(params, "identifier");
} else if (StrUtil.equalsAny(message.getMethod(), IotDeviceMessageMethodEnum.STATE_UPDATE.getMethod())) {
Map<String, Object> params = (Map<String, Object>) message.getParams();
return MapUtil.getStr(params, "state");
return StrUtil.toStringOrNull(readField(params, "identifier"));
} else if (StrUtil.equalsAny(message.getMethod(), IotDeviceMessageMethodEnum.STATE_UPDATE.getMethod())) {
return StrUtil.toStringOrNull(readField(params, "state"));
}
return null;
}
/**
* params Map POJODTO
*
* WhyMQ JSON params Map线 producer DTO IotDeviceEventPostReqDTO
* matcher JVM 线
*/
private static Object readField(Object params, String fieldName) {
if (params == null) {
return null;
}
if (params instanceof Map) {
return ((Map<?, ?>) params).get(fieldName);
}
try {
return ReflectUtil.getFieldValue(params, fieldName);
} catch (Exception ignored) {
return null;
}
}
/**
*
*
* { temperature: 25.5, humidity: 60 } key
*
* @param message
* @return null
*/
public static Set<String> getPropertyIdentifiers(IotDeviceMessage message) {
if (message == null) {
return new LinkedHashSet<>();
}
Map<String, Object> params = parseParamsToMap(message.getParams());
if (params == null) {
return new LinkedHashSet<>();
}
return new LinkedHashSet<>(params.keySet());
}
/**
*
* <p>
@ -82,8 +121,9 @@ public class IotDeviceMessageUtils {
* @param identifier
* @return
*/
@SuppressWarnings("unchecked")
public static boolean containsIdentifier(IotDeviceMessage message, String identifier) {
if (message.getParams() == null || StrUtil.isBlank(identifier)) {
if (message == null || message.getParams() == null || StrUtil.isBlank(identifier)) {
return false;
}
// EVENT_POST / SERVICE_INVOKE / STATE_UPDATE使用原有逻辑
@ -91,10 +131,17 @@ public class IotDeviceMessageUtils {
if (messageIdentifier != null) {
return identifier.equals(messageIdentifier);
}
// PROPERTY_POST检查 params 中是否包含该属性 key
// PROPERTY_POST检查 params 中是否包含该属性 key(支持扁平和嵌套 properties 结构)
if (StrUtil.equals(message.getMethod(), IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod())) {
Map<String, Object> params = parseParamsToMap(message.getParams());
return params != null && params.containsKey(identifier);
if (params == null) {
return false;
}
if (params.containsKey(identifier)) {
return true;
}
Object properties = params.get("properties");
return properties instanceof Map && ((Map<String, Object>) properties).containsKey(identifier);
}
return false;
}
@ -132,9 +179,6 @@ public class IotDeviceMessageUtils {
/**
*
* -
* -
* -
* <p>
*
* 1. params Map
@ -150,7 +194,7 @@ public class IotDeviceMessageUtils {
*/
@SuppressWarnings("unchecked")
public static Object extractPropertyValue(IotDeviceMessage message, String identifier) {
Object params = message.getParams();
Object params = message != null ? message.getParams() : null;
if (params == null) {
return null;
}
@ -206,6 +250,19 @@ public class IotDeviceMessageUtils {
return null;
}
/**
*
* <p>
* params {"identifier": "xxx", "value": ...} value
* value // {level, message}
*
* @param message
* @return null
*/
public static Object extractEventValue(IotDeviceMessage message) {
return readField(message != null ? message.getParams() : null, "value");
}
/**
*
* <p>
@ -220,23 +277,16 @@ public class IotDeviceMessageUtils {
*/
@SuppressWarnings("unchecked")
public static Map<String, Object> extractServiceInputParams(IotDeviceMessage message) {
// 1. 参数校验
if (message == null || message.getParams() == null) {
return null;
}
Object params = message.getParams();
if (params == null) {
return null;
}
if (!(params instanceof Map)) {
return null;
}
Map<String, Object> paramsMap = (Map<String, Object>) params;
// 尝试从 inputData 字段获取
Object inputData = paramsMap.get("inputData");
// 兼容 Map 和 POJO如 IotDeviceServiceInvokeReqDTO两种 params 形态
Object inputData = readField(params, "inputData");
if (inputData instanceof Map) {
return (Map<String, Object>) inputData;
}
// 尝试从 inputParams 字段获取
Object inputParams = paramsMap.get("inputParams");
Object inputParams = readField(params, "inputParams");
if (inputParams instanceof Map) {
return (Map<String, Object>) inputParams;
}

View File

@ -2,10 +2,12 @@ package cn.iocoder.yudao.module.iot.core.util;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.topic.event.IotDeviceEventPostReqDTO;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.*;
@ -71,7 +73,7 @@ public class IotDeviceMessageUtilsTest {
@Test
public void testExtractPropertyValue_valueField() {
// 测试 value 字段
// 测试 value 字段(策略 5
IotDeviceMessage message = new IotDeviceMessage();
Map<String, Object> params = new HashMap<>();
params.put("identifier", "temperature");
@ -84,7 +86,7 @@ public class IotDeviceMessageUtilsTest {
@Test
public void testExtractPropertyValue_singleValueMap() {
// 测试单值 Map包含 identifier 和一个值
// 测试单值 Map策略 6包含 identifier 和一个其他字段
IotDeviceMessage message = new IotDeviceMessage();
Map<String, Object> params = new HashMap<>();
params.put("identifier", "temperature");
@ -139,6 +141,88 @@ public class IotDeviceMessageUtilsTest {
assertEquals(25.5, result); // 应该返回直接标识符的值
}
// ========== extractEventValue 测试 ==========
@Test
public void testExtractEventValue_scalar() {
// 标量事件值:{identifier: "gzzt", value: "normal"}
IotDeviceMessage message = new IotDeviceMessage();
Map<String, Object> params = new HashMap<>();
params.put("identifier", "gzzt");
params.put("value", "normal");
message.setParams(params);
Object result = IotDeviceMessageUtils.extractEventValue(message);
assertEquals("normal", result);
}
@Test
public void testExtractEventValue_struct() {
// 结构体事件值:{identifier: "alarm", value: {level: "high", message: "..."}}
IotDeviceMessage message = new IotDeviceMessage();
Map<String, Object> eventValue = new HashMap<>();
eventValue.put("level", "high");
eventValue.put("message", "over temperature");
Map<String, Object> params = new HashMap<>();
params.put("identifier", "alarm");
params.put("value", eventValue);
message.setParams(params);
Object result = IotDeviceMessageUtils.extractEventValue(message);
assertEquals(eventValue, result);
}
@Test
public void testExtractEventValue_nullParams() {
IotDeviceMessage message = new IotDeviceMessage();
message.setParams(null);
Object result = IotDeviceMessageUtils.extractEventValue(message);
assertNull(result);
}
@Test
public void testExtractEventValue_paramsWithoutValueField() {
// params 是字符串等非结构化对象,无 value 字段,应返回 null
IotDeviceMessage message = new IotDeviceMessage();
message.setParams("not a map");
Object result = IotDeviceMessageUtils.extractEventValue(message);
assertNull(result);
}
@Test
public void testExtractEventValue_missingValueField() {
IotDeviceMessage message = new IotDeviceMessage();
Map<String, Object> params = new HashMap<>();
params.put("identifier", "gzzt");
message.setParams(params);
Object result = IotDeviceMessageUtils.extractEventValue(message);
assertNull(result);
}
@Test
public void testExtractEventValue_pojoParams() {
// 本地总线场景params 是 IotDeviceEventPostReqDTO POJO未经 JSON 反序列化),应能反射取到 value
IotDeviceMessage message = new IotDeviceMessage();
message.setParams(IotDeviceEventPostReqDTO.of("gzzt", "normal"));
Object result = IotDeviceMessageUtils.extractEventValue(message);
assertEquals("normal", result);
}
@Test
public void testGetIdentifier_eventPostPojoParams() {
// 本地总线场景EVENT_POST 消息 params 是 DTO POJO仍应能解析出 identifier
IotDeviceMessage message = new IotDeviceMessage();
message.setMethod(IotDeviceMessageMethodEnum.EVENT_POST.getMethod());
message.setParams(IotDeviceEventPostReqDTO.of("gzzt", "normal"));
assertEquals("gzzt", IotDeviceMessageUtils.getIdentifier(message));
}
// ========== notContainsIdentifier 测试 ==========
/**
@ -206,4 +290,62 @@ public class IotDeviceMessageUtilsTest {
assertTrue(notContainsResult);
assertEquals(!containsResult, notContainsResult);
}
// ========== getPropertyIdentifiers 测试 ==========
@Test
public void testGetPropertyIdentifiers_flatStructure() {
// 扁平结构:顶层 key 即标识符
IotDeviceMessage message = new IotDeviceMessage();
Map<String, Object> params = new HashMap<>();
params.put("temperature", 25.5);
params.put("humidity", 60);
message.setParams(params);
Set<String> identifiers = IotDeviceMessageUtils.getPropertyIdentifiers(message);
assertEquals(2, identifiers.size());
assertTrue(identifiers.contains("temperature"));
assertTrue(identifiers.contains("humidity"));
}
@Test
public void testGetPropertyIdentifiers_nullMessage() {
// 入参为 null返回空集合
Set<String> identifiers = IotDeviceMessageUtils.getPropertyIdentifiers(null);
assertNotNull(identifiers);
assertTrue(identifiers.isEmpty());
}
@Test
public void testGetPropertyIdentifiers_nullParams() {
// params 为 null返回空集合
IotDeviceMessage message = new IotDeviceMessage();
message.setParams(null);
Set<String> identifiers = IotDeviceMessageUtils.getPropertyIdentifiers(message);
assertTrue(identifiers.isEmpty());
}
@Test
public void testGetPropertyIdentifiers_emptyParams() {
// params 为空 Map返回空集合
IotDeviceMessage message = new IotDeviceMessage();
message.setParams(new HashMap<>());
Set<String> identifiers = IotDeviceMessageUtils.getPropertyIdentifiers(message);
assertTrue(identifiers.isEmpty());
}
@Test
public void testGetPropertyIdentifiers_jsonStringParams() {
// params 为 JSON 字符串parseParamsToMap 解析后正常提取顶层标识符
IotDeviceMessage message = new IotDeviceMessage();
message.setParams("{\"temperature\":25.5,\"humidity\":60}");
Set<String> identifiers = IotDeviceMessageUtils.getPropertyIdentifiers(message);
assertEquals(2, identifiers.size());
assertTrue(identifiers.contains("temperature"));
assertTrue(identifiers.contains("humidity"));
}
}

View File

@ -135,6 +135,13 @@
<scope>test</scope>
</dependency>
<!-- MQTT -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<optional>true</optional>
</dependency>
<!-- 消息队列相关 -->
<!-- TODO @芋艿:临时打开 -->
<dependency>

View File

@ -20,6 +20,7 @@ import lombok.Data;
@JsonSubTypes.Type(value = IotDataSinkTcpConfig.class, name = "2"),
@JsonSubTypes.Type(value = IotDataSinkWebSocketConfig.class, name = "3"),
@JsonSubTypes.Type(value = IotDataSinkMqttConfig.class, name = "10"),
@JsonSubTypes.Type(value = IotDataSinkDatabaseConfig.class, name = "20"),
@JsonSubTypes.Type(value = IotDataSinkRedisConfig.class, name = "21"),
@JsonSubTypes.Type(value = IotDataSinkRocketMQConfig.class, name = "30"),
@JsonSubTypes.Type(value = IotDataSinkRabbitMQConfig.class, name = "31"),

View File

@ -0,0 +1,42 @@
package cn.iocoder.yudao.module.iot.dal.dataobject.rule.config;
import lombok.Data;
/**
* IoT Database {@link IotAbstractDataSinkConfig}
*
* JDBC
* MySQLPostgreSQLOracleSQL ServerDM
* HikariCP JDBC URL
*
* @author HUIHUI
*/
@Data
public class IotDataSinkDatabaseConfig extends IotAbstractDataSinkConfig {
/**
* JDBC
*
* jdbc:mysql://localhost:3306/iot_data
* jdbc:postgresql://localhost:5432/iot_data
* jdbc:dm://localhost:5236/iot_data
*
* HikariCP URL JDBC
*/
private String jdbcUrl;
/**
*
*/
private String username;
/**
*
*/
private String password;
/**
*
*
*
*/
private String tableName;
}

View File

@ -9,7 +9,6 @@ import lombok.Data;
import java.util.List;
// TODO @puhui999感觉这个是不是放到 dal 里会好点?(讨论下,先不改哈)
/**
* IoT
*

View File

@ -81,6 +81,7 @@ public interface IotDevicePropertyMapper {
void insert(@Param("device") IotDeviceDO device,
@Param("properties") Map<String, Object> properties,
@Param("ts") Long ts,
@Param("reportTime") Long reportTime);
List<IotDevicePropertyRespVO> selectListByHistory(@Param("reqVO") IotDevicePropertyHistoryListReqVO reqVO);

View File

@ -5,6 +5,8 @@ import cn.iocoder.yudao.module.iot.controller.admin.alert.vo.recrod.IotAlertReco
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.alert.IotAlertConfigDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.alert.IotAlertRecordDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import java.util.Collection;
@ -58,8 +60,11 @@ public interface IotAlertRecordService {
* @param config
* @param sceneRuleId
* @param deviceMessage
* @param device
* @return
*/
Long createAlertRecord(IotAlertConfigDO config, Long sceneRuleId, IotDeviceMessage deviceMessage);
@SuppressWarnings("UnusedReturnValue")
Long createAlertRecord(IotAlertConfigDO config, Long sceneRuleId,
@Nullable IotDeviceMessage deviceMessage, @Nullable IotDeviceDO device);
}

View File

@ -8,7 +8,6 @@ import cn.iocoder.yudao.module.iot.dal.dataobject.alert.IotAlertConfigDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.alert.IotAlertRecordDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
import cn.iocoder.yudao.module.iot.dal.mysql.alert.IotAlertRecordMapper;
import cn.iocoder.yudao.module.iot.service.device.IotDeviceService;
import jakarta.annotation.Resource;
import org.springframework.stereotype.Service;
import org.springframework.validation.annotation.Validated;
@ -28,9 +27,6 @@ public class IotAlertRecordServiceImpl implements IotAlertRecordService {
@Resource
private IotAlertRecordMapper alertRecordMapper;
@Resource
private IotDeviceService deviceService;
@Override
public IotAlertRecordDO getAlertRecord(Long id) {
return alertRecordMapper.selectById(id);
@ -57,20 +53,18 @@ public class IotAlertRecordServiceImpl implements IotAlertRecordService {
}
@Override
public Long createAlertRecord(IotAlertConfigDO config, Long sceneRuleId, IotDeviceMessage message) {
public Long createAlertRecord(IotAlertConfigDO config, Long sceneRuleId,
IotDeviceMessage message, IotDeviceDO device) {
// 构建告警记录
IotAlertRecordDO.IotAlertRecordDOBuilder builder = IotAlertRecordDO.builder()
.configId(config.getId()).configName(config.getName()).configLevel(config.getLevel())
.sceneRuleId(sceneRuleId).processStatus(false);
if (message != null) {
builder.deviceMessage(message);
// 填充设备信息
IotDeviceDO device = deviceService.getDeviceFromCache(message.getDeviceId());
if (device != null) {
builder.productId(device.getProductId()).deviceId(device.getId());
}
}
if (device != null) {
builder.productId(device.getProductId()).deviceId(device.getId());
}
// 插入记录
IotAlertRecordDO record = builder.build();
alertRecordMapper.insert(record);

View File

@ -95,7 +95,18 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
if (messageDO.getData() != null) {
messageDO.setData(JsonUtils.toJsonString(messageDO.getData()));
}
deviceMessageMapper.insert(messageDO);
if (messageDO.getTs() == null) {
messageDO.setTs(System.currentTimeMillis());
}
try {
deviceMessageMapper.insert(messageDO);
} catch (Exception ex) {
// 特殊:@Async 方法的异常默认会被 handler 吞掉,这里显式记录便于排查
log.error("[createDeviceLogAsync][消息日志写入失败 deviceId({}) messageId({}) paramsLen({}) dataLen({})]",
messageDO.getDeviceId(), messageDO.getId(),
StrUtil.length((String) messageDO.getParams()),
StrUtil.length((String) messageDO.getData()), ex);
}
}
@Override

View File

@ -4,6 +4,7 @@ import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.date.LocalDateTimeUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.ObjUtil;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.framework.common.util.object.ObjectUtils;
@ -149,33 +150,38 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService {
List<IotThingModelDO> thingModels = thingModelService.getThingModelListByProductIdFromCache(device.getProductId());
Map<String, Object> properties = new HashMap<>();
params.forEach((key, value) -> {
IotThingModelDO thingModel = CollUtil.findOne(thingModels, o -> o.getIdentifier().equals(key));
// 忽略大小写匹配物模型,避免设备上报的 key 与 identifier 大小写不一致导致丢失
IotThingModelDO thingModel = CollUtil.findOne(thingModels,
o -> StrUtil.equalsIgnoreCase(o.getIdentifier(), (CharSequence) key));
if (thingModel == null || thingModel.getProperty() == null) {
log.error("[saveDeviceProperty][消息({}) 的属性({}) 不存在]", message, key);
return;
}
String identifier = thingModel.getIdentifier(); // 统一以物模型 identifier 作为 key避免大小写问题
String dataType = thingModel.getProperty().getDataType();
if (ObjectUtils.equalsAny(dataType,
IotDataSpecsDataTypeEnum.STRUCT.getDataType(), IotDataSpecsDataTypeEnum.ARRAY.getDataType())) {
// 特殊STRUCT 和 ARRAY 类型,在 TDengine 里,有没对应数据类型,只能通过 JSON 来存储
properties.put((String) key, JsonUtils.toJsonString(value));
properties.put(identifier, JsonUtils.toJsonString(value));
} else if (IotDataSpecsDataTypeEnum.INT.getDataType().equals(dataType)) {
properties.put((String) key, Convert.toInt(value));
properties.put(identifier, Convert.toInt(value));
} else if (IotDataSpecsDataTypeEnum.FLOAT.getDataType().equals(dataType)) {
properties.put((String) key, Convert.toFloat(value));
properties.put(identifier, Convert.toFloat(value));
} else if (IotDataSpecsDataTypeEnum.DOUBLE.getDataType().equals(dataType)) {
properties.put((String) key, Convert.toDouble(value));
} else if (IotDataSpecsDataTypeEnum.BOOL.getDataType().equals(dataType)) {
properties.put((String) key, Convert.toByte(value));
} else {
properties.put((String) key, value);
properties.put(identifier, Convert.toDouble(value));
} else if (IotDataSpecsDataTypeEnum.BOOL.getDataType().equals(dataType)) {
properties.put(identifier, Convert.toBool(value, false) ? (byte) 1 : (byte) 0);
} else {
properties.put(identifier, value);
}
});
if (CollUtil.isEmpty(properties)) {
log.error("[saveDeviceProperty][消息({}) 没有合法的属性]", message);
} else {
// 2.1 保存设备属性【数据】
devicePropertyMapper.insert(device, properties, LocalDateTimeUtil.toEpochMilli(message.getReportTime()));
devicePropertyMapper.insert(device, properties,
System.currentTimeMillis(),
LocalDateTimeUtil.toEpochMilli(message.getReportTime()));
// 2.2 保存设备属性【日志】
Map<String, IotDevicePropertyDO> properties2 = convertMap(properties.entrySet(), Map.Entry::getKey, entry ->

View File

@ -10,6 +10,7 @@ import cn.iocoder.yudao.framework.common.util.object.ObjectUtils;
import cn.iocoder.yudao.framework.common.util.spring.SpringUtils;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.data.rule.IotDataRulePageReqVO;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.data.rule.IotDataRuleSaveReqVO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
@ -212,34 +213,76 @@ public class IotDataRuleServiceImpl implements IotDataRuleService {
@Override
public void executeDataRule(IotDeviceMessage message) {
try {
// 1. 获取匹配的数据流转规则
Long deviceId = message.getDeviceId();
String method = message.getMethod();
String identifier = IotDeviceMessageUtils.getIdentifier(message);
List<IotDataRuleDO> rules = getSelf().getDataRuleListByConditionFromCache(deviceId, method, identifier);
if (CollUtil.isEmpty(rules)) {
// 1. 匹配命中的规则
List<IotDataRuleDO> matchedRules;
Object identifierForLog;
if (IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod().equals(method)) {
// 属性上报params 含多个属性 key每个 key 都可能命中规则
Set<String> identifiers = IotDeviceMessageUtils.getPropertyIdentifiers(message);
matchedRules = matchPropertyPostDataRules(deviceId, method, identifiers);
identifierForLog = identifiers;
} else {
// 其他消息(事件 / 服务调用 / 状态):单一 identifier
String identifier = IotDeviceMessageUtils.getIdentifier(message);
matchedRules = getSelf().getDataRuleListByConditionFromCache(deviceId, method, identifier);
identifierForLog = identifier;
}
if (CollUtil.isEmpty(matchedRules)) {
log.debug("[executeDataRule][设备({}) 方法({}) 标识符({}) 没有匹配的数据流转规则]",
deviceId, method, identifier);
deviceId, method, identifierForLog);
return;
}
log.info("[executeDataRule][设备({}) 方法({}) 标识符({}) 匹配到 {} 条数据流转规则]",
deviceId, method, identifier, rules.size());
deviceId, method, identifierForLog, matchedRules.size());
// 2. 遍历规则,执行数据流转
rules.forEach(rule -> executeDataRule(message, rule));
// 2. 跨规则去重 sink避免多条规则命中同一数据目的时重复推送
Set<Long> processedSinkIds = new HashSet<>();
matchedRules.forEach(rule -> executeDataRule(message, rule, processedSinkIds));
} catch (Exception e) {
log.error("[executeDataRule][消息({}) 执行数据流转规则异常]", message, e);
}
}
/**
*
*
* identifier ruleId
*
* @param deviceId
* @param method
* @param identifiers
* @return
*/
private List<IotDataRuleDO> matchPropertyPostDataRules(Long deviceId, String method, Set<String> identifiers) {
LinkedHashMap<Long, IotDataRuleDO> matchedRuleMap = new LinkedHashMap<>();
// 情况一:先匹配未填 identifier 的「任意属性」规则,默认就匹配
collectMatchedRules(matchedRuleMap, deviceId, method, null);
// 情况二:再针对每个上报的属性标识符匹配限定具体 identifier 的规则
identifiers.forEach(identifier -> collectMatchedRules(matchedRuleMap, deviceId, method, identifier));
return new ArrayList<>(matchedRuleMap.values());
}
private void collectMatchedRules(Map<Long, IotDataRuleDO> matchedRuleMap,
Long deviceId, String method, String identifier) {
getSelf().getDataRuleListByConditionFromCache(deviceId, method, identifier)
.forEach(rule -> matchedRuleMap.putIfAbsent(rule.getId(), rule));
}
/**
*
*
* @param message
* @param rule
* @param message
* @param rule
* @param processedSinkIds
*/
private void executeDataRule(IotDeviceMessage message, IotDataRuleDO rule) {
private void executeDataRule(IotDeviceMessage message, IotDataRuleDO rule, Set<Long> processedSinkIds) {
rule.getSinkIds().forEach(sinkId -> {
// 同一消息下,多条规则命中同一数据目的时只推送一次
if (!processedSinkIds.add(sinkId)) {
return;
}
try {
// 获取数据目的配置
IotDataSinkDO dataSink = dataSinkService.getDataSinkFromCache(sinkId);

View File

@ -12,9 +12,6 @@ import lombok.extern.slf4j.Slf4j;
import java.time.Duration;
// TODO @芋艿:数据库
// TODO @芋艿mqtt
/**
* {@link IotDataRuleAction}
*
@ -77,6 +74,18 @@ public abstract class IotDataRuleCacheableAction<Config, Producer> implements Io
return PRODUCER_CACHE.get(config);
}
/**
* 使 Producer
*
* Producer
* {@link #getProducer(Object)} {@link #initProducer(Object)}
*
* @param config
*/
protected void invalidateProducer(Config config) {
PRODUCER_CACHE.invalidate(config);
}
/**
*
*

View File

@ -0,0 +1,84 @@
package cn.iocoder.yudao.module.iot.service.rule.data.action;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkDatabaseConfig;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
/**
* Database {@link IotDataRuleAction}
*
* JDBC
* MySQLPostgreSQLOracleSQL ServerDM
* HikariCP JDBC URL
*
* @author HUIHUI
*/
@Component
@Slf4j
public class IotDatabaseDataRuleAction extends
IotDataRuleCacheableAction<IotDataSinkDatabaseConfig, JdbcTemplate> {
@Override
public Integer getType() {
return IotDataSinkTypeEnum.DATABASE.getType();
}
@Override
public void execute(IotDeviceMessage message, IotDataSinkDatabaseConfig config) throws Exception {
try {
// 1. 获取或创建 JdbcTemplate
JdbcTemplate jdbcTemplate = getProducer(config);
// 2. 构建并执行 INSERT SQL
String sql = StrUtil.format(
"INSERT INTO {} (id, device_id, tenant_id, method, report_time, data, create_time) VALUES (?, ?, ?, ?, ?, ?, NOW())",
config.getTableName());
String messageJson = JsonUtils.toJsonString(message);
jdbcTemplate.update(sql,
message.getId(),
message.getDeviceId(),
message.getTenantId(),
message.getMethod(),
message.getReportTime(),
messageJson);
log.info("[execute][message({}) config({}) 写入数据库成功table: {}]",
message.getId(), config, config.getTableName());
} catch (Exception e) {
log.error("[execute][message({}) config({}) 写入数据库失败]", message, config, e);
throw e;
}
}
@Override
protected JdbcTemplate initProducer(IotDataSinkDatabaseConfig config) throws Exception {
// 使用 HikariCP 连接池HikariCP 会根据 JDBC URL 自动检测并加载对应的数据库驱动
HikariConfig hikariConfig = new HikariConfig();
hikariConfig.setJdbcUrl(config.getJdbcUrl());
hikariConfig.setUsername(config.getUsername());
hikariConfig.setPassword(config.getPassword());
// 连接池配置
hikariConfig.setMaximumPoolSize(5); // 数据流转场景,不需要太多连接
hikariConfig.setMinimumIdle(1);
hikariConfig.setConnectionTimeout(10000); // 连接超时 10 秒
hikariConfig.setIdleTimeout(300000); // 空闲超时 5 分钟
hikariConfig.setMaxLifetime(600000); // 最大生命周期 10 分钟
HikariDataSource dataSource = new HikariDataSource(hikariConfig);
log.info("[initProducer][数据库连接池创建成功jdbcUrl: {}]", config.getJdbcUrl());
return new JdbcTemplate(dataSource);
}
@Override
protected void closeProducer(JdbcTemplate producer) throws Exception {
if (producer.getDataSource() instanceof HikariDataSource) {
((HikariDataSource) producer.getDataSource()).close();
}
}
}

View File

@ -0,0 +1,108 @@
package cn.iocoder.yudao.module.iot.service.rule.data.action;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkMqttConfig;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
/**
* MQTT {@link IotDataRuleAction}
*
* @author HUIHUI
*/
@ConditionalOnClass(name = "org.eclipse.paho.client.mqttv3.MqttClient")
@Component
@Slf4j
public class IotMqttDataRuleAction extends
IotDataRuleCacheableAction<IotDataSinkMqttConfig, MqttClient> {
/**
* QoS
*/
private static final int DEFAULT_QOS = 1;
@Override
public Integer getType() {
return IotDataSinkTypeEnum.MQTT.getType();
}
@Override
public void execute(IotDeviceMessage message, IotDataSinkMqttConfig config) throws Exception {
try {
// 1. 获取或创建 MqttClient
MqttClient mqttClient = getProducer(config);
// 2.1 检查连接状态,如果断开则踢出缓存并重新创建
if (!mqttClient.isConnected()) {
log.warn("[execute][MQTT 连接已断开,重新创建客户端,服务器: {}]", config.getUrl());
invalidateProducer(config); // 踢出旧的断连客户端,触发 closeProducer
mqttClient = getProducer(config); // 触发 initProducer 创建全新连接
}
// 2.2 构建并发送消息
MqttMessage mqttMessage = new MqttMessage(JsonUtils.toJsonString(message).getBytes(StandardCharsets.UTF_8));
mqttMessage.setQos(DEFAULT_QOS);
mqttClient.publish(config.getTopic(), mqttMessage);
log.info("[execute][message({}) 发送成功MQTT 服务器: {}topic: {}]",
message.getId(), config.getUrl(), config.getTopic());
} catch (Exception e) {
log.error("[execute][message({}) 发送失败MQTT 服务器: {}]",
message.getId(), config.getUrl(), e);
throw e;
}
}
@Override
protected MqttClient initProducer(IotDataSinkMqttConfig config) throws Exception {
// 1. 创建 MqttClient使用内存持久化
// 拼接时间戳后缀,避免多个规则指向同一 Broker 时 clientId 冲突
String clientId = config.getClientId() + "_" + System.currentTimeMillis();
MqttClient mqttClient = new MqttClient(config.getUrl(), clientId, new MemoryPersistence());
// 2. 连接到 MQTT Broker
mqttClient.connect(buildConnectOptions(config));
log.info("[initProducer][MQTT 客户端创建并连接成功,服务器: {}clientId: {}]",
config.getUrl(), clientId);
return mqttClient;
}
@Override
protected void closeProducer(MqttClient producer) throws Exception {
if (producer.isConnected()) {
producer.disconnect();
}
producer.close();
}
/**
* MQTT
*
* @param config MQTT
* @return
*/
private MqttConnectOptions buildConnectOptions(IotDataSinkMqttConfig config) {
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setConnectionTimeout(10); // 连接超时 10 秒
options.setKeepAliveInterval(20); // 心跳间隔 20 秒
// 注意:不开启 automaticReconnect由 execute() 中的 isConnected() 手动控制重连,避免竞争
// 设置认证信息(如果有)
if (config.getUsername() != null) {
options.setUserName(config.getUsername());
}
if (config.getPassword() != null) {
options.setPassword(config.getPassword().toCharArray());
}
return options;
}
}

View File

@ -97,10 +97,11 @@ public class IotWebSocketDataRuleAction extends
}
}
// TODO @puhui999为什么这里要加锁呀
/**
* 使线
*
* 线线使 + 线
*
* @param webSocketClient WebSocket
* @param config
*/

View File

@ -33,6 +33,13 @@ public class IotWebSocketClient {
private volatile WebSocket webSocket;
private final AtomicBoolean connected = new AtomicBoolean(false);
/**
* WebSocket
*
* @see <a href="https://tools.ietf.org/html/rfc6455#section-7.4.1">RFC 6455 - </a>
*/
private static final int NORMAL_CLOSURE_STATUS = 1000;
public IotWebSocketClient(String serverUrl, Integer connectTimeoutMs, Integer sendTimeoutMs, String dataFormat) {
this.serverUrl = serverUrl;
this.connectTimeoutMs = connectTimeoutMs != null ? connectTimeoutMs : IotDataSinkWebSocketConfig.DEFAULT_CONNECT_TIMEOUT_MS;
@ -123,9 +130,8 @@ public class IotWebSocketClient {
public void close() {
try {
if (webSocket != null) {
// 发送正常关闭帧,状态码 1000 表示正常关闭
// TODO @puhui999有没 1000 的枚举哈?在 okhttp 里
webSocket.close(1000, "客户端主动关闭");
// 发送正常关闭帧
webSocket.close(NORMAL_CLOSURE_STATUS, "客户端主动关闭");
webSocket = null;
}
if (okHttpClient != null) {

View File

@ -6,6 +6,7 @@ import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.module.iot.enums.rule.IotSceneRuleConditionOperatorEnum;
import cn.iocoder.yudao.module.iot.service.rule.scene.matcher.condition.IotCurrentTimeConditionMatcher;
import cn.iocoder.yudao.module.iot.service.rule.scene.timer.IotTimerConditionEvaluator;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
@ -22,6 +23,7 @@ import java.util.List;
* @author HUIHUI
*/
@Slf4j
@UtilityClass
public class IotSceneRuleTimeHelper {
/**
@ -34,11 +36,6 @@ public class IotSceneRuleTimeHelper {
*/
private static final DateTimeFormatter TIME_FORMATTER_SHORT = DateTimeFormatter.ofPattern("HH:mm");
// TODO @puhui999可以使用 lombok 简化
private IotSceneRuleTimeHelper() {
// 工具类,禁止实例化
}
/**
*
*
@ -136,7 +133,6 @@ public class IotSceneRuleTimeHelper {
}
long startTimestamp = Long.parseLong(timestampRange.get(0).trim());
long endTimestamp = Long.parseLong(timestampRange.get(1).trim());
// TODO @puhui999hutool 里,看看有没 between 方法
return currentTimestamp >= startTimestamp && currentTimestamp <= endTimestamp;
}
@ -188,7 +184,6 @@ public class IotSceneRuleTimeHelper {
}
LocalTime startTime = parseTime(timeRange.get(0).trim());
LocalTime endTime = parseTime(timeRange.get(1).trim());
// TODO @puhui999hutool 里,看看有没 between 方法
return !currentTime.isBefore(startTime) && !currentTime.isAfter(endTime);
}

View File

@ -1,21 +1,34 @@
package cn.iocoder.yudao.module.iot.service.rule.scene.action;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.LocalDateTimeUtil;
import cn.iocoder.yudao.framework.common.enums.CommonStatusEnum;
import cn.iocoder.yudao.framework.dict.core.DictFrameworkUtils;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.alert.IotAlertConfigDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotSceneRuleDO;
import cn.iocoder.yudao.module.iot.enums.DictTypeConstants;
import cn.iocoder.yudao.module.iot.enums.alert.IotAlertReceiveTypeEnum;
import cn.iocoder.yudao.module.iot.enums.rule.IotSceneRuleActionTypeEnum;
import cn.iocoder.yudao.module.iot.service.alert.IotAlertConfigService;
import cn.iocoder.yudao.module.iot.service.alert.IotAlertRecordService;
import cn.iocoder.yudao.module.iot.service.device.IotDeviceService;
import cn.iocoder.yudao.module.system.api.mail.MailSendApi;
import cn.iocoder.yudao.module.system.api.mail.dto.MailSendSingleToUserReqDTO;
import cn.iocoder.yudao.module.system.api.notify.NotifyMessageSendApi;
import cn.iocoder.yudao.module.system.api.notify.dto.NotifySendSingleToUserReqDTO;
import cn.iocoder.yudao.module.system.api.sms.SmsSendApi;
import cn.iocoder.yudao.module.system.api.sms.dto.send.SmsSendSingleToUserReqDTO;
import jakarta.annotation.Nullable;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* IoT {@link IotSceneRuleAction}
@ -23,12 +36,15 @@ import java.util.List;
* @author
*/
@Component
@Slf4j
public class IotAlertTriggerSceneRuleAction implements IotSceneRuleAction {
@Resource
private IotAlertConfigService alertConfigService;
@Resource
private IotAlertRecordService alertRecordService;
@Resource
private IotDeviceService deviceService;
@Resource
private SmsSendApi smsSendApi;
@ -45,19 +61,67 @@ public class IotAlertTriggerSceneRuleAction implements IotSceneRuleAction {
if (CollUtil.isEmpty(alertConfigs)) {
return;
}
// 获得设备信息
IotDeviceDO device = message != null ? deviceService.getDeviceFromCache(message.getDeviceId()) : null;
alertConfigs.forEach(alertConfig -> {
// 记录告警记录传递场景规则ID
alertRecordService.createAlertRecord(alertConfig, rule.getId(), message);
// 创建告警记录
alertRecordService.createAlertRecord(alertConfig, rule.getId(), message, device);
// 发送告警消息
sendAlertMessage(alertConfig, message);
sendAlertMessage(alertConfig, message, device);
});
}
private void sendAlertMessage(IotAlertConfigDO config, IotDeviceMessage deviceMessage) {
// TODO @芋艿:等场景联动开发完,再实现
// TODO @芋艿:短信
// TODO @芋艿:邮箱
// TODO @芋艿:站内信
private void sendAlertMessage(IotAlertConfigDO config,
@Nullable IotDeviceMessage deviceMessage,
@Nullable IotDeviceDO device) {
if (CollUtil.isEmpty(config.getReceiveUserIds()) || CollUtil.isEmpty(config.getReceiveTypes())) {
return;
}
Map<String, Object> templateParams = buildTemplateParams(config, deviceMessage, device);
config.getReceiveUserIds().forEach(userId ->
config.getReceiveTypes().forEach(receiveType -> sendAlertMessageToUser(userId, receiveType, templateParams)));
}
/**
*
*/
private void sendAlertMessageToUser(Long userId, Integer receiveType, Map<String, Object> templateParams) {
IotAlertReceiveTypeEnum typeEnum = IotAlertReceiveTypeEnum.of(receiveType);
if (typeEnum == null) {
return;
}
try {
switch (typeEnum) {
case SMS:
smsSendApi.sendSingleSmsToAdmin(new SmsSendSingleToUserReqDTO().setUserId(userId)
.setTemplateCode(typeEnum.getTemplateCode()).setTemplateParams(templateParams));
break;
case MAIL:
mailSendApi.sendSingleMailToAdmin(new MailSendSingleToUserReqDTO().setUserId(userId)
.setTemplateCode(typeEnum.getTemplateCode()).setTemplateParams(templateParams));
break;
case NOTIFY:
notifyMessageSendApi.sendSingleMessageToAdmin(new NotifySendSingleToUserReqDTO().setUserId(userId)
.setTemplateCode(typeEnum.getTemplateCode()).setTemplateParams(templateParams));
break;
}
} catch (Exception ex) {
log.error("[sendAlertMessageToUser][用户({}) 模板参数({}) 发送 {} 告警失败]",
userId, templateParams, typeEnum, ex);
}
}
private Map<String, Object> buildTemplateParams(IotAlertConfigDO config,
@Nullable IotDeviceMessage deviceMessage,
@Nullable IotDeviceDO device) {
Map<String, Object> params = new HashMap<>();
params.put("configName", config.getName());
params.put("configDescription", config.getDescription());
params.put("configLevel", DictFrameworkUtils.parseDictDataLabel(DictTypeConstants.ALERT_LEVEL, config.getLevel()));
params.put("deviceName", device != null ? device.getDeviceName() : null);
params.put("reportTime", deviceMessage != null
? LocalDateTimeUtil.format(deviceMessage.getReportTime(), DatePattern.NORM_DATETIME_PATTERN) : null);
return params;
}
@Override

View File

@ -142,6 +142,7 @@ public final class IotSceneRuleMatcherHelper {
* @param trigger
* @return
*/
@SuppressWarnings("BooleanMethodIsAlwaysInverted")
public static boolean isBasicTriggerValid(IotSceneRuleDO.Trigger trigger) {
return trigger != null && trigger.getType() != null;
}
@ -152,6 +153,7 @@ public final class IotSceneRuleMatcherHelper {
* @param trigger
* @return
*/
@SuppressWarnings("BooleanMethodIsAlwaysInverted")
public static boolean isTriggerOperatorAndValueValid(IotSceneRuleDO.Trigger trigger) {
return StrUtil.isNotBlank(trigger.getOperator()) && StrUtil.isNotBlank(trigger.getValue());
}
@ -163,7 +165,9 @@ public final class IotSceneRuleMatcherHelper {
* @param trigger
*/
public static void logTriggerMatchSuccess(IotDeviceMessage message, IotSceneRuleDO.Trigger trigger) {
log.debug("[isMatched][message({}) trigger({}) 匹配触发器成功]", message.getRequestId(), trigger.getType());
log.debug("[isMatched][message({}) trigger({}) 匹配触发器成功]",
message != null ? message.getRequestId() : null,
trigger != null ? trigger.getType() : null);
}
/**
@ -174,7 +178,10 @@ public final class IotSceneRuleMatcherHelper {
* @param reason
*/
public static void logTriggerMatchFailure(IotDeviceMessage message, IotSceneRuleDO.Trigger trigger, String reason) {
log.debug("[isMatched][message({}) trigger({}) reason({}) 匹配触发器失败]", message.getRequestId(), trigger.getType(), reason);
log.debug("[isMatched][message({}) trigger({}) reason({}) 匹配触发器失败]",
message != null ? message.getRequestId() : null,
trigger != null ? trigger.getType() : null,
reason);
}
// ========== 【条件】相关工具方法 ==========
@ -185,6 +192,7 @@ public final class IotSceneRuleMatcherHelper {
* @param condition
* @return
*/
@SuppressWarnings("BooleanMethodIsAlwaysInverted")
public static boolean isBasicConditionValid(IotSceneRuleDO.TriggerCondition condition) {
return condition != null && condition.getType() != null;
}
@ -195,6 +203,7 @@ public final class IotSceneRuleMatcherHelper {
* @param condition
* @return
*/
@SuppressWarnings("BooleanMethodIsAlwaysInverted")
public static boolean isConditionOperatorAndParamValid(IotSceneRuleDO.TriggerCondition condition) {
return StrUtil.isNotBlank(condition.getOperator()) && StrUtil.isNotBlank(condition.getParam());
}
@ -206,7 +215,9 @@ public final class IotSceneRuleMatcherHelper {
* @param condition
*/
public static void logConditionMatchSuccess(IotDeviceMessage message, IotSceneRuleDO.TriggerCondition condition) {
log.debug("[isMatched][message({}) condition({}) 匹配条件成功]", message.getRequestId(), condition.getType());
log.debug("[isMatched][message({}) condition({}) 匹配条件成功]",
message != null ? message.getRequestId() : null,
condition != null ? condition.getType() : null);
}
/**
@ -217,7 +228,10 @@ public final class IotSceneRuleMatcherHelper {
* @param reason
*/
public static void logConditionMatchFailure(IotDeviceMessage message, IotSceneRuleDO.TriggerCondition condition, String reason) {
log.debug("[isMatched][message({}) condition({}) reason({}) 匹配条件失败]", message.getRequestId(), condition.getType(), reason);
log.debug("[isMatched][message({}) condition({}) reason({}) 匹配条件失败]",
message != null ? message.getRequestId() : null,
condition != null ? condition.getType() : null,
reason);
}
// ========== 【通用】工具方法 ==========
@ -229,6 +243,7 @@ public final class IotSceneRuleMatcherHelper {
* @param actualIdentifier
* @return
*/
@SuppressWarnings("BooleanMethodIsAlwaysInverted")
public static boolean isIdentifierMatched(String expectedIdentifier, String actualIdentifier) {
return StrUtil.isNotBlank(expectedIdentifier) && expectedIdentifier.equals(actualIdentifier);
}

View File

@ -30,10 +30,10 @@ public class IotDevicePropertyConditionMatcher implements IotSceneRuleConditionM
return false;
}
// 1.2 检查标识符是否匹配
String messageIdentifier = IotDeviceMessageUtils.getIdentifier(message);
if (!IotSceneRuleMatcherHelper.isIdentifierMatched(condition.getIdentifier(), messageIdentifier)) {
IotSceneRuleMatcherHelper.logConditionMatchFailure(message, condition, "标识符不匹配,期望: " + condition.getIdentifier() + ", 实际: " + messageIdentifier);
// 1.2 检查消息中是否包含条件指定的属性标识符
// 注意:属性上报可能同时上报多个属性,所以需要判断 condition.getIdentifier() 是否在 message 的 params 中
if (IotDeviceMessageUtils.notContainsIdentifier(message, condition.getIdentifier())) {
IotSceneRuleMatcherHelper.logConditionMatchFailure(message, condition, "消息中不包含属性: " + condition.getIdentifier());
return false;
}

View File

@ -1,14 +1,20 @@
package cn.iocoder.yudao.module.iot.service.rule.scene.matcher.trigger;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotSceneRuleDO;
import cn.iocoder.yudao.module.iot.enums.rule.IotSceneRuleConditionOperatorEnum;
import cn.iocoder.yudao.module.iot.enums.rule.IotSceneRuleTriggerTypeEnum;
import cn.iocoder.yudao.module.iot.service.rule.scene.matcher.IotSceneRuleMatcherHelper;
import org.springframework.stereotype.Component;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
/**
*
*
@ -45,16 +51,22 @@ public class IotDeviceEventPostTriggerMatcher implements IotSceneRuleTriggerMatc
return false;
}
// 2. 对于事件触发器,通常不需要检查操作符和值,只要事件发生即匹配
// 但如果配置了操作符和值,则需要进行条件匹配
// 2. 对于事件触发器,通常不需要检查操作符和值,只要事件发生即匹配。但如果配置了操作符和值,则需要进行条件匹配
if (StrUtil.isNotBlank(trigger.getOperator()) && StrUtil.isNotBlank(trigger.getValue())) {
Object eventParams = message.getParams();
if (eventParams == null) {
IotSceneRuleMatcherHelper.logTriggerMatchFailure(message, trigger, "消息中事件参数为空");
Object eventValue = IotDeviceMessageUtils.extractEventValue(message);
if (eventValue == null) {
IotSceneRuleMatcherHelper.logTriggerMatchFailure(message, trigger, "消息中事件为空");
return false;
}
boolean matched = IotSceneRuleMatcherHelper.evaluateCondition(eventParams, trigger.getOperator(), trigger.getValue());
boolean matched;
if (eventValue instanceof Map || eventValue instanceof Collection) {
// 结构体/数组事件值:把比较值按 JSON 解析后整体相等比较HashMap.equals 与 key 顺序无关;仅支持 = / !=
matched = matchStructuredEventValue(eventValue, trigger);
} else {
// 标量事件值(字符串/数字/布尔):走 SpEL支持 = != > < 等运算
matched = IotSceneRuleMatcherHelper.evaluateCondition(eventValue, trigger.getOperator(), trigger.getValue());
}
if (!matched) {
IotSceneRuleMatcherHelper.logTriggerMatchFailure(message, trigger, "事件数据条件不匹配");
return false;
@ -65,6 +77,16 @@ public class IotDeviceEventPostTriggerMatcher implements IotSceneRuleTriggerMatc
return true;
}
private boolean matchStructuredEventValue(Object eventValue, IotSceneRuleDO.Trigger trigger) {
// 比较值非合法 JSON 时返回 null结构体场景下视为不匹配
Object expected = JsonUtils.parseObjectQuietly(trigger.getValue(), Object.class);
if (expected == null) {
return false;
}
boolean equal = Objects.equals(eventValue, expected);
return IotSceneRuleConditionOperatorEnum.NOT_EQUALS.getOperator().equals(trigger.getOperator()) != equal;
}
@Override
public int getPriority() {
return 30; // 中等优先级

View File

@ -1,5 +1,6 @@
package cn.iocoder.yudao.module.iot.service.rule.scene.matcher.trigger;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
@ -73,8 +74,7 @@ public class IotDeviceServiceInvokeTriggerMatcher implements IotSceneRuleTrigger
private boolean matchParameterCondition(IotDeviceMessage message, IotSceneRuleDO.Trigger trigger) {
// 1.1 从消息中提取服务调用的输入参数
Map<String, Object> inputParams = IotDeviceMessageUtils.extractServiceInputParams(message);
// TODO @puhui999要考虑 empty 的情况么?
if (inputParams == null) {
if (CollUtil.isEmpty(inputParams)) {
IotSceneRuleMatcherHelper.logTriggerMatchFailure(message, trigger, "消息中缺少服务输入参数");
return false;
}

View File

@ -16,8 +16,8 @@
identifier NCHAR(100),
request_id NCHAR(50),
method NCHAR(100),
params NCHAR(2048),
data NCHAR(2048),
params VARCHAR(8192),
data VARCHAR(8192),
code INT,
msg NCHAR(256)
) TAGS (
@ -38,7 +38,7 @@
USING device_message
TAGS (#{deviceId})
VALUES (
NOW, #{id}, #{reportTime}, #{tenantId}, #{serverId},
#{ts}, #{id}, #{reportTime}, #{tenantId}, #{serverId},
#{upstream}, #{reply}, #{identifier}, #{requestId}, #{method},
#{params}, #{data}, #{code}, #{msg}
)

View File

@ -55,7 +55,7 @@
</foreach>
)
VALUES
(NOW, #{reportTime},
(#{ts}, #{reportTime},
<foreach item="value" collection="properties.values" separator=",">
#{value}
</foreach>

View File

@ -0,0 +1,276 @@
package cn.iocoder.yudao.module.iot.service.device.message;
import cn.hutool.core.date.LocalDateTimeUtil;
import cn.iocoder.yudao.framework.common.exception.ServiceException;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.framework.test.core.ut.BaseMockitoUnitTest;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.message.IotDeviceMessagePageReqVO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceMessageDO;
import cn.iocoder.yudao.module.iot.dal.tdengine.IotDeviceMessageMapper;
import cn.iocoder.yudao.module.iot.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.service.device.property.IotDevicePropertyService;
import cn.iocoder.yudao.module.iot.service.ota.IotOtaTaskRecordService;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.*;
/**
* {@link IotDeviceMessageServiceImpl}
*
* TDengine embedded mapper service mock
* handleUpstreamDeviceMessage sendDeviceMessage SpringUtil.getBean
* createDeviceLogAsync
*
* @author
*/
public class IotDeviceMessageServiceImplTest extends BaseMockitoUnitTest {
@InjectMocks
private IotDeviceMessageServiceImpl service;
@Mock
private IotDeviceService deviceService;
@Mock
private IotDevicePropertyService devicePropertyService;
@Mock
private IotOtaTaskRecordService otaTaskRecordService;
@Mock
private IotDeviceMessageMapper deviceMessageMapper;
@Mock
private IotDeviceMessageProducer deviceMessageProducer;
// ========== defineDeviceMessageStable ==========
@Test
public void testDefineDeviceMessageStable_whenTableExists_skipCreate() {
// 准备showSTable 返回非空 → 表已存在
when(deviceMessageMapper.showSTable()).thenReturn("device_message");
// 调用
service.defineDeviceMessageStable();
// 断言:跳过 createSTable
verify(deviceMessageMapper, never()).createSTable();
}
@Test
public void testDefineDeviceMessageStable_whenTableMissing_create() {
// 准备showSTable 返回空 → 表不存在
when(deviceMessageMapper.showSTable()).thenReturn("");
// 调用
service.defineDeviceMessageStable();
// 断言:触发 createSTable
verify(deviceMessageMapper, times(1)).createSTable();
}
// ========== createDeviceLogAsync ==========
@Test
public void testCreateDeviceLogAsync_tsFallback_whenNull() {
// 准备:构造一条 ts 为 null 的消息
IotDeviceMessage message = buildMessage(IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod());
long before = System.currentTimeMillis();
// 调用
service.createDeviceLogAsync(message);
long after = System.currentTimeMillis();
// 断言mapper.insert 接收到的 messageDO 已被填上 ts值在 [before, after] 区间
ArgumentCaptor<IotDeviceMessageDO> captor = ArgumentCaptor.forClass(IotDeviceMessageDO.class);
verify(deviceMessageMapper).insert(captor.capture());
Long actualTs = captor.getValue().getTs();
assertNotNull(actualTs, "ts 不应为空");
assertTrue(actualTs >= before && actualTs <= after,
"ts 应在调用前后区间内; 实际 = " + actualTs);
}
@Test
public void testCreateDeviceLogAsync_swallowMapperException() {
// 准备mapper.insert 抛异常,验证 @Async 方法内部 try/catch 兜底,不向上抛
IotDeviceMessage message = buildMessage(IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod());
doThrow(new RuntimeException("DB unavailable")).when(deviceMessageMapper).insert(any());
// 调用 & 断言
assertDoesNotThrow(() -> service.createDeviceLogAsync(message));
verify(deviceMessageMapper).insert(any(IotDeviceMessageDO.class));
}
// ========== sendDeviceMessage ==========
@Test
public void testSendDeviceMessage_upstream_publishToProducer() {
// 准备上行消息PROPERTY_POST
IotDeviceDO device = buildDevice();
IotDeviceMessage message = buildMessage(IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod());
// 调用
IotDeviceMessage result = service.sendDeviceMessage(message, device);
// 断言:走 producer.sendDeviceMessage不进入下行链路
assertSame(message, result);
verify(deviceMessageProducer, times(1)).sendDeviceMessage(message);
verify(deviceMessageProducer, never()).sendDeviceMessageToGateway(any(), any());
verify(devicePropertyService, never()).getDeviceServerId(any());
}
@Test
public void testSendDeviceMessage_downstream_serverIdMissing_throwException() {
// 准备下行消息SERVICE_INVOKEdevicePropertyService 也查不到 serverId
IotDeviceDO device = buildDevice();
IotDeviceMessage message = buildMessage(IotDeviceMessageMethodEnum.SERVICE_INVOKE.getMethod());
when(devicePropertyService.getDeviceServerId(device.getId())).thenReturn(null);
// 调用 & 断言:抛 DEVICE_DOWNSTREAM_FAILED_SERVER_ID_NULL
ServiceException ex = assertThrows(ServiceException.class,
() -> service.sendDeviceMessage(message, device));
assertEquals(1_050_003_007, ex.getCode().intValue());
verify(deviceMessageProducer, never()).sendDeviceMessageToGateway(any(), any());
}
// ========== getDeviceMessagePage ==========
@Test
public void testGetDeviceMessagePage_normal() {
// 准备
IotDeviceMessagePageReqVO reqVO = new IotDeviceMessagePageReqVO();
reqVO.setPageNo(1);
reqVO.setPageSize(10);
IotDeviceMessageDO record = new IotDeviceMessageDO().setId("msg-1");
Page<IotDeviceMessageDO> page = new Page<>(1, 10, 1L);
page.setRecords(Collections.singletonList(record));
when(deviceMessageMapper.selectPage(any(), eq(reqVO))).thenReturn(page);
// 调用
PageResult<IotDeviceMessageDO> result = service.getDeviceMessagePage(reqVO);
// 断言
assertEquals(1L, result.getTotal());
assertEquals(1, result.getList().size());
assertEquals("msg-1", result.getList().get(0).getId());
}
@Test
public void testGetDeviceMessagePage_whenTableMissing_returnEmpty() {
// 准备mapper 抛 "Table does not exist" → 视为表未创建,返回空结果
IotDeviceMessagePageReqVO reqVO = new IotDeviceMessagePageReqVO();
reqVO.setPageNo(1);
reqVO.setPageSize(10);
when(deviceMessageMapper.selectPage(any(), any()))
.thenThrow(new RuntimeException("Table does not exist"));
// 调用
PageResult<IotDeviceMessageDO> result = service.getDeviceMessagePage(reqVO);
// 断言
assertEquals(0L, result.getTotal());
assertTrue(result.getList().isEmpty());
}
@Test
public void testGetDeviceMessagePage_otherException_rethrow() {
// 准备mapper 抛非「表不存在」的异常 → 应向上抛
IotDeviceMessagePageReqVO reqVO = new IotDeviceMessagePageReqVO();
reqVO.setPageNo(1);
reqVO.setPageSize(10);
when(deviceMessageMapper.selectPage(any(), any()))
.thenThrow(new RuntimeException("Connection refused"));
// 调用 & 断言
assertThrows(RuntimeException.class, () -> service.getDeviceMessagePage(reqVO));
}
// ========== getDeviceMessageListByRequestIdsAndReply ==========
@Test
public void testGetDeviceMessageListByRequestIdsAndReply_emptyIds_returnEmpty() {
// 调用 & 断言requestIds 为空直接返回空列表,不查 DB
List<IotDeviceMessageDO> result = service.getDeviceMessageListByRequestIdsAndReply(
1L, Collections.emptyList(), true);
assertTrue(result.isEmpty());
verify(deviceMessageMapper, never()).selectListByRequestIdsAndReply(any(), any(), any());
}
@Test
public void testGetDeviceMessageListByRequestIdsAndReply_normal_delegateToMapper() {
// 准备
List<String> requestIds = Collections.singletonList("req-1");
IotDeviceMessageDO record = new IotDeviceMessageDO().setId("msg-1");
when(deviceMessageMapper.selectListByRequestIdsAndReply(1L, requestIds, true))
.thenReturn(Collections.singletonList(record));
// 调用
List<IotDeviceMessageDO> result = service.getDeviceMessageListByRequestIdsAndReply(
1L, requestIds, true);
// 断言
assertEquals(1, result.size());
assertEquals("msg-1", result.get(0).getId());
}
// ========== getDeviceMessageCount ==========
@Test
public void testGetDeviceMessageCount_whenCreateTimeNull_passNullToMapper() {
// 准备
when(deviceMessageMapper.selectCountByCreateTime(isNull())).thenReturn(123L);
// 调用
Long count = service.getDeviceMessageCount(null);
// 断言
assertEquals(123L, count);
verify(deviceMessageMapper).selectCountByCreateTime(isNull());
}
@Test
public void testGetDeviceMessageCount_withCreateTime_passEpochMilli() {
// 准备service 内部用 LocalDateTimeUtil.toEpochMilli 做转换,断言时也用同函数得到期望值
LocalDateTime createTime = LocalDateTime.of(2026, 1, 1, 0, 0);
long expectedMs = LocalDateTimeUtil.toEpochMilli(createTime);
when(deviceMessageMapper.selectCountByCreateTime(expectedMs)).thenReturn(456L);
// 调用
Long count = service.getDeviceMessageCount(createTime);
// 断言
assertEquals(456L, count);
}
// ========== 辅助方法 ==========
/** 构造一条最简消息(指定 method 决定上下行分支) */
private IotDeviceMessage buildMessage(String method) {
IotDeviceMessage message = new IotDeviceMessage();
message.setId("msg-1");
message.setDeviceId(2L);
message.setMethod(method);
message.setParams(new HashMap<>());
return message;
}
/** 构造最简设备 */
private IotDeviceDO buildDevice() {
return IotDeviceDO.builder().id(2L).build();
}
}

View File

@ -0,0 +1,204 @@
package cn.iocoder.yudao.module.iot.service.device.property;
import cn.iocoder.yudao.framework.test.core.ut.BaseMockitoUnitTest;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDevicePropertyDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.thingmodel.IotThingModelDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.thingmodel.model.ThingModelProperty;
import cn.iocoder.yudao.module.iot.dal.redis.device.DevicePropertyRedisDAO;
import cn.iocoder.yudao.module.iot.dal.tdengine.IotDevicePropertyMapper;
import cn.iocoder.yudao.module.iot.enums.thingmodel.IotDataSpecsDataTypeEnum;
import cn.iocoder.yudao.module.iot.service.thingmodel.IotThingModelService;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import static java.util.Collections.singletonList;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
/**
* {@link IotDevicePropertyServiceImpl}
*
* @author
*/
public class IotDevicePropertyServiceImplTest extends BaseMockitoUnitTest {
@InjectMocks
private IotDevicePropertyServiceImpl service;
@Mock
private IotThingModelService thingModelService;
@Mock
private IotDevicePropertyMapper devicePropertyMapper;
@Mock
private DevicePropertyRedisDAO deviceDataRedisDAO;
@Test
public void testSaveDeviceProperty_identifierCaseInsensitive() {
// 准备参数:物模型 identifier 是 "LightStatus",设备上报的 key 是 "LIGHTSTATUS"(全大写)
IotDeviceDO device = buildDevice();
IotThingModelDO thingModel = buildThingModel("LightStatus", IotDataSpecsDataTypeEnum.INT.getDataType());
Map<String, Object> params = new HashMap<>();
params.put("LIGHTSTATUS", 100);
IotDeviceMessage message = buildMessage(params);
// mock 行为
when(thingModelService.getThingModelListByProductIdFromCache(device.getProductId()))
.thenReturn(singletonList(thingModel));
// 调用
service.saveDeviceProperty(device, message);
// 断言properties 落库 / 入缓存时 key 应为物模型 identifier "LightStatus",而不是上报的 "LIGHTSTATUS"
Map<String, Object> dbProperties = captureMapperInsertProperties();
assertTrue(dbProperties.containsKey("LightStatus"));
assertFalse(dbProperties.containsKey("LIGHTSTATUS"));
assertEquals(100, dbProperties.get("LightStatus"));
Map<String, IotDevicePropertyDO> redisProperties = captureRedisPutAllProperties(device.getId());
assertTrue(redisProperties.containsKey("LightStatus"));
assertFalse(redisProperties.containsKey("LIGHTSTATUS"));
}
@Test
public void testSaveDeviceProperty_identifierNotInThingModel() {
// 准备参数:上报的 key 在物模型里完全不存在(连忽略大小写都匹配不到)
IotDeviceDO device = buildDevice();
IotThingModelDO thingModel = buildThingModel("LightStatus", IotDataSpecsDataTypeEnum.INT.getDataType());
Map<String, Object> params = new HashMap<>();
params.put("UnknownProperty", 1);
IotDeviceMessage message = buildMessage(params);
// mock 行为
when(thingModelService.getThingModelListByProductIdFromCache(device.getProductId()))
.thenReturn(singletonList(thingModel));
// 调用
service.saveDeviceProperty(device, message);
// 断言:没有合法属性,不会写入 TDengine 与 Redis
verify(devicePropertyMapper, never()).insert(any(), any(), anyLong(), anyLong());
verify(deviceDataRedisDAO, never()).putAll(anyLong(), any());
}
@Test
public void testSaveDeviceProperty_boolFromBooleanTrue() {
// 准备参数:物模型为 BOOL设备上报原生 boolean true
assertBoolValueConvertedToByte(true, (byte) 1);
}
@Test
public void testSaveDeviceProperty_boolFromBooleanFalse() {
// 准备参数:物模型为 BOOL设备上报原生 boolean false
assertBoolValueConvertedToByte(false, (byte) 0);
}
@Test
public void testSaveDeviceProperty_boolFromStringTrue() {
// 准备参数:物模型为 BOOL设备上报字符串 "true"
assertBoolValueConvertedToByte("true", (byte) 1);
}
@Test
public void testSaveDeviceProperty_boolFromStringFalse() {
// 准备参数:物模型为 BOOL设备上报字符串 "false"
assertBoolValueConvertedToByte("false", (byte) 0);
}
@Test
public void testSaveDeviceProperty_boolFromNumberOne() {
// 准备参数:物模型为 BOOL设备上报数字 1
assertBoolValueConvertedToByte(1, (byte) 1);
}
@Test
public void testSaveDeviceProperty_boolFromNumberZero() {
// 准备参数:物模型为 BOOL设备上报数字 0
assertBoolValueConvertedToByte(0, (byte) 0);
}
/**
* BOOL properties Map
*/
private void assertBoolValueConvertedToByte(Object reportedValue, byte expected) {
// 准备参数
IotDeviceDO device = buildDevice();
IotThingModelDO thingModel = buildThingModel("PowerSwitch", IotDataSpecsDataTypeEnum.BOOL.getDataType());
Map<String, Object> params = new HashMap<>();
params.put("PowerSwitch", reportedValue);
IotDeviceMessage message = buildMessage(params);
// mock 行为
when(thingModelService.getThingModelListByProductIdFromCache(device.getProductId()))
.thenReturn(singletonList(thingModel));
// 调用:不能抛异常
assertDoesNotThrow(() -> service.saveDeviceProperty(device, message));
// 断言:写入的 value 是 byte 类型,且值匹配
Map<String, Object> dbProperties = captureMapperInsertProperties();
Object actual = dbProperties.get("PowerSwitch");
assertTrue(actual instanceof Byte, "BOOL 属性应被转为 Byte 类型,实际为 " + (actual == null ? "null" : actual.getClass()));
assertEquals(expected, actual);
}
// ========== 辅助方法 ==========
/**
* IotDeviceDO id productId
*/
private IotDeviceDO buildDevice() {
return IotDeviceDO.builder().id(1L).productId(2L).build();
}
/**
* saveDeviceProperty identifier + property.dataType
*/
private IotThingModelDO buildThingModel(String identifier, String dataType) {
ThingModelProperty property = new ThingModelProperty();
property.setIdentifier(identifier);
property.setDataType(dataType);
return IotThingModelDO.builder().identifier(identifier).property(property).build();
}
/**
*
*/
private IotDeviceMessage buildMessage(Map<String, Object> params) {
IotDeviceMessage message = new IotDeviceMessage();
message.setMethod(IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod());
message.setParams(params);
message.setReportTime(LocalDateTime.now());
return message;
}
/**
* mapper.insert properties
*/
@SuppressWarnings("unchecked")
private Map<String, Object> captureMapperInsertProperties() {
ArgumentCaptor<Map<String, Object>> captor = ArgumentCaptor.forClass(Map.class);
verify(devicePropertyMapper).insert(any(IotDeviceDO.class), captor.capture(), anyLong(), anyLong());
return captor.getValue();
}
/**
* redisDAO.putAll properties
*/
@SuppressWarnings("unchecked")
private Map<String, IotDevicePropertyDO> captureRedisPutAllProperties(Long deviceId) {
ArgumentCaptor<Map<String, IotDevicePropertyDO>> captor = ArgumentCaptor.forClass(Map.class);
verify(deviceDataRedisDAO).putAll(eq(deviceId), captor.capture());
return captor.getValue();
}
}

View File

@ -0,0 +1,235 @@
package cn.iocoder.yudao.module.iot.service.rule.data;
import cn.hutool.core.map.MapUtil;
import cn.iocoder.yudao.framework.common.enums.CommonStatusEnum;
import cn.iocoder.yudao.framework.test.core.ut.BaseDbUnitTest;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataRuleDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataSinkDO;
import cn.iocoder.yudao.module.iot.dal.mysql.rule.IotDataRuleMapper;
import cn.iocoder.yudao.module.iot.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.service.product.IotProductService;
import cn.iocoder.yudao.module.iot.service.rule.data.action.IotDataRuleAction;
import cn.iocoder.yudao.module.iot.service.thingmodel.IotThingModelService;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.context.annotation.Import;
import org.springframework.test.context.bean.override.mockito.MockitoBean;
import java.util.Map;
import static cn.iocoder.yudao.framework.test.core.util.RandomUtils.randomLongId;
import static cn.iocoder.yudao.framework.test.core.util.RandomUtils.randomPojo;
import static java.util.Collections.singletonList;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* {@link IotDataRuleServiceImpl}
*
* @author
*/
@Import(IotDataRuleServiceImpl.class)
class IotDataRuleServiceImplTest extends BaseDbUnitTest {
@Resource
private IotDataRuleServiceImpl dataRuleService;
@Resource
private IotDataRuleMapper dataRuleMapper;
@MockitoBean
private IotDataSinkService dataSinkService;
@MockitoBean
private IotDataRuleAction dataRuleAction;
@MockitoBean
private IotProductService productService;
@MockitoBean
private IotDeviceService deviceService;
@MockitoBean
private IotThingModelService thingModelService;
@Test
public void testExecuteDataRule_propertyPost_singleIdentifierMatched() {
// 准备参数
Long deviceId = randomLongId();
String identifier = "temperature";
IotDeviceMessage message = createPropertyPostMessage(deviceId,
MapUtil.<String, Object>builder().put(identifier, 25.5).build());
// mock 数据:插入一条限定 identifier=temperature 的规则
Long sinkId = randomLongId();
insertEnabledRule(deviceId, IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(), identifier, sinkId);
// mock 方法
IotDataSinkDO sink = mockEnabledSink(sinkId);
// 调用
dataRuleService.executeDataRule(message);
// 断言sink action 被调用一次
verify(dataRuleAction).execute(eq(message), eq(sink));
}
@Test
public void testExecuteDataRule_propertyPost_multiIdentifierOneMatched() {
// 准备参数:上报 {temperature, humidity},规则只限定 humidity
Long deviceId = randomLongId();
IotDeviceMessage message = createPropertyPostMessage(deviceId,
MapUtil.<String, Object>builder().put("temperature", 25.5).put("humidity", 60).build());
// mock 数据
Long sinkId = randomLongId();
insertEnabledRule(deviceId, IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(), "humidity", sinkId);
// mock 方法
IotDataSinkDO sink = mockEnabledSink(sinkId);
// 调用
dataRuleService.executeDataRule(message);
// 断言
verify(dataRuleAction).execute(eq(message), eq(sink));
}
@Test
public void testExecuteDataRule_propertyPost_multiIdentifierDeduped() {
// 准备参数:上报 {temperature, humidity},规则 identifier=null 不限定属性
Long deviceId = randomLongId();
IotDeviceMessage message = createPropertyPostMessage(deviceId,
MapUtil.<String, Object>builder().put("temperature", 25.5).put("humidity", 60).build());
// mock 数据identifier=null 时两个属性 key 都会命中同一条规则,需在 sink 调用前去重
Long sinkId = randomLongId();
insertEnabledRule(deviceId, IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(), null, sinkId);
// mock 方法
IotDataSinkDO sink = mockEnabledSink(sinkId);
// 调用
dataRuleService.executeDataRule(message);
// 断言:去重后只触发一次,而不是 2 次
verify(dataRuleAction).execute(eq(message), eq(sink));
}
@Test
public void testExecuteDataRule_propertyPost_multiRuleSameSinkDeduped() {
// 准备参数:上报 {temperature, humidity},两条规则分别命中不同 identifier但都指向同一 sink
Long deviceId = randomLongId();
IotDeviceMessage message = createPropertyPostMessage(deviceId,
MapUtil.<String, Object>builder().put("temperature", 25.5).put("humidity", 60).build());
// mock 数据插入两条规则identifier 分别为 temperature 与 humiditysinkId 相同
Long sinkId = randomLongId();
insertEnabledRule(deviceId, IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(), "temperature", sinkId);
insertEnabledRule(deviceId, IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(), "humidity", sinkId);
// mock 方法
IotDataSinkDO sink = mockEnabledSink(sinkId);
// 调用
dataRuleService.executeDataRule(message);
// 断言跨规则去重后sink action 只触发一次,而不是 2 次
verify(dataRuleAction).execute(eq(message), eq(sink));
}
@Test
public void testExecuteDataRule_propertyPost_emptyParamsMatchesWildcardRule() {
// 准备参数:上报空属性,规则 identifier=null 不限定属性,按"任意 property report 都同步"语义应命中
Long deviceId = randomLongId();
IotDeviceMessage message = createPropertyPostMessage(deviceId,
MapUtil.<String, Object>builder().build());
// mock 数据
Long sinkId = randomLongId();
insertEnabledRule(deviceId, IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(), null, sinkId);
// mock 方法
IotDataSinkDO sink = mockEnabledSink(sinkId);
// 调用
dataRuleService.executeDataRule(message);
// 断言
verify(dataRuleAction).execute(eq(message), eq(sink));
}
@Test
public void testExecuteDataRule_propertyPost_noIdentifierMatched() {
// 准备参数:上报 {temperature},规则限定 humidity
Long deviceId = randomLongId();
IotDeviceMessage message = createPropertyPostMessage(deviceId,
MapUtil.<String, Object>builder().put("temperature", 25.5).build());
// mock 数据
Long sinkId = randomLongId();
insertEnabledRule(deviceId, IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(), "humidity", sinkId);
// 调用
dataRuleService.executeDataRule(message);
// 断言sink action 不应被调用
verify(dataRuleAction, never()).execute(any(), any());
}
@Test
public void testExecuteDataRule_eventPost_singleIdentifierMatched() {
// 准备参数:事件触发器走单 identifier 路径(与改动前行为保持一致)
Long deviceId = randomLongId();
String identifier = "alarm";
IotDeviceMessage message = randomPojo(IotDeviceMessage.class, o -> {
o.setDeviceId(deviceId);
o.setMethod(IotDeviceMessageMethodEnum.EVENT_POST.getMethod());
o.setParams(MapUtil.<String, Object>builder()
.put("identifier", identifier).put("value", "fired").build());
});
// mock 数据
Long sinkId = randomLongId();
insertEnabledRule(deviceId, IotDeviceMessageMethodEnum.EVENT_POST.getMethod(), identifier, sinkId);
// mock 方法
IotDataSinkDO sink = mockEnabledSink(sinkId);
// 调用
dataRuleService.executeDataRule(message);
// 断言
verify(dataRuleAction).execute(eq(message), eq(sink));
}
// ========== 辅助方法 ==========
private IotDeviceMessage createPropertyPostMessage(Long deviceId, Map<String, Object> params) {
return randomPojo(IotDeviceMessage.class, o -> {
o.setDeviceId(deviceId);
o.setMethod(IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod());
o.setParams(params);
});
}
/**
* H2 {@code sinkId}
*/
private void insertEnabledRule(Long deviceId, String method, String identifier, Long sinkId) {
IotDataRuleDO.SourceConfig config = randomPojo(IotDataRuleDO.SourceConfig.class, o -> {
o.setDeviceId(deviceId);
o.setMethod(method);
o.setIdentifier(identifier);
});
IotDataRuleDO rule = randomPojo(IotDataRuleDO.class, o -> {
o.setId(null);
o.setStatus(CommonStatusEnum.ENABLE.getStatus());
o.setSourceConfigs(singletonList(config));
o.setSinkIds(singletonList(sinkId));
});
dataRuleMapper.insert(rule);
}
/**
* mock sink
*/
private IotDataSinkDO mockEnabledSink(Long sinkId) {
IotDataSinkDO sink = randomPojo(IotDataSinkDO.class, o -> {
o.setId(sinkId);
o.setStatus(CommonStatusEnum.ENABLE.getStatus());
});
when(dataSinkService.getDataSinkFromCache(sinkId)).thenReturn(sink);
when(dataRuleAction.getType()).thenReturn(sink.getType());
return sink;
}
}

View File

@ -0,0 +1,65 @@
package cn.iocoder.yudao.module.iot.service.rule.data.action;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum;
import com.zaxxer.hikari.HikariDataSource;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.springframework.jdbc.core.JdbcTemplate;
import javax.sql.DataSource;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.*;
/**
* {@link IotDatabaseDataRuleAction}
*
* @author HUIHUI
*/
class IotDatabaseDataRuleActionTest {
private IotDatabaseDataRuleAction databaseDataRuleAction;
@Mock
private JdbcTemplate jdbcTemplate;
@BeforeEach
public void setUp() {
MockitoAnnotations.openMocks(this);
databaseDataRuleAction = new IotDatabaseDataRuleAction();
}
@Test
public void testGetType() {
// 调用 & 断言:返回 Database 类型枚举值
assertEquals(IotDataSinkTypeEnum.DATABASE.getType(), databaseDataRuleAction.getType());
}
@Test
public void testCloseProducer_whenHikari() throws Exception {
// 准备:底层是 HikariDataSource
HikariDataSource hikari = mock(HikariDataSource.class);
when(jdbcTemplate.getDataSource()).thenReturn(hikari);
// 调用
databaseDataRuleAction.closeProducer(jdbcTemplate);
// 断言HikariDataSource 被关闭
verify(hikari, times(1)).close();
}
@Test
public void testCloseProducer_whenNotHikari() throws Exception {
// 准备:底层不是 HikariDataSource避免误调 close
DataSource other = mock(DataSource.class);
when(jdbcTemplate.getDataSource()).thenReturn(other);
// 调用 & 断言:不抛异常,且不会尝试关闭非 Hikari 数据源
assertDoesNotThrow(() -> databaseDataRuleAction.closeProducer(jdbcTemplate));
verifyNoInteractions(other);
}
}

View File

@ -0,0 +1,63 @@
package cn.iocoder.yudao.module.iot.service.rule.data.action;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.*;
/**
* {@link IotMqttDataRuleAction}
*
* @author HUIHUI
*/
class IotMqttDataRuleActionTest {
private IotMqttDataRuleAction mqttDataRuleAction;
@Mock
private MqttClient mqttClient;
@BeforeEach
public void setUp() {
MockitoAnnotations.openMocks(this);
mqttDataRuleAction = new IotMqttDataRuleAction();
}
@Test
public void testGetType() {
// 调用 & 断言:返回 MQTT 类型枚举值
assertEquals(IotDataSinkTypeEnum.MQTT.getType(), mqttDataRuleAction.getType());
}
@Test
public void testCloseProducer_whenConnected() throws Exception {
// 准备:连接中状态
when(mqttClient.isConnected()).thenReturn(true);
// 调用
mqttDataRuleAction.closeProducer(mqttClient);
// 断言:先 disconnect 再 close
verify(mqttClient, times(1)).disconnect();
verify(mqttClient, times(1)).close();
}
@Test
public void testCloseProducer_whenAlreadyDisconnected() throws Exception {
// 准备:已断开状态
when(mqttClient.isConnected()).thenReturn(false);
// 调用
mqttDataRuleAction.closeProducer(mqttClient);
// 断言:跳过 disconnect仅 close
verify(mqttClient, never()).disconnect();
verify(mqttClient, times(1)).close();
}
}

View File

@ -5,7 +5,6 @@ import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkTcpConfig;
import cn.iocoder.yudao.module.iot.service.rule.data.action.tcp.IotTcpClient;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
@ -18,7 +17,6 @@ import static org.mockito.Mockito.*;
*
* @author HUIHUI
*/
@Disabled // TODO @puhui999单测有报错先屏蔽
class IotTcpDataRuleActionTest {
private IotTcpDataRuleAction tcpDataRuleAction;
@ -44,9 +42,8 @@ class IotTcpDataRuleActionTest {
assertEquals(expectedType, actualType);
}
// TODO @puhui999_ 后面是小写哈,单测的命名规则。
@Test
public void testInitProducer_Success() throws Exception {
public void testInitProducer_success() throws Exception {
// 准备参数
IotDataSinkTcpConfig config = new IotDataSinkTcpConfig();
config.setHost("localhost");
@ -62,7 +59,7 @@ class IotTcpDataRuleActionTest {
}
@Test
public void testInitProducer_InvalidHost() {
public void testInitProducer_invalidHost() {
// 准备参数
IotDataSinkTcpConfig config = new IotDataSinkTcpConfig();
config.setHost("");
@ -80,7 +77,7 @@ class IotTcpDataRuleActionTest {
}
@Test
public void testInitProducer_InvalidPort() {
public void testInitProducer_invalidPort() {
// 准备参数
IotDataSinkTcpConfig config = new IotDataSinkTcpConfig();
config.setHost("localhost");
@ -107,7 +104,7 @@ class IotTcpDataRuleActionTest {
}
@Test
public void testExecute_WithValidConfig() {
public void testExecute_withValidConfig() {
// 准备参数
IotDeviceMessage message = IotDeviceMessage.requestOf("thing.property.report",
"{\"temperature\": 25.5, \"humidity\": 60}");
@ -127,7 +124,7 @@ class IotTcpDataRuleActionTest {
}
@Test
public void testConfig_DefaultValues() {
public void testConfig_defaultValues() {
// 准备参数
IotDataSinkTcpConfig config = new IotDataSinkTcpConfig();

View File

@ -5,11 +5,12 @@ import cn.iocoder.yudao.framework.test.core.ut.BaseMockitoUnitTest;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.scene.IotSceneRuleSaveReqVO;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotSceneRuleDO;
import cn.iocoder.yudao.module.iot.dal.mysql.rule.IotSceneRuleMapper;
import cn.iocoder.yudao.module.iot.framework.job.core.IotSchedulerManager;
import cn.iocoder.yudao.module.iot.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.service.product.IotProductService;
import cn.iocoder.yudao.module.iot.service.rule.scene.action.IotSceneRuleAction;
import org.junit.jupiter.api.Disabled;
import cn.iocoder.yudao.module.iot.service.rule.scene.matcher.IotSceneRuleMatcherManager;
import cn.iocoder.yudao.module.iot.service.rule.scene.timer.IotSceneRuleTimerHandler;
import cn.iocoder.yudao.module.iot.service.rule.scene.timer.IotTimerConditionEvaluator;
import org.junit.jupiter.api.Test;
import org.mockito.InjectMocks;
import org.mockito.Mock;
@ -30,7 +31,6 @@ import static org.mockito.Mockito.*;
*
* @author
*/
@Disabled // TODO @puhui999单测有报错先屏蔽
public class IotSceneRuleServiceSimpleTest extends BaseMockitoUnitTest {
@InjectMocks
@ -43,7 +43,13 @@ public class IotSceneRuleServiceSimpleTest extends BaseMockitoUnitTest {
private List<IotSceneRuleAction> sceneRuleActions;
@Mock
private IotSchedulerManager schedulerManager;
private IotSceneRuleTimerHandler timerHandler;
@Mock
private IotTimerConditionEvaluator timerConditionEvaluator;
@Mock
private IotSceneRuleMatcherManager sceneRuleMatcherManager;
@Mock
private IotProductService productService;
@ -52,7 +58,7 @@ public class IotSceneRuleServiceSimpleTest extends BaseMockitoUnitTest {
private IotDeviceService deviceService;
@Test
public void testCreateScene_Rule_success() {
public void testCreateScene_rule_success() {
// 准备参数
IotSceneRuleSaveReqVO createReqVO = randomPojo(IotSceneRuleSaveReqVO.class, o -> {
o.setId(null);
@ -78,7 +84,7 @@ public class IotSceneRuleServiceSimpleTest extends BaseMockitoUnitTest {
}
@Test
public void testUpdateScene_Rule_success() {
public void testUpdateScene_rule_success() {
// 准备参数
Long id = randomLongId();
IotSceneRuleSaveReqVO updateReqVO = randomPojo(IotSceneRuleSaveReqVO.class, o -> {

View File

@ -14,8 +14,10 @@ import cn.iocoder.yudao.module.iot.enums.rule.IotSceneRuleTriggerTypeEnum;
import cn.iocoder.yudao.module.iot.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.service.device.property.IotDevicePropertyService;
import cn.iocoder.yudao.module.iot.service.rule.scene.action.IotSceneRuleAction;
import cn.iocoder.yudao.module.iot.service.rule.scene.matcher.IotSceneRuleMatcherManager;
import cn.iocoder.yudao.module.iot.service.rule.scene.timer.IotSceneRuleTimerHandler;
import cn.iocoder.yudao.module.iot.service.rule.scene.timer.IotTimerConditionEvaluator;
import cn.iocoder.yudao.module.iot.service.product.IotProductService;
import org.junit.jupiter.api.*;
import org.mockito.InjectMocks;
import org.mockito.Mock;
@ -41,7 +43,6 @@ import static org.mockito.Mockito.*;
*
* @author HUIHUI
*/
@Disabled // TODO @puhui999单测有报错先屏蔽
public class IotSceneRuleTimerConditionIntegrationTest extends BaseMockitoUnitTest {
@InjectMocks
@ -62,6 +63,12 @@ public class IotSceneRuleTimerConditionIntegrationTest extends BaseMockitoUnitTe
@Mock
private IotSceneRuleTimerHandler timerHandler;
@Mock
private IotSceneRuleMatcherManager sceneRuleMatcherManager;
@Mock
private IotProductService productService;
private IotTimerConditionEvaluator timerConditionEvaluator;
// 测试常量

View File

@ -0,0 +1,272 @@
package cn.iocoder.yudao.module.iot.service.rule.scene.action;
import cn.iocoder.yudao.framework.common.enums.CommonStatusEnum;
import cn.iocoder.yudao.framework.dict.core.DictFrameworkUtils;
import cn.iocoder.yudao.framework.test.core.ut.BaseMockitoUnitTest;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.alert.IotAlertConfigDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotSceneRuleDO;
import cn.iocoder.yudao.module.iot.enums.alert.IotAlertReceiveTypeEnum;
import cn.iocoder.yudao.module.iot.enums.rule.IotSceneRuleActionTypeEnum;
import cn.iocoder.yudao.module.iot.service.alert.IotAlertConfigService;
import cn.iocoder.yudao.module.iot.service.alert.IotAlertRecordService;
import cn.iocoder.yudao.module.iot.service.device.IotDeviceService;
import cn.iocoder.yudao.module.system.api.mail.MailSendApi;
import cn.iocoder.yudao.module.system.api.mail.dto.MailSendSingleToUserReqDTO;
import cn.iocoder.yudao.module.system.api.notify.NotifyMessageSendApi;
import cn.iocoder.yudao.module.system.api.notify.dto.NotifySendSingleToUserReqDTO;
import cn.iocoder.yudao.module.system.api.sms.SmsSendApi;
import cn.iocoder.yudao.module.system.api.sms.dto.send.SmsSendSingleToUserReqDTO;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.Collections;
import static cn.iocoder.yudao.framework.test.core.util.RandomUtils.randomLongId;
import static cn.iocoder.yudao.framework.test.core.util.RandomUtils.randomPojo;
import static cn.iocoder.yudao.framework.test.core.util.RandomUtils.randomString;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* {@link IotAlertTriggerSceneRuleAction}
*
* @author
*/
public class IotAlertTriggerSceneRuleActionTest extends BaseMockitoUnitTest {
@InjectMocks
private IotAlertTriggerSceneRuleAction action;
@Mock
private IotAlertConfigService alertConfigService;
@Mock
private IotAlertRecordService alertRecordService;
@Mock
private IotDeviceService deviceService;
@Mock
private SmsSendApi smsSendApi;
@Mock
private MailSendApi mailSendApi;
@Mock
private NotifyMessageSendApi notifyMessageSendApi;
@Test
public void testGetType() {
// 调用并断言
assertEquals(IotSceneRuleActionTypeEnum.ALERT_TRIGGER, action.getType());
}
@Test
public void testExecute_noAlertConfigs() throws Exception {
// 准备参数
IotDeviceMessage message = createDeviceMessage();
IotSceneRuleDO rule = randomPojo(IotSceneRuleDO.class);
IotSceneRuleDO.Action actionConfig = randomPojo(IotSceneRuleDO.Action.class);
// mock 行为:返回空列表
when(alertConfigService.getAlertConfigListBySceneRuleIdAndStatus(rule.getId(), CommonStatusEnum.ENABLE.getStatus()))
.thenReturn(Collections.emptyList());
// 调用
action.execute(message, rule, actionConfig);
// 断言:不查设备、不创建记录、不发消息
verify(deviceService, never()).getDeviceFromCache(anyLong());
verify(alertRecordService, never()).createAlertRecord(any(), any(), any(), any());
verify(smsSendApi, never()).sendSingleSmsToAdmin(any());
verify(mailSendApi, never()).sendSingleMailToAdmin(any());
verify(notifyMessageSendApi, never()).sendSingleMessageToAdmin(any());
}
@Test
public void testExecute_deviceTrigger_sendAllChannels() throws Exception {
// 准备参数
Long userId = randomLongId();
IotDeviceMessage message = createDeviceMessage();
IotSceneRuleDO rule = randomPojo(IotSceneRuleDO.class);
IotSceneRuleDO.Action actionConfig = randomPojo(IotSceneRuleDO.Action.class);
IotAlertConfigDO config = randomPojo(IotAlertConfigDO.class, c -> {
c.setReceiveUserIds(Collections.singletonList(userId));
c.setReceiveTypes(Arrays.asList(
IotAlertReceiveTypeEnum.SMS.getType(),
IotAlertReceiveTypeEnum.MAIL.getType(),
IotAlertReceiveTypeEnum.NOTIFY.getType()));
});
IotDeviceDO device = randomPojo(IotDeviceDO.class);
// mock 行为
when(alertConfigService.getAlertConfigListBySceneRuleIdAndStatus(rule.getId(), CommonStatusEnum.ENABLE.getStatus()))
.thenReturn(Collections.singletonList(config));
when(deviceService.getDeviceFromCache(message.getDeviceId())).thenReturn(device);
// 调用mockStatic 需包住整个调用链buildTemplateParams 内有 DictFrameworkUtils 静态调用)
try (MockedStatic<DictFrameworkUtils> dictMock = mockStatic(DictFrameworkUtils.class)) {
dictMock.when(() -> DictFrameworkUtils.parseDictDataLabel(any(), any(Integer.class)))
.thenReturn("WARN");
action.execute(message, rule, actionConfig);
}
// 断言:设备只查一次
verify(deviceService, times(1)).getDeviceFromCache(message.getDeviceId());
// 断言:告警记录创建一次,参数透传
verify(alertRecordService, times(1))
.createAlertRecord(eq(config), eq(rule.getId()), eq(message), eq(device));
// 断言:三条通道各发一次,模板编号匹配
ArgumentCaptor<SmsSendSingleToUserReqDTO> smsCaptor = ArgumentCaptor.forClass(SmsSendSingleToUserReqDTO.class);
verify(smsSendApi, times(1)).sendSingleSmsToAdmin(smsCaptor.capture());
assertEquals(userId, smsCaptor.getValue().getUserId());
assertEquals(IotAlertReceiveTypeEnum.SMS.getTemplateCode(), smsCaptor.getValue().getTemplateCode());
ArgumentCaptor<MailSendSingleToUserReqDTO> mailCaptor = ArgumentCaptor.forClass(MailSendSingleToUserReqDTO.class);
verify(mailSendApi, times(1)).sendSingleMailToAdmin(mailCaptor.capture());
assertEquals(IotAlertReceiveTypeEnum.MAIL.getTemplateCode(), mailCaptor.getValue().getTemplateCode());
ArgumentCaptor<NotifySendSingleToUserReqDTO> notifyCaptor = ArgumentCaptor.forClass(NotifySendSingleToUserReqDTO.class);
verify(notifyMessageSendApi, times(1)).sendSingleMessageToAdmin(notifyCaptor.capture());
assertEquals(IotAlertReceiveTypeEnum.NOTIFY.getTemplateCode(), notifyCaptor.getValue().getTemplateCode());
}
@Test
public void testExecute_timerTrigger_skipDeviceLookup() throws Exception {
// 准备参数定时触发message 为 null
Long userId = randomLongId();
IotSceneRuleDO rule = randomPojo(IotSceneRuleDO.class);
IotSceneRuleDO.Action actionConfig = randomPojo(IotSceneRuleDO.Action.class);
IotAlertConfigDO config = randomPojo(IotAlertConfigDO.class, c -> {
c.setReceiveUserIds(Collections.singletonList(userId));
c.setReceiveTypes(Collections.singletonList(IotAlertReceiveTypeEnum.NOTIFY.getType()));
});
// mock 行为
when(alertConfigService.getAlertConfigListBySceneRuleIdAndStatus(rule.getId(), CommonStatusEnum.ENABLE.getStatus()))
.thenReturn(Collections.singletonList(config));
// 调用
try (MockedStatic<DictFrameworkUtils> dictMock = mockStatic(DictFrameworkUtils.class)) {
dictMock.when(() -> DictFrameworkUtils.parseDictDataLabel(any(), any(Integer.class)))
.thenReturn("INFO");
action.execute(null, rule, actionConfig);
}
// 断言跳过设备查询message 与 device 都用 null 创建告警记录
verify(deviceService, never()).getDeviceFromCache(anyLong());
verify(alertRecordService, times(1))
.createAlertRecord(eq(config), eq(rule.getId()), eq(null), eq(null));
verify(notifyMessageSendApi, times(1)).sendSingleMessageToAdmin(any(NotifySendSingleToUserReqDTO.class));
}
@Test
public void testExecute_emptyReceiveUsers_skipSend() throws Exception {
// 准备参数:接收用户为空
IotDeviceMessage message = createDeviceMessage();
IotSceneRuleDO rule = randomPojo(IotSceneRuleDO.class);
IotSceneRuleDO.Action actionConfig = randomPojo(IotSceneRuleDO.Action.class);
IotAlertConfigDO config = randomPojo(IotAlertConfigDO.class, c -> {
c.setReceiveUserIds(Collections.emptyList());
c.setReceiveTypes(Collections.singletonList(IotAlertReceiveTypeEnum.SMS.getType()));
});
IotDeviceDO device = randomPojo(IotDeviceDO.class);
// mock 行为
when(alertConfigService.getAlertConfigListBySceneRuleIdAndStatus(rule.getId(), CommonStatusEnum.ENABLE.getStatus()))
.thenReturn(Collections.singletonList(config));
when(deviceService.getDeviceFromCache(message.getDeviceId())).thenReturn(device);
// 调用
action.execute(message, rule, actionConfig);
// 断言:告警记录仍然创建,但不发送任何消息
verify(alertRecordService, times(1))
.createAlertRecord(eq(config), eq(rule.getId()), eq(message), eq(device));
verify(smsSendApi, never()).sendSingleSmsToAdmin(any());
}
@Test
public void testExecute_unknownReceiveType_skipSend() throws Exception {
// 准备参数:接收类型为未知值
Long userId = randomLongId();
IotDeviceMessage message = createDeviceMessage();
IotSceneRuleDO rule = randomPojo(IotSceneRuleDO.class);
IotSceneRuleDO.Action actionConfig = randomPojo(IotSceneRuleDO.Action.class);
IotAlertConfigDO config = randomPojo(IotAlertConfigDO.class, c -> {
c.setReceiveUserIds(Collections.singletonList(userId));
c.setReceiveTypes(Collections.singletonList(99));
});
IotDeviceDO device = randomPojo(IotDeviceDO.class);
// mock 行为
when(alertConfigService.getAlertConfigListBySceneRuleIdAndStatus(rule.getId(), CommonStatusEnum.ENABLE.getStatus()))
.thenReturn(Collections.singletonList(config));
when(deviceService.getDeviceFromCache(message.getDeviceId())).thenReturn(device);
// 调用
try (MockedStatic<DictFrameworkUtils> dictMock = mockStatic(DictFrameworkUtils.class)) {
dictMock.when(() -> DictFrameworkUtils.parseDictDataLabel(any(), any(Integer.class)))
.thenReturn("WARN");
action.execute(message, rule, actionConfig);
}
// 断言:未知类型不发送
verify(smsSendApi, never()).sendSingleSmsToAdmin(any());
verify(mailSendApi, never()).sendSingleMailToAdmin(any());
verify(notifyMessageSendApi, never()).sendSingleMessageToAdmin(any());
}
@Test
public void testExecute_smsFailure_doesNotBlockOthers() throws Exception {
// 准备参数
Long userId = randomLongId();
IotDeviceMessage message = createDeviceMessage();
IotSceneRuleDO rule = randomPojo(IotSceneRuleDO.class);
IotSceneRuleDO.Action actionConfig = randomPojo(IotSceneRuleDO.Action.class);
IotAlertConfigDO config = randomPojo(IotAlertConfigDO.class, c -> {
c.setReceiveUserIds(Collections.singletonList(userId));
c.setReceiveTypes(Arrays.asList(
IotAlertReceiveTypeEnum.SMS.getType(),
IotAlertReceiveTypeEnum.MAIL.getType()));
});
IotDeviceDO device = randomPojo(IotDeviceDO.class);
// mock 行为sms 抛异常
when(alertConfigService.getAlertConfigListBySceneRuleIdAndStatus(rule.getId(), CommonStatusEnum.ENABLE.getStatus()))
.thenReturn(Collections.singletonList(config));
when(deviceService.getDeviceFromCache(message.getDeviceId())).thenReturn(device);
when(smsSendApi.sendSingleSmsToAdmin(any())).thenThrow(new RuntimeException("sms 渠道异常"));
// 调用
try (MockedStatic<DictFrameworkUtils> dictMock = mockStatic(DictFrameworkUtils.class)) {
dictMock.when(() -> DictFrameworkUtils.parseDictDataLabel(any(), any(Integer.class)))
.thenReturn("ERROR");
action.execute(message, rule, actionConfig);
}
// 断言sms 抛错时邮件依旧发送
verify(smsSendApi, times(1)).sendSingleSmsToAdmin(any());
verify(mailSendApi, times(1)).sendSingleMailToAdmin(any());
}
/**
* reportTime
*/
private IotDeviceMessage createDeviceMessage() {
IotDeviceMessage message = new IotDeviceMessage();
message.setId(randomString());
message.setDeviceId(randomLongId());
message.setReportTime(LocalDateTime.now());
return message;
}
}

View File

@ -6,7 +6,6 @@ import cn.iocoder.yudao.module.iot.enums.rule.IotSceneRuleConditionOperatorEnum;
import cn.iocoder.yudao.module.iot.enums.rule.IotSceneRuleConditionTypeEnum;
import cn.iocoder.yudao.module.iot.service.rule.scene.matcher.IotBaseConditionMatcherTest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.time.LocalDateTime;
@ -21,7 +20,6 @@ import static org.junit.jupiter.api.Assertions.*;
*
* @author HUIHUI
*/
@Disabled // TODO @puhui999单测有报错先屏蔽
public class IotCurrentTimeConditionMatcherTest extends IotBaseConditionMatcherTest {
private IotCurrentTimeConditionMatcher matcher;
@ -61,7 +59,7 @@ public class IotCurrentTimeConditionMatcherTest extends IotBaseConditionMatcherT
// ========== 时间戳条件测试 ==========
@Test
public void testMatches_DateTimeGreaterThan_success() {
public void testMatches_dateTimeGreaterThan_success() {
// 准备参数
IotDeviceMessage message = createDeviceMessage();
long pastTimestamp = LocalDateTime.now().minusHours(1).toEpochSecond(ZoneOffset.of("+8"));
@ -78,7 +76,7 @@ public class IotCurrentTimeConditionMatcherTest extends IotBaseConditionMatcherT
}
@Test
public void testMatches_DateTimeGreaterThan_fail() {
public void testMatches_dateTimeGreaterThan_fail() {
// 准备参数
IotDeviceMessage message = createDeviceMessage();
long futureTimestamp = LocalDateTime.now().plusHours(1).toEpochSecond(ZoneOffset.of("+8"));
@ -95,7 +93,7 @@ public class IotCurrentTimeConditionMatcherTest extends IotBaseConditionMatcherT
}
@Test
public void testMatches_DateTimeLessThan_success() {
public void testMatches_dateTimeLessThan_success() {
// 准备参数
IotDeviceMessage message = createDeviceMessage();
long futureTimestamp = LocalDateTime.now().plusHours(1).toEpochSecond(ZoneOffset.of("+8"));
@ -112,7 +110,7 @@ public class IotCurrentTimeConditionMatcherTest extends IotBaseConditionMatcherT
}
@Test
public void testMatches_DateTimeBetween_success() {
public void testMatches_dateTimeBetween_success() {
// 准备参数
IotDeviceMessage message = createDeviceMessage();
long startTimestamp = LocalDateTime.now().minusHours(1).toEpochSecond(ZoneOffset.of("+8"));
@ -130,7 +128,7 @@ public class IotCurrentTimeConditionMatcherTest extends IotBaseConditionMatcherT
}
@Test
public void testMatches_DateTimeBetween_fail() {
public void testMatches_dateTimeBetween_fail() {
// 准备参数
IotDeviceMessage message = createDeviceMessage();
long startTimestamp = LocalDateTime.now().plusHours(1).toEpochSecond(ZoneOffset.of("+8"));
@ -150,7 +148,7 @@ public class IotCurrentTimeConditionMatcherTest extends IotBaseConditionMatcherT
// ========== 当日时间条件测试 ==========
@Test
public void testMatches_TimeGreaterThan_earlyMorning() {
public void testMatches_timeGreaterThan_earlyMorning() {
// 准备参数
IotDeviceMessage message = createDeviceMessage();
IotSceneRuleDO.TriggerCondition condition = createTimeCondition(
@ -167,7 +165,7 @@ public class IotCurrentTimeConditionMatcherTest extends IotBaseConditionMatcherT
}
@Test
public void testMatches_TimeLessThan_lateNight() {
public void testMatches_timeLessThan_lateNight() {
// 准备参数
IotDeviceMessage message = createDeviceMessage();
IotSceneRuleDO.TriggerCondition condition = createTimeCondition(
@ -184,7 +182,7 @@ public class IotCurrentTimeConditionMatcherTest extends IotBaseConditionMatcherT
}
@Test
public void testMatches_TimeBetween_allDay() {
public void testMatches_timeBetween_allDay() {
// 准备参数
IotDeviceMessage message = createDeviceMessage();
IotSceneRuleDO.TriggerCondition condition = createTimeCondition(
@ -200,7 +198,7 @@ public class IotCurrentTimeConditionMatcherTest extends IotBaseConditionMatcherT
}
@Test
public void testMatches_TimeBetween_workingHours() {
public void testMatches_timeBetween_workingHours() {
// 准备参数
IotDeviceMessage message = createDeviceMessage();
IotSceneRuleDO.TriggerCondition condition = createTimeCondition(

View File

@ -6,7 +6,6 @@ import cn.iocoder.yudao.module.iot.enums.rule.IotSceneRuleConditionOperatorEnum;
import cn.iocoder.yudao.module.iot.enums.rule.IotSceneRuleConditionTypeEnum;
import cn.iocoder.yudao.module.iot.service.rule.scene.matcher.IotBaseConditionMatcherTest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
@ -20,7 +19,6 @@ import static org.junit.jupiter.api.Assertions.*;
*
* @author HUIHUI
*/
@Disabled // TODO @puhui999单测有报错先屏蔽
public class IotDevicePropertyConditionMatcherTest extends IotBaseConditionMatcherTest {
private IotDevicePropertyConditionMatcher matcher;

View File

@ -7,7 +7,6 @@ import cn.iocoder.yudao.module.iot.enums.rule.IotSceneRuleConditionOperatorEnum;
import cn.iocoder.yudao.module.iot.enums.rule.IotSceneRuleConditionTypeEnum;
import cn.iocoder.yudao.module.iot.service.rule.scene.matcher.IotBaseConditionMatcherTest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import static cn.iocoder.yudao.framework.test.core.util.RandomUtils.randomLongId;
@ -19,7 +18,6 @@ import static org.junit.jupiter.api.Assertions.*;
*
* @author HUIHUI
*/
@Disabled // TODO @puhui999单测有报错先屏蔽
public class IotDeviceStateConditionMatcherTest extends IotBaseConditionMatcherTest {
private IotDeviceStateConditionMatcher matcher;

View File

@ -7,7 +7,6 @@ import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotSceneRuleDO;
import cn.iocoder.yudao.module.iot.enums.rule.IotSceneRuleTriggerTypeEnum;
import cn.iocoder.yudao.module.iot.service.rule.scene.matcher.IotBaseConditionMatcherTest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
@ -23,7 +22,6 @@ import static org.junit.jupiter.api.Assertions.*;
*
* @author HUIHUI
*/
@Disabled // TODO @puhui999单测有报错先屏蔽
public class IotDeviceEventPostTriggerMatcherTest extends IotBaseConditionMatcherTest {
private IotDeviceEventPostTriggerMatcher matcher;

View File

@ -8,7 +8,6 @@ import cn.iocoder.yudao.module.iot.enums.rule.IotSceneRuleConditionOperatorEnum;
import cn.iocoder.yudao.module.iot.enums.rule.IotSceneRuleTriggerTypeEnum;
import cn.iocoder.yudao.module.iot.service.rule.scene.matcher.IotBaseConditionMatcherTest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
@ -25,7 +24,6 @@ import static org.junit.jupiter.api.Assertions.*;
*
* @author HUIHUI
*/
@Disabled // TODO @puhui999单测有报错先屏蔽
public class IotDevicePropertyPostTriggerMatcherTest extends IotBaseConditionMatcherTest {
private IotDevicePropertyPostTriggerMatcher matcher;

View File

@ -7,7 +7,6 @@ import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotSceneRuleDO;
import cn.iocoder.yudao.module.iot.enums.rule.IotSceneRuleTriggerTypeEnum;
import cn.iocoder.yudao.module.iot.service.rule.scene.matcher.IotBaseConditionMatcherTest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
@ -23,7 +22,6 @@ import static org.junit.jupiter.api.Assertions.*;
*
* @author HUIHUI
*/
@Disabled // TODO @puhui999单测有报错先屏蔽
public class IotDeviceServiceInvokeTriggerMatcherTest extends IotBaseConditionMatcherTest {
private IotDeviceServiceInvokeTriggerMatcher matcher;

View File

@ -8,7 +8,6 @@ import cn.iocoder.yudao.module.iot.enums.rule.IotSceneRuleConditionOperatorEnum;
import cn.iocoder.yudao.module.iot.enums.rule.IotSceneRuleTriggerTypeEnum;
import cn.iocoder.yudao.module.iot.service.rule.scene.matcher.IotBaseConditionMatcherTest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import static cn.iocoder.yudao.framework.test.core.util.RandomUtils.randomLongId;
@ -19,7 +18,6 @@ import static org.junit.jupiter.api.Assertions.*;
*
* @author HUIHUI
*/
@Disabled // TODO @puhui999单测有报错先屏蔽
public class IotDeviceStateUpdateTriggerMatcherTest extends IotBaseConditionMatcherTest {
private IotDeviceStateUpdateTriggerMatcher matcher;

View File

@ -5,7 +5,6 @@ import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotSceneRuleDO;
import cn.iocoder.yudao.module.iot.enums.rule.IotSceneRuleTriggerTypeEnum;
import cn.iocoder.yudao.module.iot.service.rule.scene.matcher.IotBaseConditionMatcherTest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import static cn.iocoder.yudao.framework.test.core.util.RandomUtils.randomLongId;
@ -17,7 +16,6 @@ import static org.junit.jupiter.api.Assertions.*;
*
* @author HUIHUI
*/
@Disabled // TODO @puhui999单测有报错先屏蔽
public class IotTimerTriggerMatcherTest extends IotBaseConditionMatcherTest {
private IotTimerTriggerMatcher matcher;

View File

@ -8,3 +8,4 @@ DELETE FROM "iot_alert_record";
DELETE FROM "iot_ota_firmware";
DELETE FROM "iot_ota_task";
DELETE FROM "iot_ota_record";
DELETE FROM "iot_data_rule";

View File

@ -180,3 +180,19 @@ CREATE TABLE IF NOT EXISTS "iot_ota_record" (
"tenant_id" bigint NOT NULL DEFAULT '0',
PRIMARY KEY ("id")
) COMMENT 'IoT OTA 升级记录表';
CREATE TABLE IF NOT EXISTS "iot_data_rule" (
"id" bigint NOT NULL GENERATED BY DEFAULT AS IDENTITY,
"name" varchar(128) NOT NULL,
"description" varchar(256) DEFAULT '',
"status" int NOT NULL,
"source_configs" varchar(10000) NOT NULL,
"sink_ids" varchar(512) NOT NULL,
"creator" varchar(64) DEFAULT '',
"create_time" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updater" varchar(64) DEFAULT '',
"update_time" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
"deleted" bit NOT NULL DEFAULT FALSE,
"tenant_id" bigint NOT NULL DEFAULT '0',
PRIMARY KEY ("id")
) COMMENT 'IoT 数据流转规则';