diff --git a/application/src/main/java/org/thingsboard/server/config/RateLimitProcessingFilter.java b/application/src/main/java/org/thingsboard/server/config/RateLimitProcessingFilter.java index e2e6f8d982..5cc9747ed9 100644 --- a/application/src/main/java/org/thingsboard/server/config/RateLimitProcessingFilter.java +++ b/application/src/main/java/org/thingsboard/server/config/RateLimitProcessingFilter.java @@ -15,17 +15,18 @@ */ package org.thingsboard.server.config; +import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; import org.springframework.security.core.Authentication; import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.stereotype.Component; import org.springframework.web.filter.GenericFilterBean; -import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.tools.TbRateLimits; import org.thingsboard.server.common.msg.tools.TbRateLimitsException; +import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.exception.ThingsboardErrorResponseHandler; import org.thingsboard.server.service.security.model.SecurityUser; @@ -35,42 +36,32 @@ import javax.servlet.ServletRequest; import javax.servlet.ServletResponse; import javax.servlet.http.HttpServletResponse; import java.io.IOException; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @Component public class RateLimitProcessingFilter extends GenericFilterBean { - @Value("${server.rest.limits.tenant.enabled:false}") - private boolean perTenantLimitsEnabled; - @Value("${server.rest.limits.tenant.configuration:}") - private String perTenantLimitsConfiguration; - @Value("${server.rest.limits.customer.enabled:false}") - private boolean perCustomerLimitsEnabled; - @Value("${server.rest.limits.customer.configuration:}") - private String perCustomerLimitsConfiguration; - @Autowired private ThingsboardErrorResponseHandler errorResponseHandler; - private ConcurrentMap perTenantLimits = new ConcurrentHashMap<>(); - private ConcurrentMap perCustomerLimits = new ConcurrentHashMap<>(); + @Autowired + private TbTenantProfileCache tenantProfileCache; + + private final ConcurrentMap perTenantLimits = new ConcurrentHashMap<>(); + private final ConcurrentMap perCustomerLimits = new ConcurrentHashMap<>(); @Override public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { SecurityUser user = getCurrentUser(); if (user != null && !user.isSystemAdmin()) { - if (perTenantLimitsEnabled) { - TbRateLimits rateLimits = perTenantLimits.computeIfAbsent(user.getTenantId(), id -> new TbRateLimits(perTenantLimitsConfiguration)); - if (!rateLimits.tryConsume()) { - errorResponseHandler.handle(new TbRateLimitsException(EntityType.TENANT), (HttpServletResponse) response); - return; - } + var profileConfiguration = tenantProfileCache.get(user.getTenantId()).getDefaultProfileConfiguration(); + if (!checkRateLimits(user.getTenantId(), profileConfiguration.getTenantServerRestLimitsConfiguration(), perTenantLimits, response)) { + return; } - if (perCustomerLimitsEnabled && user.isCustomerUser()) { - TbRateLimits rateLimits = perCustomerLimits.computeIfAbsent(user.getCustomerId(), id -> new TbRateLimits(perCustomerLimitsConfiguration)); - if (!rateLimits.tryConsume()) { - errorResponseHandler.handle(new TbRateLimitsException(EntityType.CUSTOMER), (HttpServletResponse) response); + if (user.isCustomerUser()) { + if (!checkRateLimits(user.getCustomerId(), profileConfiguration.getCustomerServerRestLimitsConfiguration(), perCustomerLimits, response)) { return; } } @@ -78,6 +69,25 @@ public class RateLimitProcessingFilter extends GenericFilterBean { chain.doFilter(request, response); } + private boolean checkRateLimits(I ownerId, String rateLimitConfig, Map rateLimitsMap, ServletResponse response) { + if (StringUtils.isNotEmpty(rateLimitConfig)) { + TbRateLimits rateLimits = rateLimitsMap.get(ownerId); + if (rateLimits == null || !rateLimits.getConfiguration().equals(rateLimitConfig)) { + rateLimits = new TbRateLimits(rateLimitConfig); + rateLimitsMap.put(ownerId, rateLimits); + } + + if (!rateLimits.tryConsume()) { + errorResponseHandler.handle(new TbRateLimitsException(ownerId.getEntityType()), (HttpServletResponse) response); + return false; + } + } else { + rateLimitsMap.remove(ownerId); + } + + return true; + } + protected SecurityUser getCurrentUser() { Authentication authentication = SecurityContextHolder.getContext().getAuthentication(); if (authentication != null && authentication.getPrincipal() instanceof SecurityUser) { diff --git a/application/src/main/java/org/thingsboard/server/controller/AlarmController.java b/application/src/main/java/org/thingsboard/server/controller/AlarmController.java index 939e9da8a0..b0ed1ed01d 100644 --- a/application/src/main/java/org/thingsboard/server/controller/AlarmController.java +++ b/application/src/main/java/org/thingsboard/server/controller/AlarmController.java @@ -226,7 +226,11 @@ public class AlarmController extends BaseController { TimePageLink pageLink = createTimePageLink(pageSize, page, textSearch, sortProperty, sortOrder, startTime, endTime); try { - return checkNotNull(alarmService.findAlarms(getCurrentUser().getTenantId(), new AlarmQuery(entityId, pageLink, alarmSearchStatus, alarmStatus, fetchOriginator)).get()); + if (getCurrentUser().isCustomerUser()) { + return checkNotNull(alarmService.findCustomerAlarms(getCurrentUser().getTenantId(), getCurrentUser().getCustomerId(), new AlarmQuery(entityId, pageLink, alarmSearchStatus, alarmStatus, fetchOriginator)).get()); + } else { + return checkNotNull(alarmService.findAlarms(getCurrentUser().getTenantId(), new AlarmQuery(entityId, pageLink, alarmSearchStatus, alarmStatus, fetchOriginator)).get()); + } } catch (Exception e) { throw handleException(e); } diff --git a/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java b/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java index 5f0ea800dd..578bfa5662 100644 --- a/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java +++ b/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java @@ -16,12 +16,12 @@ package org.thingsboard.server.controller.plugin; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.BeanCreationNotAllowedException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.security.core.Authentication; import org.springframework.stereotype.Service; -import org.springframework.util.StringUtils; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.PongMessage; import org.springframework.web.socket.TextMessage; @@ -34,10 +34,10 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.UserId; import org.thingsboard.server.common.msg.tools.TbRateLimits; import org.thingsboard.server.config.WebSocketConfiguration; +import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.security.model.SecurityUser; import org.thingsboard.server.service.security.model.UserPrincipal; -import org.thingsboard.server.service.telemetry.DefaultTelemetryWebSocketService; import org.thingsboard.server.service.telemetry.SessionEvent; import org.thingsboard.server.service.telemetry.TelemetryWebSocketMsgEndpoint; import org.thingsboard.server.service.telemetry.TelemetryWebSocketService; @@ -72,22 +72,11 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr @Autowired private TelemetryWebSocketService webSocketService; + @Autowired + private TbTenantProfileCache tenantProfileCache; + @Value("${server.ws.send_timeout:5000}") private long sendTimeout; - @Value("${server.ws.limits.max_sessions_per_tenant:0}") - private int maxSessionsPerTenant; - @Value("${server.ws.limits.max_sessions_per_customer:0}") - private int maxSessionsPerCustomer; - @Value("${server.ws.limits.max_sessions_per_regular_user:0}") - private int maxSessionsPerRegularUser; - @Value("${server.ws.limits.max_sessions_per_public_user:0}") - private int maxSessionsPerPublicUser; - @Value("${server.ws.limits.max_queue_per_ws_session:1000}") - private int maxMsgQueuePerSession; - - @Value("${server.ws.limits.max_updates_per_session:}") - private String perSessionUpdatesConfiguration; - @Value("${server.ws.ping_timeout:30000}") private long pingTimeout; @@ -144,10 +133,13 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr String internalSessionId = session.getId(); TelemetryWebSocketSessionRef sessionRef = toRef(session); String externalSessionId = sessionRef.getSessionId(); + if (!checkLimits(session, sessionRef)) { return; } - internalSessionMap.put(internalSessionId, new SessionMetaData(session, sessionRef, maxMsgQueuePerSession)); + var tenantProfileConfiguration = tenantProfileCache.get(sessionRef.getSecurityCtx().getTenantId()).getDefaultProfileConfiguration(); + internalSessionMap.put(internalSessionId, new SessionMetaData(session, sessionRef, tenantProfileConfiguration.getWsMsgQueueLimitPerSession() > 0 ? + tenantProfileConfiguration.getWsMsgQueueLimitPerSession() : 500)); externalSessionMap.put(externalSessionId, internalSessionId); processInWebSocketService(sessionRef, SessionEvent.onEstablished()); @@ -323,8 +315,9 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr if (internalId != null) { SessionMetaData sessionMd = internalSessionMap.get(internalId); if (sessionMd != null) { - if (!StringUtils.isEmpty(perSessionUpdatesConfiguration)) { - TbRateLimits rateLimits = perSessionUpdateLimits.computeIfAbsent(sessionRef.getSessionId(), sid -> new TbRateLimits(perSessionUpdatesConfiguration)); + var tenantProfileConfiguration = tenantProfileCache.get(sessionRef.getSecurityCtx().getTenantId()).getDefaultProfileConfiguration(); + if (StringUtils.isNotEmpty(tenantProfileConfiguration.getWsUpdatesPerSessionRateLimit())) { + TbRateLimits rateLimits = perSessionUpdateLimits.computeIfAbsent(sessionRef.getSessionId(), sid -> new TbRateLimits(tenantProfileConfiguration.getWsUpdatesPerSessionRateLimit())); if (!rateLimits.tryConsume()) { if (blacklistedSessions.putIfAbsent(externalId, sessionRef) == null) { log.info("[{}][{}][{}] Failed to process session update. Max session updates limit reached" @@ -336,6 +329,8 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr log.debug("[{}][{}][{}] Session is no longer blacklisted.", sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), externalId); blacklistedSessions.remove(externalId); } + } else { + perSessionUpdateLimits.remove(sessionRef.getSessionId()); } sessionMd.sendMsg(msg); } else { @@ -380,11 +375,17 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr } private boolean checkLimits(WebSocketSession session, TelemetryWebSocketSessionRef sessionRef) throws Exception { + var tenantProfileConfiguration = + tenantProfileCache.get(sessionRef.getSecurityCtx().getTenantId()).getDefaultProfileConfiguration(); + if (tenantProfileConfiguration == null) { + return true; + } + String sessionId = session.getId(); - if (maxSessionsPerTenant > 0) { + if (tenantProfileConfiguration.getMaxWsSessionsPerTenant() > 0) { Set tenantSessions = tenantSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getTenantId(), id -> ConcurrentHashMap.newKeySet()); synchronized (tenantSessions) { - if (tenantSessions.size() < maxSessionsPerTenant) { + if (tenantSessions.size() < tenantProfileConfiguration.getMaxWsSessionsPerTenant()) { tenantSessions.add(sessionId); } else { log.info("[{}][{}][{}] Failed to start session. Max tenant sessions limit reached" @@ -396,10 +397,10 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr } if (sessionRef.getSecurityCtx().isCustomerUser()) { - if (maxSessionsPerCustomer > 0) { + if (tenantProfileConfiguration.getMaxWsSessionsPerCustomer() > 0) { Set customerSessions = customerSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getCustomerId(), id -> ConcurrentHashMap.newKeySet()); synchronized (customerSessions) { - if (customerSessions.size() < maxSessionsPerCustomer) { + if (customerSessions.size() < tenantProfileConfiguration.getMaxWsSessionsPerCustomer()) { customerSessions.add(sessionId); } else { log.info("[{}][{}][{}] Failed to start session. Max customer sessions limit reached" @@ -409,10 +410,11 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr } } } - if (maxSessionsPerRegularUser > 0 && UserPrincipal.Type.USER_NAME.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) { + if (tenantProfileConfiguration.getMaxWsSessionsPerRegularUser() > 0 + && UserPrincipal.Type.USER_NAME.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) { Set regularUserSessions = regularUserSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet()); synchronized (regularUserSessions) { - if (regularUserSessions.size() < maxSessionsPerRegularUser) { + if (regularUserSessions.size() < tenantProfileConfiguration.getMaxWsSessionsPerRegularUser()) { regularUserSessions.add(sessionId); } else { log.info("[{}][{}][{}] Failed to start session. Max regular user sessions limit reached" @@ -422,10 +424,11 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr } } } - if (maxSessionsPerPublicUser > 0 && UserPrincipal.Type.PUBLIC_ID.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) { + if (tenantProfileConfiguration.getMaxWsSessionsPerPublicUser() > 0 + && UserPrincipal.Type.PUBLIC_ID.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) { Set publicUserSessions = publicUserSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet()); synchronized (publicUserSessions) { - if (publicUserSessions.size() < maxSessionsPerPublicUser) { + if (publicUserSessions.size() < tenantProfileConfiguration.getMaxWsSessionsPerPublicUser()) { publicUserSessions.add(sessionId); } else { log.info("[{}][{}][{}] Failed to start session. Max public user sessions limit reached" @@ -440,29 +443,31 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr } private void cleanupLimits(WebSocketSession session, TelemetryWebSocketSessionRef sessionRef) { + var tenantProfileConfiguration = tenantProfileCache.get(sessionRef.getSecurityCtx().getTenantId()).getDefaultProfileConfiguration(); + String sessionId = session.getId(); perSessionUpdateLimits.remove(sessionRef.getSessionId()); blacklistedSessions.remove(sessionRef.getSessionId()); - if (maxSessionsPerTenant > 0) { + if (tenantProfileConfiguration.getMaxWsSessionsPerTenant() > 0) { Set tenantSessions = tenantSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getTenantId(), id -> ConcurrentHashMap.newKeySet()); synchronized (tenantSessions) { tenantSessions.remove(sessionId); } } if (sessionRef.getSecurityCtx().isCustomerUser()) { - if (maxSessionsPerCustomer > 0) { + if (tenantProfileConfiguration.getMaxWsSessionsPerCustomer() > 0) { Set customerSessions = customerSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getCustomerId(), id -> ConcurrentHashMap.newKeySet()); synchronized (customerSessions) { customerSessions.remove(sessionId); } } - if (maxSessionsPerRegularUser > 0 && UserPrincipal.Type.USER_NAME.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) { + if (tenantProfileConfiguration.getMaxWsSessionsPerRegularUser() > 0 && UserPrincipal.Type.USER_NAME.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) { Set regularUserSessions = regularUserSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet()); synchronized (regularUserSessions) { regularUserSessions.remove(sessionId); } } - if (maxSessionsPerPublicUser > 0 && UserPrincipal.Type.PUBLIC_ID.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) { + if (tenantProfileConfiguration.getMaxWsSessionsPerPublicUser() > 0 && UserPrincipal.Type.PUBLIC_ID.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) { Set publicUserSessions = publicUserSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet()); synchronized (publicUserSessions) { publicUserSessions.remove(sessionId); diff --git a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java index e27f2e88c2..c6576097ee 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java @@ -21,7 +21,6 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.collections.CollectionUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Service; @@ -56,12 +55,9 @@ import org.thingsboard.server.common.data.rule.RuleNode; import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfiguration; import org.thingsboard.server.dao.DaoUtil; import org.thingsboard.server.dao.alarm.AlarmDao; -import org.thingsboard.server.dao.alarm.AlarmService; import org.thingsboard.server.dao.entity.EntityService; import org.thingsboard.server.dao.entityview.EntityViewService; import org.thingsboard.server.dao.model.sql.DeviceProfileEntity; -import org.thingsboard.server.dao.model.sql.RelationEntity; -import org.thingsboard.server.dao.oauth2.OAuth2Service; import org.thingsboard.server.dao.queue.QueueService; import org.thingsboard.server.dao.relation.RelationService; import org.thingsboard.server.dao.rule.RuleChainService; @@ -69,7 +65,6 @@ import org.thingsboard.server.dao.sql.device.DeviceProfileRepository; import org.thingsboard.server.dao.tenant.TenantProfileService; import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.dao.timeseries.TimeseriesService; -import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; import org.thingsboard.server.service.install.InstallScripts; import org.thingsboard.server.service.install.SystemDataLoaderService; import org.thingsboard.server.service.install.TbRuleEngineQueueConfigService; @@ -107,9 +102,6 @@ public class DefaultDataUpdateService implements DataUpdateService { @Autowired private TimeseriesService tsService; - @Autowired - private AlarmService alarmService; - @Autowired private EntityService entityService; @@ -120,7 +112,7 @@ public class DefaultDataUpdateService implements DataUpdateService { private DeviceProfileRepository deviceProfileRepository; @Autowired - private OAuth2Service oAuth2Service; + private RateLimitsUpdater rateLimitsUpdater; @Autowired private TenantProfileService tenantProfileService; @@ -162,8 +154,9 @@ public class DefaultDataUpdateService implements DataUpdateService { break; case "3.3.4": log.info("Updating data from version 3.3.4 to 3.4.0 ..."); - tenantsProfileQueueConfigurationUpdater.updateEntities(null); - checkPointRuleNodesUpdater.updateEntities(null); + rateLimitsUpdater.updateEntities(); + tenantsProfileQueueConfigurationUpdater.updateEntities(); + checkPointRuleNodesUpdater.updateEntities(); break; default: throw new RuntimeException("Unable to update data, unsupported fromVersion: " + fromVersion); diff --git a/application/src/main/java/org/thingsboard/server/service/install/update/PaginatedUpdater.java b/application/src/main/java/org/thingsboard/server/service/install/update/PaginatedUpdater.java index 914a136a4c..b2fb707791 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/update/PaginatedUpdater.java +++ b/application/src/main/java/org/thingsboard/server/service/install/update/PaginatedUpdater.java @@ -49,6 +49,10 @@ public abstract class PaginatedUpdater { } } + public void updateEntities() { + updateEntities(null); + } + protected boolean forceReportTotal() { return false; } diff --git a/application/src/main/java/org/thingsboard/server/service/install/update/RateLimitsUpdater.java b/application/src/main/java/org/thingsboard/server/service/install/update/RateLimitsUpdater.java new file mode 100644 index 0000000000..054f68f71d --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/install/update/RateLimitsUpdater.java @@ -0,0 +1,115 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.install.update; + +import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; +import org.thingsboard.server.common.data.TenantProfile; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.dao.tenant.TenantProfileService; + +@Component +class RateLimitsUpdater extends PaginatedUpdater { + + @Value("#{ environment.getProperty('TB_SERVER_REST_LIMITS_TENANT_ENABLED') ?: environment.getProperty('server.rest.limits.tenant.enabled') ?: 'false' }") + boolean tenantServerRestLimitsEnabled; + @Value("#{ environment.getProperty('TB_SERVER_REST_LIMITS_TENANT_CONFIGURATION') ?: environment.getProperty('server.rest.limits.tenant.configuration') ?: '100:1,2000:60' }") + String tenantServerRestLimitsConfiguration; + @Value("#{ environment.getProperty('TB_SERVER_REST_LIMITS_CUSTOMER_ENABLED') ?: environment.getProperty('server.rest.limits.customer.enabled') ?: 'false' }") + boolean customerServerRestLimitsEnabled; + @Value("#{ environment.getProperty('TB_SERVER_REST_LIMITS_CUSTOMER_CONFIGURATION') ?: environment.getProperty('server.rest.limits.customer.configuration') ?: '50:1,1000:60' }") + String customerServerRestLimitsConfiguration; + + @Value("#{ environment.getProperty('TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_TENANT') ?: environment.getProperty('server.ws.limits.max_sessions_per_tenant') ?: '0' }") + private int maxWsSessionsPerTenant; + @Value("#{ environment.getProperty('TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_CUSTOMER') ?: environment.getProperty('server.ws.limits.max_sessions_per_customer') ?: '0' }") + private int maxWsSessionsPerCustomer; + @Value("#{ environment.getProperty('TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_REGULAR_USER') ?: environment.getProperty('server.ws.limits.max_sessions_per_regular_user') ?: '0' }") + private int maxWsSessionsPerRegularUser; + @Value("#{ environment.getProperty('TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_PUBLIC_USER') ?: environment.getProperty('server.ws.limits.max_sessions_per_public_user') ?: '0' }") + private int maxWsSessionsPerPublicUser; + @Value("#{ environment.getProperty('TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_QUEUE_PER_WS_SESSION') ?: environment.getProperty('server.ws.limits.max_queue_per_ws_session') ?: '500' }") + private int wsMsgQueueLimitPerSession; + @Value("#{ environment.getProperty('TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_TENANT') ?: environment.getProperty('server.ws.limits.max_subscriptions_per_tenant') ?: '0' }") + private long maxWsSubscriptionsPerTenant; + @Value("#{ environment.getProperty('TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_CUSTOMER') ?: environment.getProperty('server.ws.limits.max_subscriptions_per_customer') ?: '0' }") + private long maxWsSubscriptionsPerCustomer; + @Value("#{ environment.getProperty('TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_REGULAR_USER') ?: environment.getProperty('server.ws.limits.max_subscriptions_per_regular_user') ?: '0' }") + private long maxWsSubscriptionsPerRegularUser; + @Value("#{ environment.getProperty('TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_PUBLIC_USER') ?: environment.getProperty('server.ws.limits.max_subscriptions_per_public_user') ?: '0' }") + private long maxWsSubscriptionsPerPublicUser; + @Value("#{ environment.getProperty('TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_UPDATES_PER_SESSION') ?: environment.getProperty('server.ws.limits.max_updates_per_session') ?: '300:1,3000:60' }") + private String wsUpdatesPerSessionRateLimit; + + @Value("#{ environment.getProperty('CASSANDRA_QUERY_TENANT_RATE_LIMITS_ENABLED') ?: environment.getProperty('cassandra.query.tenant_rate_limits.enabled') ?: 'false' }") + private boolean cassandraQueryTenantRateLimitsEnabled; + @Value("#{ environment.getProperty('CASSANDRA_QUERY_TENANT_RATE_LIMITS_CONFIGURATION') ?: environment.getProperty('cassandra.query.tenant_rate_limits.configuration') ?: '1000:1,30000:60' }") + private String cassandraQueryTenantRateLimitsConfiguration; + + @Autowired + private TenantProfileService tenantProfileService; + + @Override + protected boolean forceReportTotal() { + return true; + } + + @Override + protected String getName() { + return "Rate limits updater"; + } + + @Override + protected PageData findEntities(String id, PageLink pageLink) { + return tenantProfileService.findTenantProfiles(TenantId.SYS_TENANT_ID, pageLink); + } + + @Override + protected void updateEntity(TenantProfile tenantProfile) { + var profileConfiguration = tenantProfile.getDefaultProfileConfiguration(); + + if (tenantServerRestLimitsEnabled && StringUtils.isNotEmpty(tenantServerRestLimitsConfiguration)) { + profileConfiguration.setTenantServerRestLimitsConfiguration(tenantServerRestLimitsConfiguration); + } + if (customerServerRestLimitsEnabled && StringUtils.isNotEmpty(customerServerRestLimitsConfiguration)) { + profileConfiguration.setCustomerServerRestLimitsConfiguration(customerServerRestLimitsConfiguration); + } + + profileConfiguration.setMaxWsSessionsPerTenant(maxWsSessionsPerTenant); + profileConfiguration.setMaxWsSessionsPerCustomer(maxWsSessionsPerCustomer); + profileConfiguration.setMaxWsSessionsPerPublicUser(maxWsSessionsPerPublicUser); + profileConfiguration.setMaxWsSessionsPerRegularUser(maxWsSessionsPerRegularUser); + profileConfiguration.setMaxWsSubscriptionsPerTenant(maxWsSubscriptionsPerTenant); + profileConfiguration.setMaxWsSubscriptionsPerCustomer(maxWsSubscriptionsPerCustomer); + profileConfiguration.setMaxWsSubscriptionsPerPublicUser(maxWsSubscriptionsPerPublicUser); + profileConfiguration.setMaxWsSubscriptionsPerRegularUser(maxWsSubscriptionsPerRegularUser); + profileConfiguration.setWsMsgQueueLimitPerSession(wsMsgQueueLimitPerSession); + if (StringUtils.isNotEmpty(wsUpdatesPerSessionRateLimit)) { + profileConfiguration.setWsUpdatesPerSessionRateLimit(wsUpdatesPerSessionRateLimit); + } + + if (cassandraQueryTenantRateLimitsEnabled && StringUtils.isNotEmpty(cassandraQueryTenantRateLimitsConfiguration)) { + profileConfiguration.setCassandraQueryTenantRateLimitsConfiguration(cassandraQueryTenantRateLimitsConfiguration); + } + + tenantProfileService.saveTenantProfile(TenantId.SYS_TENANT_ID, tenantProfile); + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java index 5a50a2ea22..0551d7e114 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java @@ -42,7 +42,9 @@ import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; import org.thingsboard.server.dao.attributes.AttributesService; +import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.util.TenantRateLimitException; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; @@ -138,14 +140,8 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi @Autowired private TbServiceInfoProvider serviceInfoProvider; - @Value("${server.ws.limits.max_subscriptions_per_tenant:0}") - private int maxSubscriptionsPerTenant; - @Value("${server.ws.limits.max_subscriptions_per_customer:0}") - private int maxSubscriptionsPerCustomer; - @Value("${server.ws.limits.max_subscriptions_per_regular_user:0}") - private int maxSubscriptionsPerRegularUser; - @Value("${server.ws.limits.max_subscriptions_per_public_user:0}") - private int maxSubscriptionsPerPublicUser; + @Autowired + private TbTenantProfileCache tenantProfileCache; @Value("${server.ws.ping_timeout:30000}") private long pingTimeout; @@ -320,44 +316,50 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi } private void processSessionClose(TelemetryWebSocketSessionRef sessionRef) { - String sessionId = "[" + sessionRef.getSessionId() + "]"; - if (maxSubscriptionsPerTenant > 0) { - Set tenantSubscriptions = tenantSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getTenantId(), id -> ConcurrentHashMap.newKeySet()); - synchronized (tenantSubscriptions) { - tenantSubscriptions.removeIf(subId -> subId.startsWith(sessionId)); - } - } - if (sessionRef.getSecurityCtx().isCustomerUser()) { - if (maxSubscriptionsPerCustomer > 0) { - Set customerSessions = customerSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getCustomerId(), id -> ConcurrentHashMap.newKeySet()); - synchronized (customerSessions) { - customerSessions.removeIf(subId -> subId.startsWith(sessionId)); + var tenantProfileConfiguration = tenantProfileCache.get(sessionRef.getSecurityCtx().getTenantId()).getDefaultProfileConfiguration(); + if (tenantProfileConfiguration != null) { + String sessionId = "[" + sessionRef.getSessionId() + "]"; + + if (tenantProfileConfiguration.getMaxWsSubscriptionsPerTenant() > 0) { + Set tenantSubscriptions = tenantSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getTenantId(), id -> ConcurrentHashMap.newKeySet()); + synchronized (tenantSubscriptions) { + tenantSubscriptions.removeIf(subId -> subId.startsWith(sessionId)); } } - if (maxSubscriptionsPerRegularUser > 0 && UserPrincipal.Type.USER_NAME.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) { - Set regularUserSessions = regularUserSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet()); - synchronized (regularUserSessions) { - regularUserSessions.removeIf(subId -> subId.startsWith(sessionId)); + if (sessionRef.getSecurityCtx().isCustomerUser()) { + if (tenantProfileConfiguration.getMaxWsSubscriptionsPerCustomer() > 0) { + Set customerSessions = customerSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getCustomerId(), id -> ConcurrentHashMap.newKeySet()); + synchronized (customerSessions) { + customerSessions.removeIf(subId -> subId.startsWith(sessionId)); + } } - } - if (maxSubscriptionsPerPublicUser > 0 && UserPrincipal.Type.PUBLIC_ID.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) { - Set publicUserSessions = publicUserSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet()); - synchronized (publicUserSessions) { - publicUserSessions.removeIf(subId -> subId.startsWith(sessionId)); + if (tenantProfileConfiguration.getMaxWsSubscriptionsPerRegularUser() > 0 && UserPrincipal.Type.USER_NAME.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) { + Set regularUserSessions = regularUserSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet()); + synchronized (regularUserSessions) { + regularUserSessions.removeIf(subId -> subId.startsWith(sessionId)); + } + } + if (tenantProfileConfiguration.getMaxWsSubscriptionsPerPublicUser() > 0 && UserPrincipal.Type.PUBLIC_ID.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) { + Set publicUserSessions = publicUserSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet()); + synchronized (publicUserSessions) { + publicUserSessions.removeIf(subId -> subId.startsWith(sessionId)); + } } } } } private boolean processSubscription(TelemetryWebSocketSessionRef sessionRef, SubscriptionCmd cmd) { + var tenantProfileConfiguration = (DefaultTenantProfileConfiguration) tenantProfileCache.get(sessionRef.getSecurityCtx().getTenantId()).getDefaultProfileConfiguration(); + String subId = "[" + sessionRef.getSessionId() + "]:[" + cmd.getCmdId() + "]"; try { - if (maxSubscriptionsPerTenant > 0) { + if (tenantProfileConfiguration.getMaxWsSubscriptionsPerTenant() > 0) { Set tenantSubscriptions = tenantSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getTenantId(), id -> ConcurrentHashMap.newKeySet()); synchronized (tenantSubscriptions) { if (cmd.isUnsubscribe()) { tenantSubscriptions.remove(subId); - } else if (tenantSubscriptions.size() < maxSubscriptionsPerTenant) { + } else if (tenantSubscriptions.size() < tenantProfileConfiguration.getMaxWsSubscriptionsPerTenant()) { tenantSubscriptions.add(subId); } else { log.info("[{}][{}][{}] Failed to start subscription. Max tenant subscriptions limit reached" @@ -369,12 +371,12 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi } if (sessionRef.getSecurityCtx().isCustomerUser()) { - if (maxSubscriptionsPerCustomer > 0) { + if (tenantProfileConfiguration.getMaxWsSubscriptionsPerCustomer() > 0) { Set customerSessions = customerSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getCustomerId(), id -> ConcurrentHashMap.newKeySet()); synchronized (customerSessions) { if (cmd.isUnsubscribe()) { customerSessions.remove(subId); - } else if (customerSessions.size() < maxSubscriptionsPerCustomer) { + } else if (customerSessions.size() < tenantProfileConfiguration.getMaxWsSubscriptionsPerCustomer()) { customerSessions.add(subId); } else { log.info("[{}][{}][{}] Failed to start subscription. Max customer subscriptions limit reached" @@ -384,10 +386,10 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi } } } - if (maxSubscriptionsPerRegularUser > 0 && UserPrincipal.Type.USER_NAME.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) { + if (tenantProfileConfiguration.getMaxWsSubscriptionsPerRegularUser() > 0 && UserPrincipal.Type.USER_NAME.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) { Set regularUserSessions = regularUserSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet()); synchronized (regularUserSessions) { - if (regularUserSessions.size() < maxSubscriptionsPerRegularUser) { + if (regularUserSessions.size() < tenantProfileConfiguration.getMaxWsSubscriptionsPerRegularUser()) { regularUserSessions.add(subId); } else { log.info("[{}][{}][{}] Failed to start subscription. Max regular user subscriptions limit reached" @@ -397,10 +399,10 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi } } } - if (maxSubscriptionsPerPublicUser > 0 && UserPrincipal.Type.PUBLIC_ID.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) { + if (tenantProfileConfiguration.getMaxWsSubscriptionsPerPublicUser() > 0 && UserPrincipal.Type.PUBLIC_ID.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) { Set publicUserSessions = publicUserSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet()); synchronized (publicUserSessions) { - if (publicUserSessions.size() < maxSubscriptionsPerPublicUser) { + if (publicUserSessions.size() < tenantProfileConfiguration.getMaxWsSubscriptionsPerPublicUser()) { publicUserSessions.add(subId); } else { log.info("[{}][{}][{}] Failed to start subscription. Max public user subscriptions limit reached" diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 8f8833a598..f726f5fc78 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -58,18 +58,6 @@ server: send_timeout: "${TB_SERVER_WS_SEND_TIMEOUT:5000}" # recommended timeout >= 30 seconds. Platform will attempt to send 'ping' request 3 times within the timeout ping_timeout: "${TB_SERVER_WS_PING_TIMEOUT:30000}" - limits: - # Limit the amount of sessions and subscriptions available on each server. Put values to zero to disable particular limitation - max_sessions_per_tenant: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_TENANT:0}" - max_sessions_per_customer: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_CUSTOMER:0}" - max_sessions_per_regular_user: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_REGULAR_USER:0}" - max_sessions_per_public_user: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_PUBLIC_USER:0}" - max_queue_per_ws_session: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_QUEUE_PER_WS_SESSION:500}" - max_subscriptions_per_tenant: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_TENANT:0}" - max_subscriptions_per_customer: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_CUSTOMER:0}" - max_subscriptions_per_regular_user: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_REGULAR_USER:0}" - max_subscriptions_per_public_user: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_PUBLIC_USER:0}" - max_updates_per_session: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_UPDATES_PER_SESSION:300:1,3000:60}" dynamic_page_link: refresh_interval: "${TB_SERVER_WS_DYNAMIC_PAGE_LINK_REFRESH_INTERVAL_SEC:60}" refresh_pool_size: "${TB_SERVER_WS_DYNAMIC_PAGE_LINK_REFRESH_POOL_SIZE:1}" @@ -78,13 +66,6 @@ server: max_entities_per_data_subscription: "${TB_SERVER_WS_MAX_ENTITIES_PER_DATA_SUBSCRIPTION:10000}" max_entities_per_alarm_subscription: "${TB_SERVER_WS_MAX_ENTITIES_PER_ALARM_SUBSCRIPTION:10000}" rest: - limits: - tenant: - enabled: "${TB_SERVER_REST_LIMITS_TENANT_ENABLED:false}" - configuration: "${TB_SERVER_REST_LIMITS_TENANT_CONFIGURATION:100:1,2000:60}" - customer: - enabled: "${TB_SERVER_REST_LIMITS_CUSTOMER_ENABLED:false}" - configuration: "${TB_SERVER_REST_LIMITS_CUSTOMER_CONFIGURATION:50:1,1000:60}" server_side_rpc: # Minimum value of the server side RPC timeout. May override value provided in the REST API call. # Since 2.5 migration to queues, the RPC delay depends on the size of the pending messages in the queue, @@ -255,8 +236,6 @@ cassandra: # log one of cassandra queries with specified frequency (0 - logging is disabled) print_queries_freq: "${CASSANDRA_QUERY_PRINT_FREQ:0}" tenant_rate_limits: - enabled: "${CASSANDRA_QUERY_TENANT_RATE_LIMITS_ENABLED:false}" - configuration: "${CASSANDRA_QUERY_TENANT_RATE_LIMITS_CONFIGURATION:1000:1,30000:60}" print_tenant_names: "${CASSANDRA_QUERY_TENANT_RATE_LIMITS_PRINT_TENANT_NAMES:false}" # SQL configuration parameters diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/TenantProfile.java b/common/data/src/main/java/org/thingsboard/server/common/data/TenantProfile.java index e5cbf23179..4ebb903acb 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/TenantProfile.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/TenantProfile.java @@ -85,7 +85,7 @@ public class TenantProfile extends SearchTextBased implements H @ApiModelProperty(position = 1, value = "JSON object with the tenant profile Id. " + "Specify this field to update the tenant profile. " + "Referencing non-existing tenant profile Id will cause error. " + - "Omit this field to create new tenant profile." ) + "Omit this field to create new tenant profile.") @Override public TenantProfileId getId() { return super.getId(); @@ -132,9 +132,15 @@ public class TenantProfile extends SearchTextBased implements H .map(profileConfiguration -> (DefaultTenantProfileConfiguration) profileConfiguration); } + @JsonIgnore + public DefaultTenantProfileConfiguration getDefaultProfileConfiguration() { + return getProfileConfiguration().orElse(null); + } + public TenantProfileData createDefaultTenantProfileData() { TenantProfileData tpd = new TenantProfileData(); tpd.setConfiguration(new DefaultTenantProfileConfiguration()); + this.profileData = tpd; return tpd; } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java index 9953f7a3aa..e7d6383540 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java @@ -57,6 +57,22 @@ public class DefaultTenantProfileConfiguration implements TenantProfileConfigura private long maxSms; private long maxCreatedAlarms; + private String tenantServerRestLimitsConfiguration; + private String customerServerRestLimitsConfiguration; + + private int maxWsSessionsPerTenant; + private int maxWsSessionsPerCustomer; + private int maxWsSessionsPerRegularUser; + private int maxWsSessionsPerPublicUser; + private int wsMsgQueueLimitPerSession; + private long maxWsSubscriptionsPerTenant; + private long maxWsSubscriptionsPerCustomer; + private long maxWsSubscriptionsPerRegularUser; + private long maxWsSubscriptionsPerPublicUser; + private String wsUpdatesPerSessionRateLimit; + + private String cassandraQueryTenantRateLimitsConfiguration; + private int defaultStorageTtlDays; private int alarmsTtlDays; private int rpcTtlDays; diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/tools/TbRateLimits.java b/common/message/src/main/java/org/thingsboard/server/common/msg/tools/TbRateLimits.java index ce0213dad2..c38ed85dfd 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/tools/TbRateLimits.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/tools/TbRateLimits.java @@ -20,6 +20,7 @@ import io.github.bucket4j.Bucket4j; import io.github.bucket4j.Refill; import io.github.bucket4j.local.LocalBucket; import io.github.bucket4j.local.LocalBucketBuilder; +import lombok.Getter; import java.time.Duration; @@ -28,6 +29,8 @@ import java.time.Duration; */ public class TbRateLimits { private final LocalBucket bucket; + + @Getter private final String configuration; public TbRateLimits(String limitsConfiguration) { @@ -60,8 +63,4 @@ public class TbRateLimits { return bucket.tryConsume(number); } - public String getConfiguration() { - return configuration; - } - } diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateReadExecutor.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateReadExecutor.java index da440cb06d..2fa019e663 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateReadExecutor.java +++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateReadExecutor.java @@ -22,8 +22,12 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.stats.DefaultCounter; +import org.thingsboard.server.common.stats.StatsCounter; import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.dao.entity.EntityService; +import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.dao.util.AbstractBufferedRateExecutor; import org.thingsboard.server.dao.util.AsyncTaskContext; import org.thingsboard.server.dao.util.NoSqlAnyDao; @@ -47,14 +51,13 @@ public class CassandraBufferedRateReadExecutor extends AbstractBufferedRateExecu @Value("${cassandra.query.dispatcher_threads:2}") int dispatcherThreads, @Value("${cassandra.query.callback_threads:4}") int callbackThreads, @Value("${cassandra.query.poll_ms:50}") long pollMs, - @Value("${cassandra.query.tenant_rate_limits.enabled}") boolean tenantRateLimitsEnabled, - @Value("${cassandra.query.tenant_rate_limits.configuration}") String tenantRateLimitsConfiguration, @Value("${cassandra.query.tenant_rate_limits.print_tenant_names}") boolean printTenantNames, @Value("${cassandra.query.print_queries_freq:0}") int printQueriesFreq, @Autowired StatsFactory statsFactory, - @Autowired EntityService entityService) { - super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs, tenantRateLimitsEnabled, tenantRateLimitsConfiguration, printQueriesFreq, statsFactory, - entityService, printTenantNames); + @Autowired EntityService entityService, + @Autowired TbTenantProfileCache tenantProfileCache) { + super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs, printQueriesFreq, statsFactory, + entityService, tenantProfileCache, printTenantNames); } @Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}") diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateWriteExecutor.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateWriteExecutor.java index b6a522c931..366df9ae3d 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateWriteExecutor.java +++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateWriteExecutor.java @@ -24,6 +24,7 @@ import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.dao.entity.EntityService; +import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.dao.util.AbstractBufferedRateExecutor; import org.thingsboard.server.dao.util.AsyncTaskContext; import org.thingsboard.server.dao.util.NoSqlAnyDao; @@ -47,14 +48,13 @@ public class CassandraBufferedRateWriteExecutor extends AbstractBufferedRateExec @Value("${cassandra.query.dispatcher_threads:2}") int dispatcherThreads, @Value("${cassandra.query.callback_threads:4}") int callbackThreads, @Value("${cassandra.query.poll_ms:50}") long pollMs, - @Value("${cassandra.query.tenant_rate_limits.enabled}") boolean tenantRateLimitsEnabled, - @Value("${cassandra.query.tenant_rate_limits.configuration}") String tenantRateLimitsConfiguration, @Value("${cassandra.query.tenant_rate_limits.print_tenant_names}") boolean printTenantNames, @Value("${cassandra.query.print_queries_freq:0}") int printQueriesFreq, @Autowired StatsFactory statsFactory, - @Autowired EntityService entityService) { - super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs, tenantRateLimitsEnabled, tenantRateLimitsConfiguration, printQueriesFreq, statsFactory, - entityService, printTenantNames); + @Autowired EntityService entityService, + @Autowired TbTenantProfileCache tenantProfileCache) { + super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs, printQueriesFreq, statsFactory, + entityService, tenantProfileCache, printTenantNames); } @Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}") diff --git a/dao/src/main/java/org/thingsboard/server/dao/tenant/TenantServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/tenant/TenantServiceImpl.java index e53806bb0e..096ec2ce26 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/tenant/TenantServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/tenant/TenantServiceImpl.java @@ -153,9 +153,9 @@ public class TenantServiceImpl extends AbstractEntityService implements TenantSe public void deleteTenant(TenantId tenantId) { log.trace("Executing deleteTenant [{}]", tenantId); Validator.validateId(tenantId, INCORRECT_TENANT_ID + tenantId); + entityViewService.deleteEntityViewsByTenantId(tenantId); customerService.deleteCustomersByTenantId(tenantId); widgetsBundleService.deleteWidgetsBundlesByTenantId(tenantId); - entityViewService.deleteEntityViewsByTenantId(tenantId); assetService.deleteAssetsByTenantId(tenantId); deviceService.deleteDevicesByTenantId(tenantId); deviceProfileService.deleteDeviceProfilesByTenantId(tenantId); diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java b/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java index 7952f8e22d..2850349470 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java +++ b/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java @@ -30,6 +30,7 @@ import com.google.common.util.concurrent.SettableFuture; import lombok.extern.slf4j.Slf4j; import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.tools.TbRateLimits; import org.thingsboard.server.common.stats.DefaultCounter; @@ -38,6 +39,7 @@ import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.common.stats.StatsType; import org.thingsboard.server.dao.entity.EntityService; import org.thingsboard.server.dao.nosql.CassandraStatementTask; +import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import javax.annotation.Nullable; import java.util.HashMap; @@ -71,8 +73,6 @@ public abstract class AbstractBufferedRateExecutor perTenantLimits = new ConcurrentHashMap<>(); private final AtomicInteger printQueriesIdx = new AtomicInteger(0); @@ -80,15 +80,15 @@ public abstract class AbstractBufferedRateExecutor tenantNamesCache = new HashMap<>(); + private final TbTenantProfileCache tenantProfileCache; private final boolean printTenantNames; + private final Map tenantNamesCache = new HashMap<>(); - public AbstractBufferedRateExecutor(int queueLimit, int concurrencyLimit, long maxWaitTime, int dispatcherThreads, int callbackThreads, long pollMs, - boolean perTenantLimitsEnabled, String perTenantLimitsConfiguration, int printQueriesFreq, StatsFactory statsFactory, - EntityService entityService, boolean printTenantNames) { + public AbstractBufferedRateExecutor(int queueLimit, int concurrencyLimit, long maxWaitTime, int dispatcherThreads, + int callbackThreads, long pollMs, int printQueriesFreq, StatsFactory statsFactory, + EntityService entityService, TbTenantProfileCache tenantProfileCache, boolean printTenantNames) { this.maxWaitTime = maxWaitTime; this.pollMs = pollMs; this.concurrencyLimit = concurrencyLimit; @@ -97,13 +97,12 @@ public abstract class AbstractBufferedRateExecutor settableFuture = create(); F result = wrap(task, settableFuture); + boolean perTenantLimitReached = false; - if (perTenantLimitsEnabled) { + var tenantProfileConfiguration = tenantProfileCache.get(task.getTenantId()).getDefaultProfileConfiguration(); + if (StringUtils.isNotEmpty(tenantProfileConfiguration.getCassandraQueryTenantRateLimitsConfiguration())) { if (task.getTenantId() == null) { log.info("Invalid task received: {}", task); } else if (!task.getTenantId().isNullUid()) { - TbRateLimits rateLimits = perTenantLimits.computeIfAbsent(task.getTenantId(), id -> new TbRateLimits(perTenantLimitsConfiguration)); + TbRateLimits rateLimits = perTenantLimits.computeIfAbsent( + task.getTenantId(), id -> new TbRateLimits(tenantProfileConfiguration.getCassandraQueryTenantRateLimitsConfiguration()) + ); if (!rateLimits.tryConsume()) { stats.incrementRateLimitedTenant(task.getTenantId()); stats.getTotalRateLimited().increment(); @@ -128,7 +131,10 @@ public abstract class AbstractBufferedRateExecutor + tenant-profile.transport-tenant-msg-rate-limit + + {{ 'tenant-profile.incorrect-pattern-for-rate-limits' | translate}} + tenant-profile.transport-tenant-telemetry-msg-rate-limit + + {{ 'tenant-profile.incorrect-pattern-for-rate-limits' | translate}} + tenant-profile.transport-tenant-telemetry-data-points-rate-limit + + {{ 'tenant-profile.incorrect-pattern-for-rate-limits' | translate}} + tenant-profile.transport-device-msg-rate-limit + + {{ 'tenant-profile.incorrect-pattern-for-rate-limits' | translate}} + tenant-profile.transport-device-telemetry-msg-rate-limit @@ -279,6 +292,104 @@ tenant-profile.transport-device-telemetry-data-points-rate-limit + + {{ 'tenant-profile.incorrect-pattern-for-rate-limits' | translate}} + + + + tenant-profile.tenant-rest-limits + + + {{ 'tenant-profile.incorrect-pattern-for-rate-limits' | translate}} + + + + tenant-profile.customer-rest-limits + + + {{ 'tenant-profile.incorrect-pattern-for-rate-limits' | translate}} + + + + tenant-profile.ws-limit-max-sessions-per-tenant + + + {{ 'tenant-profile.too-small-value-zero' | translate}} + + + + tenant-profile.ws-limit-max-sessions-per-customer + + + {{ 'tenant-profile.too-small-value-zero' | translate}} + + + + tenant-profile.ws-limit-max-sessions-per-public-user + + + {{ 'tenant-profile.too-small-value-zero' | translate}} + + + + tenant-profile.ws-limit-max-sessions-per-public-user + + + {{ 'tenant-profile.too-small-value-zero' | translate}} + + + + tenant-profile.ws-limit-queue-per-session + + + {{ 'tenant-profile.too-small-value-one' | translate}} + + + + tenant-profile.ws-limit-max-subscriptions-per-tenant + + + {{ 'tenant-profile.too-small-value-zero' | translate}} + + + + tenant-profile.ws-limit-max-subscriptions-per-customer + + + {{ 'tenant-profile.too-small-value-zero' | translate}} + + + + tenant-profile.ws-limit-max-subscriptions-per-regular-user + + + {{ 'tenant-profile.too-small-value-zero' | translate}} + + + + tenant-profile.ws-limit-max-subscriptions-per-public-user + + + {{ 'tenant-profile.too-small-value-zero' | translate}} + + + + tenant-profile.ws-limit-updates-per-session + + + {{ 'tenant-profile.incorrect-pattern-for-rate-limits' | translate}} + + + + tenant-profile.cassandra-tenant-limits-configuration + + + {{ 'tenant-profile.incorrect-pattern-for-rate-limits' | translate}} + tenant-profile.tenant-entity-export-rate-limit diff --git a/ui-ngx/src/app/modules/home/components/profile/tenant/default-tenant-profile-configuration.component.ts b/ui-ngx/src/app/modules/home/components/profile/tenant/default-tenant-profile-configuration.component.ts index 6f79c34279..2f922faaac 100644 --- a/ui-ngx/src/app/modules/home/components/profile/tenant/default-tenant-profile-configuration.component.ts +++ b/ui-ngx/src/app/modules/home/components/profile/tenant/default-tenant-profile-configuration.component.ts @@ -35,6 +35,7 @@ import { isDefinedAndNotNull } from '@core/utils'; export class DefaultTenantProfileConfigurationComponent implements ControlValueAccessor, OnInit { defaultTenantProfileConfigurationFormGroup: FormGroup; + rateLimitsPattern = '^(((\\d+):(\\d+)),)*((\\d+):(\\d+))$'; private requiredValue: boolean; get required(): boolean { @@ -61,12 +62,12 @@ export class DefaultTenantProfileConfigurationComponent implements ControlValueA maxRuleChains: [null, [Validators.required, Validators.min(0)]], maxResourcesInBytes: [null, [Validators.required, Validators.min(0)]], maxOtaPackagesInBytes: [null, [Validators.required, Validators.min(0)]], - transportTenantMsgRateLimit: [null, []], - transportTenantTelemetryMsgRateLimit: [null, []], - transportTenantTelemetryDataPointsRateLimit: [null, []], - transportDeviceMsgRateLimit: [null, []], - transportDeviceTelemetryMsgRateLimit: [null, []], - transportDeviceTelemetryDataPointsRateLimit: [null, []], + transportTenantMsgRateLimit: [null, [Validators.pattern(this.rateLimitsPattern)]], + transportTenantTelemetryMsgRateLimit: [null, [Validators.pattern(this.rateLimitsPattern)]], + transportTenantTelemetryDataPointsRateLimit: [null, [Validators.pattern(this.rateLimitsPattern)]], + transportDeviceMsgRateLimit: [null, [Validators.pattern(this.rateLimitsPattern)]], + transportDeviceTelemetryMsgRateLimit: [null, [Validators.pattern(this.rateLimitsPattern)]], + transportDeviceTelemetryDataPointsRateLimit: [null, [Validators.pattern(this.rateLimitsPattern)]], tenantEntityExportRateLimit: [null, []], tenantEntityImportRateLimit: [null, []], maxTransportMessages: [null, [Validators.required, Validators.min(0)]], @@ -80,7 +81,20 @@ export class DefaultTenantProfileConfigurationComponent implements ControlValueA maxCreatedAlarms: [null, [Validators.required, Validators.min(0)]], defaultStorageTtlDays: [null, [Validators.required, Validators.min(0)]], alarmsTtlDays: [null, [Validators.required, Validators.min(0)]], - rpcTtlDays: [null, [Validators.required, Validators.min(0)]] + rpcTtlDays: [null, [Validators.required, Validators.min(0)]], + tenantServerRestLimitsConfiguration: [null, [Validators.pattern(this.rateLimitsPattern)]], + customerServerRestLimitsConfiguration: [null, [Validators.pattern(this.rateLimitsPattern)]], + maxWsSessionsPerTenant: [null, [Validators.min(0)]], + maxWsSessionsPerCustomer: [null, [Validators.min(0)]], + maxWsSessionsPerRegularUser: [null, [Validators.min(0)]], + maxWsSessionsPerPublicUser: [null, [Validators.min(0)]], + wsMsgQueueLimitPerSession: [null, [Validators.min(0)]], + maxWsSubscriptionsPerTenant: [null, [Validators.min(0)]], + maxWsSubscriptionsPerCustomer: [null, [Validators.min(0)]], + maxWsSubscriptionsPerRegularUser: [null, [Validators.min(0)]], + maxWsSubscriptionsPerPublicUser: [null, [Validators.min(0)]], + wsUpdatesPerSessionRateLimit: [null, [Validators.pattern(this.rateLimitsPattern)]], + cassandraQueryTenantRateLimitsConfiguration: [null, [Validators.pattern(this.rateLimitsPattern)]] }); this.defaultTenantProfileConfigurationFormGroup.valueChanges.subscribe(() => { this.updateModel(); diff --git a/ui-ngx/src/app/shared/models/tenant.model.ts b/ui-ngx/src/app/shared/models/tenant.model.ts index ea1ff3fdc5..70bbb1dc28 100644 --- a/ui-ngx/src/app/shared/models/tenant.model.ts +++ b/ui-ngx/src/app/shared/models/tenant.model.ts @@ -54,6 +54,22 @@ export interface DefaultTenantProfileConfiguration { maxSms: number; maxCreatedAlarms: number; + tenantServerRestLimitsConfiguration: string; + customerServerRestLimitsConfiguration: string; + + maxWsSessionsPerTenant: number; + maxWsSessionsPerCustomer: number; + maxWsSessionsPerRegularUser: number; + maxWsSessionsPerPublicUser: number; + wsMsgQueueLimitPerSession: number; + maxWsSubscriptionsPerTenant: number; + maxWsSubscriptionsPerCustomer: number; + maxWsSubscriptionsPerRegularUser: number; + maxWsSubscriptionsPerPublicUser: number; + wsUpdatesPerSessionRateLimit: string; + + cassandraQueryTenantRateLimitsConfiguration: string; + defaultStorageTtlDays: number; alarmsTtlDays: number; rpcTtlDays: number; @@ -88,9 +104,22 @@ export function createTenantProfileConfiguration(type: TenantProfileType): Tenan maxEmails: 0, maxSms: 0, maxCreatedAlarms: 0, + tenantServerRestLimitsConfiguration: '', + customerServerRestLimitsConfiguration: '', + maxWsSessionsPerTenant: 0, + maxWsSessionsPerCustomer: 0, + maxWsSessionsPerRegularUser: 0, + maxWsSessionsPerPublicUser: 0, + wsMsgQueueLimitPerSession: 0, + maxWsSubscriptionsPerTenant: 0, + maxWsSubscriptionsPerCustomer: 0, + maxWsSubscriptionsPerRegularUser: 0, + maxWsSubscriptionsPerPublicUser: 0, + wsUpdatesPerSessionRateLimit: '', + cassandraQueryTenantRateLimitsConfiguration: '', defaultStorageTtlDays: 0, alarmsTtlDays: 0, - rpcTtlDays: 0 + rpcTtlDays: 0, }; configuration = {...defaultConfiguration, type: TenantProfileType.DEFAULT}; break; diff --git a/ui-ngx/src/assets/locale/locale.constant-en_US.json b/ui-ngx/src/assets/locale/locale.constant-en_US.json index 9bea197853..0ab99ccd79 100644 --- a/ui-ngx/src/assets/locale/locale.constant-en_US.json +++ b/ui-ngx/src/assets/locale/locale.constant-en_US.json @@ -3140,7 +3140,22 @@ "max-created-alarms-range": "Maximum number of alarms created can't be negative", "no-queue": "No Queue configured", "add-queue": "Add Queue", - "queues-with-count": "Queues ({{count}})" + "queues-with-count": "Queues ({{count}})", + "tenant-rest-limits": "Rate limit for REST requests for tenant", + "customer-rest-limits": "Rate limit for REST requests for customer", + "incorrect-pattern-for-rate-limits": "The format is comma separated pairs of capacity and period (in seconds) with a colon between, e.g. 100:1,2000:60", + "too-small-value-zero": "The value must be bigger than 0", + "too-small-value-one": "The value must be bigger than 1", + "cassandra-tenant-limits-configuration": "Cassandra query rate limit for tenant", + "ws-limit-max-sessions-per-tenant": "Maximum number of WS sessions per tenant", + "ws-limit-max-sessions-per-customer": "Maximum number of WS sessions per customer", + "ws-limit-max-sessions-per-public-user": "Maximum number of WS sessions per public user", + "ws-limit-queue-per-session": "Maximum size of WS message queue per session", + "ws-limit-max-subscriptions-per-tenant": "Maximum number of WS subscriptions per tenant", + "ws-limit-max-subscriptions-per-customer": "Maximum number of WS subscriptions per customer", + "ws-limit-max-subscriptions-per-regular-user": "Maximum number of WS subscriptions per regular user", + "ws-limit-max-subscriptions-per-public-user": "Maximum number of WS subscriptions per public user", + "ws-limit-updates-per-session": "Rate limit for WS updates per session" }, "timeinterval": { "seconds-interval": "{ seconds, plural, 1 {1 second} other {# seconds} }",