debug events(wip)
This commit is contained in:
parent
ebff4abf0a
commit
3818d1cb68
@ -137,6 +137,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@ -179,6 +180,8 @@ public class ActorSystemContext {
|
||||
|
||||
private final ConcurrentMap<TenantId, DebugTbRateLimits> debugPerTenantLimits = new ConcurrentHashMap<>();
|
||||
|
||||
private final ConcurrentMap<TenantId, TbRateLimits> cfDebugPerTenantLimits = new ConcurrentHashMap<>();
|
||||
|
||||
public ConcurrentMap<TenantId, DebugTbRateLimits> getDebugPerTenantLimits() {
|
||||
return debugPerTenantLimits;
|
||||
}
|
||||
@ -441,6 +444,11 @@ public class ActorSystemContext {
|
||||
@Getter
|
||||
private TbCoreToTransportService tbCoreToTransportService;
|
||||
|
||||
@Lazy
|
||||
@Autowired(required = false)
|
||||
@Getter
|
||||
private ApiLimitService apiLimitService;
|
||||
|
||||
/**
|
||||
* The following Service will be null if we operate in tb-core mode
|
||||
*/
|
||||
@ -517,11 +525,6 @@ public class ActorSystemContext {
|
||||
@Getter
|
||||
private CalculatedFieldExecutionService calculatedFieldExecutionService;
|
||||
|
||||
@Lazy
|
||||
@Autowired(required = false)
|
||||
@Getter
|
||||
private ApiLimitService apiLimitService;
|
||||
|
||||
@Value("${actors.session.max_concurrent_sessions_per_device:1}")
|
||||
@Getter
|
||||
private long maxConcurrentSessionsPerDevice;
|
||||
@ -625,6 +628,14 @@ public class ActorSystemContext {
|
||||
@Getter
|
||||
private String deviceStateNodeRateLimitConfig;
|
||||
|
||||
@Value("${actors.calculated_fields.debug_mode_rate_limits_per_tenant.enabled:true}")
|
||||
@Getter
|
||||
private boolean cfDebugPerTenantEnabled;
|
||||
|
||||
@Value("${actors.calculated_fields.debug_mode_rate_limits_per_tenant.configuration:50000:3600}")
|
||||
@Getter
|
||||
private String cfDebugPerTenantLimitsConfiguration;
|
||||
|
||||
@Getter
|
||||
@Setter
|
||||
private TbActorSystem actorSystem;
|
||||
@ -753,37 +764,6 @@ public class ActorSystemContext {
|
||||
}
|
||||
}
|
||||
|
||||
public void persistCalculatedFieldDebugEvent(TenantId tenantId, CalculatedFieldId calculatedFieldId, EntityId entityId, Map<String, ArgumentEntry> arguments, UUID tbMsgId, TbMsgType tbMsgType, String result, Throwable error) {
|
||||
try {
|
||||
CalculatedFieldDebugEvent.CalculatedFieldDebugEventBuilder eventBuilder = CalculatedFieldDebugEvent.builder()
|
||||
.tenantId(tenantId)
|
||||
.entityId(entityId.getId())
|
||||
.serviceId(getServiceId())
|
||||
.calculatedFieldId(calculatedFieldId)
|
||||
.eventEntity(entityId);
|
||||
if (tbMsgId != null) {
|
||||
eventBuilder.msgId(tbMsgId);
|
||||
}
|
||||
if (tbMsgType != null) {
|
||||
eventBuilder.msgType(tbMsgType.name());
|
||||
}
|
||||
if (arguments != null) {
|
||||
eventBuilder.arguments(JacksonUtil.toString(arguments));
|
||||
}
|
||||
if (result != null) {
|
||||
eventBuilder.result(result);
|
||||
}
|
||||
if (error != null) {
|
||||
eventBuilder.error(toString(error));
|
||||
}
|
||||
|
||||
ListenableFuture<Void> future = eventService.saveAsync(eventBuilder.build());
|
||||
Futures.addCallback(future, CALCULATED_FIELD_DEBUG_EVENT_ERROR_CALLBACK, MoreExecutors.directExecutor());
|
||||
} catch (IllegalArgumentException ex) {
|
||||
log.warn("Failed to persist calculated field debug message", ex);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean checkLimits(TenantId tenantId, TbMsg tbMsg, Throwable error) {
|
||||
if (debugPerTenantEnabled) {
|
||||
DebugTbRateLimits debugTbRateLimits = debugPerTenantLimits.computeIfAbsent(tenantId, id ->
|
||||
@ -817,6 +797,49 @@ public class ActorSystemContext {
|
||||
Futures.addCallback(future, RULE_CHAIN_DEBUG_EVENT_ERROR_CALLBACK, MoreExecutors.directExecutor());
|
||||
}
|
||||
|
||||
public void persistCalculatedFieldDebugEvent(TenantId tenantId, CalculatedFieldId calculatedFieldId, EntityId entityId, Map<String, ArgumentEntry> arguments, UUID tbMsgId, TbMsgType tbMsgType, String result, Throwable error) {
|
||||
if (cfDebugPerTenantEnabled) {
|
||||
TbRateLimits rateLimits = cfDebugPerTenantLimits.computeIfAbsent(tenantId, id -> new TbRateLimits(cfDebugPerTenantLimitsConfiguration));
|
||||
|
||||
if (rateLimits.tryConsume()) {
|
||||
try {
|
||||
CalculatedFieldDebugEvent.CalculatedFieldDebugEventBuilder eventBuilder = CalculatedFieldDebugEvent.builder()
|
||||
.tenantId(tenantId)
|
||||
.entityId(calculatedFieldId.getId())
|
||||
.serviceId(getServiceId())
|
||||
.calculatedFieldId(calculatedFieldId)
|
||||
.eventEntity(entityId);
|
||||
if (tbMsgId != null) {
|
||||
eventBuilder.msgId(tbMsgId);
|
||||
}
|
||||
if (tbMsgType != null) {
|
||||
eventBuilder.msgType(tbMsgType.name());
|
||||
}
|
||||
if (arguments != null) {
|
||||
eventBuilder.arguments(JacksonUtil.toString(
|
||||
arguments.entrySet().stream()
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getValue()))
|
||||
));
|
||||
}
|
||||
if (result != null) {
|
||||
eventBuilder.result(result);
|
||||
}
|
||||
if (error != null) {
|
||||
eventBuilder.error(toString(error));
|
||||
}
|
||||
|
||||
ListenableFuture<Void> future = eventService.saveAsync(eventBuilder.build());
|
||||
Futures.addCallback(future, CALCULATED_FIELD_DEBUG_EVENT_ERROR_CALLBACK, MoreExecutors.directExecutor());
|
||||
} catch (IllegalArgumentException ex) {
|
||||
log.warn("Failed to persist calculated field debug message", ex);
|
||||
}
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("[{}] Tenant level debug mode rate limit detected: {}", tenantId, calculatedFieldId);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static Exception toException(Throwable error) {
|
||||
return Exception.class.isInstance(error) ? (Exception) error : new Exception(error);
|
||||
}
|
||||
|
||||
@ -61,7 +61,8 @@ public class RocksDBStateService implements CalculatedFieldStateService {
|
||||
@Override
|
||||
public void persistState(CalculatedFieldCtx ctx, CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback) {
|
||||
CalculatedFieldStateProto stateProto = toProto(stateId, state);
|
||||
if (stateProto.getSerializedSize() <= ctx.getMaxStateSizeInKBytes()) {
|
||||
long maxStateSizeInKBytes = ctx.getMaxStateSizeInKBytes();
|
||||
if (maxStateSizeInKBytes <= 0 || stateProto.getSerializedSize() <= ctx.getMaxStateSizeInKBytes()) {
|
||||
rocksDBService.put(toProto(stateId), toProto(stateId, state));
|
||||
}
|
||||
callback.onSuccess();
|
||||
|
||||
@ -55,6 +55,7 @@ public class TenantAdminPermissions extends AbstractPermissions {
|
||||
put(Resource.OAUTH2_CONFIGURATION_TEMPLATE, new PermissionChecker.GenericPermissionChecker(Operation.READ));
|
||||
put(Resource.MOBILE_APP, tenantEntityPermissionChecker);
|
||||
put(Resource.MOBILE_APP_BUNDLE, tenantEntityPermissionChecker);
|
||||
put(Resource.CALCULATED_FIELD, tenantEntityPermissionChecker);
|
||||
}
|
||||
|
||||
public static final PermissionChecker tenantEntityPermissionChecker = new PermissionChecker() {
|
||||
|
||||
@ -510,6 +510,12 @@ actors:
|
||||
js_print_interval_ms: "${ACTORS_JS_STATISTICS_PRINT_INTERVAL_MS:10000}"
|
||||
# Actors statistic persistence frequency in milliseconds
|
||||
persist_frequency: "${ACTORS_STATISTICS_PERSIST_FREQUENCY:3600000}"
|
||||
calculated_fields:
|
||||
debug_mode_rate_limits_per_tenant:
|
||||
# Enable/Disable the rate limit of persisted debug events for all calculated fields per tenant
|
||||
enabled: "${ACTORS_CF_DEBUG_MODE_RATE_LIMITS_PER_TENANT_ENABLED:true}"
|
||||
# The value of DEBUG mode rate limit. By default, no more than 50 thousand events per hour
|
||||
configuration: "${ACTORS_CF_DEBUG_MODE_RATE_LIMITS_PER_TENANT_CONFIGURATION:50000:3600}"
|
||||
|
||||
debug:
|
||||
settings:
|
||||
|
||||
@ -62,9 +62,9 @@ public class BaseCalculatedFieldService extends AbstractEntityService implements
|
||||
try {
|
||||
TenantId tenantId = calculatedField.getTenantId();
|
||||
log.trace("Executing save calculated field, [{}]", calculatedField);
|
||||
updateDebugSettings(tenantId, calculatedField, System.currentTimeMillis());
|
||||
CalculatedField savedCalculatedField = calculatedFieldDao.save(tenantId, calculatedField);
|
||||
createOrUpdateCalculatedFieldLink(tenantId, savedCalculatedField);
|
||||
updateDebugSettings(tenantId, calculatedField, System.currentTimeMillis());
|
||||
eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(savedCalculatedField.getTenantId()).entityId(savedCalculatedField.getId())
|
||||
.entity(savedCalculatedField).oldEntity(oldCalculatedField).created(calculatedField.getId() == null).build());
|
||||
return savedCalculatedField;
|
||||
|
||||
@ -35,7 +35,7 @@ public interface CalculatedFieldDebugEventRepository extends EventRepository<Cal
|
||||
List<CalculatedFieldDebugEventEntity> findLatestEvents(@Param("tenantId") UUID tenantId, @Param("entityId") UUID entityId, @Param("limit") int limit);
|
||||
|
||||
@Override
|
||||
@Query("SELECT e FROM RuleNodeDebugEventEntity e WHERE " +
|
||||
@Query("SELECT e FROM CalculatedFieldDebugEventEntity e WHERE " +
|
||||
"e.tenantId = :tenantId " +
|
||||
"AND e.entityId = :entityId " +
|
||||
"AND (:startTime IS NULL OR e.ts >= :startTime) " +
|
||||
|
||||
@ -955,10 +955,10 @@ CREATE TABLE IF NOT EXISTS cf_debug_event (
|
||||
id uuid NOT NULL,
|
||||
tenant_id uuid NOT NULL ,
|
||||
ts bigint NOT NULL,
|
||||
entity_id uuid NOT NULL,
|
||||
entity_id uuid NOT NULL, -- calculated field id
|
||||
service_id varchar,
|
||||
cf_id uuid NOT NULL,
|
||||
e_entity_id uuid,
|
||||
e_entity_id uuid, -- target entity id
|
||||
e_entity_type varchar,
|
||||
e_msg_id uuid,
|
||||
e_msg_type varchar,
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user