From 49b3081d416ca42f016fe82f13c70ebc26d7d613 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Wed, 4 Jun 2025 12:45:17 +0300 Subject: [PATCH] Proper rate limit exception for Cassandra queries --- .../service/ws/DefaultWebSocketService.java | 4 ++-- .../dao/timeseries/BaseTimeseriesService.java | 8 ++++---- .../util/AbstractBufferedRateExecutor.java | 11 ++++++----- .../dao/util/TenantRateLimitException.java | 19 ------------------- 4 files changed, 12 insertions(+), 30 deletions(-) delete mode 100644 dao/src/main/java/org/thingsboard/server/dao/util/TenantRateLimitException.java diff --git a/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java index cbe6663663..283e3baf76 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java @@ -36,6 +36,7 @@ import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.TenantProfile; +import org.thingsboard.server.common.data.exception.RateLimitExceededException; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityIdFactory; @@ -52,7 +53,6 @@ import org.thingsboard.server.common.msg.tools.TbRateLimitsException; import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.dao.timeseries.TimeseriesService; -import org.thingsboard.server.dao.util.TenantRateLimitException; import org.thingsboard.server.exception.UnauthorizedException; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.util.TbCoreComponent; @@ -742,7 +742,7 @@ public class DefaultWebSocketService implements WebSocketService { @Override public void onFailure(Throwable e) { - if (e instanceof TenantRateLimitException || e.getCause() instanceof TenantRateLimitException) { + if (e instanceof RateLimitExceededException || e.getCause() instanceof RateLimitExceededException) { log.trace("[{}] Tenant rate limit detected for subscription: [{}]:{}", sessionRef.getSecurityCtx().getTenantId(), entityId, cmd); } else { log.info(FAILED_TO_FETCH_DATA, e); diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java index 9eefcaae1e..cecf4ab587 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java @@ -205,8 +205,8 @@ public class BaseTimeseriesService implements TimeseriesService { ListenableFuture dpsFuture = saveTs ? Futures.transform(Futures.allAsList(tsFutures), SUM_ALL_INTEGERS, MoreExecutors.directExecutor()) : Futures.immediateFuture(0); ListenableFuture> versionsFuture = saveLatest ? Futures.allAsList(latestFutures) : Futures.immediateFuture(null); return Futures.whenAllComplete(dpsFuture, versionsFuture).call(() -> { - Integer dataPoints = Futures.getUnchecked(dpsFuture); - List versions = Futures.getUnchecked(versionsFuture); + Integer dataPoints = dpsFuture.get(); + List versions = versionsFuture.get(); return TimeseriesSaveResult.of(dataPoints, versions); }, MoreExecutors.directExecutor()); } @@ -298,13 +298,13 @@ public class BaseTimeseriesService implements TimeseriesService { long interval = query.getInterval(); if (interval < 1) { throw new IncorrectParameterException("Invalid TsKvQuery: 'interval' must be greater than 0, but got " + interval + - ". Please check your query parameters and ensure 'endTs' is greater than 'startTs' or increase 'interval'."); + ". Please check your query parameters and ensure 'endTs' is greater than 'startTs' or increase 'interval'."); } long step = Math.max(interval, 1000); long intervalCounts = (query.getEndTs() - query.getStartTs()) / step; if (intervalCounts > maxTsIntervals || intervalCounts < 0) { throw new IncorrectParameterException("Incorrect TsKvQuery. Number of intervals is to high - " + intervalCounts + ". " + - "Please increase 'interval' parameter for your query or reduce the time range of the query."); + "Please increase 'interval' parameter for your query or reduce the time range of the query."); } } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java b/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java index cbcf3e81ec..4d691db31d 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java +++ b/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java @@ -32,6 +32,7 @@ import lombok.extern.slf4j.Slf4j; import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.cache.limits.RateLimitService; +import org.thingsboard.server.common.data.exception.RateLimitExceededException; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.limit.LimitedApi; import org.thingsboard.server.common.msg.queue.ServiceType; @@ -66,7 +67,7 @@ public abstract class AbstractBufferedRateExecutor> queue; private final ExecutorService dispatcherExecutor; private final ExecutorService callbackExecutor; @@ -124,7 +125,7 @@ public abstract class AbstractBufferedRateExecutor 0 - || rateLimitedTenantsCount > 0 - || concurrencyLevel.get() > 0 - || stats.getStatsCounters().stream().anyMatch(counter -> counter.get() > 0) + || rateLimitedTenantsCount > 0 + || concurrencyLevel.get() > 0 + || stats.getStatsCounters().stream().anyMatch(counter -> counter.get() > 0) ) { StringBuilder statsBuilder = new StringBuilder(); diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/TenantRateLimitException.java b/dao/src/main/java/org/thingsboard/server/dao/util/TenantRateLimitException.java deleted file mode 100644 index 3d79af980d..0000000000 --- a/dao/src/main/java/org/thingsboard/server/dao/util/TenantRateLimitException.java +++ /dev/null @@ -1,19 +0,0 @@ -/** - * Copyright © 2016-2025 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.dao.util; - -public class TenantRateLimitException extends Exception { -}