Ability to update isolated processing option in tenant profile

This commit is contained in:
ViacheslavKlimov 2023-07-14 13:14:01 +03:00
parent 04a9f41e52
commit 18ce96fd10
8 changed files with 300 additions and 30 deletions

View File

@ -44,12 +44,11 @@ public class DefaultTbTenantProfileService extends AbstractTbEntityService imple
@Override
public TenantProfile save(TenantId tenantId, TenantProfile tenantProfile, TenantProfile oldTenantProfile) throws ThingsboardException {
TenantProfile savedTenantProfile = checkNotNull(tenantProfileService.saveTenantProfile(tenantId, tenantProfile));
if (oldTenantProfile != null && savedTenantProfile.isIsolatedTbRuleEngine()) {
List<TenantId> tenantIds = tenantService.findTenantIdsByTenantProfileId(savedTenantProfile.getId());
tbQueueService.updateQueuesByTenants(tenantIds, savedTenantProfile, oldTenantProfile);
}
tenantProfileCache.put(savedTenantProfile);
List<TenantId> tenantIds = tenantService.findTenantIdsByTenantProfileId(savedTenantProfile.getId());
tbQueueService.updateQueuesByTenants(tenantIds, savedTenantProfile, oldTenantProfile);
tbClusterService.onTenantProfileChange(savedTenantProfile, null);
tbClusterService.broadcastEntityStateChangeEvent(TenantId.SYS_TENANT_ID, savedTenantProfile.getId(),
tenantProfile.getId() == null ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED);

View File

@ -969,13 +969,20 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest {
return (DeviceActorMessageProcessor) ReflectionTestUtils.getField(actor, "processor");
}
protected void updateDefaultTenantProfile(Consumer<DefaultTenantProfileConfiguration> updater) throws ThingsboardException {
TenantProfile tenantProfile = tenantProfileService.findDefaultTenantProfile(TenantId.SYS_TENANT_ID);
TenantProfileData profileData = tenantProfile.getProfileData();
DefaultTenantProfileConfiguration profileConfiguration = (DefaultTenantProfileConfiguration) profileData.getConfiguration();
updater.accept(profileConfiguration);
tenantProfile.setProfileData(profileData);
tbTenantProfileService.save(TenantId.SYS_TENANT_ID, tenantProfile, null);
protected void updateDefaultTenantProfileConfig(Consumer<DefaultTenantProfileConfiguration> updater) throws ThingsboardException {
updateDefaultTenantProfile(tenantProfile -> {
TenantProfileData profileData = tenantProfile.getProfileData();
DefaultTenantProfileConfiguration profileConfiguration = (DefaultTenantProfileConfiguration) profileData.getConfiguration();
updater.accept(profileConfiguration);
tenantProfile.setProfileData(profileData);
});
}
protected void updateDefaultTenantProfile(Consumer<TenantProfile> updater) throws ThingsboardException {
TenantProfile oldTenantProfile = tenantProfileService.findDefaultTenantProfile(TenantId.SYS_TENANT_ID);
TenantProfile tenantProfile = JacksonUtil.clone(oldTenantProfile);
updater.accept(tenantProfile);
tbTenantProfileService.save(TenantId.SYS_TENANT_ID, tenantProfile, oldTenantProfile);
}
}

View File

@ -0,0 +1,225 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.queue;
import org.junit.Before;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.queue.ProcessingStrategy;
import org.thingsboard.server.common.data.queue.ProcessingStrategyType;
import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.common.data.queue.SubmitStrategy;
import org.thingsboard.server.common.data.queue.SubmitStrategyType;
import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfiguration;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.controller.AbstractControllerTest;
import org.thingsboard.server.dao.queue.QueueService;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.service.entitiy.queue.TbQueueService;
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Predicate;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
@DaoSqlTest
public class QueueServiceTest extends AbstractControllerTest {
@SpyBean
private ActorSystemContext actorContext;
@Autowired
private TbQueueService tbQueueService;
@Autowired
private QueueService queueService;
@SpyBean
private PartitionService partitionService;
@Autowired
private TbDeviceProfileCache deviceProfileCache;
@Before
public void beforeEach() {
Queue mainQueue = queueService.findQueueByTenantIdAndName(TenantId.SYS_TENANT_ID, DataConstants.MAIN_QUEUE_NAME);
if (mainQueue == null) {
mainQueue = new Queue(TenantId.SYS_TENANT_ID, getMainQueueConfig());
tbQueueService.saveQueue(mainQueue);
}
Queue hpQueue = queueService.findQueueByTenantIdAndName(TenantId.SYS_TENANT_ID, DataConstants.HP_QUEUE_NAME);
if (hpQueue == null) {
hpQueue = new Queue(TenantId.SYS_TENANT_ID, getHighPriorityQueueConfig());
tbQueueService.saveQueue(hpQueue);
}
}
@Test
public void testQueuesUpdateOnTenantProfileUpdate() throws Exception {
loginTenantAdmin();
DeviceProfile hpQueueProfile = createDeviceProfile("HighPriority profile");
hpQueueProfile.setDefaultQueueName(DataConstants.HP_QUEUE_NAME);
hpQueueProfile = doPost("/api/deviceProfile", hpQueueProfile, DeviceProfile.class);
Device hpQueueDevice = createDevice("HP", hpQueueProfile.getName(), "HP");
deviceProfileCache.evict(tenantId, hpQueueProfile.getId());
DeviceProfile mainQueueProfile = createDeviceProfile("Main profile");
mainQueueProfile.setDefaultQueueName(DataConstants.MAIN_QUEUE_NAME);
mainQueueProfile = doPost("/api/deviceProfile", mainQueueProfile, DeviceProfile.class);
Device mainQueueDevice = createDevice("Main", mainQueueProfile.getName(), "Main");
verifyUsedQueueAndMessage(DataConstants.HP_QUEUE_NAME, hpQueueDevice.getId(), DataConstants.ATTRIBUTES_UPDATED, () -> {
doPost("/api/plugins/telemetry/DEVICE/" + hpQueueDevice.getId() + "/attributes/SERVER_SCOPE", "{\"test\":123}", String.class);
}, usedTpi -> {
assertThat(usedTpi.getTenantId()).get().isEqualTo(TenantId.SYS_TENANT_ID);
});
verifyUsedQueueAndMessage(DataConstants.MAIN_QUEUE_NAME, mainQueueDevice.getId(), DataConstants.ATTRIBUTES_UPDATED, () -> {
doPost("/api/plugins/telemetry/DEVICE/" + mainQueueDevice.getId() + "/attributes/SERVER_SCOPE", "{\"test\":123}", String.class);
}, usedTpi -> {
assertThat(usedTpi.getTenantId()).get().isEqualTo(TenantId.SYS_TENANT_ID);
});
updateDefaultTenantProfile(tenantProfile -> {
tenantProfile.setIsolatedTbRuleEngine(true);
tenantProfile.getProfileData().setQueueConfiguration(List.of(
getMainQueueConfig()
));
});
verifyUsedQueueAndMessage(DataConstants.MAIN_QUEUE_NAME, mainQueueDevice.getId(), DataConstants.ATTRIBUTES_UPDATED, () -> {
doPost("/api/plugins/telemetry/DEVICE/" + mainQueueDevice.getId() + "/attributes/SERVER_SCOPE", "{\"test\":123}", String.class);
}, usedTpi -> {
assertThat(usedTpi.getTenantId()).get().isEqualTo(tenantId);
});
verifyUsedQueueAndMessage(DataConstants.HP_QUEUE_NAME, hpQueueDevice.getId(), DataConstants.ATTRIBUTES_UPDATED, () -> {
doPost("/api/plugins/telemetry/DEVICE/" + hpQueueDevice.getId() + "/attributes/SERVER_SCOPE", "{\"test\":123}", String.class);
}, usedTpi -> {
assertThat(usedTpi.getTopic()).endsWith("main");
assertThat(usedTpi.getTenantId()).get().isEqualTo(tenantId);
});
updateDefaultTenantProfile(tenantProfile -> {
tenantProfile.setIsolatedTbRuleEngine(true);
tenantProfile.getProfileData().setQueueConfiguration(List.of(
getMainQueueConfig(), getHighPriorityQueueConfig()
));
});
verifyUsedQueueAndMessage(DataConstants.HP_QUEUE_NAME, hpQueueDevice.getId(), DataConstants.ATTRIBUTES_UPDATED, () -> {
doPost("/api/plugins/telemetry/DEVICE/" + hpQueueDevice.getId() + "/attributes/SERVER_SCOPE", "{\"test\":123}", String.class);
}, usedTpi -> {
assertThat(usedTpi.getTopic()).endsWith("hp");
assertThat(usedTpi.getTenantId()).get().isEqualTo(tenantId);
});
verifyUsedQueueAndMessage(DataConstants.MAIN_QUEUE_NAME, mainQueueDevice.getId(), DataConstants.ATTRIBUTES_UPDATED, () -> {
doPost("/api/plugins/telemetry/DEVICE/" + mainQueueDevice.getId() + "/attributes/SERVER_SCOPE", "{\"test\":123}", String.class);
}, usedTpi -> {
assertThat(usedTpi.getTenantId()).get().isEqualTo(tenantId);
});
}
private void verifyUsedQueueAndMessage(String queue, EntityId entityId, String msgType, Runnable action, Consumer<TopicPartitionInfo> tpiAssert) {
await().atMost(15, TimeUnit.SECONDS)
.untilAsserted(() -> {
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, queue, tenantId, entityId);
tpiAssert.accept(tpi);
});
action.run();
TbMsg tbMsg = awaitTbMsg(msg -> msg.getOriginator().equals(entityId)
&& msg.getType().equals(msgType), 10000);
assertThat(tbMsg.getQueueName()).isEqualTo(queue);
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, queue, tenantId, entityId);
tpiAssert.accept(tpi);
}
protected TbMsg awaitTbMsg(Predicate<TbMsg> predicate, int timeoutMillis) {
AtomicReference<TbMsg> tbMsgCaptor = new AtomicReference<>();
verify(actorContext, timeout(timeoutMillis).atLeastOnce()).tell(argThat(actorMsg -> {
if (!(actorMsg instanceof QueueToRuleEngineMsg)) {
return false;
}
TbMsg tbMsg = ((QueueToRuleEngineMsg) actorMsg).getMsg();
if (predicate.test(tbMsg)) {
tbMsgCaptor.set(tbMsg);
return true;
}
return false;
}));
return tbMsgCaptor.get();
}
private TenantProfileQueueConfiguration getHighPriorityQueueConfig() {
TenantProfileQueueConfiguration hpQueueConfig = new TenantProfileQueueConfiguration();
hpQueueConfig.setName(DataConstants.HP_QUEUE_NAME);
hpQueueConfig.setTopic(DataConstants.HP_QUEUE_TOPIC);
hpQueueConfig.setPollInterval(25);
hpQueueConfig.setPartitions(10);
hpQueueConfig.setConsumerPerPartition(true);
hpQueueConfig.setPackProcessingTimeout(2000);
SubmitStrategy highPriorityQueueSubmitStrategy = new SubmitStrategy();
highPriorityQueueSubmitStrategy.setType(SubmitStrategyType.BURST);
highPriorityQueueSubmitStrategy.setBatchSize(100);
hpQueueConfig.setSubmitStrategy(highPriorityQueueSubmitStrategy);
ProcessingStrategy highPriorityQueueProcessingStrategy = new ProcessingStrategy();
highPriorityQueueProcessingStrategy.setType(ProcessingStrategyType.RETRY_FAILED_AND_TIMED_OUT);
highPriorityQueueProcessingStrategy.setRetries(0);
highPriorityQueueProcessingStrategy.setFailurePercentage(0);
highPriorityQueueProcessingStrategy.setPauseBetweenRetries(5);
highPriorityQueueProcessingStrategy.setMaxPauseBetweenRetries(5);
hpQueueConfig.setProcessingStrategy(highPriorityQueueProcessingStrategy);
return hpQueueConfig;
}
private TenantProfileQueueConfiguration getMainQueueConfig() {
TenantProfileQueueConfiguration mainQueue = new TenantProfileQueueConfiguration();
mainQueue.setName(DataConstants.MAIN_QUEUE_NAME);
mainQueue.setTopic(DataConstants.MAIN_QUEUE_TOPIC);
mainQueue.setPollInterval(25);
mainQueue.setPartitions(10);
mainQueue.setConsumerPerPartition(true);
mainQueue.setPackProcessingTimeout(2000);
SubmitStrategy mainQueueSubmitStrategy = new SubmitStrategy();
mainQueueSubmitStrategy.setType(SubmitStrategyType.BURST);
mainQueueSubmitStrategy.setBatchSize(1000);
mainQueue.setSubmitStrategy(mainQueueSubmitStrategy);
ProcessingStrategy mainQueueProcessingStrategy = new ProcessingStrategy();
mainQueueProcessingStrategy.setType(ProcessingStrategyType.SKIP_ALL_FAILURES);
mainQueueProcessingStrategy.setRetries(3);
mainQueueProcessingStrategy.setFailurePercentage(0);
mainQueueProcessingStrategy.setPauseBetweenRetries(3);
mainQueueProcessingStrategy.setMaxPauseBetweenRetries(3);
mainQueue.setProcessingStrategy(mainQueueProcessingStrategy);
return mainQueue;
}
}

