Merge branch 'ViacheslavKlimov-rate-limits-to-tenant-profile' into develop/3.4

This commit is contained in:
Igor Kulikov 2022-06-17 13:04:43 +03:00
commit 48a9907add
18 changed files with 468 additions and 161 deletions

View File

@ -15,17 +15,18 @@
*/
package org.thingsboard.server.config;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Component;
import org.springframework.web.filter.GenericFilterBean;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.tools.TbRateLimits;
import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.exception.ThingsboardErrorResponseHandler;
import org.thingsboard.server.service.security.model.SecurityUser;
@ -35,42 +36,32 @@ import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@Component
public class RateLimitProcessingFilter extends GenericFilterBean {
@Value("${server.rest.limits.tenant.enabled:false}")
private boolean perTenantLimitsEnabled;
@Value("${server.rest.limits.tenant.configuration:}")
private String perTenantLimitsConfiguration;
@Value("${server.rest.limits.customer.enabled:false}")
private boolean perCustomerLimitsEnabled;
@Value("${server.rest.limits.customer.configuration:}")
private String perCustomerLimitsConfiguration;
@Autowired
private ThingsboardErrorResponseHandler errorResponseHandler;
private ConcurrentMap<TenantId, TbRateLimits> perTenantLimits = new ConcurrentHashMap<>();
private ConcurrentMap<CustomerId, TbRateLimits> perCustomerLimits = new ConcurrentHashMap<>();
@Autowired
private TbTenantProfileCache tenantProfileCache;
private final ConcurrentMap<TenantId, TbRateLimits> perTenantLimits = new ConcurrentHashMap<>();
private final ConcurrentMap<CustomerId, TbRateLimits> perCustomerLimits = new ConcurrentHashMap<>();
@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
SecurityUser user = getCurrentUser();
if (user != null && !user.isSystemAdmin()) {
if (perTenantLimitsEnabled) {
TbRateLimits rateLimits = perTenantLimits.computeIfAbsent(user.getTenantId(), id -> new TbRateLimits(perTenantLimitsConfiguration));
if (!rateLimits.tryConsume()) {
errorResponseHandler.handle(new TbRateLimitsException(EntityType.TENANT), (HttpServletResponse) response);
return;
}
var profileConfiguration = tenantProfileCache.get(user.getTenantId()).getDefaultProfileConfiguration();
if (!checkRateLimits(user.getTenantId(), profileConfiguration.getTenantServerRestLimitsConfiguration(), perTenantLimits, response)) {
return;
}
if (perCustomerLimitsEnabled && user.isCustomerUser()) {
TbRateLimits rateLimits = perCustomerLimits.computeIfAbsent(user.getCustomerId(), id -> new TbRateLimits(perCustomerLimitsConfiguration));
if (!rateLimits.tryConsume()) {
errorResponseHandler.handle(new TbRateLimitsException(EntityType.CUSTOMER), (HttpServletResponse) response);
if (user.isCustomerUser()) {
if (!checkRateLimits(user.getCustomerId(), profileConfiguration.getCustomerServerRestLimitsConfiguration(), perCustomerLimits, response)) {
return;
}
}
@ -78,6 +69,25 @@ public class RateLimitProcessingFilter extends GenericFilterBean {
chain.doFilter(request, response);
}
private <I extends EntityId> boolean checkRateLimits(I ownerId, String rateLimitConfig, Map<I, TbRateLimits> rateLimitsMap, ServletResponse response) {
if (StringUtils.isNotEmpty(rateLimitConfig)) {
TbRateLimits rateLimits = rateLimitsMap.get(ownerId);
if (rateLimits == null || !rateLimits.getConfiguration().equals(rateLimitConfig)) {
rateLimits = new TbRateLimits(rateLimitConfig);
rateLimitsMap.put(ownerId, rateLimits);
}
if (!rateLimits.tryConsume()) {
errorResponseHandler.handle(new TbRateLimitsException(ownerId.getEntityType()), (HttpServletResponse) response);
return false;
}
} else {
rateLimitsMap.remove(ownerId);
}
return true;
}
protected SecurityUser getCurrentUser() {
Authentication authentication = SecurityContextHolder.getContext().getAuthentication();
if (authentication != null && authentication.getPrincipal() instanceof SecurityUser) {

View File

@ -16,12 +16,12 @@
package org.thingsboard.server.controller.plugin;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.BeanCreationNotAllowedException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.security.core.Authentication;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.PongMessage;
import org.springframework.web.socket.TextMessage;
@ -34,10 +34,10 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.msg.tools.TbRateLimits;
import org.thingsboard.server.config.WebSocketConfiguration;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.security.model.SecurityUser;
import org.thingsboard.server.service.security.model.UserPrincipal;
import org.thingsboard.server.service.telemetry.DefaultTelemetryWebSocketService;
import org.thingsboard.server.service.telemetry.SessionEvent;
import org.thingsboard.server.service.telemetry.TelemetryWebSocketMsgEndpoint;
import org.thingsboard.server.service.telemetry.TelemetryWebSocketService;
@ -72,22 +72,11 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
@Autowired
private TelemetryWebSocketService webSocketService;
@Autowired
private TbTenantProfileCache tenantProfileCache;
@Value("${server.ws.send_timeout:5000}")
private long sendTimeout;
@Value("${server.ws.limits.max_sessions_per_tenant:0}")
private int maxSessionsPerTenant;
@Value("${server.ws.limits.max_sessions_per_customer:0}")
private int maxSessionsPerCustomer;
@Value("${server.ws.limits.max_sessions_per_regular_user:0}")
private int maxSessionsPerRegularUser;
@Value("${server.ws.limits.max_sessions_per_public_user:0}")
private int maxSessionsPerPublicUser;
@Value("${server.ws.limits.max_queue_per_ws_session:1000}")
private int maxMsgQueuePerSession;
@Value("${server.ws.limits.max_updates_per_session:}")
private String perSessionUpdatesConfiguration;
@Value("${server.ws.ping_timeout:30000}")
private long pingTimeout;
@ -144,10 +133,13 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
String internalSessionId = session.getId();
TelemetryWebSocketSessionRef sessionRef = toRef(session);
String externalSessionId = sessionRef.getSessionId();
if (!checkLimits(session, sessionRef)) {
return;
}
internalSessionMap.put(internalSessionId, new SessionMetaData(session, sessionRef, maxMsgQueuePerSession));
var tenantProfileConfiguration = tenantProfileCache.get(sessionRef.getSecurityCtx().getTenantId()).getDefaultProfileConfiguration();
internalSessionMap.put(internalSessionId, new SessionMetaData(session, sessionRef, tenantProfileConfiguration.getWsMsgQueueLimitPerSession() > 0 ?
tenantProfileConfiguration.getWsMsgQueueLimitPerSession() : 500));
externalSessionMap.put(externalSessionId, internalSessionId);
processInWebSocketService(sessionRef, SessionEvent.onEstablished());
@ -323,8 +315,9 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
if (internalId != null) {
SessionMetaData sessionMd = internalSessionMap.get(internalId);
if (sessionMd != null) {
if (!StringUtils.isEmpty(perSessionUpdatesConfiguration)) {
TbRateLimits rateLimits = perSessionUpdateLimits.computeIfAbsent(sessionRef.getSessionId(), sid -> new TbRateLimits(perSessionUpdatesConfiguration));
var tenantProfileConfiguration = tenantProfileCache.get(sessionRef.getSecurityCtx().getTenantId()).getDefaultProfileConfiguration();
if (StringUtils.isNotEmpty(tenantProfileConfiguration.getWsUpdatesPerSessionRateLimit())) {
TbRateLimits rateLimits = perSessionUpdateLimits.computeIfAbsent(sessionRef.getSessionId(), sid -> new TbRateLimits(tenantProfileConfiguration.getWsUpdatesPerSessionRateLimit()));
if (!rateLimits.tryConsume()) {
if (blacklistedSessions.putIfAbsent(externalId, sessionRef) == null) {
log.info("[{}][{}][{}] Failed to process session update. Max session updates limit reached"
@ -336,6 +329,8 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
log.debug("[{}][{}][{}] Session is no longer blacklisted.", sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), externalId);
blacklistedSessions.remove(externalId);
}
} else {
perSessionUpdateLimits.remove(sessionRef.getSessionId());
}
sessionMd.sendMsg(msg);
} else {
@ -380,11 +375,17 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
}
private boolean checkLimits(WebSocketSession session, TelemetryWebSocketSessionRef sessionRef) throws Exception {
var tenantProfileConfiguration =
tenantProfileCache.get(sessionRef.getSecurityCtx().getTenantId()).getDefaultProfileConfiguration();
if (tenantProfileConfiguration == null) {
return true;
}
String sessionId = session.getId();
if (maxSessionsPerTenant > 0) {
if (tenantProfileConfiguration.getMaxWsSessionsPerTenant() > 0) {
Set<String> tenantSessions = tenantSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getTenantId(), id -> ConcurrentHashMap.newKeySet());
synchronized (tenantSessions) {
if (tenantSessions.size() < maxSessionsPerTenant) {
if (tenantSessions.size() < tenantProfileConfiguration.getMaxWsSessionsPerTenant()) {
tenantSessions.add(sessionId);
} else {
log.info("[{}][{}][{}] Failed to start session. Max tenant sessions limit reached"
@ -396,10 +397,10 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
}
if (sessionRef.getSecurityCtx().isCustomerUser()) {
if (maxSessionsPerCustomer > 0) {
if (tenantProfileConfiguration.getMaxWsSessionsPerCustomer() > 0) {
Set<String> customerSessions = customerSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getCustomerId(), id -> ConcurrentHashMap.newKeySet());
synchronized (customerSessions) {
if (customerSessions.size() < maxSessionsPerCustomer) {
if (customerSessions.size() < tenantProfileConfiguration.getMaxWsSessionsPerCustomer()) {
customerSessions.add(sessionId);
} else {
log.info("[{}][{}][{}] Failed to start session. Max customer sessions limit reached"
@ -409,10 +410,11 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
}
}
}
if (maxSessionsPerRegularUser > 0 && UserPrincipal.Type.USER_NAME.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) {
if (tenantProfileConfiguration.getMaxWsSessionsPerRegularUser() > 0
&& UserPrincipal.Type.USER_NAME.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) {
Set<String> regularUserSessions = regularUserSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet());
synchronized (regularUserSessions) {
if (regularUserSessions.size() < maxSessionsPerRegularUser) {
if (regularUserSessions.size() < tenantProfileConfiguration.getMaxWsSessionsPerRegularUser()) {
regularUserSessions.add(sessionId);
} else {
log.info("[{}][{}][{}] Failed to start session. Max regular user sessions limit reached"
@ -422,10 +424,11 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
}
}
}
if (maxSessionsPerPublicUser > 0 && UserPrincipal.Type.PUBLIC_ID.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) {
if (tenantProfileConfiguration.getMaxWsSessionsPerPublicUser() > 0
&& UserPrincipal.Type.PUBLIC_ID.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) {
Set<String> publicUserSessions = publicUserSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet());
synchronized (publicUserSessions) {
if (publicUserSessions.size() < maxSessionsPerPublicUser) {
if (publicUserSessions.size() < tenantProfileConfiguration.getMaxWsSessionsPerPublicUser()) {
publicUserSessions.add(sessionId);
} else {
log.info("[{}][{}][{}] Failed to start session. Max public user sessions limit reached"
@ -440,29 +443,31 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
}
private void cleanupLimits(WebSocketSession session, TelemetryWebSocketSessionRef sessionRef) {
var tenantProfileConfiguration = tenantProfileCache.get(sessionRef.getSecurityCtx().getTenantId()).getDefaultProfileConfiguration();
String sessionId = session.getId();
perSessionUpdateLimits.remove(sessionRef.getSessionId());
blacklistedSessions.remove(sessionRef.getSessionId());
if (maxSessionsPerTenant > 0) {
if (tenantProfileConfiguration.getMaxWsSessionsPerTenant() > 0) {
Set<String> tenantSessions = tenantSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getTenantId(), id -> ConcurrentHashMap.newKeySet());
synchronized (tenantSessions) {
tenantSessions.remove(sessionId);
}
}
if (sessionRef.getSecurityCtx().isCustomerUser()) {
if (maxSessionsPerCustomer > 0) {
if (tenantProfileConfiguration.getMaxWsSessionsPerCustomer() > 0) {
Set<String> customerSessions = customerSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getCustomerId(), id -> ConcurrentHashMap.newKeySet());
synchronized (customerSessions) {
customerSessions.remove(sessionId);
}
}
if (maxSessionsPerRegularUser > 0 && UserPrincipal.Type.USER_NAME.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) {
if (tenantProfileConfiguration.getMaxWsSessionsPerRegularUser() > 0 && UserPrincipal.Type.USER_NAME.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) {
Set<String> regularUserSessions = regularUserSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet());
synchronized (regularUserSessions) {
regularUserSessions.remove(sessionId);
}
}
if (maxSessionsPerPublicUser > 0 && UserPrincipal.Type.PUBLIC_ID.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) {
if (tenantProfileConfiguration.getMaxWsSessionsPerPublicUser() > 0 && UserPrincipal.Type.PUBLIC_ID.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) {
Set<String> publicUserSessions = publicUserSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet());
synchronized (publicUserSessions) {
publicUserSessions.remove(sessionId);

View File

@ -21,7 +21,6 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Service;
@ -56,12 +55,9 @@ import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfiguration;
import org.thingsboard.server.dao.DaoUtil;
import org.thingsboard.server.dao.alarm.AlarmDao;
import org.thingsboard.server.dao.alarm.AlarmService;
import org.thingsboard.server.dao.entity.EntityService;
import org.thingsboard.server.dao.entityview.EntityViewService;
import org.thingsboard.server.dao.model.sql.DeviceProfileEntity;
import org.thingsboard.server.dao.model.sql.RelationEntity;
import org.thingsboard.server.dao.oauth2.OAuth2Service;
import org.thingsboard.server.dao.queue.QueueService;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.dao.rule.RuleChainService;
@ -69,7 +65,6 @@ import org.thingsboard.server.dao.sql.device.DeviceProfileRepository;
import org.thingsboard.server.dao.tenant.TenantProfileService;
import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
import org.thingsboard.server.service.install.InstallScripts;
import org.thingsboard.server.service.install.SystemDataLoaderService;
import org.thingsboard.server.service.install.TbRuleEngineQueueConfigService;
@ -107,9 +102,6 @@ public class DefaultDataUpdateService implements DataUpdateService {
@Autowired
private TimeseriesService tsService;
@Autowired
private AlarmService alarmService;
@Autowired
private EntityService entityService;
@ -120,7 +112,7 @@ public class DefaultDataUpdateService implements DataUpdateService {
private DeviceProfileRepository deviceProfileRepository;
@Autowired
private OAuth2Service oAuth2Service;
private RateLimitsUpdater rateLimitsUpdater;
@Autowired
private TenantProfileService tenantProfileService;
@ -162,8 +154,9 @@ public class DefaultDataUpdateService implements DataUpdateService {
break;
case "3.3.4":
log.info("Updating data from version 3.3.4 to 3.4.0 ...");
tenantsProfileQueueConfigurationUpdater.updateEntities(null);
checkPointRuleNodesUpdater.updateEntities(null);
rateLimitsUpdater.updateEntities();
tenantsProfileQueueConfigurationUpdater.updateEntities();
checkPointRuleNodesUpdater.updateEntities();
break;
default:
throw new RuntimeException("Unable to update data, unsupported fromVersion: " + fromVersion);

View File

@ -49,6 +49,10 @@ public abstract class PaginatedUpdater<I, D> {
}
}
public void updateEntities() {
updateEntities(null);
}
protected boolean forceReportTotal() {
return false;
}

View File

@ -0,0 +1,115 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.install.update;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.dao.tenant.TenantProfileService;
@Component
class RateLimitsUpdater extends PaginatedUpdater<String, TenantProfile> {
@Value("#{ environment.getProperty('TB_SERVER_REST_LIMITS_TENANT_ENABLED') ?: environment.getProperty('server.rest.limits.tenant.enabled') ?: 'false' }")
boolean tenantServerRestLimitsEnabled;
@Value("#{ environment.getProperty('TB_SERVER_REST_LIMITS_TENANT_CONFIGURATION') ?: environment.getProperty('server.rest.limits.tenant.configuration') ?: '100:1,2000:60' }")
String tenantServerRestLimitsConfiguration;
@Value("#{ environment.getProperty('TB_SERVER_REST_LIMITS_CUSTOMER_ENABLED') ?: environment.getProperty('server.rest.limits.customer.enabled') ?: 'false' }")
boolean customerServerRestLimitsEnabled;
@Value("#{ environment.getProperty('TB_SERVER_REST_LIMITS_CUSTOMER_CONFIGURATION') ?: environment.getProperty('server.rest.limits.customer.configuration') ?: '50:1,1000:60' }")
String customerServerRestLimitsConfiguration;
@Value("#{ environment.getProperty('TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_TENANT') ?: environment.getProperty('server.ws.limits.max_sessions_per_tenant') ?: '0' }")
private int maxWsSessionsPerTenant;
@Value("#{ environment.getProperty('TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_CUSTOMER') ?: environment.getProperty('server.ws.limits.max_sessions_per_customer') ?: '0' }")
private int maxWsSessionsPerCustomer;
@Value("#{ environment.getProperty('TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_REGULAR_USER') ?: environment.getProperty('server.ws.limits.max_sessions_per_regular_user') ?: '0' }")
private int maxWsSessionsPerRegularUser;
@Value("#{ environment.getProperty('TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_PUBLIC_USER') ?: environment.getProperty('server.ws.limits.max_sessions_per_public_user') ?: '0' }")
private int maxWsSessionsPerPublicUser;
@Value("#{ environment.getProperty('TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_QUEUE_PER_WS_SESSION') ?: environment.getProperty('server.ws.limits.max_queue_per_ws_session') ?: '500' }")
private int wsMsgQueueLimitPerSession;
@Value("#{ environment.getProperty('TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_TENANT') ?: environment.getProperty('server.ws.limits.max_subscriptions_per_tenant') ?: '0' }")
private long maxWsSubscriptionsPerTenant;
@Value("#{ environment.getProperty('TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_CUSTOMER') ?: environment.getProperty('server.ws.limits.max_subscriptions_per_customer') ?: '0' }")
private long maxWsSubscriptionsPerCustomer;
@Value("#{ environment.getProperty('TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_REGULAR_USER') ?: environment.getProperty('server.ws.limits.max_subscriptions_per_regular_user') ?: '0' }")
private long maxWsSubscriptionsPerRegularUser;
@Value("#{ environment.getProperty('TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_PUBLIC_USER') ?: environment.getProperty('server.ws.limits.max_subscriptions_per_public_user') ?: '0' }")
private long maxWsSubscriptionsPerPublicUser;
@Value("#{ environment.getProperty('TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_UPDATES_PER_SESSION') ?: environment.getProperty('server.ws.limits.max_updates_per_session') ?: '300:1,3000:60' }")
private String wsUpdatesPerSessionRateLimit;
@Value("#{ environment.getProperty('CASSANDRA_QUERY_TENANT_RATE_LIMITS_ENABLED') ?: environment.getProperty('cassandra.query.tenant_rate_limits.enabled') ?: 'false' }")
private boolean cassandraQueryTenantRateLimitsEnabled;
@Value("#{ environment.getProperty('CASSANDRA_QUERY_TENANT_RATE_LIMITS_CONFIGURATION') ?: environment.getProperty('cassandra.query.tenant_rate_limits.configuration') ?: '1000:1,30000:60' }")
private String cassandraQueryTenantRateLimitsConfiguration;
@Autowired
private TenantProfileService tenantProfileService;
@Override
protected boolean forceReportTotal() {
return true;
}
@Override
protected String getName() {
return "Rate limits updater";
}
@Override
protected PageData<TenantProfile> findEntities(String id, PageLink pageLink) {
return tenantProfileService.findTenantProfiles(TenantId.SYS_TENANT_ID, pageLink);
}
@Override
protected void updateEntity(TenantProfile tenantProfile) {
var profileConfiguration = tenantProfile.getDefaultProfileConfiguration();
if (tenantServerRestLimitsEnabled && StringUtils.isNotEmpty(tenantServerRestLimitsConfiguration)) {
profileConfiguration.setTenantServerRestLimitsConfiguration(tenantServerRestLimitsConfiguration);
}
if (customerServerRestLimitsEnabled && StringUtils.isNotEmpty(customerServerRestLimitsConfiguration)) {
profileConfiguration.setCustomerServerRestLimitsConfiguration(customerServerRestLimitsConfiguration);
}
profileConfiguration.setMaxWsSessionsPerTenant(maxWsSessionsPerTenant);
profileConfiguration.setMaxWsSessionsPerCustomer(maxWsSessionsPerCustomer);
profileConfiguration.setMaxWsSessionsPerPublicUser(maxWsSessionsPerPublicUser);
profileConfiguration.setMaxWsSessionsPerRegularUser(maxWsSessionsPerRegularUser);
profileConfiguration.setMaxWsSubscriptionsPerTenant(maxWsSubscriptionsPerTenant);
profileConfiguration.setMaxWsSubscriptionsPerCustomer(maxWsSubscriptionsPerCustomer);
profileConfiguration.setMaxWsSubscriptionsPerPublicUser(maxWsSubscriptionsPerPublicUser);
profileConfiguration.setMaxWsSubscriptionsPerRegularUser(maxWsSubscriptionsPerRegularUser);
profileConfiguration.setWsMsgQueueLimitPerSession(wsMsgQueueLimitPerSession);
if (StringUtils.isNotEmpty(wsUpdatesPerSessionRateLimit)) {
profileConfiguration.setWsUpdatesPerSessionRateLimit(wsUpdatesPerSessionRateLimit);
}
if (cassandraQueryTenantRateLimitsEnabled && StringUtils.isNotEmpty(cassandraQueryTenantRateLimitsConfiguration)) {
profileConfiguration.setCassandraQueryTenantRateLimitsConfiguration(cassandraQueryTenantRateLimitsConfiguration);
}
tenantProfileService.saveTenantProfile(TenantId.SYS_TENANT_ID, tenantProfile);
}
}

View File

@ -149,7 +149,7 @@ public class DefaultTwoFactorAuthService implements TwoFactorAuthService {
ConcurrentMap<TwoFaProviderType, TbRateLimits> providersRateLimits = rateLimits.computeIfAbsent(userId, i -> new ConcurrentHashMap<>());
TbRateLimits rateLimit = providersRateLimits.get(providerType);
if (rateLimit == null || !rateLimit.getConfig().equals(rateLimitConfig)) {
if (rateLimit == null || !rateLimit.getConfiguration().equals(rateLimitConfig)) {
rateLimit = new TbRateLimits(rateLimitConfig, true);
providersRateLimits.put(providerType, rateLimit);
}

View File

@ -42,7 +42,9 @@ import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.dao.util.TenantRateLimitException;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
@ -138,14 +140,8 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
@Autowired
private TbServiceInfoProvider serviceInfoProvider;
@Value("${server.ws.limits.max_subscriptions_per_tenant:0}")
private int maxSubscriptionsPerTenant;
@Value("${server.ws.limits.max_subscriptions_per_customer:0}")
private int maxSubscriptionsPerCustomer;
@Value("${server.ws.limits.max_subscriptions_per_regular_user:0}")
private int maxSubscriptionsPerRegularUser;
@Value("${server.ws.limits.max_subscriptions_per_public_user:0}")
private int maxSubscriptionsPerPublicUser;
@Autowired
private TbTenantProfileCache tenantProfileCache;
@Value("${server.ws.ping_timeout:30000}")
private long pingTimeout;
@ -320,44 +316,50 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
}
private void processSessionClose(TelemetryWebSocketSessionRef sessionRef) {
String sessionId = "[" + sessionRef.getSessionId() + "]";
if (maxSubscriptionsPerTenant > 0) {
Set<String> tenantSubscriptions = tenantSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getTenantId(), id -> ConcurrentHashMap.newKeySet());
synchronized (tenantSubscriptions) {
tenantSubscriptions.removeIf(subId -> subId.startsWith(sessionId));
}
}
if (sessionRef.getSecurityCtx().isCustomerUser()) {
if (maxSubscriptionsPerCustomer > 0) {
Set<String> customerSessions = customerSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getCustomerId(), id -> ConcurrentHashMap.newKeySet());
synchronized (customerSessions) {
customerSessions.removeIf(subId -> subId.startsWith(sessionId));
var tenantProfileConfiguration = tenantProfileCache.get(sessionRef.getSecurityCtx().getTenantId()).getDefaultProfileConfiguration();
if (tenantProfileConfiguration != null) {
String sessionId = "[" + sessionRef.getSessionId() + "]";
if (tenantProfileConfiguration.getMaxWsSubscriptionsPerTenant() > 0) {
Set<String> tenantSubscriptions = tenantSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getTenantId(), id -> ConcurrentHashMap.newKeySet());
synchronized (tenantSubscriptions) {
tenantSubscriptions.removeIf(subId -> subId.startsWith(sessionId));
}
}
if (maxSubscriptionsPerRegularUser > 0 && UserPrincipal.Type.USER_NAME.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) {
Set<String> regularUserSessions = regularUserSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet());
synchronized (regularUserSessions) {
regularUserSessions.removeIf(subId -> subId.startsWith(sessionId));
if (sessionRef.getSecurityCtx().isCustomerUser()) {
if (tenantProfileConfiguration.getMaxWsSubscriptionsPerCustomer() > 0) {
Set<String> customerSessions = customerSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getCustomerId(), id -> ConcurrentHashMap.newKeySet());
synchronized (customerSessions) {
customerSessions.removeIf(subId -> subId.startsWith(sessionId));
}
}
}
if (maxSubscriptionsPerPublicUser > 0 && UserPrincipal.Type.PUBLIC_ID.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) {
Set<String> publicUserSessions = publicUserSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet());
synchronized (publicUserSessions) {
publicUserSessions.removeIf(subId -> subId.startsWith(sessionId));
if (tenantProfileConfiguration.getMaxWsSubscriptionsPerRegularUser() > 0 && UserPrincipal.Type.USER_NAME.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) {
Set<String> regularUserSessions = regularUserSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet());
synchronized (regularUserSessions) {
regularUserSessions.removeIf(subId -> subId.startsWith(sessionId));
}
}
if (tenantProfileConfiguration.getMaxWsSubscriptionsPerPublicUser() > 0 && UserPrincipal.Type.PUBLIC_ID.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) {
Set<String> publicUserSessions = publicUserSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet());
synchronized (publicUserSessions) {
publicUserSessions.removeIf(subId -> subId.startsWith(sessionId));
}
}
}
}
}
private boolean processSubscription(TelemetryWebSocketSessionRef sessionRef, SubscriptionCmd cmd) {
var tenantProfileConfiguration = (DefaultTenantProfileConfiguration) tenantProfileCache.get(sessionRef.getSecurityCtx().getTenantId()).getDefaultProfileConfiguration();
String subId = "[" + sessionRef.getSessionId() + "]:[" + cmd.getCmdId() + "]";
try {
if (maxSubscriptionsPerTenant > 0) {
if (tenantProfileConfiguration.getMaxWsSubscriptionsPerTenant() > 0) {
Set<String> tenantSubscriptions = tenantSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getTenantId(), id -> ConcurrentHashMap.newKeySet());
synchronized (tenantSubscriptions) {
if (cmd.isUnsubscribe()) {
tenantSubscriptions.remove(subId);
} else if (tenantSubscriptions.size() < maxSubscriptionsPerTenant) {
} else if (tenantSubscriptions.size() < tenantProfileConfiguration.getMaxWsSubscriptionsPerTenant()) {
tenantSubscriptions.add(subId);
} else {
log.info("[{}][{}][{}] Failed to start subscription. Max tenant subscriptions limit reached"
@ -369,12 +371,12 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
}
if (sessionRef.getSecurityCtx().isCustomerUser()) {
if (maxSubscriptionsPerCustomer > 0) {
if (tenantProfileConfiguration.getMaxWsSubscriptionsPerCustomer() > 0) {
Set<String> customerSessions = customerSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getCustomerId(), id -> ConcurrentHashMap.newKeySet());
synchronized (customerSessions) {
if (cmd.isUnsubscribe()) {
customerSessions.remove(subId);
} else if (customerSessions.size() < maxSubscriptionsPerCustomer) {
} else if (customerSessions.size() < tenantProfileConfiguration.getMaxWsSubscriptionsPerCustomer()) {
customerSessions.add(subId);
} else {
log.info("[{}][{}][{}] Failed to start subscription. Max customer subscriptions limit reached"
@ -384,10 +386,10 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
}
}
}
if (maxSubscriptionsPerRegularUser > 0 && UserPrincipal.Type.USER_NAME.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) {
if (tenantProfileConfiguration.getMaxWsSubscriptionsPerRegularUser() > 0 && UserPrincipal.Type.USER_NAME.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) {
Set<String> regularUserSessions = regularUserSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet());
synchronized (regularUserSessions) {
if (regularUserSessions.size() < maxSubscriptionsPerRegularUser) {
if (regularUserSessions.size() < tenantProfileConfiguration.getMaxWsSubscriptionsPerRegularUser()) {
regularUserSessions.add(subId);
} else {
log.info("[{}][{}][{}] Failed to start subscription. Max regular user subscriptions limit reached"
@ -397,10 +399,10 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
}
}
}
if (maxSubscriptionsPerPublicUser > 0 && UserPrincipal.Type.PUBLIC_ID.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) {
if (tenantProfileConfiguration.getMaxWsSubscriptionsPerPublicUser() > 0 && UserPrincipal.Type.PUBLIC_ID.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) {
Set<String> publicUserSessions = publicUserSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet());
synchronized (publicUserSessions) {
if (publicUserSessions.size() < maxSubscriptionsPerPublicUser) {
if (publicUserSessions.size() < tenantProfileConfiguration.getMaxWsSubscriptionsPerPublicUser()) {
publicUserSessions.add(subId);
} else {
log.info("[{}][{}][{}] Failed to start subscription. Max public user subscriptions limit reached"

View File

@ -58,18 +58,6 @@ server:
send_timeout: "${TB_SERVER_WS_SEND_TIMEOUT:5000}"
# recommended timeout >= 30 seconds. Platform will attempt to send 'ping' request 3 times within the timeout
ping_timeout: "${TB_SERVER_WS_PING_TIMEOUT:30000}"
limits:
# Limit the amount of sessions and subscriptions available on each server. Put values to zero to disable particular limitation
max_sessions_per_tenant: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_TENANT:0}"
max_sessions_per_customer: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_CUSTOMER:0}"
max_sessions_per_regular_user: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_REGULAR_USER:0}"
max_sessions_per_public_user: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_PUBLIC_USER:0}"
max_queue_per_ws_session: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_QUEUE_PER_WS_SESSION:500}"
max_subscriptions_per_tenant: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_TENANT:0}"
max_subscriptions_per_customer: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_CUSTOMER:0}"
max_subscriptions_per_regular_user: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_REGULAR_USER:0}"
max_subscriptions_per_public_user: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_PUBLIC_USER:0}"
max_updates_per_session: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_UPDATES_PER_SESSION:300:1,3000:60}"
dynamic_page_link:
refresh_interval: "${TB_SERVER_WS_DYNAMIC_PAGE_LINK_REFRESH_INTERVAL_SEC:60}"
refresh_pool_size: "${TB_SERVER_WS_DYNAMIC_PAGE_LINK_REFRESH_POOL_SIZE:1}"
@ -78,13 +66,6 @@ server:
max_entities_per_data_subscription: "${TB_SERVER_WS_MAX_ENTITIES_PER_DATA_SUBSCRIPTION:10000}"
max_entities_per_alarm_subscription: "${TB_SERVER_WS_MAX_ENTITIES_PER_ALARM_SUBSCRIPTION:10000}"
rest:
limits:
tenant:
enabled: "${TB_SERVER_REST_LIMITS_TENANT_ENABLED:false}"
configuration: "${TB_SERVER_REST_LIMITS_TENANT_CONFIGURATION:100:1,2000:60}"
customer:
enabled: "${TB_SERVER_REST_LIMITS_CUSTOMER_ENABLED:false}"
configuration: "${TB_SERVER_REST_LIMITS_CUSTOMER_CONFIGURATION:50:1,1000:60}"
server_side_rpc:
# Minimum value of the server side RPC timeout. May override value provided in the REST API call.
# Since 2.5 migration to queues, the RPC delay depends on the size of the pending messages in the queue,
@ -255,8 +236,6 @@ cassandra:
# log one of cassandra queries with specified frequency (0 - logging is disabled)
print_queries_freq: "${CASSANDRA_QUERY_PRINT_FREQ:0}"
tenant_rate_limits:
enabled: "${CASSANDRA_QUERY_TENANT_RATE_LIMITS_ENABLED:false}"
configuration: "${CASSANDRA_QUERY_TENANT_RATE_LIMITS_CONFIGURATION:1000:1,30000:60}"
print_tenant_names: "${CASSANDRA_QUERY_TENANT_RATE_LIMITS_PRINT_TENANT_NAMES:false}"
# SQL configuration parameters

View File

@ -85,7 +85,7 @@ public class TenantProfile extends SearchTextBased<TenantProfileId> implements H
@ApiModelProperty(position = 1, value = "JSON object with the tenant profile Id. " +
"Specify this field to update the tenant profile. " +
"Referencing non-existing tenant profile Id will cause error. " +
"Omit this field to create new tenant profile." )
"Omit this field to create new tenant profile.")
@Override
public TenantProfileId getId() {
return super.getId();
@ -132,9 +132,15 @@ public class TenantProfile extends SearchTextBased<TenantProfileId> implements H
.map(profileConfiguration -> (DefaultTenantProfileConfiguration) profileConfiguration);
}
@JsonIgnore
public DefaultTenantProfileConfiguration getDefaultProfileConfiguration() {
return getProfileConfiguration().orElse(null);
}
public TenantProfileData createDefaultTenantProfileData() {
TenantProfileData tpd = new TenantProfileData();
tpd.setConfiguration(new DefaultTenantProfileConfiguration());
this.profileData = tpd;
return tpd;
}

View File

@ -56,6 +56,22 @@ public class DefaultTenantProfileConfiguration implements TenantProfileConfigura
private long maxSms;
private long maxCreatedAlarms;
private String tenantServerRestLimitsConfiguration;
private String customerServerRestLimitsConfiguration;
private int maxWsSessionsPerTenant;
private int maxWsSessionsPerCustomer;
private int maxWsSessionsPerRegularUser;
private int maxWsSessionsPerPublicUser;
private int wsMsgQueueLimitPerSession;
private long maxWsSubscriptionsPerTenant;
private long maxWsSubscriptionsPerCustomer;
private long maxWsSubscriptionsPerRegularUser;
private long maxWsSubscriptionsPerPublicUser;
private String wsUpdatesPerSessionRateLimit;
private String cassandraQueryTenantRateLimitsConfiguration;
private int defaultStorageTtlDays;
private int alarmsTtlDays;
private int rpcTtlDays;

View File

@ -20,6 +20,7 @@ import io.github.bucket4j.Bucket4j;
import io.github.bucket4j.Refill;
import io.github.bucket4j.local.LocalBucket;
import io.github.bucket4j.local.LocalBucketBuilder;
import lombok.Getter;
import java.time.Duration;
@ -28,7 +29,9 @@ import java.time.Duration;
*/
public class TbRateLimits {
private final LocalBucket bucket;
private final String config;
@Getter
private final String configuration;
public TbRateLimits(String limitsConfiguration) {
this(limitsConfiguration, false);
@ -49,7 +52,7 @@ public class TbRateLimits {
} else {
throw new IllegalArgumentException("Failed to parse rate limits configuration: " + limitsConfiguration);
}
this.config = limitsConfiguration;
this.configuration = limitsConfiguration;
}
public boolean tryConsume() {
@ -60,8 +63,4 @@ public class TbRateLimits {
return bucket.tryConsume(number);
}
public String getConfig() {
return config;
}
}

View File

@ -22,8 +22,12 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.stats.DefaultCounter;
import org.thingsboard.server.common.stats.StatsCounter;
import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.dao.entity.EntityService;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.dao.util.AbstractBufferedRateExecutor;
import org.thingsboard.server.dao.util.AsyncTaskContext;
import org.thingsboard.server.dao.util.NoSqlAnyDao;
@ -47,14 +51,13 @@ public class CassandraBufferedRateReadExecutor extends AbstractBufferedRateExecu
@Value("${cassandra.query.dispatcher_threads:2}") int dispatcherThreads,
@Value("${cassandra.query.callback_threads:4}") int callbackThreads,
@Value("${cassandra.query.poll_ms:50}") long pollMs,
@Value("${cassandra.query.tenant_rate_limits.enabled}") boolean tenantRateLimitsEnabled,
@Value("${cassandra.query.tenant_rate_limits.configuration}") String tenantRateLimitsConfiguration,
@Value("${cassandra.query.tenant_rate_limits.print_tenant_names}") boolean printTenantNames,
@Value("${cassandra.query.print_queries_freq:0}") int printQueriesFreq,
@Autowired StatsFactory statsFactory,
@Autowired EntityService entityService) {
super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs, tenantRateLimitsEnabled, tenantRateLimitsConfiguration, printQueriesFreq, statsFactory,
entityService, printTenantNames);
@Autowired EntityService entityService,
@Autowired TbTenantProfileCache tenantProfileCache) {
super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs, printQueriesFreq, statsFactory,
entityService, tenantProfileCache, printTenantNames);
}
@Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}")

View File

@ -24,6 +24,7 @@ import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.dao.entity.EntityService;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.dao.util.AbstractBufferedRateExecutor;
import org.thingsboard.server.dao.util.AsyncTaskContext;
import org.thingsboard.server.dao.util.NoSqlAnyDao;
@ -47,14 +48,13 @@ public class CassandraBufferedRateWriteExecutor extends AbstractBufferedRateExec
@Value("${cassandra.query.dispatcher_threads:2}") int dispatcherThreads,
@Value("${cassandra.query.callback_threads:4}") int callbackThreads,
@Value("${cassandra.query.poll_ms:50}") long pollMs,
@Value("${cassandra.query.tenant_rate_limits.enabled}") boolean tenantRateLimitsEnabled,
@Value("${cassandra.query.tenant_rate_limits.configuration}") String tenantRateLimitsConfiguration,
@Value("${cassandra.query.tenant_rate_limits.print_tenant_names}") boolean printTenantNames,
@Value("${cassandra.query.print_queries_freq:0}") int printQueriesFreq,
@Autowired StatsFactory statsFactory,
@Autowired EntityService entityService) {
super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs, tenantRateLimitsEnabled, tenantRateLimitsConfiguration, printQueriesFreq, statsFactory,
entityService, printTenantNames);
@Autowired EntityService entityService,
@Autowired TbTenantProfileCache tenantProfileCache) {
super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs, printQueriesFreq, statsFactory,
entityService, tenantProfileCache, printTenantNames);
}
@Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}")

View File

@ -30,6 +30,7 @@ import com.google.common.util.concurrent.SettableFuture;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.tools.TbRateLimits;
import org.thingsboard.server.common.stats.DefaultCounter;
@ -38,6 +39,7 @@ import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.common.stats.StatsType;
import org.thingsboard.server.dao.entity.EntityService;
import org.thingsboard.server.dao.nosql.CassandraStatementTask;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import javax.annotation.Nullable;
import java.util.HashMap;
@ -71,8 +73,6 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
private final ScheduledExecutorService timeoutExecutor;
private final int concurrencyLimit;
private final int printQueriesFreq;
private final boolean perTenantLimitsEnabled;
private final String perTenantLimitsConfiguration;
private final ConcurrentMap<TenantId, TbRateLimits> perTenantLimits = new ConcurrentHashMap<>();
private final AtomicInteger printQueriesIdx = new AtomicInteger(0);
@ -80,15 +80,15 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
protected final AtomicInteger concurrencyLevel;
protected final BufferedRateExecutorStats stats;
private final EntityService entityService;
private final Map<TenantId, String> tenantNamesCache = new HashMap<>();
private final TbTenantProfileCache tenantProfileCache;
private final boolean printTenantNames;
private final Map<TenantId, String> tenantNamesCache = new HashMap<>();
public AbstractBufferedRateExecutor(int queueLimit, int concurrencyLimit, long maxWaitTime, int dispatcherThreads, int callbackThreads, long pollMs,
boolean perTenantLimitsEnabled, String perTenantLimitsConfiguration, int printQueriesFreq, StatsFactory statsFactory,
EntityService entityService, boolean printTenantNames) {
public AbstractBufferedRateExecutor(int queueLimit, int concurrencyLimit, long maxWaitTime, int dispatcherThreads,
int callbackThreads, long pollMs, int printQueriesFreq, StatsFactory statsFactory,
EntityService entityService, TbTenantProfileCache tenantProfileCache, boolean printTenantNames) {
this.maxWaitTime = maxWaitTime;
this.pollMs = pollMs;
this.concurrencyLimit = concurrencyLimit;
@ -97,13 +97,12 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
this.dispatcherExecutor = Executors.newFixedThreadPool(dispatcherThreads, ThingsBoardThreadFactory.forName("nosql-" + getBufferName() + "-dispatcher"));
this.callbackExecutor = ThingsBoardExecutors.newWorkStealingPool(callbackThreads, "nosql-" + getBufferName() + "-callback");
this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("nosql-" + getBufferName() + "-timeout"));
this.perTenantLimitsEnabled = perTenantLimitsEnabled;
this.perTenantLimitsConfiguration = perTenantLimitsConfiguration;
this.stats = new BufferedRateExecutorStats(statsFactory);
String concurrencyLevelKey = StatsType.RATE_EXECUTOR.getName() + "." + CONCURRENCY_LEVEL + getBufferName(); //metric name may change with buffer name suffix
this.concurrencyLevel = statsFactory.createGauge(concurrencyLevelKey, new AtomicInteger(0));
this.entityService = entityService;
this.tenantProfileCache = tenantProfileCache;
this.printTenantNames = printTenantNames;
for (int i = 0; i < dispatcherThreads; i++) {
@ -115,12 +114,16 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
public F submit(T task) {
SettableFuture<V> settableFuture = create();
F result = wrap(task, settableFuture);
boolean perTenantLimitReached = false;
if (perTenantLimitsEnabled) {
var tenantProfileConfiguration = tenantProfileCache.get(task.getTenantId()).getDefaultProfileConfiguration();
if (StringUtils.isNotEmpty(tenantProfileConfiguration.getCassandraQueryTenantRateLimitsConfiguration())) {
if (task.getTenantId() == null) {
log.info("Invalid task received: {}", task);
} else if (!task.getTenantId().isNullUid()) {
TbRateLimits rateLimits = perTenantLimits.computeIfAbsent(task.getTenantId(), id -> new TbRateLimits(perTenantLimitsConfiguration));
TbRateLimits rateLimits = perTenantLimits.computeIfAbsent(
task.getTenantId(), id -> new TbRateLimits(tenantProfileConfiguration.getCassandraQueryTenantRateLimitsConfiguration())
);
if (!rateLimits.tryConsume()) {
stats.incrementRateLimitedTenant(task.getTenantId());
stats.getTotalRateLimited().increment();
@ -128,7 +131,10 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
perTenantLimitReached = true;
}
}
} else if (!TenantId.SYS_TENANT_ID.equals(task.getTenantId())) {
perTenantLimits.remove(task.getTenantId());
}
if (!perTenantLimitReached) {
try {
stats.getTotalAdded().increment();

View File

@ -256,21 +256,34 @@
{{ 'tenant-profile.max-created-alarms-range' | translate}}
</mat-error>
</mat-form-field>
<mat-form-field class="mat-block">
<mat-label translate>tenant-profile.transport-tenant-msg-rate-limit</mat-label>
<input matInput formControlName="transportTenantMsgRateLimit">
<mat-error *ngIf="defaultTenantProfileConfigurationFormGroup.get('transportTenantMsgRateLimit').hasError('pattern')">
{{ 'tenant-profile.incorrect-pattern-for-rate-limits' | translate}}
</mat-error>
</mat-form-field>
<mat-form-field class="mat-block">
<mat-label translate>tenant-profile.transport-tenant-telemetry-msg-rate-limit</mat-label>
<input matInput formControlName="transportTenantTelemetryMsgRateLimit">
<mat-error *ngIf="defaultTenantProfileConfigurationFormGroup.get('transportTenantTelemetryMsgRateLimit').hasError('pattern')">
{{ 'tenant-profile.incorrect-pattern-for-rate-limits' | translate}}
</mat-error>
</mat-form-field>
<mat-form-field class="mat-block">
<mat-label translate>tenant-profile.transport-tenant-telemetry-data-points-rate-limit</mat-label>
<input matInput formControlName="transportTenantTelemetryDataPointsRateLimit">
<mat-error *ngIf="defaultTenantProfileConfigurationFormGroup.get('transportTenantTelemetryDataPointsRateLimit').hasError('pattern')">
{{ 'tenant-profile.incorrect-pattern-for-rate-limits' | translate}}
</mat-error>
</mat-form-field>
<mat-form-field class="mat-block">
<mat-label translate>tenant-profile.transport-device-msg-rate-limit</mat-label>
<input matInput formControlName="transportDeviceMsgRateLimit">
<mat-error *ngIf="defaultTenantProfileConfigurationFormGroup.get('transportDeviceMsgRateLimit').hasError('pattern')">
{{ 'tenant-profile.incorrect-pattern-for-rate-limits' | translate}}
</mat-error>
</mat-form-field>
<mat-form-field class="mat-block">
<mat-label translate>tenant-profile.transport-device-telemetry-msg-rate-limit</mat-label>
@ -279,5 +292,103 @@
<mat-form-field class="mat-block">
<mat-label translate>tenant-profile.transport-device-telemetry-data-points-rate-limit</mat-label>
<input matInput formControlName="transportDeviceTelemetryDataPointsRateLimit">
<mat-error *ngIf="defaultTenantProfileConfigurationFormGroup.get('transportDeviceTelemetryDataPointsRateLimit').hasError('pattern')">
{{ 'tenant-profile.incorrect-pattern-for-rate-limits' | translate}}
</mat-error>
</mat-form-field>
<mat-form-field class="mat-block">
<mat-label translate>tenant-profile.tenant-rest-limits</mat-label>
<input matInput
formControlName="tenantServerRestLimitsConfiguration"
type="text">
<mat-error *ngIf="defaultTenantProfileConfigurationFormGroup.get('tenantServerRestLimitsConfiguration').hasError('pattern')">
{{ 'tenant-profile.incorrect-pattern-for-rate-limits' | translate}}
</mat-error>
</mat-form-field>
<mat-form-field class="mat-block">
<mat-label translate>tenant-profile.customer-rest-limits</mat-label>
<input matInput
formControlName="customerServerRestLimitsConfiguration"
type="text">
<mat-error *ngIf="defaultTenantProfileConfigurationFormGroup.get('customerServerRestLimitsConfiguration').hasError('pattern')">
{{ 'tenant-profile.incorrect-pattern-for-rate-limits' | translate}}
</mat-error>
</mat-form-field>
<mat-form-field class="mat-block">
<mat-label translate>tenant-profile.ws-limit-max-sessions-per-tenant</mat-label>
<input matInput type="number" formControlName="maxWsSessionsPerTenant">
<mat-error *ngIf="defaultTenantProfileConfigurationFormGroup.get('maxWsSessionsPerTenant').hasError('min')">
{{ 'tenant-profile.too-small-value-zero' | translate}}
</mat-error>
</mat-form-field>
<mat-form-field class="mat-block">
<mat-label translate>tenant-profile.ws-limit-max-sessions-per-customer</mat-label>
<input matInput type="number" formControlName="maxWsSessionsPerCustomer">
<mat-error *ngIf="defaultTenantProfileConfigurationFormGroup.get('maxWsSessionsPerCustomer').hasError('min')">
{{ 'tenant-profile.too-small-value-zero' | translate}}
</mat-error>
</mat-form-field>
<mat-form-field class="mat-block">
<mat-label translate>tenant-profile.ws-limit-max-sessions-per-public-user</mat-label>
<input matInput type="number" formControlName="maxWsSessionsPerRegularUser">
<mat-error *ngIf="defaultTenantProfileConfigurationFormGroup.get('maxWsSessionsPerRegularUser').hasError('min')">
{{ 'tenant-profile.too-small-value-zero' | translate}}
</mat-error>
</mat-form-field>
<mat-form-field class="mat-block">
<mat-label translate>tenant-profile.ws-limit-max-sessions-per-public-user</mat-label>
<input matInput type="number" formControlName="maxWsSessionsPerPublicUser">
<mat-error *ngIf="defaultTenantProfileConfigurationFormGroup.get('maxWsSessionsPerPublicUser').hasError('min')">
{{ 'tenant-profile.too-small-value-zero' | translate}}
</mat-error>
</mat-form-field>
<mat-form-field class="mat-block">
<mat-label translate>tenant-profile.ws-limit-queue-per-session</mat-label>
<input matInput type="number" formControlName="wsMsgQueueLimitPerSession">
<mat-error *ngIf="defaultTenantProfileConfigurationFormGroup.get('wsMsgQueueLimitPerSession').hasError('min')">
{{ 'tenant-profile.too-small-value-one' | translate}}
</mat-error>
</mat-form-field>
<mat-form-field class="mat-block">
<mat-label translate>tenant-profile.ws-limit-max-subscriptions-per-tenant</mat-label>
<input matInput type="number" formControlName="maxWsSubscriptionsPerTenant">
<mat-error *ngIf="defaultTenantProfileConfigurationFormGroup.get('maxWsSubscriptionsPerTenant').hasError('min')">
{{ 'tenant-profile.too-small-value-zero' | translate}}
</mat-error>
</mat-form-field>
<mat-form-field class="mat-block">
<mat-label translate>tenant-profile.ws-limit-max-subscriptions-per-customer</mat-label>
<input matInput type="number" formControlName="maxWsSubscriptionsPerCustomer">
<mat-error *ngIf="defaultTenantProfileConfigurationFormGroup.get('maxWsSubscriptionsPerCustomer').hasError('min')">
{{ 'tenant-profile.too-small-value-zero' | translate}}
</mat-error>
</mat-form-field>
<mat-form-field class="mat-block">
<mat-label translate>tenant-profile.ws-limit-max-subscriptions-per-regular-user</mat-label>
<input matInput type="number" formControlName="maxWsSubscriptionsPerRegularUser">
<mat-error *ngIf="defaultTenantProfileConfigurationFormGroup.get('maxWsSubscriptionsPerRegularUser').hasError('min')">
{{ 'tenant-profile.too-small-value-zero' | translate}}
</mat-error>
</mat-form-field>
<mat-form-field class="mat-block">
<mat-label translate>tenant-profile.ws-limit-max-subscriptions-per-public-user</mat-label>
<input matInput type="number" formControlName="maxWsSubscriptionsPerPublicUser">
<mat-error *ngIf="defaultTenantProfileConfigurationFormGroup.get('maxWsSubscriptionsPerPublicUser').hasError('min')">
{{ 'tenant-profile.too-small-value-zero' | translate}}
</mat-error>
</mat-form-field>
<mat-form-field class="mat-block">
<mat-label translate>tenant-profile.ws-limit-updates-per-session</mat-label>
<input matInput type="text" formControlName="wsUpdatesPerSessionRateLimit">
<mat-error *ngIf="defaultTenantProfileConfigurationFormGroup.get('wsUpdatesPerSessionRateLimit').hasError('pattern')">
{{ 'tenant-profile.incorrect-pattern-for-rate-limits' | translate}}
</mat-error>
</mat-form-field>
<mat-form-field class="mat-block">
<mat-label translate>tenant-profile.cassandra-tenant-limits-configuration</mat-label>
<input matInput type="text" formControlName="cassandraQueryTenantRateLimitsConfiguration">
<mat-error *ngIf="defaultTenantProfileConfigurationFormGroup.get('cassandraQueryTenantRateLimitsConfiguration').hasError('pattern')">
{{ 'tenant-profile.incorrect-pattern-for-rate-limits' | translate}}
</mat-error>
</mat-form-field>
</section>

View File

@ -35,6 +35,7 @@ import { isDefinedAndNotNull } from '@core/utils';
export class DefaultTenantProfileConfigurationComponent implements ControlValueAccessor, OnInit {
defaultTenantProfileConfigurationFormGroup: FormGroup;
rateLimitsPattern = '^(((\\d+):(\\d+)),)*((\\d+):(\\d+))$';
private requiredValue: boolean;
get required(): boolean {
@ -61,12 +62,12 @@ export class DefaultTenantProfileConfigurationComponent implements ControlValueA
maxRuleChains: [null, [Validators.required, Validators.min(0)]],
maxResourcesInBytes: [null, [Validators.required, Validators.min(0)]],
maxOtaPackagesInBytes: [null, [Validators.required, Validators.min(0)]],
transportTenantMsgRateLimit: [null, []],
transportTenantTelemetryMsgRateLimit: [null, []],
transportTenantTelemetryDataPointsRateLimit: [null, []],
transportDeviceMsgRateLimit: [null, []],
transportDeviceTelemetryMsgRateLimit: [null, []],
transportDeviceTelemetryDataPointsRateLimit: [null, []],
transportTenantMsgRateLimit: [null, [Validators.pattern(this.rateLimitsPattern)]],
transportTenantTelemetryMsgRateLimit: [null, [Validators.pattern(this.rateLimitsPattern)]],
transportTenantTelemetryDataPointsRateLimit: [null, [Validators.pattern(this.rateLimitsPattern)]],
transportDeviceMsgRateLimit: [null, [Validators.pattern(this.rateLimitsPattern)]],
transportDeviceTelemetryMsgRateLimit: [null, [Validators.pattern(this.rateLimitsPattern)]],
transportDeviceTelemetryDataPointsRateLimit: [null, [Validators.pattern(this.rateLimitsPattern)]],
maxTransportMessages: [null, [Validators.required, Validators.min(0)]],
maxTransportDataPoints: [null, [Validators.required, Validators.min(0)]],
maxREExecutions: [null, [Validators.required, Validators.min(0)]],
@ -78,7 +79,20 @@ export class DefaultTenantProfileConfigurationComponent implements ControlValueA
maxCreatedAlarms: [null, [Validators.required, Validators.min(0)]],
defaultStorageTtlDays: [null, [Validators.required, Validators.min(0)]],
alarmsTtlDays: [null, [Validators.required, Validators.min(0)]],
rpcTtlDays: [null, [Validators.required, Validators.min(0)]]
rpcTtlDays: [null, [Validators.required, Validators.min(0)]],
tenantServerRestLimitsConfiguration: [null, [Validators.pattern(this.rateLimitsPattern)]],
customerServerRestLimitsConfiguration: [null, [Validators.pattern(this.rateLimitsPattern)]],
maxWsSessionsPerTenant: [null, [Validators.min(0)]],
maxWsSessionsPerCustomer: [null, [Validators.min(0)]],
maxWsSessionsPerRegularUser: [null, [Validators.min(0)]],
maxWsSessionsPerPublicUser: [null, [Validators.min(0)]],
wsMsgQueueLimitPerSession: [null, [Validators.min(0)]],
maxWsSubscriptionsPerTenant: [null, [Validators.min(0)]],
maxWsSubscriptionsPerCustomer: [null, [Validators.min(0)]],
maxWsSubscriptionsPerRegularUser: [null, [Validators.min(0)]],
maxWsSubscriptionsPerPublicUser: [null, [Validators.min(0)]],
wsUpdatesPerSessionRateLimit: [null, [Validators.pattern(this.rateLimitsPattern)]],
cassandraQueryTenantRateLimitsConfiguration: [null, [Validators.pattern(this.rateLimitsPattern)]]
});
this.defaultTenantProfileConfigurationFormGroup.valueChanges.subscribe(() => {
this.updateModel();

View File

@ -51,6 +51,22 @@ export interface DefaultTenantProfileConfiguration {
maxSms: number;
maxCreatedAlarms: number;
tenantServerRestLimitsConfiguration: string;
customerServerRestLimitsConfiguration: string;
maxWsSessionsPerTenant: number;
maxWsSessionsPerCustomer: number;
maxWsSessionsPerRegularUser: number;
maxWsSessionsPerPublicUser: number;
wsMsgQueueLimitPerSession: number;
maxWsSubscriptionsPerTenant: number;
maxWsSubscriptionsPerCustomer: number;
maxWsSubscriptionsPerRegularUser: number;
maxWsSubscriptionsPerPublicUser: number;
wsUpdatesPerSessionRateLimit: string;
cassandraQueryTenantRateLimitsConfiguration: string;
defaultStorageTtlDays: number;
alarmsTtlDays: number;
rpcTtlDays: number;
@ -85,9 +101,22 @@ export function createTenantProfileConfiguration(type: TenantProfileType): Tenan
maxEmails: 0,
maxSms: 0,
maxCreatedAlarms: 0,
tenantServerRestLimitsConfiguration: '',
customerServerRestLimitsConfiguration: '',
maxWsSessionsPerTenant: 0,
maxWsSessionsPerCustomer: 0,
maxWsSessionsPerRegularUser: 0,
maxWsSessionsPerPublicUser: 0,
wsMsgQueueLimitPerSession: 0,
maxWsSubscriptionsPerTenant: 0,
maxWsSubscriptionsPerCustomer: 0,
maxWsSubscriptionsPerRegularUser: 0,
maxWsSubscriptionsPerPublicUser: 0,
wsUpdatesPerSessionRateLimit: '',
cassandraQueryTenantRateLimitsConfiguration: '',
defaultStorageTtlDays: 0,
alarmsTtlDays: 0,
rpcTtlDays: 0
rpcTtlDays: 0,
};
configuration = {...defaultConfiguration, type: TenantProfileType.DEFAULT};
break;

View File

@ -3109,7 +3109,22 @@
"max-created-alarms-range": "Maximum number of alarms created can't be negative",
"no-queue": "No Queue configured",
"add-queue": "Add Queue",
"queues-with-count": "Queues ({{count}})"
"queues-with-count": "Queues ({{count}})",
"tenant-rest-limits": "Rate limit for REST requests for tenant",
"customer-rest-limits": "Rate limit for REST requests for customer",
"incorrect-pattern-for-rate-limits": "The format is comma separated pairs of capacity and period (in seconds) with a colon between, e.g. 100:1,2000:60",
"too-small-value-zero": "The value must be bigger than 0",
"too-small-value-one": "The value must be bigger than 1",
"cassandra-tenant-limits-configuration": "Cassandra query rate limit for tenant",
"ws-limit-max-sessions-per-tenant": "Maximum number of WS sessions per tenant",
"ws-limit-max-sessions-per-customer": "Maximum number of WS sessions per customer",
"ws-limit-max-sessions-per-public-user": "Maximum number of WS sessions per public user",
"ws-limit-queue-per-session": "Maximum size of WS message queue per session",
"ws-limit-max-subscriptions-per-tenant": "Maximum number of WS subscriptions per tenant",
"ws-limit-max-subscriptions-per-customer": "Maximum number of WS subscriptions per customer",
"ws-limit-max-subscriptions-per-regular-user": "Maximum number of WS subscriptions per regular user",
"ws-limit-max-subscriptions-per-public-user": "Maximum number of WS subscriptions per public user",
"ws-limit-updates-per-session": "Rate limit for WS updates per session"
},
"timeinterval": {
"seconds-interval": "{ seconds, plural, 1 {1 second} other {# seconds} }",