From 18ce96fd10580d04912a589012fa1249c6824542 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Fri, 14 Jul 2023 13:14:01 +0300 Subject: [PATCH] Ability to update isolated processing option in tenant profile --- .../DefaultTbTenantProfileService.java | 9 +- .../server/controller/AbstractWebTest.java | 21 +- .../service/queue/QueueServiceTest.java | 225 ++++++++++++++++++ .../service/ttl/AlarmsCleanUpServiceTest.java | 2 +- .../server/actors/TbActorMailbox.java | 6 +- .../queue/discovery/HashPartitionService.java | 16 +- .../validator/TenantProfileDataValidator.java | 2 - .../profile/tenant-profile.component.ts | 49 +++- 8 files changed, 300 insertions(+), 30 deletions(-) create mode 100644 application/src/test/java/org/thingsboard/server/service/queue/QueueServiceTest.java diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/tenant/profile/DefaultTbTenantProfileService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/tenant/profile/DefaultTbTenantProfileService.java index 1219a5b929..98385ef8f7 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/tenant/profile/DefaultTbTenantProfileService.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/tenant/profile/DefaultTbTenantProfileService.java @@ -44,12 +44,11 @@ public class DefaultTbTenantProfileService extends AbstractTbEntityService imple @Override public TenantProfile save(TenantId tenantId, TenantProfile tenantProfile, TenantProfile oldTenantProfile) throws ThingsboardException { TenantProfile savedTenantProfile = checkNotNull(tenantProfileService.saveTenantProfile(tenantId, tenantProfile)); - if (oldTenantProfile != null && savedTenantProfile.isIsolatedTbRuleEngine()) { - List tenantIds = tenantService.findTenantIdsByTenantProfileId(savedTenantProfile.getId()); - tbQueueService.updateQueuesByTenants(tenantIds, savedTenantProfile, oldTenantProfile); - } - tenantProfileCache.put(savedTenantProfile); + + List tenantIds = tenantService.findTenantIdsByTenantProfileId(savedTenantProfile.getId()); + tbQueueService.updateQueuesByTenants(tenantIds, savedTenantProfile, oldTenantProfile); + tbClusterService.onTenantProfileChange(savedTenantProfile, null); tbClusterService.broadcastEntityStateChangeEvent(TenantId.SYS_TENANT_ID, savedTenantProfile.getId(), tenantProfile.getId() == null ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED); diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java index c7580867e0..449164776a 100644 --- a/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java @@ -969,13 +969,20 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest { return (DeviceActorMessageProcessor) ReflectionTestUtils.getField(actor, "processor"); } - protected void updateDefaultTenantProfile(Consumer updater) throws ThingsboardException { - TenantProfile tenantProfile = tenantProfileService.findDefaultTenantProfile(TenantId.SYS_TENANT_ID); - TenantProfileData profileData = tenantProfile.getProfileData(); - DefaultTenantProfileConfiguration profileConfiguration = (DefaultTenantProfileConfiguration) profileData.getConfiguration(); - updater.accept(profileConfiguration); - tenantProfile.setProfileData(profileData); - tbTenantProfileService.save(TenantId.SYS_TENANT_ID, tenantProfile, null); + protected void updateDefaultTenantProfileConfig(Consumer updater) throws ThingsboardException { + updateDefaultTenantProfile(tenantProfile -> { + TenantProfileData profileData = tenantProfile.getProfileData(); + DefaultTenantProfileConfiguration profileConfiguration = (DefaultTenantProfileConfiguration) profileData.getConfiguration(); + updater.accept(profileConfiguration); + tenantProfile.setProfileData(profileData); + }); + } + + protected void updateDefaultTenantProfile(Consumer updater) throws ThingsboardException { + TenantProfile oldTenantProfile = tenantProfileService.findDefaultTenantProfile(TenantId.SYS_TENANT_ID); + TenantProfile tenantProfile = JacksonUtil.clone(oldTenantProfile); + updater.accept(tenantProfile); + tbTenantProfileService.save(TenantId.SYS_TENANT_ID, tenantProfile, oldTenantProfile); } } diff --git a/application/src/test/java/org/thingsboard/server/service/queue/QueueServiceTest.java b/application/src/test/java/org/thingsboard/server/service/queue/QueueServiceTest.java new file mode 100644 index 0000000000..4d2c50add3 --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/service/queue/QueueServiceTest.java @@ -0,0 +1,225 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.queue; + +import org.junit.Before; +import org.junit.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.mock.mockito.SpyBean; +import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.common.data.DataConstants; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.DeviceProfile; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.queue.ProcessingStrategy; +import org.thingsboard.server.common.data.queue.ProcessingStrategyType; +import org.thingsboard.server.common.data.queue.Queue; +import org.thingsboard.server.common.data.queue.SubmitStrategy; +import org.thingsboard.server.common.data.queue.SubmitStrategyType; +import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfiguration; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg; +import org.thingsboard.server.common.msg.queue.ServiceType; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.controller.AbstractControllerTest; +import org.thingsboard.server.dao.queue.QueueService; +import org.thingsboard.server.dao.service.DaoSqlTest; +import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.service.entitiy.queue.TbQueueService; +import org.thingsboard.server.service.profile.TbDeviceProfileCache; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.function.Predicate; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; + +@DaoSqlTest +public class QueueServiceTest extends AbstractControllerTest { + + @SpyBean + private ActorSystemContext actorContext; + @Autowired + private TbQueueService tbQueueService; + @Autowired + private QueueService queueService; + @SpyBean + private PartitionService partitionService; + @Autowired + private TbDeviceProfileCache deviceProfileCache; + + @Before + public void beforeEach() { + Queue mainQueue = queueService.findQueueByTenantIdAndName(TenantId.SYS_TENANT_ID, DataConstants.MAIN_QUEUE_NAME); + if (mainQueue == null) { + mainQueue = new Queue(TenantId.SYS_TENANT_ID, getMainQueueConfig()); + tbQueueService.saveQueue(mainQueue); + } + + Queue hpQueue = queueService.findQueueByTenantIdAndName(TenantId.SYS_TENANT_ID, DataConstants.HP_QUEUE_NAME); + if (hpQueue == null) { + hpQueue = new Queue(TenantId.SYS_TENANT_ID, getHighPriorityQueueConfig()); + tbQueueService.saveQueue(hpQueue); + } + } + + @Test + public void testQueuesUpdateOnTenantProfileUpdate() throws Exception { + loginTenantAdmin(); + DeviceProfile hpQueueProfile = createDeviceProfile("HighPriority profile"); + hpQueueProfile.setDefaultQueueName(DataConstants.HP_QUEUE_NAME); + hpQueueProfile = doPost("/api/deviceProfile", hpQueueProfile, DeviceProfile.class); + Device hpQueueDevice = createDevice("HP", hpQueueProfile.getName(), "HP"); + deviceProfileCache.evict(tenantId, hpQueueProfile.getId()); + + DeviceProfile mainQueueProfile = createDeviceProfile("Main profile"); + mainQueueProfile.setDefaultQueueName(DataConstants.MAIN_QUEUE_NAME); + mainQueueProfile = doPost("/api/deviceProfile", mainQueueProfile, DeviceProfile.class); + Device mainQueueDevice = createDevice("Main", mainQueueProfile.getName(), "Main"); + + verifyUsedQueueAndMessage(DataConstants.HP_QUEUE_NAME, hpQueueDevice.getId(), DataConstants.ATTRIBUTES_UPDATED, () -> { + doPost("/api/plugins/telemetry/DEVICE/" + hpQueueDevice.getId() + "/attributes/SERVER_SCOPE", "{\"test\":123}", String.class); + }, usedTpi -> { + assertThat(usedTpi.getTenantId()).get().isEqualTo(TenantId.SYS_TENANT_ID); + }); + verifyUsedQueueAndMessage(DataConstants.MAIN_QUEUE_NAME, mainQueueDevice.getId(), DataConstants.ATTRIBUTES_UPDATED, () -> { + doPost("/api/plugins/telemetry/DEVICE/" + mainQueueDevice.getId() + "/attributes/SERVER_SCOPE", "{\"test\":123}", String.class); + }, usedTpi -> { + assertThat(usedTpi.getTenantId()).get().isEqualTo(TenantId.SYS_TENANT_ID); + }); + + updateDefaultTenantProfile(tenantProfile -> { + tenantProfile.setIsolatedTbRuleEngine(true); + tenantProfile.getProfileData().setQueueConfiguration(List.of( + getMainQueueConfig() + )); + }); + + verifyUsedQueueAndMessage(DataConstants.MAIN_QUEUE_NAME, mainQueueDevice.getId(), DataConstants.ATTRIBUTES_UPDATED, () -> { + doPost("/api/plugins/telemetry/DEVICE/" + mainQueueDevice.getId() + "/attributes/SERVER_SCOPE", "{\"test\":123}", String.class); + }, usedTpi -> { + assertThat(usedTpi.getTenantId()).get().isEqualTo(tenantId); + }); + verifyUsedQueueAndMessage(DataConstants.HP_QUEUE_NAME, hpQueueDevice.getId(), DataConstants.ATTRIBUTES_UPDATED, () -> { + doPost("/api/plugins/telemetry/DEVICE/" + hpQueueDevice.getId() + "/attributes/SERVER_SCOPE", "{\"test\":123}", String.class); + }, usedTpi -> { + assertThat(usedTpi.getTopic()).endsWith("main"); + assertThat(usedTpi.getTenantId()).get().isEqualTo(tenantId); + }); + + updateDefaultTenantProfile(tenantProfile -> { + tenantProfile.setIsolatedTbRuleEngine(true); + tenantProfile.getProfileData().setQueueConfiguration(List.of( + getMainQueueConfig(), getHighPriorityQueueConfig() + )); + }); + + verifyUsedQueueAndMessage(DataConstants.HP_QUEUE_NAME, hpQueueDevice.getId(), DataConstants.ATTRIBUTES_UPDATED, () -> { + doPost("/api/plugins/telemetry/DEVICE/" + hpQueueDevice.getId() + "/attributes/SERVER_SCOPE", "{\"test\":123}", String.class); + }, usedTpi -> { + assertThat(usedTpi.getTopic()).endsWith("hp"); + assertThat(usedTpi.getTenantId()).get().isEqualTo(tenantId); + }); + verifyUsedQueueAndMessage(DataConstants.MAIN_QUEUE_NAME, mainQueueDevice.getId(), DataConstants.ATTRIBUTES_UPDATED, () -> { + doPost("/api/plugins/telemetry/DEVICE/" + mainQueueDevice.getId() + "/attributes/SERVER_SCOPE", "{\"test\":123}", String.class); + }, usedTpi -> { + assertThat(usedTpi.getTenantId()).get().isEqualTo(tenantId); + }); + } + + private void verifyUsedQueueAndMessage(String queue, EntityId entityId, String msgType, Runnable action, Consumer tpiAssert) { + await().atMost(15, TimeUnit.SECONDS) + .untilAsserted(() -> { + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, queue, tenantId, entityId); + tpiAssert.accept(tpi); + }); + action.run(); + TbMsg tbMsg = awaitTbMsg(msg -> msg.getOriginator().equals(entityId) + && msg.getType().equals(msgType), 10000); + assertThat(tbMsg.getQueueName()).isEqualTo(queue); + + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, queue, tenantId, entityId); + tpiAssert.accept(tpi); + } + + protected TbMsg awaitTbMsg(Predicate predicate, int timeoutMillis) { + AtomicReference tbMsgCaptor = new AtomicReference<>(); + verify(actorContext, timeout(timeoutMillis).atLeastOnce()).tell(argThat(actorMsg -> { + if (!(actorMsg instanceof QueueToRuleEngineMsg)) { + return false; + } + TbMsg tbMsg = ((QueueToRuleEngineMsg) actorMsg).getMsg(); + if (predicate.test(tbMsg)) { + tbMsgCaptor.set(tbMsg); + return true; + } + return false; + })); + return tbMsgCaptor.get(); + } + + private TenantProfileQueueConfiguration getHighPriorityQueueConfig() { + TenantProfileQueueConfiguration hpQueueConfig = new TenantProfileQueueConfiguration(); + hpQueueConfig.setName(DataConstants.HP_QUEUE_NAME); + hpQueueConfig.setTopic(DataConstants.HP_QUEUE_TOPIC); + hpQueueConfig.setPollInterval(25); + hpQueueConfig.setPartitions(10); + hpQueueConfig.setConsumerPerPartition(true); + hpQueueConfig.setPackProcessingTimeout(2000); + SubmitStrategy highPriorityQueueSubmitStrategy = new SubmitStrategy(); + highPriorityQueueSubmitStrategy.setType(SubmitStrategyType.BURST); + highPriorityQueueSubmitStrategy.setBatchSize(100); + hpQueueConfig.setSubmitStrategy(highPriorityQueueSubmitStrategy); + ProcessingStrategy highPriorityQueueProcessingStrategy = new ProcessingStrategy(); + highPriorityQueueProcessingStrategy.setType(ProcessingStrategyType.RETRY_FAILED_AND_TIMED_OUT); + highPriorityQueueProcessingStrategy.setRetries(0); + highPriorityQueueProcessingStrategy.setFailurePercentage(0); + highPriorityQueueProcessingStrategy.setPauseBetweenRetries(5); + highPriorityQueueProcessingStrategy.setMaxPauseBetweenRetries(5); + hpQueueConfig.setProcessingStrategy(highPriorityQueueProcessingStrategy); + return hpQueueConfig; + } + + private TenantProfileQueueConfiguration getMainQueueConfig() { + TenantProfileQueueConfiguration mainQueue = new TenantProfileQueueConfiguration(); + mainQueue.setName(DataConstants.MAIN_QUEUE_NAME); + mainQueue.setTopic(DataConstants.MAIN_QUEUE_TOPIC); + mainQueue.setPollInterval(25); + mainQueue.setPartitions(10); + mainQueue.setConsumerPerPartition(true); + mainQueue.setPackProcessingTimeout(2000); + SubmitStrategy mainQueueSubmitStrategy = new SubmitStrategy(); + mainQueueSubmitStrategy.setType(SubmitStrategyType.BURST); + mainQueueSubmitStrategy.setBatchSize(1000); + mainQueue.setSubmitStrategy(mainQueueSubmitStrategy); + ProcessingStrategy mainQueueProcessingStrategy = new ProcessingStrategy(); + mainQueueProcessingStrategy.setType(ProcessingStrategyType.SKIP_ALL_FAILURES); + mainQueueProcessingStrategy.setRetries(3); + mainQueueProcessingStrategy.setFailurePercentage(0); + mainQueueProcessingStrategy.setPauseBetweenRetries(3); + mainQueueProcessingStrategy.setMaxPauseBetweenRetries(3); + mainQueue.setProcessingStrategy(mainQueueProcessingStrategy); + return mainQueue; + } + +} diff --git a/application/src/test/java/org/thingsboard/server/service/ttl/AlarmsCleanUpServiceTest.java b/application/src/test/java/org/thingsboard/server/service/ttl/AlarmsCleanUpServiceTest.java index 6cabff788e..09e78df9ff 100644 --- a/application/src/test/java/org/thingsboard/server/service/ttl/AlarmsCleanUpServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/ttl/AlarmsCleanUpServiceTest.java @@ -67,7 +67,7 @@ public class AlarmsCleanUpServiceTest extends AbstractControllerTest { @Test public void testAlarmsCleanUp() throws Exception { int ttlDays = 1; - updateDefaultTenantProfile(profileConfiguration -> { + updateDefaultTenantProfileConfig(profileConfiguration -> { profileConfiguration.setAlarmsTtlDays(ttlDays); }); diff --git a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java index ad1604f7b0..857c45ad84 100644 --- a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java +++ b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java @@ -15,7 +15,8 @@ */ package org.thingsboard.server.actors; -import lombok.Data; +import lombok.Getter; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.msg.MsgType; @@ -31,7 +32,8 @@ import java.util.function.Predicate; import java.util.function.Supplier; @Slf4j -@Data +@Getter +@RequiredArgsConstructor public final class TbActorMailbox implements TbActorCtx { private static final boolean HIGH_PRIORITY = true; private static final boolean NORMAL_PRIORITY = false; diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java index 14fb0369ee..f56233b144 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java @@ -165,6 +165,9 @@ public class HashPartitionService implements PartitionService { partitionTopicsMap.put(queueKey, queueUpdateMsg.getQueueTopic()); partitionSizesMap.put(queueKey, queueUpdateMsg.getPartitions()); myPartitions.remove(queueKey); + if (!tenantId.isSysTenantId()) { + tenantRoutingInfoMap.remove(tenantId); + } } @Override @@ -358,16 +361,9 @@ public class HashPartitionService implements PartitionService { if (TenantId.SYS_TENANT_ID.equals(tenantId)) { return false; } - TenantRoutingInfo routingInfo = tenantRoutingInfoMap.get(tenantId); - if (routingInfo == null) { - synchronized (tenantRoutingInfoMap) { - routingInfo = tenantRoutingInfoMap.get(tenantId); - if (routingInfo == null) { - routingInfo = tenantRoutingInfoService.getRoutingInfo(tenantId); - tenantRoutingInfoMap.put(tenantId, routingInfo); - } - } - } + TenantRoutingInfo routingInfo = tenantRoutingInfoMap.computeIfAbsent(tenantId, k -> { + return tenantRoutingInfoService.getRoutingInfo(tenantId); + }); if (routingInfo == null) { throw new TenantNotFoundException(tenantId); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/service/validator/TenantProfileDataValidator.java b/dao/src/main/java/org/thingsboard/server/dao/service/validator/TenantProfileDataValidator.java index 0477ccf32f..aa166ba12e 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/service/validator/TenantProfileDataValidator.java +++ b/dao/src/main/java/org/thingsboard/server/dao/service/validator/TenantProfileDataValidator.java @@ -99,8 +99,6 @@ public class TenantProfileDataValidator extends DataValidator { TenantProfile old = tenantProfileDao.findById(TenantId.SYS_TENANT_ID, tenantProfile.getId().getId()); if (old == null) { throw new DataValidationException("Can't update non existing tenant profile!"); - } else if (old.isIsolatedTbRuleEngine() != tenantProfile.isIsolatedTbRuleEngine()) { - throw new DataValidationException("Can't update isolatedTbRuleEngine property!"); } return old; } diff --git a/ui-ngx/src/app/modules/home/components/profile/tenant-profile.component.ts b/ui-ngx/src/app/modules/home/components/profile/tenant-profile.component.ts index 0257fb35af..2aafa97d99 100644 --- a/ui-ngx/src/app/modules/home/components/profile/tenant-profile.component.ts +++ b/ui-ngx/src/app/modules/home/components/profile/tenant-profile.component.ts @@ -76,6 +76,52 @@ export class TenantProfileComponent extends EntityComponent { additionalInfo: { description: '' } + }, + { + id: guid(), + name: 'HighPriority', + topic: 'tb_rule_engine.hp', + pollInterval: 25, + partitions: 10, + consumerPerPartition: true, + packProcessingTimeout: 2000, + submitStrategy: { + type: 'BURST', + batchSize: 100 + }, + processingStrategy: { + type: 'RETRY_FAILED_AND_TIMED_OUT', + retries: 0, + failurePercentage: 0, + pauseBetweenRetries: 5, + maxPauseBetweenRetries: 5 + }, + additionalInfo: { + description: '' + } + }, + { + id: guid(), + name: 'SequentialByOriginator', + topic: 'tb_rule_engine.sq', + pollInterval: 25, + partitions: 10, + consumerPerPartition: true, + packProcessingTimeout: 2000, + submitStrategy: { + type: 'SEQUENTIAL_BY_ORIGINATOR', + batchSize: 100 + }, + processingStrategy: { + type: 'RETRY_FAILED_AND_TIMED_OUT', + retries: 3, + failurePercentage: 0, + pauseBetweenRetries: 5, + maxPauseBetweenRetries: 5 + }, + additionalInfo: { + description: '' + } } ]; const formGroup = this.fb.group( @@ -118,9 +164,6 @@ export class TenantProfileComponent extends EntityComponent { if (this.entityForm) { if (this.isEditValue) { this.entityForm.enable({emitEvent: false}); - if (!this.isAdd) { - this.entityForm.get('isolatedTbRuleEngine').disable({emitEvent: false}); - } } else { this.entityForm.disable({emitEvent: false}); }