View File

@ -67,7 +67,7 @@ public class AlarmsCleanUpServiceTest extends AbstractControllerTest {
@Test
public void testAlarmsCleanUp() throws Exception {
int ttlDays = 1;
updateDefaultTenantProfile(profileConfiguration -> {
updateDefaultTenantProfileConfig(profileConfiguration -> {
profileConfiguration.setAlarmsTtlDays(ttlDays);
});

View File

@ -15,7 +15,8 @@
*/
package org.thingsboard.server.actors;
import lombok.Data;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.msg.MsgType;
@ -31,7 +32,8 @@ import java.util.function.Predicate;
import java.util.function.Supplier;
@Slf4j
@Data
@Getter
@RequiredArgsConstructor
public final class TbActorMailbox implements TbActorCtx {
private static final boolean HIGH_PRIORITY = true;
private static final boolean NORMAL_PRIORITY = false;

View File

@ -165,6 +165,9 @@ public class HashPartitionService implements PartitionService {
partitionTopicsMap.put(queueKey, queueUpdateMsg.getQueueTopic());
partitionSizesMap.put(queueKey, queueUpdateMsg.getPartitions());
myPartitions.remove(queueKey);
if (!tenantId.isSysTenantId()) {
tenantRoutingInfoMap.remove(tenantId);
}
}
@Override
@ -358,16 +361,9 @@ public class HashPartitionService implements PartitionService {
if (TenantId.SYS_TENANT_ID.equals(tenantId)) {
return false;
}
TenantRoutingInfo routingInfo = tenantRoutingInfoMap.get(tenantId);
if (routingInfo == null) {
synchronized (tenantRoutingInfoMap) {
routingInfo = tenantRoutingInfoMap.get(tenantId);
if (routingInfo == null) {
routingInfo = tenantRoutingInfoService.getRoutingInfo(tenantId);
tenantRoutingInfoMap.put(tenantId, routingInfo);
}
}
}
TenantRoutingInfo routingInfo = tenantRoutingInfoMap.computeIfAbsent(tenantId, k -> {
return tenantRoutingInfoService.getRoutingInfo(tenantId);
});
if (routingInfo == null) {
throw new TenantNotFoundException(tenantId);
}

View File

@ -99,8 +99,6 @@ public class TenantProfileDataValidator extends DataValidator<TenantProfile> {
TenantProfile old = tenantProfileDao.findById(TenantId.SYS_TENANT_ID, tenantProfile.getId().getId());
if (old == null) {
throw new DataValidationException("Can't update non existing tenant profile!");
} else if (old.isIsolatedTbRuleEngine() != tenantProfile.isIsolatedTbRuleEngine()) {
throw new DataValidationException("Can't update isolatedTbRuleEngine property!");
}
return old;
}

View File

@ -76,6 +76,52 @@ export class TenantProfileComponent extends EntityComponent<TenantProfile> {
additionalInfo: {
description: ''
}
},
{
id: guid(),
name: 'HighPriority',
topic: 'tb_rule_engine.hp',
pollInterval: 25,
partitions: 10,
consumerPerPartition: true,
packProcessingTimeout: 2000,
submitStrategy: {
type: 'BURST',
batchSize: 100
},
processingStrategy: {
type: 'RETRY_FAILED_AND_TIMED_OUT',
retries: 0,
failurePercentage: 0,
pauseBetweenRetries: 5,
maxPauseBetweenRetries: 5
},
additionalInfo: {
description: ''
}
},
{
id: guid(),
name: 'SequentialByOriginator',
topic: 'tb_rule_engine.sq',
pollInterval: 25,
partitions: 10,
consumerPerPartition: true,
packProcessingTimeout: 2000,
submitStrategy: {
type: 'SEQUENTIAL_BY_ORIGINATOR',
batchSize: 100
},
processingStrategy: {
type: 'RETRY_FAILED_AND_TIMED_OUT',
retries: 3,
failurePercentage: 0,
pauseBetweenRetries: 5,
maxPauseBetweenRetries: 5
},
additionalInfo: {
description: ''
}
}
];
const formGroup = this.fb.group(
@ -118,9 +164,6 @@ export class TenantProfileComponent extends EntityComponent<TenantProfile> {
if (this.entityForm) {
if (this.isEditValue) {
this.entityForm.enable({emitEvent: false});
if (!this.isAdd) {
this.entityForm.get('isolatedTbRuleEngine').disable({emitEvent: false});
}
} else {
this.entityForm.disable({emitEvent: false});
}