Rate limits to tenant profile BE refactoring
This commit is contained in:
parent
cb071ac0a2
commit
487d7165cc
@ -48,22 +48,20 @@ public class RateLimitProcessingFilter extends GenericFilterBean {
|
||||
@Autowired
|
||||
private TbTenantProfileCache tenantProfileCache;
|
||||
|
||||
private ConcurrentMap<TenantId, TbRateLimits> perTenantLimits = new ConcurrentHashMap<>();
|
||||
private ConcurrentMap<CustomerId, TbRateLimits> perCustomerLimits = new ConcurrentHashMap<>();
|
||||
private final ConcurrentMap<TenantId, TbRateLimits> perTenantLimits = new ConcurrentHashMap<>();
|
||||
private final ConcurrentMap<CustomerId, TbRateLimits> perCustomerLimits = new ConcurrentHashMap<>();
|
||||
|
||||
@Override
|
||||
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
|
||||
SecurityUser user = getCurrentUser();
|
||||
if (user != null && !user.isSystemAdmin()) {
|
||||
|
||||
var profileConfiguration = tenantProfileCache.get(user.getTenantId()).getDefaultTenantProfileConfiguration();
|
||||
|
||||
if(profileConfiguration != null) {
|
||||
if (user.isTenantAdmin() && StringUtils.isNotEmpty(profileConfiguration.getRateLimitsTenantConfiguration())) {
|
||||
|
||||
if (profileConfiguration != null) {
|
||||
if (user.isTenantAdmin() && StringUtils.isNotEmpty(profileConfiguration.getTenantServerRestLimitsConfiguration())) {
|
||||
TbRateLimits rateLimits = perTenantLimits.get(user.getTenantId());
|
||||
if(rateLimits == null || !rateLimits.getCurrentConfig().equals(profileConfiguration.getRateLimitsTenantConfiguration())) {
|
||||
rateLimits = new TbRateLimits(profileConfiguration.getRateLimitsTenantConfiguration());
|
||||
if (rateLimits == null || !rateLimits.getConfiguration().equals(profileConfiguration.getTenantServerRestLimitsConfiguration())) {
|
||||
// fixme: or maybe handle component lifecycle event ?
|
||||
rateLimits = new TbRateLimits(profileConfiguration.getTenantServerRestLimitsConfiguration());
|
||||
perTenantLimits.put(user.getTenantId(), rateLimits);
|
||||
}
|
||||
|
||||
@ -71,12 +69,10 @@ public class RateLimitProcessingFilter extends GenericFilterBean {
|
||||
errorResponseHandler.handle(new TbRateLimitsException(EntityType.TENANT), (HttpServletResponse) response);
|
||||
return;
|
||||
}
|
||||
}
|
||||
if (user.isCustomerUser() && StringUtils.isNotEmpty(profileConfiguration.getRateLimitsCustomerConfiguration())) {
|
||||
|
||||
} else if (user.isCustomerUser() && StringUtils.isNotEmpty(profileConfiguration.getCustomerServerRestLimitsConfiguration())) {
|
||||
TbRateLimits rateLimits = perCustomerLimits.get(user.getCustomerId());
|
||||
if(rateLimits == null || !rateLimits.getCurrentConfig().equals(profileConfiguration.getRateLimitsCustomerConfiguration())) {
|
||||
rateLimits = new TbRateLimits(profileConfiguration.getRateLimitsCustomerConfiguration());
|
||||
if (rateLimits == null || !rateLimits.getConfiguration().equals(profileConfiguration.getCustomerServerRestLimitsConfiguration())) {
|
||||
rateLimits = new TbRateLimits(profileConfiguration.getCustomerServerRestLimitsConfiguration());
|
||||
perCustomerLimits.put(user.getCustomerId(), rateLimits);
|
||||
}
|
||||
|
||||
|
||||
@ -16,12 +16,12 @@
|
||||
package org.thingsboard.server.controller.plugin;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.beans.factory.BeanCreationNotAllowedException;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.security.core.Authentication;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.StringUtils;
|
||||
import org.springframework.web.socket.CloseStatus;
|
||||
import org.springframework.web.socket.PongMessage;
|
||||
import org.springframework.web.socket.TextMessage;
|
||||
@ -139,8 +139,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
|
||||
return;
|
||||
}
|
||||
var tenantProfileConfiguration = tenantProfileCache.get(sessionRef.getSecurityCtx().getTenantId()).getDefaultTenantProfileConfiguration();
|
||||
|
||||
internalSessionMap.put(internalSessionId, new SessionMetaData(session, sessionRef, tenantProfileConfiguration.getWsLimitQueuePerWsSession()));
|
||||
internalSessionMap.put(internalSessionId, new SessionMetaData(session, sessionRef, tenantProfileConfiguration.getWsMsgQueueLimitPerSession()));
|
||||
|
||||
externalSessionMap.put(externalSessionId, internalSessionId);
|
||||
processInWebSocketService(sessionRef, SessionEvent.onEstablished());
|
||||
@ -298,14 +297,13 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
|
||||
String externalId = sessionRef.getSessionId();
|
||||
log.debug("[{}] Processing {}", externalId, msg);
|
||||
String internalId = externalSessionMap.get(externalId);
|
||||
|
||||
var tenantProfileConfiguration = tenantProfileCache.get(sessionRef.getSecurityCtx().getTenantId()).getDefaultTenantProfileConfiguration();
|
||||
|
||||
if (internalId != null) {
|
||||
SessionMetaData sessionMd = internalSessionMap.get(internalId);
|
||||
if (sessionMd != null) {
|
||||
if (!StringUtils.isEmpty(tenantProfileConfiguration.getWsLimitUpdatesPerSession())) {
|
||||
TbRateLimits rateLimits = perSessionUpdateLimits.computeIfAbsent(sessionRef.getSessionId(), sid -> new TbRateLimits(tenantProfileConfiguration.getWsLimitUpdatesPerSession()));
|
||||
var tenantProfileConfiguration = tenantProfileCache.get(sessionRef.getSecurityCtx().getTenantId()).getDefaultTenantProfileConfiguration();
|
||||
if (StringUtils.isNotEmpty(tenantProfileConfiguration.getWsUpdatesPerSessionRateLimit())) {
|
||||
// fixme: is this ok not to update rate limits config if it was change in profile?
|
||||
TbRateLimits rateLimits = perSessionUpdateLimits.computeIfAbsent(sessionRef.getSessionId(), sid -> new TbRateLimits(tenantProfileConfiguration.getWsUpdatesPerSessionRateLimit()));
|
||||
if (!rateLimits.tryConsume()) {
|
||||
if (blacklistedSessions.putIfAbsent(externalId, sessionRef) == null) {
|
||||
log.info("[{}][{}][{}] Failed to process session update. Max session updates limit reached"
|
||||
@ -364,15 +362,15 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
|
||||
var tenantProfileConfiguration =
|
||||
tenantProfileCache.get(sessionRef.getSecurityCtx().getTenantId()).getDefaultTenantProfileConfiguration();
|
||||
|
||||
if(tenantProfileConfiguration == null) {
|
||||
if (tenantProfileConfiguration == null) {
|
||||
return true;
|
||||
}
|
||||
|
||||
String sessionId = session.getId();
|
||||
if (tenantProfileConfiguration.getWsLimitMaxSessionsPerTenant() > 0) {
|
||||
if (tenantProfileConfiguration.getMaxWsSessionsPerTenant() > 0) {
|
||||
Set<String> tenantSessions = tenantSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getTenantId(), id -> ConcurrentHashMap.newKeySet());
|
||||
synchronized (tenantSessions) {
|
||||
if (tenantSessions.size() < tenantProfileConfiguration.getWsLimitMaxSessionsPerTenant()) {
|
||||
if (tenantSessions.size() < tenantProfileConfiguration.getMaxWsSessionsPerTenant()) {
|
||||
tenantSessions.add(sessionId);
|
||||
} else {
|
||||
log.info("[{}][{}][{}] Failed to start session. Max tenant sessions limit reached"
|
||||
@ -384,10 +382,10 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
|
||||
}
|
||||
|
||||
if (sessionRef.getSecurityCtx().isCustomerUser()) {
|
||||
if (tenantProfileConfiguration.getWsLimitMaxSessionsPerCustomer() > 0) {
|
||||
if (tenantProfileConfiguration.getMaxWsSessionsPerCustomer() > 0) {
|
||||
Set<String> customerSessions = customerSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getCustomerId(), id -> ConcurrentHashMap.newKeySet());
|
||||
synchronized (customerSessions) {
|
||||
if (customerSessions.size() < tenantProfileConfiguration.getWsLimitMaxSessionsPerCustomer()) {
|
||||
if (customerSessions.size() < tenantProfileConfiguration.getMaxWsSessionsPerCustomer()) {
|
||||
customerSessions.add(sessionId);
|
||||
} else {
|
||||
log.info("[{}][{}][{}] Failed to start session. Max customer sessions limit reached"
|
||||
@ -397,11 +395,11 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
|
||||
}
|
||||
}
|
||||
}
|
||||
if (tenantProfileConfiguration.getWsLimitMaxSessionsPerRegularUser() > 0
|
||||
if (tenantProfileConfiguration.getMaxWsSessionsPerRegularUser() > 0
|
||||
&& UserPrincipal.Type.USER_NAME.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) {
|
||||
Set<String> regularUserSessions = regularUserSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet());
|
||||
synchronized (regularUserSessions) {
|
||||
if (regularUserSessions.size() < tenantProfileConfiguration.getWsLimitMaxSessionsPerRegularUser()) {
|
||||
if (regularUserSessions.size() < tenantProfileConfiguration.getMaxWsSessionsPerRegularUser()) {
|
||||
regularUserSessions.add(sessionId);
|
||||
} else {
|
||||
log.info("[{}][{}][{}] Failed to start session. Max regular user sessions limit reached"
|
||||
@ -411,11 +409,11 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
|
||||
}
|
||||
}
|
||||
}
|
||||
if (tenantProfileConfiguration.getWsLimitMaxSessionsPerPublicUser() > 0
|
||||
if (tenantProfileConfiguration.getMaxWsSessionsPerPublicUser() > 0
|
||||
&& UserPrincipal.Type.PUBLIC_ID.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) {
|
||||
Set<String> publicUserSessions = publicUserSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet());
|
||||
synchronized (publicUserSessions) {
|
||||
if (publicUserSessions.size() < tenantProfileConfiguration.getWsLimitMaxSessionsPerPublicUser()) {
|
||||
if (publicUserSessions.size() < tenantProfileConfiguration.getMaxWsSessionsPerPublicUser()) {
|
||||
publicUserSessions.add(sessionId);
|
||||
} else {
|
||||
log.info("[{}][{}][{}] Failed to start session. Max public user sessions limit reached"
|
||||
@ -435,26 +433,26 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
|
||||
String sessionId = session.getId();
|
||||
perSessionUpdateLimits.remove(sessionRef.getSessionId());
|
||||
blacklistedSessions.remove(sessionRef.getSessionId());
|
||||
if (tenantProfileConfiguration.getWsLimitMaxSessionsPerTenant() > 0) {
|
||||
if (tenantProfileConfiguration.getMaxWsSessionsPerTenant() > 0) {
|
||||
Set<String> tenantSessions = tenantSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getTenantId(), id -> ConcurrentHashMap.newKeySet());
|
||||
synchronized (tenantSessions) {
|
||||
tenantSessions.remove(sessionId);
|
||||
}
|
||||
}
|
||||
if (sessionRef.getSecurityCtx().isCustomerUser()) {
|
||||
if (tenantProfileConfiguration.getWsLimitMaxSessionsPerCustomer() > 0) {
|
||||
if (tenantProfileConfiguration.getMaxWsSessionsPerCustomer() > 0) {
|
||||
Set<String> customerSessions = customerSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getCustomerId(), id -> ConcurrentHashMap.newKeySet());
|
||||
synchronized (customerSessions) {
|
||||
customerSessions.remove(sessionId);
|
||||
}
|
||||
}
|
||||
if (tenantProfileConfiguration.getWsLimitMaxSessionsPerRegularUser() > 0 && UserPrincipal.Type.USER_NAME.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) {
|
||||
if (tenantProfileConfiguration.getMaxWsSessionsPerRegularUser() > 0 && UserPrincipal.Type.USER_NAME.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) {
|
||||
Set<String> regularUserSessions = regularUserSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet());
|
||||
synchronized (regularUserSessions) {
|
||||
regularUserSessions.remove(sessionId);
|
||||
}
|
||||
}
|
||||
if (tenantProfileConfiguration.getWsLimitMaxSessionsPerPublicUser() > 0 && UserPrincipal.Type.PUBLIC_ID.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) {
|
||||
if (tenantProfileConfiguration.getMaxWsSessionsPerPublicUser() > 0 && UserPrincipal.Type.PUBLIC_ID.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) {
|
||||
Set<String> publicUserSessions = publicUserSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet());
|
||||
synchronized (publicUserSessions) {
|
||||
publicUserSessions.remove(sessionId);
|
||||
|
||||
@ -216,6 +216,9 @@ public class ThingsboardInstallService {
|
||||
dataUpdateService.updateData("3.3.2");
|
||||
log.info("Updating system data...");
|
||||
systemDataLoaderService.updateSystemWidgets();
|
||||
case "3.3.3":
|
||||
log.info("Upgrading ThingsBoard from version 3.3.3 to 3.4 ...");
|
||||
dataUpdateService.updateData("3.3.3");
|
||||
break;
|
||||
|
||||
//TODO update CacheCleanupService on the next version upgrade
|
||||
|
||||
@ -25,7 +25,6 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Profile;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.rule.engine.flow.TbRuleChainInputNode;
|
||||
import org.thingsboard.rule.engine.flow.TbRuleChainInputNodeConfiguration;
|
||||
import org.thingsboard.rule.engine.profile.TbDeviceProfileNode;
|
||||
@ -56,12 +55,9 @@ import org.thingsboard.server.common.data.rule.RuleChainType;
|
||||
import org.thingsboard.server.common.data.rule.RuleNode;
|
||||
import org.thingsboard.server.dao.DaoUtil;
|
||||
import org.thingsboard.server.dao.alarm.AlarmDao;
|
||||
import org.thingsboard.server.dao.alarm.AlarmService;
|
||||
import org.thingsboard.server.dao.entity.EntityService;
|
||||
import org.thingsboard.server.dao.entityview.EntityViewService;
|
||||
import org.thingsboard.server.dao.model.sql.DeviceProfileEntity;
|
||||
import org.thingsboard.server.dao.model.sql.RelationEntity;
|
||||
import org.thingsboard.server.dao.oauth2.OAuth2Service;
|
||||
import org.thingsboard.server.dao.relation.RelationService;
|
||||
import org.thingsboard.server.dao.rule.RuleChainService;
|
||||
import org.thingsboard.server.dao.sql.device.DeviceProfileRepository;
|
||||
@ -101,9 +97,6 @@ public class DefaultDataUpdateService implements DataUpdateService {
|
||||
@Autowired
|
||||
private TimeseriesService tsService;
|
||||
|
||||
@Autowired
|
||||
private AlarmService alarmService;
|
||||
|
||||
@Autowired
|
||||
private EntityService entityService;
|
||||
|
||||
@ -113,9 +106,6 @@ public class DefaultDataUpdateService implements DataUpdateService {
|
||||
@Autowired
|
||||
private DeviceProfileRepository deviceProfileRepository;
|
||||
|
||||
@Autowired
|
||||
private OAuth2Service oAuth2Service;
|
||||
|
||||
@Autowired
|
||||
private RateLimitsUpdater rateLimitsUpdater;
|
||||
|
||||
@ -140,12 +130,15 @@ public class DefaultDataUpdateService implements DataUpdateService {
|
||||
tenantsAlarmsCustomerUpdater.updateEntities(null);
|
||||
deviceProfileEntityDynamicConditionsUpdater.updateEntities(null);
|
||||
updateOAuth2Params();
|
||||
rateLimitsUpdater.updateEntities(null);
|
||||
break;
|
||||
case "3.3.2":
|
||||
log.info("Updating data from version 3.3.2 to 3.3.3 ...");
|
||||
updateNestedRuleChains();
|
||||
break;
|
||||
case "3.3.3":
|
||||
log.info("Updating data from version 3.3.3 to 3.4 ...");
|
||||
rateLimitsUpdater.updateEntities();
|
||||
break;
|
||||
default:
|
||||
throw new RuntimeException("Unable to update data, unsupported fromVersion: " + fromVersion);
|
||||
}
|
||||
|
||||
@ -50,6 +50,10 @@ public abstract class PaginatedUpdater<I, D> {
|
||||
}
|
||||
}
|
||||
|
||||
public void updateEntities() {
|
||||
updateEntities(null);
|
||||
}
|
||||
|
||||
protected boolean forceReportTotal() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -15,53 +15,53 @@
|
||||
*/
|
||||
package org.thingsboard.server.service.install.update;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.server.common.data.StringUtils;
|
||||
import org.thingsboard.server.common.data.TenantProfile;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.dao.tenant.TenantProfileService;
|
||||
|
||||
@Component
|
||||
class RateLimitsUpdater extends PaginatedUpdater<String, TenantProfile> {
|
||||
|
||||
@Value("${server.rest.limits.tenant.enabled}")
|
||||
boolean tenantServerRateLimitsEnabled;
|
||||
@Value("${server.rest.limits.customer.enabled}")
|
||||
boolean customerServerRateLimitsEnabled;
|
||||
boolean tenantServerRestLimitsEnabled;
|
||||
@Value("${server.rest.limits.tenant.configuration}")
|
||||
String tenantServerRateLimitsConfiguration;
|
||||
String tenantServerRestLimitsConfiguration;
|
||||
@Value("${server.rest.limits.customer.enabled}")
|
||||
boolean customerServerRestLimitsEnabled;
|
||||
@Value("${server.rest.limits.customer.configuration}")
|
||||
String customerServerRateLimitsConfiguration;
|
||||
String customerServerRestLimitsConfiguration;
|
||||
|
||||
@Value("${server.ws.limits.max_sessions_per_tenant}")
|
||||
private int wsLimitMaxSessionsPerTenant;
|
||||
private int maxWsSessionsPerTenant;
|
||||
@Value("${server.ws.limits.max_sessions_per_customer}")
|
||||
private int wsLimitMaxSessionsPerCustomer;
|
||||
private int maxWsSessionsPerCustomer;
|
||||
@Value("${server.ws.limits.max_sessions_per_regular_user}")
|
||||
private int wsLimitMaxSessionsPerRegularUser;
|
||||
private int maxWsSessionsPerRegularUser;
|
||||
@Value("${server.ws.limits.max_sessions_per_public_user}")
|
||||
private int wsLimitMaxSessionsPerPublicUser;
|
||||
private int maxWsSessionsPerPublicUser;
|
||||
@Value("${server.ws.limits.max_queue_per_ws_session}")
|
||||
private int wsLimitQueuePerWsSession;
|
||||
private int wsMsgQueueLimitPerSession;
|
||||
@Value("${server.ws.limits.max_subscriptions_per_tenant}")
|
||||
private long wsLimitMaxSubscriptionsPerTenant;
|
||||
private long maxWsSubscriptionsPerTenant;
|
||||
@Value("${server.ws.limits.max_subscriptions_per_customer}")
|
||||
private long wsLimitMaxSubscriptionsPerCustomer;
|
||||
private long maxWsSubscriptionsPerCustomer;
|
||||
@Value("${server.ws.limits.max_subscriptions_per_regular_user}")
|
||||
private long wsLimitMaxSubscriptionsPerRegularUser;
|
||||
private long maxWsSubscriptionsPerRegularUser;
|
||||
@Value("${server.ws.limits.max_subscriptions_per_public_user}")
|
||||
private long wsLimitMaxSubscriptionsPerPublicUser;
|
||||
private long maxWsSubscriptionsPerPublicUser;
|
||||
@Value("${server.ws.limits.max_updates_per_session}")
|
||||
private String wsLimitUpdatesPerSession;
|
||||
private String wsUpdatesPerSessionRateLimit;
|
||||
|
||||
@Value("${cassandra.query.tenant_rate_limits.enabled}")
|
||||
private boolean cassandraLimitsIsEnabled;
|
||||
private boolean cassandraQueryTenantRateLimitsEnabled;
|
||||
@Value("${cassandra.query.tenant_rate_limits.configuration}")
|
||||
private String cassandraTenantLimitsConfiguration;
|
||||
@Value("${cassandra.query.tenant_rate_limits.print_tenant_names}")
|
||||
private boolean printTenantNames;
|
||||
private String cassandraQueryTenantRateLimitsConfiguration;
|
||||
|
||||
@Autowired
|
||||
private TenantProfileService tenantProfileService;
|
||||
@ -78,40 +78,38 @@ class RateLimitsUpdater extends PaginatedUpdater<String, TenantProfile> {
|
||||
|
||||
@Override
|
||||
protected PageData<TenantProfile> findEntities(String id, PageLink pageLink) {
|
||||
return tenantProfileService.findTenantProfiles(null, pageLink);
|
||||
return tenantProfileService.findTenantProfiles(TenantId.SYS_TENANT_ID, pageLink);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void updateEntity(TenantProfile entity) {
|
||||
var profileConfiguration = entity.getDefaultTenantProfileConfiguration();
|
||||
if (profileConfiguration != null) {
|
||||
if (tenantServerRateLimitsEnabled && StringUtils.isNotEmpty(tenantServerRateLimitsConfiguration)) {
|
||||
profileConfiguration.setRateLimitsTenantConfiguration(tenantServerRateLimitsConfiguration);
|
||||
}
|
||||
if (customerServerRateLimitsEnabled && StringUtils.isNotEmpty(customerServerRateLimitsConfiguration)) {
|
||||
profileConfiguration.setRateLimitsCustomerConfiguration(customerServerRateLimitsConfiguration);
|
||||
}
|
||||
protected void updateEntity(TenantProfile tenantProfile) {
|
||||
var profileConfiguration = tenantProfile.getDefaultTenantProfileConfiguration();
|
||||
|
||||
profileConfiguration.setWsLimitMaxSessionsPerTenant(wsLimitMaxSessionsPerTenant);
|
||||
profileConfiguration.setWsLimitMaxSessionsPerCustomer(wsLimitMaxSessionsPerCustomer);
|
||||
profileConfiguration.setWsLimitMaxSessionsPerPublicUser(wsLimitMaxSessionsPerPublicUser);
|
||||
profileConfiguration.setWsLimitMaxSessionsPerRegularUser(wsLimitMaxSessionsPerRegularUser);
|
||||
profileConfiguration.setWsLimitMaxSubscriptionsPerTenant(wsLimitMaxSubscriptionsPerTenant);
|
||||
profileConfiguration.setWsLimitMaxSubscriptionsPerCustomer(wsLimitMaxSubscriptionsPerCustomer);
|
||||
profileConfiguration.setWsLimitMaxSubscriptionsPerPublicUser(wsLimitMaxSubscriptionsPerPublicUser);
|
||||
profileConfiguration.setWsLimitMaxSubscriptionsPerRegularUser(wsLimitMaxSubscriptionsPerRegularUser);
|
||||
profileConfiguration.setWsLimitQueuePerWsSession(wsLimitQueuePerWsSession);
|
||||
|
||||
if (StringUtils.isNotEmpty(wsLimitUpdatesPerSession)) {
|
||||
profileConfiguration.setWsLimitUpdatesPerSession(wsLimitUpdatesPerSession);
|
||||
}
|
||||
|
||||
if (cassandraLimitsIsEnabled) {
|
||||
profileConfiguration.setCassandraTenantLimitsConfiguration(cassandraTenantLimitsConfiguration);
|
||||
profileConfiguration.setPrintTenantNames(printTenantNames);
|
||||
}
|
||||
|
||||
tenantProfileService.saveTenantProfile(null, entity);
|
||||
if (tenantServerRestLimitsEnabled && StringUtils.isNotEmpty(tenantServerRestLimitsConfiguration)) {
|
||||
profileConfiguration.setTenantServerRestLimitsConfiguration(tenantServerRestLimitsConfiguration);
|
||||
}
|
||||
if (customerServerRestLimitsEnabled && StringUtils.isNotEmpty(customerServerRestLimitsConfiguration)) {
|
||||
profileConfiguration.setCustomerServerRestLimitsConfiguration(customerServerRestLimitsConfiguration);
|
||||
}
|
||||
|
||||
profileConfiguration.setMaxWsSessionsPerTenant(maxWsSessionsPerTenant);
|
||||
profileConfiguration.setMaxWsSessionsPerCustomer(maxWsSessionsPerCustomer);
|
||||
profileConfiguration.setMaxWsSessionsPerPublicUser(maxWsSessionsPerPublicUser);
|
||||
profileConfiguration.setMaxWsSessionsPerRegularUser(maxWsSessionsPerRegularUser);
|
||||
profileConfiguration.setMaxWsSubscriptionsPerTenant(maxWsSubscriptionsPerTenant);
|
||||
profileConfiguration.setMaxWsSubscriptionsPerCustomer(maxWsSubscriptionsPerCustomer);
|
||||
profileConfiguration.setMaxWsSubscriptionsPerPublicUser(maxWsSubscriptionsPerPublicUser);
|
||||
profileConfiguration.setMaxWsSubscriptionsPerRegularUser(maxWsSubscriptionsPerRegularUser);
|
||||
profileConfiguration.setWsMsgQueueLimitPerSession(wsMsgQueueLimitPerSession);
|
||||
if (StringUtils.isNotEmpty(wsUpdatesPerSessionRateLimit)) {
|
||||
profileConfiguration.setWsUpdatesPerSessionRateLimit(wsUpdatesPerSessionRateLimit);
|
||||
}
|
||||
|
||||
if (cassandraQueryTenantRateLimitsEnabled && StringUtils.isNotEmpty(cassandraQueryTenantRateLimitsConfiguration)) {
|
||||
profileConfiguration.setCassandraQueryTenantRateLimitsConfiguration(cassandraQueryTenantRateLimitsConfiguration);
|
||||
}
|
||||
|
||||
tenantProfileService.saveTenantProfile(TenantId.SYS_TENANT_ID, tenantProfile);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -24,6 +24,7 @@ import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.StringUtils;
|
||||
import org.springframework.web.socket.CloseStatus;
|
||||
@ -304,29 +305,29 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
|
||||
|
||||
private void processSessionClose(TelemetryWebSocketSessionRef sessionRef) {
|
||||
var tenantProfileConfiguration = (DefaultTenantProfileConfiguration) tenantProfileCache.get(sessionRef.getSecurityCtx().getTenantId()).getDefaultTenantProfileConfiguration();
|
||||
if(tenantProfileConfiguration != null) {
|
||||
if (tenantProfileConfiguration != null) {
|
||||
String sessionId = "[" + sessionRef.getSessionId() + "]";
|
||||
|
||||
if (tenantProfileConfiguration.getWsLimitMaxSubscriptionsPerTenant() > 0) {
|
||||
if (tenantProfileConfiguration.getMaxWsSubscriptionsPerTenant() > 0) {
|
||||
Set<String> tenantSubscriptions = tenantSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getTenantId(), id -> ConcurrentHashMap.newKeySet());
|
||||
synchronized (tenantSubscriptions) {
|
||||
tenantSubscriptions.removeIf(subId -> subId.startsWith(sessionId));
|
||||
}
|
||||
}
|
||||
if (sessionRef.getSecurityCtx().isCustomerUser()) {
|
||||
if (tenantProfileConfiguration.getWsLimitMaxSubscriptionsPerCustomer() > 0) {
|
||||
if (tenantProfileConfiguration.getMaxWsSubscriptionsPerCustomer() > 0) {
|
||||
Set<String> customerSessions = customerSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getCustomerId(), id -> ConcurrentHashMap.newKeySet());
|
||||
synchronized (customerSessions) {
|
||||
customerSessions.removeIf(subId -> subId.startsWith(sessionId));
|
||||
}
|
||||
}
|
||||
if (tenantProfileConfiguration.getWsLimitMaxSubscriptionsPerRegularUser() > 0 && UserPrincipal.Type.USER_NAME.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) {
|
||||
if (tenantProfileConfiguration.getMaxWsSubscriptionsPerRegularUser() > 0 && UserPrincipal.Type.USER_NAME.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) {
|
||||
Set<String> regularUserSessions = regularUserSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet());
|
||||
synchronized (regularUserSessions) {
|
||||
regularUserSessions.removeIf(subId -> subId.startsWith(sessionId));
|
||||
}
|
||||
}
|
||||
if (tenantProfileConfiguration.getWsLimitMaxSubscriptionsPerPublicUser() > 0 && UserPrincipal.Type.PUBLIC_ID.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) {
|
||||
if (tenantProfileConfiguration.getMaxWsSubscriptionsPerPublicUser() > 0 && UserPrincipal.Type.PUBLIC_ID.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) {
|
||||
Set<String> publicUserSessions = publicUserSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet());
|
||||
synchronized (publicUserSessions) {
|
||||
publicUserSessions.removeIf(subId -> subId.startsWith(sessionId));
|
||||
@ -341,12 +342,12 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
|
||||
|
||||
String subId = "[" + sessionRef.getSessionId() + "]:[" + cmd.getCmdId() + "]";
|
||||
try {
|
||||
if (tenantProfileConfiguration.getWsLimitMaxSubscriptionsPerTenant() > 0) {
|
||||
if (tenantProfileConfiguration.getMaxWsSubscriptionsPerTenant() > 0) {
|
||||
Set<String> tenantSubscriptions = tenantSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getTenantId(), id -> ConcurrentHashMap.newKeySet());
|
||||
synchronized (tenantSubscriptions) {
|
||||
if (cmd.isUnsubscribe()) {
|
||||
tenantSubscriptions.remove(subId);
|
||||
} else if (tenantSubscriptions.size() < tenantProfileConfiguration.getWsLimitMaxSubscriptionsPerTenant()) {
|
||||
} else if (tenantSubscriptions.size() < tenantProfileConfiguration.getMaxWsSubscriptionsPerTenant()) {
|
||||
tenantSubscriptions.add(subId);
|
||||
} else {
|
||||
log.info("[{}][{}][{}] Failed to start subscription. Max tenant subscriptions limit reached"
|
||||
@ -358,12 +359,12 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
|
||||
}
|
||||
|
||||
if (sessionRef.getSecurityCtx().isCustomerUser()) {
|
||||
if (tenantProfileConfiguration.getWsLimitMaxSubscriptionsPerCustomer() > 0) {
|
||||
if (tenantProfileConfiguration.getMaxWsSubscriptionsPerCustomer() > 0) {
|
||||
Set<String> customerSessions = customerSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getCustomerId(), id -> ConcurrentHashMap.newKeySet());
|
||||
synchronized (customerSessions) {
|
||||
if (cmd.isUnsubscribe()) {
|
||||
customerSessions.remove(subId);
|
||||
} else if (customerSessions.size() < tenantProfileConfiguration.getWsLimitMaxSubscriptionsPerCustomer()) {
|
||||
} else if (customerSessions.size() < tenantProfileConfiguration.getMaxWsSubscriptionsPerCustomer()) {
|
||||
customerSessions.add(subId);
|
||||
} else {
|
||||
log.info("[{}][{}][{}] Failed to start subscription. Max customer subscriptions limit reached"
|
||||
@ -373,10 +374,10 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
|
||||
}
|
||||
}
|
||||
}
|
||||
if (tenantProfileConfiguration.getWsLimitMaxSubscriptionsPerRegularUser() > 0 && UserPrincipal.Type.USER_NAME.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) {
|
||||
if (tenantProfileConfiguration.getMaxWsSubscriptionsPerRegularUser() > 0 && UserPrincipal.Type.USER_NAME.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) {
|
||||
Set<String> regularUserSessions = regularUserSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet());
|
||||
synchronized (regularUserSessions) {
|
||||
if (regularUserSessions.size() < tenantProfileConfiguration.getWsLimitMaxSubscriptionsPerRegularUser()) {
|
||||
if (regularUserSessions.size() < tenantProfileConfiguration.getMaxWsSubscriptionsPerRegularUser()) {
|
||||
regularUserSessions.add(subId);
|
||||
} else {
|
||||
log.info("[{}][{}][{}] Failed to start subscription. Max regular user subscriptions limit reached"
|
||||
@ -386,10 +387,10 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
|
||||
}
|
||||
}
|
||||
}
|
||||
if (tenantProfileConfiguration.getWsLimitMaxSubscriptionsPerPublicUser() > 0 && UserPrincipal.Type.PUBLIC_ID.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) {
|
||||
if (tenantProfileConfiguration.getMaxWsSubscriptionsPerPublicUser() > 0 && UserPrincipal.Type.PUBLIC_ID.equals(sessionRef.getSecurityCtx().getUserPrincipal().getType())) {
|
||||
Set<String> publicUserSessions = publicUserSubscriptionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getId(), id -> ConcurrentHashMap.newKeySet());
|
||||
synchronized (publicUserSessions) {
|
||||
if (publicUserSessions.size() < tenantProfileConfiguration.getWsLimitMaxSubscriptionsPerPublicUser()) {
|
||||
if (publicUserSessions.size() < tenantProfileConfiguration.getMaxWsSubscriptionsPerPublicUser()) {
|
||||
publicUserSessions.add(subId);
|
||||
} else {
|
||||
log.info("[{}][{}][{}] Failed to start subscription. Max public user subscriptions limit reached"
|
||||
|
||||
@ -83,7 +83,7 @@ public class TenantProfile extends SearchTextBased<TenantProfileId> implements H
|
||||
@ApiModelProperty(position = 1, value = "JSON object with the tenant profile Id. " +
|
||||
"Specify this field to update the tenant profile. " +
|
||||
"Referencing non-existing tenant profile Id will cause error. " +
|
||||
"Omit this field to create new tenant profile." )
|
||||
"Omit this field to create new tenant profile.")
|
||||
@Override
|
||||
public TenantProfileId getId() {
|
||||
return super.getId();
|
||||
@ -133,6 +133,7 @@ public class TenantProfile extends SearchTextBased<TenantProfileId> implements H
|
||||
public TenantProfileData createDefaultTenantProfileData() {
|
||||
TenantProfileData tpd = new TenantProfileData();
|
||||
tpd.setConfiguration(new DefaultTenantProfileConfiguration());
|
||||
this.profileData = tpd;
|
||||
return tpd;
|
||||
}
|
||||
|
||||
@ -147,11 +148,7 @@ public class TenantProfile extends SearchTextBased<TenantProfileId> implements H
|
||||
|
||||
@JsonIgnore
|
||||
public DefaultTenantProfileConfiguration getDefaultTenantProfileConfiguration() {
|
||||
if(getProfileData().getConfiguration() != null &&
|
||||
getProfileData().getConfiguration().getType().equals(TenantProfileType.DEFAULT)) {
|
||||
return (DefaultTenantProfileConfiguration) this.profileData.getConfiguration();
|
||||
}
|
||||
return null;
|
||||
return getProfileConfiguration().orElse(null);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -46,22 +46,21 @@ public class DefaultTenantProfileConfiguration implements TenantProfileConfigura
|
||||
private String transportDeviceTelemetryMsgRateLimit;
|
||||
private String transportDeviceTelemetryDataPointsRateLimit;
|
||||
|
||||
private String rateLimitsTenantConfiguration;
|
||||
private String rateLimitsCustomerConfiguration;
|
||||
private String tenantServerRestLimitsConfiguration;
|
||||
private String customerServerRestLimitsConfiguration;
|
||||
|
||||
private int wsLimitMaxSessionsPerTenant;
|
||||
private int wsLimitMaxSessionsPerCustomer;
|
||||
private int wsLimitMaxSessionsPerRegularUser;
|
||||
private int wsLimitMaxSessionsPerPublicUser;
|
||||
private int wsLimitQueuePerWsSession;
|
||||
private long wsLimitMaxSubscriptionsPerTenant;
|
||||
private long wsLimitMaxSubscriptionsPerCustomer;
|
||||
private long wsLimitMaxSubscriptionsPerRegularUser;
|
||||
private long wsLimitMaxSubscriptionsPerPublicUser;
|
||||
private String wsLimitUpdatesPerSession;
|
||||
private int maxWsSessionsPerTenant;
|
||||
private int maxWsSessionsPerCustomer;
|
||||
private int maxWsSessionsPerRegularUser;
|
||||
private int maxWsSessionsPerPublicUser;
|
||||
private int wsMsgQueueLimitPerSession;
|
||||
private long maxWsSubscriptionsPerTenant;
|
||||
private long maxWsSubscriptionsPerCustomer;
|
||||
private long maxWsSubscriptionsPerRegularUser;
|
||||
private long maxWsSubscriptionsPerPublicUser;
|
||||
private String wsUpdatesPerSessionRateLimit;
|
||||
|
||||
private String cassandraTenantLimitsConfiguration;
|
||||
private boolean printTenantNames;
|
||||
private String cassandraQueryTenantRateLimitsConfiguration;
|
||||
|
||||
private long maxTransportMessages;
|
||||
private long maxTransportDataPoints;
|
||||
|
||||
@ -29,11 +29,10 @@ import java.time.Duration;
|
||||
public class TbRateLimits {
|
||||
private final LocalBucket bucket;
|
||||
@Getter
|
||||
private final String currentConfig;
|
||||
private final String configuration;
|
||||
|
||||
public TbRateLimits(String limitsConfiguration) {
|
||||
LocalBucketBuilder builder = Bucket4j.builder();
|
||||
currentConfig = limitsConfiguration;
|
||||
boolean initialized = false;
|
||||
for (String limitSrc : limitsConfiguration.split(",")) {
|
||||
long capacity = Long.parseLong(limitSrc.split(":")[0]);
|
||||
@ -46,8 +45,7 @@ public class TbRateLimits {
|
||||
} else {
|
||||
throw new IllegalArgumentException("Failed to parse rate limits configuration: " + limitsConfiguration);
|
||||
}
|
||||
|
||||
|
||||
this.configuration = limitsConfiguration;
|
||||
}
|
||||
|
||||
public boolean tryConsume() {
|
||||
|
||||
@ -22,9 +22,7 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.server.common.data.StringUtils;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.msg.tools.TbRateLimits;
|
||||
import org.thingsboard.server.common.stats.DefaultCounter;
|
||||
import org.thingsboard.server.common.stats.StatsCounter;
|
||||
import org.thingsboard.server.common.stats.StatsFactory;
|
||||
@ -33,7 +31,6 @@ import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
|
||||
import org.thingsboard.server.dao.util.AbstractBufferedRateExecutor;
|
||||
import org.thingsboard.server.dao.util.AsyncTaskContext;
|
||||
import org.thingsboard.server.dao.util.NoSqlAnyDao;
|
||||
import org.thingsboard.server.dao.util.TenantRateLimitException;
|
||||
|
||||
import javax.annotation.PreDestroy;
|
||||
|
||||
@ -45,13 +42,6 @@ import javax.annotation.PreDestroy;
|
||||
@NoSqlAnyDao
|
||||
public class CassandraBufferedRateReadExecutor extends AbstractBufferedRateExecutor<CassandraStatementTask, TbResultSetFuture, TbResultSet> {
|
||||
|
||||
@Autowired
|
||||
private EntityService entityService;
|
||||
@Autowired
|
||||
private TbTenantProfileCache tenantProfileCache;
|
||||
|
||||
private Map<TenantId, String> tenantNamesCache = new HashMap<>();
|
||||
|
||||
static final String BUFFER_NAME = "Read";
|
||||
|
||||
public CassandraBufferedRateReadExecutor(
|
||||
@ -64,9 +54,10 @@ public class CassandraBufferedRateReadExecutor extends AbstractBufferedRateExecu
|
||||
@Value("${cassandra.query.tenant_rate_limits.print_tenant_names}") boolean printTenantNames,
|
||||
@Value("${cassandra.query.print_queries_freq:0}") int printQueriesFreq,
|
||||
@Autowired StatsFactory statsFactory,
|
||||
@Autowired EntityService entityService) {
|
||||
@Autowired EntityService entityService,
|
||||
@Autowired TbTenantProfileCache tenantProfileCache) {
|
||||
super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs, printQueriesFreq, statsFactory,
|
||||
entityService, printTenantNames);
|
||||
entityService, tenantProfileCache, printTenantNames);
|
||||
}
|
||||
|
||||
@Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}")
|
||||
@ -104,24 +95,4 @@ public class CassandraBufferedRateReadExecutor extends AbstractBufferedRateExecu
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean checkRateLimits(CassandraStatementTask task, SettableFuture<TbResultSet> future) {
|
||||
var tenantProfileConfiguration = tenantProfileCache.get(task.getTenantId()).getDefaultTenantProfileConfiguration();
|
||||
if (StringUtils.isNotEmpty(tenantProfileConfiguration.getCassandraTenantLimitsConfiguration())) {
|
||||
if (task.getTenantId() == null) {
|
||||
log.info("Invalid task received: {}", task);
|
||||
} else if (!task.getTenantId().isNullUid()) {
|
||||
TbRateLimits rateLimits = perTenantLimits.computeIfAbsent(
|
||||
task.getTenantId(), id -> new TbRateLimits(tenantProfileConfiguration.getCassandraTenantLimitsConfiguration())
|
||||
);
|
||||
if (!rateLimits.tryConsume()) {
|
||||
stats.incrementRateLimitedTenant(task.getTenantId());
|
||||
stats.getTotalRateLimited().increment();
|
||||
future.setException(new TenantRateLimitException());
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@ -24,6 +24,7 @@ import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.server.common.stats.StatsFactory;
|
||||
import org.thingsboard.server.dao.entity.EntityService;
|
||||
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
|
||||
import org.thingsboard.server.dao.util.AbstractBufferedRateExecutor;
|
||||
import org.thingsboard.server.dao.util.AsyncTaskContext;
|
||||
import org.thingsboard.server.dao.util.NoSqlAnyDao;
|
||||
@ -47,14 +48,13 @@ public class CassandraBufferedRateWriteExecutor extends AbstractBufferedRateExec
|
||||
@Value("${cassandra.query.dispatcher_threads:2}") int dispatcherThreads,
|
||||
@Value("${cassandra.query.callback_threads:4}") int callbackThreads,
|
||||
@Value("${cassandra.query.poll_ms:50}") long pollMs,
|
||||
@Value("${cassandra.query.tenant_rate_limits.enabled}") boolean tenantRateLimitsEnabled,
|
||||
@Value("${cassandra.query.tenant_rate_limits.configuration}") String tenantRateLimitsConfiguration,
|
||||
@Value("${cassandra.query.tenant_rate_limits.print_tenant_names}") boolean printTenantNames,
|
||||
@Value("${cassandra.query.print_queries_freq:0}") int printQueriesFreq,
|
||||
@Autowired StatsFactory statsFactory,
|
||||
@Autowired EntityService entityService) {
|
||||
super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs, tenantRateLimitsEnabled, tenantRateLimitsConfiguration, printQueriesFreq, statsFactory,
|
||||
entityService, printTenantNames);
|
||||
@Autowired EntityService entityService,
|
||||
@Autowired TbTenantProfileCache tenantProfileCache) {
|
||||
super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs, printQueriesFreq, statsFactory,
|
||||
entityService, tenantProfileCache, printTenantNames);
|
||||
}
|
||||
|
||||
@Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}")
|
||||
|
||||
@ -30,6 +30,7 @@ import com.google.common.util.concurrent.SettableFuture;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.common.util.ThingsBoardExecutors;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.server.common.data.StringUtils;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.msg.tools.TbRateLimits;
|
||||
import org.thingsboard.server.common.stats.DefaultCounter;
|
||||
@ -38,6 +39,7 @@ import org.thingsboard.server.common.stats.StatsFactory;
|
||||
import org.thingsboard.server.common.stats.StatsType;
|
||||
import org.thingsboard.server.dao.entity.EntityService;
|
||||
import org.thingsboard.server.dao.nosql.CassandraStatementTask;
|
||||
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.HashMap;
|
||||
@ -71,22 +73,22 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
|
||||
private final ScheduledExecutorService timeoutExecutor;
|
||||
private final int concurrencyLimit;
|
||||
private final int printQueriesFreq;
|
||||
protected final ConcurrentMap<TenantId, TbRateLimits> perTenantLimits = new ConcurrentHashMap<>();
|
||||
private final ConcurrentMap<TenantId, TbRateLimits> perTenantLimits = new ConcurrentHashMap<>();
|
||||
|
||||
private final AtomicInteger printQueriesIdx = new AtomicInteger(0);
|
||||
|
||||
protected final AtomicInteger concurrencyLevel;
|
||||
protected final BufferedRateExecutorStats stats;
|
||||
|
||||
|
||||
private final EntityService entityService;
|
||||
private final Map<TenantId, String> tenantNamesCache = new HashMap<>();
|
||||
private final TbTenantProfileCache tenantProfileCache;
|
||||
|
||||
private final boolean printTenantNames;
|
||||
private final Map<TenantId, String> tenantNamesCache = new HashMap<>();
|
||||
|
||||
public AbstractBufferedRateExecutor(int queueLimit, int concurrencyLimit, long maxWaitTime, int dispatcherThreads,
|
||||
int callbackThreads, long pollMs, int printQueriesFreq, StatsFactory statsFactory,
|
||||
EntityService entityService, boolean printTenantNames) {
|
||||
EntityService entityService, TbTenantProfileCache tenantProfileCache, boolean printTenantNames) {
|
||||
this.maxWaitTime = maxWaitTime;
|
||||
this.pollMs = pollMs;
|
||||
this.concurrencyLimit = concurrencyLimit;
|
||||
@ -100,6 +102,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
|
||||
this.concurrencyLevel = statsFactory.createGauge(concurrencyLevelKey, new AtomicInteger(0));
|
||||
|
||||
this.entityService = entityService;
|
||||
this.tenantProfileCache = tenantProfileCache;
|
||||
this.printTenantNames = printTenantNames;
|
||||
|
||||
for (int i = 0; i < dispatcherThreads; i++) {
|
||||
@ -107,13 +110,28 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract boolean checkRateLimits(T task, SettableFuture<V> future);
|
||||
|
||||
@Override
|
||||
public F submit(T task) {
|
||||
SettableFuture<V> settableFuture = create();
|
||||
F result = wrap(task, settableFuture);
|
||||
boolean perTenantLimitReached = checkRateLimits(task, settableFuture);
|
||||
|
||||
boolean perTenantLimitReached = false;
|
||||
var tenantProfileConfiguration = tenantProfileCache.get(task.getTenantId()).getDefaultTenantProfileConfiguration();
|
||||
if (StringUtils.isNotEmpty(tenantProfileConfiguration.getCassandraQueryTenantRateLimitsConfiguration())) {
|
||||
if (task.getTenantId() == null) {
|
||||
log.info("Invalid task received: {}", task);
|
||||
} else if (!task.getTenantId().isNullUid()) {
|
||||
TbRateLimits rateLimits = perTenantLimits.computeIfAbsent(
|
||||
task.getTenantId(), id -> new TbRateLimits(tenantProfileConfiguration.getCassandraQueryTenantRateLimitsConfiguration())
|
||||
);
|
||||
if (!rateLimits.tryConsume()) {
|
||||
stats.incrementRateLimitedTenant(task.getTenantId());
|
||||
stats.getTotalRateLimited().increment();
|
||||
settableFuture.setException(new TenantRateLimitException());
|
||||
perTenantLimitReached = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!perTenantLimitReached) {
|
||||
try {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user