Merge branch 'develop/3.5.1' into develop/3.5.2
This commit is contained in:
		
						commit
						c0b0c7ac5d
					
				@ -15,64 +15,53 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.config;
 | 
			
		||||
 | 
			
		||||
import lombok.RequiredArgsConstructor;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Autowired;
 | 
			
		||||
import org.springframework.context.annotation.Lazy;
 | 
			
		||||
import org.springframework.security.authentication.BadCredentialsException;
 | 
			
		||||
import org.springframework.security.core.Authentication;
 | 
			
		||||
import org.springframework.security.core.context.SecurityContextHolder;
 | 
			
		||||
import org.springframework.stereotype.Component;
 | 
			
		||||
import org.springframework.web.filter.OncePerRequestFilter;
 | 
			
		||||
import org.thingsboard.server.common.data.StringUtils;
 | 
			
		||||
import org.thingsboard.server.common.data.id.CustomerId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.msg.tools.TbRateLimits;
 | 
			
		||||
import org.thingsboard.server.common.data.EntityType;
 | 
			
		||||
import org.thingsboard.server.common.data.exception.TenantProfileNotFoundException;
 | 
			
		||||
import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
 | 
			
		||||
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
 | 
			
		||||
import org.thingsboard.server.exception.ThingsboardErrorResponseHandler;
 | 
			
		||||
import org.thingsboard.server.dao.util.limits.LimitedApi;
 | 
			
		||||
import org.thingsboard.server.dao.util.limits.RateLimitService;
 | 
			
		||||
import org.thingsboard.server.service.security.model.SecurityUser;
 | 
			
		||||
 | 
			
		||||
import javax.servlet.FilterChain;
 | 
			
		||||
import javax.servlet.ServletException;
 | 
			
		||||
import javax.servlet.ServletResponse;
 | 
			
		||||
import javax.servlet.http.HttpServletRequest;
 | 
			
		||||
import javax.servlet.http.HttpServletResponse;
 | 
			
		||||
import java.io.IOException;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.concurrent.ConcurrentHashMap;
 | 
			
		||||
import java.util.concurrent.ConcurrentMap;
 | 
			
		||||
 | 
			
		||||
@Slf4j
 | 
			
		||||
@Component
 | 
			
		||||
@RequiredArgsConstructor
 | 
			
		||||
public class RateLimitProcessingFilter extends OncePerRequestFilter {
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private ThingsboardErrorResponseHandler errorResponseHandler;
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    @Lazy
 | 
			
		||||
    private TbTenantProfileCache tenantProfileCache;
 | 
			
		||||
 | 
			
		||||
    private final ConcurrentMap<TenantId, TbRateLimits> perTenantLimits = new ConcurrentHashMap<>();
 | 
			
		||||
    private final ConcurrentMap<CustomerId, TbRateLimits> perCustomerLimits = new ConcurrentHashMap<>();
 | 
			
		||||
    private final ThingsboardErrorResponseHandler errorResponseHandler;
 | 
			
