From ef97f1272450b5b4976cc17b43a967943a6c9d42 Mon Sep 17 00:00:00 2001 From: Yuriy Lytvynchuk Date: Wed, 28 Sep 2022 09:55:58 +0300 Subject: [PATCH 01/11] add checkTenantEntity --- .../actors/ruleChain/DefaultTbContext.java | 16 +++++++ .../rule/engine/api/TbContext.java | 2 + .../rule/engine/debug/TbMsgGeneratorNode.java | 1 + .../engine/filter/TbCheckRelationNode.java | 1 + .../util/EntitiesTenantIdAsyncLoader.java | 46 +++++++++++++++++-- 5 files changed, 62 insertions(+), 4 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 08b711015f..d5ff1ff334 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -31,6 +31,7 @@ import org.thingsboard.rule.engine.api.SmsService; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbRelationTypes; import org.thingsboard.rule.engine.api.sms.SmsSenderFactory; +import org.thingsboard.rule.engine.util.EntitiesTenantIdAsyncLoader; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.TbActorRef; import org.thingsboard.server.cluster.TbClusterService; @@ -89,6 +90,7 @@ import org.thingsboard.server.service.script.RuleNodeJsScriptEngine; import java.util.Collections; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -704,6 +706,20 @@ class DefaultTbContext implements TbContext { return metaData; } + public void checkTenantEntity(EntityId entityId) { + try { + TenantId entityTenantId = EntitiesTenantIdAsyncLoader.findTenantIdByEntityId(this, entityId).get(); + if (entityTenantId == null) { + throw new RuntimeException("Entity with id '" + entityId + "'not found!"); + } + if (!entityTenantId.equals(this.getTenantId())) { + throw new RuntimeException("Entity with id: '" + entityId + "' specified in the configuration doesn't belong to the current tenant."); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + private class SimpleTbQueueCallback implements TbQueueCallback { private final Runnable onSuccess; private final Consumer onFailure; diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index f74efe7748..1c2cf92256 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -193,6 +193,8 @@ public interface TbContext { boolean isLocalEntity(EntityId entityId); + void checkTenantEntity(EntityId entityId); + RuleNodeId getSelfId(); RuleNode getSelf(); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java index 32ec34ef39..4e14a6b15b 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java @@ -80,6 +80,7 @@ public class TbMsgGeneratorNode implements TbNode { } else { originatorId = ctx.getSelfId(); } + ctx.checkTenantEntity(originatorId); updateGeneratorState(ctx); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbCheckRelationNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbCheckRelationNode.java index 4832a0af35..5b38655d9a 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbCheckRelationNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbCheckRelationNode.java @@ -59,6 +59,7 @@ public class TbCheckRelationNode implements TbNode { @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { this.config = TbNodeUtils.convert(configuration, TbCheckRelationNodeConfiguration.class); + ctx.checkTenantEntity(EntityIdFactory.getByTypeAndId(config.getEntityType(), config.getEntityId())); } @Override diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java index 4d57e89fda..2699553ff1 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java @@ -23,9 +23,15 @@ import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.server.common.data.HasTenantId; import org.thingsboard.server.common.data.id.AlarmId; import org.thingsboard.server.common.data.id.AssetId; +import org.thingsboard.server.common.data.id.AssetProfileId; import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.DashboardId; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.DeviceProfileId; +import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.EntityViewId; +import org.thingsboard.server.common.data.id.OtaPackageId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.UserId; @@ -57,10 +63,42 @@ public class EntitiesTenantIdAsyncLoader { } } + public static ListenableFuture findTenantIdByEntityId(TbContext ctx, EntityId entityId) { + + switch (entityId.getEntityType()) { + case TENANT: + return Futures.immediateFuture((TenantId) entityId); + case CUSTOMER: + return getTenantAsync(ctx.getCustomerService().findCustomerByIdAsync(ctx.getTenantId(), (CustomerId) entityId)); + case USER: + return getTenantAsync(ctx.getUserService().findUserByIdAsync(ctx.getTenantId(), (UserId) entityId)); + case ASSET: + return getTenantAsync(ctx.getAssetService().findAssetByIdAsync(ctx.getTenantId(), (AssetId) entityId)); + case DEVICE: + return getTenantAsync(ctx.getDeviceService().findDeviceByIdAsync(ctx.getTenantId(), (DeviceId) entityId)); + case ALARM: + return getTenantAsync(ctx.getAlarmService().findAlarmByIdAsync(ctx.getTenantId(), (AlarmId) entityId)); + case RULE_CHAIN: + return getTenantAsync(ctx.getRuleChainService().findRuleChainByIdAsync(ctx.getTenantId(), (RuleChainId) entityId)); + case ENTITY_VIEW: + return getTenantAsync(ctx.getEntityViewService().findEntityViewByIdAsync(ctx.getTenantId(), (EntityViewId) entityId)); + case DASHBOARD: + return getTenantAsync(ctx.getDashboardService().findDashboardByIdAsync(ctx.getTenantId(), (DashboardId) entityId)); + case EDGE: + return getTenantAsync(ctx.getEdgeService().findEdgeByIdAsync(ctx.getTenantId(), (EdgeId) entityId)); + case OTA_PACKAGE: + return getTenantAsync(ctx.getOtaPackageService().findOtaPackageInfoByIdAsync(ctx.getTenantId(), (OtaPackageId) entityId)); + case ASSET_PROFILE: + return getTenantAsync(Futures.immediateFuture(ctx.getAssetProfileCache().get(ctx.getTenantId(), (AssetProfileId) entityId))); + case DEVICE_PROFILE: + return getTenantAsync(Futures.immediateFuture(ctx.getDeviceProfileCache().get(ctx.getTenantId(), (DeviceProfileId) entityId))); + default: + return Futures.immediateFailedFuture(new TbNodeException("Unexpected entityId EntityType " + entityId.getEntityType())); + } + } + private static ListenableFuture getTenantAsync(ListenableFuture future) { - return Futures.transformAsync(future, in -> { - return in != null ? Futures.immediateFuture(in.getTenantId()) - : Futures.immediateFuture(null); - }, MoreExecutors.directExecutor()); + return Futures.transformAsync(future, in -> in != null ? Futures.immediateFuture(in.getTenantId()) + : Futures.immediateFuture(null), MoreExecutors.directExecutor()); } } From 03842f92d9f61e5dd3d03a3cab8714c461b4ea14 Mon Sep 17 00:00:00 2001 From: Yuriy Lytvynchuk Date: Thu, 6 Oct 2022 12:24:53 +0300 Subject: [PATCH 02/11] refactor code --- .../actors/ruleChain/DefaultTbContext.java | 3 +- .../rule/engine/debug/TbMsgGeneratorNode.java | 2 +- .../engine/filter/TbCheckRelationNode.java | 10 +++- .../util/EntitiesTenantIdAsyncLoader.java | 60 ++++++------------- 4 files changed, 27 insertions(+), 48 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index d5ff1ff334..0e557ea76c 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -90,7 +90,6 @@ import org.thingsboard.server.service.script.RuleNodeJsScriptEngine; import java.util.Collections; import java.util.Set; -import java.util.concurrent.ExecutionException; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -708,7 +707,7 @@ class DefaultTbContext implements TbContext { public void checkTenantEntity(EntityId entityId) { try { - TenantId entityTenantId = EntitiesTenantIdAsyncLoader.findTenantIdByEntityId(this, entityId).get(); + TenantId entityTenantId = EntitiesTenantIdAsyncLoader.findEntityIdAsync(this, entityId).get(); if (entityTenantId == null) { throw new RuntimeException("Entity with id '" + entityId + "'not found!"); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java index 4e14a6b15b..a7752aa116 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java @@ -77,10 +77,10 @@ public class TbMsgGeneratorNode implements TbNode { this.currentMsgCount = 0; if (!StringUtils.isEmpty(config.getOriginatorId())) { originatorId = EntityIdFactory.getByTypeAndUuid(config.getOriginatorType(), config.getOriginatorId()); + ctx.checkTenantEntity(originatorId); } else { originatorId = ctx.getSelfId(); } - ctx.checkTenantEntity(originatorId); updateGeneratorState(ctx); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbCheckRelationNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbCheckRelationNode.java index 5b38655d9a..2dacc1a458 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbCheckRelationNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbCheckRelationNode.java @@ -55,11 +55,15 @@ import static org.thingsboard.common.util.DonAsynchron.withCallback; public class TbCheckRelationNode implements TbNode { private TbCheckRelationNodeConfiguration config; + private EntityId singleEntityId; @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { this.config = TbNodeUtils.convert(configuration, TbCheckRelationNodeConfiguration.class); - ctx.checkTenantEntity(EntityIdFactory.getByTypeAndId(config.getEntityType(), config.getEntityId())); + if (config.isCheckForSingleEntity()) { + this.singleEntityId = EntityIdFactory.getByTypeAndId(config.getEntityType(), config.getEntityId()); + ctx.checkTenantEntity(singleEntityId); + } } @Override @@ -77,10 +81,10 @@ public class TbCheckRelationNode implements TbNode { EntityId from; EntityId to; if (EntitySearchDirection.FROM.name().equals(config.getDirection())) { - from = EntityIdFactory.getByTypeAndId(config.getEntityType(), config.getEntityId()); + from = singleEntityId; to = msg.getOriginator(); } else { - to = EntityIdFactory.getByTypeAndId(config.getEntityType(), config.getEntityId()); + to = singleEntityId; from = msg.getOriginator(); } return ctx.getRelationService().checkRelationAsync(ctx.getTenantId(), from, to, config.getRelationType(), RelationTypeGroup.COMMON); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java index 2699553ff1..a8547bdde4 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java @@ -17,7 +17,6 @@ package org.thingsboard.rule.engine.util; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.server.common.data.HasTenantId; @@ -37,68 +36,45 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.UserId; public class EntitiesTenantIdAsyncLoader { - /** - * @deprecated consider to remove since tenantId is already defined in the TbContext. - */ - @Deprecated - public static ListenableFuture findEntityIdAsync(TbContext ctx, EntityId original) { - switch (original.getEntityType()) { - case TENANT: - return Futures.immediateFuture((TenantId) original); - case CUSTOMER: - return getTenantAsync(ctx.getCustomerService().findCustomerByIdAsync(ctx.getTenantId(), (CustomerId) original)); - case USER: - return getTenantAsync(ctx.getUserService().findUserByIdAsync(ctx.getTenantId(), (UserId) original)); - case ASSET: - return getTenantAsync(ctx.getAssetService().findAssetByIdAsync(ctx.getTenantId(), (AssetId) original)); - case DEVICE: - return getTenantAsync(ctx.getDeviceService().findDeviceByIdAsync(ctx.getTenantId(), (DeviceId) original)); - case ALARM: - return getTenantAsync(ctx.getAlarmService().findAlarmByIdAsync(ctx.getTenantId(), (AlarmId) original)); - case RULE_CHAIN: - return getTenantAsync(ctx.getRuleChainService().findRuleChainByIdAsync(ctx.getTenantId(), (RuleChainId) original)); - default: - return Futures.immediateFailedFuture(new TbNodeException("Unexpected original EntityType " + original.getEntityType())); - } - } - - public static ListenableFuture findTenantIdByEntityId(TbContext ctx, EntityId entityId) { + public static ListenableFuture findEntityIdAsync(TbContext ctx, EntityId entityId) { switch (entityId.getEntityType()) { case TENANT: return Futures.immediateFuture((TenantId) entityId); case CUSTOMER: - return getTenantAsync(ctx.getCustomerService().findCustomerByIdAsync(ctx.getTenantId(), (CustomerId) entityId)); + return getTenantAsync(ctx, ctx.getCustomerService().findCustomerByIdAsync(ctx.getTenantId(), (CustomerId) entityId)); case USER: - return getTenantAsync(ctx.getUserService().findUserByIdAsync(ctx.getTenantId(), (UserId) entityId)); + return getTenantAsync(ctx, ctx.getUserService().findUserByIdAsync(ctx.getTenantId(), (UserId) entityId)); case ASSET: - return getTenantAsync(ctx.getAssetService().findAssetByIdAsync(ctx.getTenantId(), (AssetId) entityId)); + return getTenantAsync(ctx, ctx.getAssetService().findAssetByIdAsync(ctx.getTenantId(), (AssetId) entityId)); case DEVICE: - return getTenantAsync(ctx.getDeviceService().findDeviceByIdAsync(ctx.getTenantId(), (DeviceId) entityId)); + return getTenantAsync(ctx, ctx.getDeviceService().findDeviceByIdAsync(ctx.getTenantId(), (DeviceId) entityId)); case ALARM: - return getTenantAsync(ctx.getAlarmService().findAlarmByIdAsync(ctx.getTenantId(), (AlarmId) entityId)); + return getTenantAsync(ctx, ctx.getAlarmService().findAlarmByIdAsync(ctx.getTenantId(), (AlarmId) entityId)); case RULE_CHAIN: - return getTenantAsync(ctx.getRuleChainService().findRuleChainByIdAsync(ctx.getTenantId(), (RuleChainId) entityId)); + return getTenantAsync(ctx, ctx.getRuleChainService().findRuleChainByIdAsync(ctx.getTenantId(), (RuleChainId) entityId)); case ENTITY_VIEW: - return getTenantAsync(ctx.getEntityViewService().findEntityViewByIdAsync(ctx.getTenantId(), (EntityViewId) entityId)); + return getTenantAsync(ctx, ctx.getEntityViewService().findEntityViewByIdAsync(ctx.getTenantId(), (EntityViewId) entityId)); case DASHBOARD: - return getTenantAsync(ctx.getDashboardService().findDashboardByIdAsync(ctx.getTenantId(), (DashboardId) entityId)); + return getTenantAsync(ctx, ctx.getDashboardService().findDashboardByIdAsync(ctx.getTenantId(), (DashboardId) entityId)); case EDGE: - return getTenantAsync(ctx.getEdgeService().findEdgeByIdAsync(ctx.getTenantId(), (EdgeId) entityId)); + return getTenantAsync(ctx, ctx.getEdgeService().findEdgeByIdAsync(ctx.getTenantId(), (EdgeId) entityId)); case OTA_PACKAGE: - return getTenantAsync(ctx.getOtaPackageService().findOtaPackageInfoByIdAsync(ctx.getTenantId(), (OtaPackageId) entityId)); + return getTenantAsync(ctx, ctx.getOtaPackageService().findOtaPackageInfoByIdAsync(ctx.getTenantId(), (OtaPackageId) entityId)); case ASSET_PROFILE: - return getTenantAsync(Futures.immediateFuture(ctx.getAssetProfileCache().get(ctx.getTenantId(), (AssetProfileId) entityId))); + return getTenantAsync(ctx, Futures.immediateFuture(ctx.getAssetProfileCache().get(ctx.getTenantId(), (AssetProfileId) entityId))); case DEVICE_PROFILE: - return getTenantAsync(Futures.immediateFuture(ctx.getDeviceProfileCache().get(ctx.getTenantId(), (DeviceProfileId) entityId))); + return getTenantAsync(ctx, Futures.immediateFuture(ctx.getDeviceProfileCache().get(ctx.getTenantId(), (DeviceProfileId) entityId))); default: return Futures.immediateFailedFuture(new TbNodeException("Unexpected entityId EntityType " + entityId.getEntityType())); } } - private static ListenableFuture getTenantAsync(ListenableFuture future) { - return Futures.transformAsync(future, in -> in != null ? Futures.immediateFuture(in.getTenantId()) - : Futures.immediateFuture(null), MoreExecutors.directExecutor()); + private static ListenableFuture getTenantAsync(TbContext ctx, ListenableFuture future) { + return Futures.transformAsync(future, in -> { + return in != null ? Futures.immediateFuture(in.getTenantId()) + : Futures.immediateFuture(null); + }, ctx.getDbCallbackExecutor()); } } From 6edec4a94cf554c28cadc37a4337dba411ce4b57 Mon Sep 17 00:00:00 2001 From: Yuriy Lytvynchuk Date: Fri, 7 Oct 2022 11:01:55 +0300 Subject: [PATCH 03/11] 1. rename checkTenantEntity -> isTenantEntity 2. Refactor code --- .../actors/ruleChain/DefaultTbContext.java | 2 +- .../rule/engine/api/TbContext.java | 3 +- .../rule/engine/debug/TbMsgGeneratorNode.java | 2 +- .../engine/filter/TbCheckRelationNode.java | 2 +- .../util/EntitiesTenantIdAsyncLoader.java | 38 ++++++++++--------- 5 files changed, 24 insertions(+), 23 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 0e557ea76c..239466fe81 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -705,7 +705,7 @@ class DefaultTbContext implements TbContext { return metaData; } - public void checkTenantEntity(EntityId entityId) { + public void isTenantEntity(EntityId entityId) { try { TenantId entityTenantId = EntitiesTenantIdAsyncLoader.findEntityIdAsync(this, entityId).get(); if (entityTenantId == null) { diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index 1c2cf92256..5892633649 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -31,7 +31,6 @@ import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.id.QueueId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.data.id.TenantId; @@ -193,7 +192,7 @@ public interface TbContext { boolean isLocalEntity(EntityId entityId); - void checkTenantEntity(EntityId entityId); + void isTenantEntity(EntityId entityId); RuleNodeId getSelfId(); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java index a7752aa116..12614f191c 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java @@ -77,7 +77,7 @@ public class TbMsgGeneratorNode implements TbNode { this.currentMsgCount = 0; if (!StringUtils.isEmpty(config.getOriginatorId())) { originatorId = EntityIdFactory.getByTypeAndUuid(config.getOriginatorType(), config.getOriginatorId()); - ctx.checkTenantEntity(originatorId); + ctx.isTenantEntity(originatorId); } else { originatorId = ctx.getSelfId(); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbCheckRelationNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbCheckRelationNode.java index 2dacc1a458..c6707cbd1e 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbCheckRelationNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbCheckRelationNode.java @@ -62,7 +62,7 @@ public class TbCheckRelationNode implements TbNode { this.config = TbNodeUtils.convert(configuration, TbCheckRelationNodeConfiguration.class); if (config.isCheckForSingleEntity()) { this.singleEntityId = EntityIdFactory.getByTypeAndId(config.getEntityType(), config.getEntityId()); - ctx.checkTenantEntity(singleEntityId); + ctx.isTenantEntity(singleEntityId); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java index a8547bdde4..6e403af86b 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java @@ -17,6 +17,7 @@ package org.thingsboard.rule.engine.util; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import org.thingsboard.common.util.ListeningExecutor; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.server.common.data.HasTenantId; @@ -37,44 +38,45 @@ import org.thingsboard.server.common.data.id.UserId; public class EntitiesTenantIdAsyncLoader { - public static ListenableFuture findEntityIdAsync(TbContext ctx, EntityId entityId) { + public static ListenableFuture findEntityIdAsync(TbContext ctx, EntityId original) { - switch (entityId.getEntityType()) { + ListeningExecutor executor = ctx.getDbCallbackExecutor(); + switch (original.getEntityType()) { case TENANT: - return Futures.immediateFuture((TenantId) entityId); + return Futures.immediateFuture((TenantId) original); case CUSTOMER: - return getTenantAsync(ctx, ctx.getCustomerService().findCustomerByIdAsync(ctx.getTenantId(), (CustomerId) entityId)); + return getTenantAsync(ctx.getCustomerService().findCustomerByIdAsync(ctx.getTenantId(), (CustomerId) original), executor); case USER: - return getTenantAsync(ctx, ctx.getUserService().findUserByIdAsync(ctx.getTenantId(), (UserId) entityId)); + return getTenantAsync(ctx.getUserService().findUserByIdAsync(ctx.getTenantId(), (UserId) original), executor); case ASSET: - return getTenantAsync(ctx, ctx.getAssetService().findAssetByIdAsync(ctx.getTenantId(), (AssetId) entityId)); + return getTenantAsync(ctx.getAssetService().findAssetByIdAsync(ctx.getTenantId(), (AssetId) original), executor); case DEVICE: - return getTenantAsync(ctx, ctx.getDeviceService().findDeviceByIdAsync(ctx.getTenantId(), (DeviceId) entityId)); + return getTenantAsync(ctx.getDeviceService().findDeviceByIdAsync(ctx.getTenantId(), (DeviceId) original), executor); case ALARM: - return getTenantAsync(ctx, ctx.getAlarmService().findAlarmByIdAsync(ctx.getTenantId(), (AlarmId) entityId)); + return getTenantAsync(ctx.getAlarmService().findAlarmByIdAsync(ctx.getTenantId(), (AlarmId) original), executor); case RULE_CHAIN: - return getTenantAsync(ctx, ctx.getRuleChainService().findRuleChainByIdAsync(ctx.getTenantId(), (RuleChainId) entityId)); + return getTenantAsync(ctx.getRuleChainService().findRuleChainByIdAsync(ctx.getTenantId(), (RuleChainId) original), executor); case ENTITY_VIEW: - return getTenantAsync(ctx, ctx.getEntityViewService().findEntityViewByIdAsync(ctx.getTenantId(), (EntityViewId) entityId)); + return getTenantAsync(ctx.getEntityViewService().findEntityViewByIdAsync(ctx.getTenantId(), (EntityViewId) original), executor); case DASHBOARD: - return getTenantAsync(ctx, ctx.getDashboardService().findDashboardByIdAsync(ctx.getTenantId(), (DashboardId) entityId)); + return getTenantAsync(ctx.getDashboardService().findDashboardByIdAsync(ctx.getTenantId(), (DashboardId) original), executor); case EDGE: - return getTenantAsync(ctx, ctx.getEdgeService().findEdgeByIdAsync(ctx.getTenantId(), (EdgeId) entityId)); + return getTenantAsync(ctx.getEdgeService().findEdgeByIdAsync(ctx.getTenantId(), (EdgeId) original), executor); case OTA_PACKAGE: - return getTenantAsync(ctx, ctx.getOtaPackageService().findOtaPackageInfoByIdAsync(ctx.getTenantId(), (OtaPackageId) entityId)); + return getTenantAsync(ctx.getOtaPackageService().findOtaPackageInfoByIdAsync(ctx.getTenantId(), (OtaPackageId) original), executor); case ASSET_PROFILE: - return getTenantAsync(ctx, Futures.immediateFuture(ctx.getAssetProfileCache().get(ctx.getTenantId(), (AssetProfileId) entityId))); + return getTenantAsync(Futures.immediateFuture(ctx.getAssetProfileCache().get(ctx.getTenantId(), (AssetProfileId) original)), executor); case DEVICE_PROFILE: - return getTenantAsync(ctx, Futures.immediateFuture(ctx.getDeviceProfileCache().get(ctx.getTenantId(), (DeviceProfileId) entityId))); + return getTenantAsync(Futures.immediateFuture(ctx.getDeviceProfileCache().get(ctx.getTenantId(), (DeviceProfileId) original)), executor); default: - return Futures.immediateFailedFuture(new TbNodeException("Unexpected entityId EntityType " + entityId.getEntityType())); + return Futures.immediateFailedFuture(new TbNodeException("Unexpected original EntityType " + original.getEntityType())); } } - private static ListenableFuture getTenantAsync(TbContext ctx, ListenableFuture future) { + private static ListenableFuture getTenantAsync(ListenableFuture future, ListeningExecutor executor) { return Futures.transformAsync(future, in -> { return in != null ? Futures.immediateFuture(in.getTenantId()) : Futures.immediateFuture(null); - }, ctx.getDbCallbackExecutor()); + }, executor); } } From 030981cedf01daf0b10c0a8dd2317e0d585419a7 Mon Sep 17 00:00:00 2001 From: Yuriy Lytvynchuk Date: Fri, 7 Oct 2022 11:05:51 +0300 Subject: [PATCH 04/11] delete empty row --- .../rule/engine/util/EntitiesTenantIdAsyncLoader.java | 1 - 1 file changed, 1 deletion(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java index 6e403af86b..752ebf5143 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java @@ -39,7 +39,6 @@ import org.thingsboard.server.common.data.id.UserId; public class EntitiesTenantIdAsyncLoader { public static ListenableFuture findEntityIdAsync(TbContext ctx, EntityId original) { - ListeningExecutor executor = ctx.getDbCallbackExecutor(); switch (original.getEntityType()) { case TENANT: From 15683504fdd0221c2a2708c96348c22e929bb08b Mon Sep 17 00:00:00 2001 From: Yuriy Lytvynchuk Date: Wed, 2 Nov 2022 15:22:33 +0200 Subject: [PATCH 05/11] refactor & update logic --- .../server/actors/ActorSystemContext.java | 12 + .../actors/ruleChain/DefaultTbContext.java | 23 ++ .../DefaultTbApiUsageStateService.java | 5 + .../apiusage/TbApiUsageStateService.java | 3 +- .../rpc/DefaultTbRuleEngineRpcService.java | 14 +- .../api/RuleEngineApiUsageStateService.java | 26 ++ .../rule/engine/api/RuleEngineRpcService.java | 5 + .../rule/engine/api/TbContext.java | 10 + .../util/EntitiesTenantIdAsyncLoader.java | 98 +++-- .../util/EntitiesTenantIdAsyncLoaderTest.java | 347 ++++++++++++++++++ 10 files changed, 520 insertions(+), 23 deletions(-) create mode 100644 rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineApiUsageStateService.java create mode 100644 rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoaderTest.java diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index f4e7eb2463..6fbc2908bd 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -75,6 +75,8 @@ import org.thingsboard.server.dao.tenant.TenantProfileService; import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.user.UserService; +import org.thingsboard.server.dao.widget.WidgetTypeService; +import org.thingsboard.server.dao.widget.WidgetsBundleService; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.usagestats.TbApiUsageClient; @@ -377,6 +379,16 @@ public class ActorSystemContext { @Getter private QueueService queueService; + @Lazy + @Autowired(required = false) + @Getter + private WidgetsBundleService widgetsBundleService; + + @Lazy + @Autowired(required = false) + @Getter + private WidgetTypeService widgetTypeService; + @Value("${actors.session.max_concurrent_sessions_per_device:1}") @Getter private long maxConcurrentSessionsPerDevice; diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 3eec2e942b..7d5ad5744f 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -25,6 +25,7 @@ import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ListeningExecutor; import org.thingsboard.rule.engine.api.MailService; import org.thingsboard.rule.engine.api.RuleEngineAlarmService; +import org.thingsboard.rule.engine.api.RuleEngineApiUsageStateService; import org.thingsboard.rule.engine.api.RuleEngineAssetProfileCache; import org.thingsboard.rule.engine.api.RuleEngineDeviceProfileCache; import org.thingsboard.rule.engine.api.RuleEngineRpcService; @@ -88,6 +89,8 @@ import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.user.UserService; +import org.thingsboard.server.dao.widget.WidgetTypeService; +import org.thingsboard.server.dao.widget.WidgetsBundleService; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsgMetadata; @@ -719,6 +722,26 @@ class DefaultTbContext implements TbContext { return mainCtx.getTenantProfileCache().get(getTenantId()); } + @Override + public TenantProfile getTenantProfile(TenantId tenantId) { + return mainCtx.getTenantProfileCache().get(tenantId); + } + + @Override + public WidgetsBundleService getWidgetBundleService() { + return mainCtx.getWidgetsBundleService(); + } + + @Override + public WidgetTypeService getWidgetTypeService() { + return mainCtx.getWidgetTypeService(); + } + + @Override + public RuleEngineApiUsageStateService getRuleEngineApiUsageStateService() { + return mainCtx.getApiUsageStateService(); + } + private TbMsgMetaData getActionMetaData(RuleNodeId ruleNodeId) { TbMsgMetaData metaData = new TbMsgMetaData(); metaData.putValue("ruleNodeId", ruleNodeId.toString()); diff --git a/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java b/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java index f6a590eee2..b8ca8b37c0 100644 --- a/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java @@ -395,6 +395,11 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService } } + @Override + public ApiUsageState findApiUsageStateById(TenantId tenantId, ApiUsageStateId id) { + return apiUsageStateService.findApiUsageStateById(tenantId, id); + } + private interface StateChecker { boolean check(long threshold, long warnThreshold, long value); } diff --git a/application/src/main/java/org/thingsboard/server/service/apiusage/TbApiUsageStateService.java b/application/src/main/java/org/thingsboard/server/service/apiusage/TbApiUsageStateService.java index e6d8ba00ae..1737915f50 100644 --- a/application/src/main/java/org/thingsboard/server/service/apiusage/TbApiUsageStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/apiusage/TbApiUsageStateService.java @@ -16,6 +16,7 @@ package org.thingsboard.server.service.apiusage; import org.springframework.context.ApplicationListener; +import org.thingsboard.rule.engine.api.RuleEngineApiUsageStateService; import org.thingsboard.server.common.data.ApiUsageState; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.TenantId; @@ -25,7 +26,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceM import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; -public interface TbApiUsageStateService extends ApplicationListener { +public interface TbApiUsageStateService extends RuleEngineApiUsageStateService, ApplicationListener { void process(TbProtoQueueMsg msg, TbCallback callback); diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java index 493ab7ef43..96cc19ef70 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java @@ -15,10 +15,14 @@ */ package org.thingsboard.server.service.rpc; +import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.server.common.data.id.RpcId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.rpc.Rpc; import org.thingsboard.server.common.data.rpc.RpcError; import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcRequest; import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcResponse; @@ -27,6 +31,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; +import org.thingsboard.server.dao.rpc.RpcService; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; @@ -52,6 +57,7 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi private final PartitionService partitionService; private final TbClusterService clusterService; private final TbServiceInfoProvider serviceInfoProvider; + private final RpcService rpcService; private final ConcurrentMap> toDeviceRpcRequests = new ConcurrentHashMap<>(); @@ -61,10 +67,11 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi public DefaultTbRuleEngineRpcService(PartitionService partitionService, TbClusterService clusterService, - TbServiceInfoProvider serviceInfoProvider) { + TbServiceInfoProvider serviceInfoProvider, RpcService rpcService) { this.partitionService = partitionService; this.clusterService = clusterService; this.serviceInfoProvider = serviceInfoProvider; + this.rpcService = rpcService; } @Autowired(required = false) @@ -119,6 +126,11 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi }); } + @Override + public ListenableFuture findRpcByIdAsync(TenantId tenantId, RpcId id) { + return rpcService.findRpcByIdAsync(tenantId, id); + } + @Override public void processRpcResponseFromDevice(FromDeviceRpcResponse response) { log.trace("[{}] Received response to server-side RPC request from Core RPC Service", response.getId()); diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineApiUsageStateService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineApiUsageStateService.java new file mode 100644 index 0000000000..03734ffcad --- /dev/null +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineApiUsageStateService.java @@ -0,0 +1,26 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.api; + +import org.thingsboard.server.common.data.ApiUsageState; +import org.thingsboard.server.common.data.id.ApiUsageStateId; +import org.thingsboard.server.common.data.id.TenantId; + +public interface RuleEngineApiUsageStateService { + + ApiUsageState findApiUsageStateById(TenantId tenantId, ApiUsageStateId id); + +} diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineRpcService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineRpcService.java index abb5ace0f8..aec2cd4e00 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineRpcService.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineRpcService.java @@ -15,7 +15,11 @@ */ package org.thingsboard.rule.engine.api; +import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.RpcId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.rpc.Rpc; import java.util.UUID; import java.util.function.Consumer; @@ -29,4 +33,5 @@ public interface RuleEngineRpcService { void sendRpcRequestToDevice(RuleEngineDeviceRpcRequest request, Consumer consumer); + ListenableFuture findRpcByIdAsync(TenantId tenantId, RpcId id); } diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index 01f61f1e4f..7c539d50f0 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -61,6 +61,8 @@ import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.user.UserService; +import org.thingsboard.server.dao.widget.WidgetTypeService; +import org.thingsboard.server.dao.widget.WidgetsBundleService; import java.util.List; import java.util.Set; @@ -305,4 +307,12 @@ public interface TbContext { void removeListeners(); TenantProfile getTenantProfile(); + + TenantProfile getTenantProfile(TenantId tenantId); + + WidgetsBundleService getWidgetBundleService(); + + WidgetTypeService getWidgetTypeService(); + + RuleEngineApiUsageStateService getRuleEngineApiUsageStateService(); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java index 752ebf5143..3401227987 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java @@ -20,8 +20,10 @@ import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.common.util.ListeningExecutor; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.HasTenantId; import org.thingsboard.server.common.data.id.AlarmId; +import org.thingsboard.server.common.data.id.ApiUsageStateId; import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.AssetProfileId; import org.thingsboard.server.common.data.id.CustomerId; @@ -32,50 +34,104 @@ import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityViewId; import org.thingsboard.server.common.data.id.OtaPackageId; +import org.thingsboard.server.common.data.id.QueueId; +import org.thingsboard.server.common.data.id.RpcId; import org.thingsboard.server.common.data.id.RuleChainId; +import org.thingsboard.server.common.data.id.RuleNodeId; +import org.thingsboard.server.common.data.id.TbResourceId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.UserId; +import org.thingsboard.server.common.data.id.WidgetTypeId; +import org.thingsboard.server.common.data.id.WidgetsBundleId; +import org.thingsboard.server.common.data.rule.RuleNode; + +import java.util.UUID; public class EntitiesTenantIdAsyncLoader { public static ListenableFuture findEntityIdAsync(TbContext ctx, EntityId original) { - ListeningExecutor executor = ctx.getDbCallbackExecutor(); - switch (original.getEntityType()) { + ListenableFuture hasTenantId; + UUID id = original.getId(); + EntityType entityType = original.getEntityType(); + TenantId tenantId = ctx.getTenantId(); + switch (entityType) { case TENANT: - return Futures.immediateFuture((TenantId) original); + return Futures.immediateFuture(new TenantId(id)); case CUSTOMER: - return getTenantAsync(ctx.getCustomerService().findCustomerByIdAsync(ctx.getTenantId(), (CustomerId) original), executor); + hasTenantId = ctx.getCustomerService().findCustomerByIdAsync(tenantId, new CustomerId(id)); + break; case USER: - return getTenantAsync(ctx.getUserService().findUserByIdAsync(ctx.getTenantId(), (UserId) original), executor); + hasTenantId = ctx.getUserService().findUserByIdAsync(tenantId, new UserId(id)); + break; case ASSET: - return getTenantAsync(ctx.getAssetService().findAssetByIdAsync(ctx.getTenantId(), (AssetId) original), executor); + hasTenantId = ctx.getAssetService().findAssetByIdAsync(tenantId, new AssetId(id)); + break; case DEVICE: - return getTenantAsync(ctx.getDeviceService().findDeviceByIdAsync(ctx.getTenantId(), (DeviceId) original), executor); + hasTenantId = ctx.getDeviceService().findDeviceByIdAsync(tenantId, new DeviceId(id)); + break; case ALARM: - return getTenantAsync(ctx.getAlarmService().findAlarmByIdAsync(ctx.getTenantId(), (AlarmId) original), executor); + hasTenantId = ctx.getAlarmService().findAlarmByIdAsync(tenantId, new AlarmId(id)); + break; case RULE_CHAIN: - return getTenantAsync(ctx.getRuleChainService().findRuleChainByIdAsync(ctx.getTenantId(), (RuleChainId) original), executor); + hasTenantId = ctx.getRuleChainService().findRuleChainByIdAsync(tenantId, new RuleChainId(id)); + break; case ENTITY_VIEW: - return getTenantAsync(ctx.getEntityViewService().findEntityViewByIdAsync(ctx.getTenantId(), (EntityViewId) original), executor); + hasTenantId = ctx.getEntityViewService().findEntityViewByIdAsync(tenantId, new EntityViewId(id)); + break; case DASHBOARD: - return getTenantAsync(ctx.getDashboardService().findDashboardByIdAsync(ctx.getTenantId(), (DashboardId) original), executor); + hasTenantId = ctx.getDashboardService().findDashboardByIdAsync(tenantId, new DashboardId(id)); + break; case EDGE: - return getTenantAsync(ctx.getEdgeService().findEdgeByIdAsync(ctx.getTenantId(), (EdgeId) original), executor); + hasTenantId = ctx.getEdgeService().findEdgeByIdAsync(tenantId, new EdgeId(id)); + break; case OTA_PACKAGE: - return getTenantAsync(ctx.getOtaPackageService().findOtaPackageInfoByIdAsync(ctx.getTenantId(), (OtaPackageId) original), executor); + hasTenantId = ctx.getOtaPackageService().findOtaPackageInfoByIdAsync(tenantId, new OtaPackageId(id)); + break; case ASSET_PROFILE: - return getTenantAsync(Futures.immediateFuture(ctx.getAssetProfileCache().get(ctx.getTenantId(), (AssetProfileId) original)), executor); + hasTenantId = Futures.immediateFuture(ctx.getAssetProfileCache().get(tenantId, new AssetProfileId(id))); + break; case DEVICE_PROFILE: - return getTenantAsync(Futures.immediateFuture(ctx.getDeviceProfileCache().get(ctx.getTenantId(), (DeviceProfileId) original)), executor); + hasTenantId = Futures.immediateFuture(ctx.getDeviceProfileCache().get(tenantId, new DeviceProfileId(id))); + break; + case WIDGET_TYPE: + hasTenantId = Futures.immediateFuture(ctx.getWidgetTypeService().findWidgetTypeById(tenantId, new WidgetTypeId(id))); + break; + case WIDGETS_BUNDLE: + hasTenantId = Futures.immediateFuture(ctx.getWidgetBundleService().findWidgetsBundleById(tenantId, new WidgetsBundleId(id))); + break; + case RPC: + hasTenantId = ctx.getRpcService().findRpcByIdAsync(ctx.getTenantId(), new RpcId(id)); + break; + case QUEUE: + hasTenantId = Futures.immediateFuture(ctx.getQueueService().findQueueById(tenantId, new QueueId(id))); + break; + case API_USAGE_STATE: + hasTenantId = Futures.immediateFuture(ctx.getRuleEngineApiUsageStateService().findApiUsageStateById(tenantId, new ApiUsageStateId(id))); + break; + case TB_RESOURCE: + hasTenantId = ctx.getResourceService().findResourceInfoByIdAsync(tenantId, new TbResourceId(id)); + break; + case RULE_NODE: + hasTenantId = null; + RuleNode ruleNode = ctx.getRuleChainService().findRuleNodeById(tenantId, new RuleNodeId(id)); + if (ruleNode != null) { + return Futures.immediateFuture(tenantId); + } + break; + case TENANT_PROFILE: + hasTenantId = null; + if (ctx.getTenantProfile() == ctx.getTenantProfile(new TenantId(id))) { + return Futures.immediateFuture(tenantId); + } + break; default: - return Futures.immediateFailedFuture(new TbNodeException("Unexpected original EntityType " + original.getEntityType())); + hasTenantId = Futures.immediateFailedFuture(new TbNodeException("Unexpected original EntityType " + original.getEntityType())); } + return getTenantIdAsync(hasTenantId, ctx.getDbCallbackExecutor()); } - private static ListenableFuture getTenantAsync(ListenableFuture future, ListeningExecutor executor) { - return Futures.transformAsync(future, in -> { - return in != null ? Futures.immediateFuture(in.getTenantId()) - : Futures.immediateFuture(null); - }, executor); + private static ListenableFuture getTenantIdAsync(ListenableFuture future, ListeningExecutor executor) { + return Futures.transformAsync(future, in -> in != null ? Futures.immediateFuture(in.getTenantId()) + : Futures.immediateFuture(null), executor); } } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoaderTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoaderTest.java new file mode 100644 index 0000000000..885fa7ec40 --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoaderTest.java @@ -0,0 +1,347 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.util; + +import com.google.common.util.concurrent.Futures; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.thingsboard.common.util.AbstractListeningExecutor; +import org.thingsboard.rule.engine.api.RuleEngineAlarmService; +import org.thingsboard.rule.engine.api.RuleEngineApiUsageStateService; +import org.thingsboard.rule.engine.api.RuleEngineAssetProfileCache; +import org.thingsboard.rule.engine.api.RuleEngineDeviceProfileCache; +import org.thingsboard.rule.engine.api.RuleEngineRpcService; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.server.common.data.ApiUsageState; +import org.thingsboard.server.common.data.Customer; +import org.thingsboard.server.common.data.Dashboard; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.DeviceProfile; +import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.EntityView; +import org.thingsboard.server.common.data.OtaPackage; +import org.thingsboard.server.common.data.TbResource; +import org.thingsboard.server.common.data.TenantProfile; +import org.thingsboard.server.common.data.User; +import org.thingsboard.server.common.data.alarm.Alarm; +import org.thingsboard.server.common.data.asset.Asset; +import org.thingsboard.server.common.data.asset.AssetProfile; +import org.thingsboard.server.common.data.edge.Edge; +import org.thingsboard.server.common.data.id.AssetProfileId; +import org.thingsboard.server.common.data.id.DeviceProfileId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.EntityIdFactory; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.queue.Queue; +import org.thingsboard.server.common.data.rpc.Rpc; +import org.thingsboard.server.common.data.rule.RuleChain; +import org.thingsboard.server.common.data.rule.RuleNode; +import org.thingsboard.server.common.data.widget.WidgetType; +import org.thingsboard.server.common.data.widget.WidgetsBundle; +import org.thingsboard.server.dao.asset.AssetService; +import org.thingsboard.server.dao.customer.CustomerService; +import org.thingsboard.server.dao.dashboard.DashboardService; +import org.thingsboard.server.dao.device.DeviceService; +import org.thingsboard.server.dao.edge.EdgeService; +import org.thingsboard.server.dao.entityview.EntityViewService; +import org.thingsboard.server.dao.ota.OtaPackageService; +import org.thingsboard.server.dao.queue.QueueService; +import org.thingsboard.server.dao.resource.ResourceService; +import org.thingsboard.server.dao.rule.RuleChainService; +import org.thingsboard.server.dao.user.UserService; +import org.thingsboard.server.dao.widget.WidgetTypeService; +import org.thingsboard.server.dao.widget.WidgetsBundleService; + +import java.util.UUID; +import java.util.concurrent.ExecutionException; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class EntitiesTenantIdAsyncLoaderTest { + + @Mock + private TbContext ctx; + @Mock + private CustomerService customerService; + @Mock + private UserService userService; + @Mock + private AssetService assetService; + @Mock + private DeviceService deviceService; + @Mock + private RuleEngineAlarmService alarmService; + @Mock + private RuleChainService ruleChainService; + @Mock + private EntityViewService entityViewService; + @Mock + private DashboardService dashboardService; + @Mock + private EdgeService edgeService; + @Mock + private OtaPackageService otaPackageService; + @Mock + private RuleEngineAssetProfileCache assetProfileCache; + @Mock + private RuleEngineDeviceProfileCache deviceProfileCache; + @Mock + private WidgetTypeService widgetTypeService; + @Mock + private WidgetsBundleService widgetsBundleService; + @Mock + private QueueService queueService; + @Mock + private ResourceService resourceService; + @Mock + private RuleEngineRpcService rpcService; + @Mock + private RuleEngineApiUsageStateService ruleEngineApiUsageStateService; + + private TenantId tenantId; + private AbstractListeningExecutor dbExecutor; + + @Before + public void before() { + dbExecutor = new AbstractListeningExecutor() { + @Override + protected int getThreadPollSize() { + return 3; + } + }; + dbExecutor.init(); + this.tenantId = new TenantId(UUID.randomUUID()); + + when(ctx.getTenantId()).thenReturn(tenantId); + when(ctx.getDbCallbackExecutor()).thenReturn(dbExecutor); + + for (EntityType entityType : EntityType.values()) { + initMocks(entityType, tenantId); + } + } + + @After + public void after() { + dbExecutor.destroy(); + } + + private void initMocks(EntityType entityType, TenantId tenantId) { + switch (entityType) { + case TENANT: + break; + case CUSTOMER: + Customer customer = new Customer(); + customer.setTenantId(tenantId); + + when(ctx.getCustomerService()).thenReturn(customerService); + doReturn(Futures.immediateFuture(customer)).when(customerService).findCustomerByIdAsync(eq(tenantId), any()); + + break; + case USER: + User user = new User(); + user.setTenantId(tenantId); + + when(ctx.getUserService()).thenReturn(userService); + doReturn(Futures.immediateFuture(user)).when(userService).findUserByIdAsync(eq(tenantId), any()); + + break; + case ASSET: + Asset asset = new Asset(); + asset.setTenantId(tenantId); + + when(ctx.getAssetService()).thenReturn(assetService); + doReturn(Futures.immediateFuture(asset)).when(assetService).findAssetByIdAsync(eq(tenantId), any()); + + break; + case DEVICE: + Device device = new Device(); + device.setTenantId(tenantId); + + when(ctx.getDeviceService()).thenReturn(deviceService); + doReturn(Futures.immediateFuture(device)).when(deviceService).findDeviceByIdAsync(eq(tenantId), any()); + + break; + case ALARM: + Alarm alarm = new Alarm(); + alarm.setTenantId(tenantId); + + when(ctx.getAlarmService()).thenReturn(alarmService); + doReturn(Futures.immediateFuture(alarm)).when(alarmService).findAlarmByIdAsync(eq(tenantId), any()); + + break; + case RULE_CHAIN: + RuleChain ruleChain = new RuleChain(); + ruleChain.setTenantId(tenantId); + + when(ctx.getRuleChainService()).thenReturn(ruleChainService); + doReturn(Futures.immediateFuture(ruleChain)).when(ruleChainService).findRuleChainByIdAsync(eq(tenantId), any()); + + break; + case ENTITY_VIEW: + EntityView entityView = new EntityView(); + entityView.setTenantId(tenantId); + + when(ctx.getEntityViewService()).thenReturn(entityViewService); + doReturn(Futures.immediateFuture(entityView)).when(entityViewService).findEntityViewByIdAsync(eq(tenantId), any()); + + break; + case DASHBOARD: + Dashboard dashboard = new Dashboard(); + dashboard.setTenantId(tenantId); + + when(ctx.getDashboardService()).thenReturn(dashboardService); + doReturn(Futures.immediateFuture(dashboard)).when(dashboardService).findDashboardByIdAsync(eq(tenantId), any()); + + break; + case EDGE: + Edge edge = new Edge(); + edge.setTenantId(tenantId); + + when(ctx.getEdgeService()).thenReturn(edgeService); + doReturn(Futures.immediateFuture(edge)).when(edgeService).findEdgeByIdAsync(eq(tenantId), any()); + + break; + case OTA_PACKAGE: + OtaPackage otaPackage = new OtaPackage(); + otaPackage.setTenantId(tenantId); + + when(ctx.getOtaPackageService()).thenReturn(otaPackageService); + doReturn(Futures.immediateFuture(otaPackage)).when(otaPackageService).findOtaPackageInfoByIdAsync(eq(tenantId), any()); + + break; + case ASSET_PROFILE: + AssetProfile assetProfile = new AssetProfile(); + assetProfile.setTenantId(tenantId); + + when(ctx.getAssetProfileCache()).thenReturn(assetProfileCache); + doReturn(assetProfile).when(assetProfileCache).get(eq(tenantId), any(AssetProfileId.class)); + + break; + case DEVICE_PROFILE: + DeviceProfile deviceProfile = new DeviceProfile(); + deviceProfile.setTenantId(tenantId); + + when(ctx.getDeviceProfileCache()).thenReturn(deviceProfileCache); + doReturn(deviceProfile).when(deviceProfileCache).get(eq(tenantId), any(DeviceProfileId.class)); + + break; + case WIDGET_TYPE: + WidgetType widgetType = new WidgetType(); + widgetType.setTenantId(tenantId); + + when(ctx.getWidgetTypeService()).thenReturn(widgetTypeService); + doReturn(widgetType).when(widgetTypeService).findWidgetTypeById(eq(tenantId), any()); + + break; + case WIDGETS_BUNDLE: + WidgetsBundle widgetsBundle = new WidgetsBundle(); + widgetsBundle.setTenantId(tenantId); + + when(ctx.getWidgetBundleService()).thenReturn(widgetsBundleService); + doReturn(widgetsBundle).when(widgetsBundleService).findWidgetsBundleById(eq(tenantId), any()); + + break; + case RPC: + Rpc rps = new Rpc(); + rps.setTenantId(tenantId); + + when(ctx.getRpcService()).thenReturn(rpcService); + doReturn(Futures.immediateFuture(rps)).when(rpcService).findRpcByIdAsync(eq(tenantId), any()); + + break; + case QUEUE: + Queue queue = new Queue(); + queue.setTenantId(tenantId); + + when(ctx.getQueueService()).thenReturn(queueService); + doReturn(queue).when(queueService).findQueueById(eq(tenantId), any()); + + break; + case API_USAGE_STATE: + ApiUsageState apiUsageState = new ApiUsageState(); + apiUsageState.setTenantId(tenantId); + + when(ctx.getRuleEngineApiUsageStateService()).thenReturn(ruleEngineApiUsageStateService); + doReturn(apiUsageState).when(ruleEngineApiUsageStateService).findApiUsageStateById(eq(tenantId), any()); + + break; + case TB_RESOURCE: + TbResource tbResource = new TbResource(); + tbResource.setTenantId(tenantId); + + when(ctx.getResourceService()).thenReturn(resourceService); + doReturn(Futures.immediateFuture(tbResource)).when(resourceService).findResourceInfoByIdAsync(eq(tenantId), any()); + + break; + case RULE_NODE: + RuleNode ruleNode = new RuleNode(); + + when(ctx.getRuleChainService()).thenReturn(ruleChainService); + doReturn(ruleNode).when(ruleChainService).findRuleNodeById(eq(tenantId), any()); + + break; + case TENANT_PROFILE: + TenantProfile tenantProfile = new TenantProfile(); + + when(ctx.getTenantProfile()).thenReturn(tenantProfile); + when(ctx.getTenantProfile(any(TenantId.class))).thenReturn(tenantProfile); + + break; + default: + throw new RuntimeException("Unexpected original EntityType " + entityType); + } + + } + + private EntityId getEntityId(EntityType entityType) { + return EntityIdFactory.getByTypeAndUuid(entityType, UUID.randomUUID()); + } + + private void checkTenant(TenantId checkTenantId, boolean equals) throws ExecutionException, InterruptedException { + for (EntityType entityType : EntityType.values()) { + EntityId entityId = EntityType.TENANT.equals(entityType) ? tenantId : getEntityId(entityType); + TenantId targetTenantId = EntitiesTenantIdAsyncLoader.findEntityIdAsync(ctx, entityId).get(); + + Assert.assertNotNull(targetTenantId); + String msg = "Check entity type <" + entityType.name() + ">:"; + if (equals) { + Assert.assertEquals(msg, targetTenantId, checkTenantId); + } else { + Assert.assertNotEquals(msg, targetTenantId, checkTenantId); + } + } + } + + @Test + public void test_findEntityIdAsync_current_tenant() throws ExecutionException, InterruptedException { + checkTenant(tenantId, true); + } + + @Test + public void test_findEntityIdAsync_other_tenant() throws ExecutionException, InterruptedException { + checkTenant(new TenantId(UUID.randomUUID()), false); + } + +} From 8b9eb433223aff2c7f75f36619b262e24dfb91af Mon Sep 17 00:00:00 2001 From: Yuriy Lytvynchuk Date: Thu, 3 Nov 2022 14:06:42 +0200 Subject: [PATCH 06/11] refactor code --- .../server/actors/ruleChain/DefaultTbContext.java | 5 ----- .../org/thingsboard/rule/engine/api/TbContext.java | 2 -- .../engine/util/EntitiesTenantIdAsyncLoader.java | 12 +++++++----- 3 files changed, 7 insertions(+), 12 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 7d5ad5744f..ac1a9b6758 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -722,11 +722,6 @@ class DefaultTbContext implements TbContext { return mainCtx.getTenantProfileCache().get(getTenantId()); } - @Override - public TenantProfile getTenantProfile(TenantId tenantId) { - return mainCtx.getTenantProfileCache().get(tenantId); - } - @Override public WidgetsBundleService getWidgetBundleService() { return mainCtx.getWidgetsBundleService(); diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index 7c539d50f0..7578ed1009 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -308,8 +308,6 @@ public interface TbContext { TenantProfile getTenantProfile(); - TenantProfile getTenantProfile(TenantId tenantId); - WidgetsBundleService getWidgetBundleService(); WidgetTypeService getWidgetTypeService(); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java index 3401227987..6eb4355dd3 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java @@ -100,7 +100,7 @@ public class EntitiesTenantIdAsyncLoader { hasTenantId = Futures.immediateFuture(ctx.getWidgetBundleService().findWidgetsBundleById(tenantId, new WidgetsBundleId(id))); break; case RPC: - hasTenantId = ctx.getRpcService().findRpcByIdAsync(ctx.getTenantId(), new RpcId(id)); + hasTenantId = ctx.getRpcService().findRpcByIdAsync(tenantId, new RpcId(id)); break; case QUEUE: hasTenantId = Futures.immediateFuture(ctx.getQueueService().findQueueById(tenantId, new QueueId(id))); @@ -112,16 +112,18 @@ public class EntitiesTenantIdAsyncLoader { hasTenantId = ctx.getResourceService().findResourceInfoByIdAsync(tenantId, new TbResourceId(id)); break; case RULE_NODE: - hasTenantId = null; RuleNode ruleNode = ctx.getRuleChainService().findRuleNodeById(tenantId, new RuleNodeId(id)); if (ruleNode != null) { - return Futures.immediateFuture(tenantId); + hasTenantId = ctx.getRuleChainService().findRuleChainByIdAsync(tenantId, ruleNode.getRuleChainId()); + } else { + hasTenantId = Futures.immediateFuture(null); } break; case TENANT_PROFILE: - hasTenantId = null; - if (ctx.getTenantProfile() == ctx.getTenantProfile(new TenantId(id))) { + if (ctx.getTenantProfile().getId().equals(id)) { return Futures.immediateFuture(tenantId); + } else { + hasTenantId = Futures.immediateFuture(null); } break; default: From 646504ac50aa43d753b3aff7fa639096f2c79bbc Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Thu, 3 Nov 2022 15:51:18 +0200 Subject: [PATCH 07/11] Tenant Id Loader fixes --- .../actors/ruleChain/DefaultTbContext.java | 19 +-- .../rpc/DefaultTbRuleEngineRpcService.java | 4 +- .../engine/api/RuleEngineAlarmService.java | 2 + .../rule/engine/api/RuleEngineRpcService.java | 2 +- .../rule/engine/api/TbContext.java | 2 + .../rule/engine/debug/TbMsgGeneratorNode.java | 2 +- .../engine/filter/TbCheckRelationNode.java | 2 +- .../util/EntitiesTenantIdAsyncLoader.java | 139 ------------------ .../rule/engine/util/TenantIdLoader.java | 132 +++++++++++++++++ ...oaderTest.java => TenantIdLoaderTest.java} | 58 ++++---- 10 files changed, 178 insertions(+), 184 deletions(-) delete mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java create mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/TenantIdLoader.java rename rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/util/{EntitiesTenantIdAsyncLoaderTest.java => TenantIdLoaderTest.java} (82%) diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 3e4e459192..dfc248f79b 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -36,7 +36,7 @@ import org.thingsboard.rule.engine.api.SmsService; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbRelationTypes; import org.thingsboard.rule.engine.api.sms.SmsSenderFactory; -import org.thingsboard.rule.engine.util.EntitiesTenantIdAsyncLoader; +import org.thingsboard.rule.engine.util.TenantIdLoader; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.TbActorRef; import org.thingsboard.server.cluster.TbClusterService; @@ -776,17 +776,10 @@ class DefaultTbContext implements TbContext { return metaData; } - public void isTenantEntity(EntityId entityId) { - try { - TenantId entityTenantId = EntitiesTenantIdAsyncLoader.findEntityIdAsync(this, entityId).get(); - if (entityTenantId == null) { - throw new RuntimeException("Entity with id '" + entityId + "'not found!"); - } - if (!entityTenantId.equals(this.getTenantId())) { - throw new RuntimeException("Entity with id: '" + entityId + "' specified in the configuration doesn't belong to the current tenant."); - } - } catch (Exception e) { - throw new RuntimeException(e); + @Override + public void checkTenantEntity(EntityId entityId) { + if (!this.getTenantId().equals(TenantIdLoader.findTenantId(this, entityId))) { + throw new RuntimeException("Entity with id: '" + entityId + "' specified in the configuration doesn't belong to the current tenant."); } } diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java index 96cc19ef70..2acedb9107 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java @@ -127,8 +127,8 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi } @Override - public ListenableFuture findRpcByIdAsync(TenantId tenantId, RpcId id) { - return rpcService.findRpcByIdAsync(tenantId, id); + public Rpc findRpcById(TenantId tenantId, RpcId id) { + return rpcService.findById(tenantId, id); } @Override diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineAlarmService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineAlarmService.java index 69c705c827..21bab255f2 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineAlarmService.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineAlarmService.java @@ -51,6 +51,8 @@ public interface RuleEngineAlarmService { ListenableFuture findAlarmByIdAsync(TenantId tenantId, AlarmId alarmId); + Alarm findAlarmById(TenantId tenantId, AlarmId alarmId); + ListenableFuture findLatestByOriginatorAndType(TenantId tenantId, EntityId originator, String type); ListenableFuture findAlarmInfoByIdAsync(TenantId tenantId, AlarmId alarmId); diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineRpcService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineRpcService.java index aec2cd4e00..c2f06c14da 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineRpcService.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineRpcService.java @@ -33,5 +33,5 @@ public interface RuleEngineRpcService { void sendRpcRequestToDevice(RuleEngineDeviceRpcRequest request, Consumer consumer); - ListenableFuture findRpcByIdAsync(TenantId tenantId, RpcId id); + Rpc findRpcById(TenantId tenantId, RpcId id); } diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index 4a47586f64..9851eead80 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -199,6 +199,8 @@ public interface TbContext { * */ + void checkTenantEntity(EntityId entityId); + boolean isLocalEntity(EntityId entityId); RuleNodeId getSelfId(); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java index f41ba4d8cf..14825af121 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java @@ -78,7 +78,7 @@ public class TbMsgGeneratorNode implements TbNode { this.currentMsgCount = 0; if (!StringUtils.isEmpty(config.getOriginatorId())) { originatorId = EntityIdFactory.getByTypeAndUuid(config.getOriginatorType(), config.getOriginatorId()); - ctx.isTenantEntity(originatorId); + ctx.checkTenantEntity(originatorId); } else { originatorId = ctx.getSelfId(); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbCheckRelationNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbCheckRelationNode.java index c6707cbd1e..2dacc1a458 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbCheckRelationNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbCheckRelationNode.java @@ -62,7 +62,7 @@ public class TbCheckRelationNode implements TbNode { this.config = TbNodeUtils.convert(configuration, TbCheckRelationNodeConfiguration.class); if (config.isCheckForSingleEntity()) { this.singleEntityId = EntityIdFactory.getByTypeAndId(config.getEntityType(), config.getEntityId()); - ctx.isTenantEntity(singleEntityId); + ctx.checkTenantEntity(singleEntityId); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java deleted file mode 100644 index 6eb4355dd3..0000000000 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java +++ /dev/null @@ -1,139 +0,0 @@ -/** - * Copyright © 2016-2022 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.rule.engine.util; - -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import org.thingsboard.common.util.ListeningExecutor; -import org.thingsboard.rule.engine.api.TbContext; -import org.thingsboard.rule.engine.api.TbNodeException; -import org.thingsboard.server.common.data.EntityType; -import org.thingsboard.server.common.data.HasTenantId; -import org.thingsboard.server.common.data.id.AlarmId; -import org.thingsboard.server.common.data.id.ApiUsageStateId; -import org.thingsboard.server.common.data.id.AssetId; -import org.thingsboard.server.common.data.id.AssetProfileId; -import org.thingsboard.server.common.data.id.CustomerId; -import org.thingsboard.server.common.data.id.DashboardId; -import org.thingsboard.server.common.data.id.DeviceId; -import org.thingsboard.server.common.data.id.DeviceProfileId; -import org.thingsboard.server.common.data.id.EdgeId; -import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.id.EntityViewId; -import org.thingsboard.server.common.data.id.OtaPackageId; -import org.thingsboard.server.common.data.id.QueueId; -import org.thingsboard.server.common.data.id.RpcId; -import org.thingsboard.server.common.data.id.RuleChainId; -import org.thingsboard.server.common.data.id.RuleNodeId; -import org.thingsboard.server.common.data.id.TbResourceId; -import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.id.UserId; -import org.thingsboard.server.common.data.id.WidgetTypeId; -import org.thingsboard.server.common.data.id.WidgetsBundleId; -import org.thingsboard.server.common.data.rule.RuleNode; - -import java.util.UUID; - -public class EntitiesTenantIdAsyncLoader { - - public static ListenableFuture findEntityIdAsync(TbContext ctx, EntityId original) { - ListenableFuture hasTenantId; - UUID id = original.getId(); - EntityType entityType = original.getEntityType(); - TenantId tenantId = ctx.getTenantId(); - switch (entityType) { - case TENANT: - return Futures.immediateFuture(new TenantId(id)); - case CUSTOMER: - hasTenantId = ctx.getCustomerService().findCustomerByIdAsync(tenantId, new CustomerId(id)); - break; - case USER: - hasTenantId = ctx.getUserService().findUserByIdAsync(tenantId, new UserId(id)); - break; - case ASSET: - hasTenantId = ctx.getAssetService().findAssetByIdAsync(tenantId, new AssetId(id)); - break; - case DEVICE: - hasTenantId = ctx.getDeviceService().findDeviceByIdAsync(tenantId, new DeviceId(id)); - break; - case ALARM: - hasTenantId = ctx.getAlarmService().findAlarmByIdAsync(tenantId, new AlarmId(id)); - break; - case RULE_CHAIN: - hasTenantId = ctx.getRuleChainService().findRuleChainByIdAsync(tenantId, new RuleChainId(id)); - break; - case ENTITY_VIEW: - hasTenantId = ctx.getEntityViewService().findEntityViewByIdAsync(tenantId, new EntityViewId(id)); - break; - case DASHBOARD: - hasTenantId = ctx.getDashboardService().findDashboardByIdAsync(tenantId, new DashboardId(id)); - break; - case EDGE: - hasTenantId = ctx.getEdgeService().findEdgeByIdAsync(tenantId, new EdgeId(id)); - break; - case OTA_PACKAGE: - hasTenantId = ctx.getOtaPackageService().findOtaPackageInfoByIdAsync(tenantId, new OtaPackageId(id)); - break; - case ASSET_PROFILE: - hasTenantId = Futures.immediateFuture(ctx.getAssetProfileCache().get(tenantId, new AssetProfileId(id))); - break; - case DEVICE_PROFILE: - hasTenantId = Futures.immediateFuture(ctx.getDeviceProfileCache().get(tenantId, new DeviceProfileId(id))); - break; - case WIDGET_TYPE: - hasTenantId = Futures.immediateFuture(ctx.getWidgetTypeService().findWidgetTypeById(tenantId, new WidgetTypeId(id))); - break; - case WIDGETS_BUNDLE: - hasTenantId = Futures.immediateFuture(ctx.getWidgetBundleService().findWidgetsBundleById(tenantId, new WidgetsBundleId(id))); - break; - case RPC: - hasTenantId = ctx.getRpcService().findRpcByIdAsync(tenantId, new RpcId(id)); - break; - case QUEUE: - hasTenantId = Futures.immediateFuture(ctx.getQueueService().findQueueById(tenantId, new QueueId(id))); - break; - case API_USAGE_STATE: - hasTenantId = Futures.immediateFuture(ctx.getRuleEngineApiUsageStateService().findApiUsageStateById(tenantId, new ApiUsageStateId(id))); - break; - case TB_RESOURCE: - hasTenantId = ctx.getResourceService().findResourceInfoByIdAsync(tenantId, new TbResourceId(id)); - break; - case RULE_NODE: - RuleNode ruleNode = ctx.getRuleChainService().findRuleNodeById(tenantId, new RuleNodeId(id)); - if (ruleNode != null) { - hasTenantId = ctx.getRuleChainService().findRuleChainByIdAsync(tenantId, ruleNode.getRuleChainId()); - } else { - hasTenantId = Futures.immediateFuture(null); - } - break; - case TENANT_PROFILE: - if (ctx.getTenantProfile().getId().equals(id)) { - return Futures.immediateFuture(tenantId); - } else { - hasTenantId = Futures.immediateFuture(null); - } - break; - default: - hasTenantId = Futures.immediateFailedFuture(new TbNodeException("Unexpected original EntityType " + original.getEntityType())); - } - return getTenantIdAsync(hasTenantId, ctx.getDbCallbackExecutor()); - } - - private static ListenableFuture getTenantIdAsync(ListenableFuture future, ListeningExecutor executor) { - return Futures.transformAsync(future, in -> in != null ? Futures.immediateFuture(in.getTenantId()) - : Futures.immediateFuture(null), executor); - } -} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/TenantIdLoader.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/TenantIdLoader.java new file mode 100644 index 0000000000..b687246789 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/TenantIdLoader.java @@ -0,0 +1,132 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.util; + +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.HasTenantId; +import org.thingsboard.server.common.data.id.AlarmId; +import org.thingsboard.server.common.data.id.ApiUsageStateId; +import org.thingsboard.server.common.data.id.AssetId; +import org.thingsboard.server.common.data.id.AssetProfileId; +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.DashboardId; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.DeviceProfileId; +import org.thingsboard.server.common.data.id.EdgeId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.EntityViewId; +import org.thingsboard.server.common.data.id.OtaPackageId; +import org.thingsboard.server.common.data.id.QueueId; +import org.thingsboard.server.common.data.id.RpcId; +import org.thingsboard.server.common.data.id.RuleChainId; +import org.thingsboard.server.common.data.id.RuleNodeId; +import org.thingsboard.server.common.data.id.TbResourceId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.id.UserId; +import org.thingsboard.server.common.data.id.WidgetTypeId; +import org.thingsboard.server.common.data.id.WidgetsBundleId; +import org.thingsboard.server.common.data.rule.RuleNode; + +import java.util.UUID; + +public class TenantIdLoader { + + public static TenantId findTenantId(TbContext ctx, EntityId entityId) { + UUID id = entityId.getId(); + EntityType entityType = entityId.getEntityType(); + TenantId ctxTenantId = ctx.getTenantId(); + + HasTenantId tenantEntity; + switch (entityType) { + case TENANT: + return new TenantId(id); + case CUSTOMER: + tenantEntity = ctx.getCustomerService().findCustomerById(ctxTenantId, new CustomerId(id)); + break; + case USER: + tenantEntity = ctx.getUserService().findUserById(ctxTenantId, new UserId(id)); + break; + case ASSET: + tenantEntity = ctx.getAssetService().findAssetById(ctxTenantId, new AssetId(id)); + break; + case DEVICE: + tenantEntity = ctx.getDeviceService().findDeviceById(ctxTenantId, new DeviceId(id)); + break; + case ALARM: + tenantEntity = ctx.getAlarmService().findAlarmById(ctxTenantId, new AlarmId(id)); + break; + case RULE_CHAIN: + tenantEntity = ctx.getRuleChainService().findRuleChainById(ctxTenantId, new RuleChainId(id)); + break; + case ENTITY_VIEW: + tenantEntity = ctx.getEntityViewService().findEntityViewById(ctxTenantId, new EntityViewId(id)); + break; + case DASHBOARD: + tenantEntity = ctx.getDashboardService().findDashboardById(ctxTenantId, new DashboardId(id)); + break; + case EDGE: + tenantEntity = ctx.getEdgeService().findEdgeById(ctxTenantId, new EdgeId(id)); + break; + case OTA_PACKAGE: + tenantEntity = ctx.getOtaPackageService().findOtaPackageInfoById(ctxTenantId, new OtaPackageId(id)); + break; + case ASSET_PROFILE: + tenantEntity = ctx.getAssetProfileCache().get(ctxTenantId, new AssetProfileId(id)); + break; + case DEVICE_PROFILE: + tenantEntity = ctx.getDeviceProfileCache().get(ctxTenantId, new DeviceProfileId(id)); + break; + case WIDGET_TYPE: + tenantEntity = ctx.getWidgetTypeService().findWidgetTypeById(ctxTenantId, new WidgetTypeId(id)); + break; + case WIDGETS_BUNDLE: + tenantEntity = ctx.getWidgetBundleService().findWidgetsBundleById(ctxTenantId, new WidgetsBundleId(id)); + break; + case RPC: + tenantEntity = ctx.getRpcService().findRpcById(ctxTenantId, new RpcId(id)); + break; + case QUEUE: + tenantEntity = ctx.getQueueService().findQueueById(ctxTenantId, new QueueId(id)); + break; + case API_USAGE_STATE: + tenantEntity = ctx.getRuleEngineApiUsageStateService().findApiUsageStateById(ctxTenantId, new ApiUsageStateId(id)); + break; + case TB_RESOURCE: + tenantEntity = ctx.getResourceService().findResourceInfoById(ctxTenantId, new TbResourceId(id)); + break; + case RULE_NODE: + RuleNode ruleNode = ctx.getRuleChainService().findRuleNodeById(ctxTenantId, new RuleNodeId(id)); + if (ruleNode != null) { + tenantEntity = ctx.getRuleChainService().findRuleChainById(ctxTenantId, ruleNode.getRuleChainId()); + } else { + tenantEntity = null; + } + break; + case TENANT_PROFILE: + if (ctx.getTenantProfile().getId().equals(entityId)) { + return ctxTenantId; + } else { + tenantEntity = null; + } + break; + default: + throw new RuntimeException("Unexpected entity type: " + entityId.getEntityType()); + } + return tenantEntity != null ? tenantEntity.getTenantId() : null; + } + +} diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoaderTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/util/TenantIdLoaderTest.java similarity index 82% rename from rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoaderTest.java rename to rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/util/TenantIdLoaderTest.java index 885fa7ec40..00e8a8c6c8 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoaderTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/util/TenantIdLoaderTest.java @@ -15,7 +15,6 @@ */ package org.thingsboard.rule.engine.util; -import com.google.common.util.concurrent.Futures; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -50,6 +49,7 @@ import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.id.TenantProfileId; import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.common.data.rpc.Rpc; import org.thingsboard.server.common.data.rule.RuleChain; @@ -71,7 +71,6 @@ import org.thingsboard.server.dao.widget.WidgetTypeService; import org.thingsboard.server.dao.widget.WidgetsBundleService; import java.util.UUID; -import java.util.concurrent.ExecutionException; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; @@ -79,7 +78,7 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) -public class EntitiesTenantIdAsyncLoaderTest { +public class TenantIdLoaderTest { @Mock private TbContext ctx; @@ -121,6 +120,7 @@ public class EntitiesTenantIdAsyncLoaderTest { private RuleEngineApiUsageStateService ruleEngineApiUsageStateService; private TenantId tenantId; + private TenantProfileId tenantProfileId; private AbstractListeningExecutor dbExecutor; @Before @@ -133,9 +133,9 @@ public class EntitiesTenantIdAsyncLoaderTest { }; dbExecutor.init(); this.tenantId = new TenantId(UUID.randomUUID()); + this.tenantProfileId = new TenantProfileId(UUID.randomUUID()); when(ctx.getTenantId()).thenReturn(tenantId); - when(ctx.getDbCallbackExecutor()).thenReturn(dbExecutor); for (EntityType entityType : EntityType.values()) { initMocks(entityType, tenantId); @@ -156,7 +156,7 @@ public class EntitiesTenantIdAsyncLoaderTest { customer.setTenantId(tenantId); when(ctx.getCustomerService()).thenReturn(customerService); - doReturn(Futures.immediateFuture(customer)).when(customerService).findCustomerByIdAsync(eq(tenantId), any()); + doReturn(customer).when(customerService).findCustomerById(eq(tenantId), any()); break; case USER: @@ -164,7 +164,7 @@ public class EntitiesTenantIdAsyncLoaderTest { user.setTenantId(tenantId); when(ctx.getUserService()).thenReturn(userService); - doReturn(Futures.immediateFuture(user)).when(userService).findUserByIdAsync(eq(tenantId), any()); + doReturn(user).when(userService).findUserById(eq(tenantId), any()); break; case ASSET: @@ -172,7 +172,7 @@ public class EntitiesTenantIdAsyncLoaderTest { asset.setTenantId(tenantId); when(ctx.getAssetService()).thenReturn(assetService); - doReturn(Futures.immediateFuture(asset)).when(assetService).findAssetByIdAsync(eq(tenantId), any()); + doReturn(asset).when(assetService).findAssetById(eq(tenantId), any()); break; case DEVICE: @@ -180,7 +180,7 @@ public class EntitiesTenantIdAsyncLoaderTest { device.setTenantId(tenantId); when(ctx.getDeviceService()).thenReturn(deviceService); - doReturn(Futures.immediateFuture(device)).when(deviceService).findDeviceByIdAsync(eq(tenantId), any()); + doReturn(device).when(deviceService).findDeviceById(eq(tenantId), any()); break; case ALARM: @@ -188,7 +188,7 @@ public class EntitiesTenantIdAsyncLoaderTest { alarm.setTenantId(tenantId); when(ctx.getAlarmService()).thenReturn(alarmService); - doReturn(Futures.immediateFuture(alarm)).when(alarmService).findAlarmByIdAsync(eq(tenantId), any()); + doReturn(alarm).when(alarmService).findAlarmById(eq(tenantId), any()); break; case RULE_CHAIN: @@ -196,7 +196,7 @@ public class EntitiesTenantIdAsyncLoaderTest { ruleChain.setTenantId(tenantId); when(ctx.getRuleChainService()).thenReturn(ruleChainService); - doReturn(Futures.immediateFuture(ruleChain)).when(ruleChainService).findRuleChainByIdAsync(eq(tenantId), any()); + doReturn(ruleChain).when(ruleChainService).findRuleChainById(eq(tenantId), any()); break; case ENTITY_VIEW: @@ -204,7 +204,7 @@ public class EntitiesTenantIdAsyncLoaderTest { entityView.setTenantId(tenantId); when(ctx.getEntityViewService()).thenReturn(entityViewService); - doReturn(Futures.immediateFuture(entityView)).when(entityViewService).findEntityViewByIdAsync(eq(tenantId), any()); + doReturn(entityView).when(entityViewService).findEntityViewById(eq(tenantId), any()); break; case DASHBOARD: @@ -212,7 +212,7 @@ public class EntitiesTenantIdAsyncLoaderTest { dashboard.setTenantId(tenantId); when(ctx.getDashboardService()).thenReturn(dashboardService); - doReturn(Futures.immediateFuture(dashboard)).when(dashboardService).findDashboardByIdAsync(eq(tenantId), any()); + doReturn(dashboard).when(dashboardService).findDashboardById(eq(tenantId), any()); break; case EDGE: @@ -220,7 +220,7 @@ public class EntitiesTenantIdAsyncLoaderTest { edge.setTenantId(tenantId); when(ctx.getEdgeService()).thenReturn(edgeService); - doReturn(Futures.immediateFuture(edge)).when(edgeService).findEdgeByIdAsync(eq(tenantId), any()); + doReturn(edge).when(edgeService).findEdgeById(eq(tenantId), any()); break; case OTA_PACKAGE: @@ -228,7 +228,7 @@ public class EntitiesTenantIdAsyncLoaderTest { otaPackage.setTenantId(tenantId); when(ctx.getOtaPackageService()).thenReturn(otaPackageService); - doReturn(Futures.immediateFuture(otaPackage)).when(otaPackageService).findOtaPackageInfoByIdAsync(eq(tenantId), any()); + doReturn(otaPackage).when(otaPackageService).findOtaPackageInfoById(eq(tenantId), any()); break; case ASSET_PROFILE: @@ -264,11 +264,11 @@ public class EntitiesTenantIdAsyncLoaderTest { break; case RPC: - Rpc rps = new Rpc(); - rps.setTenantId(tenantId); + Rpc rpc = new Rpc(); + rpc.setTenantId(tenantId); when(ctx.getRpcService()).thenReturn(rpcService); - doReturn(Futures.immediateFuture(rps)).when(rpcService).findRpcByIdAsync(eq(tenantId), any()); + doReturn(rpc).when(rpcService).findRpcById(eq(tenantId), any()); break; case QUEUE: @@ -292,7 +292,7 @@ public class EntitiesTenantIdAsyncLoaderTest { tbResource.setTenantId(tenantId); when(ctx.getResourceService()).thenReturn(resourceService); - doReturn(Futures.immediateFuture(tbResource)).when(resourceService).findResourceInfoByIdAsync(eq(tenantId), any()); + doReturn(tbResource).when(resourceService).findResourceInfoById(eq(tenantId), any()); break; case RULE_NODE: @@ -303,10 +303,9 @@ public class EntitiesTenantIdAsyncLoaderTest { break; case TENANT_PROFILE: - TenantProfile tenantProfile = new TenantProfile(); + TenantProfile tenantProfile = new TenantProfile(tenantProfileId); when(ctx.getTenantProfile()).thenReturn(tenantProfile); - when(ctx.getTenantProfile(any(TenantId.class))).thenReturn(tenantProfile); break; default: @@ -319,12 +318,17 @@ public class EntitiesTenantIdAsyncLoaderTest { return EntityIdFactory.getByTypeAndUuid(entityType, UUID.randomUUID()); } - private void checkTenant(TenantId checkTenantId, boolean equals) throws ExecutionException, InterruptedException { + private void checkTenant(TenantId checkTenantId, boolean equals) { for (EntityType entityType : EntityType.values()) { - EntityId entityId = EntityType.TENANT.equals(entityType) ? tenantId : getEntityId(entityType); - TenantId targetTenantId = EntitiesTenantIdAsyncLoader.findEntityIdAsync(ctx, entityId).get(); - - Assert.assertNotNull(targetTenantId); + EntityId entityId; + if(EntityType.TENANT.equals(entityType)){ + entityId = tenantId; + } else if(EntityType.TENANT_PROFILE.equals(entityType)){ + entityId = tenantProfileId; + } else { + entityId = getEntityId(entityType); + } + TenantId targetTenantId = TenantIdLoader.findTenantId(ctx, entityId); String msg = "Check entity type <" + entityType.name() + ">:"; if (equals) { Assert.assertEquals(msg, targetTenantId, checkTenantId); @@ -335,12 +339,12 @@ public class EntitiesTenantIdAsyncLoaderTest { } @Test - public void test_findEntityIdAsync_current_tenant() throws ExecutionException, InterruptedException { + public void test_findEntityIdAsync_current_tenant() { checkTenant(tenantId, true); } @Test - public void test_findEntityIdAsync_other_tenant() throws ExecutionException, InterruptedException { + public void test_findEntityIdAsync_other_tenant() { checkTenant(new TenantId(UUID.randomUUID()), false); } From 2f83810fe4aaae8cae7e1048b0e423abc45bd0f5 Mon Sep 17 00:00:00 2001 From: ShvaykaD Date: Fri, 4 Nov 2022 14:54:56 +0200 Subject: [PATCH 08/11] fix compilation error in alarm service & code cleanup --- .../server/actors/ruleChain/DefaultTbContext.java | 2 +- .../service/apiusage/TbApiUsageStateService.java | 3 --- .../service/rpc/DefaultTbRuleEngineRpcService.java | 4 ++-- .../telemetry/DefaultAlarmSubscriptionService.java | 10 ++++++++++ .../rule/engine/api/RuleEngineRpcService.java | 2 -- 5 files changed, 13 insertions(+), 8 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index dfc248f79b..677a9c2641 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, diff --git a/application/src/main/java/org/thingsboard/server/service/apiusage/TbApiUsageStateService.java b/application/src/main/java/org/thingsboard/server/service/apiusage/TbApiUsageStateService.java index 668dc288a0..5f01d34877 100644 --- a/application/src/main/java/org/thingsboard/server/service/apiusage/TbApiUsageStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/apiusage/TbApiUsageStateService.java @@ -17,7 +17,6 @@ package org.thingsboard.server.service.apiusage; import org.springframework.context.ApplicationListener; import org.thingsboard.rule.engine.api.RuleEngineApiUsageStateService; -import org.thingsboard.server.common.data.ApiUsageState; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantProfileId; @@ -31,8 +30,6 @@ public interface TbApiUsageStateService extends TbApiUsageStateClient, RuleEngin void process(TbProtoQueueMsg msg, TbCallback callback); - ApiUsageState getApiUsageState(TenantId tenantId); - void onTenantProfileUpdate(TenantProfileId tenantProfileId); void onTenantUpdate(TenantId tenantId); diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java index 2acedb9107..c0f6cc750f 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.service.rpc; -import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -67,7 +66,8 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi public DefaultTbRuleEngineRpcService(PartitionService partitionService, TbClusterService clusterService, - TbServiceInfoProvider serviceInfoProvider, RpcService rpcService) { + TbServiceInfoProvider serviceInfoProvider, + RpcService rpcService) { this.partitionService = partitionService; this.clusterService = clusterService; this.serviceInfoProvider = serviceInfoProvider; diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultAlarmSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultAlarmSubscriptionService.java index 9c66d14011..d0279c1a87 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultAlarmSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultAlarmSubscriptionService.java @@ -52,6 +52,7 @@ import org.thingsboard.server.service.subscription.TbSubscriptionUtils; import java.util.Collection; import java.util.Optional; +import java.util.concurrent.ExecutionException; /** * Created by ashvayka on 27.03.18. @@ -129,6 +130,15 @@ public class DefaultAlarmSubscriptionService extends AbstractSubscriptionService return alarmService.findAlarmByIdAsync(tenantId, alarmId); } + @Override + public Alarm findAlarmById(TenantId tenantId, AlarmId alarmId) { + try { + return alarmService.findAlarmByIdAsync(tenantId, alarmId).get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + @Override public ListenableFuture findAlarmInfoByIdAsync(TenantId tenantId, AlarmId alarmId) { return alarmService.findAlarmInfoByIdAsync(tenantId, alarmId); diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineRpcService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineRpcService.java index c2f06c14da..779e958793 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineRpcService.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineRpcService.java @@ -15,8 +15,6 @@ */ package org.thingsboard.rule.engine.api; -import com.google.common.util.concurrent.ListenableFuture; -import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.RpcId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.rpc.Rpc; From ee98f203ef455d8329b71f7c6b1504b3a3248f14 Mon Sep 17 00:00:00 2001 From: ShvaykaD Date: Fri, 4 Nov 2022 15:14:43 +0200 Subject: [PATCH 09/11] added findAlarmById method to AlarmService --- .../telemetry/DefaultAlarmSubscriptionService.java | 6 +----- .../org/thingsboard/server/dao/alarm/AlarmService.java | 2 ++ .../thingsboard/server/dao/alarm/BaseAlarmService.java | 9 ++++++++- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultAlarmSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultAlarmSubscriptionService.java index d0279c1a87..64d4630314 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultAlarmSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultAlarmSubscriptionService.java @@ -132,11 +132,7 @@ public class DefaultAlarmSubscriptionService extends AbstractSubscriptionService @Override public Alarm findAlarmById(TenantId tenantId, AlarmId alarmId) { - try { - return alarmService.findAlarmByIdAsync(tenantId, alarmId).get(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } + return alarmService.findAlarmById(tenantId, alarmId); } @Override diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/alarm/AlarmService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/alarm/AlarmService.java index 02cd1a8348..b8041aa3fc 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/alarm/AlarmService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/alarm/AlarmService.java @@ -48,6 +48,8 @@ public interface AlarmService { ListenableFuture clearAlarm(TenantId tenantId, AlarmId alarmId, JsonNode details, long clearTs); + Alarm findAlarmById(TenantId tenantId, AlarmId alarmId); + ListenableFuture findAlarmByIdAsync(TenantId tenantId, AlarmId alarmId); ListenableFuture findAlarmInfoByIdAsync(TenantId tenantId, AlarmId alarmId); diff --git a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java index 1316e52405..e6012602f1 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java @@ -262,9 +262,16 @@ public class BaseAlarmService extends AbstractEntityService implements AlarmServ } @Override - public ListenableFuture findAlarmByIdAsync(TenantId tenantId, AlarmId alarmId) { + public Alarm findAlarmById(TenantId tenantId, AlarmId alarmId) { log.trace("Executing findAlarmById [{}]", alarmId); validateId(alarmId, "Incorrect alarmId " + alarmId); + return alarmDao.findAlarmById(tenantId, alarmId.getId()); + } + + @Override + public ListenableFuture findAlarmByIdAsync(TenantId tenantId, AlarmId alarmId) { + log.trace("Executing findAlarmByIdAsync [{}]", alarmId); + validateId(alarmId, "Incorrect alarmId " + alarmId); return alarmDao.findAlarmByIdAsync(tenantId, alarmId.getId()); } From 74f12faa72a097c0f02f78020a92dd5e62d32592 Mon Sep 17 00:00:00 2001 From: ShvaykaD Date: Fri, 4 Nov 2022 17:18:28 +0200 Subject: [PATCH 10/11] removed unused import --- .../service/telemetry/DefaultAlarmSubscriptionService.java | 1 - 1 file changed, 1 deletion(-) diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultAlarmSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultAlarmSubscriptionService.java index 64d4630314..956dde8024 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultAlarmSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultAlarmSubscriptionService.java @@ -52,7 +52,6 @@ import org.thingsboard.server.service.subscription.TbSubscriptionUtils; import java.util.Collection; import java.util.Optional; -import java.util.concurrent.ExecutionException; /** * Created by ashvayka on 27.03.18. From a02a276cf9b2836cf5893fc09348d14da8e62bbd Mon Sep 17 00:00:00 2001 From: ShvaykaD Date: Tue, 8 Nov 2022 10:59:38 +0200 Subject: [PATCH 11/11] Fix code formatting issue in TenantIdLoaderTest --- .../org/thingsboard/rule/engine/util/TenantIdLoaderTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/util/TenantIdLoaderTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/util/TenantIdLoaderTest.java index 00e8a8c6c8..8a53148204 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/util/TenantIdLoaderTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/util/TenantIdLoaderTest.java @@ -321,9 +321,9 @@ public class TenantIdLoaderTest { private void checkTenant(TenantId checkTenantId, boolean equals) { for (EntityType entityType : EntityType.values()) { EntityId entityId; - if(EntityType.TENANT.equals(entityType)){ + if (EntityType.TENANT.equals(entityType)) { entityId = tenantId; - } else if(EntityType.TENANT_PROFILE.equals(entityType)){ + } else if (EntityType.TENANT_PROFILE.equals(entityType)) { entityId = tenantProfileId; } else { entityId = getEntityId(entityType);