diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/queue/DefaultTbQueueService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/queue/DefaultTbQueueService.java index 0d4cc26ce7..44c5182423 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/queue/DefaultTbQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/queue/DefaultTbQueueService.java @@ -19,6 +19,7 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.thingsboard.server.cluster.TbClusterService; +import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.id.QueueId; import org.thingsboard.server.common.data.id.TenantId; @@ -56,6 +57,7 @@ public class DefaultTbQueueService extends AbstractTbEntityService implements Tb oldQueue = queueService.findQueueById(queue.getTenantId(), queue.getId()); } + checkQueueName(queue.getName()); Queue savedQueue = queueService.saveQueue(queue); createTopicsIfNeeded(savedQueue, oldQueue); tbClusterService.onQueuesUpdate(List.of(savedQueue)); @@ -181,4 +183,10 @@ public class DefaultTbQueueService extends AbstractTbEntityService implements Tb } } + private void checkQueueName(String queueName) { + if (DataConstants.CF_QUEUE_NAME.equals(queueName) || DataConstants.CF_STATES_QUEUE_NAME.equals(queueName)) { + throw new IllegalArgumentException(String.format("The queue name '%s' is not allowed. This name is reserved for internal use. Please choose a different name.", queueName)); + } + } + } diff --git a/application/src/test/java/org/thingsboard/server/cf/CalculatedFieldIntegrationTest.java b/application/src/test/java/org/thingsboard/server/cf/CalculatedFieldIntegrationTest.java index 9ddf0f0a04..002f668fdc 100644 --- a/application/src/test/java/org/thingsboard/server/cf/CalculatedFieldIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/cf/CalculatedFieldIntegrationTest.java @@ -48,6 +48,9 @@ import static org.awaitility.Awaitility.await; @DaoSqlTest public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTest { + public static final int TIMEOUT = 60; + public static final int POLL_INTERVAL = 1; + @BeforeEach void setUp() throws Exception { loginTenantAdmin(); @@ -86,6 +89,7 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes CalculatedField savedCalculatedField = doPost("/api/calculatedField", calculatedField, CalculatedField.class); await().alias("create CF -> perform initial calculation").atMost(TIMEOUT, TimeUnit.SECONDS) + .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS) .untilAsserted(() -> { ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp"); assertThat(fahrenheitTemp).isNotNull(); @@ -95,6 +99,7 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"temperature\":30}")); await().alias("update telemetry -> recalculate state").atMost(TIMEOUT, TimeUnit.SECONDS) + .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS) .untilAsserted(() -> { ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp"); assertThat(fahrenheitTemp).isNotNull(); @@ -108,6 +113,7 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes savedCalculatedField = doPost("/api/calculatedField", savedCalculatedField, CalculatedField.class); await().alias("update CF output -> perform calculation with updated output").atMost(TIMEOUT, TimeUnit.SECONDS) + .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS) .untilAsserted(() -> { ArrayNode temperatureF = getServerAttributes(testDevice.getId(), "temperatureF"); assertThat(temperatureF).isNotNull(); @@ -119,6 +125,7 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes savedCalculatedField = doPost("/api/calculatedField", savedCalculatedField, CalculatedField.class); await().alias("update CF argument -> perform calculation with new argument").atMost(TIMEOUT, TimeUnit.SECONDS) + .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS) .untilAsserted(() -> { ArrayNode temperatureF = getServerAttributes(testDevice.getId(), "temperatureF"); assertThat(temperatureF).isNotNull(); @@ -129,6 +136,7 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes savedCalculatedField = doPost("/api/calculatedField", savedCalculatedField, CalculatedField.class); await().alias("update CF expression -> perform calculation with new expression").atMost(TIMEOUT, TimeUnit.SECONDS) + .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS) .untilAsserted(() -> { ArrayNode temperatureF = getServerAttributes(testDevice.getId(), "temperatureF"); assertThat(temperatureF).isNotNull(); @@ -166,6 +174,7 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes CalculatedField savedCalculatedField = doPost("/api/calculatedField", calculatedField, CalculatedField.class); await().alias("create CF -> state is not ready -> no calculation performed").atMost(TIMEOUT, TimeUnit.SECONDS) + .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS) .untilAsserted(() -> { ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp"); assertThat(fahrenheitTemp).isNotNull(); @@ -175,6 +184,7 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"temperature\":30}")); await().alias("update telemetry -> perform calculation").atMost(TIMEOUT, TimeUnit.SECONDS) + .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS) .untilAsserted(() -> { ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp"); assertThat(fahrenheitTemp).isNotNull(); @@ -213,6 +223,7 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes CalculatedField savedCalculatedField = doPost("/api/calculatedField", calculatedField, CalculatedField.class); await().alias("create CF -> perform initial calculation with default value").atMost(TIMEOUT, TimeUnit.SECONDS) + .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS) .untilAsserted(() -> { ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp"); assertThat(fahrenheitTemp).isNotNull(); @@ -222,6 +233,7 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"temperature\":30}")); await().alias("update telemetry -> recalculate state").atMost(TIMEOUT, TimeUnit.SECONDS) + .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS) .untilAsserted(() -> { ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp"); assertThat(fahrenheitTemp).isNotNull(); @@ -277,6 +289,7 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes doPost("/api/calculatedField", calculatedField, CalculatedField.class); await().alias("create CF and perform initial calculation").atMost(TIMEOUT, TimeUnit.SECONDS) + .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS) .untilAsserted(() -> { // result of asset 1 ArrayNode z1 = getServerAttributes(asset1.getId(), "z"); @@ -292,6 +305,7 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/attributes/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"x\":25}")); await().alias("update device telemetry -> recalculate state for all assets").atMost(TIMEOUT, TimeUnit.SECONDS) + .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS) .untilAsserted(() -> { // result of asset 1 ArrayNode z1 = getServerAttributes(asset1.getId(), "z"); @@ -307,6 +321,7 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes doPost("/api/plugins/telemetry/ASSET/" + asset1.getUuidId() + "/attributes/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"y\":15}")); await().alias("update asset 1 telemetry -> recalculate state only for asset 1").atMost(TIMEOUT, TimeUnit.SECONDS) + .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS) .untilAsserted(() -> { // result of asset 1 ArrayNode z1 = getServerAttributes(asset1.getId(), "z"); @@ -322,6 +337,7 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes doPost("/api/plugins/telemetry/ASSET/" + asset2.getUuidId() + "/attributes/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"y\":5}")); await().alias("update asset 2 telemetry -> recalculate state only for asset 2").atMost(TIMEOUT, TimeUnit.SECONDS) + .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS) .untilAsserted(() -> { // result of asset 1 (no changes) ArrayNode z1 = getServerAttributes(asset1.getId(), "z"); @@ -339,6 +355,7 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes Asset finalAsset3 = asset3; await().alias("add new entity to profile -> calculate state for new entity").atMost(TIMEOUT, TimeUnit.SECONDS) + .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS) .untilAsserted(() -> { // result of asset 3 ArrayNode z3 = getServerAttributes(finalAsset3.getId(), "z"); @@ -349,6 +366,7 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/attributes/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"x\":20}")); await().alias("update device telemetry -> recalculate state for all assets").atMost(TIMEOUT, TimeUnit.SECONDS) + .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS) .untilAsserted(() -> { // result of asset 1 ArrayNode z1 = getServerAttributes(asset1.getId(), "z"); @@ -375,6 +393,7 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes Asset updatedAsset3 = asset3; await().alias("update device telemetry -> recalculate state for asset 1 and asset 2").atMost(TIMEOUT, TimeUnit.SECONDS) + .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS) .untilAsserted(() -> { // result of asset 1 ArrayNode z1 = getServerAttributes(asset1.getId(), "z"); @@ -425,6 +444,7 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes CalculatedField savedCalculatedField = doPost("/api/calculatedField", calculatedField, CalculatedField.class); await().alias("create CF -> ctx is not initialized -> no calculation perform").atMost(TIMEOUT, TimeUnit.SECONDS) + .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS) .untilAsserted(() -> { ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp"); assertThat(fahrenheitTemp).isNotNull(); @@ -434,6 +454,7 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"temperature\":30}")); await().alias("update telemetry -> ctx is not initialized -> no calculation perform").atMost(TIMEOUT, TimeUnit.SECONDS) + .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS) .untilAsserted(() -> { ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp"); assertThat(fahrenheitTemp).isNotNull(); diff --git a/application/src/test/java/org/thingsboard/server/controller/BaseQueueControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/BaseQueueControllerTest.java index 77002cef8e..b0a3518e60 100644 --- a/application/src/test/java/org/thingsboard/server/controller/BaseQueueControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/BaseQueueControllerTest.java @@ -28,6 +28,7 @@ import org.springframework.boot.test.mock.mockito.SpyBean; import org.springframework.test.context.TestPropertySource; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.exception.ThingsboardException; @@ -73,6 +74,7 @@ import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; +import static org.hamcrest.Matchers.containsString; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.argThat; @@ -325,6 +327,59 @@ public class BaseQueueControllerTest extends AbstractControllerTest { doDelete("/api/queues/" + queue.getUuidId()).andExpect(status().isOk()); } + @Test + public void testQueueWithReservedName() throws Exception { + loginSysAdmin(); + + // create queue + Queue queue = new Queue(); + queue.setName(DataConstants.CF_QUEUE_NAME); + queue.setTopic("tb_rule_engine.calculated_fields"); + queue.setPollInterval(25); + queue.setPartitions(10); + queue.setTenantId(TenantId.SYS_TENANT_ID); + queue.setConsumerPerPartition(false); + queue.setPackProcessingTimeout(2000); + SubmitStrategy submitStrategy = new SubmitStrategy(); + submitStrategy.setType(SubmitStrategyType.SEQUENTIAL_BY_ORIGINATOR); + queue.setSubmitStrategy(submitStrategy); + ProcessingStrategy processingStrategy = new ProcessingStrategy(); + processingStrategy.setType(ProcessingStrategyType.RETRY_ALL); + processingStrategy.setRetries(3); + processingStrategy.setFailurePercentage(0.7); + processingStrategy.setPauseBetweenRetries(3); + processingStrategy.setMaxPauseBetweenRetries(5); + queue.setProcessingStrategy(processingStrategy); + + doPost("/api/queues?serviceType=" + "TB-RULE-ENGINE", queue) + .andExpect(status().isBadRequest()) + .andExpect(statusReason(containsString(String.format("The queue name '%s' is not allowed. This name is reserved for internal use. Please choose a different name.", DataConstants.CF_QUEUE_NAME)))); + + // create queue + Queue queue2 = new Queue(); + queue2.setName(DataConstants.CF_STATES_QUEUE_NAME); + queue2.setTopic("tb_rule_engine.calculated_fields"); + queue2.setPollInterval(25); + queue2.setPartitions(10); + queue2.setTenantId(TenantId.SYS_TENANT_ID); + queue2.setConsumerPerPartition(false); + queue2.setPackProcessingTimeout(2000); + SubmitStrategy submitStrategy2 = new SubmitStrategy(); + submitStrategy2.setType(SubmitStrategyType.SEQUENTIAL_BY_ORIGINATOR); + queue2.setSubmitStrategy(submitStrategy); + ProcessingStrategy processingStrategy2 = new ProcessingStrategy(); + processingStrategy2.setType(ProcessingStrategyType.RETRY_ALL); + processingStrategy2.setRetries(3); + processingStrategy2.setFailurePercentage(0.7); + processingStrategy2.setPauseBetweenRetries(3); + processingStrategy2.setMaxPauseBetweenRetries(5); + queue2.setProcessingStrategy(processingStrategy); + + doPost("/api/queues?serviceType=" + "TB-RULE-ENGINE", queue2) + .andExpect(status().isBadRequest()) + .andExpect(statusReason(containsString(String.format("The queue name '%s' is not allowed. This name is reserved for internal use. Please choose a different name.", DataConstants.CF_STATES_QUEUE_NAME)))); + } + private Queue saveQueue(Queue queue) { return doPost("/api/queues?serviceType=TB_RULE_ENGINE", queue, Queue.class); } diff --git a/application/src/test/java/org/thingsboard/server/queue/discovery/HashPartitionServiceTest.java b/application/src/test/java/org/thingsboard/server/queue/discovery/HashPartitionServiceTest.java index 4a303e5253..dace159774 100644 --- a/application/src/test/java/org/thingsboard/server/queue/discovery/HashPartitionServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/queue/discovery/HashPartitionServiceTest.java @@ -428,7 +428,6 @@ public class HashPartitionServiceTest { ReflectionTestUtils.setField(partitionService, "corePartitions", 10); ReflectionTestUtils.setField(partitionService, "cfEventTopic", "tb_cf_event"); ReflectionTestUtils.setField(partitionService, "cfStateTopic", "tb_cf_state"); - ReflectionTestUtils.setField(partitionService, "cfPartitions", 10); ReflectionTestUtils.setField(partitionService, "vcTopic", "tb.vc"); ReflectionTestUtils.setField(partitionService, "vcPartitions", 10); ReflectionTestUtils.setField(partitionService, "hashFunctionName", hashFunctionName);