		||||
    private final RateLimitService rateLimitService;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain chain) throws IOException, ServletException {
 | 
			
		||||
        SecurityUser user = getCurrentUser();
 | 
			
		||||
        if (user != null && !user.isSystemAdmin()) {
 | 
			
		||||
            var profile = tenantProfileCache.get(user.getTenantId());
 | 
			
		||||
            if (profile == null) {
 | 
			
		||||
            try {
 | 
			
		||||
                if (!rateLimitService.checkRateLimit(LimitedApi.REST_REQUESTS, user.getTenantId())) {
 | 
			
		||||
                    rateLimitExceeded(EntityType.TENANT, response);
 | 
			
		||||
                    return;
 | 
			
		||||
                }
 | 
			
		||||
            } catch (TenantProfileNotFoundException e) {
 | 
			
		||||
                log.debug("[{}] Failed to lookup tenant profile", user.getTenantId());
 | 
			
		||||
                errorResponseHandler.handle(new BadCredentialsException("Failed to lookup tenant profile"), response);
 | 
			
		||||
                return;
 | 
			
		||||
            }
 | 
			
		||||
            var profileConfiguration = profile.getDefaultProfileConfiguration();
 | 
			
		||||
            if (!checkRateLimits(user.getTenantId(), profileConfiguration.getTenantServerRestLimitsConfiguration(), perTenantLimits, response)) {
 | 
			
		||||
                return;
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            if (user.isCustomerUser()) {
 | 
			
		||||
                if (!checkRateLimits(user.getCustomerId(), profileConfiguration.getCustomerServerRestLimitsConfiguration(), perCustomerLimits, response)) {
 | 
			
		||||
                if (!rateLimitService.checkRateLimit(LimitedApi.REST_REQUESTS, user.getTenantId(), user.getCustomerId())) {
 | 
			
		||||
                    rateLimitExceeded(EntityType.CUSTOMER, response);
 | 
			
		||||
                    return;
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
@ -90,23 +79,8 @@ public class RateLimitProcessingFilter extends OncePerRequestFilter {
 | 
			
		||||
        return false;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private <I extends EntityId> boolean checkRateLimits(I ownerId, String rateLimitConfig, Map<I, TbRateLimits> rateLimitsMap, ServletResponse response) {
 | 
			
		||||
        if (StringUtils.isNotEmpty(rateLimitConfig)) {
 | 
			
		||||
            TbRateLimits rateLimits = rateLimitsMap.get(ownerId);
 | 
			
		||||
            if (rateLimits == null || !rateLimits.getConfiguration().equals(rateLimitConfig)) {
 | 
			
		||||
                rateLimits = new TbRateLimits(rateLimitConfig);
 | 
			
		||||
                rateLimitsMap.put(ownerId, rateLimits);
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            if (!rateLimits.tryConsume()) {
 | 
			
		||||
                errorResponseHandler.handle(new TbRateLimitsException(ownerId.getEntityType()), (HttpServletResponse) response);
 | 
			
		||||
                return false;
 | 
			
		||||
            }
 | 
			
		||||
        } else {
 | 
			
		||||
            rateLimitsMap.remove(ownerId);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        return true;
 | 
			
		||||
    private void rateLimitExceeded(EntityType type, HttpServletResponse response) {
 | 
			
		||||
        errorResponseHandler.handle(new TbRateLimitsException(type), response);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected SecurityUser getCurrentUser() {
 | 
			
		||||
 | 
			
		||||
@ -42,16 +42,15 @@ import org.thingsboard.server.common.data.edge.EdgeEventActionType;
 | 
			
		||||
import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
 | 
			
		||||
import org.thingsboard.server.common.data.exception.ThingsboardException;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.UserId;
 | 
			
		||||
import org.thingsboard.server.common.data.security.UserCredentials;
 | 
			
		||||
import org.thingsboard.server.common.data.security.event.UserCredentialsInvalidationEvent;
 | 
			
		||||
import org.thingsboard.server.common.data.security.event.UserSessionInvalidationEvent;
 | 
			
		||||
import org.thingsboard.server.common.data.security.model.JwtPair;
 | 
			
		||||
import org.thingsboard.server.common.data.security.model.SecuritySettings;
 | 
			
		||||
import org.thingsboard.server.common.data.security.model.UserPasswordPolicy;
 | 
			
		||||
import org.thingsboard.server.common.msg.tools.TbRateLimits;
 | 
			
		||||
import org.thingsboard.server.dao.audit.AuditLogService;
 | 
			
		||||
import org.thingsboard.server.queue.util.TbCoreComponent;
 | 
			
		||||
import org.thingsboard.server.dao.util.limits.LimitedApi;
 | 
			
		||||
import org.thingsboard.server.dao.util.limits.RateLimitService;
 | 
			
		||||
import org.thingsboard.server.service.security.auth.rest.RestAuthenticationDetails;
 | 
			
		||||
import org.thingsboard.server.service.security.model.ActivateUserRequest;
 | 
			
		||||
import org.thingsboard.server.service.security.model.ChangePasswordRequest;
 | 
			
		||||
@ -65,8 +64,6 @@ import org.thingsboard.server.service.security.system.SystemSecurityService;
 | 
			
		||||
import javax.servlet.http.HttpServletRequest;
 | 
			
		||||
import java.net.URI;
 | 
			
		||||
import java.net.URISyntaxException;
 | 
			
		||||
import java.util.concurrent.ConcurrentHashMap;
 | 
			
		||||
import java.util.concurrent.ConcurrentMap;
 | 
			
		||||
 | 
			
		||||
@RestController
 | 
			
		||||
@TbCoreComponent
 | 
			
		||||
@ -77,12 +74,11 @@ public class AuthController extends BaseController {
 | 
			
		||||
 | 
			
		||||
    @Value("${server.rest.rate_limits.reset_password_per_user:5:3600}")
 | 
			
		||||
    private String defaultLimitsConfiguration;
 | 
			
		||||
    private final ConcurrentMap<UserId, TbRateLimits> resetPasswordRateLimits = new ConcurrentHashMap<>();
 | 
			
		||||
    private final BCryptPasswordEncoder passwordEncoder;
 | 
			
		||||
    private final JwtTokenFactory tokenFactory;
 | 
			
		||||
    private final MailService mailService;
 | 
			
		||||
    private final SystemSecurityService systemSecurityService;
 | 
			
		||||
    private final AuditLogService auditLogService;
 | 
			
		||||
    private final RateLimitService rateLimitService;
 | 
			
		||||
    private final ApplicationEventPublisher eventPublisher;
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -210,8 +206,7 @@ public class AuthController extends BaseController {
 | 
			
		||||
        UserCredentials userCredentials = userService.findUserCredentialsByResetToken(TenantId.SYS_TENANT_ID, resetToken);
 | 
			
		||||
 | 
			
		||||
        if (userCredentials != null) {
 | 
			
		||||
            TbRateLimits tbRateLimits = getTbRateLimits(userCredentials.getUserId());
 | 
			
		||||
            if (!tbRateLimits.tryConsume()) {
 | 
			
		||||
            if (!rateLimitService.checkRateLimit(LimitedApi.PASSWORD_RESET, userCredentials.getUserId(), defaultLimitsConfiguration)) {
 | 
			
		||||
                return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS).build();
 | 
			
		||||
            }
 | 
			
		||||
            try {
 | 
			
		||||
@ -314,8 +309,4 @@ public class AuthController extends BaseController {
 | 
			
		||||
        eventPublisher.publishEvent(new UserSessionInvalidationEvent(user.getSessionId()));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private TbRateLimits getTbRateLimits(UserId userId) {
 | 
			
		||||
        return resetPasswordRateLimits.computeIfAbsent(userId,
 | 
			
		||||
                key -> new TbRateLimits(defaultLimitsConfiguration, true));
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -28,16 +28,16 @@ import org.springframework.web.socket.TextMessage;
 | 
			
		||||
import org.springframework.web.socket.WebSocketSession;
 | 
			
		||||
import org.springframework.web.socket.adapter.NativeWebSocketSession;
 | 
			
		||||
import org.springframework.web.socket.handler.TextWebSocketHandler;
 | 
			
		||||
import org.thingsboard.server.common.data.StringUtils;
 | 
			
		||||
import org.thingsboard.server.common.data.TenantProfile;
 | 
			
		||||
import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
 | 
			
		||||
import org.thingsboard.server.common.data.id.CustomerId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.UserId;
 | 
			
		||||
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
 | 
			
		||||
import org.thingsboard.server.common.msg.tools.TbRateLimits;
 | 
			
		||||
import org.thingsboard.server.config.WebSocketConfiguration;
 | 
			
		||||
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
 | 
			
		||||
import org.thingsboard.server.dao.util.limits.LimitedApi;
 | 
			
		||||
import org.thingsboard.server.dao.util.limits.RateLimitService;
 | 
			
		||||
import org.thingsboard.server.queue.util.TbCoreComponent;
 | 
			
		||||
import org.thingsboard.server.service.security.model.SecurityUser;
 | 
			
		||||
import org.thingsboard.server.service.security.model.UserPrincipal;
 | 
			
		||||
@ -80,6 +80,9 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private TbTenantProfileCache tenantProfileCache;
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private RateLimitService rateLimitService;
 | 
			
		||||
 | 
			
		||||
    @Value("${server.ws.send_timeout:5000}")
 | 
			
		||||
    private long sendTimeout;
 | 
			
		||||
    @Value("${server.ws.ping_timeout:30000}")
 | 
			
		||||
@ -88,7 +91,6 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
 | 
			
		||||
    private int wsMaxQueueMessagesPerSession;
 | 
			
		||||
 | 
			
		||||
    private final ConcurrentMap<String, WebSocketSessionRef> blacklistedSessions = new ConcurrentHashMap<>();
 | 
			
		||||
    private final ConcurrentMap<String, TbRateLimits> perSessionUpdateLimits = new ConcurrentHashMap<>();
 | 
			
		||||
 | 
			
		||||
    private final ConcurrentMap<TenantId, Set<String>> tenantSessionsMap = new ConcurrentHashMap<>();
 | 
			
		||||
    private final ConcurrentMap<CustomerId, Set<String>> customerSessionsMap = new ConcurrentHashMap<>();
 | 
			
		||||
@ -331,24 +333,17 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
 | 
			
		||||
        if (internalId != null) {
 | 
			
		||||
            SessionMetaData sessionMd = internalSessionMap.get(internalId);
 | 
			
		||||
            if (sessionMd != null) {
 | 
			
		||||
                var tenantProfileConfiguration = getTenantProfileConfiguration(sessionRef);
 | 
			
		||||
                if (tenantProfileConfiguration != null) {
 | 
			
		||||
                    if (StringUtils.isNotEmpty(tenantProfileConfiguration.getWsUpdatesPerSessionRateLimit())) {
 | 
			
		||||
                        TbRateLimits rateLimits = perSessionUpdateLimits.computeIfAbsent(sessionRef.getSessionId(), sid -> new TbRateLimits(tenantProfileConfiguration.getWsUpdatesPerSessionRateLimit()));
 | 
			
		||||
                        if (!rateLimits.tryConsume()) {
 | 
			
		||||
                            if (blacklistedSessions.putIfAbsent(externalId, sessionRef) == null) {
 | 
			
		||||
                                log.info("[{}][{}][{}] Failed to process session update. Max session updates limit reached"
 | 
			
		||||
                                        , sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), externalId);
 | 
			
		||||
                                sessionMd.sendMsg("{\"subscriptionId\":" + subscriptionId + ", \"errorCode\":" + ThingsboardErrorCode.TOO_MANY_UPDATES.getErrorCode() + ", \"errorMsg\":\"Too many updates!\"}");
 | 
			
		||||
                            }
 | 
			
		||||
                            return;
 | 
			
		||||
                        } else {
 | 
			
		||||
                            log.debug("[{}][{}][{}] Session is no longer blacklisted.", sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), externalId);
 | 
			
		||||
                            blacklistedSessions.remove(externalId);
 | 
			
		||||
                        }
 | 
			
		||||
                    } else {
 | 
			
		||||
                        perSessionUpdateLimits.remove(sessionRef.getSessionId());
 | 
			
		||||
                TenantId tenantId = sessionRef.getSecurityCtx().getTenantId();
 | 
			
		||||
                if (!rateLimitService.checkRateLimit(LimitedApi.WS_UPDATES_PER_SESSION, tenantId, (Object) sessionRef.getSessionId())) {
 | 
			
		||||
                    if (blacklistedSessions.putIfAbsent(externalId, sessionRef) == null) {
 | 
			
		||||
                        log.info("[{}][{}][{}] Failed to process session update. Max session updates limit reached"
 | 
			
		||||
                                , tenantId, sessionRef.getSecurityCtx().getId(), externalId);
 | 
			
		||||
                        sessionMd.sendMsg("{\"subscriptionId\":" + subscriptionId + ", \"errorCode\":" + ThingsboardErrorCode.TOO_MANY_UPDATES.getErrorCode() + ", \"errorMsg\":\"Too many updates!\"}");
 | 
			
		||||
                    }
 | 
			
		||||
                    return;
 | 
			
		||||
                } else {
 | 
			
		||||
                    log.debug("[{}][{}][{}] Session is no longer blacklisted.", tenantId, sessionRef.getSecurityCtx().getId(), externalId);
 | 
			
		||||
                    blacklistedSessions.remove(externalId);
 | 
			
		||||
                }
 | 
			
		||||
                sessionMd.sendMsg(msg);
 | 
			
		||||
            } else {
 | 
			
		||||
@ -464,7 +459,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
 | 
			
		||||
        if (tenantProfileConfiguration == null) return;
 | 
			
		||||
 | 
			
		||||
        String sessionId = session.getId();
 | 
			
		||||
        perSessionUpdateLimits.remove(sessionRef.getSessionId());
 | 
			
		||||
        rateLimitService.cleanUp(LimitedApi.WS_UPDATES_PER_SESSION, sessionRef.getSessionId());
 | 
			
		||||
        blacklistedSessions.remove(sessionRef.getSessionId());
 | 
			
		||||
        if (tenantProfileConfiguration.getMaxWsSessionsPerTenant() > 0) {
 | 
			
		||||
            Set<String> tenantSessions = tenantSessionsMap.computeIfAbsent(sessionRef.getSecurityCtx().getTenantId(), id -> ConcurrentHashMap.newKeySet());
 | 
			
		||||
 | 
			
		||||
@ -1,37 +0,0 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2023 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.service.apiusage.limits;
 | 
			
		||||
 | 
			
		||||
import lombok.RequiredArgsConstructor;
 | 
			
		||||
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
 | 
			
		||||
 | 
			
		||||
import java.util.function.Function;
 | 
			
		||||
 | 
			
		||||
@RequiredArgsConstructor
 | 
			
		||||
public enum LimitedApi {
 | 
			
		||||
 | 
			
		||||
    ENTITY_EXPORT(DefaultTenantProfileConfiguration::getTenantEntityExportRateLimit),
 | 
			
		||||
    ENTITY_IMPORT(DefaultTenantProfileConfiguration::getTenantEntityImportRateLimit),
 | 
			
		||||
    NOTIFICATION_REQUESTS(DefaultTenantProfileConfiguration::getTenantNotificationRequestsRateLimit),
 | 
			
		||||
    NOTIFICATION_REQUESTS_PER_RULE(DefaultTenantProfileConfiguration::getTenantNotificationRequestsPerRuleRateLimit);
 | 
			
		||||
 | 
			
		||||
    private final Function<DefaultTenantProfileConfiguration, String> configExtractor;
 | 
			
		||||
 | 
			
		||||
    public String getLimitConfig(DefaultTenantProfileConfiguration profileConfiguration) {
 | 
			
		||||
        return configExtractor.apply(profileConfiguration);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -64,8 +64,8 @@ import org.thingsboard.server.gen.transport.TransportProtos;
 | 
			
		||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.NotificationsTopicService;
 | 
			
		||||
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
 | 
			
		||||
import org.thingsboard.server.service.apiusage.limits.LimitedApi;
 | 
			
		||||
import org.thingsboard.server.service.apiusage.limits.RateLimitService;
 | 
			
		||||
import org.thingsboard.server.dao.util.limits.LimitedApi;
 | 
			
		||||
import org.thingsboard.server.dao.util.limits.RateLimitService;
 | 
			
		||||
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
 | 
			
		||||
import org.thingsboard.server.service.executors.NotificationExecutorService;
 | 
			
		||||
import org.thingsboard.server.service.notification.channels.NotificationChannel;
 | 
			
		||||
 | 
			
		||||
@ -44,10 +44,10 @@ import org.thingsboard.server.common.msg.notification.trigger.RuleEngineMsgTrigg
 | 
			
		||||
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.ServiceType;
 | 
			
		||||
import org.thingsboard.server.dao.notification.NotificationRequestService;
 | 
			
		||||
import org.thingsboard.server.dao.util.limits.LimitedApi;
 | 
			
		||||
import org.thingsboard.server.dao.util.limits.RateLimitService;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.PartitionService;
 | 
			
		||||
import org.thingsboard.server.queue.notification.NotificationRuleProcessor;
 | 
			
		||||
import org.thingsboard.server.service.apiusage.limits.LimitedApi;
 | 
			
		||||
import org.thingsboard.server.service.apiusage.limits.RateLimitService;
 | 
			
		||||
import org.thingsboard.server.service.executors.NotificationExecutorService;
 | 
			
		||||
import org.thingsboard.server.service.notification.rule.cache.NotificationRulesCache;
 | 
			
		||||
import org.thingsboard.server.service.notification.rule.trigger.NotificationRuleTriggerProcessor;
 | 
			
		||||
 | 
			
		||||
@ -16,6 +16,7 @@
 | 
			
		||||
package org.thingsboard.server.service.security.auth.mfa;
 | 
			
		||||
 | 
			
		||||
import lombok.RequiredArgsConstructor;
 | 
			
		||||
import org.apache.commons.lang3.tuple.Pair;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Autowired;
 | 
			
		||||
import org.springframework.security.authentication.LockedException;
 | 
			
		||||
import org.springframework.stereotype.Service;
 | 
			
		||||
@ -29,8 +30,9 @@ import org.thingsboard.server.common.data.security.model.mfa.PlatformTwoFaSettin
 | 
			
		||||
import org.thingsboard.server.common.data.security.model.mfa.account.TwoFaAccountConfig;
 | 
			
		||||
import org.thingsboard.server.common.data.security.model.mfa.provider.TwoFaProviderConfig;
 | 
			
		||||
import org.thingsboard.server.common.data.security.model.mfa.provider.TwoFaProviderType;
 | 
			
		||||
import org.thingsboard.server.common.msg.tools.TbRateLimits;
 | 
			
		||||
import org.thingsboard.server.dao.user.UserService;
 | 
			
		||||
import org.thingsboard.server.dao.util.limits.LimitedApi;
 | 
			
		||||
import org.thingsboard.server.dao.util.limits.RateLimitService;
 | 
			
		||||
import org.thingsboard.server.queue.util.TbCoreComponent;
 | 
			
		||||
import org.thingsboard.server.service.security.auth.mfa.config.TwoFaConfigManager;
 | 
			
		||||
import org.thingsboard.server.service.security.auth.mfa.provider.TwoFaProvider;
 | 
			
		||||
@ -41,8 +43,6 @@ import java.util.Collection;
 | 
			
		||||
import java.util.EnumMap;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.Optional;
 | 
			
		||||
import java.util.concurrent.ConcurrentHashMap;
 | 
			
		||||
import java.util.concurrent.ConcurrentMap;
 | 
			
		||||
 | 
			
		||||
@Service
 | 
			
		||||
@RequiredArgsConstructor
 | 
			
		||||
@ -52,14 +52,13 @@ public class DefaultTwoFactorAuthService implements TwoFactorAuthService {
 | 
			
		||||
    private final TwoFaConfigManager configManager;
 | 
			
		||||
    private final SystemSecurityService systemSecurityService;
 | 
			
		||||
    private final UserService userService;
 | 
			
		||||
    private final RateLimitService rateLimitService;
 | 
			
		||||
    private final Map<TwoFaProviderType, TwoFaProvider<TwoFaProviderConfig, TwoFaAccountConfig>> providers = new EnumMap<>(TwoFaProviderType.class);
 | 
			
		||||
 | 
			
		||||
    private static final ThingsboardException ACCOUNT_NOT_CONFIGURED_ERROR = new ThingsboardException("2FA is not configured for account", ThingsboardErrorCode.BAD_REQUEST_PARAMS);
 | 
			
		||||
    private static final ThingsboardException PROVIDER_NOT_CONFIGURED_ERROR = new ThingsboardException("2FA provider is not configured", ThingsboardErrorCode.BAD_REQUEST_PARAMS);
 | 
			
		||||
    private static final ThingsboardException PROVIDER_NOT_AVAILABLE_ERROR = new ThingsboardException("2FA provider is not available", ThingsboardErrorCode.GENERAL);
 | 
			
		||||
 | 
			
		||||
    private final ConcurrentMap<UserId, ConcurrentMap<TwoFaProviderType, TbRateLimits>> verificationCodeSendingRateLimits = new ConcurrentHashMap<>();
 | 
			
		||||
    private final ConcurrentMap<UserId, ConcurrentMap<TwoFaProviderType, TbRateLimits>> verificationCodeCheckingRateLimits = new ConcurrentHashMap<>();
 | 
			
		||||
    private static final ThingsboardException TOO_MANY_REQUESTS_ERROR = new ThingsboardException("Too many requests", ThingsboardErrorCode.TOO_MANY_REQUESTS);
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public boolean isTwoFaEnabled(TenantId tenantId, UserId userId) {
 | 
			
		||||
@ -91,7 +90,10 @@ public class DefaultTwoFactorAuthService implements TwoFactorAuthService {
 | 
			
		||||
            if (minVerificationCodeSendPeriod != null && minVerificationCodeSendPeriod > 4) {
 | 
			
		||||
                rateLimit = "1:" + minVerificationCodeSendPeriod;
 | 
			
		||||
            }
 | 
			
		||||
            checkRateLimits(user.getId(), accountConfig.getProviderType(), rateLimit, verificationCodeSendingRateLimits);
 | 
			
		||||
            if (!rateLimitService.checkRateLimit(LimitedApi.TWO_FA_VERIFICATION_CODE_SEND,
 | 
			
		||||
                    Pair.of(user.getId(), accountConfig.getProviderType()), rateLimit)) {
 | 
			
		||||
                throw TOO_MANY_REQUESTS_ERROR;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        TwoFaProviderConfig providerConfig = twoFaSettings.getProviderConfig(accountConfig.getProviderType())
 | 
			
		||||
@ -116,7 +118,10 @@ public class DefaultTwoFactorAuthService implements TwoFactorAuthService {
 | 
			
		||||
        PlatformTwoFaSettings twoFaSettings = configManager.getPlatformTwoFaSettings(user.getTenantId(), true)
 | 
			
		||||
                .orElseThrow(() -> PROVIDER_NOT_CONFIGURED_ERROR);
 | 
			
		||||
        if (checkLimits) {
 | 
			
		||||
            checkRateLimits(user.getId(), accountConfig.getProviderType(), twoFaSettings.getVerificationCodeCheckRateLimit(), verificationCodeCheckingRateLimits);
 | 
			
		||||
            if (!rateLimitService.checkRateLimit(LimitedApi.TWO_FA_VERIFICATION_CODE_CHECK,
 | 
			
		||||
                    Pair.of(user.getId(), accountConfig.getProviderType()), twoFaSettings.getVerificationCodeCheckRateLimit())) {
 | 
			
		||||
                throw TOO_MANY_REQUESTS_ERROR;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        TwoFaProviderConfig providerConfig = twoFaSettings.getProviderConfig(accountConfig.getProviderType())
 | 
			
		||||
                .orElseThrow(() -> PROVIDER_NOT_CONFIGURED_ERROR);
 | 
			
		||||
@ -131,43 +136,28 @@ public class DefaultTwoFactorAuthService implements TwoFactorAuthService {
 | 
			
		||||
            try {
 | 
			
		||||
                systemSecurityService.validateTwoFaVerification(user, verificationSuccess, twoFaSettings);
 | 
			
		||||
            } catch (LockedException e) {
 | 
			
		||||
                verificationCodeCheckingRateLimits.remove(user.getId());
 | 
			
		||||
                verificationCodeSendingRateLimits.remove(user.getId());
 | 
			
		||||
                cleanUpRateLimits(user.getId());
 | 
			
		||||
                throw new ThingsboardException(e.getMessage(), ThingsboardErrorCode.AUTHENTICATION);
 | 
			
		||||
            }
 | 
			
		||||
            if (verificationSuccess) {
 | 
			
		||||
                verificationCodeCheckingRateLimits.remove(user.getId());
 | 
			
		||||
                verificationCodeSendingRateLimits.remove(user.getId());
 | 
			
		||||
                cleanUpRateLimits(user.getId());
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        return verificationSuccess;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void checkRateLimits(UserId userId, TwoFaProviderType providerType, String rateLimitConfig,
 | 
			
		||||
                                 ConcurrentMap<UserId, ConcurrentMap<TwoFaProviderType, TbRateLimits>> rateLimits) throws ThingsboardException {
 | 
			
		||||
        if (StringUtils.isNotEmpty(rateLimitConfig)) {
 | 
			
		||||
            ConcurrentMap<TwoFaProviderType, TbRateLimits> providersRateLimits = rateLimits.computeIfAbsent(userId, i -> new ConcurrentHashMap<>());
 | 
			
		||||
 | 
			
		||||
            TbRateLimits rateLimit = providersRateLimits.get(providerType);
 | 
			
		||||
            if (rateLimit == null || !rateLimit.getConfiguration().equals(rateLimitConfig)) {
 | 
			
		||||
                rateLimit = new TbRateLimits(rateLimitConfig, true);
 | 
			
		||||
                providersRateLimits.put(providerType, rateLimit);
 | 
			
		||||
            }
 | 
			
		||||
            if (!rateLimit.tryConsume()) {
 | 
			
		||||
                throw new ThingsboardException("Too many requests", ThingsboardErrorCode.TOO_MANY_REQUESTS);
 | 
			
		||||
            }
 | 
			
		||||
        } else {
 | 
			
		||||
            rateLimits.remove(userId);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TwoFaAccountConfig generateNewAccountConfig(User user, TwoFaProviderType providerType) throws ThingsboardException {
 | 
			
		||||
        TwoFaProviderConfig providerConfig = getTwoFaProviderConfig(user.getTenantId(), providerType);
 | 
			
		||||
        return getTwoFaProvider(providerType).generateNewAccountConfig(user, providerConfig);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void cleanUpRateLimits(UserId userId) {
 | 
			
		||||
        for (TwoFaProviderType providerType : TwoFaProviderType.values()) {
 | 
			
		||||
            rateLimitService.cleanUp(LimitedApi.TWO_FA_VERIFICATION_CODE_SEND, Pair.of(userId, providerType));
 | 
			
		||||
            rateLimitService.cleanUp(LimitedApi.TWO_FA_VERIFICATION_CODE_CHECK, Pair.of(userId, providerType));
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private TwoFaProviderConfig getTwoFaProviderConfig(TenantId tenantId, TwoFaProviderType providerType) throws ThingsboardException {
 | 
			
		||||
        return configManager.getPlatformTwoFaSettings(tenantId, true)
 | 
			
		||||
 | 
			
		||||
@ -32,8 +32,8 @@ import org.thingsboard.server.common.data.util.ThrowingRunnable;
 | 
			
		||||
import org.thingsboard.server.dao.exception.DataValidationException;
 | 
			
		||||
import org.thingsboard.server.dao.relation.RelationService;
 | 
			
		||||
import org.thingsboard.server.queue.util.TbCoreComponent;
 | 
			
		||||
import org.thingsboard.server.service.apiusage.limits.LimitedApi;
 | 
			
		||||
import org.thingsboard.server.service.apiusage.limits.RateLimitService;
 | 
			
		||||
import org.thingsboard.server.dao.util.limits.LimitedApi;
 | 
			
		||||
import org.thingsboard.server.dao.util.limits.RateLimitService;
 | 
			
		||||
import org.thingsboard.server.service.entitiy.TbNotificationEntityService;
 | 
			
		||||
import org.thingsboard.server.service.sync.ie.exporting.EntityExportService;
 | 
			
		||||
import org.thingsboard.server.service.sync.ie.exporting.impl.BaseEntityExportService;
 | 
			
		||||
 | 
			
		||||
@ -495,8 +495,8 @@ cache:
 | 
			
		||||
    timeToLiveInMinutes: "${CACHE_SPECS_NOTIFICATION_RULES_TTL:30}"
 | 
			
		||||
    maxSize: "${CACHE_SPECS_NOTIFICATION_RULES_MAX_SIZE:1000}"
 | 
			
		||||
  rateLimits:
 | 
			
		||||
    timeToLiveInMinutes: "${CACHE_SPECS_RATE_LIMITS_TTL:60}"
 | 
			
		||||
    maxSize: "${CACHE_SPECS_RATE_LIMITS_MAX_SIZE:100000}"
 | 
			
		||||
    timeToLiveInMinutes: "${CACHE_SPECS_RATE_LIMITS_TTL:120}"
 | 
			
		||||
    maxSize: "${CACHE_SPECS_RATE_LIMITS_MAX_SIZE:200000}"
 | 
			
		||||
 | 
			
		||||
#Disable this because it is not required.
 | 
			
		||||
spring.data.redis.repositories.enabled: false
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,110 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2023 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.service.limits;
 | 
			
		||||
 | 
			
		||||
import org.junit.Before;
 | 
			
		||||
import org.junit.Test;
 | 
			
		||||
import org.junit.runner.RunWith;
 | 
			
		||||
import org.mockito.Mockito;
 | 
			
		||||
import org.mockito.junit.MockitoJUnitRunner;
 | 
			
		||||
import org.thingsboard.server.common.data.TenantProfile;
 | 
			
		||||
import org.thingsboard.server.common.data.id.CustomerId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.NotificationRuleId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
 | 
			
		||||
import org.thingsboard.server.common.data.tenant.profile.TenantProfileData;
 | 
			
		||||
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
 | 
			
		||||
import org.thingsboard.server.dao.util.limits.DefaultRateLimitService;
 | 
			
		||||
import org.thingsboard.server.dao.util.limits.LimitedApi;
 | 
			
		||||
import org.thingsboard.server.dao.util.limits.RateLimitService;
 | 
			
		||||
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
 | 
			
		||||
import static org.junit.Assert.assertFalse;
 | 
			
		||||
import static org.junit.Assert.assertTrue;
 | 
			
		||||
import static org.mockito.ArgumentMatchers.eq;
 | 
			
		||||
import static org.mockito.Mockito.reset;
 | 
			
		||||
import static org.mockito.Mockito.when;
 | 
			
		||||
 | 
			
		||||
@RunWith(MockitoJUnitRunner.class)
 | 
			
		||||
public class RateLimitServiceTest {
 | 
			
		||||
 | 
			
		||||
    private RateLimitService rateLimitService;
 | 
			
		||||
    private TbTenantProfileCache tenantProfileCache;
 | 
			
		||||
    private TenantId tenantId;
 | 
			
		||||
 | 
			
		||||
    @Before
 | 
			
		||||
    public void beforeEach() {
 | 
			
		||||
        tenantProfileCache = Mockito.mock(TbTenantProfileCache.class);
 | 
			
		||||
        rateLimitService = new DefaultRateLimitService(tenantProfileCache, 60, 100);
 | 
			
		||||
        tenantId = new TenantId(UUID.randomUUID());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void testRateLimits() {
 | 
			
		||||
        int max = 2;
 | 
			
		||||
        String rateLimit = max + ":600";
 | 
			
		||||
        DefaultTenantProfileConfiguration profileConfiguration = new DefaultTenantProfileConfiguration();
 | 
			
		||||
        profileConfiguration.setTenantEntityExportRateLimit(rateLimit);
 | 
			
		||||
        profileConfiguration.setTenantEntityImportRateLimit(rateLimit);
 | 
			
		||||
        profileConfiguration.setTenantNotificationRequestsRateLimit(rateLimit);
 | 
			
		||||
        profileConfiguration.setTenantNotificationRequestsPerRuleRateLimit(rateLimit);
 | 
			
		||||
        profileConfiguration.setTenantServerRestLimitsConfiguration(rateLimit);
 | 
			
		||||
        profileConfiguration.setCustomerServerRestLimitsConfiguration(rateLimit);
 | 
			
		||||
        profileConfiguration.setWsUpdatesPerSessionRateLimit(rateLimit);
 | 
			
		||||
        profileConfiguration.setCassandraQueryTenantRateLimitsConfiguration(rateLimit);
 | 
			
		||||
        updateTenantProfileConfiguration(profileConfiguration);
 | 
			
		||||
 | 
			
		||||
        for (LimitedApi limitedApi : List.of(
 | 
			
		||||
                LimitedApi.ENTITY_EXPORT,
 | 
			
		||||
                LimitedApi.ENTITY_IMPORT,
 | 
			
		||||
                LimitedApi.NOTIFICATION_REQUESTS,
 | 
			
		||||
                LimitedApi.REST_REQUESTS,
 | 
			
		||||
                LimitedApi.CASSANDRA_QUERIES
 | 
			
		||||
        )) {
 | 
			
		||||
            testRateLimits(limitedApi, max, tenantId);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        CustomerId customerId = new CustomerId(UUID.randomUUID());
 | 
			
		||||
        testRateLimits(LimitedApi.REST_REQUESTS, max, customerId);
 | 
			
		||||
 | 
			
		||||
        NotificationRuleId notificationRuleId = new NotificationRuleId(UUID.randomUUID());
 | 
			
		||||
        testRateLimits(LimitedApi.NOTIFICATION_REQUESTS_PER_RULE, max, notificationRuleId);
 | 
			
		||||
 | 
			
		||||
        String wsSessionId = UUID.randomUUID().toString();
 | 
			
		||||
        testRateLimits(LimitedApi.WS_UPDATES_PER_SESSION, max, wsSessionId);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void testRateLimits(LimitedApi limitedApi, int max, Object level) {
 | 
			
		||||
        for (int i = 1; i <= max; i++) {
 | 
			
		||||
            boolean success = rateLimitService.checkRateLimit(limitedApi, tenantId, level);
 | 
			
		||||
            assertTrue(success);
 | 
			
		||||
        }
 | 
			
		||||
        boolean success = rateLimitService.checkRateLimit(limitedApi, tenantId, level);
 | 
			
		||||
        assertFalse(success);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void updateTenantProfileConfiguration(DefaultTenantProfileConfiguration profileConfiguration) {
 | 
			
		||||
        reset(tenantProfileCache);
 | 
			
		||||
        TenantProfile tenantProfile = new TenantProfile();
 | 
			
		||||
        TenantProfileData profileData = new TenantProfileData();
 | 
			
		||||
        profileData.setConfiguration(profileConfiguration);
 | 
			
		||||
        tenantProfile.setProfileData(profileData);
 | 
			
		||||
        when(tenantProfileCache.get(eq(tenantId))).thenReturn(tenantProfile);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -72,9 +72,9 @@ import org.thingsboard.server.common.msg.notification.trigger.NewPlatformVersion
 | 
			
		||||
import org.thingsboard.server.dao.notification.NotificationRequestService;
 | 
			
		||||
import org.thingsboard.server.dao.rule.RuleChainService;
 | 
			
		||||
import org.thingsboard.server.dao.service.DaoSqlTest;
 | 
			
		||||
import org.thingsboard.server.dao.util.limits.LimitedApi;
 | 
			
		||||
import org.thingsboard.server.dao.util.limits.RateLimitService;
 | 
			
		||||
import org.thingsboard.server.queue.notification.NotificationRuleProcessor;
 | 
			
		||||
import org.thingsboard.server.service.apiusage.limits.LimitedApi;
 | 
			
		||||
import org.thingsboard.server.service.apiusage.limits.RateLimitService;
 | 
			
		||||
import org.thingsboard.server.service.telemetry.AlarmSubscriptionService;
 | 
			
		||||
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,31 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2023 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.common.data.exception;
 | 
			
		||||
 | 
			
		||||
import lombok.Getter;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
 | 
			
		||||
public class TenantProfileNotFoundException extends RuntimeException {
 | 
			
		||||
 | 
			
		||||
    @Getter
 | 
			
		||||
    private final TenantId tenantId;
 | 
			
		||||
 | 
			
		||||
    public TenantProfileNotFoundException(TenantId tenantId) {
 | 
			
		||||
        super("Profile for tenant with id " + tenantId + " not found");
 | 
			
		||||
        this.tenantId = tenantId;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -24,10 +24,10 @@ import org.springframework.scheduling.annotation.Scheduled;
 | 
			
		||||
import org.springframework.stereotype.Component;
 | 
			
		||||
import org.thingsboard.server.common.stats.StatsFactory;
 | 
			
		||||
import org.thingsboard.server.dao.entity.EntityService;
 | 
			
		||||
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
 | 
			
		||||
import org.thingsboard.server.dao.util.AbstractBufferedRateExecutor;
 | 
			
		||||
import org.thingsboard.server.dao.util.AsyncTaskContext;
 | 
			
		||||
import org.thingsboard.server.dao.util.NoSqlAnyDao;
 | 
			
		||||
import org.thingsboard.server.dao.util.limits.RateLimitService;
 | 
			
		||||
 | 
			
		||||
import javax.annotation.PreDestroy;
 | 
			
		||||
 | 
			
		||||
@ -52,9 +52,9 @@ public class CassandraBufferedRateReadExecutor extends AbstractBufferedRateExecu
 | 
			
		||||
            @Value("${cassandra.query.print_queries_freq:0}") int printQueriesFreq,
 | 
			
		||||
            @Autowired StatsFactory statsFactory,
 | 
			
		||||
            @Autowired EntityService entityService,
 | 
			
		||||
            @Autowired TbTenantProfileCache tenantProfileCache) {
 | 
			
		||||
            @Autowired RateLimitService rateLimitService) {
 | 
			
		||||
        super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs, printQueriesFreq, statsFactory,
 | 
			
		||||
                entityService, tenantProfileCache, printTenantNames);
 | 
			
		||||
                entityService, rateLimitService, printTenantNames);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}")
 | 
			
		||||
 | 
			
		||||
@ -24,10 +24,10 @@ import org.springframework.scheduling.annotation.Scheduled;
 | 
			
		||||
import org.springframework.stereotype.Component;
 | 
			
		||||
import org.thingsboard.server.common.stats.StatsFactory;
 | 
			
		||||
import org.thingsboard.server.dao.entity.EntityService;
 | 
			
		||||
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
 | 
			
		||||
import org.thingsboard.server.dao.util.AbstractBufferedRateExecutor;
 | 
			
		||||
import org.thingsboard.server.dao.util.AsyncTaskContext;
 | 
			
		||||
import org.thingsboard.server.dao.util.NoSqlAnyDao;
 | 
			
		||||
import org.thingsboard.server.dao.util.limits.RateLimitService;
 | 
			
		||||
 | 
			
		||||
import javax.annotation.PreDestroy;
 | 
			
		||||
 | 
			
		||||
@ -52,9 +52,9 @@ public class CassandraBufferedRateWriteExecutor extends AbstractBufferedRateExec
 | 
			
		||||
            @Value("${cassandra.query.print_queries_freq:0}") int printQueriesFreq,
 | 
			
		||||
            @Autowired StatsFactory statsFactory,
 | 
			
		||||
            @Autowired EntityService entityService,
 | 
			
		||||
            @Autowired TbTenantProfileCache tenantProfileCache) {
 | 
			
		||||
            @Autowired RateLimitService rateLimitService) {
 | 
			
		||||
        super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs, printQueriesFreq, statsFactory,
 | 
			
		||||
                entityService, tenantProfileCache, printTenantNames);
 | 
			
		||||
                entityService, rateLimitService, printTenantNames);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}")
 | 
			
		||||
 | 
			
		||||
@ -30,24 +30,21 @@ import com.google.common.util.concurrent.SettableFuture;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.thingsboard.common.util.ThingsBoardExecutors;
 | 
			
		||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
 | 
			
		||||
import org.thingsboard.server.common.data.StringUtils;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.msg.tools.TbRateLimits;
 | 
			
		||||
import org.thingsboard.server.common.stats.DefaultCounter;
 | 
			
		||||
import org.thingsboard.server.common.stats.StatsCounter;
 | 
			
		||||
import org.thingsboard.server.common.stats.StatsFactory;
 | 
			
		||||
import org.thingsboard.server.common.stats.StatsType;
 | 
			
		||||
import org.thingsboard.server.dao.entity.EntityService;
 | 
			
		||||
import org.thingsboard.server.dao.nosql.CassandraStatementTask;
 | 
			
		||||
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
 | 
			
		||||
import org.thingsboard.server.dao.util.limits.LimitedApi;
 | 
			
		||||
import org.thingsboard.server.dao.util.limits.RateLimitService;
 | 
			
		||||
 | 
			
		||||
import javax.annotation.Nullable;
 | 
			
		||||
import java.util.HashMap;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
import java.util.concurrent.BlockingQueue;
 | 
			
		||||
import java.util.concurrent.ConcurrentHashMap;
 | 
			
		||||
import java.util.concurrent.ConcurrentMap;
 | 
			
		||||
import java.util.concurrent.ExecutorService;
 | 
			
		||||
import java.util.concurrent.Executors;
 | 
			
		||||
import java.util.concurrent.LinkedBlockingDeque;
 | 
			
		||||
@ -73,7 +70,6 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
 | 
			
		||||
    private final ScheduledExecutorService timeoutExecutor;
 | 
			
		||||
    private final int concurrencyLimit;
 | 
			
		||||
    private final int printQueriesFreq;
 | 
			
		||||
    private final ConcurrentMap<TenantId, TbRateLimits> perTenantLimits = new ConcurrentHashMap<>();
 | 
			
		||||
 | 
			
		||||
    private final AtomicInteger printQueriesIdx = new AtomicInteger(0);
 | 
			
		||||
 | 
			
		||||
@ -81,14 +77,14 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
 | 
			
		||||
    protected final BufferedRateExecutorStats stats;
 | 
			
		||||
 | 
			
		||||
    private final EntityService entityService;
 | 
			
		||||
    private final TbTenantProfileCache tenantProfileCache;
 | 
			
		||||
    private final RateLimitService rateLimitService;
 | 
			
		||||
 | 
			
		||||
    private final boolean printTenantNames;
 | 
			
		||||
    private final Map<TenantId, String> tenantNamesCache = new HashMap<>();
 | 
			
		||||
 | 
			
		||||
    public AbstractBufferedRateExecutor(int queueLimit, int concurrencyLimit, long maxWaitTime, int dispatcherThreads,
 | 
			
		||||
                                        int callbackThreads, long pollMs, int printQueriesFreq, StatsFactory statsFactory,
 | 
			
		||||
                                        EntityService entityService, TbTenantProfileCache tenantProfileCache, boolean printTenantNames) {
 | 
			
		||||
                                        EntityService entityService, RateLimitService rateLimitService, boolean printTenantNames) {
 | 
			
		||||
        this.maxWaitTime = maxWaitTime;
 | 
			
		||||
        this.pollMs = pollMs;
 | 
			
		||||
        this.concurrencyLimit = concurrencyLimit;
 | 
			
		||||
@ -102,7 +98,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
 | 
			
		||||
        this.concurrencyLevel = statsFactory.createGauge(concurrencyLevelKey, new AtomicInteger(0));
 | 
			
		||||
 | 
			
		||||
        this.entityService = entityService;
 | 
			
		||||
        this.tenantProfileCache = tenantProfileCache;
 | 
			
		||||
        this.rateLimitService = rateLimitService;
 | 
			
		||||
        this.printTenantNames = printTenantNames;
 | 
			
		||||
 | 
			
		||||
        for (int i = 0; i < dispatcherThreads; i++) {
 | 
			
		||||
@ -116,28 +112,16 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
 | 
			
		||||
        F result = wrap(task, settableFuture);
 | 
			
		||||
 | 
			
		||||
        boolean perTenantLimitReached = false;
 | 
			
		||||
 | 
			
		||||
        var tenantProfileConfiguration =
 | 
			
		||||
                (task.getTenantId() != null && !TenantId.SYS_TENANT_ID.equals(task.getTenantId()))
 | 
			
		||||
                        ? tenantProfileCache.get(task.getTenantId()).getDefaultProfileConfiguration()
 | 
			
		||||
                        : null;
 | 
			
		||||
        if (tenantProfileConfiguration != null &&
 | 
			
		||||
                StringUtils.isNotEmpty(tenantProfileConfiguration.getCassandraQueryTenantRateLimitsConfiguration())) {
 | 
			
		||||
            if (task.getTenantId() == null) {
 | 
			
		||||
                log.info("Invalid task received: {}", task);
 | 
			
		||||
            } else if (!task.getTenantId().isNullUid()) {
 | 
			
		||||
                TbRateLimits rateLimits = perTenantLimits.computeIfAbsent(
 | 
			
		||||
                        task.getTenantId(), id -> new TbRateLimits(tenantProfileConfiguration.getCassandraQueryTenantRateLimitsConfiguration())
 | 
			
		||||
                );
 | 
			
		||||
                if (!rateLimits.tryConsume()) {
 | 
			
		||||
                    stats.incrementRateLimitedTenant(task.getTenantId());
 | 
			
		||||
                    stats.getTotalRateLimited().increment();
 | 
			
		||||
                    settableFuture.setException(new TenantRateLimitException());
 | 
			
		||||
                    perTenantLimitReached = true;
 | 
			
		||||
                }
 | 
			
		||||
        TenantId tenantId = task.getTenantId();
 | 
			
		||||
        if (tenantId != null && !tenantId.isSysTenantId()) {
 | 
			
		||||
            if (!rateLimitService.checkRateLimit(LimitedApi.CASSANDRA_QUERIES, tenantId)) {
 | 
			
		||||
                stats.incrementRateLimitedTenant(tenantId);
 | 
			
		||||
                stats.getTotalRateLimited().increment();
 | 
			
		||||
                settableFuture.setException(new TenantRateLimitException());
 | 
			
		||||
                perTenantLimitReached = true;
 | 
			
		||||
            }
 | 
			
		||||
        } else if (!TenantId.SYS_TENANT_ID.equals(task.getTenantId())) {
 | 
			
		||||
            perTenantLimits.remove(task.getTenantId());
 | 
			
		||||
        } else if (tenantId == null) {
 | 
			
		||||
            log.info("Invalid task received: {}", task);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if (!perTenantLimitReached) {
 | 
			
		||||
 | 
			
		||||
@ -13,83 +13,95 @@
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.service.apiusage.limits;
 | 
			
		||||
package org.thingsboard.server.dao.util.limits;
 | 
			
		||||
 | 
			
		||||
import com.github.benmanes.caffeine.cache.Cache;
 | 
			
		||||
import com.github.benmanes.caffeine.cache.Caffeine;
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
import lombok.RequiredArgsConstructor;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Value;
 | 
			
		||||
import org.springframework.stereotype.Service;
 | 
			
		||||
import org.thingsboard.server.common.data.StringUtils;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
import org.thingsboard.server.common.data.TenantProfile;
 | 
			
		||||
import org.thingsboard.server.common.data.exception.TenantProfileNotFoundException;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.msg.tools.TbRateLimits;
 | 
			
		||||
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
 | 
			
		||||
 | 
			
		||||
import javax.annotation.PostConstruct;
 | 
			
		||||
import java.util.concurrent.TimeUnit;
 | 
			
		||||
 | 
			
		||||
@Service
 | 
			
		||||
@RequiredArgsConstructor
 | 
			
		||||
@Slf4j
 | 
			
		||||
public class DefaultRateLimitService implements RateLimitService {
 | 
			
		||||
 | 
			
		||||
    private final TbTenantProfileCache tenantProfileCache;
 | 
			
		||||
    @Value("${cache.rateLimits.timeToLiveInMinutes:60}")
 | 
			
		||||
    private int rateLimitsTtl;
 | 
			
		||||
    @Value("${cache.rateLimits.maxSize:100000}")
 | 
			
		||||
    private int rateLimitsCacheMaxSize;
 | 
			
		||||
 | 
			
		||||
    private Cache<RateLimitKey, TbRateLimits> rateLimits;
 | 
			
		||||
 | 
			
		||||
    @PostConstruct
 | 
			
		||||
    private void init() {
 | 
			
		||||
        rateLimits = Caffeine.newBuilder()
 | 
			
		||||
    public DefaultRateLimitService(TbTenantProfileCache tenantProfileCache,
 | 
			
		||||
                                   @Value("${cache.rateLimits.timeToLiveInMinutes:120}") int rateLimitsTtl,
 | 
			
		||||
                                   @Value("${cache.rateLimits.maxSize:200000}") int rateLimitsCacheMaxSize) {
 | 
			
		||||
        this.tenantProfileCache = tenantProfileCache;
 | 
			
		||||
        this.rateLimits = Caffeine.newBuilder()
 | 
			
		||||
                .expireAfterAccess(rateLimitsTtl, TimeUnit.MINUTES)
 | 
			
		||||
                .maximumSize(rateLimitsCacheMaxSize)
 | 
			
		||||
                .build();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private final Cache<RateLimitKey, TbRateLimits> rateLimits;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public boolean checkRateLimit(LimitedApi api, TenantId tenantId) {
 | 
			
		||||
        return checkRateLimit(api, tenantId, tenantId);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public boolean checkRateLimit(LimitedApi api, TenantId tenantId, EntityId entityId) {
 | 
			
		||||
    public boolean checkRateLimit(LimitedApi api, TenantId tenantId, Object level) {
 | 
			
		||||
        if (tenantId.isSysTenantId()) {
 | 
			
		||||
            return true;
 | 
			
		||||
        }
 | 
			
		||||
        RateLimitKey key = new RateLimitKey(api, entityId);
 | 
			
		||||
        TenantProfile tenantProfile = tenantProfileCache.get(tenantId);
 | 
			
		||||
        if (tenantProfile == null) {
 | 
			
		||||
            throw new TenantProfileNotFoundException(tenantId);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        String rateLimitConfig = tenantProfileCache.get(tenantId).getProfileConfiguration()
 | 
			
		||||
                .map(api::getLimitConfig).orElse(null);
 | 
			
		||||
        String rateLimitConfig = tenantProfile.getProfileConfiguration()
 | 
			
		||||
                .map(profileConfiguration -> api.getLimitConfig(profileConfiguration, level))
 | 
			
		||||
                .orElse(null);
 | 
			
		||||
        return checkRateLimit(api, level, rateLimitConfig);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public boolean checkRateLimit(LimitedApi api, Object level, String rateLimitConfig) {
 | 
			
		||||
        RateLimitKey key = new RateLimitKey(api, level);
 | 
			
		||||
        if (StringUtils.isEmpty(rateLimitConfig)) {
 | 
			
		||||
            rateLimits.invalidate(key);
 | 
			
		||||
            return true;
 | 
			
		||||
        }
 | 
			
		||||
        log.trace("[{}] Checking rate limit for {} ({})", entityId, api, rateLimitConfig);
 | 
			
		||||
        log.trace("[{}] Checking rate limit for {} ({})", level, api, rateLimitConfig);
 | 
			
		||||
 | 
			
		||||
        TbRateLimits rateLimit = rateLimits.asMap().compute(key, (k, limit) -> {
 | 
			
		||||
            if (limit == null || !limit.getConfiguration().equals(rateLimitConfig)) {
 | 
			
		||||
                limit = new TbRateLimits(rateLimitConfig);
 | 
			
		||||
                log.trace("[{}] Created new rate limit bucket for {} ({})", entityId, api, rateLimitConfig);
 | 
			
		||||
                limit = new TbRateLimits(rateLimitConfig, api.isRefillRateLimitIntervally());
 | 
			
		||||
                log.trace("[{}] Created new rate limit bucket for {} ({})", level, api, rateLimitConfig);
 | 
			
		||||
            }
 | 
			
		||||
            return limit;
 | 
			
		||||
        });
 | 
			
		||||
        boolean success = rateLimit.tryConsume();
 | 
			
		||||
        if (!success) {
 | 
			
		||||
            log.debug("[{}] Rate limit exceeded for {} ({})", entityId, api, rateLimitConfig);
 | 
			
		||||
            log.debug("[{}] Rate limit exceeded for {} ({})", level, api, rateLimitConfig);
 | 
			
		||||
        }
 | 
			
		||||
        return success;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void cleanUp(LimitedApi api, Object level) {
 | 
			
		||||
        RateLimitKey key = new RateLimitKey(api, level);
 | 
			
		||||
        rateLimits.invalidate(key);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Data(staticConstructor = "of")
 | 
			
		||||
    private static class RateLimitKey {
 | 
			
		||||
        private final LimitedApi api;
 | 
			
		||||
        private final EntityId entityId;
 | 
			
		||||
        private final Object level;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -0,0 +1,67 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2023 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.dao.util.limits;
 | 
			
		||||
 | 
			
		||||
import lombok.Getter;
 | 
			
		||||
import org.thingsboard.server.common.data.EntityType;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
 | 
			
		||||
 | 
			
		||||
import java.util.function.BiFunction;
 | 
			
		||||
import java.util.function.Function;
 | 
			
		||||
 | 
			
		||||
public enum LimitedApi {
 | 
			
		||||
 | 
			
		||||
    ENTITY_EXPORT(DefaultTenantProfileConfiguration::getTenantEntityExportRateLimit),
 | 
			
		||||
    ENTITY_IMPORT(DefaultTenantProfileConfiguration::getTenantEntityImportRateLimit),
 | 
			
		||||
    NOTIFICATION_REQUESTS(DefaultTenantProfileConfiguration::getTenantNotificationRequestsRateLimit),
 | 
			
		||||
    NOTIFICATION_REQUESTS_PER_RULE(DefaultTenantProfileConfiguration::getTenantNotificationRequestsPerRuleRateLimit),
 | 
			
		||||
    REST_REQUESTS((profileConfiguration, level) -> ((EntityId) level).getEntityType() == EntityType.TENANT ?
 | 
			
		||||
            profileConfiguration.getTenantServerRestLimitsConfiguration() :
 | 
			
		||||
            profileConfiguration.getCustomerServerRestLimitsConfiguration()),
 | 
			
		||||
    WS_UPDATES_PER_SESSION(DefaultTenantProfileConfiguration::getWsUpdatesPerSessionRateLimit),
 | 
			
		||||
    CASSANDRA_QUERIES(DefaultTenantProfileConfiguration::getCassandraQueryTenantRateLimitsConfiguration),
 | 
			
		||||
    PASSWORD_RESET(true),
 | 
			
		||||
    TWO_FA_VERIFICATION_CODE_SEND(true),
 | 
			
		||||
    TWO_FA_VERIFICATION_CODE_CHECK(true);
 | 
			
		||||
 | 
			
		||||
    private final BiFunction<DefaultTenantProfileConfiguration, Object, String> configExtractor;
 | 
			
		||||
    @Getter
 | 
			
		||||
    private final boolean refillRateLimitIntervally;
 | 
			
		||||
 | 
			
		||||
    LimitedApi(Function<DefaultTenantProfileConfiguration, String> configExtractor) {
 | 
			
		||||
        this((profileConfiguration, level) -> configExtractor.apply(profileConfiguration));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    LimitedApi(BiFunction<DefaultTenantProfileConfiguration, Object, String> configExtractor) {
 | 
			
		||||
        this.configExtractor = configExtractor;
 | 
			
		||||
        this.refillRateLimitIntervally = false;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    LimitedApi(boolean refillRateLimitIntervally) {
 | 
			
		||||
        this.configExtractor = null;
 | 
			
		||||
        this.refillRateLimitIntervally = refillRateLimitIntervally;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public String getLimitConfig(DefaultTenantProfileConfiguration profileConfiguration, Object level) {
 | 
			
		||||
        if (configExtractor != null) {
 | 
			
		||||
            return configExtractor.apply(profileConfiguration, level);
 | 
			
		||||
        } else {
 | 
			
		||||
            throw new IllegalArgumentException("No tenant profile config for " + name() + " rate limits");
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -13,15 +13,18 @@
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.service.apiusage.limits;
 | 
			
		||||
package org.thingsboard.server.dao.util.limits;
 | 
			
		||||
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
 | 
			
		||||
public interface RateLimitService {
 | 
			
		||||
 | 
			
		||||
    boolean checkRateLimit(LimitedApi api, TenantId tenantId);
 | 
			
		||||
 | 
			
		||||
    boolean checkRateLimit(LimitedApi api, TenantId tenantId, EntityId entityId);
 | 
			
		||||
    boolean checkRateLimit(LimitedApi api, TenantId tenantId, Object level);
 | 
			
		||||
 | 
			
		||||
    boolean checkRateLimit(LimitedApi api, Object level, String rateLimitConfig);
 | 
			
		||||
 | 
			
		||||
    void cleanUp(LimitedApi api, Object level);
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user