Merge pull request #1607 from dmytro-landiak/feature/debug-rate-limits
events debug mode rate limits added
This commit is contained in:
commit
090a99dda4
@ -36,6 +36,7 @@ import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.rule.engine.api.MailService;
|
||||
import org.thingsboard.rule.engine.api.RuleChainTransactionService;
|
||||
import org.thingsboard.server.actors.service.ActorService;
|
||||
import org.thingsboard.server.actors.tenant.DebugTbRateLimits;
|
||||
import org.thingsboard.server.common.data.DataConstants;
|
||||
import org.thingsboard.server.common.data.Event;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
@ -43,6 +44,7 @@ import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
import org.thingsboard.server.common.msg.cluster.ServerAddress;
|
||||
import org.thingsboard.server.common.msg.tools.TbRateLimits;
|
||||
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
|
||||
import org.thingsboard.server.dao.alarm.AlarmService;
|
||||
import org.thingsboard.server.dao.asset.AssetService;
|
||||
@ -84,6 +86,8 @@ import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
import java.io.StringWriter;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@ -92,6 +96,12 @@ public class ActorSystemContext {
|
||||
|
||||
protected final ObjectMapper mapper = new ObjectMapper();
|
||||
|
||||
private final ConcurrentMap<TenantId, DebugTbRateLimits> debugPerTenantLimits = new ConcurrentHashMap<>();
|
||||
|
||||
public ConcurrentMap<TenantId, DebugTbRateLimits> getDebugPerTenantLimits() {
|
||||
return debugPerTenantLimits;
|
||||
}
|
||||
|
||||
@Getter
|
||||
@Setter
|
||||
private ActorService actorService;
|
||||
@ -291,6 +301,14 @@ public class ActorSystemContext {
|
||||
@Getter
|
||||
private long sessionReportTimeout;
|
||||
|
||||
@Value("${actors.rule.chain.debug_mode_rate_limits_per_tenant.enabled}")
|
||||
@Getter
|
||||
private boolean debugPerTenantEnabled;
|
||||
|
||||
@Value("${actors.rule.chain.debug_mode_rate_limits_per_tenant.configuration}")
|
||||
@Getter
|
||||
private String debugPerTenantLimitsConfiguration;
|
||||
|
||||
@Getter
|
||||
@Setter
|
||||
private ActorSystem actorSystem;
|
||||
@ -318,8 +336,6 @@ public class ActorSystemContext {
|
||||
@Getter
|
||||
private CassandraBufferedRateExecutor cassandraBufferedRateExecutor;
|
||||
|
||||
|
||||
|
||||
public ActorSystemContext() {
|
||||
config = ConfigFactory.parseResources(AKKA_CONF_FILE_NAME).withFallback(ConfigFactory.load());
|
||||
}
|
||||
@ -392,46 +408,97 @@ public class ActorSystemContext {
|
||||
}
|
||||
|
||||
private void persistDebugAsync(TenantId tenantId, EntityId entityId, String type, TbMsg tbMsg, String relationType, Throwable error) {
|
||||
try {
|
||||
Event event = new Event();
|
||||
event.setTenantId(tenantId);
|
||||
event.setEntityId(entityId);
|
||||
event.setType(DataConstants.DEBUG_RULE_NODE);
|
||||
if (checkLimits(tenantId, tbMsg, error)) {
|
||||
try {
|
||||
Event event = new Event();
|
||||
event.setTenantId(tenantId);
|
||||
event.setEntityId(entityId);
|
||||
event.setType(DataConstants.DEBUG_RULE_NODE);
|
||||
|
||||
String metadata = mapper.writeValueAsString(tbMsg.getMetaData().getData());
|
||||
String metadata = mapper.writeValueAsString(tbMsg.getMetaData().getData());
|
||||
|
||||
ObjectNode node = mapper.createObjectNode()
|
||||
.put("type", type)
|
||||
.put("server", getServerAddress())
|
||||
.put("entityId", tbMsg.getOriginator().getId().toString())
|
||||
.put("entityName", tbMsg.getOriginator().getEntityType().name())
|
||||
.put("msgId", tbMsg.getId().toString())
|
||||
.put("msgType", tbMsg.getType())
|
||||
.put("dataType", tbMsg.getDataType().name())
|
||||
.put("relationType", relationType)
|
||||
.put("data", tbMsg.getData())
|
||||
.put("metadata", metadata);
|
||||
ObjectNode node = mapper.createObjectNode()
|
||||
.put("type", type)
|
||||
.put("server", getServerAddress())
|
||||
.put("entityId", tbMsg.getOriginator().getId().toString())
|
||||
.put("entityName", tbMsg.getOriginator().getEntityType().name())
|
||||
.put("msgId", tbMsg.getId().toString())
|
||||
.put("msgType", tbMsg.getType())
|
||||
.put("dataType", tbMsg.getDataType().name())
|
||||
.put("relationType", relationType)
|
||||
.put("data", tbMsg.getData())
|
||||
.put("metadata", metadata);
|
||||
|
||||
if (error != null) {
|
||||
node = node.put("error", toString(error));
|
||||
}
|
||||
|
||||
event.setBody(node);
|
||||
ListenableFuture<Event> future = eventService.saveAsync(event);
|
||||
Futures.addCallback(future, new FutureCallback<Event>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable Event event) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable th) {
|
||||
log.error("Could not save debug Event for Node", th);
|
||||
}
|
||||
});
|
||||
} catch (IOException ex) {
|
||||
log.warn("Failed to persist rule node debug message", ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private boolean checkLimits(TenantId tenantId, TbMsg tbMsg, Throwable error) {
|
||||
if (debugPerTenantEnabled) {
|
||||
DebugTbRateLimits debugTbRateLimits = debugPerTenantLimits.computeIfAbsent(tenantId, id ->
|
||||
new DebugTbRateLimits(new TbRateLimits(debugPerTenantLimitsConfiguration), false));
|
||||
|
||||
if (!debugTbRateLimits.getTbRateLimits().tryConsume()) {
|
||||
if (!debugTbRateLimits.isRuleChainEventSaved()) {
|
||||
persistRuleChainDebugModeEvent(tenantId, tbMsg.getRuleChainId(), error);
|
||||
debugTbRateLimits.setRuleChainEventSaved(true);
|
||||
}
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("[{}] Tenant level debug mode rate limit detected: {}", tenantId, tbMsg);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private void persistRuleChainDebugModeEvent(TenantId tenantId, EntityId entityId, Throwable error) {
|
||||
Event event = new Event();
|
||||
event.setTenantId(tenantId);
|
||||
event.setEntityId(entityId);
|
||||
event.setType(DataConstants.DEBUG_RULE_CHAIN);
|
||||
|
||||
ObjectNode node = mapper.createObjectNode()
|
||||
//todo: what fields are needed here?
|
||||
.put("server", getServerAddress())
|
||||
.put("message", "Reached debug mode rate limit!");
|
||||
|
||||
if (error != null) {
|
||||
node = node.put("error", toString(error));
|
||||
}
|
||||
|
||||
event.setBody(node);
|
||||
ListenableFuture<Event> future = eventService.saveAsync(event);
|
||||
Futures.addCallback(future, new FutureCallback<Event>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable Event event) {
|
||||
|
||||
if (error != null) {
|
||||
node = node.put("error", toString(error));
|
||||
}
|
||||
|
||||
event.setBody(node);
|
||||
ListenableFuture<Event> future = eventService.saveAsync(event);
|
||||
Futures.addCallback(future, new FutureCallback<Event>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable Event event) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable th) {
|
||||
log.error("Could not save debug Event for Node", th);
|
||||
}
|
||||
});
|
||||
} catch (IOException ex) {
|
||||
log.warn("Failed to persist rule node debug message", ex);
|
||||
}
|
||||
@Override
|
||||
public void onFailure(Throwable th) {
|
||||
log.error("Could not save debug Event for Rule Chain", th);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public static Exception toException(Throwable error) {
|
||||
|
||||
@ -0,0 +1,29 @@
|
||||
/**
|
||||
* Copyright © 2016-2019 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.actors.tenant;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import org.thingsboard.server.common.msg.tools.TbRateLimits;
|
||||
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
public class DebugTbRateLimits {
|
||||
|
||||
private TbRateLimits tbRateLimits;
|
||||
private boolean ruleChainEventSaved;
|
||||
|
||||
}
|
||||
@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.security.access.prepost.PreAuthorize;
|
||||
import org.springframework.util.StringUtils;
|
||||
@ -34,6 +35,8 @@ import org.springframework.web.bind.annotation.ResponseBody;
|
||||
import org.springframework.web.bind.annotation.ResponseStatus;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import org.thingsboard.rule.engine.api.ScriptEngine;
|
||||
import org.thingsboard.server.actors.ActorSystemContext;
|
||||
import org.thingsboard.server.actors.tenant.DebugTbRateLimits;
|
||||
import org.thingsboard.server.common.data.DataConstants;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.data.Event;
|
||||
@ -56,10 +59,10 @@ import org.thingsboard.server.service.script.RuleNodeJsScriptEngine;
|
||||
import org.thingsboard.server.service.security.permission.Operation;
|
||||
import org.thingsboard.server.service.security.permission.Resource;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Slf4j
|
||||
@ -78,6 +81,12 @@ public class RuleChainController extends BaseController {
|
||||
@Autowired
|
||||
private JsInvokeService jsInvokeService;
|
||||
|
||||
@Autowired(required = false)
|
||||
private ActorSystemContext actorContext;
|
||||
|
||||
@Value("${actors.rule.chain.debug_mode_rate_limits_per_tenant.enabled}")
|
||||
private boolean debugPerTenantEnabled;
|
||||
|
||||
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN')")
|
||||
@RequestMapping(value = "/ruleChain/{ruleChainId}", method = RequestMethod.GET)
|
||||
@ResponseBody
|
||||
@ -182,8 +191,17 @@ public class RuleChainController extends BaseController {
|
||||
@ResponseBody
|
||||
public RuleChainMetaData saveRuleChainMetaData(@RequestBody RuleChainMetaData ruleChainMetaData) throws ThingsboardException {
|
||||
try {
|
||||
TenantId tenantId = getTenantId();
|
||||
if (debugPerTenantEnabled) {
|
||||
ConcurrentMap<TenantId, DebugTbRateLimits> debugPerTenantLimits = actorContext.getDebugPerTenantLimits();
|
||||
DebugTbRateLimits debugTbRateLimits = debugPerTenantLimits.getOrDefault(tenantId, null);
|
||||
if (debugTbRateLimits != null) {
|
||||
debugPerTenantLimits.remove(tenantId, debugTbRateLimits);
|
||||
}
|
||||
}
|
||||
|
||||
RuleChain ruleChain = checkRuleChain(ruleChainMetaData.getRuleChainId(), Operation.WRITE);
|
||||
RuleChainMetaData savedRuleChainMetaData = checkNotNull(ruleChainService.saveRuleChainMetaData(getTenantId(), ruleChainMetaData));
|
||||
RuleChainMetaData savedRuleChainMetaData = checkNotNull(ruleChainService.saveRuleChainMetaData(tenantId, ruleChainMetaData));
|
||||
|
||||
actorService.onEntityStateChange(ruleChain.getTenantId(), ruleChain.getId(), ComponentLifecycleEvent.UPDATED);
|
||||
|
||||
@ -236,7 +254,7 @@ public class RuleChainController extends BaseController {
|
||||
referencingRuleChainIds.remove(ruleChain.getId());
|
||||
|
||||
referencingRuleChainIds.forEach(referencingRuleChainId ->
|
||||
actorService.onEntityStateChange(ruleChain.getTenantId(), referencingRuleChainId, ComponentLifecycleEvent.UPDATED));
|
||||
actorService.onEntityStateChange(ruleChain.getTenantId(), referencingRuleChainId, ComponentLifecycleEvent.UPDATED));
|
||||
|
||||
actorService.onEntityStateChange(ruleChain.getTenantId(), ruleChain.getId(), ComponentLifecycleEvent.DELETED);
|
||||
|
||||
@ -291,7 +309,8 @@ public class RuleChainController extends BaseController {
|
||||
|
||||
String data = inputParams.get("msg").asText();
|
||||
JsonNode metadataJson = inputParams.get("metadata");
|
||||
Map<String, String> metadata = objectMapper.convertValue(metadataJson, new TypeReference<Map<String, String>>() {});
|
||||
Map<String, String> metadata = objectMapper.convertValue(metadataJson, new TypeReference<Map<String, String>>() {
|
||||
});
|
||||
String msgType = inputParams.get("msgType").asText();
|
||||
String output = "";
|
||||
String errorText = "";
|
||||
|
||||
@ -212,6 +212,9 @@ actors:
|
||||
chain:
|
||||
# Errors for particular actor are persisted once per specified amount of milliseconds
|
||||
error_persist_frequency: "${ACTORS_RULE_CHAIN_ERROR_FREQUENCY:3000}"
|
||||
debug_mode_rate_limits_per_tenant:
|
||||
enabled: "${ACTORS_RULE_CHAIN_DEBUG_MODE_RATE_LIMITS_PER_TENANT_ENABLED:true}"
|
||||
configuration: "${ACTORS_RULE_CHAIN_DEBUG_MODE_RATE_LIMITS_PER_TENANT_CONFIGURATION:500:3600}"
|
||||
node:
|
||||
# Errors for particular actor are persisted once per specified amount of milliseconds
|
||||
error_persist_frequency: "${ACTORS_RULE_NODE_ERROR_FREQUENCY:3000}"
|
||||
|
||||
@ -38,6 +38,7 @@ public class DataConstants {
|
||||
public static final String LC_EVENT = "LC_EVENT";
|
||||
public static final String STATS = "STATS";
|
||||
public static final String DEBUG_RULE_NODE = "DEBUG_RULE_NODE";
|
||||
public static final String DEBUG_RULE_CHAIN = "DEBUG_RULE_CHAIN";
|
||||
|
||||
public static final String ONEWAY = "ONEWAY";
|
||||
public static final String TWOWAY = "TWOWAY";
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user