From 3ee3839c3c00cd18bf729515530950235fb8edab Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Thu, 29 Oct 2020 15:53:11 +0200 Subject: [PATCH] Protection from infinite loops --- .../RuleChainActorMessageProcessor.java | 1 - .../RuleNodeActorMessageProcessor.java | 48 +++++++++++------ .../actors/shared/ComponentMsgProcessor.java | 6 +++ .../DefaultTenantProfileConfiguration.java | 5 ++ .../profile/TenantProfileConfiguration.java | 3 ++ .../tenant/profile/TenantProfileData.java | 6 --- .../thingsboard/server/common/msg/TbMsg.java | 54 ++++++++++++------- common/message/src/main/proto/tbmsg.proto | 1 + 8 files changed, 82 insertions(+), 42 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java index f5df62cfd3..ad5f97ab48 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java @@ -334,7 +334,6 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor extends Abstract this.entityId = id; } + protected TenantProfileConfiguration getTenantProfileConfiguration() { + return systemContext.getTenantProfileCache().get(tenantId).getProfileData().getConfiguration(); + } + public abstract String getComponentName(); public abstract void start(TbActorCtx context) throws Exception; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java index 65940916b3..1804731c04 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java @@ -61,4 +61,9 @@ public class DefaultTenantProfileConfiguration implements TenantProfileConfigura public TenantProfileType getType() { return TenantProfileType.DEFAULT; } + + @Override + public int getMaxRuleNodeExecsPerMessage() { + return maxRuleNodeExecutionsPerMessage; + } } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/TenantProfileConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/TenantProfileConfiguration.java index d058d11cfc..c4c6d7563a 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/TenantProfileConfiguration.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/TenantProfileConfiguration.java @@ -37,4 +37,7 @@ public interface TenantProfileConfiguration { @JsonIgnore long getProfileThreshold(ApiUsageRecordKey key); + @JsonIgnore + int getMaxRuleNodeExecsPerMessage(); + } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/TenantProfileData.java b/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/TenantProfileData.java index 0996fc9850..297e562d26 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/TenantProfileData.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/TenantProfileData.java @@ -15,14 +15,8 @@ */ package org.thingsboard.server.common.data.tenant.profile; -import com.fasterxml.jackson.annotation.JsonAnyGetter; -import com.fasterxml.jackson.annotation.JsonAnySetter; -import com.fasterxml.jackson.annotation.JsonIgnore; import lombok.Data; -import java.util.HashMap; -import java.util.Map; - @Data public class TenantProfileData { diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java index f0af529be9..3b00e89fd6 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java @@ -18,27 +18,27 @@ package org.thingsboard.server.common.msg; import com.fasterxml.jackson.annotation.JsonIgnore; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; +import lombok.AccessLevel; import lombok.Builder; import lombok.Data; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.msg.gen.MsgProtos; -import org.thingsboard.server.common.msg.queue.RuleNodeInfo; import org.thingsboard.server.common.msg.queue.ServiceQueue; import org.thingsboard.server.common.msg.queue.TbMsgCallback; -import java.io.IOException; import java.io.Serializable; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; /** * Created by ashvayka on 13.01.18. */ @Data -@Builder @Slf4j public final class TbMsg implements Serializable { @@ -52,51 +52,63 @@ public final class TbMsg implements Serializable { private final String data; private final RuleChainId ruleChainId; private final RuleNodeId ruleNodeId; + @Getter(value = AccessLevel.NONE) + private final AtomicInteger ruleNodeExecCounter; + + public int getAndIncrementRuleNodeCounter() { + return ruleNodeExecCounter.getAndIncrement(); + } + //This field is not serialized because we use queues and there is no need to do it @JsonIgnore transient private final TbMsgCallback callback; public static TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) { - return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), TbMsgDataType.JSON, data, ruleChainId, ruleNodeId, TbMsgCallback.EMPTY); + return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type, originator, + metaData.copy(), TbMsgDataType.JSON, data, ruleChainId, ruleNodeId, 0, TbMsgCallback.EMPTY); } public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data) { - return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), TbMsgDataType.JSON, data, null, null, TbMsgCallback.EMPTY); + return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), TbMsgDataType.JSON, data, null, null, 0, TbMsgCallback.EMPTY); } + // REALLY NEW MSG + public static TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data) { - return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), TbMsgDataType.JSON, data, null, null, TbMsgCallback.EMPTY); + return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), TbMsgDataType.JSON, data, null, null, 0, TbMsgCallback.EMPTY); } public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data) { - return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), dataType, data, null, null, TbMsgCallback.EMPTY); + return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), dataType, data, null, null, 0, TbMsgCallback.EMPTY); } + // For Tests only + public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) { - return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), dataType, data, ruleChainId, ruleNodeId, TbMsgCallback.EMPTY); + return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), dataType, data, ruleChainId, ruleNodeId, 0, TbMsgCallback.EMPTY); } public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data, TbMsgCallback callback) { - return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), TbMsgDataType.JSON, data, null, null, callback); + return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), TbMsgDataType.JSON, data, null, null, 0, callback); } - public static TbMsg transformMsg(TbMsg origMsg, String type, EntityId originator, TbMsgMetaData metaData, String data) { - return new TbMsg(origMsg.getQueueName(), origMsg.getId(), origMsg.getTs(), type, originator, metaData.copy(), origMsg.getDataType(), - data, origMsg.getRuleChainId(), origMsg.getRuleNodeId(), origMsg.getCallback()); + public static TbMsg transformMsg(TbMsg tbMsg, String type, EntityId originator, TbMsgMetaData metaData, String data) { + return new TbMsg(tbMsg.getQueueName(), tbMsg.getId(), tbMsg.getTs(), type, originator, metaData.copy(), tbMsg.getDataType(), + data, tbMsg.getRuleChainId(), tbMsg.getRuleNodeId(), tbMsg.ruleNodeExecCounter.get(), tbMsg.getCallback()); } - public static TbMsg transformMsg(TbMsg origMsg, RuleChainId ruleChainId) { - return new TbMsg(origMsg.queueName, origMsg.id, origMsg.ts, origMsg.type, origMsg.originator, origMsg.metaData, origMsg.dataType, - origMsg.data, ruleChainId, null, origMsg.getCallback()); + public static TbMsg transformMsg(TbMsg tbMsg, RuleChainId ruleChainId) { + return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.metaData, tbMsg.dataType, + tbMsg.data, ruleChainId, null, tbMsg.ruleNodeExecCounter.get(), tbMsg.getCallback()); } public static TbMsg newMsg(TbMsg tbMsg, RuleChainId ruleChainId, RuleNodeId ruleNodeId) { return new TbMsg(tbMsg.getQueueName(), UUID.randomUUID(), tbMsg.getTs(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.getMetaData().copy(), - tbMsg.getDataType(), tbMsg.getData(), ruleChainId, ruleNodeId, TbMsgCallback.EMPTY); + tbMsg.getDataType(), tbMsg.getData(), ruleChainId, ruleNodeId, tbMsg.ruleNodeExecCounter.get(), TbMsgCallback.EMPTY); } private TbMsg(String queueName, UUID id, long ts, String type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data, - RuleChainId ruleChainId, RuleNodeId ruleNodeId, TbMsgCallback callback) { + RuleChainId ruleChainId, RuleNodeId ruleNodeId, int ruleNodeExecCounter, TbMsgCallback callback) { this.id = id; this.queueName = queueName; if (ts > 0) { @@ -111,6 +123,7 @@ public final class TbMsg implements Serializable { this.data = data; this.ruleChainId = ruleChainId; this.ruleNodeId = ruleNodeId; + this.ruleNodeExecCounter = new AtomicInteger(ruleNodeExecCounter); if (callback != null) { this.callback = callback; } else { @@ -147,6 +160,7 @@ public final class TbMsg implements Serializable { builder.setDataType(msg.getDataType().ordinal()); builder.setData(msg.getData()); + builder.setRuleNodeExecCounter(msg.ruleNodeExecCounter.get()); return builder.build().toByteArray(); } @@ -164,18 +178,18 @@ public final class TbMsg implements Serializable { ruleNodeId = new RuleNodeId(new UUID(proto.getRuleNodeIdMSB(), proto.getRuleNodeIdLSB())); } TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()]; - return new TbMsg(queueName, UUID.fromString(proto.getId()), proto.getTs(), proto.getType(), entityId, metaData, dataType, proto.getData(), ruleChainId, ruleNodeId, callback); + return new TbMsg(queueName, UUID.fromString(proto.getId()), proto.getTs(), proto.getType(), entityId, metaData, dataType, proto.getData(), ruleChainId, ruleNodeId, proto.getRuleNodeExecCounter(), callback); } catch (InvalidProtocolBufferException e) { throw new IllegalStateException("Could not parse protobuf for TbMsg", e); } } public TbMsg copyWithRuleChainId(RuleChainId ruleChainId) { - return new TbMsg(this.queueName, this.id, this.ts, this.type, this.originator, this.metaData, this.dataType, this.data, ruleChainId, null, callback); + return new TbMsg(this.queueName, this.id, this.ts, this.type, this.originator, this.metaData, this.dataType, this.data, ruleChainId, null, this.ruleNodeExecCounter.get(), callback); } public TbMsg copyWithRuleNodeId(RuleChainId ruleChainId, RuleNodeId ruleNodeId) { - return new TbMsg(this.queueName, this.id, this.ts, this.type, this.originator, this.metaData, this.dataType, this.data, ruleChainId, ruleNodeId, callback); + return new TbMsg(this.queueName, this.id, this.ts, this.type, this.originator, this.metaData, this.dataType, this.data, ruleChainId, ruleNodeId, this.ruleNodeExecCounter.get(), callback); } public TbMsgCallback getCallback() { diff --git a/common/message/src/main/proto/tbmsg.proto b/common/message/src/main/proto/tbmsg.proto index 25e006121f..53cb90014b 100644 --- a/common/message/src/main/proto/tbmsg.proto +++ b/common/message/src/main/proto/tbmsg.proto @@ -45,4 +45,5 @@ message TbMsgProto { string data = 14; int64 ts = 15; + int32 ruleNodeExecCounter = 16; } \ No newline at end of file