diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 97e276f673..c4ae3b851d 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -186,14 +186,25 @@ class DefaultTbContext implements TbContext { } public TbMsg alarmCreatedMsg(Alarm alarm, RuleNodeId ruleNodeId) { - try { - ObjectNode entityNode = mapper.valueToTree(alarm); - return new TbMsg(UUIDs.timeBased(), DataConstants.ENTITY_CREATED, alarm.getId(), getActionMetaData(ruleNodeId), mapper.writeValueAsString(entityNode), null, null, 0L); - } catch (JsonProcessingException | IllegalArgumentException e) { - throw new RuntimeException("Failed to process alarm created msg: " + e); - } + return alarmMsg(alarm, ruleNodeId, DataConstants.ENTITY_CREATED); } + public TbMsg alarmUpdatedMsg(Alarm alarm, RuleNodeId ruleNodeId) { + return alarmMsg(alarm, ruleNodeId, DataConstants.ENTITY_UPDATED); + } + + public TbMsg alarmClearedMsg(Alarm alarm, RuleNodeId ruleNodeId) { + return alarmMsg(alarm, ruleNodeId, DataConstants.ALARM_CLEAR); + } + + private TbMsg alarmMsg(Alarm alarm, RuleNodeId ruleNodeId, String type) { + try { + ObjectNode entityNode = mapper.valueToTree(alarm); + return new TbMsg(UUIDs.timeBased(), type, alarm.getId(), getActionMetaData(ruleNodeId), mapper.writeValueAsString(entityNode), null, null, 0L); + } catch (JsonProcessingException | IllegalArgumentException e) { + throw new RuntimeException("Failed to process alarm created, updated or cleared msg: " + e); + } + } @Override public RuleNodeId getSelfId() { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java b/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java index b06e4dcf9c..d21f8e8f66 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java @@ -20,6 +20,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component; import org.thingsboard.server.actors.service.ActorService; +import org.thingsboard.server.dao.alarm.AlarmService; import org.thingsboard.server.dao.asset.AssetService; import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.customer.CustomerService; @@ -60,6 +61,10 @@ public class EdgeContextComponent { @Autowired private RelationService relationService; + @Lazy + @Autowired + private AlarmService alarmService; + @Lazy @Autowired private ActorService actorService; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index b9dbe393e3..c04dcf9e16 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -32,6 +32,8 @@ import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityView; import org.thingsboard.server.common.data.Event; import org.thingsboard.server.common.data.alarm.Alarm; +import org.thingsboard.server.common.data.alarm.AlarmSeverity; +import org.thingsboard.server.common.data.alarm.AlarmStatus; import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeQueueEntry; @@ -180,7 +182,7 @@ public final class EdgeGrpcSession implements Cloneable { case ENTITY_UPDATED_RPC_MESSAGE: case ENTITY_CREATED_RPC_MESSAGE: case ALARM_ACK_RPC_MESSAGE: - case ALARM_CLEARK_RPC_MESSAGE: + case ALARM_CLEAR_RPC_MESSAGE: processEntityCRUDMessage(entry, msgType); break; case RULE_CHAIN_CUSTOM_MESSAGE: @@ -379,7 +381,7 @@ public final class EdgeGrpcSession implements Cloneable { AlarmUpdateMsg.Builder builder = AlarmUpdateMsg.newBuilder() .setMsgType(msgType) .setName(alarm.getName()) - .setType(alarm.getName()) + .setType(alarm.getType()) .setOriginatorName(entityName) .setOriginatorType(alarm.getOriginator().getEntityType().name()) .setSeverity(alarm.getSeverity().name()) @@ -409,6 +411,10 @@ public final class EdgeGrpcSession implements Cloneable { case DataConstants.ENTITY_DELETED: case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE: return UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE; + case DataConstants.ALARM_ACK: + return UpdateMsgType.ALARM_ACK_RPC_MESSAGE; + case DataConstants.ALARM_CLEAR: + return UpdateMsgType.ALARM_CLEAR_RPC_MESSAGE; default: throw new RuntimeException("Unsupported msgType [" + msgType + "]"); } @@ -532,7 +538,7 @@ public final class EdgeGrpcSession implements Cloneable { DeviceUpdateMsg.Builder builder = DeviceUpdateMsg.newBuilder() .setMsgType(msgType) .setName(device.getName()) - .setType(device.getName()); + .setType(device.getType()); return builder.build(); } @@ -540,7 +546,7 @@ public final class EdgeGrpcSession implements Cloneable { AssetUpdateMsg.Builder builder = AssetUpdateMsg.newBuilder() .setMsgType(msgType) .setName(asset.getName()) - .setType(asset.getName()); + .setType(asset.getType()); return builder.build(); } @@ -562,7 +568,7 @@ public final class EdgeGrpcSession implements Cloneable { EntityViewUpdateMsg.Builder builder = EntityViewUpdateMsg.newBuilder() .setMsgType(msgType) .setName(entityView.getName()) - .setType(entityView.getName()) + .setType(entityView.getType()) .setRelatedName(relatedName) .setRelatedType(relatedType) .setRelatedEntityType(relatedEntityType); @@ -618,6 +624,11 @@ public final class EdgeGrpcSession implements Cloneable { } } } + if (uplinkMsg.getAlarmUpdatemsgList() != null && !uplinkMsg.getAlarmUpdatemsgList().isEmpty()) { + for (AlarmUpdateMsg alarmUpdateMsg : uplinkMsg.getAlarmUpdatemsgList()) { + onAlarmUpdate(alarmUpdateMsg); + } + } } catch (Exception e) { return UplinkResponseMsg.newBuilder().setSuccess(false).setErrorMsg(e.getMessage()).build(); } @@ -625,6 +636,65 @@ public final class EdgeGrpcSession implements Cloneable { return UplinkResponseMsg.newBuilder().setSuccess(true).build(); } + private EntityId getAlarmOriginator(String entityName, org.thingsboard.server.common.data.EntityType entityType) { + switch (entityType) { + case DEVICE: + return ctx.getDeviceService().findDeviceByTenantIdAndName(edge.getTenantId(), entityName).getId(); + case ASSET: + return ctx.getAssetService().findAssetByTenantIdAndName(edge.getTenantId(), entityName).getId(); + case ENTITY_VIEW: + return ctx.getEntityViewService().findEntityViewByTenantIdAndName(edge.getTenantId(), entityName).getId(); + default: + return null; + } + } + + private void onAlarmUpdate(AlarmUpdateMsg alarmUpdateMsg) { + EntityId originatorId = getAlarmOriginator(alarmUpdateMsg.getOriginatorName(), org.thingsboard.server.common.data.EntityType.valueOf(alarmUpdateMsg.getOriginatorType())); + if (originatorId != null) { + try { + Alarm existentAlarm = ctx.getAlarmService().findLatestByOriginatorAndType(edge.getTenantId(), originatorId, alarmUpdateMsg.getType()).get(); + switch (alarmUpdateMsg.getMsgType()) { + case ENTITY_CREATED_RPC_MESSAGE: + case ENTITY_UPDATED_RPC_MESSAGE: + if (existentAlarm == null) { + existentAlarm = new Alarm(); + existentAlarm.setTenantId(edge.getTenantId()); + existentAlarm.setType(alarmUpdateMsg.getName()); + existentAlarm.setOriginator(originatorId); + existentAlarm.setSeverity(AlarmSeverity.valueOf(alarmUpdateMsg.getSeverity())); + existentAlarm.setStatus(AlarmStatus.valueOf(alarmUpdateMsg.getStatus())); + existentAlarm.setStartTs(alarmUpdateMsg.getStartTs()); + existentAlarm.setAckTs(alarmUpdateMsg.getAckTs()); + existentAlarm.setClearTs(alarmUpdateMsg.getClearTs()); + existentAlarm.setPropagate(alarmUpdateMsg.getPropagate()); + } + existentAlarm.setEndTs(alarmUpdateMsg.getEndTs()); + existentAlarm.setDetails(objectMapper.readTree(alarmUpdateMsg.getDetails())); + ctx.getAlarmService().createOrUpdateAlarm(existentAlarm); + break; + case ALARM_ACK_RPC_MESSAGE: + if (existentAlarm != null) { + ctx.getAlarmService().ackAlarm(edge.getTenantId(), existentAlarm.getId(), alarmUpdateMsg.getAckTs()); + } + break; + case ALARM_CLEAR_RPC_MESSAGE: + if (existentAlarm != null) { + ctx.getAlarmService().clearAlarm(edge.getTenantId(), existentAlarm.getId(), objectMapper.readTree(alarmUpdateMsg.getDetails()), alarmUpdateMsg.getAckTs()); + } + break; + case ENTITY_DELETED_RPC_MESSAGE: + if (existentAlarm != null) { + ctx.getAlarmService().deleteAlarm(edge.getTenantId(), existentAlarm.getId()); + } + break; + } + } catch (Exception e) { + log.error("Error during finding existent alarm", e); + } + } + } + private Device getOrCreateDevice(String deviceName, String deviceType) { Device device = ctx.getDeviceService().findDeviceByTenantIdAndName(edge.getTenantId(), deviceName); if (device == null) { @@ -647,6 +717,7 @@ public final class EdgeGrpcSession implements Cloneable { device.setTenantId(edge.getTenantId()); device.setCustomerId(edge.getCustomerId()); device = ctx.getDeviceService().saveDevice(device); + device = ctx.getDeviceService().assignDeviceToEdge(edge.getTenantId(), device.getId(), edge.getId()); createRelationFromEdge(device.getId()); ctx.getActorService().onDeviceAdded(device); pushDeviceCreatedEventToRuleEngine(device); diff --git a/application/src/main/resources/logback.xml b/application/src/main/resources/logback.xml index fe84cd5fd1..7f4dfba92b 100644 --- a/application/src/main/resources/logback.xml +++ b/application/src/main/resources/logback.xml @@ -26,6 +26,7 @@ + diff --git a/common/edge-api/src/main/proto/edge.proto b/common/edge-api/src/main/proto/edge.proto index 74a3321cd9..ad13539ae1 100644 --- a/common/edge-api/src/main/proto/edge.proto +++ b/common/edge-api/src/main/proto/edge.proto @@ -89,7 +89,7 @@ enum UpdateMsgType { ENTITY_UPDATED_RPC_MESSAGE = 1; ENTITY_DELETED_RPC_MESSAGE = 2; ALARM_ACK_RPC_MESSAGE = 3; - ALARM_CLEARK_RPC_MESSAGE = 4; + ALARM_CLEAR_RPC_MESSAGE = 4; RULE_CHAIN_CUSTOM_MESSAGE = 5; } diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index 241a1e1822..171cf564d6 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -79,6 +79,10 @@ public interface TbContext { TbMsg alarmCreatedMsg(Alarm alarm, RuleNodeId ruleNodeId); + TbMsg alarmUpdatedMsg(Alarm alarm, RuleNodeId ruleNodeId); + + TbMsg alarmClearedMsg(Alarm alarm, RuleNodeId ruleNodeId); + RuleNodeId getSelfId(); TenantId getTenantId(); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAbstractAlarmNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAbstractAlarmNode.java index 5fc482348d..6f14454683 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAbstractAlarmNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAbstractAlarmNode.java @@ -63,8 +63,10 @@ public abstract class TbAbstractAlarmNode ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());