fixes after review + new tests added

This commit is contained in:
dshvaika 2025-05-28 12:31:04 +03:00
parent 92e81ad233
commit 45dee234c8
12 changed files with 227 additions and 104 deletions

View File

@ -36,9 +36,11 @@ import org.thingsboard.server.common.data.queue.SubmitStrategyType;
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
import org.thingsboard.server.common.data.tenant.profile.TenantProfileData; import org.thingsboard.server.common.data.tenant.profile.TenantProfileData;
import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfiguration; import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfiguration;
import org.thingsboard.server.common.data.validation.RateLimit;
import org.thingsboard.server.dao.service.DaoSqlTest; import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueCallback;
import java.lang.reflect.Field;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -314,6 +316,42 @@ public class TenantProfileControllerTest extends AbstractControllerTest {
Assert.assertEquals(1, pageData.getTotalElements()); Assert.assertEquals(1, pageData.getTotalElements());
} }
@Test
public void testRateLimitValidationAllFields() throws Exception {
loginSysAdmin();
Mockito.reset(tbClusterService);
List<String> failedFields = new ArrayList<>();
for (Field field : DefaultTenantProfileConfiguration.class.getDeclaredFields()) {
RateLimit rateLimit = field.getAnnotation(RateLimit.class);
if (rateLimit == null) continue;
String fieldName = field.getName();
String expectedLabel = rateLimit.fieldName();
TenantProfile tenantProfile = createTenantProfile("Invalid RateLimit - " + fieldName);
DefaultTenantProfileConfiguration config = (DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration();
field.setAccessible(true);
field.set(config, "10:1,10:1"); // Set invalid duplicate value
try {
doPost("/api/tenantProfile", tenantProfile)
.andExpect(status().isBadRequest())
.andExpect(statusReason(containsString(expectedLabel + " rate limit has duplicate 'Per seconds' configuration.")));
} catch (AssertionError e) {
failedFields.add(fieldName + " (label: " + expectedLabel + ")");
}
}
if (!failedFields.isEmpty()) {
throw new AssertionError("RateLimit validation failed for fields: " + String.join(", ", failedFields));
}
testBroadcastEntityStateChangeEventNeverTenantProfile();
}
private TenantProfile createTenantProfile(String name) { private TenantProfile createTenantProfile(String name) {
TenantProfile tenantProfile = new TenantProfile(); TenantProfile tenantProfile = new TenantProfile();
tenantProfile.setName(name); tenantProfile.setName(name);

View File

@ -21,6 +21,7 @@ import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileCon
import java.util.Optional; import java.util.Optional;
import java.util.function.Function; import java.util.function.Function;
@Getter
public enum LimitedApi { public enum LimitedApi {
ENTITY_EXPORT(DefaultTenantProfileConfiguration::getTenantEntityExportRateLimit, "entity version creation", true), ENTITY_EXPORT(DefaultTenantProfileConfiguration::getTenantEntityExportRateLimit, "entity version creation", true),
@ -35,11 +36,11 @@ public enum LimitedApi {
CASSANDRA_WRITE_QUERIES_RULE_ENGINE(DefaultTenantProfileConfiguration::getCassandraWriteQueryTenantRuleEngineRateLimits, "Rule Engine telemetry Cassandra write queries", true), CASSANDRA_WRITE_QUERIES_RULE_ENGINE(DefaultTenantProfileConfiguration::getCassandraWriteQueryTenantRuleEngineRateLimits, "Rule Engine telemetry Cassandra write queries", true),
CASSANDRA_READ_QUERIES_RULE_ENGINE(DefaultTenantProfileConfiguration::getCassandraReadQueryTenantRuleEngineRateLimits, "Rule Engine telemetry Cassandra read queries", true), CASSANDRA_READ_QUERIES_RULE_ENGINE(DefaultTenantProfileConfiguration::getCassandraReadQueryTenantRuleEngineRateLimits, "Rule Engine telemetry Cassandra read queries", true),
CASSANDRA_READ_QUERIES_MONOLITH( CASSANDRA_READ_QUERIES_MONOLITH(
LimitedApiUtil.merge( RateLimitUtil.merge(
DefaultTenantProfileConfiguration::getCassandraReadQueryTenantCoreRateLimits, DefaultTenantProfileConfiguration::getCassandraReadQueryTenantCoreRateLimits,
DefaultTenantProfileConfiguration::getCassandraReadQueryTenantRuleEngineRateLimits), "Telemetry read queries", true), DefaultTenantProfileConfiguration::getCassandraReadQueryTenantRuleEngineRateLimits), "Telemetry read queries", true),
CASSANDRA_WRITE_QUERIES_MONOLITH( CASSANDRA_WRITE_QUERIES_MONOLITH(
LimitedApiUtil.merge( RateLimitUtil.merge(
DefaultTenantProfileConfiguration::getCassandraWriteQueryTenantCoreRateLimits, DefaultTenantProfileConfiguration::getCassandraWriteQueryTenantCoreRateLimits,
DefaultTenantProfileConfiguration::getCassandraWriteQueryTenantRuleEngineRateLimits), "Telemetry write queries", true), DefaultTenantProfileConfiguration::getCassandraWriteQueryTenantRuleEngineRateLimits), "Telemetry write queries", true),
EDGE_EVENTS(DefaultTenantProfileConfiguration::getEdgeEventRateLimits, "Edge events", true), EDGE_EVENTS(DefaultTenantProfileConfiguration::getEdgeEventRateLimits, "Edge events", true),
@ -58,11 +59,8 @@ public enum LimitedApi {
CALCULATED_FIELD_DEBUG_EVENTS("calculated field debug events", true); CALCULATED_FIELD_DEBUG_EVENTS("calculated field debug events", true);
private final Function<DefaultTenantProfileConfiguration, String> configExtractor; private final Function<DefaultTenantProfileConfiguration, String> configExtractor;
@Getter
private final boolean perTenant; private final boolean perTenant;
@Getter
private final boolean refillRateLimitIntervally; private final boolean refillRateLimitIntervally;
@Getter
private final String label; private final String label;
LimitedApi(Function<DefaultTenantProfileConfiguration, String> configExtractor, String label, boolean perTenant) { LimitedApi(Function<DefaultTenantProfileConfiguration, String> configExtractor, String label, boolean perTenant) {

View File

@ -15,11 +15,11 @@
*/ */
package org.thingsboard.server.common.data.limit; package org.thingsboard.server.common.data.limit;
public record LimitedApiEntry(long capacity, long durationSeconds) { public record RateLimitEntry(long capacity, long durationSeconds) {
public static LimitedApiEntry parse(String s) { public static RateLimitEntry parse(String s) {
String[] parts = s.split(":"); String[] parts = s.split(":");
return new LimitedApiEntry(Long.parseLong(parts[0]), Long.parseLong(parts[1])); return new RateLimitEntry(Long.parseLong(parts[0]), Long.parseLong(parts[1]));
} }
@Override @Override

View File

@ -28,14 +28,14 @@ import java.util.Set;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
public class LimitedApiUtil { public class RateLimitUtil {
public static List<LimitedApiEntry> parseConfig(String config) { public static List<RateLimitEntry> parseConfig(String config) {
if (config == null || config.isEmpty()) { if (config == null || config.isEmpty()) {
return Collections.emptyList(); return Collections.emptyList();
} }
return Arrays.stream(config.split(",")) return Arrays.stream(config.split(","))
.map(LimitedApiEntry::parse) .map(RateLimitEntry::parse)
.toList(); .toList();
} }
@ -45,18 +45,18 @@ public class LimitedApiUtil {
return config -> { return config -> {
String config1 = configExtractor1.apply(config); String config1 = configExtractor1.apply(config);
String config2 = configExtractor2.apply(config); String config2 = configExtractor2.apply(config);
return LimitedApiUtil.mergeStrConfigs(config1, config2); // merges the configs return RateLimitUtil.mergeStrConfigs(config1, config2); // merges the configs
}; };
} }
private static String mergeStrConfigs(String firstConfig, String secondConfig) { private static String mergeStrConfigs(String firstConfig, String secondConfig) {
List<LimitedApiEntry> all = new ArrayList<>(); List<RateLimitEntry> all = new ArrayList<>();
all.addAll(parseConfig(firstConfig)); all.addAll(parseConfig(firstConfig));
all.addAll(parseConfig(secondConfig)); all.addAll(parseConfig(secondConfig));
Map<Long, Long> merged = new HashMap<>(); Map<Long, Long> merged = new HashMap<>();
for (LimitedApiEntry entry : all) { for (RateLimitEntry entry : all) {
merged.merge(entry.durationSeconds(), entry.capacity(), Long::sum); merged.merge(entry.durationSeconds(), entry.capacity(), Long::sum);
} }
@ -67,9 +67,9 @@ public class LimitedApiUtil {
} }
public static boolean isValid(String configStr) { public static boolean isValid(String configStr) {
List<LimitedApiEntry> limitedApiEntries = parseConfig(configStr); List<RateLimitEntry> limitedApiEntries = parseConfig(configStr);
Set<Long> distinctDurations = new HashSet<>(); Set<Long> distinctDurations = new HashSet<>();
for (LimitedApiEntry entry : limitedApiEntries) { for (RateLimitEntry entry : limitedApiEntries) {
if (!distinctDurations.add(entry.durationSeconds())) { if (!distinctDurations.add(entry.durationSeconds())) {
return false; return false;
} }
@ -85,7 +85,7 @@ public class LimitedApiUtil {
Set<Long> distinctDurations = new HashSet<>(); Set<Long> distinctDurations = new HashSet<>();
return parseConfig(configStr).stream() return parseConfig(configStr).stream()
.filter(entry -> distinctDurations.add(entry.durationSeconds())) .filter(entry -> distinctDurations.add(entry.durationSeconds()))
.map(LimitedApiEntry::toString) .map(RateLimitEntry::toString)
.collect(Collectors.joining(",")); .collect(Collectors.joining(","));
} }

View File

@ -24,7 +24,7 @@ import lombok.NoArgsConstructor;
import org.thingsboard.server.common.data.ApiUsageRecordKey; import org.thingsboard.server.common.data.ApiUsageRecordKey;
import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.TenantProfileType; import org.thingsboard.server.common.data.TenantProfileType;
import org.thingsboard.server.common.data.limit.LimitedApiUtil; import org.thingsboard.server.common.data.limit.RateLimitUtil;
import org.thingsboard.server.common.data.validation.RateLimit; import org.thingsboard.server.common.data.validation.RateLimit;
import java.io.Serial; import java.io.Serial;
@ -238,41 +238,41 @@ public class DefaultTenantProfileConfiguration implements TenantProfileConfigura
@Deprecated(forRemoval = true, since = "4.1") @Deprecated(forRemoval = true, since = "4.1")
public void deduplicateRateLimitsConfigs() { public void deduplicateRateLimitsConfigs() {
this.transportTenantMsgRateLimit = LimitedApiUtil.deduplicateByDuration(transportTenantMsgRateLimit); this.transportTenantMsgRateLimit = RateLimitUtil.deduplicateByDuration(transportTenantMsgRateLimit);
this.transportTenantTelemetryMsgRateLimit = LimitedApiUtil.deduplicateByDuration(transportTenantTelemetryMsgRateLimit); this.transportTenantTelemetryMsgRateLimit = RateLimitUtil.deduplicateByDuration(transportTenantTelemetryMsgRateLimit);
this.transportTenantTelemetryDataPointsRateLimit = LimitedApiUtil.deduplicateByDuration(transportTenantTelemetryDataPointsRateLimit); this.transportTenantTelemetryDataPointsRateLimit = RateLimitUtil.deduplicateByDuration(transportTenantTelemetryDataPointsRateLimit);
this.transportDeviceMsgRateLimit = LimitedApiUtil.deduplicateByDuration(transportDeviceMsgRateLimit); this.transportDeviceMsgRateLimit = RateLimitUtil.deduplicateByDuration(transportDeviceMsgRateLimit);
this.transportDeviceTelemetryMsgRateLimit = LimitedApiUtil.deduplicateByDuration(transportDeviceTelemetryMsgRateLimit); this.transportDeviceTelemetryMsgRateLimit = RateLimitUtil.deduplicateByDuration(transportDeviceTelemetryMsgRateLimit);
this.transportDeviceTelemetryDataPointsRateLimit = LimitedApiUtil.deduplicateByDuration(transportDeviceTelemetryDataPointsRateLimit); this.transportDeviceTelemetryDataPointsRateLimit = RateLimitUtil.deduplicateByDuration(transportDeviceTelemetryDataPointsRateLimit);
this.transportGatewayMsgRateLimit = LimitedApiUtil.deduplicateByDuration(transportGatewayMsgRateLimit); this.transportGatewayMsgRateLimit = RateLimitUtil.deduplicateByDuration(transportGatewayMsgRateLimit);
this.transportGatewayTelemetryMsgRateLimit = LimitedApiUtil.deduplicateByDuration(transportGatewayTelemetryMsgRateLimit); this.transportGatewayTelemetryMsgRateLimit = RateLimitUtil.deduplicateByDuration(transportGatewayTelemetryMsgRateLimit);
this.transportGatewayTelemetryDataPointsRateLimit = LimitedApiUtil.deduplicateByDuration(transportGatewayTelemetryDataPointsRateLimit); this.transportGatewayTelemetryDataPointsRateLimit = RateLimitUtil.deduplicateByDuration(transportGatewayTelemetryDataPointsRateLimit);
this.transportGatewayDeviceMsgRateLimit = LimitedApiUtil.deduplicateByDuration(transportGatewayDeviceMsgRateLimit); this.transportGatewayDeviceMsgRateLimit = RateLimitUtil.deduplicateByDuration(transportGatewayDeviceMsgRateLimit);
this.transportGatewayDeviceTelemetryMsgRateLimit = LimitedApiUtil.deduplicateByDuration(transportGatewayDeviceTelemetryMsgRateLimit); this.transportGatewayDeviceTelemetryMsgRateLimit = RateLimitUtil.deduplicateByDuration(transportGatewayDeviceTelemetryMsgRateLimit);
this.transportGatewayDeviceTelemetryDataPointsRateLimit = LimitedApiUtil.deduplicateByDuration(transportGatewayDeviceTelemetryDataPointsRateLimit); this.transportGatewayDeviceTelemetryDataPointsRateLimit = RateLimitUtil.deduplicateByDuration(transportGatewayDeviceTelemetryDataPointsRateLimit);
this.tenantEntityExportRateLimit = LimitedApiUtil.deduplicateByDuration(tenantEntityExportRateLimit); this.tenantEntityExportRateLimit = RateLimitUtil.deduplicateByDuration(tenantEntityExportRateLimit);
this.tenantEntityImportRateLimit = LimitedApiUtil.deduplicateByDuration(tenantEntityImportRateLimit); this.tenantEntityImportRateLimit = RateLimitUtil.deduplicateByDuration(tenantEntityImportRateLimit);
this.tenantNotificationRequestsRateLimit = LimitedApiUtil.deduplicateByDuration(tenantNotificationRequestsRateLimit); this.tenantNotificationRequestsRateLimit = RateLimitUtil.deduplicateByDuration(tenantNotificationRequestsRateLimit);
this.tenantNotificationRequestsPerRuleRateLimit = LimitedApiUtil.deduplicateByDuration(tenantNotificationRequestsPerRuleRateLimit); this.tenantNotificationRequestsPerRuleRateLimit = RateLimitUtil.deduplicateByDuration(tenantNotificationRequestsPerRuleRateLimit);
this.cassandraReadQueryTenantCoreRateLimits = LimitedApiUtil.deduplicateByDuration(cassandraReadQueryTenantCoreRateLimits); this.cassandraReadQueryTenantCoreRateLimits = RateLimitUtil.deduplicateByDuration(cassandraReadQueryTenantCoreRateLimits);
this.cassandraWriteQueryTenantCoreRateLimits = LimitedApiUtil.deduplicateByDuration(cassandraWriteQueryTenantCoreRateLimits); this.cassandraWriteQueryTenantCoreRateLimits = RateLimitUtil.deduplicateByDuration(cassandraWriteQueryTenantCoreRateLimits);
this.cassandraReadQueryTenantRuleEngineRateLimits = LimitedApiUtil.deduplicateByDuration(cassandraReadQueryTenantRuleEngineRateLimits); this.cassandraReadQueryTenantRuleEngineRateLimits = RateLimitUtil.deduplicateByDuration(cassandraReadQueryTenantRuleEngineRateLimits);
this.cassandraWriteQueryTenantRuleEngineRateLimits = LimitedApiUtil.deduplicateByDuration(cassandraWriteQueryTenantRuleEngineRateLimits); this.cassandraWriteQueryTenantRuleEngineRateLimits = RateLimitUtil.deduplicateByDuration(cassandraWriteQueryTenantRuleEngineRateLimits);
this.edgeEventRateLimits = LimitedApiUtil.deduplicateByDuration(edgeEventRateLimits); this.edgeEventRateLimits = RateLimitUtil.deduplicateByDuration(edgeEventRateLimits);
this.edgeEventRateLimitsPerEdge = LimitedApiUtil.deduplicateByDuration(edgeEventRateLimitsPerEdge); this.edgeEventRateLimitsPerEdge = RateLimitUtil.deduplicateByDuration(edgeEventRateLimitsPerEdge);
this.edgeUplinkMessagesRateLimits = LimitedApiUtil.deduplicateByDuration(edgeUplinkMessagesRateLimits); this.edgeUplinkMessagesRateLimits = RateLimitUtil.deduplicateByDuration(edgeUplinkMessagesRateLimits);
this.edgeUplinkMessagesRateLimitsPerEdge = LimitedApiUtil.deduplicateByDuration(edgeUplinkMessagesRateLimitsPerEdge); this.edgeUplinkMessagesRateLimitsPerEdge = RateLimitUtil.deduplicateByDuration(edgeUplinkMessagesRateLimitsPerEdge);
this.wsUpdatesPerSessionRateLimit = LimitedApiUtil.deduplicateByDuration(wsUpdatesPerSessionRateLimit); this.wsUpdatesPerSessionRateLimit = RateLimitUtil.deduplicateByDuration(wsUpdatesPerSessionRateLimit);
this.tenantServerRestLimitsConfiguration = LimitedApiUtil.deduplicateByDuration(tenantServerRestLimitsConfiguration); this.tenantServerRestLimitsConfiguration = RateLimitUtil.deduplicateByDuration(tenantServerRestLimitsConfiguration);
this.customerServerRestLimitsConfiguration = LimitedApiUtil.deduplicateByDuration(customerServerRestLimitsConfiguration); this.customerServerRestLimitsConfiguration = RateLimitUtil.deduplicateByDuration(customerServerRestLimitsConfiguration);
} }
} }

View File

@ -0,0 +1,102 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.limit;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
class LimitedApiTest {
private DefaultTenantProfileConfiguration config;
@BeforeEach
void setUp() {
config = mock(DefaultTenantProfileConfiguration.class);
}
@Test
void testCorrectConfigExtractorsUsed() {
Map<LimitedApi, Runnable> verifierMap = Map.ofEntries(
Map.entry(LimitedApi.ENTITY_EXPORT, () ->
verify(config).getTenantEntityExportRateLimit()),
Map.entry(LimitedApi.ENTITY_IMPORT, () ->
verify(config).getTenantEntityImportRateLimit()),
Map.entry(LimitedApi.NOTIFICATION_REQUESTS, () ->
verify(config).getTenantNotificationRequestsRateLimit()),
Map.entry(LimitedApi.NOTIFICATION_REQUESTS_PER_RULE, () ->
verify(config).getTenantNotificationRequestsPerRuleRateLimit()),
Map.entry(LimitedApi.REST_REQUESTS_PER_TENANT, () ->
verify(config).getTenantServerRestLimitsConfiguration()),
Map.entry(LimitedApi.REST_REQUESTS_PER_CUSTOMER, () ->
verify(config).getCustomerServerRestLimitsConfiguration()),
Map.entry(LimitedApi.WS_UPDATES_PER_SESSION, () ->
verify(config).getWsUpdatesPerSessionRateLimit()),
Map.entry(LimitedApi.CASSANDRA_WRITE_QUERIES_CORE, () ->
verify(config).getCassandraWriteQueryTenantCoreRateLimits()),
Map.entry(LimitedApi.CASSANDRA_READ_QUERIES_CORE, () ->
verify(config).getCassandraReadQueryTenantCoreRateLimits()),
Map.entry(LimitedApi.CASSANDRA_WRITE_QUERIES_RULE_ENGINE, () ->
verify(config).getCassandraWriteQueryTenantRuleEngineRateLimits()),
Map.entry(LimitedApi.CASSANDRA_READ_QUERIES_RULE_ENGINE, () ->
verify(config).getCassandraReadQueryTenantRuleEngineRateLimits()),
Map.entry(LimitedApi.CASSANDRA_READ_QUERIES_MONOLITH, () -> {
verify(config).getCassandraReadQueryTenantCoreRateLimits();
verify(config).getCassandraReadQueryTenantRuleEngineRateLimits();
}),
Map.entry(LimitedApi.CASSANDRA_WRITE_QUERIES_MONOLITH, () -> {
verify(config).getCassandraWriteQueryTenantCoreRateLimits();
verify(config).getCassandraWriteQueryTenantRuleEngineRateLimits();
}),
Map.entry(LimitedApi.EDGE_EVENTS, () ->
verify(config).getEdgeEventRateLimits()),
Map.entry(LimitedApi.EDGE_EVENTS_PER_EDGE, () ->
verify(config).getEdgeEventRateLimitsPerEdge()),
Map.entry(LimitedApi.EDGE_UPLINK_MESSAGES, () ->
verify(config).getEdgeUplinkMessagesRateLimits()),
Map.entry(LimitedApi.EDGE_UPLINK_MESSAGES_PER_EDGE, () ->
verify(config).getEdgeUplinkMessagesRateLimitsPerEdge())
);
Set<LimitedApi> expected = verifierMap.keySet();
Set<LimitedApi> actual = Arrays.stream(LimitedApi.values())
.filter(api -> api.getConfigExtractor() != null)
.collect(Collectors.toSet());
assertThat(expected)
.as("Verifier map should cover all LimitedApis with extractors")
.containsExactlyInAnyOrderElementsOf(actual);
for (Map.Entry<LimitedApi, Runnable> entry : verifierMap.entrySet()) {
LimitedApi api = entry.getKey();
api.getLimitConfig(config);
entry.getValue().run();
clearInvocations(config);
}
}
}

View File

@ -24,12 +24,12 @@ import java.util.function.Function;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
class LimitedApiUtilTest { class RateLimitUtilTest {
@Test @Test
@DisplayName("LimitedApiUtil should parse single entry correctly") @DisplayName("LimitedApiUtil should parse single entry correctly")
void testParseSingleEntry() { void testParseSingleEntry() {
List<LimitedApiEntry> entries = LimitedApiUtil.parseConfig("100:60"); List<RateLimitEntry> entries = RateLimitUtil.parseConfig("100:60");
assertThat(entries).hasSize(1); assertThat(entries).hasSize(1);
assertThat(entries.get(0).capacity()).isEqualTo(100); assertThat(entries.get(0).capacity()).isEqualTo(100);
@ -39,7 +39,7 @@ class LimitedApiUtilTest {
@Test @Test
@DisplayName("LimitedApiUtil should parse multiple entries correctly") @DisplayName("LimitedApiUtil should parse multiple entries correctly")
void testParseMultipleEntries() { void testParseMultipleEntries() {
List<LimitedApiEntry> entries = LimitedApiUtil.parseConfig("100:60,200:30"); List<RateLimitEntry> entries = RateLimitUtil.parseConfig("100:60,200:30");
assertThat(entries).hasSize(2); assertThat(entries).hasSize(2);
assertThat(entries.get(0).capacity()).isEqualTo(100); assertThat(entries.get(0).capacity()).isEqualTo(100);
@ -51,8 +51,8 @@ class LimitedApiUtilTest {
@Test @Test
@DisplayName("LimitedApiUtil should return empty list for null or empty config") @DisplayName("LimitedApiUtil should return empty list for null or empty config")
void testParseEmptyConfig() { void testParseEmptyConfig() {
assertThat(LimitedApiUtil.parseConfig(null)).isEmpty(); assertThat(RateLimitUtil.parseConfig(null)).isEmpty();
assertThat(LimitedApiUtil.parseConfig("")).isEmpty(); assertThat(RateLimitUtil.parseConfig("")).isEmpty();
} }
@Test @Test
@ -64,7 +64,7 @@ class LimitedApiUtilTest {
// Fake config instance (not used directly in lambda logic) // Fake config instance (not used directly in lambda logic)
DefaultTenantProfileConfiguration config = new DefaultTenantProfileConfiguration(); DefaultTenantProfileConfiguration config = new DefaultTenantProfileConfiguration();
String result = LimitedApiUtil.merge(extractor1, extractor2).apply(config); String result = RateLimitUtil.merge(extractor1, extractor2).apply(config);
// Should be: 300:60 (100+200), 50:30, 25:10 // Should be: 300:60 (100+200), 50:30, 25:10
assertThat(result).isEqualTo("25:10,50:30,300:60"); assertThat(result).isEqualTo("25:10,50:30,300:60");
@ -78,7 +78,7 @@ class LimitedApiUtilTest {
// Fake config instance (not used directly in lambda logic) // Fake config instance (not used directly in lambda logic)
DefaultTenantProfileConfiguration config = new DefaultTenantProfileConfiguration(); DefaultTenantProfileConfiguration config = new DefaultTenantProfileConfiguration();
String result = LimitedApiUtil.merge(extractor1, extractor2).apply(config); String result = RateLimitUtil.merge(extractor1, extractor2).apply(config);
assertThat(result).isEqualTo("100:60"); assertThat(result).isEqualTo("100:60");
} }
@ -91,7 +91,7 @@ class LimitedApiUtilTest {
// Fake config instance (not used directly in lambda logic) // Fake config instance (not used directly in lambda logic)
DefaultTenantProfileConfiguration config = new DefaultTenantProfileConfiguration(); DefaultTenantProfileConfiguration config = new DefaultTenantProfileConfiguration();
String result = LimitedApiUtil.merge(extractor1, extractor2).apply(config); String result = RateLimitUtil.merge(extractor1, extractor2).apply(config);
assertThat(result).isEqualTo("200:10,100:60"); assertThat(result).isEqualTo("200:10,100:60");
} }

View File

@ -21,8 +21,8 @@ import io.github.bucket4j.Bucket;
import io.github.bucket4j.local.LocalBucket; import io.github.bucket4j.local.LocalBucket;
import io.github.bucket4j.local.LocalBucketBuilder; import io.github.bucket4j.local.LocalBucketBuilder;
import lombok.Getter; import lombok.Getter;
import org.thingsboard.server.common.data.limit.LimitedApiEntry; import org.thingsboard.server.common.data.limit.RateLimitEntry;
import org.thingsboard.server.common.data.limit.LimitedApiUtil; import org.thingsboard.server.common.data.limit.RateLimitUtil;
import java.time.Duration; import java.time.Duration;
import java.util.List; import java.util.List;
@ -41,12 +41,12 @@ public class TbRateLimits {
} }
public TbRateLimits(String limitsConfiguration, boolean refillIntervally) { public TbRateLimits(String limitsConfiguration, boolean refillIntervally) {
List<LimitedApiEntry> limitedApiEntries = LimitedApiUtil.parseConfig(limitsConfiguration); List<RateLimitEntry> limitedApiEntries = RateLimitUtil.parseConfig(limitsConfiguration);
if (limitedApiEntries.isEmpty()) { if (limitedApiEntries.isEmpty()) {
throw new IllegalArgumentException("Failed to parse rate limits configuration: " + limitsConfiguration); throw new IllegalArgumentException("Failed to parse rate limits configuration: " + limitsConfiguration);
} }
LocalBucketBuilder localBucket = Bucket.builder(); LocalBucketBuilder localBucket = Bucket.builder();
for (LimitedApiEntry entry : limitedApiEntries) { for (RateLimitEntry entry : limitedApiEntries) {
BandwidthBuilder.BandwidthBuilderRefillStage bandwidthBuilder = Bandwidth.builder().capacity(entry.capacity()); BandwidthBuilder.BandwidthBuilderRefillStage bandwidthBuilder = Bandwidth.builder().capacity(entry.capacity());
Bandwidth bandwidth = refillIntervally ? Bandwidth bandwidth = refillIntervally ?
bandwidthBuilder.refillIntervally(entry.capacity(), Duration.ofSeconds(entry.durationSeconds())).build() : bandwidthBuilder.refillIntervally(entry.capacity(), Duration.ofSeconds(entry.durationSeconds())).build() :

View File

@ -53,8 +53,8 @@ public class CassandraBufferedRateReadExecutor extends AbstractBufferedRateExecu
@Autowired EntityService entityService, @Autowired EntityService entityService,
@Autowired RateLimitService rateLimitService, @Autowired RateLimitService rateLimitService,
@Autowired(required = false) TbServiceInfoProvider serviceInfoProvider) { @Autowired(required = false) TbServiceInfoProvider serviceInfoProvider) {
super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs, printQueriesFreq, statsFactory, super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs, printQueriesFreq,
entityService, rateLimitService, serviceInfoProvider, printTenantNames); BufferedRateExecutorType.READ, entityService, rateLimitService, serviceInfoProvider, statsFactory, printTenantNames);
} }
@Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}") @Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}")
@ -68,11 +68,6 @@ public class CassandraBufferedRateReadExecutor extends AbstractBufferedRateExecu
super.stop(); super.stop();
} }
@Override
protected BufferedRateExecutorType getBufferedRateExecutorType() {
return BufferedRateExecutorType.READ;
}
@Override @Override
protected SettableFuture<TbResultSet> create() { protected SettableFuture<TbResultSet> create() {
return SettableFuture.create(); return SettableFuture.create();

View File

@ -53,8 +53,8 @@ public class CassandraBufferedRateWriteExecutor extends AbstractBufferedRateExec
@Autowired EntityService entityService, @Autowired EntityService entityService,
@Autowired RateLimitService rateLimitService, @Autowired RateLimitService rateLimitService,
@Autowired(required = false) TbServiceInfoProvider serviceInfoProvider) { @Autowired(required = false) TbServiceInfoProvider serviceInfoProvider) {
super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs, printQueriesFreq, statsFactory, super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs, printQueriesFreq,
entityService, rateLimitService, serviceInfoProvider, printTenantNames); BufferedRateExecutorType.WRITE, entityService, rateLimitService, serviceInfoProvider, statsFactory, printTenantNames);
} }
@Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}") @Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}")
@ -68,11 +68,6 @@ public class CassandraBufferedRateWriteExecutor extends AbstractBufferedRateExec
super.stop(); super.stop();
} }
@Override
protected BufferedRateExecutorType getBufferedRateExecutorType() {
return BufferedRateExecutorType.WRITE;
}
@Override @Override
protected SettableFuture<TbResultSet> create() { protected SettableFuture<TbResultSet> create() {
return SettableFuture.create(); return SettableFuture.create();

View File

@ -18,7 +18,7 @@ package org.thingsboard.server.dao.service;
import jakarta.validation.ConstraintValidator; import jakarta.validation.ConstraintValidator;
import jakarta.validation.ConstraintValidatorContext; import jakarta.validation.ConstraintValidatorContext;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.limit.LimitedApiUtil; import org.thingsboard.server.common.data.limit.RateLimitUtil;
import org.thingsboard.server.common.data.validation.RateLimit; import org.thingsboard.server.common.data.validation.RateLimit;
@Slf4j @Slf4j
@ -26,7 +26,7 @@ public class RateLimitValidator implements ConstraintValidator<RateLimit, String
@Override @Override
public boolean isValid(String value, ConstraintValidatorContext constraintValidatorContext) { public boolean isValid(String value, ConstraintValidatorContext constraintValidatorContext) {
return value == null || LimitedApiUtil.isValid(value); return value == null || RateLimitUtil.isValid(value);
} }
} }

View File

@ -66,6 +66,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
private final long maxWaitTime; private final long maxWaitTime;
private final long pollMs; private final long pollMs;
private final String bufferName;
private final BlockingQueue<AsyncTaskContext<T, V>> queue; private final BlockingQueue<AsyncTaskContext<T, V>> queue;
private final ExecutorService dispatcherExecutor; private final ExecutorService dispatcherExecutor;
private final ExecutorService callbackExecutor; private final ExecutorService callbackExecutor;
@ -80,29 +81,32 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
private final EntityService entityService; private final EntityService entityService;
private final RateLimitService rateLimitService; private final RateLimitService rateLimitService;
private final TbServiceInfoProvider serviceInfoProvider;
private final boolean printTenantNames; private final boolean printTenantNames;
private final Map<TenantId, String> tenantNamesCache = new HashMap<>(); private final Map<TenantId, String> tenantNamesCache = new HashMap<>();
private final LimitedApi myLimitedApi;
public AbstractBufferedRateExecutor(int queueLimit, int concurrencyLimit, long maxWaitTime, int dispatcherThreads, public AbstractBufferedRateExecutor(int queueLimit, int concurrencyLimit, long maxWaitTime, int dispatcherThreads,
int callbackThreads, long pollMs, int printQueriesFreq, StatsFactory statsFactory, int callbackThreads, long pollMs, int printQueriesFreq, BufferedRateExecutorType executorType,
EntityService entityService, RateLimitService rateLimitService, TbServiceInfoProvider serviceInfoProvider, boolean printTenantNames) { EntityService entityService, RateLimitService rateLimitService, TbServiceInfoProvider serviceInfoProvider,
StatsFactory statsFactory, boolean printTenantNames) {
this.maxWaitTime = maxWaitTime; this.maxWaitTime = maxWaitTime;
this.pollMs = pollMs; this.pollMs = pollMs;
this.bufferName = executorType.getDisplayName();
this.concurrencyLimit = concurrencyLimit; this.concurrencyLimit = concurrencyLimit;
this.printQueriesFreq = printQueriesFreq; this.printQueriesFreq = printQueriesFreq;
this.queue = new LinkedBlockingDeque<>(queueLimit); this.queue = new LinkedBlockingDeque<>(queueLimit);
this.dispatcherExecutor = Executors.newFixedThreadPool(dispatcherThreads, ThingsBoardThreadFactory.forName("nosql-" + getBufferName() + "-dispatcher")); this.dispatcherExecutor = Executors.newFixedThreadPool(dispatcherThreads, ThingsBoardThreadFactory.forName("nosql-" + bufferName + "-dispatcher"));
this.callbackExecutor = ThingsBoardExecutors.newWorkStealingPool(callbackThreads, "nosql-" + getBufferName() + "-callback"); this.callbackExecutor = ThingsBoardExecutors.newWorkStealingPool(callbackThreads, "nosql-" + bufferName + "-callback");
this.timeoutExecutor = ThingsBoardExecutors.newSingleThreadScheduledExecutor("nosql-" + getBufferName() + "-timeout"); this.timeoutExecutor = ThingsBoardExecutors.newSingleThreadScheduledExecutor("nosql-" + bufferName + "-timeout");
this.stats = new BufferedRateExecutorStats(statsFactory); this.stats = new BufferedRateExecutorStats(statsFactory);
String concurrencyLevelKey = StatsType.RATE_EXECUTOR.getName() + "." + CONCURRENCY_LEVEL + getBufferName(); //metric name may change with buffer name suffix String concurrencyLevelKey = StatsType.RATE_EXECUTOR.getName() + "." + CONCURRENCY_LEVEL + bufferName; //metric name may change with buffer name suffix
this.concurrencyLevel = statsFactory.createGauge(concurrencyLevelKey, new AtomicInteger(0)); this.concurrencyLevel = statsFactory.createGauge(concurrencyLevelKey, new AtomicInteger(0));
this.entityService = entityService; this.entityService = entityService;
this.rateLimitService = rateLimitService; this.rateLimitService = rateLimitService;
this.serviceInfoProvider = serviceInfoProvider; this.myLimitedApi = resolveLimitedApi(serviceInfoProvider, executorType);
this.printTenantNames = printTenantNames; this.printTenantNames = printTenantNames;
for (int i = 0; i < dispatcherThreads; i++) { for (int i = 0; i < dispatcherThreads; i++) {
@ -118,14 +122,14 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
boolean perTenantLimitReached = false; boolean perTenantLimitReached = false;
TenantId tenantId = task.getTenantId(); TenantId tenantId = task.getTenantId();
if (tenantId != null && !tenantId.isSysTenantId()) { if (tenantId != null && !tenantId.isSysTenantId()) {
if (!rateLimitService.checkRateLimit(getMyLimitedApi(), tenantId, tenantId, true)) { if (!rateLimitService.checkRateLimit(myLimitedApi, tenantId, tenantId, true)) {
stats.incrementRateLimitedTenant(tenantId); stats.incrementRateLimitedTenant(tenantId);
stats.getTotalRateLimited().increment(); stats.getTotalRateLimited().increment();
settableFuture.setException(new TenantRateLimitException()); settableFuture.setException(new TenantRateLimitException());
perTenantLimitReached = true; perTenantLimitReached = true;
} }
} else if (tenantId == null) { } else if (tenantId == null) {
log.info("[{}] Invalid task received: {}", getBufferName(), task); log.info("[{}] Invalid task received: {}", bufferName, task);
} }
if (!perTenantLimitReached) { if (!perTenantLimitReached) {
@ -140,17 +144,14 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
return result; return result;
} }
private LimitedApi getMyLimitedApi() { private LimitedApi resolveLimitedApi(TbServiceInfoProvider serviceInfoProvider, BufferedRateExecutorType executorType) {
if (serviceInfoProvider == null) { if (serviceInfoProvider == null || serviceInfoProvider.isMonolith()) {
return getBufferedRateExecutorType().getMonolithLimitedApi(); return executorType.getMonolithLimitedApi();
}
if (serviceInfoProvider.isMonolith()) {
return getBufferedRateExecutorType().getMonolithLimitedApi();
} }
if (serviceInfoProvider.isService(ServiceType.TB_RULE_ENGINE)) { if (serviceInfoProvider.isService(ServiceType.TB_RULE_ENGINE)) {
return getBufferedRateExecutorType().getRuleEngineLimitedApi(); return executorType.getRuleEngineLimitedApi();
} }
return getBufferedRateExecutorType().getCoreLimitedApi(); return executorType.getCoreLimitedApi();
} }
public void stop() { public void stop() {
@ -171,14 +172,8 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
protected abstract ListenableFuture<V> execute(AsyncTaskContext<T, V> taskCtx); protected abstract ListenableFuture<V> execute(AsyncTaskContext<T, V> taskCtx);
private String getBufferName() {
return getBufferedRateExecutorType().getDisplayName();
}
protected abstract BufferedRateExecutorType getBufferedRateExecutorType();
private void dispatch() { private void dispatch() {
log.info("[{}] Buffered rate executor thread started", getBufferName()); log.info("[{}] Buffered rate executor thread started", bufferName);
while (!Thread.interrupted()) { while (!Thread.interrupted()) {
int curLvl = concurrencyLevel.get(); int curLvl = concurrencyLevel.get();
AsyncTaskContext<T, V> taskCtx = null; AsyncTaskContext<T, V> taskCtx = null;
@ -190,7 +185,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
if (printQueriesIdx.incrementAndGet() >= printQueriesFreq) { if (printQueriesIdx.incrementAndGet() >= printQueriesFreq) {
printQueriesIdx.set(0); printQueriesIdx.set(0);
String query = queryToString(finalTaskCtx); String query = queryToString(finalTaskCtx);
log.info("[{}][{}] Cassandra query: {}", getBufferName(), taskCtx.getId(), query); log.info("[{}][{}] Cassandra query: {}", bufferName, taskCtx.getId(), query);
} }
} }
logTask("Processing", finalTaskCtx); logTask("Processing", finalTaskCtx);
@ -243,7 +238,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
} }
} }
} }
log.info("[{}] Buffered rate executor thread stopped", getBufferName()); log.info("[{}] Buffered rate executor thread stopped", bufferName);
} }
private void logTask(String action, AsyncTaskContext<T, V> taskCtx) { private void logTask(String action, AsyncTaskContext<T, V> taskCtx) {
@ -319,7 +314,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
statsBuilder.append(CONCURRENCY_LEVEL).append(" = [").append(concurrencyLevel.get()).append("] "); statsBuilder.append(CONCURRENCY_LEVEL).append(" = [").append(concurrencyLevel.get()).append("] ");
stats.getStatsCounters().forEach(StatsCounter::clear); stats.getStatsCounters().forEach(StatsCounter::clear);
log.info("[{}] Permits {}", getBufferName(), statsBuilder); log.info("[{}] Permits {}", bufferName, statsBuilder);
} }
stats.getRateLimitedTenants().entrySet().stream() stats.getRateLimitedTenants().entrySet().stream()
@ -335,13 +330,13 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
try { try {
return entityService.fetchEntityName(TenantId.SYS_TENANT_ID, tenantId).orElse(defaultName); return entityService.fetchEntityName(TenantId.SYS_TENANT_ID, tenantId).orElse(defaultName);
} catch (Exception e) { } catch (Exception e) {
log.error("[{}][{}] Failed to get tenant name", getBufferName(), tenantId, e); log.error("[{}][{}] Failed to get tenant name", bufferName, tenantId, e);
return defaultName; return defaultName;
} }
}); });
log.info("[{}][{}][{}] Rate limited requests: {}", getBufferName(), tenantId, name, rateLimitedRequests); log.info("[{}][{}][{}] Rate limited requests: {}", bufferName, tenantId, name, rateLimitedRequests);
} else { } else {
log.info("[{}][{}] Rate limited requests: {}", getBufferName(), tenantId, rateLimitedRequests); log.info("[{}][{}] Rate limited requests: {}", bufferName, tenantId, rateLimitedRequests);
} }
}); });
} }