diff --git a/application/src/main/java/org/thingsboard/server/controller/BaseController.java b/application/src/main/java/org/thingsboard/server/controller/BaseController.java index 587354c0c4..b269e66cad 100644 --- a/application/src/main/java/org/thingsboard/server/controller/BaseController.java +++ b/application/src/main/java/org/thingsboard/server/controller/BaseController.java @@ -915,55 +915,24 @@ public abstract class BaseController { return null; } - protected void sendRelationNotificationMsg(TenantId tenantId, EntityRelation relation, EdgeEventActionType action) { - try { - if (!relation.getFrom().getEntityType().equals(EntityType.EDGE) && - !relation.getTo().getEntityType().equals(EntityType.EDGE)) { - sendNotificationMsgToEdgeService(tenantId, null, null, json.writeValueAsString(relation), EdgeEventType.RELATION, action); - } - } catch (Exception e) { - log.warn("Failed to push relation to core: {}", relation, e); - } - } - protected void sendDeleteNotificationMsg(TenantId tenantId, EntityId entityId, List edgeIds) { - sendDeleteNotificationMsg(tenantId, entityId, edgeIds, null); - } - - protected void sendDeleteNotificationMsg(TenantId tenantId, EntityId entityId, List edgeIds, String body) { if (edgeIds != null && !edgeIds.isEmpty()) { for (EdgeId edgeId : edgeIds) { - sendNotificationMsgToEdgeService(tenantId, edgeId, entityId, body, null, EdgeEventActionType.DELETED); + sendNotificationMsgToEdge(tenantId, edgeId, entityId, null, null, EdgeEventActionType.DELETED); } } } - protected void sendAlarmDeleteNotificationMsg(TenantId tenantId, EntityId entityId, List edgeIds, Alarm alarm) { - try { - sendDeleteNotificationMsg(tenantId, entityId, edgeIds, json.writeValueAsString(alarm)); - } catch (Exception e) { - log.warn("Failed to push delete alarm msg to core: {}", alarm, e); - } - } - - protected void sendEntityAssignToCustomerNotificationMsg(TenantId tenantId, EntityId entityId, CustomerId customerId, EdgeEventActionType action) { - try { - sendNotificationMsgToEdgeService(tenantId, null, entityId, json.writeValueAsString(customerId), null, action); - } catch (Exception e) { - log.warn("Failed to push assign/unassign to/from customer to core: {}", customerId, e); - } - } - protected void sendEntityNotificationMsg(TenantId tenantId, EntityId entityId, EdgeEventActionType action) { - sendNotificationMsgToEdgeService(tenantId, null, entityId, null, null, action); + sendNotificationMsgToEdge(tenantId, null, entityId, null, null, action); } protected void sendEntityAssignToEdgeNotificationMsg(TenantId tenantId, EdgeId edgeId, EntityId entityId, EdgeEventActionType action) { - sendNotificationMsgToEdgeService(tenantId, edgeId, entityId, null, null, action); + sendNotificationMsgToEdge(tenantId, edgeId, entityId, null, null, action); } - private void sendNotificationMsgToEdgeService(TenantId tenantId, EdgeId edgeId, EntityId entityId, String body, EdgeEventType type, EdgeEventActionType action) { - tbClusterService.sendNotificationMsgToEdgeService(tenantId, edgeId, entityId, body, type, action); + private void sendNotificationMsgToEdge(TenantId tenantId, EdgeId edgeId, EntityId entityId, String body, EdgeEventType type, EdgeEventActionType action) { + tbClusterService.sendNotificationMsgToEdge(tenantId, edgeId, entityId, body, type, action); } protected List findRelatedEdgeIds(TenantId tenantId, EntityId entityId) { diff --git a/application/src/main/java/org/thingsboard/server/controller/EdgeController.java b/application/src/main/java/org/thingsboard/server/controller/EdgeController.java index 4dcffa605e..990343350d 100644 --- a/application/src/main/java/org/thingsboard/server/controller/EdgeController.java +++ b/application/src/main/java/org/thingsboard/server/controller/EdgeController.java @@ -32,6 +32,7 @@ import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.ResponseStatus; import org.springframework.web.bind.annotation.RestController; +import org.thingsboard.rule.engine.flow.TbRuleChainInputNode; import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.EntitySubtype; import org.thingsboard.server.common.data.edge.Edge; @@ -557,7 +558,7 @@ public class EdgeController extends BaseController { edgeId = checkNotNull(edgeId); SecurityUser user = getCurrentUser(); TenantId tenantId = user.getTenantId(); - return edgeService.findMissingToRelatedRuleChains(tenantId, edgeId); + return edgeService.findMissingToRelatedRuleChains(tenantId, edgeId, TbRuleChainInputNode.class.getName()); } catch (Exception e) { throw handleException(e); } diff --git a/application/src/main/java/org/thingsboard/server/service/action/EntityActionService.java b/application/src/main/java/org/thingsboard/server/service/action/EntityActionService.java index 2a64db7385..fb47e4e136 100644 --- a/application/src/main/java/org/thingsboard/server/service/action/EntityActionService.java +++ b/application/src/main/java/org/thingsboard/server/service/action/EntityActionService.java @@ -223,8 +223,8 @@ public class EntityActionService { auditLogService.logEntityAction(user.getTenantId(), customerId, user.getId(), user.getName(), entityId, entity, actionType, e, additionalInfo); } - public void sendEntityNotificationMsgToEdgeService(TenantId tenantId, EntityId entityId, EdgeEventActionType action) { - tbClusterService.sendNotificationMsgToEdgeService(tenantId, null, entityId, null, null, action); + public void sendEntityNotificationMsgToEdge(TenantId tenantId, EntityId entityId, EdgeEventActionType action) { + tbClusterService.sendNotificationMsgToEdge(tenantId, null, entityId, null, null, action); } private T extractParameter(Class clazz, int index, Object... additionalInfo) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/RuleChainMsgConstructor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/RuleChainMsgConstructor.java index 0fc62cfe41..db67ecf822 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/RuleChainMsgConstructor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/RuleChainMsgConstructor.java @@ -21,7 +21,9 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.rule.engine.flow.TbRuleChainInputNode; import org.thingsboard.rule.engine.flow.TbRuleChainInputNodeConfiguration; +import org.thingsboard.rule.engine.flow.TbRuleChainOutputNode; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.rule.NodeConnectionInfo; import org.thingsboard.server.common.data.rule.RuleChain; @@ -49,9 +51,8 @@ import java.util.stream.Collectors; @TbCoreComponent public class RuleChainMsgConstructor { - private static final ObjectMapper objectMapper = new ObjectMapper(); - private static final String RULE_CHAIN_INPUT_NODE = "org.thingsboard.rule.engine.flow.TbRuleChainInputNode"; - private static final String TB_RULE_CHAIN_OUTPUT_NODE = "org.thingsboard.rule.engine.flow.TbRuleChainOutputNode"; + private static final String RULE_CHAIN_INPUT_NODE = TbRuleChainInputNode.class.getName(); + private static final String TB_RULE_CHAIN_OUTPUT_NODE = TbRuleChainOutputNode.class.getName(); public RuleChainUpdateMsg constructRuleChainUpdatedMsg(RuleChainId edgeRootRuleChainId, UpdateMsgType msgType, RuleChain ruleChain) { RuleChainUpdateMsg.Builder builder = RuleChainUpdateMsg.newBuilder() @@ -210,13 +211,11 @@ public class RuleChainMsgConstructor { private List filterNodes_V_3_3_0(List nodes) { List result = new ArrayList<>(); for (RuleNode node : nodes) { - switch (node.getType()) { - case RULE_CHAIN_INPUT_NODE: - case TB_RULE_CHAIN_OUTPUT_NODE: - log.trace("Skipping not supported rule node {}", node); - break; - default: - result.add(node); + if (RULE_CHAIN_INPUT_NODE.equals(node.getType()) + || TB_RULE_CHAIN_OUTPUT_NODE.equals(node.getType())) { + log.trace("Skipping not supported rule node {}", node); + } else { + result.add(node); } } return result; @@ -280,7 +279,7 @@ public class RuleChainMsgConstructor { .setTargetRuleChainIdMSB(ruleChainConnectionInfo.getTargetRuleChainId().getId().getMostSignificantBits()) .setTargetRuleChainIdLSB(ruleChainConnectionInfo.getTargetRuleChainId().getId().getLeastSignificantBits()) .setType(ruleChainConnectionInfo.getType()) - .setAdditionalInfo(objectMapper.writeValueAsString(additionalInfo)) + .setAdditionalInfo(JacksonUtil.OBJECT_MAPPER.writeValueAsString(additionalInfo)) .build(); } @@ -291,8 +290,8 @@ public class RuleChainMsgConstructor { .setType(node.getType()) .setName(node.getName()) .setDebugMode(node.isDebugMode()) - .setConfiguration(objectMapper.writeValueAsString(node.getConfiguration())) - .setAdditionalInfo(objectMapper.writeValueAsString(node.getAdditionalInfo())) + .setConfiguration(JacksonUtil.OBJECT_MAPPER.writeValueAsString(node.getConfiguration())) + .setAdditionalInfo(JacksonUtil.OBJECT_MAPPER.writeValueAsString(node.getAdditionalInfo())) .build(); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/TenantWidgetsBundlesEdgeEventFetcher.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/TenantWidgetsBundlesEdgeEventFetcher.java index 54a5e202ab..9983e4b0b2 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/TenantWidgetsBundlesEdgeEventFetcher.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/TenantWidgetsBundlesEdgeEventFetcher.java @@ -32,6 +32,6 @@ public class TenantWidgetsBundlesEdgeEventFetcher extends BaseWidgetsBundlesEdge } @Override protected PageData findWidgetsBundles(TenantId tenantId, PageLink pageLink) { - return widgetsBundleService.findAllTenantWidgetsBundlesByTenantIdAndPageLink(tenantId, pageLink); + return widgetsBundleService.findTenantWidgetsBundlesByTenantId(tenantId, pageLink); } } diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/DefaultTbNotificationEntityService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/DefaultTbNotificationEntityService.java index b2962acf97..7ee7696f1a 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/DefaultTbNotificationEntityService.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/DefaultTbNotificationEntityService.java @@ -243,7 +243,7 @@ public class DefaultTbNotificationEntityService implements TbNotificationEntityS try { if (!relation.getFrom().getEntityType().equals(EntityType.EDGE) && !relation.getTo().getEntityType().equals(EntityType.EDGE)) { - sendNotificationMsgToEdgeService(tenantId, null, null, json.writeValueAsString(relation), + sendNotificationMsgToEdge(tenantId, null, null, json.writeValueAsString(relation), EdgeEventType.RELATION, edgeTypeByActionType(actionType)); } } catch (Exception e1) { @@ -267,18 +267,18 @@ public class DefaultTbNotificationEntityService implements TbNotificationEntityS } private void sendEntityNotificationMsg(TenantId tenantId, EntityId entityId, EdgeEventActionType action) { - sendNotificationMsgToEdgeService(tenantId, null, entityId, null, null, action); + sendNotificationMsgToEdge(tenantId, null, entityId, null, null, action); } private void sendEntityAssignToCustomerNotificationMsg(TenantId tenantId, EntityId entityId, CustomerId customerId, EdgeEventActionType action) { try { - sendNotificationMsgToEdgeService(tenantId, null, entityId, json.writeValueAsString(customerId), null, action); + sendNotificationMsgToEdge(tenantId, null, entityId, json.writeValueAsString(customerId), null, action); } catch (Exception e) { log.warn("Failed to push assign/unassign to/from customer to core: {}", customerId, e); } } - protected void sendAlarmDeleteNotificationMsg(TenantId tenantId, Alarm alarm, List edgeIds, String body) { + private void sendAlarmDeleteNotificationMsg(TenantId tenantId, Alarm alarm, List edgeIds, String body) { try { sendDeleteNotificationMsg(tenantId, alarm.getId(), edgeIds, body); } catch (Exception e) { @@ -286,8 +286,7 @@ public class DefaultTbNotificationEntityService implements TbNotificationEntityS } } - protected void sendDeleteNotificationMsg(TenantId tenantId, I entityId, E entity, - List edgeIds) { + private void sendDeleteNotificationMsg(TenantId tenantId, I entityId, E entity, List edgeIds) { try { sendDeleteNotificationMsg(tenantId, entityId, edgeIds, null); } catch (Exception e) { @@ -298,17 +297,17 @@ public class DefaultTbNotificationEntityService implements TbNotificationEntityS private void sendDeleteNotificationMsg(TenantId tenantId, EntityId entityId, List edgeIds, String body) { if (edgeIds != null && !edgeIds.isEmpty()) { for (EdgeId edgeId : edgeIds) { - sendNotificationMsgToEdgeService(tenantId, edgeId, entityId, body, null, EdgeEventActionType.DELETED); + sendNotificationMsgToEdge(tenantId, edgeId, entityId, body, null, EdgeEventActionType.DELETED); } } } private void sendEntityAssignToEdgeNotificationMsg(TenantId tenantId, EdgeId edgeId, EntityId entityId, EdgeEventActionType action) { - sendNotificationMsgToEdgeService(tenantId, edgeId, entityId, null, null, action); + sendNotificationMsgToEdge(tenantId, edgeId, entityId, null, null, action); } - private void sendNotificationMsgToEdgeService(TenantId tenantId, EdgeId edgeId, EntityId entityId, String body, EdgeEventType type, EdgeEventActionType action) { - tbClusterService.sendNotificationMsgToEdgeService(tenantId, edgeId, entityId, body, type, action); + private void sendNotificationMsgToEdge(TenantId tenantId, EdgeId edgeId, EntityId entityId, String body, EdgeEventType type, EdgeEventActionType action) { + tbClusterService.sendNotificationMsgToEdge(tenantId, edgeId, entityId, body, type, action); } private void pushAssignedFromNotification(Tenant currentTenant, TenantId newTenantId, Device assignedDevice) { diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 6b475f5a5a..01adab5f6f 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -441,12 +441,12 @@ public class DefaultTbClusterService implements TbClusterService { sendDeviceStateServiceEvent(device.getTenantId(), device.getId(), created, !created, false); otaPackageStateService.update(device, old); if (!created && notifyEdge) { - sendNotificationMsgToEdgeService(device.getTenantId(), null, device.getId(), null, null, EdgeEventActionType.UPDATED); + sendNotificationMsgToEdge(device.getTenantId(), null, device.getId(), null, null, EdgeEventActionType.UPDATED); } } @Override - public void sendNotificationMsgToEdgeService(TenantId tenantId, EdgeId edgeId, EntityId entityId, String body, EdgeEventType type, EdgeEventActionType action) { + public void sendNotificationMsgToEdge(TenantId tenantId, EdgeId edgeId, EntityId entityId, String body, EdgeEventType type, EdgeEventActionType action) { if (!edgesEnabled) { return; } diff --git a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/AssetImportService.java b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/AssetImportService.java index cd9dc17d60..f2e61b8f2d 100644 --- a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/AssetImportService.java +++ b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/AssetImportService.java @@ -56,7 +56,7 @@ public class AssetImportService extends BaseEntityImportService findRelatedEdgeIdsByEntityId(TenantId tenantId, EntityId entityId, PageLink pageLink); - String findMissingToRelatedRuleChains(TenantId tenantId, EdgeId edgeId); + String findMissingToRelatedRuleChains(TenantId tenantId, EdgeId edgeId, String tbRuleChainInputNodeClassName); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java index 8dc764148e..9eee9e4774 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.dao.edge; -import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Function; @@ -27,6 +26,7 @@ import org.hibernate.exception.ConstraintViolationException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.event.TransactionalEventListener; +import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.EntitySubtype; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.StringUtils; @@ -46,7 +46,7 @@ import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.EntitySearchDirection; import org.thingsboard.server.common.data.rule.RuleChain; -import org.thingsboard.server.common.data.rule.RuleChainConnectionInfo; +import org.thingsboard.server.common.data.rule.RuleNode; import org.thingsboard.server.dao.entity.AbstractCachedEntityService; import org.thingsboard.server.dao.exception.DataValidationException; import org.thingsboard.server.dao.relation.RelationService; @@ -62,6 +62,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Optional; +import java.util.UUID; import java.util.stream.Collectors; import static org.thingsboard.server.dao.DaoUtil.toUUIDs; @@ -78,8 +79,6 @@ public class EdgeServiceImpl extends AbstractCachedEntityService edgeRuleChains = findEdgeRuleChains(tenantId, edgeId); List edgeRuleChainIds = edgeRuleChains.stream().map(IdBased::getId).collect(Collectors.toList()); - ObjectNode result = mapper.createObjectNode(); + ObjectNode result = JacksonUtil.OBJECT_MAPPER.createObjectNode(); for (RuleChain edgeRuleChain : edgeRuleChains) { - List connectionInfos = - ruleChainService.loadRuleChainMetaData(edgeRuleChain.getTenantId(), edgeRuleChain.getId()).getRuleChainConnections(); - if (connectionInfos != null && !connectionInfos.isEmpty()) { + List ruleNodes = + ruleChainService.loadRuleChainMetaData(edgeRuleChain.getTenantId(), edgeRuleChain.getId()).getNodes(); + if (ruleNodes != null && !ruleNodes.isEmpty()) { List connectedRuleChains = - connectionInfos.stream().map(RuleChainConnectionInfo::getTargetRuleChainId).collect(Collectors.toList()); + ruleNodes.stream() + .filter(rn -> rn.getType().equals(tbRuleChainInputNodeClassName)) + .map(rn -> new RuleChainId(UUID.fromString(rn.getConfiguration().get("ruleChainId").asText()))) + .collect(Collectors.toList()); List missingRuleChains = new ArrayList<>(); for (RuleChainId connectedRuleChain : connectedRuleChains) { if (!edgeRuleChainIds.contains(connectedRuleChain)) { @@ -469,7 +471,7 @@ public class EdgeServiceImpl extends AbstractCachedEntityService