Proper rate limit exception for Cassandra queries
This commit is contained in:
parent
a736f5d212
commit
49b3081d41
@ -36,6 +36,7 @@ import org.thingsboard.common.util.ThingsBoardExecutors;
|
|||||||
import org.thingsboard.server.common.data.AttributeScope;
|
import org.thingsboard.server.common.data.AttributeScope;
|
||||||
import org.thingsboard.server.common.data.StringUtils;
|
import org.thingsboard.server.common.data.StringUtils;
|
||||||
import org.thingsboard.server.common.data.TenantProfile;
|
import org.thingsboard.server.common.data.TenantProfile;
|
||||||
|
import org.thingsboard.server.common.data.exception.RateLimitExceededException;
|
||||||
import org.thingsboard.server.common.data.id.CustomerId;
|
import org.thingsboard.server.common.data.id.CustomerId;
|
||||||
import org.thingsboard.server.common.data.id.EntityId;
|
import org.thingsboard.server.common.data.id.EntityId;
|
||||||
import org.thingsboard.server.common.data.id.EntityIdFactory;
|
import org.thingsboard.server.common.data.id.EntityIdFactory;
|
||||||
@ -52,7 +53,6 @@ import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
|
|||||||
import org.thingsboard.server.dao.attributes.AttributesService;
|
import org.thingsboard.server.dao.attributes.AttributesService;
|
||||||
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
|
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
|
||||||
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
||||||
import org.thingsboard.server.dao.util.TenantRateLimitException;
|
|
||||||
import org.thingsboard.server.exception.UnauthorizedException;
|
import org.thingsboard.server.exception.UnauthorizedException;
|
||||||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
|
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
|
||||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||||
@ -742,7 +742,7 @@ public class DefaultWebSocketService implements WebSocketService {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(Throwable e) {
|
public void onFailure(Throwable e) {
|
||||||
if (e instanceof TenantRateLimitException || e.getCause() instanceof TenantRateLimitException) {
|
if (e instanceof RateLimitExceededException || e.getCause() instanceof RateLimitExceededException) {
|
||||||
log.trace("[{}] Tenant rate limit detected for subscription: [{}]:{}", sessionRef.getSecurityCtx().getTenantId(), entityId, cmd);
|
log.trace("[{}] Tenant rate limit detected for subscription: [{}]:{}", sessionRef.getSecurityCtx().getTenantId(), entityId, cmd);
|
||||||
} else {
|
} else {
|
||||||
log.info(FAILED_TO_FETCH_DATA, e);
|
log.info(FAILED_TO_FETCH_DATA, e);
|
||||||
|
|||||||
@ -205,8 +205,8 @@ public class BaseTimeseriesService implements TimeseriesService {
|
|||||||
ListenableFuture<Integer> dpsFuture = saveTs ? Futures.transform(Futures.allAsList(tsFutures), SUM_ALL_INTEGERS, MoreExecutors.directExecutor()) : Futures.immediateFuture(0);
|
ListenableFuture<Integer> dpsFuture = saveTs ? Futures.transform(Futures.allAsList(tsFutures), SUM_ALL_INTEGERS, MoreExecutors.directExecutor()) : Futures.immediateFuture(0);
|
||||||
ListenableFuture<List<Long>> versionsFuture = saveLatest ? Futures.allAsList(latestFutures) : Futures.immediateFuture(null);
|
ListenableFuture<List<Long>> versionsFuture = saveLatest ? Futures.allAsList(latestFutures) : Futures.immediateFuture(null);
|
||||||
return Futures.whenAllComplete(dpsFuture, versionsFuture).call(() -> {
|
return Futures.whenAllComplete(dpsFuture, versionsFuture).call(() -> {
|
||||||
Integer dataPoints = Futures.getUnchecked(dpsFuture);
|
Integer dataPoints = dpsFuture.get();
|
||||||
List<Long> versions = Futures.getUnchecked(versionsFuture);
|
List<Long> versions = versionsFuture.get();
|
||||||
return TimeseriesSaveResult.of(dataPoints, versions);
|
return TimeseriesSaveResult.of(dataPoints, versions);
|
||||||
}, MoreExecutors.directExecutor());
|
}, MoreExecutors.directExecutor());
|
||||||
}
|
}
|
||||||
@ -298,13 +298,13 @@ public class BaseTimeseriesService implements TimeseriesService {
|
|||||||
long interval = query.getInterval();
|
long interval = query.getInterval();
|
||||||
if (interval < 1) {
|
if (interval < 1) {
|
||||||
throw new IncorrectParameterException("Invalid TsKvQuery: 'interval' must be greater than 0, but got " + interval +
|
throw new IncorrectParameterException("Invalid TsKvQuery: 'interval' must be greater than 0, but got " + interval +
|
||||||
". Please check your query parameters and ensure 'endTs' is greater than 'startTs' or increase 'interval'.");
|
". Please check your query parameters and ensure 'endTs' is greater than 'startTs' or increase 'interval'.");
|
||||||
}
|
}
|
||||||
long step = Math.max(interval, 1000);
|
long step = Math.max(interval, 1000);
|
||||||
long intervalCounts = (query.getEndTs() - query.getStartTs()) / step;
|
long intervalCounts = (query.getEndTs() - query.getStartTs()) / step;
|
||||||
if (intervalCounts > maxTsIntervals || intervalCounts < 0) {
|
if (intervalCounts > maxTsIntervals || intervalCounts < 0) {
|
||||||
throw new IncorrectParameterException("Incorrect TsKvQuery. Number of intervals is to high - " + intervalCounts + ". " +
|
throw new IncorrectParameterException("Incorrect TsKvQuery. Number of intervals is to high - " + intervalCounts + ". " +
|
||||||
"Please increase 'interval' parameter for your query or reduce the time range of the query.");
|
"Please increase 'interval' parameter for your query or reduce the time range of the query.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -32,6 +32,7 @@ import lombok.extern.slf4j.Slf4j;
|
|||||||
import org.thingsboard.common.util.ThingsBoardExecutors;
|
import org.thingsboard.common.util.ThingsBoardExecutors;
|
||||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||||
import org.thingsboard.server.cache.limits.RateLimitService;
|
import org.thingsboard.server.cache.limits.RateLimitService;
|
||||||
|
import org.thingsboard.server.common.data.exception.RateLimitExceededException;
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
import org.thingsboard.server.common.data.limit.LimitedApi;
|
import org.thingsboard.server.common.data.limit.LimitedApi;
|
||||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||||
@ -66,7 +67,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
|
|||||||
|
|
||||||
private final long maxWaitTime;
|
private final long maxWaitTime;
|
||||||
private final long pollMs;
|
private final long pollMs;
|
||||||
private final String bufferName;
|
private final String bufferName;
|
||||||
private final BlockingQueue<AsyncTaskContext<T, V>> queue;
|
private final BlockingQueue<AsyncTaskContext<T, V>> queue;
|
||||||
private final ExecutorService dispatcherExecutor;
|
private final ExecutorService dispatcherExecutor;
|
||||||
private final ExecutorService callbackExecutor;
|
private final ExecutorService callbackExecutor;
|
||||||
@ -124,7 +125,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
|
|||||||
if (!rateLimitService.checkRateLimit(myLimitedApi, tenantId, tenantId, true)) {
|
if (!rateLimitService.checkRateLimit(myLimitedApi, tenantId, tenantId, true)) {
|
||||||
stats.incrementRateLimitedTenant(tenantId);
|
stats.incrementRateLimitedTenant(tenantId);
|
||||||
stats.getTotalRateLimited().increment();
|
stats.getTotalRateLimited().increment();
|
||||||
settableFuture.setException(new TenantRateLimitException());
|
settableFuture.setException(new RateLimitExceededException(myLimitedApi));
|
||||||
perTenantLimitReached = true;
|
perTenantLimitReached = true;
|
||||||
}
|
}
|
||||||
} else if (tenantId == null) {
|
} else if (tenantId == null) {
|
||||||
@ -299,9 +300,9 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
|
|||||||
.count();
|
.count();
|
||||||
|
|
||||||
if (queueSize > 0
|
if (queueSize > 0
|
||||||
|| rateLimitedTenantsCount > 0
|
|| rateLimitedTenantsCount > 0
|
||||||
|| concurrencyLevel.get() > 0
|
|| concurrencyLevel.get() > 0
|
||||||
|| stats.getStatsCounters().stream().anyMatch(counter -> counter.get() > 0)
|
|| stats.getStatsCounters().stream().anyMatch(counter -> counter.get() > 0)
|
||||||
) {
|
) {
|
||||||
StringBuilder statsBuilder = new StringBuilder();
|
StringBuilder statsBuilder = new StringBuilder();
|
||||||
|
|
||||||
|
|||||||
@ -1,19 +0,0 @@
|
|||||||
/**
|
|
||||||
* Copyright © 2016-2025 The Thingsboard Authors
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.thingsboard.server.dao.util;
|
|
||||||
|
|
||||||
public class TenantRateLimitException extends Exception {
|
|
||||||
}
|
|
||||||
Loading…
x
Reference in New Issue
Block a user