diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index c35aae1fd6..42b66a73e8 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -36,6 +36,7 @@ import org.springframework.stereotype.Component; import org.thingsboard.rule.engine.api.MailService; import org.thingsboard.rule.engine.api.RuleChainTransactionService; import org.thingsboard.server.actors.service.ActorService; +import org.thingsboard.server.actors.tenant.DebugTbRateLimits; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Event; import org.thingsboard.server.common.data.id.EntityId; @@ -43,6 +44,7 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.cluster.ServerAddress; +import org.thingsboard.server.common.msg.tools.TbRateLimits; import org.thingsboard.server.common.transport.auth.DeviceAuthService; import org.thingsboard.server.dao.alarm.AlarmService; import org.thingsboard.server.dao.asset.AssetService; @@ -84,6 +86,8 @@ import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; @Slf4j @Component @@ -92,6 +96,12 @@ public class ActorSystemContext { protected final ObjectMapper mapper = new ObjectMapper(); + private final ConcurrentMap debugPerTenantLimits = new ConcurrentHashMap<>(); + + public ConcurrentMap getDebugPerTenantLimits() { + return debugPerTenantLimits; + } + @Getter @Setter private ActorService actorService; @@ -291,6 +301,14 @@ public class ActorSystemContext { @Getter private long sessionReportTimeout; + @Value("${actors.rule.chain.debug_mode_rate_limits_per_tenant.enabled}") + @Getter + private boolean debugPerTenantEnabled; + + @Value("${actors.rule.chain.debug_mode_rate_limits_per_tenant.configuration}") + @Getter + private String debugPerTenantLimitsConfiguration; + @Getter @Setter private ActorSystem actorSystem; @@ -318,8 +336,6 @@ public class ActorSystemContext { @Getter private CassandraBufferedRateExecutor cassandraBufferedRateExecutor; - - public ActorSystemContext() { config = ConfigFactory.parseResources(AKKA_CONF_FILE_NAME).withFallback(ConfigFactory.load()); } @@ -392,46 +408,97 @@ public class ActorSystemContext { } private void persistDebugAsync(TenantId tenantId, EntityId entityId, String type, TbMsg tbMsg, String relationType, Throwable error) { - try { - Event event = new Event(); - event.setTenantId(tenantId); - event.setEntityId(entityId); - event.setType(DataConstants.DEBUG_RULE_NODE); + if (checkLimits(tenantId, tbMsg, error)) { + try { + Event event = new Event(); + event.setTenantId(tenantId); + event.setEntityId(entityId); + event.setType(DataConstants.DEBUG_RULE_NODE); - String metadata = mapper.writeValueAsString(tbMsg.getMetaData().getData()); + String metadata = mapper.writeValueAsString(tbMsg.getMetaData().getData()); - ObjectNode node = mapper.createObjectNode() - .put("type", type) - .put("server", getServerAddress()) - .put("entityId", tbMsg.getOriginator().getId().toString()) - .put("entityName", tbMsg.getOriginator().getEntityType().name()) - .put("msgId", tbMsg.getId().toString()) - .put("msgType", tbMsg.getType()) - .put("dataType", tbMsg.getDataType().name()) - .put("relationType", relationType) - .put("data", tbMsg.getData()) - .put("metadata", metadata); + ObjectNode node = mapper.createObjectNode() + .put("type", type) + .put("server", getServerAddress()) + .put("entityId", tbMsg.getOriginator().getId().toString()) + .put("entityName", tbMsg.getOriginator().getEntityType().name()) + .put("msgId", tbMsg.getId().toString()) + .put("msgType", tbMsg.getType()) + .put("dataType", tbMsg.getDataType().name()) + .put("relationType", relationType) + .put("data", tbMsg.getData()) + .put("metadata", metadata); + + if (error != null) { + node = node.put("error", toString(error)); + } + + event.setBody(node); + ListenableFuture future = eventService.saveAsync(event); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(@Nullable Event event) { + + } + + @Override + public void onFailure(Throwable th) { + log.error("Could not save debug Event for Node", th); + } + }); + } catch (IOException ex) { + log.warn("Failed to persist rule node debug message", ex); + } + } + } + + private boolean checkLimits(TenantId tenantId, TbMsg tbMsg, Throwable error) { + if (debugPerTenantEnabled) { + DebugTbRateLimits debugTbRateLimits = debugPerTenantLimits.computeIfAbsent(tenantId, id -> + new DebugTbRateLimits(new TbRateLimits(debugPerTenantLimitsConfiguration), false)); + + if (!debugTbRateLimits.getTbRateLimits().tryConsume()) { + if (!debugTbRateLimits.isRuleChainEventSaved()) { + persistRuleChainDebugModeEvent(tenantId, tbMsg.getRuleChainId(), error); + debugTbRateLimits.setRuleChainEventSaved(true); + } + if (log.isTraceEnabled()) { + log.trace("[{}] Tenant level debug mode rate limit detected: {}", tenantId, tbMsg); + } + return false; + } + } + return true; + } + + private void persistRuleChainDebugModeEvent(TenantId tenantId, EntityId entityId, Throwable error) { + Event event = new Event(); + event.setTenantId(tenantId); + event.setEntityId(entityId); + event.setType(DataConstants.DEBUG_RULE_CHAIN); + + ObjectNode node = mapper.createObjectNode() + //todo: what fields are needed here? + .put("server", getServerAddress()) + .put("message", "Reached debug mode rate limit!"); + + if (error != null) { + node = node.put("error", toString(error)); + } + + event.setBody(node); + ListenableFuture future = eventService.saveAsync(event); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(@Nullable Event event) { - if (error != null) { - node = node.put("error", toString(error)); } - event.setBody(node); - ListenableFuture future = eventService.saveAsync(event); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(@Nullable Event event) { - - } - - @Override - public void onFailure(Throwable th) { - log.error("Could not save debug Event for Node", th); - } - }); - } catch (IOException ex) { - log.warn("Failed to persist rule node debug message", ex); - } + @Override + public void onFailure(Throwable th) { + log.error("Could not save debug Event for Rule Chain", th); + } + }); } public static Exception toException(Throwable error) { diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/DebugTbRateLimits.java b/application/src/main/java/org/thingsboard/server/actors/tenant/DebugTbRateLimits.java new file mode 100644 index 0000000000..7ae764392c --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/actors/tenant/DebugTbRateLimits.java @@ -0,0 +1,29 @@ +/** + * Copyright © 2016-2019 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.actors.tenant; + +import lombok.AllArgsConstructor; +import lombok.Data; +import org.thingsboard.server.common.msg.tools.TbRateLimits; + +@Data +@AllArgsConstructor +public class DebugTbRateLimits { + + private TbRateLimits tbRateLimits; + private boolean ruleChainEventSaved; + +} diff --git a/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java b/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java index 2b0387049d..14fffc7486 100644 --- a/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java +++ b/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.http.HttpStatus; import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.util.StringUtils; @@ -34,6 +35,8 @@ import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.ResponseStatus; import org.springframework.web.bind.annotation.RestController; import org.thingsboard.rule.engine.api.ScriptEngine; +import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.actors.tenant.DebugTbRateLimits; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.Event; @@ -56,10 +59,10 @@ import org.thingsboard.server.service.script.RuleNodeJsScriptEngine; import org.thingsboard.server.service.security.permission.Operation; import org.thingsboard.server.service.security.permission.Resource; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; @Slf4j @@ -78,6 +81,12 @@ public class RuleChainController extends BaseController { @Autowired private JsInvokeService jsInvokeService; + @Autowired(required = false) + private ActorSystemContext actorContext; + + @Value("${actors.rule.chain.debug_mode_rate_limits_per_tenant.enabled}") + private boolean debugPerTenantEnabled; + @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')") @RequestMapping(value = "/ruleChain/{ruleChainId}", method = RequestMethod.GET) @ResponseBody @@ -182,8 +191,17 @@ public class RuleChainController extends BaseController { @ResponseBody public RuleChainMetaData saveRuleChainMetaData(@RequestBody RuleChainMetaData ruleChainMetaData) throws ThingsboardException { try { + TenantId tenantId = getTenantId(); + if (debugPerTenantEnabled) { + ConcurrentMap debugPerTenantLimits = actorContext.getDebugPerTenantLimits(); + DebugTbRateLimits debugTbRateLimits = debugPerTenantLimits.getOrDefault(tenantId, null); + if (debugTbRateLimits != null) { + debugPerTenantLimits.remove(tenantId, debugTbRateLimits); + } + } + RuleChain ruleChain = checkRuleChain(ruleChainMetaData.getRuleChainId(), Operation.WRITE); - RuleChainMetaData savedRuleChainMetaData = checkNotNull(ruleChainService.saveRuleChainMetaData(getTenantId(), ruleChainMetaData)); + RuleChainMetaData savedRuleChainMetaData = checkNotNull(ruleChainService.saveRuleChainMetaData(tenantId, ruleChainMetaData)); actorService.onEntityStateChange(ruleChain.getTenantId(), ruleChain.getId(), ComponentLifecycleEvent.UPDATED); @@ -236,7 +254,7 @@ public class RuleChainController extends BaseController { referencingRuleChainIds.remove(ruleChain.getId()); referencingRuleChainIds.forEach(referencingRuleChainId -> - actorService.onEntityStateChange(ruleChain.getTenantId(), referencingRuleChainId, ComponentLifecycleEvent.UPDATED)); + actorService.onEntityStateChange(ruleChain.getTenantId(), referencingRuleChainId, ComponentLifecycleEvent.UPDATED)); actorService.onEntityStateChange(ruleChain.getTenantId(), ruleChain.getId(), ComponentLifecycleEvent.DELETED); @@ -291,7 +309,8 @@ public class RuleChainController extends BaseController { String data = inputParams.get("msg").asText(); JsonNode metadataJson = inputParams.get("metadata"); - Map metadata = objectMapper.convertValue(metadataJson, new TypeReference>() {}); + Map metadata = objectMapper.convertValue(metadataJson, new TypeReference>() { + }); String msgType = inputParams.get("msgType").asText(); String output = ""; String errorText = ""; diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 36767ca2d8..7fa01ccc05 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -212,6 +212,9 @@ actors: chain: # Errors for particular actor are persisted once per specified amount of milliseconds error_persist_frequency: "${ACTORS_RULE_CHAIN_ERROR_FREQUENCY:3000}" + debug_mode_rate_limits_per_tenant: + enabled: "${ACTORS_RULE_CHAIN_DEBUG_MODE_RATE_LIMITS_PER_TENANT_ENABLED:true}" + configuration: "${ACTORS_RULE_CHAIN_DEBUG_MODE_RATE_LIMITS_PER_TENANT_CONFIGURATION:500:3600}" node: # Errors for particular actor are persisted once per specified amount of milliseconds error_persist_frequency: "${ACTORS_RULE_NODE_ERROR_FREQUENCY:3000}" diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java index 7b87fa6cd3..45aa39e8f2 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java @@ -38,6 +38,7 @@ public class DataConstants { public static final String LC_EVENT = "LC_EVENT"; public static final String STATS = "STATS"; public static final String DEBUG_RULE_NODE = "DEBUG_RULE_NODE"; + public static final String DEBUG_RULE_CHAIN = "DEBUG_RULE_CHAIN"; public static final String ONEWAY = "ONEWAY"; public static final String TWOWAY = "TWOWAY";