Merge pull request #12889 from irynamatveieva/calculated-fields
Calculated fields: improvements and fixes
This commit is contained in:
		
						commit
						e4123446d7
					
				@ -287,7 +287,11 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
 | 
			
		||||
                state.checkStateSize(ctxId, ctx.getMaxStateSize());
 | 
			
		||||
                stateSizeChecked = true;
 | 
			
		||||
                if (state.isSizeOk()) {
 | 
			
		||||
                    cfService.pushMsgToRuleEngine(tenantId, entityId, calculationResult, cfIdList, callback);
 | 
			
		||||
                    if (!calculationResult.isEmpty()) {
 | 
			
		||||
                        cfService.pushMsgToRuleEngine(tenantId, entityId, calculationResult, cfIdList, callback);
 | 
			
		||||
                    } else {
 | 
			
		||||
                        callback.onSuccess();
 | 
			
		||||
                    }
 | 
			
		||||
                    if (DebugModeUtil.isDebugAllAvailable(ctx.getCalculatedField())) {
 | 
			
		||||
                        systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, state.getArguments(), tbMsgId, tbMsgType, JacksonUtil.writeValueAsString(calculationResult.getResult()), null);
 | 
			
		||||
                    }
 | 
			
		||||
 | 
			
		||||
@ -27,4 +27,11 @@ public final class CalculatedFieldResult {
 | 
			
		||||
    private final AttributeScope scope;
 | 
			
		||||
    private final JsonNode result;
 | 
			
		||||
 | 
			
		||||
    public boolean isEmpty() {
 | 
			
		||||
        return result == null || result.isMissingNode() || result.isNull() ||
 | 
			
		||||
                (result.isObject() && result.isEmpty()) ||
 | 
			
		||||
                (result.isArray() && result.isEmpty()) ||
 | 
			
		||||
                (result.isTextual() && result.asText().isEmpty());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -83,7 +83,7 @@ public class CalculatedFieldCtx {
 | 
			
		||||
        for (Map.Entry<String, Argument> entry : arguments.entrySet()) {
 | 
			
		||||
            var refId = entry.getValue().getRefEntityId();
 | 
			
		||||
            var refKey = entry.getValue().getRefEntityKey();
 | 
			
		||||
            if (refId == null) {
 | 
			
		||||
            if (refId == null || refId.equals(calculatedField.getEntityId())) {
 | 
			
		||||
                mainEntityArguments.put(refKey, entry.getKey());
 | 
			
		||||
            } else {
 | 
			
		||||
                linkedEntityArguments.computeIfAbsent(refId, key -> new HashMap<>()).put(refKey, entry.getKey());
 | 
			
		||||
 | 
			
		||||
@ -48,6 +48,9 @@ import static org.awaitility.Awaitility.await;
 | 
			
		||||
@DaoSqlTest
 | 
			
		||||
public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTest {
 | 
			
		||||
 | 
			
		||||
    public static final int TIMEOUT = 60;
 | 
			
		||||
    public static final int POLL_INTERVAL = 1;
 | 
			
		||||
 | 
			
		||||
    @BeforeEach
 | 
			
		||||
    void setUp() throws Exception {
 | 
			
		||||
        loginTenantAdmin();
 | 
			
		||||
@ -86,6 +89,7 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
 | 
			
		||||
        CalculatedField savedCalculatedField = doPost("/api/calculatedField", calculatedField, CalculatedField.class);
 | 
			
		||||
 | 
			
		||||
        await().alias("create CF -> perform initial calculation").atMost(TIMEOUT, TimeUnit.SECONDS)
 | 
			
		||||
                .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS)
 | 
			
		||||
                .untilAsserted(() -> {
 | 
			
		||||
                    ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp");
 | 
			
		||||
                    assertThat(fahrenheitTemp).isNotNull();
 | 
			
		||||
@ -95,6 +99,7 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
 | 
			
		||||
        doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"temperature\":30}"));
 | 
			
		||||
 | 
			
		||||
        await().alias("update telemetry -> recalculate state").atMost(TIMEOUT, TimeUnit.SECONDS)
 | 
			
		||||
                .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS)
 | 
			
		||||
                .untilAsserted(() -> {
 | 
			
		||||
                    ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp");
 | 
			
		||||
                    assertThat(fahrenheitTemp).isNotNull();
 | 
			
		||||
@ -108,6 +113,7 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
 | 
			
		||||
        savedCalculatedField = doPost("/api/calculatedField", savedCalculatedField, CalculatedField.class);
 | 
			
		||||
 | 
			
		||||
        await().alias("update CF output -> perform calculation with updated output").atMost(TIMEOUT, TimeUnit.SECONDS)
 | 
			
		||||
                .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS)
 | 
			
		||||
                .untilAsserted(() -> {
 | 
			
		||||
                    ArrayNode temperatureF = getServerAttributes(testDevice.getId(), "temperatureF");
 | 
			
		||||
                    assertThat(temperatureF).isNotNull();
 | 
			
		||||
@ -119,6 +125,7 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
 | 
			
		||||
        savedCalculatedField = doPost("/api/calculatedField", savedCalculatedField, CalculatedField.class);
 | 
			
		||||
 | 
			
		||||
        await().alias("update CF argument -> perform calculation with new argument").atMost(TIMEOUT, TimeUnit.SECONDS)
 | 
			
		||||
                .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS)
 | 
			
		||||
                .untilAsserted(() -> {
 | 
			
		||||
                    ArrayNode temperatureF = getServerAttributes(testDevice.getId(), "temperatureF");
 | 
			
		||||
                    assertThat(temperatureF).isNotNull();
 | 
			
		||||
@ -129,6 +136,7 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
 | 
			
		||||
        savedCalculatedField = doPost("/api/calculatedField", savedCalculatedField, CalculatedField.class);
 | 
			
		||||
 | 
			
		||||
        await().alias("update CF expression -> perform calculation with new expression").atMost(TIMEOUT, TimeUnit.SECONDS)
 | 
			
		||||
                .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS)
 | 
			
		||||
                .untilAsserted(() -> {
 | 
			
		||||
                    ArrayNode temperatureF = getServerAttributes(testDevice.getId(), "temperatureF");
 | 
			
		||||
                    assertThat(temperatureF).isNotNull();
 | 
			
		||||
@ -166,6 +174,7 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
 | 
			
		||||
        CalculatedField savedCalculatedField = doPost("/api/calculatedField", calculatedField, CalculatedField.class);
 | 
			
		||||
 | 
			
		||||
        await().alias("create CF -> state is not ready -> no calculation performed").atMost(TIMEOUT, TimeUnit.SECONDS)
 | 
			
		||||
                .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS)
 | 
			
		||||
                .untilAsserted(() -> {
 | 
			
		||||
                    ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp");
 | 
			
		||||
                    assertThat(fahrenheitTemp).isNotNull();
 | 
			
		||||
@ -175,6 +184,7 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
 | 
			
		||||
        doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"temperature\":30}"));
 | 
			
		||||
 | 
			
		||||
        await().alias("update telemetry -> perform calculation").atMost(TIMEOUT, TimeUnit.SECONDS)
 | 
			
		||||
                .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS)
 | 
			
		||||
                .untilAsserted(() -> {
 | 
			
		||||
                    ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp");
 | 
			
		||||
                    assertThat(fahrenheitTemp).isNotNull();
 | 
			
		||||
@ -213,6 +223,7 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
 | 
			
		||||
        CalculatedField savedCalculatedField = doPost("/api/calculatedField", calculatedField, CalculatedField.class);
 | 
			
		||||
 | 
			
		||||
        await().alias("create CF -> perform initial calculation with default value").atMost(TIMEOUT, TimeUnit.SECONDS)
 | 
			
		||||
                .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS)
 | 
			
		||||
                .untilAsserted(() -> {
 | 
			
		||||
                    ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp");
 | 
			
		||||
                    assertThat(fahrenheitTemp).isNotNull();
 | 
			
		||||
@ -222,6 +233,7 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
 | 
			
		||||
        doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"temperature\":30}"));
 | 
			
		||||
 | 
			
		||||
        await().alias("update telemetry -> recalculate state").atMost(TIMEOUT, TimeUnit.SECONDS)
 | 
			
		||||
                .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS)
 | 
			
		||||
                .untilAsserted(() -> {
 | 
			
		||||
                    ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp");
 | 
			
		||||
                    assertThat(fahrenheitTemp).isNotNull();
 | 
			
		||||
@ -277,6 +289,7 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
 | 
			
		||||
        doPost("/api/calculatedField", calculatedField, CalculatedField.class);
 | 
			
		||||
 | 
			
		||||
        await().alias("create CF and perform initial calculation").atMost(TIMEOUT, TimeUnit.SECONDS)
 | 
			
		||||
                .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS)
 | 
			
		||||
                .untilAsserted(() -> {
 | 
			
		||||
                    // result of asset 1
 | 
			
		||||
                    ArrayNode z1 = getServerAttributes(asset1.getId(), "z");
 | 
			
		||||
@ -292,6 +305,7 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
 | 
			
		||||
        doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/attributes/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"x\":25}"));
 | 
			
		||||
 | 
			
		||||
        await().alias("update device telemetry -> recalculate state for all assets").atMost(TIMEOUT, TimeUnit.SECONDS)
 | 
			
		||||
                .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS)
 | 
			
		||||
                .untilAsserted(() -> {
 | 
			
		||||
                    // result of asset 1
 | 
			
		||||
                    ArrayNode z1 = getServerAttributes(asset1.getId(), "z");
 | 
			
		||||
@ -307,6 +321,7 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
 | 
			
		||||
        doPost("/api/plugins/telemetry/ASSET/" + asset1.getUuidId() + "/attributes/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"y\":15}"));
 | 
			
		||||
 | 
			
		||||
        await().alias("update asset 1 telemetry -> recalculate state only for asset 1").atMost(TIMEOUT, TimeUnit.SECONDS)
 | 
			
		||||
                .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS)
 | 
			
		||||
                .untilAsserted(() -> {
 | 
			
		||||
                    // result of asset 1
 | 
			
		||||
                    ArrayNode z1 = getServerAttributes(asset1.getId(), "z");
 | 
			
		||||
@ -322,6 +337,7 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
 | 
			
		||||
        doPost("/api/plugins/telemetry/ASSET/" + asset2.getUuidId() + "/attributes/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"y\":5}"));
 | 
			
		||||
 | 
			
		||||
        await().alias("update asset 2 telemetry -> recalculate state only for asset 2").atMost(TIMEOUT, TimeUnit.SECONDS)
 | 
			
		||||
                .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS)
 | 
			
		||||
                .untilAsserted(() -> {
 | 
			
		||||
                    // result of asset 1 (no changes)
 | 
			
		||||
                    ArrayNode z1 = getServerAttributes(asset1.getId(), "z");
 | 
			
		||||
@ -339,6 +355,7 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
 | 
			
		||||
 | 
			
		||||
        Asset finalAsset3 = asset3;
 | 
			
		||||
        await().alias("add new entity to profile -> calculate state for new entity").atMost(TIMEOUT, TimeUnit.SECONDS)
 | 
			
		||||
                .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS)
 | 
			
		||||
                .untilAsserted(() -> {
 | 
			
		||||
                    // result of asset 3
 | 
			
		||||
                    ArrayNode z3 = getServerAttributes(finalAsset3.getId(), "z");
 | 
			
		||||
@ -349,6 +366,7 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
 | 
			
		||||
        doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/attributes/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"x\":20}"));
 | 
			
		||||
 | 
			
		||||
        await().alias("update device telemetry -> recalculate state for all assets").atMost(TIMEOUT, TimeUnit.SECONDS)
 | 
			
		||||
                .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS)
 | 
			
		||||
                .untilAsserted(() -> {
 | 
			
		||||
                    // result of asset 1
 | 
			
		||||
                    ArrayNode z1 = getServerAttributes(asset1.getId(), "z");
 | 
			
		||||
@ -375,6 +393,7 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
 | 
			
		||||
 | 
			
		||||
        Asset updatedAsset3 = asset3;
 | 
			
		||||
        await().alias("update device telemetry -> recalculate state for asset 1 and asset 2").atMost(TIMEOUT, TimeUnit.SECONDS)
 | 
			
		||||
                .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS)
 | 
			
		||||
                .untilAsserted(() -> {
 | 
			
		||||
                    // result of asset 1
 | 
			
		||||
                    ArrayNode z1 = getServerAttributes(asset1.getId(), "z");
 | 
			
		||||
@ -425,6 +444,7 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
 | 
			
		||||
        CalculatedField savedCalculatedField = doPost("/api/calculatedField", calculatedField, CalculatedField.class);
 | 
			
		||||
 | 
			
		||||
        await().alias("create CF -> ctx is not initialized -> no calculation perform").atMost(TIMEOUT, TimeUnit.SECONDS)
 | 
			
		||||
                .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS)
 | 
			
		||||
                .untilAsserted(() -> {
 | 
			
		||||
                    ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp");
 | 
			
		||||
                    assertThat(fahrenheitTemp).isNotNull();
 | 
			
		||||
@ -434,6 +454,7 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
 | 
			
		||||
        doPost("/api/plugins/telemetry/DEVICE/" + testDevice.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode("{\"temperature\":30}"));
 | 
			
		||||
 | 
			
		||||
        await().alias("update telemetry -> ctx is not initialized -> no calculation perform").atMost(TIMEOUT, TimeUnit.SECONDS)
 | 
			
		||||
                .pollInterval(POLL_INTERVAL, TimeUnit.SECONDS)
 | 
			
		||||
                .untilAsserted(() -> {
 | 
			
		||||
                    ObjectNode fahrenheitTemp = getLatestTelemetry(testDevice.getId(), "fahrenheitTemp");
 | 
			
		||||
                    assertThat(fahrenheitTemp).isNotNull();
 | 
			
		||||
 | 
			
		||||
@ -28,6 +28,7 @@ import org.springframework.boot.test.mock.mockito.SpyBean;
 | 
			
		||||
import org.springframework.test.context.TestPropertySource;
 | 
			
		||||
import org.thingsboard.common.util.JacksonUtil;
 | 
			
		||||
import org.thingsboard.server.actors.ActorSystemContext;
 | 
			
		||||
import org.thingsboard.server.common.data.DataConstants;
 | 
			
		||||
import org.thingsboard.server.common.data.Device;
 | 
			
		||||
import org.thingsboard.server.common.data.DeviceProfile;
 | 
			
		||||
import org.thingsboard.server.common.data.exception.ThingsboardException;
 | 
			
		||||
@ -73,6 +74,7 @@ import java.util.stream.Stream;
 | 
			
		||||
 | 
			
		||||
import static org.assertj.core.api.Assertions.assertThat;
 | 
			
		||||
import static org.awaitility.Awaitility.await;
 | 
			
		||||
import static org.hamcrest.Matchers.containsString;
 | 
			
		||||
import static org.mockito.ArgumentMatchers.any;
 | 
			
		||||
import static org.mockito.ArgumentMatchers.anyLong;
 | 
			
		||||
import static org.mockito.ArgumentMatchers.argThat;
 | 
			
		||||
@ -325,6 +327,59 @@ public class BaseQueueControllerTest extends AbstractControllerTest {
 | 
			
		||||
        doDelete("/api/queues/" + queue.getUuidId()).andExpect(status().isOk());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void testQueueWithReservedName() throws Exception {
 | 
			
		||||
        loginSysAdmin();
 | 
			
		||||
 | 
			
		||||
        // create queue
 | 
			
		||||
        Queue queue = new Queue();
 | 
			
		||||
        queue.setName(DataConstants.CF_QUEUE_NAME);
 | 
			
		||||
        queue.setTopic("tb_rule_engine.calculated_fields");
 | 
			
		||||
        queue.setPollInterval(25);
 | 
			
		||||
        queue.setPartitions(10);
 | 
			
		||||
        queue.setTenantId(TenantId.SYS_TENANT_ID);
 | 
			
		||||
        queue.setConsumerPerPartition(false);
 | 
			
		||||
        queue.setPackProcessingTimeout(2000);
 | 
			
		||||
        SubmitStrategy submitStrategy = new SubmitStrategy();
 | 
			
		||||
        submitStrategy.setType(SubmitStrategyType.SEQUENTIAL_BY_ORIGINATOR);
 | 
			
		||||
        queue.setSubmitStrategy(submitStrategy);
 | 
			
		||||
        ProcessingStrategy processingStrategy = new ProcessingStrategy();
 | 
			
		||||
        processingStrategy.setType(ProcessingStrategyType.RETRY_ALL);
 | 
			
		||||
        processingStrategy.setRetries(3);
 | 
			
		||||
        processingStrategy.setFailurePercentage(0.7);
 | 
			
		||||
        processingStrategy.setPauseBetweenRetries(3);
 | 
			
		||||
        processingStrategy.setMaxPauseBetweenRetries(5);
 | 
			
		||||
        queue.setProcessingStrategy(processingStrategy);
 | 
			
		||||
 | 
			
		||||
        doPost("/api/queues?serviceType=" + "TB-RULE-ENGINE", queue)
 | 
			
		||||
                .andExpect(status().isBadRequest())
 | 
			
		||||
                .andExpect(statusReason(containsString(String.format("The queue name '%s' is not allowed. This name is reserved for internal use. Please choose a different name.", DataConstants.CF_QUEUE_NAME))));
 | 
			
		||||
 | 
			
		||||
        // create queue
 | 
			
		||||
        Queue queue2 = new Queue();
 | 
			
		||||
        queue2.setName(DataConstants.CF_STATES_QUEUE_NAME);
 | 
			
		||||
        queue2.setTopic("tb_rule_engine.calculated_fields");
 | 
			
		||||
        queue2.setPollInterval(25);
 | 
			
		||||
        queue2.setPartitions(10);
 | 
			
		||||
        queue2.setTenantId(TenantId.SYS_TENANT_ID);
 | 
			
		||||
        queue2.setConsumerPerPartition(false);
 | 
			
		||||
        queue2.setPackProcessingTimeout(2000);
 | 
			
		||||
        SubmitStrategy submitStrategy2 = new SubmitStrategy();
 | 
			
		||||
        submitStrategy2.setType(SubmitStrategyType.SEQUENTIAL_BY_ORIGINATOR);
 | 
			
		||||
        queue2.setSubmitStrategy(submitStrategy);
 | 
			
		||||
        ProcessingStrategy processingStrategy2 = new ProcessingStrategy();
 | 
			
		||||
        processingStrategy2.setType(ProcessingStrategyType.RETRY_ALL);
 | 
			
		||||
        processingStrategy2.setRetries(3);
 | 
			
		||||
        processingStrategy2.setFailurePercentage(0.7);
 | 
			
		||||
        processingStrategy2.setPauseBetweenRetries(3);
 | 
			
		||||
        processingStrategy2.setMaxPauseBetweenRetries(5);
 | 
			
		||||
        queue2.setProcessingStrategy(processingStrategy);
 | 
			
		||||
 | 
			
		||||
        doPost("/api/queues?serviceType=" + "TB-RULE-ENGINE", queue2)
 | 
			
		||||
                .andExpect(status().isBadRequest())
 | 
			
		||||
                .andExpect(statusReason(containsString(String.format("The queue name '%s' is not allowed. This name is reserved for internal use. Please choose a different name.", DataConstants.CF_STATES_QUEUE_NAME))));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private Queue saveQueue(Queue queue) {
 | 
			
		||||
        return doPost("/api/queues?serviceType=TB_RULE_ENGINE", queue, Queue.class);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -428,7 +428,6 @@ public class HashPartitionServiceTest {
 | 
			
		||||
        ReflectionTestUtils.setField(partitionService, "corePartitions", 10);
 | 
			
		||||
        ReflectionTestUtils.setField(partitionService, "cfEventTopic", "tb_cf_event");
 | 
			
		||||
        ReflectionTestUtils.setField(partitionService, "cfStateTopic", "tb_cf_state");
 | 
			
		||||
        ReflectionTestUtils.setField(partitionService, "cfPartitions", 10);
 | 
			
		||||
        ReflectionTestUtils.setField(partitionService, "vcTopic", "tb.vc");
 | 
			
		||||
        ReflectionTestUtils.setField(partitionService, "vcPartitions", 10);
 | 
			
		||||
        ReflectionTestUtils.setField(partitionService, "hashFunctionName", hashFunctionName);
 | 
			
		||||
 | 
			
		||||
@ -21,6 +21,7 @@ import org.apache.commons.io.FileUtils;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Autowired;
 | 
			
		||||
import org.springframework.context.annotation.Lazy;
 | 
			
		||||
import org.thingsboard.server.common.data.BaseData;
 | 
			
		||||
import org.thingsboard.server.common.data.DataConstants;
 | 
			
		||||
import org.thingsboard.server.common.data.EntityType;
 | 
			
		||||
import org.thingsboard.server.common.data.StringUtils;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
@ -158,6 +159,9 @@ public abstract class DataValidator<D extends BaseData<?>> {
 | 
			
		||||
 | 
			
		||||
    protected static void validateQueueName(String name) {
 | 
			
		||||
        validateQueueNameOrTopic(name, NAME);
 | 
			
		||||
        if (DataConstants.CF_QUEUE_NAME.equals(name) || DataConstants.CF_STATES_QUEUE_NAME.equals(name)) {
 | 
			
		||||
            throw new DataValidationException(String.format("The queue name '%s' is not allowed. This name is reserved for internal use. Please choose a different name.", name));
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected static void validateQueueTopic(String topic) {
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user