From 8af37beb4a7df2d51cbca5a8654329939f2db34f Mon Sep 17 00:00:00 2001 From: dshvaika Date: Tue, 20 May 2025 16:19:44 +0300 Subject: [PATCH] New Cassandra rate limits: separated for Read and Write + Core and Rule Engine --- .../service/limits/RateLimitServiceTest.java | 12 +- .../server/common/data/limit/LimitedApi.java | 13 +- .../common/data/limit/LimitedApiEntry.java | 34 ++++++ .../common/data/limit/LimitedApiUtil.java | 67 +++++++++++ .../DefaultTenantProfileConfiguration.java | 6 +- .../common/data/limit/LimitedApiUtilTest.java | 113 ++++++++++++++++++ .../server/common/msg/tools/TbRateLimits.java | 27 +++-- .../common/msg/tools/TbRateLimitsTest.java | 63 ++++++++++ .../DefaultTbServiceInfoProvider.java | 13 +- .../discovery/TbServiceInfoProvider.java | 2 + dao/pom.xml | 4 + .../CassandraBufferedRateReadExecutor.java | 13 +- .../CassandraBufferedRateWriteExecutor.java | 13 +- .../util/AbstractBufferedRateExecutor.java | 24 +++- .../dao/util/BufferedRateExecutorType.java | 40 +++++++ 15 files changed, 407 insertions(+), 37 deletions(-) create mode 100644 common/data/src/main/java/org/thingsboard/server/common/data/limit/LimitedApiEntry.java create mode 100644 common/data/src/main/java/org/thingsboard/server/common/data/limit/LimitedApiUtil.java create mode 100644 common/data/src/test/java/org/thingsboard/server/common/data/limit/LimitedApiUtilTest.java create mode 100644 common/message/src/test/java/org/thingsboard/server/common/msg/tools/TbRateLimitsTest.java create mode 100644 dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateExecutorType.java diff --git a/application/src/test/java/org/thingsboard/server/service/limits/RateLimitServiceTest.java b/application/src/test/java/org/thingsboard/server/service/limits/RateLimitServiceTest.java index 3eb5793c74..f85ad848ab 100644 --- a/application/src/test/java/org/thingsboard/server/service/limits/RateLimitServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/limits/RateLimitServiceTest.java @@ -52,7 +52,7 @@ public class RateLimitServiceTest { public void beforeEach() { tenantProfileCache = Mockito.mock(DefaultTbTenantProfileCache.class); rateLimitService = new DefaultRateLimitService(tenantProfileCache, mock(NotificationRuleProcessor.class), 60, 100); - tenantId = new TenantId(UUID.randomUUID()); + tenantId = TenantId.fromUUID(UUID.randomUUID()); } @Test @@ -67,7 +67,10 @@ public class RateLimitServiceTest { profileConfiguration.setTenantServerRestLimitsConfiguration(rateLimit); profileConfiguration.setCustomerServerRestLimitsConfiguration(rateLimit); profileConfiguration.setWsUpdatesPerSessionRateLimit(rateLimit); - profileConfiguration.setCassandraQueryTenantRateLimitsConfiguration(rateLimit); + profileConfiguration.setCassandraReadQueryTenantCoreRateLimits(rateLimit); + profileConfiguration.setCassandraWriteQueryTenantCoreRateLimits(rateLimit); + profileConfiguration.setCassandraReadQueryTenantRuleEngineRateLimits(rateLimit); + profileConfiguration.setCassandraWriteQueryTenantRuleEngineRateLimits(rateLimit); profileConfiguration.setEdgeEventRateLimits(rateLimit); profileConfiguration.setEdgeEventRateLimitsPerEdge(rateLimit); profileConfiguration.setEdgeUplinkMessagesRateLimits(rateLimit); @@ -79,7 +82,10 @@ public class RateLimitServiceTest { LimitedApi.ENTITY_IMPORT, LimitedApi.NOTIFICATION_REQUESTS, LimitedApi.REST_REQUESTS_PER_CUSTOMER, - LimitedApi.CASSANDRA_QUERIES, + LimitedApi.CASSANDRA_READ_QUERIES_CORE, + LimitedApi.CASSANDRA_WRITE_QUERIES_CORE, + LimitedApi.CASSANDRA_READ_QUERIES_RULE_ENGINE, + LimitedApi.CASSANDRA_WRITE_QUERIES_RULE_ENGINE, LimitedApi.EDGE_EVENTS, LimitedApi.EDGE_EVENTS_PER_EDGE, LimitedApi.EDGE_UPLINK_MESSAGES, diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/limit/LimitedApi.java b/common/data/src/main/java/org/thingsboard/server/common/data/limit/LimitedApi.java index db7f14171b..766a1f4eb3 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/limit/LimitedApi.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/limit/LimitedApi.java @@ -30,7 +30,18 @@ public enum LimitedApi { REST_REQUESTS_PER_TENANT(DefaultTenantProfileConfiguration::getTenantServerRestLimitsConfiguration, "REST API requests", true), REST_REQUESTS_PER_CUSTOMER(DefaultTenantProfileConfiguration::getCustomerServerRestLimitsConfiguration, "REST API requests per customer", false), WS_UPDATES_PER_SESSION(DefaultTenantProfileConfiguration::getWsUpdatesPerSessionRateLimit, "WS updates per session", true), - CASSANDRA_QUERIES(DefaultTenantProfileConfiguration::getCassandraQueryTenantRateLimitsConfiguration, "Cassandra queries", true), + CASSANDRA_WRITE_QUERIES_CORE(DefaultTenantProfileConfiguration::getCassandraReadQueryTenantCoreRateLimits, "Rest API and WS telemetry read queries", true), + CASSANDRA_READ_QUERIES_CORE(DefaultTenantProfileConfiguration::getCassandraWriteQueryTenantCoreRateLimits, "Rest API and WS telemetry write queries", true), + CASSANDRA_WRITE_QUERIES_RULE_ENGINE(DefaultTenantProfileConfiguration::getCassandraReadQueryTenantRuleEngineRateLimits, "Rule Engine telemetry read queries", true), + CASSANDRA_READ_QUERIES_RULE_ENGINE(DefaultTenantProfileConfiguration::getCassandraWriteQueryTenantRuleEngineRateLimits, "Rule Engine telemetry write queries", true), + CASSANDRA_READ_QUERIES_MONOLITH( + LimitedApiUtil.merge( + DefaultTenantProfileConfiguration::getCassandraReadQueryTenantCoreRateLimits, + DefaultTenantProfileConfiguration::getCassandraReadQueryTenantRuleEngineRateLimits), "Telemetry read queries", true), + CASSANDRA_WRITE_QUERIES_MONOLITH( + LimitedApiUtil.merge( + DefaultTenantProfileConfiguration::getCassandraWriteQueryTenantCoreRateLimits, + DefaultTenantProfileConfiguration::getCassandraWriteQueryTenantRuleEngineRateLimits), "Telemetry write queries", true), EDGE_EVENTS(DefaultTenantProfileConfiguration::getEdgeEventRateLimits, "Edge events", true), EDGE_EVENTS_PER_EDGE(DefaultTenantProfileConfiguration::getEdgeEventRateLimitsPerEdge, "Edge events per edge", false), EDGE_UPLINK_MESSAGES(DefaultTenantProfileConfiguration::getEdgeUplinkMessagesRateLimits, "Edge uplink messages", true), diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/limit/LimitedApiEntry.java b/common/data/src/main/java/org/thingsboard/server/common/data/limit/LimitedApiEntry.java new file mode 100644 index 0000000000..3082b521b6 --- /dev/null +++ b/common/data/src/main/java/org/thingsboard/server/common/data/limit/LimitedApiEntry.java @@ -0,0 +1,34 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.data.limit; + +public record LimitedApiEntry(long capacity, long durationSeconds) { + + public static LimitedApiEntry parse(String s) { + String[] parts = s.split(":"); + return new LimitedApiEntry(Long.parseLong(parts[0]), Long.parseLong(parts[1])); + } + + public double rps() { + return (double) capacity / durationSeconds; + } + + @Override + public String toString() { + return capacity + ":" + durationSeconds; + } + +} diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/limit/LimitedApiUtil.java b/common/data/src/main/java/org/thingsboard/server/common/data/limit/LimitedApiUtil.java new file mode 100644 index 0000000000..a90c7eb825 --- /dev/null +++ b/common/data/src/main/java/org/thingsboard/server/common/data/limit/LimitedApiUtil.java @@ -0,0 +1,67 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.data.limit; + +import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class LimitedApiUtil { + + public static List parseConfig(String config) { + if (config == null || config.isEmpty()) { + return Collections.emptyList(); + } + return Arrays.stream(config.split(",")) + .map(LimitedApiEntry::parse) + .toList(); + } + + public static Function merge( + Function configExtractor1, + Function configExtractor2) { + return config -> { + String config1 = configExtractor1.apply(config); + String config2 = configExtractor2.apply(config); + return LimitedApiUtil.mergeStrConfigs(config1, config2); // merges the configs + }; + } + + private static String mergeStrConfigs(String firstConfig, String secondConfig) { + List all = new ArrayList<>(); + all.addAll(parseConfig(firstConfig)); + all.addAll(parseConfig(secondConfig)); + + Map merged = new HashMap<>(); + + for (LimitedApiEntry entry : all) { + merged.merge(entry.durationSeconds(), entry.capacity(), Long::sum); + } + + return merged.entrySet().stream() + .sorted(Map.Entry.comparingByKey()) // optional: sort by duration + .map(e -> e.getValue() + ":" + e.getKey()) + .collect(Collectors.joining(",")); + } + +} diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java index f256b02d9a..d61f48b148 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java @@ -121,7 +121,11 @@ public class DefaultTenantProfileConfiguration implements TenantProfileConfigura private long maxWsSubscriptionsPerPublicUser; private String wsUpdatesPerSessionRateLimit; - private String cassandraQueryTenantRateLimitsConfiguration; + private String cassandraReadQueryTenantCoreRateLimits; + private String cassandraWriteQueryTenantCoreRateLimits; + + private String cassandraReadQueryTenantRuleEngineRateLimits; + private String cassandraWriteQueryTenantRuleEngineRateLimits; private String edgeEventRateLimits; private String edgeEventRateLimitsPerEdge; diff --git a/common/data/src/test/java/org/thingsboard/server/common/data/limit/LimitedApiUtilTest.java b/common/data/src/test/java/org/thingsboard/server/common/data/limit/LimitedApiUtilTest.java new file mode 100644 index 0000000000..6486de3221 --- /dev/null +++ b/common/data/src/test/java/org/thingsboard/server/common/data/limit/LimitedApiUtilTest.java @@ -0,0 +1,113 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.data.limit; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; + +import java.util.List; +import java.util.function.Function; + +import static org.assertj.core.api.Assertions.assertThat; + +class LimitedApiUtilTest { + + @Test + @DisplayName("LimitedApiUtil should parse single entry correctly") + void testParseSingleEntry() { + List entries = LimitedApiUtil.parseConfig("100:60"); + + assertThat(entries).hasSize(1); + assertThat(entries.get(0).capacity()).isEqualTo(100); + assertThat(entries.get(0).durationSeconds()).isEqualTo(60); + } + + @Test + @DisplayName("LimitedApiUtil should parse multiple entries correctly") + void testParseMultipleEntries() { + List entries = LimitedApiUtil.parseConfig("100:60,200:30"); + + assertThat(entries).hasSize(2); + assertThat(entries.get(0).capacity()).isEqualTo(100); + assertThat(entries.get(0).durationSeconds()).isEqualTo(60); + assertThat(entries.get(1).capacity()).isEqualTo(200); + assertThat(entries.get(1).durationSeconds()).isEqualTo(30); + } + + @Test + @DisplayName("LimitedApiUtil should return empty list for null or empty config") + void testParseEmptyConfig() { + assertThat(LimitedApiUtil.parseConfig(null)).isEmpty(); + assertThat(LimitedApiUtil.parseConfig("")).isEmpty(); + } + + @Test + @DisplayName("LimitedApiUtil should merge two configs by summing capacities with same durations") + void testMergeStrConfigs() { + Function extractor1 = cfg -> "100:60,50:30"; + Function extractor2 = cfg -> "200:60,25:10"; + + // Fake config instance (not used directly in lambda logic) + DefaultTenantProfileConfiguration config = new DefaultTenantProfileConfiguration(); + + String result = LimitedApiUtil.merge(extractor1, extractor2).apply(config); + + // Should be: 300:60 (100+200), 50:30, 25:10 + assertThat(result).isEqualTo("25:10,50:30,300:60"); + } + + @Test + @DisplayName("LimitedApiUtil should merge configs when one is empty") + void testMergeWithEmptyOne() { + Function extractor1 = cfg -> "100:60"; + Function extractor2 = cfg -> ""; + + // Fake config instance (not used directly in lambda logic) + DefaultTenantProfileConfiguration config = new DefaultTenantProfileConfiguration(); + String result = LimitedApiUtil.merge(extractor1, extractor2).apply(config); + + assertThat(result).isEqualTo("100:60"); + } + + @Test + @DisplayName("LimitedApiUtil should merge configs when both have distinct durations") + void testMergeWithDistinctDurations() { + Function extractor1 = cfg -> "100:60"; + Function extractor2 = cfg -> "200:10"; + + // Fake config instance (not used directly in lambda logic) + DefaultTenantProfileConfiguration config = new DefaultTenantProfileConfiguration(); + String result = LimitedApiUtil.merge(extractor1, extractor2).apply(config); + + assertThat(result).isEqualTo("200:10,100:60"); + } + + @Test + @DisplayName("LimitedApiUtil shouldn't have duplicate durations in the same config!") + void testMergeHandlesDuplicatesInSingleConfig() { + Function extractor1 = cfg -> "100:60,200:60"; + Function extractor2 = cfg -> ""; + + // Fake config instance (not used directly in lambda logic) + DefaultTenantProfileConfiguration config = new DefaultTenantProfileConfiguration(); + String result = LimitedApiUtil.merge(extractor1, extractor2).apply(config); + + // 100+200 = 300 for duration 60. Currently possible to save the same "per seconds" config from the UI. + // This must be fixed, so we will merge only two different rate limits. + assertThat(result).isEqualTo("300:60"); + } +} diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/tools/TbRateLimits.java b/common/message/src/main/java/org/thingsboard/server/common/msg/tools/TbRateLimits.java index 8e8933c324..1e2d8dd4ce 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/tools/TbRateLimits.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/tools/TbRateLimits.java @@ -16,13 +16,17 @@ package org.thingsboard.server.common.msg.tools; import io.github.bucket4j.Bandwidth; +import io.github.bucket4j.BandwidthBuilder; import io.github.bucket4j.Bucket; import io.github.bucket4j.Refill; import io.github.bucket4j.local.LocalBucket; import io.github.bucket4j.local.LocalBucketBuilder; import lombok.Getter; +import org.thingsboard.server.common.data.limit.LimitedApiEntry; +import org.thingsboard.server.common.data.limit.LimitedApiUtil; import java.time.Duration; +import java.util.List; /** * Created by ashvayka on 22.10.18. @@ -38,20 +42,19 @@ public class TbRateLimits { } public TbRateLimits(String limitsConfiguration, boolean refillIntervally) { - LocalBucketBuilder builder = Bucket.builder(); - boolean initialized = false; - for (String limitSrc : limitsConfiguration.split(",")) { - long capacity = Long.parseLong(limitSrc.split(":")[0]); - long duration = Long.parseLong(limitSrc.split(":")[1]); - Refill refill = refillIntervally ? Refill.intervally(capacity, Duration.ofSeconds(duration)) : Refill.greedy(capacity, Duration.ofSeconds(duration)); - builder.addLimit(Bandwidth.classic(capacity, refill)); - initialized = true; - } - if (initialized) { - bucket = builder.build(); - } else { + List limitedApiEntries = LimitedApiUtil.parseConfig(limitsConfiguration); + if (limitedApiEntries.isEmpty()) { throw new IllegalArgumentException("Failed to parse rate limits configuration: " + limitsConfiguration); } + LocalBucketBuilder localBucket = Bucket.builder(); + for (LimitedApiEntry entry : limitedApiEntries) { + BandwidthBuilder.BandwidthBuilderRefillStage bandwidthBuilder = Bandwidth.builder().capacity(entry.capacity()); + Bandwidth bandwidth = refillIntervally ? + bandwidthBuilder.refillIntervally(entry.capacity(), Duration.ofSeconds(entry.durationSeconds())).build() : + bandwidthBuilder.refillGreedy(entry.capacity(), Duration.ofSeconds(entry.durationSeconds())).build(); + localBucket.addLimit(bandwidth); + } + this.bucket = localBucket.build(); this.configuration = limitsConfiguration; } diff --git a/common/message/src/test/java/org/thingsboard/server/common/msg/tools/TbRateLimitsTest.java b/common/message/src/test/java/org/thingsboard/server/common/msg/tools/TbRateLimitsTest.java new file mode 100644 index 0000000000..a6a95da9c1 --- /dev/null +++ b/common/message/src/test/java/org/thingsboard/server/common/msg/tools/TbRateLimitsTest.java @@ -0,0 +1,63 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.msg.tools; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class TbRateLimitsTest { + + @Test + @DisplayName("TbRateLimits should construct with single rate limit") + void testSingleLimitConstructor() { + TbRateLimits limits = new TbRateLimits("10:1", false); + assertThat(limits.getConfiguration()).isEqualTo("10:1"); + } + + @Test + @DisplayName("TbRateLimits should construct with multiple rate limits") + void testMultipleLimitConstructor() { + String config = "10:1,100:10"; + TbRateLimits limits = new TbRateLimits(config, false); + assertThat(limits.getConfiguration()).isEqualTo(config); + } + + @Test + @DisplayName("TbRateLimits should throw IllegalArgumentException on empty string") + void testEmptyConfigThrows() { + assertThatThrownBy(() -> new TbRateLimits("", false)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Failed to parse rate limits configuration: "); + } + + @Test + @DisplayName("TbRateLimits should throw NumberFormatException on malformed value") + void testMalformedConfigThrows() { + assertThatThrownBy(() -> new TbRateLimits("not_a_number:second", false)) + .isInstanceOf(NumberFormatException.class); + } + + @Test + @DisplayName("TbRateLimits should throw ArrayIndexOutOfBoundsException on missing colon") + void testColonMissingThrows() { + assertThatThrownBy(() -> new TbRateLimits("100", false)) + .isInstanceOf(ArrayIndexOutOfBoundsException.class); + } + +} \ No newline at end of file diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/DefaultTbServiceInfoProvider.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/DefaultTbServiceInfoProvider.java index 609d3f8eee..7ff6fafbca 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/DefaultTbServiceInfoProvider.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/DefaultTbServiceInfoProvider.java @@ -78,11 +78,9 @@ public class DefaultTbServiceInfoProvider implements TbServiceInfoProvider { } } log.info("Current Service ID: {}", serviceId); - if (serviceType.equalsIgnoreCase("monolith")) { - serviceTypes = List.of(ServiceType.values()); - } else { - serviceTypes = Collections.singletonList(ServiceType.of(serviceType)); - } + serviceTypes = isMonolith() ? + List.of(ServiceType.values()) : + Collections.singletonList(ServiceType.of(serviceType)); if (!serviceTypes.contains(ServiceType.TB_RULE_ENGINE) || assignedTenantProfiles == null) { assignedTenantProfiles = Collections.emptySet(); } @@ -113,6 +111,11 @@ public class DefaultTbServiceInfoProvider implements TbServiceInfoProvider { return serviceInfo; } + @Override + public boolean isMonolith() { + return serviceType.equalsIgnoreCase("monolith"); + } + @Override public boolean isService(ServiceType serviceType) { return serviceTypes.contains(serviceType); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TbServiceInfoProvider.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TbServiceInfoProvider.java index 51a6d808dd..a844440a21 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TbServiceInfoProvider.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TbServiceInfoProvider.java @@ -29,6 +29,8 @@ public interface TbServiceInfoProvider { ServiceInfo getServiceInfo(); + boolean isMonolith(); + boolean isService(ServiceType serviceType); ServiceInfo generateNewServiceInfoWithCurrentSystemInfo(); diff --git a/dao/pom.xml b/dao/pom.xml index 2206d8a7d7..67b5a36c92 100644 --- a/dao/pom.xml +++ b/dao/pom.xml @@ -59,6 +59,10 @@ org.thingsboard.common util + + org.thingsboard.common + queue + com.networknt json-schema-validator diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateReadExecutor.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateReadExecutor.java index 259cd908fc..a7aa0dc5dc 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateReadExecutor.java +++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateReadExecutor.java @@ -28,7 +28,9 @@ import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.dao.entity.EntityService; import org.thingsboard.server.dao.util.AbstractBufferedRateExecutor; import org.thingsboard.server.dao.util.AsyncTaskContext; +import org.thingsboard.server.dao.util.BufferedRateExecutorType; import org.thingsboard.server.dao.util.NoSqlAnyDao; +import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; /** * Created by ashvayka on 24.10.18. @@ -38,8 +40,6 @@ import org.thingsboard.server.dao.util.NoSqlAnyDao; @NoSqlAnyDao public class CassandraBufferedRateReadExecutor extends AbstractBufferedRateExecutor { - static final String BUFFER_NAME = "Read"; - public CassandraBufferedRateReadExecutor( @Value("${cassandra.query.buffer_size}") int queueLimit, @Value("${cassandra.query.concurrent_limit}") int concurrencyLimit, @@ -51,9 +51,10 @@ public class CassandraBufferedRateReadExecutor extends AbstractBufferedRateExecu @Value("${cassandra.query.print_queries_freq:0}") int printQueriesFreq, @Autowired StatsFactory statsFactory, @Autowired EntityService entityService, - @Autowired RateLimitService rateLimitService) { + @Autowired RateLimitService rateLimitService, + @Autowired TbServiceInfoProvider serviceInfoProvider) { super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs, printQueriesFreq, statsFactory, - entityService, rateLimitService, printTenantNames); + entityService, rateLimitService, serviceInfoProvider, printTenantNames); } @Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}") @@ -68,8 +69,8 @@ public class CassandraBufferedRateReadExecutor extends AbstractBufferedRateExecu } @Override - public String getBufferName() { - return BUFFER_NAME; + protected BufferedRateExecutorType getBufferedRateExecutorType() { + return BufferedRateExecutorType.READ; } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateWriteExecutor.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateWriteExecutor.java index 9c894a7eac..c61148e51f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateWriteExecutor.java +++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateWriteExecutor.java @@ -28,7 +28,9 @@ import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.dao.entity.EntityService; import org.thingsboard.server.dao.util.AbstractBufferedRateExecutor; import org.thingsboard.server.dao.util.AsyncTaskContext; +import org.thingsboard.server.dao.util.BufferedRateExecutorType; import org.thingsboard.server.dao.util.NoSqlAnyDao; +import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; /** * Created by ashvayka on 24.10.18. @@ -38,8 +40,6 @@ import org.thingsboard.server.dao.util.NoSqlAnyDao; @NoSqlAnyDao public class CassandraBufferedRateWriteExecutor extends AbstractBufferedRateExecutor { - static final String BUFFER_NAME = "Write"; - public CassandraBufferedRateWriteExecutor( @Value("${cassandra.query.buffer_size}") int queueLimit, @Value("${cassandra.query.concurrent_limit}") int concurrencyLimit, @@ -51,9 +51,10 @@ public class CassandraBufferedRateWriteExecutor extends AbstractBufferedRateExec @Value("${cassandra.query.print_queries_freq:0}") int printQueriesFreq, @Autowired StatsFactory statsFactory, @Autowired EntityService entityService, - @Autowired RateLimitService rateLimitService) { + @Autowired RateLimitService rateLimitService, + @Autowired TbServiceInfoProvider serviceInfoProvider) { super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs, printQueriesFreq, statsFactory, - entityService, rateLimitService, printTenantNames); + entityService, rateLimitService, serviceInfoProvider, printTenantNames); } @Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}") @@ -68,8 +69,8 @@ public class CassandraBufferedRateWriteExecutor extends AbstractBufferedRateExec } @Override - public String getBufferName() { - return BUFFER_NAME; + protected BufferedRateExecutorType getBufferedRateExecutorType() { + return BufferedRateExecutorType.WRITE; } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java b/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java index a87e9312e3..0f6170f25d 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java +++ b/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java @@ -34,12 +34,14 @@ import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.cache.limits.RateLimitService; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.limit.LimitedApi; +import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.stats.DefaultCounter; import org.thingsboard.server.common.stats.StatsCounter; import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.common.stats.StatsType; import org.thingsboard.server.dao.entity.EntityService; import org.thingsboard.server.dao.nosql.CassandraStatementTask; +import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import java.util.HashMap; import java.util.Map; @@ -78,13 +80,14 @@ public abstract class AbstractBufferedRateExecutor tenantNamesCache = new HashMap<>(); public AbstractBufferedRateExecutor(int queueLimit, int concurrencyLimit, long maxWaitTime, int dispatcherThreads, int callbackThreads, long pollMs, int printQueriesFreq, StatsFactory statsFactory, - EntityService entityService, RateLimitService rateLimitService, boolean printTenantNames) { + EntityService entityService, RateLimitService rateLimitService, TbServiceInfoProvider serviceInfoProvider, boolean printTenantNames) { this.maxWaitTime = maxWaitTime; this.pollMs = pollMs; this.concurrencyLimit = concurrencyLimit; @@ -99,6 +102,7 @@ public abstract class AbstractBufferedRateExecutor execute(AsyncTaskContext taskCtx); - public abstract String getBufferName(); + private String getBufferName() { + return getBufferedRateExecutorType().getDisplayName(); + } + + protected abstract BufferedRateExecutorType getBufferedRateExecutorType(); private void dispatch() { log.info("[{}] Buffered rate executor thread started", getBufferName()); diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateExecutorType.java b/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateExecutorType.java new file mode 100644 index 0000000000..bf76235e97 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateExecutorType.java @@ -0,0 +1,40 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.util; + +import lombok.Getter; +import org.apache.commons.lang3.StringUtils; +import org.thingsboard.server.common.data.limit.LimitedApi; + +@Getter +public enum BufferedRateExecutorType { + + READ(LimitedApi.CASSANDRA_READ_QUERIES_CORE, LimitedApi.CASSANDRA_READ_QUERIES_RULE_ENGINE, LimitedApi.CASSANDRA_READ_QUERIES_MONOLITH), + WRITE(LimitedApi.CASSANDRA_WRITE_QUERIES_CORE, LimitedApi.CASSANDRA_WRITE_QUERIES_RULE_ENGINE, LimitedApi.CASSANDRA_WRITE_QUERIES_MONOLITH); + + private final LimitedApi coreLimitedApi; + private final LimitedApi ruleEngineLimitedApi; + private final LimitedApi monolithLimitedApi; + + private final String displayName = StringUtils.capitalize(name().toLowerCase()); + + BufferedRateExecutorType(LimitedApi coreLimitedApi, LimitedApi ruleEngineLimitedApi, LimitedApi monolithLimitedApi) { + this.coreLimitedApi = coreLimitedApi; + this.ruleEngineLimitedApi = ruleEngineLimitedApi; + this.monolithLimitedApi = monolithLimitedApi; + } + +}