New Cassandra rate limits: separated for Read and Write + Core and Rule Engine

This commit is contained in:
dshvaika 2025-05-20 16:19:44 +03:00
parent a7a7fd0efe
commit 8af37beb4a
15 changed files with 407 additions and 37 deletions

View File

@ -52,7 +52,7 @@ public class RateLimitServiceTest {
public void beforeEach() { public void beforeEach() {
tenantProfileCache = Mockito.mock(DefaultTbTenantProfileCache.class); tenantProfileCache = Mockito.mock(DefaultTbTenantProfileCache.class);
rateLimitService = new DefaultRateLimitService(tenantProfileCache, mock(NotificationRuleProcessor.class), 60, 100); rateLimitService = new DefaultRateLimitService(tenantProfileCache, mock(NotificationRuleProcessor.class), 60, 100);
tenantId = new TenantId(UUID.randomUUID()); tenantId = TenantId.fromUUID(UUID.randomUUID());
} }
@Test @Test
@ -67,7 +67,10 @@ public class RateLimitServiceTest {
profileConfiguration.setTenantServerRestLimitsConfiguration(rateLimit); profileConfiguration.setTenantServerRestLimitsConfiguration(rateLimit);
profileConfiguration.setCustomerServerRestLimitsConfiguration(rateLimit); profileConfiguration.setCustomerServerRestLimitsConfiguration(rateLimit);
profileConfiguration.setWsUpdatesPerSessionRateLimit(rateLimit); profileConfiguration.setWsUpdatesPerSessionRateLimit(rateLimit);
profileConfiguration.setCassandraQueryTenantRateLimitsConfiguration(rateLimit); profileConfiguration.setCassandraReadQueryTenantCoreRateLimits(rateLimit);
profileConfiguration.setCassandraWriteQueryTenantCoreRateLimits(rateLimit);
profileConfiguration.setCassandraReadQueryTenantRuleEngineRateLimits(rateLimit);
profileConfiguration.setCassandraWriteQueryTenantRuleEngineRateLimits(rateLimit);
profileConfiguration.setEdgeEventRateLimits(rateLimit); profileConfiguration.setEdgeEventRateLimits(rateLimit);
profileConfiguration.setEdgeEventRateLimitsPerEdge(rateLimit); profileConfiguration.setEdgeEventRateLimitsPerEdge(rateLimit);
profileConfiguration.setEdgeUplinkMessagesRateLimits(rateLimit); profileConfiguration.setEdgeUplinkMessagesRateLimits(rateLimit);
@ -79,7 +82,10 @@ public class RateLimitServiceTest {
LimitedApi.ENTITY_IMPORT, LimitedApi.ENTITY_IMPORT,
LimitedApi.NOTIFICATION_REQUESTS, LimitedApi.NOTIFICATION_REQUESTS,
LimitedApi.REST_REQUESTS_PER_CUSTOMER, LimitedApi.REST_REQUESTS_PER_CUSTOMER,
LimitedApi.CASSANDRA_QUERIES, LimitedApi.CASSANDRA_READ_QUERIES_CORE,
LimitedApi.CASSANDRA_WRITE_QUERIES_CORE,
LimitedApi.CASSANDRA_READ_QUERIES_RULE_ENGINE,
LimitedApi.CASSANDRA_WRITE_QUERIES_RULE_ENGINE,
LimitedApi.EDGE_EVENTS, LimitedApi.EDGE_EVENTS,
LimitedApi.EDGE_EVENTS_PER_EDGE, LimitedApi.EDGE_EVENTS_PER_EDGE,
LimitedApi.EDGE_UPLINK_MESSAGES, LimitedApi.EDGE_UPLINK_MESSAGES,

View File

@ -30,7 +30,18 @@ public enum LimitedApi {
REST_REQUESTS_PER_TENANT(DefaultTenantProfileConfiguration::getTenantServerRestLimitsConfiguration, "REST API requests", true), REST_REQUESTS_PER_TENANT(DefaultTenantProfileConfiguration::getTenantServerRestLimitsConfiguration, "REST API requests", true),
REST_REQUESTS_PER_CUSTOMER(DefaultTenantProfileConfiguration::getCustomerServerRestLimitsConfiguration, "REST API requests per customer", false), REST_REQUESTS_PER_CUSTOMER(DefaultTenantProfileConfiguration::getCustomerServerRestLimitsConfiguration, "REST API requests per customer", false),
WS_UPDATES_PER_SESSION(DefaultTenantProfileConfiguration::getWsUpdatesPerSessionRateLimit, "WS updates per session", true), WS_UPDATES_PER_SESSION(DefaultTenantProfileConfiguration::getWsUpdatesPerSessionRateLimit, "WS updates per session", true),
CASSANDRA_QUERIES(DefaultTenantProfileConfiguration::getCassandraQueryTenantRateLimitsConfiguration, "Cassandra queries", true), CASSANDRA_WRITE_QUERIES_CORE(DefaultTenantProfileConfiguration::getCassandraReadQueryTenantCoreRateLimits, "Rest API and WS telemetry read queries", true),
CASSANDRA_READ_QUERIES_CORE(DefaultTenantProfileConfiguration::getCassandraWriteQueryTenantCoreRateLimits, "Rest API and WS telemetry write queries", true),
CASSANDRA_WRITE_QUERIES_RULE_ENGINE(DefaultTenantProfileConfiguration::getCassandraReadQueryTenantRuleEngineRateLimits, "Rule Engine telemetry read queries", true),
CASSANDRA_READ_QUERIES_RULE_ENGINE(DefaultTenantProfileConfiguration::getCassandraWriteQueryTenantRuleEngineRateLimits, "Rule Engine telemetry write queries", true),
CASSANDRA_READ_QUERIES_MONOLITH(
LimitedApiUtil.merge(
DefaultTenantProfileConfiguration::getCassandraReadQueryTenantCoreRateLimits,
DefaultTenantProfileConfiguration::getCassandraReadQueryTenantRuleEngineRateLimits), "Telemetry read queries", true),
CASSANDRA_WRITE_QUERIES_MONOLITH(
LimitedApiUtil.merge(
DefaultTenantProfileConfiguration::getCassandraWriteQueryTenantCoreRateLimits,
DefaultTenantProfileConfiguration::getCassandraWriteQueryTenantRuleEngineRateLimits), "Telemetry write queries", true),
EDGE_EVENTS(DefaultTenantProfileConfiguration::getEdgeEventRateLimits, "Edge events", true), EDGE_EVENTS(DefaultTenantProfileConfiguration::getEdgeEventRateLimits, "Edge events", true),
EDGE_EVENTS_PER_EDGE(DefaultTenantProfileConfiguration::getEdgeEventRateLimitsPerEdge, "Edge events per edge", false), EDGE_EVENTS_PER_EDGE(DefaultTenantProfileConfiguration::getEdgeEventRateLimitsPerEdge, "Edge events per edge", false),
EDGE_UPLINK_MESSAGES(DefaultTenantProfileConfiguration::getEdgeUplinkMessagesRateLimits, "Edge uplink messages", true), EDGE_UPLINK_MESSAGES(DefaultTenantProfileConfiguration::getEdgeUplinkMessagesRateLimits, "Edge uplink messages", true),

View File

@ -0,0 +1,34 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.limit;
public record LimitedApiEntry(long capacity, long durationSeconds) {
public static LimitedApiEntry parse(String s) {
String[] parts = s.split(":");
return new LimitedApiEntry(Long.parseLong(parts[0]), Long.parseLong(parts[1]));
}
public double rps() {
return (double) capacity / durationSeconds;
}
@Override
public String toString() {
return capacity + ":" + durationSeconds;
}
}

View File

@ -0,0 +1,67 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.limit;
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
public class LimitedApiUtil {
public static List<LimitedApiEntry> parseConfig(String config) {
if (config == null || config.isEmpty()) {
return Collections.emptyList();
}
return Arrays.stream(config.split(","))
.map(LimitedApiEntry::parse)
.toList();
}
public static Function<DefaultTenantProfileConfiguration, String> merge(
Function<DefaultTenantProfileConfiguration, String> configExtractor1,
Function<DefaultTenantProfileConfiguration, String> configExtractor2) {
return config -> {
String config1 = configExtractor1.apply(config);
String config2 = configExtractor2.apply(config);
return LimitedApiUtil.mergeStrConfigs(config1, config2); // merges the configs
};
}
private static String mergeStrConfigs(String firstConfig, String secondConfig) {
List<LimitedApiEntry> all = new ArrayList<>();
all.addAll(parseConfig(firstConfig));
all.addAll(parseConfig(secondConfig));
Map<Long, Long> merged = new HashMap<>();
for (LimitedApiEntry entry : all) {
merged.merge(entry.durationSeconds(), entry.capacity(), Long::sum);
}
return merged.entrySet().stream()
.sorted(Map.Entry.comparingByKey()) // optional: sort by duration
.map(e -> e.getValue() + ":" + e.getKey())
.collect(Collectors.joining(","));
}
}

View File

@ -121,7 +121,11 @@ public class DefaultTenantProfileConfiguration implements TenantProfileConfigura
private long maxWsSubscriptionsPerPublicUser; private long maxWsSubscriptionsPerPublicUser;
private String wsUpdatesPerSessionRateLimit; private String wsUpdatesPerSessionRateLimit;
private String cassandraQueryTenantRateLimitsConfiguration; private String cassandraReadQueryTenantCoreRateLimits;
private String cassandraWriteQueryTenantCoreRateLimits;
private String cassandraReadQueryTenantRuleEngineRateLimits;
private String cassandraWriteQueryTenantRuleEngineRateLimits;
private String edgeEventRateLimits; private String edgeEventRateLimits;
private String edgeEventRateLimitsPerEdge; private String edgeEventRateLimitsPerEdge;

View File

@ -0,0 +1,113 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.limit;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
import java.util.List;
import java.util.function.Function;
import static org.assertj.core.api.Assertions.assertThat;
class LimitedApiUtilTest {
@Test
@DisplayName("LimitedApiUtil should parse single entry correctly")
void testParseSingleEntry() {
List<LimitedApiEntry> entries = LimitedApiUtil.parseConfig("100:60");
assertThat(entries).hasSize(1);
assertThat(entries.get(0).capacity()).isEqualTo(100);
assertThat(entries.get(0).durationSeconds()).isEqualTo(60);
}
@Test
@DisplayName("LimitedApiUtil should parse multiple entries correctly")
void testParseMultipleEntries() {
List<LimitedApiEntry> entries = LimitedApiUtil.parseConfig("100:60,200:30");
assertThat(entries).hasSize(2);
assertThat(entries.get(0).capacity()).isEqualTo(100);
assertThat(entries.get(0).durationSeconds()).isEqualTo(60);
assertThat(entries.get(1).capacity()).isEqualTo(200);
assertThat(entries.get(1).durationSeconds()).isEqualTo(30);
}
@Test
@DisplayName("LimitedApiUtil should return empty list for null or empty config")
void testParseEmptyConfig() {
assertThat(LimitedApiUtil.parseConfig(null)).isEmpty();
assertThat(LimitedApiUtil.parseConfig("")).isEmpty();
}
@Test
@DisplayName("LimitedApiUtil should merge two configs by summing capacities with same durations")
void testMergeStrConfigs() {
Function<DefaultTenantProfileConfiguration, String> extractor1 = cfg -> "100:60,50:30";
Function<DefaultTenantProfileConfiguration, String> extractor2 = cfg -> "200:60,25:10";
// Fake config instance (not used directly in lambda logic)
DefaultTenantProfileConfiguration config = new DefaultTenantProfileConfiguration();
String result = LimitedApiUtil.merge(extractor1, extractor2).apply(config);
// Should be: 300:60 (100+200), 50:30, 25:10
assertThat(result).isEqualTo("25:10,50:30,300:60");
}
@Test
@DisplayName("LimitedApiUtil should merge configs when one is empty")
void testMergeWithEmptyOne() {
Function<DefaultTenantProfileConfiguration, String> extractor1 = cfg -> "100:60";
Function<DefaultTenantProfileConfiguration, String> extractor2 = cfg -> "";
// Fake config instance (not used directly in lambda logic)
DefaultTenantProfileConfiguration config = new DefaultTenantProfileConfiguration();
String result = LimitedApiUtil.merge(extractor1, extractor2).apply(config);
assertThat(result).isEqualTo("100:60");
}
@Test
@DisplayName("LimitedApiUtil should merge configs when both have distinct durations")
void testMergeWithDistinctDurations() {
Function<DefaultTenantProfileConfiguration, String> extractor1 = cfg -> "100:60";
Function<DefaultTenantProfileConfiguration, String> extractor2 = cfg -> "200:10";
// Fake config instance (not used directly in lambda logic)
DefaultTenantProfileConfiguration config = new DefaultTenantProfileConfiguration();
String result = LimitedApiUtil.merge(extractor1, extractor2).apply(config);
assertThat(result).isEqualTo("200:10,100:60");
}
@Test
@DisplayName("LimitedApiUtil shouldn't have duplicate durations in the same config!")
void testMergeHandlesDuplicatesInSingleConfig() {
Function<DefaultTenantProfileConfiguration, String> extractor1 = cfg -> "100:60,200:60";
Function<DefaultTenantProfileConfiguration, String> extractor2 = cfg -> "";
// Fake config instance (not used directly in lambda logic)
DefaultTenantProfileConfiguration config = new DefaultTenantProfileConfiguration();
String result = LimitedApiUtil.merge(extractor1, extractor2).apply(config);
// 100+200 = 300 for duration 60. Currently possible to save the same "per seconds" config from the UI.
// This must be fixed, so we will merge only two different rate limits.
assertThat(result).isEqualTo("300:60");
}
}

View File

@ -16,13 +16,17 @@
package org.thingsboard.server.common.msg.tools; package org.thingsboard.server.common.msg.tools;
import io.github.bucket4j.Bandwidth; import io.github.bucket4j.Bandwidth;
import io.github.bucket4j.BandwidthBuilder;
import io.github.bucket4j.Bucket; import io.github.bucket4j.Bucket;
import io.github.bucket4j.Refill; import io.github.bucket4j.Refill;
import io.github.bucket4j.local.LocalBucket; import io.github.bucket4j.local.LocalBucket;
import io.github.bucket4j.local.LocalBucketBuilder; import io.github.bucket4j.local.LocalBucketBuilder;
import lombok.Getter; import lombok.Getter;
import org.thingsboard.server.common.data.limit.LimitedApiEntry;
import org.thingsboard.server.common.data.limit.LimitedApiUtil;
import java.time.Duration; import java.time.Duration;
import java.util.List;
/** /**
* Created by ashvayka on 22.10.18. * Created by ashvayka on 22.10.18.
@ -38,20 +42,19 @@ public class TbRateLimits {
} }
public TbRateLimits(String limitsConfiguration, boolean refillIntervally) { public TbRateLimits(String limitsConfiguration, boolean refillIntervally) {
LocalBucketBuilder builder = Bucket.builder(); List<LimitedApiEntry> limitedApiEntries = LimitedApiUtil.parseConfig(limitsConfiguration);
boolean initialized = false; if (limitedApiEntries.isEmpty()) {
for (String limitSrc : limitsConfiguration.split(",")) {
long capacity = Long.parseLong(limitSrc.split(":")[0]);
long duration = Long.parseLong(limitSrc.split(":")[1]);
Refill refill = refillIntervally ? Refill.intervally(capacity, Duration.ofSeconds(duration)) : Refill.greedy(capacity, Duration.ofSeconds(duration));
builder.addLimit(Bandwidth.classic(capacity, refill));
initialized = true;
}
if (initialized) {
bucket = builder.build();
} else {
throw new IllegalArgumentException("Failed to parse rate limits configuration: " + limitsConfiguration); throw new IllegalArgumentException("Failed to parse rate limits configuration: " + limitsConfiguration);
} }
LocalBucketBuilder localBucket = Bucket.builder();
for (LimitedApiEntry entry : limitedApiEntries) {
BandwidthBuilder.BandwidthBuilderRefillStage bandwidthBuilder = Bandwidth.builder().capacity(entry.capacity());
Bandwidth bandwidth = refillIntervally ?
bandwidthBuilder.refillIntervally(entry.capacity(), Duration.ofSeconds(entry.durationSeconds())).build() :
bandwidthBuilder.refillGreedy(entry.capacity(), Duration.ofSeconds(entry.durationSeconds())).build();
localBucket.addLimit(bandwidth);
}
this.bucket = localBucket.build();
this.configuration = limitsConfiguration; this.configuration = limitsConfiguration;
} }

View File

@ -0,0 +1,63 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.msg.tools;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
class TbRateLimitsTest {
@Test
@DisplayName("TbRateLimits should construct with single rate limit")
void testSingleLimitConstructor() {
TbRateLimits limits = new TbRateLimits("10:1", false);
assertThat(limits.getConfiguration()).isEqualTo("10:1");
}
@Test
@DisplayName("TbRateLimits should construct with multiple rate limits")
void testMultipleLimitConstructor() {
String config = "10:1,100:10";
TbRateLimits limits = new TbRateLimits(config, false);
assertThat(limits.getConfiguration()).isEqualTo(config);
}
@Test
@DisplayName("TbRateLimits should throw IllegalArgumentException on empty string")
void testEmptyConfigThrows() {
assertThatThrownBy(() -> new TbRateLimits("", false))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Failed to parse rate limits configuration: ");
}
@Test
@DisplayName("TbRateLimits should throw NumberFormatException on malformed value")
void testMalformedConfigThrows() {
assertThatThrownBy(() -> new TbRateLimits("not_a_number:second", false))
.isInstanceOf(NumberFormatException.class);
}
@Test
@DisplayName("TbRateLimits should throw ArrayIndexOutOfBoundsException on missing colon")
void testColonMissingThrows() {
assertThatThrownBy(() -> new TbRateLimits("100", false))
.isInstanceOf(ArrayIndexOutOfBoundsException.class);
}
}

View File

@ -78,11 +78,9 @@ public class DefaultTbServiceInfoProvider implements TbServiceInfoProvider {
} }
} }
log.info("Current Service ID: {}", serviceId); log.info("Current Service ID: {}", serviceId);
if (serviceType.equalsIgnoreCase("monolith")) { serviceTypes = isMonolith() ?
serviceTypes = List.of(ServiceType.values()); List.of(ServiceType.values()) :
} else { Collections.singletonList(ServiceType.of(serviceType));
serviceTypes = Collections.singletonList(ServiceType.of(serviceType));
}
if (!serviceTypes.contains(ServiceType.TB_RULE_ENGINE) || assignedTenantProfiles == null) { if (!serviceTypes.contains(ServiceType.TB_RULE_ENGINE) || assignedTenantProfiles == null) {
assignedTenantProfiles = Collections.emptySet(); assignedTenantProfiles = Collections.emptySet();
} }
@ -113,6 +111,11 @@ public class DefaultTbServiceInfoProvider implements TbServiceInfoProvider {
return serviceInfo; return serviceInfo;
} }
@Override
public boolean isMonolith() {
return serviceType.equalsIgnoreCase("monolith");
}
@Override @Override
public boolean isService(ServiceType serviceType) { public boolean isService(ServiceType serviceType) {
return serviceTypes.contains(serviceType); return serviceTypes.contains(serviceType);

View File

@ -29,6 +29,8 @@ public interface TbServiceInfoProvider {
ServiceInfo getServiceInfo(); ServiceInfo getServiceInfo();
boolean isMonolith();
boolean isService(ServiceType serviceType); boolean isService(ServiceType serviceType);
ServiceInfo generateNewServiceInfoWithCurrentSystemInfo(); ServiceInfo generateNewServiceInfoWithCurrentSystemInfo();

View File

@ -59,6 +59,10 @@
<groupId>org.thingsboard.common</groupId> <groupId>org.thingsboard.common</groupId>
<artifactId>util</artifactId> <artifactId>util</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.thingsboard.common</groupId>
<artifactId>queue</artifactId>
</dependency>
<dependency> <dependency>
<groupId>com.networknt</groupId> <groupId>com.networknt</groupId>
<artifactId>json-schema-validator</artifactId> <artifactId>json-schema-validator</artifactId>

View File

@ -28,7 +28,9 @@ import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.dao.entity.EntityService; import org.thingsboard.server.dao.entity.EntityService;
import org.thingsboard.server.dao.util.AbstractBufferedRateExecutor; import org.thingsboard.server.dao.util.AbstractBufferedRateExecutor;
import org.thingsboard.server.dao.util.AsyncTaskContext; import org.thingsboard.server.dao.util.AsyncTaskContext;
import org.thingsboard.server.dao.util.BufferedRateExecutorType;
import org.thingsboard.server.dao.util.NoSqlAnyDao; import org.thingsboard.server.dao.util.NoSqlAnyDao;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
/** /**
* Created by ashvayka on 24.10.18. * Created by ashvayka on 24.10.18.
@ -38,8 +40,6 @@ import org.thingsboard.server.dao.util.NoSqlAnyDao;
@NoSqlAnyDao @NoSqlAnyDao
public class CassandraBufferedRateReadExecutor extends AbstractBufferedRateExecutor<CassandraStatementTask, TbResultSetFuture, TbResultSet> { public class CassandraBufferedRateReadExecutor extends AbstractBufferedRateExecutor<CassandraStatementTask, TbResultSetFuture, TbResultSet> {
static final String BUFFER_NAME = "Read";
public CassandraBufferedRateReadExecutor( public CassandraBufferedRateReadExecutor(
@Value("${cassandra.query.buffer_size}") int queueLimit, @Value("${cassandra.query.buffer_size}") int queueLimit,
@Value("${cassandra.query.concurrent_limit}") int concurrencyLimit, @Value("${cassandra.query.concurrent_limit}") int concurrencyLimit,
@ -51,9 +51,10 @@ public class CassandraBufferedRateReadExecutor extends AbstractBufferedRateExecu
@Value("${cassandra.query.print_queries_freq:0}") int printQueriesFreq, @Value("${cassandra.query.print_queries_freq:0}") int printQueriesFreq,
@Autowired StatsFactory statsFactory, @Autowired StatsFactory statsFactory,
@Autowired EntityService entityService, @Autowired EntityService entityService,
@Autowired RateLimitService rateLimitService) { @Autowired RateLimitService rateLimitService,
@Autowired TbServiceInfoProvider serviceInfoProvider) {
super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs, printQueriesFreq, statsFactory, super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs, printQueriesFreq, statsFactory,
entityService, rateLimitService, printTenantNames); entityService, rateLimitService, serviceInfoProvider, printTenantNames);
} }
@Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}") @Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}")
@ -68,8 +69,8 @@ public class CassandraBufferedRateReadExecutor extends AbstractBufferedRateExecu
} }
@Override @Override
public String getBufferName() { protected BufferedRateExecutorType getBufferedRateExecutorType() {
return BUFFER_NAME; return BufferedRateExecutorType.READ;
} }
@Override @Override

View File

@ -28,7 +28,9 @@ import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.dao.entity.EntityService; import org.thingsboard.server.dao.entity.EntityService;
import org.thingsboard.server.dao.util.AbstractBufferedRateExecutor; import org.thingsboard.server.dao.util.AbstractBufferedRateExecutor;
import org.thingsboard.server.dao.util.AsyncTaskContext; import org.thingsboard.server.dao.util.AsyncTaskContext;
import org.thingsboard.server.dao.util.BufferedRateExecutorType;
import org.thingsboard.server.dao.util.NoSqlAnyDao; import org.thingsboard.server.dao.util.NoSqlAnyDao;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
/** /**
* Created by ashvayka on 24.10.18. * Created by ashvayka on 24.10.18.
@ -38,8 +40,6 @@ import org.thingsboard.server.dao.util.NoSqlAnyDao;
@NoSqlAnyDao @NoSqlAnyDao
public class CassandraBufferedRateWriteExecutor extends AbstractBufferedRateExecutor<CassandraStatementTask, TbResultSetFuture, TbResultSet> { public class CassandraBufferedRateWriteExecutor extends AbstractBufferedRateExecutor<CassandraStatementTask, TbResultSetFuture, TbResultSet> {
static final String BUFFER_NAME = "Write";
public CassandraBufferedRateWriteExecutor( public CassandraBufferedRateWriteExecutor(
@Value("${cassandra.query.buffer_size}") int queueLimit, @Value("${cassandra.query.buffer_size}") int queueLimit,
@Value("${cassandra.query.concurrent_limit}") int concurrencyLimit, @Value("${cassandra.query.concurrent_limit}") int concurrencyLimit,
@ -51,9 +51,10 @@ public class CassandraBufferedRateWriteExecutor extends AbstractBufferedRateExec
@Value("${cassandra.query.print_queries_freq:0}") int printQueriesFreq, @Value("${cassandra.query.print_queries_freq:0}") int printQueriesFreq,
@Autowired StatsFactory statsFactory, @Autowired StatsFactory statsFactory,
@Autowired EntityService entityService, @Autowired EntityService entityService,
@Autowired RateLimitService rateLimitService) { @Autowired RateLimitService rateLimitService,
@Autowired TbServiceInfoProvider serviceInfoProvider) {
super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs, printQueriesFreq, statsFactory, super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs, printQueriesFreq, statsFactory,
entityService, rateLimitService, printTenantNames); entityService, rateLimitService, serviceInfoProvider, printTenantNames);
} }
@Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}") @Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}")
@ -68,8 +69,8 @@ public class CassandraBufferedRateWriteExecutor extends AbstractBufferedRateExec
} }
@Override @Override
public String getBufferName() { protected BufferedRateExecutorType getBufferedRateExecutorType() {
return BUFFER_NAME; return BufferedRateExecutorType.WRITE;
} }
@Override @Override

View File

@ -34,12 +34,14 @@ import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.cache.limits.RateLimitService; import org.thingsboard.server.cache.limits.RateLimitService;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.limit.LimitedApi; import org.thingsboard.server.common.data.limit.LimitedApi;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.stats.DefaultCounter; import org.thingsboard.server.common.stats.DefaultCounter;
import org.thingsboard.server.common.stats.StatsCounter; import org.thingsboard.server.common.stats.StatsCounter;
import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.common.stats.StatsType; import org.thingsboard.server.common.stats.StatsType;
import org.thingsboard.server.dao.entity.EntityService; import org.thingsboard.server.dao.entity.EntityService;
import org.thingsboard.server.dao.nosql.CassandraStatementTask; import org.thingsboard.server.dao.nosql.CassandraStatementTask;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -78,13 +80,14 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
private final EntityService entityService; private final EntityService entityService;
private final RateLimitService rateLimitService; private final RateLimitService rateLimitService;
private final TbServiceInfoProvider serviceInfoProvider;
private final boolean printTenantNames; private final boolean printTenantNames;
private final Map<TenantId, String> tenantNamesCache = new HashMap<>(); private final Map<TenantId, String> tenantNamesCache = new HashMap<>();
public AbstractBufferedRateExecutor(int queueLimit, int concurrencyLimit, long maxWaitTime, int dispatcherThreads, public AbstractBufferedRateExecutor(int queueLimit, int concurrencyLimit, long maxWaitTime, int dispatcherThreads,
int callbackThreads, long pollMs, int printQueriesFreq, StatsFactory statsFactory, int callbackThreads, long pollMs, int printQueriesFreq, StatsFactory statsFactory,
EntityService entityService, RateLimitService rateLimitService, boolean printTenantNames) { EntityService entityService, RateLimitService rateLimitService, TbServiceInfoProvider serviceInfoProvider, boolean printTenantNames) {
this.maxWaitTime = maxWaitTime; this.maxWaitTime = maxWaitTime;
this.pollMs = pollMs; this.pollMs = pollMs;
this.concurrencyLimit = concurrencyLimit; this.concurrencyLimit = concurrencyLimit;
@ -99,6 +102,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
this.entityService = entityService; this.entityService = entityService;
this.rateLimitService = rateLimitService; this.rateLimitService = rateLimitService;
this.serviceInfoProvider = serviceInfoProvider;
this.printTenantNames = printTenantNames; this.printTenantNames = printTenantNames;
for (int i = 0; i < dispatcherThreads; i++) { for (int i = 0; i < dispatcherThreads; i++) {
@ -114,7 +118,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
boolean perTenantLimitReached = false; boolean perTenantLimitReached = false;
TenantId tenantId = task.getTenantId(); TenantId tenantId = task.getTenantId();
if (tenantId != null && !tenantId.isSysTenantId()) { if (tenantId != null && !tenantId.isSysTenantId()) {
if (!rateLimitService.checkRateLimit(LimitedApi.CASSANDRA_QUERIES, tenantId, tenantId, true)) { if (!rateLimitService.checkRateLimit(getMyLimitedApi(), tenantId, tenantId, true)) {
stats.incrementRateLimitedTenant(tenantId); stats.incrementRateLimitedTenant(tenantId);
stats.getTotalRateLimited().increment(); stats.getTotalRateLimited().increment();
settableFuture.setException(new TenantRateLimitException()); settableFuture.setException(new TenantRateLimitException());
@ -136,6 +140,16 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
return result; return result;
} }
private LimitedApi getMyLimitedApi() {
if (serviceInfoProvider.isMonolith()) {
return getBufferedRateExecutorType().getMonolithLimitedApi();
}
if (serviceInfoProvider.isService(ServiceType.TB_RULE_ENGINE)) {
return getBufferedRateExecutorType().getRuleEngineLimitedApi();
}
return getBufferedRateExecutorType().getCoreLimitedApi();
}
public void stop() { public void stop() {
if (dispatcherExecutor != null) { if (dispatcherExecutor != null) {
dispatcherExecutor.shutdownNow(); dispatcherExecutor.shutdownNow();
@ -154,7 +168,11 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
protected abstract ListenableFuture<V> execute(AsyncTaskContext<T, V> taskCtx); protected abstract ListenableFuture<V> execute(AsyncTaskContext<T, V> taskCtx);
public abstract String getBufferName(); private String getBufferName() {
return getBufferedRateExecutorType().getDisplayName();
}
protected abstract BufferedRateExecutorType getBufferedRateExecutorType();
private void dispatch() { private void dispatch() {
log.info("[{}] Buffered rate executor thread started", getBufferName()); log.info("[{}] Buffered rate executor thread started", getBufferName());

View File

@ -0,0 +1,40 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.util;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.thingsboard.server.common.data.limit.LimitedApi;
@Getter
public enum BufferedRateExecutorType {
READ(LimitedApi.CASSANDRA_READ_QUERIES_CORE, LimitedApi.CASSANDRA_READ_QUERIES_RULE_ENGINE, LimitedApi.CASSANDRA_READ_QUERIES_MONOLITH),
WRITE(LimitedApi.CASSANDRA_WRITE_QUERIES_CORE, LimitedApi.CASSANDRA_WRITE_QUERIES_RULE_ENGINE, LimitedApi.CASSANDRA_WRITE_QUERIES_MONOLITH);
private final LimitedApi coreLimitedApi;
private final LimitedApi ruleEngineLimitedApi;
private final LimitedApi monolithLimitedApi;
private final String displayName = StringUtils.capitalize(name().toLowerCase());
BufferedRateExecutorType(LimitedApi coreLimitedApi, LimitedApi ruleEngineLimitedApi, LimitedApi monolithLimitedApi) {
this.coreLimitedApi = coreLimitedApi;
this.ruleEngineLimitedApi = ruleEngineLimitedApi;
this.monolithLimitedApi = monolithLimitedApi;
}
}