diff --git a/application/src/main/java/org/thingsboard/server/config/RateLimitProcessingFilter.java b/application/src/main/java/org/thingsboard/server/config/RateLimitProcessingFilter.java index 75d15b18f2..7163ba3feb 100644 --- a/application/src/main/java/org/thingsboard/server/config/RateLimitProcessingFilter.java +++ b/application/src/main/java/org/thingsboard/server/config/RateLimitProcessingFilter.java @@ -48,22 +48,20 @@ public class RateLimitProcessingFilter extends GenericFilterBean { @Autowired private TbTenantProfileCache tenantProfileCache; - private ConcurrentMap perTenantLimits = new ConcurrentHashMap<>(); - private ConcurrentMap perCustomerLimits = new ConcurrentHashMap<>(); + private final ConcurrentMap perTenantLimits = new ConcurrentHashMap<>(); + private final ConcurrentMap perCustomerLimits = new ConcurrentHashMap<>(); @Override public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { SecurityUser user = getCurrentUser(); if (user != null && !user.isSystemAdmin()) { - var profileConfiguration = tenantProfileCache.get(user.getTenantId()).getDefaultTenantProfileConfiguration(); - - if(profileConfiguration != null) { - if (user.isTenantAdmin() && StringUtils.isNotEmpty(profileConfiguration.getRateLimitsTenantConfiguration())) { - + if (profileConfiguration != null) { + if (user.isTenantAdmin() && StringUtils.isNotEmpty(profileConfiguration.getTenantServerRestLimitsConfiguration())) { TbRateLimits rateLimits = perTenantLimits.get(user.getTenantId()); - if(rateLimits == null || !rateLimits.getCurrentConfig().equals(profileConfiguration.getRateLimitsTenantConfiguration())) { - rateLimits = new TbRateLimits(profileConfiguration.getRateLimitsTenantConfiguration()); + if (rateLimits == null || !rateLimits.getConfiguration().equals(profileConfiguration.getTenantServerRestLimitsConfiguration())) { + // fixme: or maybe handle component lifecycle event ? + rateLimits = new TbRateLimits(profileConfiguration.getTenantServerRestLimitsConfiguration()); perTenantLimits.put(user.getTenantId(), rateLimits); } @@ -71,12 +69,10 @@ public class RateLimitProcessingFilter extends GenericFilterBean { errorResponseHandler.handle(new TbRateLimitsException(EntityType.TENANT), (HttpServletResponse) response); return; } - } - if (user.isCustomerUser() && StringUtils.isNotEmpty(profileConfiguration.getRateLimitsCustomerConfiguration())) { - + } else if (user.isCustomerUser() && StringUtils.isNotEmpty(profileConfiguration.getCustomerServerRestLimitsConfiguration())) { TbRateLimits rateLimits = perCustomerLimits.get(user.getCustomerId()); - if(rateLimits == null || !rateLimits.getCurrentConfig().equals(profileConfiguration.getRateLimitsCustomerConfiguration())) { - rateLimits = new TbRateLimits(profileConfiguration.getRateLimitsCustomerConfiguration()); + if (rateLimits == null || !rateLimits.getConfiguration().equals(profileConfiguration.getCustomerServerRestLimitsConfiguration())) { + rateLimits = new TbRateLimits(profileConfiguration.getCustomerServerRestLimitsConfiguration()); perCustomerLimits.put(user.getCustomerId(), rateLimits); } diff --git a/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java b/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java index c19cef0c89..beae6ca8ad 100644 --- a/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java +++ b/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java @@ -16,12 +16,12 @@ package org.thingsboard.server.controller.plugin; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.BeanCreationNotAllowedException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.security.core.Authentication; import org.springframework.stereotype.Service; -import org.springframework.util.StringUtils; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.PongMessage; import org.springframework.web.socket.TextMessage; @@ -139,8 +139,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr return; } var tenantProfileConfiguration = tenantProfileCache.get(sessionRef.getSecurityCtx().getTenantId()).getDefaultTenantProfileConfiguration(); - - internalSessionMap.put(internalSessionId, new SessionMetaData(session, sessionRef, tenantProfileConfiguration.getWsLimitQueuePerWsSession())); + internalSessionMap.put(internalSessionId, new SessionMetaData(session, sessionRef, tenantProfileConfiguration.getWsMsgQueueLimitPerSession())); externalSessionMap.put(externalSessionId, internalSessionId); processInWebSocketService(sessionRef, SessionEvent.onEstablished()); @@ -298,14 +297,13 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr String externalId = sessionRef.getSessionId(); log.debug("[{}] Processing {}", externalId, msg); String internalId = externalSessionMap.get(externalId); - - var tenantProfileConfiguration = tenantProfileCache.get(sessionRef.getSecurityCtx().getTenantId()).getDefaultTenantProfileConfiguration(); - if (internalId != null) { SessionMetaData sessionMd = internalSessionMap.get(internalId); if (sessionMd != null) { - if (!StringUtils.isEmpty(tenantProfileConfiguration.getWsLimitUpdatesPerSession())) { - TbRateLimits rateLimits = perSessionUpdateLimits.computeIfAbsent(sessionRef.getSessionId(), sid -> new TbRateLimits(tenantProfileConfiguration.getWsLimitUpdatesPerSession())); + var tenantProfileConfiguration = tenantProfileCache.get(sessionRef.getSecurityCtx().getTenantId()).getDefaultTenantProfileConfiguration(); + if (StringUtils.isNotEmpty(tenantProfileConfiguration.getWsUpdatesPerSessionRateLimit())) { + // fixme: is this ok not to update rate limits config if it was change in profile? + TbRateLimits rateLimits = perSessionUpdateLimits.computeIfAbsent(sessionRef.getSessionId(), sid -> new TbRateLimits(tenantProfileConfiguration.getWsUpdatesPerSessionRateLimit())); if (!rateLimits.tryConsume()) { if (blacklistedSessions.putIfAbsent(externalId, sessionRef) == null) { log.info("[{}][{}][{}] Failed to process session update. Max session updates limit reached" @@ -364,15 +362,15 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr var tenantProfileConfiguration = tenantProfileCache.get(sessionRef.getSecurityCtx().getTenantId()).getDefaultTenantProfileConfiguration(); - if(tenantProfileConfiguration == null) { + if (tenantProfileConfiguration == null) { return true; } String sessionId = session.getId(); - if (tenantProfileConfiguration.getWsLimitMaxSessionsPerTenant() > 0) { + if (tenantProfileConfiguration.getMaxWsSessionsPerTenant() > 0) { Set tenantSessions = tenantSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getTenantId(), id -> ConcurrentHashMap.newKeySet()); synchronized (tenantSessions) { - if (tenantSessions.size() < tenantProfileConfiguration.getWsLimitMaxSessionsPerTenant()) { + if (tenantSessions.size() < tenantProfileConfiguration.getMaxWsSessionsPerTenant()) { tenantSessions.add(sessionId); } else { log.info("[{}][{}][{}] Failed to start session. Max tenant sessions limit reached" @@ -384,10 +382,10 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr } if (sessionRef.getSecurityCtx().isCustomerUser()) { - if (tenantProfileConfiguration.getWsLimitMaxSessionsPerCustomer() > 0) { + if (tenantProfileConfiguration.getMaxWsSessionsPerCustomer() > 0) { Set customerSessions = customerSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getCustomerId(), id -> ConcurrentHashMap.newKeySet()); synchronized (customerSessions) { - if (customerSessions.size() < tenantProfileConfiguration.getWsLimitMaxSessionsPerCustomer()) { + if (customerSessions.size() < tenantProfileConfiguration.getMaxWsSessionsPerCustomer()) { customerSessions.add(sessionId); } else { log.info("[{}][{}][{}] Failed to start session. Max customer sessions limit reached" @@ -397,11 +395,11 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr } } } - if (tenantProfileConfiguration.getWsLimitMaxSessionsPerRegularUser() > 0 + if (tenantProfileConfiguration.getMaxWsSessionsPerRegularUser() > 0 && UserPrincipal.Type.USER_NAME.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) { Set regularUserSessions = regularUserSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet()); synchronized (regularUserSessions) { - if (regularUserSessions.size() < tenantProfileConfiguration.getWsLimitMaxSessionsPerRegularUser()) { + if (regularUserSessions.size() < tenantProfileConfiguration.getMaxWsSessionsPerRegularUser()) { regularUserSessions.add(sessionId); } else { log.info("[{}][{}][{}] Failed to start session. Max regular user sessions limit reached" @@ -411,11 +409,11 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr } } } - if (tenantProfileConfiguration.getWsLimitMaxSessionsPerPublicUser() > 0 + if (tenantProfileConfiguration.getMaxWsSessionsPerPublicUser() > 0 && UserPrincipal.Type.PUBLIC_ID.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) { Set publicUserSessions = publicUserSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet()); synchronized (publicUserSessions) { - if (publicUserSessions.size() < tenantProfileConfiguration.getWsLimitMaxSessionsPerPublicUser()) { + if (publicUserSessions.size() < tenantProfileConfiguration.getMaxWsSessionsPerPublicUser()) { publicUserSessions.add(sessionId); } else { log.info("[{}][{}][{}] Failed to start session. Max public user sessions limit reached" @@ -435,26 +433,26 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr String sessionId = session.getId(); perSessionUpdateLimits.remove(sessionRef.getSessionId()); blacklistedSessions.remove(sessionRef.getSessionId()); - if (tenantProfileConfiguration.getWsLimitMaxSessionsPerTenant() > 0) { + if (tenantProfileConfiguration.getMaxWsSessionsPerTenant() > 0) { Set tenantSessions = tenantSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getTenantId(), id -> ConcurrentHashMap.newKeySet()); synchronized (tenantSessions) { tenantSessions.remove(sessionId); } } if (sessionRef.getSecurityCtx().isCustomerUser()) { - if (tenantProfileConfiguration.getWsLimitMaxSessionsPerCustomer() > 0) { + if (tenantProfileConfiguration.getMaxWsSessionsPerCustomer() > 0) { Set customerSessions = customerSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getCustomerId(), id -> ConcurrentHashMap.newKeySet()); synchronized (customerSessions) { customerSessions.remove(sessionId); } } - if (tenantProfileConfiguration.getWsLimitMaxSessionsPerRegularUser() > 0 && UserPrincipal.Type.USER_NAME.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) { + if (tenantProfileConfiguration.getMaxWsSessionsPerRegularUser() > 0 && UserPrincipal.Type.USER_NAME.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) { Set regularUserSessions = regularUserSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet()); synchronized (regularUserSessions) { regularUserSessions.remove(sessionId); } } - if (tenantProfileConfiguration.getWsLimitMaxSessionsPerPublicUser() > 0 && UserPrincipal.Type.PUBLIC_ID.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) { + if (tenantProfileConfiguration.getMaxWsSessionsPerPublicUser() > 0 && UserPrincipal.Type.PUBLIC_ID.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) { Set publicUserSessions = publicUserSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet()); synchronized (publicUserSessions) { publicUserSessions.remove(sessionId); diff --git a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java index 02bc87aaf9..d8e84458bf 100644 --- a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java +++ b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java @@ -216,6 +216,9 @@ public class ThingsboardInstallService { dataUpdateService.updateData("3.3.2"); log.info("Updating system data..."); systemDataLoaderService.updateSystemWidgets(); + case "3.3.3": + log.info("Upgrading ThingsBoard from version 3.3.3 to 3.4 ..."); + dataUpdateService.updateData("3.3.3"); break; //TODO update CacheCleanupService on the next version upgrade diff --git a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java index 7652114166..d23a8eecc4 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java @@ -25,7 +25,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Service; import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.rule.engine.flow.TbRuleChainInputNode; import org.thingsboard.rule.engine.flow.TbRuleChainInputNodeConfiguration; import org.thingsboard.rule.engine.profile.TbDeviceProfileNode; @@ -56,12 +55,9 @@ import org.thingsboard.server.common.data.rule.RuleChainType; import org.thingsboard.server.common.data.rule.RuleNode; import org.thingsboard.server.dao.DaoUtil; import org.thingsboard.server.dao.alarm.AlarmDao; -import org.thingsboard.server.dao.alarm.AlarmService; import org.thingsboard.server.dao.entity.EntityService; import org.thingsboard.server.dao.entityview.EntityViewService; import org.thingsboard.server.dao.model.sql.DeviceProfileEntity; -import org.thingsboard.server.dao.model.sql.RelationEntity; -import org.thingsboard.server.dao.oauth2.OAuth2Service; import org.thingsboard.server.dao.relation.RelationService; import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.dao.sql.device.DeviceProfileRepository; @@ -101,9 +97,6 @@ public class DefaultDataUpdateService implements DataUpdateService { @Autowired private TimeseriesService tsService; - @Autowired - private AlarmService alarmService; - @Autowired private EntityService entityService; @@ -113,9 +106,6 @@ public class DefaultDataUpdateService implements DataUpdateService { @Autowired private DeviceProfileRepository deviceProfileRepository; - @Autowired - private OAuth2Service oAuth2Service; - @Autowired private RateLimitsUpdater rateLimitsUpdater; @@ -140,12 +130,15 @@ public class DefaultDataUpdateService implements DataUpdateService { tenantsAlarmsCustomerUpdater.updateEntities(null); deviceProfileEntityDynamicConditionsUpdater.updateEntities(null); updateOAuth2Params(); - rateLimitsUpdater.updateEntities(null); break; case "3.3.2": log.info("Updating data from version 3.3.2 to 3.3.3 ..."); updateNestedRuleChains(); break; + case "3.3.3": + log.info("Updating data from version 3.3.3 to 3.4 ..."); + rateLimitsUpdater.updateEntities(); + break; default: throw new RuntimeException("Unable to update data, unsupported fromVersion: " + fromVersion); } diff --git a/application/src/main/java/org/thingsboard/server/service/install/update/PaginatedUpdater.java b/application/src/main/java/org/thingsboard/server/service/install/update/PaginatedUpdater.java index c10d1b4bd7..1ec06a4413 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/update/PaginatedUpdater.java +++ b/application/src/main/java/org/thingsboard/server/service/install/update/PaginatedUpdater.java @@ -50,6 +50,10 @@ public abstract class PaginatedUpdater { } } + public void updateEntities() { + updateEntities(null); + } + protected boolean forceReportTotal() { return false; } diff --git a/application/src/main/java/org/thingsboard/server/service/install/update/RateLimitsUpdater.java b/application/src/main/java/org/thingsboard/server/service/install/update/RateLimitsUpdater.java index e333641da3..f66459c006 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/update/RateLimitsUpdater.java +++ b/application/src/main/java/org/thingsboard/server/service/install/update/RateLimitsUpdater.java @@ -15,53 +15,53 @@ */ package org.thingsboard.server.service.install.update; +import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; -import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.TenantProfile; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.dao.tenant.TenantProfileService; @Component class RateLimitsUpdater extends PaginatedUpdater { + @Value("${server.rest.limits.tenant.enabled}") - boolean tenantServerRateLimitsEnabled; - @Value("${server.rest.limits.customer.enabled}") - boolean customerServerRateLimitsEnabled; + boolean tenantServerRestLimitsEnabled; @Value("${server.rest.limits.tenant.configuration}") - String tenantServerRateLimitsConfiguration; + String tenantServerRestLimitsConfiguration; + @Value("${server.rest.limits.customer.enabled}") + boolean customerServerRestLimitsEnabled; @Value("${server.rest.limits.customer.configuration}") - String customerServerRateLimitsConfiguration; + String customerServerRestLimitsConfiguration; @Value("${server.ws.limits.max_sessions_per_tenant}") - private int wsLimitMaxSessionsPerTenant; + private int maxWsSessionsPerTenant; @Value("${server.ws.limits.max_sessions_per_customer}") - private int wsLimitMaxSessionsPerCustomer; + private int maxWsSessionsPerCustomer; @Value("${server.ws.limits.max_sessions_per_regular_user}") - private int wsLimitMaxSessionsPerRegularUser; + private int maxWsSessionsPerRegularUser; @Value("${server.ws.limits.max_sessions_per_public_user}") - private int wsLimitMaxSessionsPerPublicUser; + private int maxWsSessionsPerPublicUser; @Value("${server.ws.limits.max_queue_per_ws_session}") - private int wsLimitQueuePerWsSession; + private int wsMsgQueueLimitPerSession; @Value("${server.ws.limits.max_subscriptions_per_tenant}") - private long wsLimitMaxSubscriptionsPerTenant; + private long maxWsSubscriptionsPerTenant; @Value("${server.ws.limits.max_subscriptions_per_customer}") - private long wsLimitMaxSubscriptionsPerCustomer; + private long maxWsSubscriptionsPerCustomer; @Value("${server.ws.limits.max_subscriptions_per_regular_user}") - private long wsLimitMaxSubscriptionsPerRegularUser; + private long maxWsSubscriptionsPerRegularUser; @Value("${server.ws.limits.max_subscriptions_per_public_user}") - private long wsLimitMaxSubscriptionsPerPublicUser; + private long maxWsSubscriptionsPerPublicUser; @Value("${server.ws.limits.max_updates_per_session}") - private String wsLimitUpdatesPerSession; + private String wsUpdatesPerSessionRateLimit; @Value("${cassandra.query.tenant_rate_limits.enabled}") - private boolean cassandraLimitsIsEnabled; + private boolean cassandraQueryTenantRateLimitsEnabled; @Value("${cassandra.query.tenant_rate_limits.configuration}") - private String cassandraTenantLimitsConfiguration; - @Value("${cassandra.query.tenant_rate_limits.print_tenant_names}") - private boolean printTenantNames; + private String cassandraQueryTenantRateLimitsConfiguration; @Autowired private TenantProfileService tenantProfileService; @@ -78,40 +78,38 @@ class RateLimitsUpdater extends PaginatedUpdater { @Override protected PageData findEntities(String id, PageLink pageLink) { - return tenantProfileService.findTenantProfiles(null, pageLink); + return tenantProfileService.findTenantProfiles(TenantId.SYS_TENANT_ID, pageLink); } @Override - protected void updateEntity(TenantProfile entity) { - var profileConfiguration = entity.getDefaultTenantProfileConfiguration(); - if (profileConfiguration != null) { - if (tenantServerRateLimitsEnabled && StringUtils.isNotEmpty(tenantServerRateLimitsConfiguration)) { - profileConfiguration.setRateLimitsTenantConfiguration(tenantServerRateLimitsConfiguration); - } - if (customerServerRateLimitsEnabled && StringUtils.isNotEmpty(customerServerRateLimitsConfiguration)) { - profileConfiguration.setRateLimitsCustomerConfiguration(customerServerRateLimitsConfiguration); - } + protected void updateEntity(TenantProfile tenantProfile) { + var profileConfiguration = tenantProfile.getDefaultTenantProfileConfiguration(); - profileConfiguration.setWsLimitMaxSessionsPerTenant(wsLimitMaxSessionsPerTenant); - profileConfiguration.setWsLimitMaxSessionsPerCustomer(wsLimitMaxSessionsPerCustomer); - profileConfiguration.setWsLimitMaxSessionsPerPublicUser(wsLimitMaxSessionsPerPublicUser); - profileConfiguration.setWsLimitMaxSessionsPerRegularUser(wsLimitMaxSessionsPerRegularUser); - profileConfiguration.setWsLimitMaxSubscriptionsPerTenant(wsLimitMaxSubscriptionsPerTenant); - profileConfiguration.setWsLimitMaxSubscriptionsPerCustomer(wsLimitMaxSubscriptionsPerCustomer); - profileConfiguration.setWsLimitMaxSubscriptionsPerPublicUser(wsLimitMaxSubscriptionsPerPublicUser); - profileConfiguration.setWsLimitMaxSubscriptionsPerRegularUser(wsLimitMaxSubscriptionsPerRegularUser); - profileConfiguration.setWsLimitQueuePerWsSession(wsLimitQueuePerWsSession); - - if (StringUtils.isNotEmpty(wsLimitUpdatesPerSession)) { - profileConfiguration.setWsLimitUpdatesPerSession(wsLimitUpdatesPerSession); - } - - if (cassandraLimitsIsEnabled) { - profileConfiguration.setCassandraTenantLimitsConfiguration(cassandraTenantLimitsConfiguration); - profileConfiguration.setPrintTenantNames(printTenantNames); - } - - tenantProfileService.saveTenantProfile(null, entity); + if (tenantServerRestLimitsEnabled && StringUtils.isNotEmpty(tenantServerRestLimitsConfiguration)) { + profileConfiguration.setTenantServerRestLimitsConfiguration(tenantServerRestLimitsConfiguration); } + if (customerServerRestLimitsEnabled && StringUtils.isNotEmpty(customerServerRestLimitsConfiguration)) { + profileConfiguration.setCustomerServerRestLimitsConfiguration(customerServerRestLimitsConfiguration); + } + + profileConfiguration.setMaxWsSessionsPerTenant(maxWsSessionsPerTenant); + profileConfiguration.setMaxWsSessionsPerCustomer(maxWsSessionsPerCustomer); + profileConfiguration.setMaxWsSessionsPerPublicUser(maxWsSessionsPerPublicUser); + profileConfiguration.setMaxWsSessionsPerRegularUser(maxWsSessionsPerRegularUser); + profileConfiguration.setMaxWsSubscriptionsPerTenant(maxWsSubscriptionsPerTenant); + profileConfiguration.setMaxWsSubscriptionsPerCustomer(maxWsSubscriptionsPerCustomer); + profileConfiguration.setMaxWsSubscriptionsPerPublicUser(maxWsSubscriptionsPerPublicUser); + profileConfiguration.setMaxWsSubscriptionsPerRegularUser(maxWsSubscriptionsPerRegularUser); + profileConfiguration.setWsMsgQueueLimitPerSession(wsMsgQueueLimitPerSession); + if (StringUtils.isNotEmpty(wsUpdatesPerSessionRateLimit)) { + profileConfiguration.setWsUpdatesPerSessionRateLimit(wsUpdatesPerSessionRateLimit); + } + + if (cassandraQueryTenantRateLimitsEnabled && StringUtils.isNotEmpty(cassandraQueryTenantRateLimitsConfiguration)) { + profileConfiguration.setCassandraQueryTenantRateLimitsConfiguration(cassandraQueryTenantRateLimitsConfiguration); + } + + tenantProfileService.saveTenantProfile(TenantId.SYS_TENANT_ID, tenantProfile); } + } diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java index 81f945e620..b377eac8aa 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java @@ -24,6 +24,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import org.springframework.web.socket.CloseStatus; @@ -304,29 +305,29 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi private void processSessionClose(TelemetryWebSocketSessionRef sessionRef) { var tenantProfileConfiguration = (DefaultTenantProfileConfiguration) tenantProfileCache.get(sessionRef.getSecurityCtx().getTenantId()).getDefaultTenantProfileConfiguration(); - if(tenantProfileConfiguration != null) { + if (tenantProfileConfiguration != null) { String sessionId = "[" + sessionRef.getSessionId() + "]"; - if (tenantProfileConfiguration.getWsLimitMaxSubscriptionsPerTenant() > 0) { + if (tenantProfileConfiguration.getMaxWsSubscriptionsPerTenant() > 0) { Set tenantSubscriptions = tenantSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getTenantId(), id -> ConcurrentHashMap.newKeySet()); synchronized (tenantSubscriptions) { tenantSubscriptions.removeIf(subId -> subId.startsWith(sessionId)); } } if (sessionRef.getSecurityCtx().isCustomerUser()) { - if (tenantProfileConfiguration.getWsLimitMaxSubscriptionsPerCustomer() > 0) { + if (tenantProfileConfiguration.getMaxWsSubscriptionsPerCustomer() > 0) { Set customerSessions = customerSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getCustomerId(), id -> ConcurrentHashMap.newKeySet()); synchronized (customerSessions) { customerSessions.removeIf(subId -> subId.startsWith(sessionId)); } } - if (tenantProfileConfiguration.getWsLimitMaxSubscriptionsPerRegularUser() > 0 && UserPrincipal.Type.USER_NAME.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) { + if (tenantProfileConfiguration.getMaxWsSubscriptionsPerRegularUser() > 0 && UserPrincipal.Type.USER_NAME.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) { Set regularUserSessions = regularUserSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet()); synchronized (regularUserSessions) { regularUserSessions.removeIf(subId -> subId.startsWith(sessionId)); } } - if (tenantProfileConfiguration.getWsLimitMaxSubscriptionsPerPublicUser() > 0 && UserPrincipal.Type.PUBLIC_ID.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) { + if (tenantProfileConfiguration.getMaxWsSubscriptionsPerPublicUser() > 0 && UserPrincipal.Type.PUBLIC_ID.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) { Set publicUserSessions = publicUserSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet()); synchronized (publicUserSessions) { publicUserSessions.removeIf(subId -> subId.startsWith(sessionId)); @@ -341,12 +342,12 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi String subId = "[" + sessionRef.getSessionId() + "]:[" + cmd.getCmdId() + "]"; try { - if (tenantProfileConfiguration.getWsLimitMaxSubscriptionsPerTenant() > 0) { + if (tenantProfileConfiguration.getMaxWsSubscriptionsPerTenant() > 0) { Set tenantSubscriptions = tenantSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getTenantId(), id -> ConcurrentHashMap.newKeySet()); synchronized (tenantSubscriptions) { if (cmd.isUnsubscribe()) { tenantSubscriptions.remove(subId); - } else if (tenantSubscriptions.size() < tenantProfileConfiguration.getWsLimitMaxSubscriptionsPerTenant()) { + } else if (tenantSubscriptions.size() < tenantProfileConfiguration.getMaxWsSubscriptionsPerTenant()) { tenantSubscriptions.add(subId); } else { log.info("[{}][{}][{}] Failed to start subscription. Max tenant subscriptions limit reached" @@ -358,12 +359,12 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi } if (sessionRef.getSecurityCtx().isCustomerUser()) { - if (tenantProfileConfiguration.getWsLimitMaxSubscriptionsPerCustomer() > 0) { + if (tenantProfileConfiguration.getMaxWsSubscriptionsPerCustomer() > 0) { Set customerSessions = customerSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getCustomerId(), id -> ConcurrentHashMap.newKeySet()); synchronized (customerSessions) { if (cmd.isUnsubscribe()) { customerSessions.remove(subId); - } else if (customerSessions.size() < tenantProfileConfiguration.getWsLimitMaxSubscriptionsPerCustomer()) { + } else if (customerSessions.size() < tenantProfileConfiguration.getMaxWsSubscriptionsPerCustomer()) { customerSessions.add(subId); } else { log.info("[{}][{}][{}] Failed to start subscription. Max customer subscriptions limit reached" @@ -373,10 +374,10 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi } } } - if (tenantProfileConfiguration.getWsLimitMaxSubscriptionsPerRegularUser() > 0 && UserPrincipal.Type.USER_NAME.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) { + if (tenantProfileConfiguration.getMaxWsSubscriptionsPerRegularUser() > 0 && UserPrincipal.Type.USER_NAME.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) { Set regularUserSessions = regularUserSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet()); synchronized (regularUserSessions) { - if (regularUserSessions.size() < tenantProfileConfiguration.getWsLimitMaxSubscriptionsPerRegularUser()) { + if (regularUserSessions.size() < tenantProfileConfiguration.getMaxWsSubscriptionsPerRegularUser()) { regularUserSessions.add(subId); } else { log.info("[{}][{}][{}] Failed to start subscription. Max regular user subscriptions limit reached" @@ -386,10 +387,10 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi } } } - if (tenantProfileConfiguration.getWsLimitMaxSubscriptionsPerPublicUser() > 0 && UserPrincipal.Type.PUBLIC_ID.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) { + if (tenantProfileConfiguration.getMaxWsSubscriptionsPerPublicUser() > 0 && UserPrincipal.Type.PUBLIC_ID.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) { Set publicUserSessions = publicUserSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet()); synchronized (publicUserSessions) { - if (publicUserSessions.size() < tenantProfileConfiguration.getWsLimitMaxSubscriptionsPerPublicUser()) { + if (publicUserSessions.size() < tenantProfileConfiguration.getMaxWsSubscriptionsPerPublicUser()) { publicUserSessions.add(subId); } else { log.info("[{}][{}][{}] Failed to start subscription. Max public user subscriptions limit reached" diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/TenantProfile.java b/common/data/src/main/java/org/thingsboard/server/common/data/TenantProfile.java index ae75512625..3d6669e870 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/TenantProfile.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/TenantProfile.java @@ -83,7 +83,7 @@ public class TenantProfile extends SearchTextBased implements H @ApiModelProperty(position = 1, value = "JSON object with the tenant profile Id. " + "Specify this field to update the tenant profile. " + "Referencing non-existing tenant profile Id will cause error. " + - "Omit this field to create new tenant profile." ) + "Omit this field to create new tenant profile.") @Override public TenantProfileId getId() { return super.getId(); @@ -133,6 +133,7 @@ public class TenantProfile extends SearchTextBased implements H public TenantProfileData createDefaultTenantProfileData() { TenantProfileData tpd = new TenantProfileData(); tpd.setConfiguration(new DefaultTenantProfileConfiguration()); + this.profileData = tpd; return tpd; } @@ -147,11 +148,7 @@ public class TenantProfile extends SearchTextBased implements H @JsonIgnore public DefaultTenantProfileConfiguration getDefaultTenantProfileConfiguration() { - if(getProfileData().getConfiguration() != null && - getProfileData().getConfiguration().getType().equals(TenantProfileType.DEFAULT)) { - return (DefaultTenantProfileConfiguration) this.profileData.getConfiguration(); - } - return null; + return getProfileConfiguration().orElse(null); } } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java index bcc5681e87..e1b97d7f5a 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java @@ -46,22 +46,21 @@ public class DefaultTenantProfileConfiguration implements TenantProfileConfigura private String transportDeviceTelemetryMsgRateLimit; private String transportDeviceTelemetryDataPointsRateLimit; - private String rateLimitsTenantConfiguration; - private String rateLimitsCustomerConfiguration; + private String tenantServerRestLimitsConfiguration; + private String customerServerRestLimitsConfiguration; - private int wsLimitMaxSessionsPerTenant; - private int wsLimitMaxSessionsPerCustomer; - private int wsLimitMaxSessionsPerRegularUser; - private int wsLimitMaxSessionsPerPublicUser; - private int wsLimitQueuePerWsSession; - private long wsLimitMaxSubscriptionsPerTenant; - private long wsLimitMaxSubscriptionsPerCustomer; - private long wsLimitMaxSubscriptionsPerRegularUser; - private long wsLimitMaxSubscriptionsPerPublicUser; - private String wsLimitUpdatesPerSession; + private int maxWsSessionsPerTenant; + private int maxWsSessionsPerCustomer; + private int maxWsSessionsPerRegularUser; + private int maxWsSessionsPerPublicUser; + private int wsMsgQueueLimitPerSession; + private long maxWsSubscriptionsPerTenant; + private long maxWsSubscriptionsPerCustomer; + private long maxWsSubscriptionsPerRegularUser; + private long maxWsSubscriptionsPerPublicUser; + private String wsUpdatesPerSessionRateLimit; - private String cassandraTenantLimitsConfiguration; - private boolean printTenantNames; + private String cassandraQueryTenantRateLimitsConfiguration; private long maxTransportMessages; private long maxTransportDataPoints; diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/tools/TbRateLimits.java b/common/message/src/main/java/org/thingsboard/server/common/msg/tools/TbRateLimits.java index 80fbaea65f..9e435ea34e 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/tools/TbRateLimits.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/tools/TbRateLimits.java @@ -29,11 +29,10 @@ import java.time.Duration; public class TbRateLimits { private final LocalBucket bucket; @Getter - private final String currentConfig; + private final String configuration; public TbRateLimits(String limitsConfiguration) { LocalBucketBuilder builder = Bucket4j.builder(); - currentConfig = limitsConfiguration; boolean initialized = false; for (String limitSrc : limitsConfiguration.split(",")) { long capacity = Long.parseLong(limitSrc.split(":")[0]); @@ -46,8 +45,7 @@ public class TbRateLimits { } else { throw new IllegalArgumentException("Failed to parse rate limits configuration: " + limitsConfiguration); } - - + this.configuration = limitsConfiguration; } public boolean tryConsume() { diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateReadExecutor.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateReadExecutor.java index 54a9953887..70e89df502 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateReadExecutor.java +++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateReadExecutor.java @@ -22,9 +22,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; -import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.msg.tools.TbRateLimits; import org.thingsboard.server.common.stats.DefaultCounter; import org.thingsboard.server.common.stats.StatsCounter; import org.thingsboard.server.common.stats.StatsFactory; @@ -33,7 +31,6 @@ import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.dao.util.AbstractBufferedRateExecutor; import org.thingsboard.server.dao.util.AsyncTaskContext; import org.thingsboard.server.dao.util.NoSqlAnyDao; -import org.thingsboard.server.dao.util.TenantRateLimitException; import javax.annotation.PreDestroy; @@ -45,13 +42,6 @@ import javax.annotation.PreDestroy; @NoSqlAnyDao public class CassandraBufferedRateReadExecutor extends AbstractBufferedRateExecutor { - @Autowired - private EntityService entityService; - @Autowired - private TbTenantProfileCache tenantProfileCache; - - private Map tenantNamesCache = new HashMap<>(); - static final String BUFFER_NAME = "Read"; public CassandraBufferedRateReadExecutor( @@ -64,9 +54,10 @@ public class CassandraBufferedRateReadExecutor extends AbstractBufferedRateExecu @Value("${cassandra.query.tenant_rate_limits.print_tenant_names}") boolean printTenantNames, @Value("${cassandra.query.print_queries_freq:0}") int printQueriesFreq, @Autowired StatsFactory statsFactory, - @Autowired EntityService entityService) { + @Autowired EntityService entityService, + @Autowired TbTenantProfileCache tenantProfileCache) { super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs, printQueriesFreq, statsFactory, - entityService, printTenantNames); + entityService, tenantProfileCache, printTenantNames); } @Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}") @@ -104,24 +95,4 @@ public class CassandraBufferedRateReadExecutor extends AbstractBufferedRateExecu ); } - @Override - protected boolean checkRateLimits(CassandraStatementTask task, SettableFuture future) { - var tenantProfileConfiguration = tenantProfileCache.get(task.getTenantId()).getDefaultTenantProfileConfiguration(); - if (StringUtils.isNotEmpty(tenantProfileConfiguration.getCassandraTenantLimitsConfiguration())) { - if (task.getTenantId() == null) { - log.info("Invalid task received: {}", task); - } else if (!task.getTenantId().isNullUid()) { - TbRateLimits rateLimits = perTenantLimits.computeIfAbsent( - task.getTenantId(), id -> new TbRateLimits(tenantProfileConfiguration.getCassandraTenantLimitsConfiguration()) - ); - if (!rateLimits.tryConsume()) { - stats.incrementRateLimitedTenant(task.getTenantId()); - stats.getTotalRateLimited().increment(); - future.setException(new TenantRateLimitException()); - return true; - } - } - } - return false; - } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateWriteExecutor.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateWriteExecutor.java index 990baa1e2a..ebcf9cf38f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateWriteExecutor.java +++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateWriteExecutor.java @@ -24,6 +24,7 @@ import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.dao.entity.EntityService; +import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.dao.util.AbstractBufferedRateExecutor; import org.thingsboard.server.dao.util.AsyncTaskContext; import org.thingsboard.server.dao.util.NoSqlAnyDao; @@ -47,14 +48,13 @@ public class CassandraBufferedRateWriteExecutor extends AbstractBufferedRateExec @Value("${cassandra.query.dispatcher_threads:2}") int dispatcherThreads, @Value("${cassandra.query.callback_threads:4}") int callbackThreads, @Value("${cassandra.query.poll_ms:50}") long pollMs, - @Value("${cassandra.query.tenant_rate_limits.enabled}") boolean tenantRateLimitsEnabled, - @Value("${cassandra.query.tenant_rate_limits.configuration}") String tenantRateLimitsConfiguration, @Value("${cassandra.query.tenant_rate_limits.print_tenant_names}") boolean printTenantNames, @Value("${cassandra.query.print_queries_freq:0}") int printQueriesFreq, @Autowired StatsFactory statsFactory, - @Autowired EntityService entityService) { - super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs, tenantRateLimitsEnabled, tenantRateLimitsConfiguration, printQueriesFreq, statsFactory, - entityService, printTenantNames); + @Autowired EntityService entityService, + @Autowired TbTenantProfileCache tenantProfileCache) { + super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs, printQueriesFreq, statsFactory, + entityService, tenantProfileCache, printTenantNames); } @Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}") diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java b/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java index 76710f7fab..b478be0d7b 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java +++ b/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java @@ -30,6 +30,7 @@ import com.google.common.util.concurrent.SettableFuture; import lombok.extern.slf4j.Slf4j; import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.tools.TbRateLimits; import org.thingsboard.server.common.stats.DefaultCounter; @@ -38,6 +39,7 @@ import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.common.stats.StatsType; import org.thingsboard.server.dao.entity.EntityService; import org.thingsboard.server.dao.nosql.CassandraStatementTask; +import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import javax.annotation.Nullable; import java.util.HashMap; @@ -71,22 +73,22 @@ public abstract class AbstractBufferedRateExecutor perTenantLimits = new ConcurrentHashMap<>(); + private final ConcurrentMap perTenantLimits = new ConcurrentHashMap<>(); private final AtomicInteger printQueriesIdx = new AtomicInteger(0); protected final AtomicInteger concurrencyLevel; protected final BufferedRateExecutorStats stats; - private final EntityService entityService; - private final Map tenantNamesCache = new HashMap<>(); + private final TbTenantProfileCache tenantProfileCache; private final boolean printTenantNames; + private final Map tenantNamesCache = new HashMap<>(); public AbstractBufferedRateExecutor(int queueLimit, int concurrencyLimit, long maxWaitTime, int dispatcherThreads, int callbackThreads, long pollMs, int printQueriesFreq, StatsFactory statsFactory, - EntityService entityService, boolean printTenantNames) { + EntityService entityService, TbTenantProfileCache tenantProfileCache, boolean printTenantNames) { this.maxWaitTime = maxWaitTime; this.pollMs = pollMs; this.concurrencyLimit = concurrencyLimit; @@ -100,6 +102,7 @@ public abstract class AbstractBufferedRateExecutor future); - @Override public F submit(T task) { SettableFuture settableFuture = create(); F result = wrap(task, settableFuture); - boolean perTenantLimitReached = checkRateLimits(task, settableFuture); + + boolean perTenantLimitReached = false; + var tenantProfileConfiguration = tenantProfileCache.get(task.getTenantId()).getDefaultTenantProfileConfiguration(); + if (StringUtils.isNotEmpty(tenantProfileConfiguration.getCassandraQueryTenantRateLimitsConfiguration())) { + if (task.getTenantId() == null) { + log.info("Invalid task received: {}", task); + } else if (!task.getTenantId().isNullUid()) { + TbRateLimits rateLimits = perTenantLimits.computeIfAbsent( + task.getTenantId(), id -> new TbRateLimits(tenantProfileConfiguration.getCassandraQueryTenantRateLimitsConfiguration()) + ); + if (!rateLimits.tryConsume()) { + stats.incrementRateLimitedTenant(task.getTenantId()); + stats.getTotalRateLimited().increment(); + settableFuture.setException(new TenantRateLimitException()); + perTenantLimitReached = true; + } + } + } if (!perTenantLimitReached) { try {