Refactor RateLimitsUpdater; handle rate limits disabling
This commit is contained in:
parent
ffd2a02742
commit
d99a8066fe
@ -15,14 +15,14 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.config;
|
package org.thingsboard.server.config;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.security.core.Authentication;
|
import org.springframework.security.core.Authentication;
|
||||||
import org.springframework.security.core.context.SecurityContextHolder;
|
import org.springframework.security.core.context.SecurityContextHolder;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
import org.springframework.web.filter.GenericFilterBean;
|
import org.springframework.web.filter.GenericFilterBean;
|
||||||
import org.thingsboard.server.common.data.EntityType;
|
|
||||||
import org.thingsboard.server.common.data.StringUtils;
|
|
||||||
import org.thingsboard.server.common.data.id.CustomerId;
|
import org.thingsboard.server.common.data.id.CustomerId;
|
||||||
|
import org.thingsboard.server.common.data.id.EntityId;
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
import org.thingsboard.server.common.msg.tools.TbRateLimits;
|
import org.thingsboard.server.common.msg.tools.TbRateLimits;
|
||||||
import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
|
import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
|
||||||
@ -36,6 +36,7 @@ import javax.servlet.ServletRequest;
|
|||||||
import javax.servlet.ServletResponse;
|
import javax.servlet.ServletResponse;
|
||||||
import javax.servlet.http.HttpServletResponse;
|
import javax.servlet.http.HttpServletResponse;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
@ -55,37 +56,38 @@ public class RateLimitProcessingFilter extends GenericFilterBean {
|
|||||||
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
|
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
|
||||||
SecurityUser user = getCurrentUser();
|
SecurityUser user = getCurrentUser();
|
||||||
if (user != null && !user.isSystemAdmin()) {
|
if (user != null && !user.isSystemAdmin()) {
|
||||||
var profileConfiguration = tenantProfileCache.get(user.getTenantId()).getDefaultTenantProfileConfiguration();
|
var profileConfiguration = tenantProfileCache.get(user.getTenantId()).getDefaultProfileConfiguration();
|
||||||
if (profileConfiguration != null) {
|
if (!checkRateLimits(user.getTenantId(), profileConfiguration.getTenantServerRestLimitsConfiguration(), perTenantLimits, response)) {
|
||||||
if (user.isTenantAdmin() && StringUtils.isNotEmpty(profileConfiguration.getTenantServerRestLimitsConfiguration())) {
|
return;
|
||||||
TbRateLimits rateLimits = perTenantLimits.get(user.getTenantId());
|
}
|
||||||
if (rateLimits == null || !rateLimits.getConfiguration().equals(profileConfiguration.getTenantServerRestLimitsConfiguration())) {
|
if (user.isCustomerUser()) {
|
||||||
// fixme: or maybe handle component lifecycle event ?
|
if (!checkRateLimits(user.getCustomerId(), profileConfiguration.getCustomerServerRestLimitsConfiguration(), perCustomerLimits, response)) {
|
||||||
rateLimits = new TbRateLimits(profileConfiguration.getTenantServerRestLimitsConfiguration());
|
return;
|
||||||
perTenantLimits.put(user.getTenantId(), rateLimits);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!rateLimits.tryConsume()) {
|
|
||||||
errorResponseHandler.handle(new TbRateLimitsException(EntityType.TENANT), (HttpServletResponse) response);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
} else if (user.isCustomerUser() && StringUtils.isNotEmpty(profileConfiguration.getCustomerServerRestLimitsConfiguration())) {
|
|
||||||
TbRateLimits rateLimits = perCustomerLimits.get(user.getCustomerId());
|
|
||||||
if (rateLimits == null || !rateLimits.getConfiguration().equals(profileConfiguration.getCustomerServerRestLimitsConfiguration())) {
|
|
||||||
rateLimits = new TbRateLimits(profileConfiguration.getCustomerServerRestLimitsConfiguration());
|
|
||||||
perCustomerLimits.put(user.getCustomerId(), rateLimits);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!rateLimits.tryConsume()) {
|
|
||||||
errorResponseHandler.handle(new TbRateLimitsException(EntityType.CUSTOMER), (HttpServletResponse) response);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
chain.doFilter(request, response);
|
chain.doFilter(request, response);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private <I extends EntityId> boolean checkRateLimits(I ownerId, String rateLimitConfig, Map<I, TbRateLimits> rateLimitsMap, ServletResponse response) {
|
||||||
|
if (StringUtils.isNotEmpty(rateLimitConfig)) {
|
||||||
|
TbRateLimits rateLimits = rateLimitsMap.get(ownerId);
|
||||||
|
if (rateLimits == null || !rateLimits.getConfiguration().equals(rateLimitConfig)) {
|
||||||
|
rateLimits = new TbRateLimits(rateLimitConfig);
|
||||||
|
rateLimitsMap.put(ownerId, rateLimits);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!rateLimits.tryConsume()) {
|
||||||
|
errorResponseHandler.handle(new TbRateLimitsException(ownerId.getEntityType()), (HttpServletResponse) response);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
rateLimitsMap.remove(ownerId);
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
protected SecurityUser getCurrentUser() {
|
protected SecurityUser getCurrentUser() {
|
||||||
Authentication authentication = SecurityContextHolder.getContext().getAuthentication();
|
Authentication authentication = SecurityContextHolder.getContext().getAuthentication();
|
||||||
if (authentication != null && authentication.getPrincipal() instanceof SecurityUser) {
|
if (authentication != null && authentication.getPrincipal() instanceof SecurityUser) {
|
||||||
|
|||||||
@ -38,7 +38,6 @@ import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
|
|||||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||||
import org.thingsboard.server.service.security.model.SecurityUser;
|
import org.thingsboard.server.service.security.model.SecurityUser;
|
||||||
import org.thingsboard.server.service.security.model.UserPrincipal;
|
import org.thingsboard.server.service.security.model.UserPrincipal;
|
||||||
import org.thingsboard.server.service.telemetry.DefaultTelemetryWebSocketService;
|
|
||||||
import org.thingsboard.server.service.telemetry.SessionEvent;
|
import org.thingsboard.server.service.telemetry.SessionEvent;
|
||||||
import org.thingsboard.server.service.telemetry.TelemetryWebSocketMsgEndpoint;
|
import org.thingsboard.server.service.telemetry.TelemetryWebSocketMsgEndpoint;
|
||||||
import org.thingsboard.server.service.telemetry.TelemetryWebSocketService;
|
import org.thingsboard.server.service.telemetry.TelemetryWebSocketService;
|
||||||
@ -138,8 +137,9 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
|
|||||||
if (!checkLimits(session, sessionRef)) {
|
if (!checkLimits(session, sessionRef)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
var tenantProfileConfiguration = tenantProfileCache.get(sessionRef.getSecurityCtx().getTenantId()).getDefaultTenantProfileConfiguration();
|
var tenantProfileConfiguration = tenantProfileCache.get(sessionRef.getSecurityCtx().getTenantId()).getDefaultProfileConfiguration();
|
||||||
internalSessionMap.put(internalSessionId, new SessionMetaData(session, sessionRef, tenantProfileConfiguration.getWsMsgQueueLimitPerSession()));
|
internalSessionMap.put(internalSessionId, new SessionMetaData(session, sessionRef, tenantProfileConfiguration.getWsMsgQueueLimitPerSession() > 0 ?
|
||||||
|
tenantProfileConfiguration.getWsMsgQueueLimitPerSession() : 500));
|
||||||
|
|
||||||
externalSessionMap.put(externalSessionId, internalSessionId);
|
externalSessionMap.put(externalSessionId, internalSessionId);
|
||||||
processInWebSocketService(sessionRef, SessionEvent.onEstablished());
|
processInWebSocketService(sessionRef, SessionEvent.onEstablished());
|
||||||
@ -300,9 +300,8 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
|
|||||||
if (internalId != null) {
|
if (internalId != null) {
|
||||||
SessionMetaData sessionMd = internalSessionMap.get(internalId);
|
SessionMetaData sessionMd = internalSessionMap.get(internalId);
|
||||||
if (sessionMd != null) {
|
if (sessionMd != null) {
|
||||||
var tenantProfileConfiguration = tenantProfileCache.get(sessionRef.getSecurityCtx().getTenantId()).getDefaultTenantProfileConfiguration();
|
var tenantProfileConfiguration = tenantProfileCache.get(sessionRef.getSecurityCtx().getTenantId()).getDefaultProfileConfiguration();
|
||||||
if (StringUtils.isNotEmpty(tenantProfileConfiguration.getWsUpdatesPerSessionRateLimit())) {
|
if (StringUtils.isNotEmpty(tenantProfileConfiguration.getWsUpdatesPerSessionRateLimit())) {
|
||||||
// fixme: is this ok not to update rate limits config if it was change in profile?
|
|
||||||
TbRateLimits rateLimits = perSessionUpdateLimits.computeIfAbsent(sessionRef.getSessionId(), sid -> new TbRateLimits(tenantProfileConfiguration.getWsUpdatesPerSessionRateLimit()));
|
TbRateLimits rateLimits = perSessionUpdateLimits.computeIfAbsent(sessionRef.getSessionId(), sid -> new TbRateLimits(tenantProfileConfiguration.getWsUpdatesPerSessionRateLimit()));
|
||||||
if (!rateLimits.tryConsume()) {
|
if (!rateLimits.tryConsume()) {
|
||||||
if (blacklistedSessions.putIfAbsent(externalId, sessionRef) == null) {
|
if (blacklistedSessions.putIfAbsent(externalId, sessionRef) == null) {
|
||||||
@ -315,6 +314,8 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
|
|||||||
log.debug("[{}][{}][{}] Session is no longer blacklisted.", sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), externalId);
|
log.debug("[{}][{}][{}] Session is no longer blacklisted.", sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), externalId);
|
||||||
blacklistedSessions.remove(externalId);
|
blacklistedSessions.remove(externalId);
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
perSessionUpdateLimits.remove(sessionRef.getSessionId());
|
||||||
}
|
}
|
||||||
sessionMd.sendMsg(msg);
|
sessionMd.sendMsg(msg);
|
||||||
} else {
|
} else {
|
||||||
@ -360,8 +361,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
|
|||||||
|
|
||||||
private boolean checkLimits(WebSocketSession session, TelemetryWebSocketSessionRef sessionRef) throws Exception {
|
private boolean checkLimits(WebSocketSession session, TelemetryWebSocketSessionRef sessionRef) throws Exception {
|
||||||
var tenantProfileConfiguration =
|
var tenantProfileConfiguration =
|
||||||
tenantProfileCache.get(sessionRef.getSecurityCtx().getTenantId()).getDefaultTenantProfileConfiguration();
|
tenantProfileCache.get(sessionRef.getSecurityCtx().getTenantId()).getDefaultProfileConfiguration();
|
||||||
|
|
||||||
if (tenantProfileConfiguration == null) {
|
if (tenantProfileConfiguration == null) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -428,7 +428,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void cleanupLimits(WebSocketSession session, TelemetryWebSocketSessionRef sessionRef) {
|
private void cleanupLimits(WebSocketSession session, TelemetryWebSocketSessionRef sessionRef) {
|
||||||
var tenantProfileConfiguration = tenantProfileCache.get(sessionRef.getSecurityCtx().getTenantId()).getDefaultTenantProfileConfiguration();
|
var tenantProfileConfiguration = tenantProfileCache.get(sessionRef.getSecurityCtx().getTenantId()).getDefaultProfileConfiguration();
|
||||||
|
|
||||||
String sessionId = session.getId();
|
String sessionId = session.getId();
|
||||||
perSessionUpdateLimits.remove(sessionRef.getSessionId());
|
perSessionUpdateLimits.remove(sessionRef.getSessionId());
|
||||||
|
|||||||
@ -28,39 +28,39 @@ import org.thingsboard.server.dao.tenant.TenantProfileService;
|
|||||||
@Component
|
@Component
|
||||||
class RateLimitsUpdater extends PaginatedUpdater<String, TenantProfile> {
|
class RateLimitsUpdater extends PaginatedUpdater<String, TenantProfile> {
|
||||||
|
|
||||||
@Value("${server.rest.limits.tenant.enabled}")
|
@Value("#{ environment.getProperty('TB_SERVER_REST_LIMITS_TENANT_ENABLED') ?: environment.getProperty('server.rest.limits.tenant.enabled') ?: 'false' }")
|
||||||
boolean tenantServerRestLimitsEnabled;
|
boolean tenantServerRestLimitsEnabled;
|
||||||
@Value("${server.rest.limits.tenant.configuration}")
|
@Value("#{ environment.getProperty('TB_SERVER_REST_LIMITS_TENANT_CONFIGURATION') ?: environment.getProperty('server.rest.limits.tenant.configuration') ?: '100:1,2000:60' }")
|
||||||
String tenantServerRestLimitsConfiguration;
|
String tenantServerRestLimitsConfiguration;
|
||||||
@Value("${server.rest.limits.customer.enabled}")
|
@Value("#{ environment.getProperty('TB_SERVER_REST_LIMITS_CUSTOMER_ENABLED') ?: environment.getProperty('server.rest.limits.customer.enabled') ?: 'false' }")
|
||||||
boolean customerServerRestLimitsEnabled;
|
boolean customerServerRestLimitsEnabled;
|
||||||
@Value("${server.rest.limits.customer.configuration}")
|
@Value("#{ environment.getProperty('TB_SERVER_REST_LIMITS_CUSTOMER_CONFIGURATION') ?: environment.getProperty('server.rest.limits.customer.configuration') ?: '50:1,1000:60' }")
|
||||||
String customerServerRestLimitsConfiguration;
|
String customerServerRestLimitsConfiguration;
|
||||||
|
|
||||||
@Value("${server.ws.limits.max_sessions_per_tenant}")
|
@Value("#{ environment.getProperty('TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_TENANT') ?: environment.getProperty('server.ws.limits.max_sessions_per_tenant') ?: '0' }")
|
||||||
private int maxWsSessionsPerTenant;
|
private int maxWsSessionsPerTenant;
|
||||||
@Value("${server.ws.limits.max_sessions_per_customer}")
|
@Value("#{ environment.getProperty('TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_CUSTOMER') ?: environment.getProperty('server.ws.limits.max_sessions_per_customer') ?: '0' }")
|
||||||
private int maxWsSessionsPerCustomer;
|
private int maxWsSessionsPerCustomer;
|
||||||
@Value("${server.ws.limits.max_sessions_per_regular_user}")
|
@Value("#{ environment.getProperty('TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_REGULAR_USER') ?: environment.getProperty('server.ws.limits.max_sessions_per_regular_user') ?: '0' }")
|
||||||
private int maxWsSessionsPerRegularUser;
|
private int maxWsSessionsPerRegularUser;
|
||||||
@Value("${server.ws.limits.max_sessions_per_public_user}")
|
@Value("#{ environment.getProperty('TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_PUBLIC_USER') ?: environment.getProperty('server.ws.limits.max_sessions_per_public_user') ?: '0' }")
|
||||||
private int maxWsSessionsPerPublicUser;
|
private int maxWsSessionsPerPublicUser;
|
||||||
@Value("${server.ws.limits.max_queue_per_ws_session}")
|
@Value("#{ environment.getProperty('TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_QUEUE_PER_WS_SESSION') ?: environment.getProperty('server.ws.limits.max_queue_per_ws_session') ?: '500' }")
|
||||||
private int wsMsgQueueLimitPerSession;
|
private int wsMsgQueueLimitPerSession;
|
||||||
@Value("${server.ws.limits.max_subscriptions_per_tenant}")
|
@Value("#{ environment.getProperty('TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_TENANT') ?: environment.getProperty('server.ws.limits.max_subscriptions_per_tenant') ?: '0' }")
|
||||||
private long maxWsSubscriptionsPerTenant;
|
private long maxWsSubscriptionsPerTenant;
|
||||||
@Value("${server.ws.limits.max_subscriptions_per_customer}")
|
@Value("#{ environment.getProperty('TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_CUSTOMER') ?: environment.getProperty('server.ws.limits.max_subscriptions_per_customer') ?: '0' }")
|
||||||
private long maxWsSubscriptionsPerCustomer;
|
private long maxWsSubscriptionsPerCustomer;
|
||||||
@Value("${server.ws.limits.max_subscriptions_per_regular_user}")
|
@Value("#{ environment.getProperty('TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_REGULAR_USER') ?: environment.getProperty('server.ws.limits.max_subscriptions_per_regular_user') ?: '0' }")
|
||||||
private long maxWsSubscriptionsPerRegularUser;
|
private long maxWsSubscriptionsPerRegularUser;
|
||||||
@Value("${server.ws.limits.max_subscriptions_per_public_user}")
|
@Value("#{ environment.getProperty('TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_PUBLIC_USER') ?: environment.getProperty('server.ws.limits.max_subscriptions_per_public_user') ?: '0' }")
|
||||||
private long maxWsSubscriptionsPerPublicUser;
|
private long maxWsSubscriptionsPerPublicUser;
|
||||||
@Value("${server.ws.limits.max_updates_per_session}")
|
@Value("#{ environment.getProperty('TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_UPDATES_PER_SESSION') ?: environment.getProperty('server.ws.limits.max_updates_per_session') ?: '300:1,3000:60' }")
|
||||||
private String wsUpdatesPerSessionRateLimit;
|
private String wsUpdatesPerSessionRateLimit;
|
||||||
|
|
||||||
@Value("${cassandra.query.tenant_rate_limits.enabled}")
|
@Value("#{ environment.getProperty('CASSANDRA_QUERY_TENANT_RATE_LIMITS_ENABLED') ?: environment.getProperty('cassandra.query.tenant_rate_limits.enabled') ?: 'false' }")
|
||||||
private boolean cassandraQueryTenantRateLimitsEnabled;
|
private boolean cassandraQueryTenantRateLimitsEnabled;
|
||||||
@Value("${cassandra.query.tenant_rate_limits.configuration}")
|
@Value("#{ environment.getProperty('CASSANDRA_QUERY_TENANT_RATE_LIMITS_CONFIGURATION') ?: environment.getProperty('cassandra.query.tenant_rate_limits.configuration') ?: '1000:1,30000:60' }")
|
||||||
private String cassandraQueryTenantRateLimitsConfiguration;
|
private String cassandraQueryTenantRateLimitsConfiguration;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
@ -83,7 +83,7 @@ class RateLimitsUpdater extends PaginatedUpdater<String, TenantProfile> {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void updateEntity(TenantProfile tenantProfile) {
|
protected void updateEntity(TenantProfile tenantProfile) {
|
||||||
var profileConfiguration = tenantProfile.getDefaultTenantProfileConfiguration();
|
var profileConfiguration = tenantProfile.getDefaultProfileConfiguration();
|
||||||
|
|
||||||
if (tenantServerRestLimitsEnabled && StringUtils.isNotEmpty(tenantServerRestLimitsConfiguration)) {
|
if (tenantServerRestLimitsEnabled && StringUtils.isNotEmpty(tenantServerRestLimitsConfiguration)) {
|
||||||
profileConfiguration.setTenantServerRestLimitsConfiguration(tenantServerRestLimitsConfiguration);
|
profileConfiguration.setTenantServerRestLimitsConfiguration(tenantServerRestLimitsConfiguration);
|
||||||
|
|||||||
@ -304,7 +304,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void processSessionClose(TelemetryWebSocketSessionRef sessionRef) {
|
private void processSessionClose(TelemetryWebSocketSessionRef sessionRef) {
|
||||||
var tenantProfileConfiguration = (DefaultTenantProfileConfiguration) tenantProfileCache.get(sessionRef.getSecurityCtx().getTenantId()).getDefaultTenantProfileConfiguration();
|
var tenantProfileConfiguration = tenantProfileCache.get(sessionRef.getSecurityCtx().getTenantId()).getDefaultProfileConfiguration();
|
||||||
if (tenantProfileConfiguration != null) {
|
if (tenantProfileConfiguration != null) {
|
||||||
String sessionId = "[" + sessionRef.getSessionId() + "]";
|
String sessionId = "[" + sessionRef.getSessionId() + "]";
|
||||||
|
|
||||||
@ -338,7 +338,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
|
|||||||
}
|
}
|
||||||
|
|
||||||
private boolean processSubscription(TelemetryWebSocketSessionRef sessionRef, SubscriptionCmd cmd) {
|
private boolean processSubscription(TelemetryWebSocketSessionRef sessionRef, SubscriptionCmd cmd) {
|
||||||
var tenantProfileConfiguration = (DefaultTenantProfileConfiguration) tenantProfileCache.get(sessionRef.getSecurityCtx().getTenantId()).getDefaultTenantProfileConfiguration();
|
var tenantProfileConfiguration = (DefaultTenantProfileConfiguration) tenantProfileCache.get(sessionRef.getSecurityCtx().getTenantId()).getDefaultProfileConfiguration();
|
||||||
|
|
||||||
String subId = "[" + sessionRef.getSessionId() + "]:[" + cmd.getCmdId() + "]";
|
String subId = "[" + sessionRef.getSessionId() + "]:[" + cmd.getCmdId() + "]";
|
||||||
try {
|
try {
|
||||||
|
|||||||
@ -56,18 +56,6 @@ server:
|
|||||||
send_timeout: "${TB_SERVER_WS_SEND_TIMEOUT:5000}"
|
send_timeout: "${TB_SERVER_WS_SEND_TIMEOUT:5000}"
|
||||||
# recommended timeout >= 30 seconds. Platform will attempt to send 'ping' request 3 times within the timeout
|
# recommended timeout >= 30 seconds. Platform will attempt to send 'ping' request 3 times within the timeout
|
||||||
ping_timeout: "${TB_SERVER_WS_PING_TIMEOUT:30000}"
|
ping_timeout: "${TB_SERVER_WS_PING_TIMEOUT:30000}"
|
||||||
limits:
|
|
||||||
# Limit the amount of sessions and subscriptions available on each server. Put values to zero to disable particular limitation
|
|
||||||
max_sessions_per_tenant: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_TENANT:0}"
|
|
||||||
max_sessions_per_customer: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_CUSTOMER:0}"
|
|
||||||
max_sessions_per_regular_user: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_REGULAR_USER:0}"
|
|
||||||
max_sessions_per_public_user: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_PUBLIC_USER:0}"
|
|
||||||
max_queue_per_ws_session: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_QUEUE_PER_WS_SESSION:500}"
|
|
||||||
max_subscriptions_per_tenant: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_TENANT:0}"
|
|
||||||
max_subscriptions_per_customer: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_CUSTOMER:0}"
|
|
||||||
max_subscriptions_per_regular_user: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_REGULAR_USER:0}"
|
|
||||||
max_subscriptions_per_public_user: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_PUBLIC_USER:0}"
|
|
||||||
max_updates_per_session: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_UPDATES_PER_SESSION:300:1,3000:60}"
|
|
||||||
dynamic_page_link:
|
dynamic_page_link:
|
||||||
refresh_interval: "${TB_SERVER_WS_DYNAMIC_PAGE_LINK_REFRESH_INTERVAL_SEC:60}"
|
refresh_interval: "${TB_SERVER_WS_DYNAMIC_PAGE_LINK_REFRESH_INTERVAL_SEC:60}"
|
||||||
refresh_pool_size: "${TB_SERVER_WS_DYNAMIC_PAGE_LINK_REFRESH_POOL_SIZE:1}"
|
refresh_pool_size: "${TB_SERVER_WS_DYNAMIC_PAGE_LINK_REFRESH_POOL_SIZE:1}"
|
||||||
@ -76,13 +64,6 @@ server:
|
|||||||
max_entities_per_data_subscription: "${TB_SERVER_WS_MAX_ENTITIES_PER_DATA_SUBSCRIPTION:10000}"
|
max_entities_per_data_subscription: "${TB_SERVER_WS_MAX_ENTITIES_PER_DATA_SUBSCRIPTION:10000}"
|
||||||
max_entities_per_alarm_subscription: "${TB_SERVER_WS_MAX_ENTITIES_PER_ALARM_SUBSCRIPTION:10000}"
|
max_entities_per_alarm_subscription: "${TB_SERVER_WS_MAX_ENTITIES_PER_ALARM_SUBSCRIPTION:10000}"
|
||||||
rest:
|
rest:
|
||||||
limits:
|
|
||||||
tenant:
|
|
||||||
enabled: "${TB_SERVER_REST_LIMITS_TENANT_ENABLED:false}"
|
|
||||||
configuration: "${TB_SERVER_REST_LIMITS_TENANT_CONFIGURATION:100:1,2000:60}"
|
|
||||||
customer:
|
|
||||||
enabled: "${TB_SERVER_REST_LIMITS_CUSTOMER_ENABLED:false}"
|
|
||||||
configuration: "${TB_SERVER_REST_LIMITS_CUSTOMER_CONFIGURATION:50:1,1000:60}"
|
|
||||||
server_side_rpc:
|
server_side_rpc:
|
||||||
# Minimum value of the server side RPC timeout. May override value provided in the REST API call.
|
# Minimum value of the server side RPC timeout. May override value provided in the REST API call.
|
||||||
# Since 2.5 migration to queues, the RPC delay depends on the size of the pending messages in the queue,
|
# Since 2.5 migration to queues, the RPC delay depends on the size of the pending messages in the queue,
|
||||||
@ -255,8 +236,6 @@ cassandra:
|
|||||||
# log one of cassandra queries with specified frequency (0 - logging is disabled)
|
# log one of cassandra queries with specified frequency (0 - logging is disabled)
|
||||||
print_queries_freq: "${CASSANDRA_QUERY_PRINT_FREQ:0}"
|
print_queries_freq: "${CASSANDRA_QUERY_PRINT_FREQ:0}"
|
||||||
tenant_rate_limits:
|
tenant_rate_limits:
|
||||||
enabled: "${CASSANDRA_QUERY_TENANT_RATE_LIMITS_ENABLED:false}"
|
|
||||||
configuration: "${CASSANDRA_QUERY_TENANT_RATE_LIMITS_CONFIGURATION:1000:1,30000:60}"
|
|
||||||
print_tenant_names: "${CASSANDRA_QUERY_TENANT_RATE_LIMITS_PRINT_TENANT_NAMES:false}"
|
print_tenant_names: "${CASSANDRA_QUERY_TENANT_RATE_LIMITS_PRINT_TENANT_NAMES:false}"
|
||||||
|
|
||||||
# SQL configuration parameters
|
# SQL configuration parameters
|
||||||
|
|||||||
@ -130,6 +130,11 @@ public class TenantProfile extends SearchTextBased<TenantProfileId> implements H
|
|||||||
.map(profileConfiguration -> (DefaultTenantProfileConfiguration) profileConfiguration);
|
.map(profileConfiguration -> (DefaultTenantProfileConfiguration) profileConfiguration);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@JsonIgnore
|
||||||
|
public DefaultTenantProfileConfiguration getDefaultProfileConfiguration() {
|
||||||
|
return getProfileConfiguration().orElse(null);
|
||||||
|
}
|
||||||
|
|
||||||
public TenantProfileData createDefaultTenantProfileData() {
|
public TenantProfileData createDefaultTenantProfileData() {
|
||||||
TenantProfileData tpd = new TenantProfileData();
|
TenantProfileData tpd = new TenantProfileData();
|
||||||
tpd.setConfiguration(new DefaultTenantProfileConfiguration());
|
tpd.setConfiguration(new DefaultTenantProfileConfiguration());
|
||||||
@ -146,9 +151,4 @@ public class TenantProfile extends SearchTextBased<TenantProfileId> implements H
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@JsonIgnore
|
|
||||||
public DefaultTenantProfileConfiguration getDefaultTenantProfileConfiguration() {
|
|
||||||
return getProfileConfiguration().orElse(null);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -46,6 +46,16 @@ public class DefaultTenantProfileConfiguration implements TenantProfileConfigura
|
|||||||
private String transportDeviceTelemetryMsgRateLimit;
|
private String transportDeviceTelemetryMsgRateLimit;
|
||||||
private String transportDeviceTelemetryDataPointsRateLimit;
|
private String transportDeviceTelemetryDataPointsRateLimit;
|
||||||
|
|
||||||
|
private long maxTransportMessages;
|
||||||
|
private long maxTransportDataPoints;
|
||||||
|
private long maxREExecutions;
|
||||||
|
private long maxJSExecutions;
|
||||||
|
private long maxDPStorageDays;
|
||||||
|
private int maxRuleNodeExecutionsPerMessage;
|
||||||
|
private long maxEmails;
|
||||||
|
private long maxSms;
|
||||||
|
private long maxCreatedAlarms;
|
||||||
|
|
||||||
private String tenantServerRestLimitsConfiguration;
|
private String tenantServerRestLimitsConfiguration;
|
||||||
private String customerServerRestLimitsConfiguration;
|
private String customerServerRestLimitsConfiguration;
|
||||||
|
|
||||||
@ -62,16 +72,6 @@ public class DefaultTenantProfileConfiguration implements TenantProfileConfigura
|
|||||||
|
|
||||||
private String cassandraQueryTenantRateLimitsConfiguration;
|
private String cassandraQueryTenantRateLimitsConfiguration;
|
||||||
|
|
||||||
private long maxTransportMessages;
|
|
||||||
private long maxTransportDataPoints;
|
|
||||||
private long maxREExecutions;
|
|
||||||
private long maxJSExecutions;
|
|
||||||
private long maxDPStorageDays;
|
|
||||||
private int maxRuleNodeExecutionsPerMessage;
|
|
||||||
private long maxEmails;
|
|
||||||
private long maxSms;
|
|
||||||
private long maxCreatedAlarms;
|
|
||||||
|
|
||||||
private int defaultStorageTtlDays;
|
private int defaultStorageTtlDays;
|
||||||
private int alarmsTtlDays;
|
private int alarmsTtlDays;
|
||||||
private int rpcTtlDays;
|
private int rpcTtlDays;
|
||||||
|
|||||||
@ -116,7 +116,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
|
|||||||
F result = wrap(task, settableFuture);
|
F result = wrap(task, settableFuture);
|
||||||
|
|
||||||
boolean perTenantLimitReached = false;
|
boolean perTenantLimitReached = false;
|
||||||
var tenantProfileConfiguration = tenantProfileCache.get(task.getTenantId()).getDefaultTenantProfileConfiguration();
|
var tenantProfileConfiguration = tenantProfileCache.get(task.getTenantId()).getDefaultProfileConfiguration();
|
||||||
if (StringUtils.isNotEmpty(tenantProfileConfiguration.getCassandraQueryTenantRateLimitsConfiguration())) {
|
if (StringUtils.isNotEmpty(tenantProfileConfiguration.getCassandraQueryTenantRateLimitsConfiguration())) {
|
||||||
if (task.getTenantId() == null) {
|
if (task.getTenantId() == null) {
|
||||||
log.info("Invalid task received: {}", task);
|
log.info("Invalid task received: {}", task);
|
||||||
@ -131,6 +131,8 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
|
|||||||
perTenantLimitReached = true;
|
perTenantLimitReached = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else if (!TenantId.SYS_TENANT_ID.equals(task.getTenantId())) {
|
||||||
|
perTenantLimits.remove(task.getTenantId());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!perTenantLimitReached) {
|
if (!perTenantLimitReached) {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user