From 2c7e5ef5f8c13bac554c8927b93d6fa6945be6f1 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Wed, 20 Sep 2023 17:25:12 +0300 Subject: [PATCH] Added TenantProfileEdgeTest.testIsolatedTenantProfile --- .../server/edge/TenantProfileEdgeTest.java | 73 +++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/application/src/test/java/org/thingsboard/server/edge/TenantProfileEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/TenantProfileEdgeTest.java index ad94a8ba11..7ec32f8aff 100644 --- a/application/src/test/java/org/thingsboard/server/edge/TenantProfileEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/TenantProfileEdgeTest.java @@ -18,11 +18,23 @@ package org.thingsboard.server.edge; import com.google.protobuf.AbstractMessage; import org.junit.Assert; import org.junit.Test; +import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.TenantProfile; +import org.thingsboard.server.common.data.queue.ProcessingStrategy; +import org.thingsboard.server.common.data.queue.ProcessingStrategyType; +import org.thingsboard.server.common.data.queue.SubmitStrategy; +import org.thingsboard.server.common.data.queue.SubmitStrategyType; +import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfiguration; import org.thingsboard.server.dao.service.DaoSqlTest; +import org.thingsboard.server.gen.edge.v1.QueueUpdateMsg; import org.thingsboard.server.gen.edge.v1.TenantProfileUpdateMsg; +import org.thingsboard.server.gen.edge.v1.TenantUpdateMsg; import org.thingsboard.server.gen.edge.v1.UpdateMsgType; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + @DaoSqlTest public class TenantProfileEdgeTest extends AbstractEdgeTest { @@ -51,4 +63,65 @@ public class TenantProfileEdgeTest extends AbstractEdgeTest { loginTenantAdmin(); } + + @Test + public void testIsolatedTenantProfile() throws Exception { + loginSysAdmin(); + + TenantProfile edgeTenantProfile = doGet("/api/tenantProfile/" + tenantProfileId.getId(), TenantProfile.class); + + // set tenant profile isolated and add 2 queues - main and isolated + edgeTenantProfile.setIsolatedTbRuleEngine(true); + TenantProfileQueueConfiguration mainQueueConfiguration = createQueueConfig(DataConstants.MAIN_QUEUE_NAME, DataConstants.MAIN_QUEUE_TOPIC); + TenantProfileQueueConfiguration isolatedQueueConfiguration = createQueueConfig("IsolatedHighPriority", "tb_rule_engine.isolated_hp"); + edgeTenantProfile.getProfileData().setQueueConfiguration(List.of(mainQueueConfiguration, isolatedQueueConfiguration)); + edgeImitator.expectMessageAmount(1); + edgeTenantProfile = doPost("/api/tenantProfile", edgeTenantProfile, TenantProfile.class); + Assert.assertTrue(edgeImitator.waitForMessages()); + AbstractMessage latestMessage = edgeImitator.getLatestMessage(); + Assert.assertTrue(latestMessage instanceof TenantProfileUpdateMsg); + TenantProfileUpdateMsg tenantProfileUpdateMsg = (TenantProfileUpdateMsg) latestMessage; + Assert.assertEquals(UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE, tenantProfileUpdateMsg.getMsgType()); + Assert.assertEquals(edgeTenantProfile.getUuidId().getMostSignificantBits(), tenantProfileUpdateMsg.getIdMSB()); + Assert.assertEquals(edgeTenantProfile.getUuidId().getLeastSignificantBits(), tenantProfileUpdateMsg.getIdLSB()); + Assert.assertEquals(edgeTenantProfile.getDescription(), tenantProfileUpdateMsg.getDescription()); + + loginTenantAdmin(); + + edgeImitator.expectMessageAmount(21); + doPost("/api/edge/sync/" + edge.getId()); + assertThat(edgeImitator.waitForMessages()).as("await for messages after edge sync rest api call").isTrue(); + + Assert.assertTrue(edgeImitator.getDownlinkMsgs().get(0) instanceof TenantUpdateMsg); + Assert.assertTrue(edgeImitator.getDownlinkMsgs().get(1) instanceof TenantProfileUpdateMsg); + + List queueUpdateMsgs = edgeImitator.findAllMessagesByType(QueueUpdateMsg.class); + Assert.assertEquals(2, queueUpdateMsgs.size()); + for (QueueUpdateMsg queueUpdateMsg : queueUpdateMsgs) { + Assert.assertEquals(tenantId.getId().getMostSignificantBits(), queueUpdateMsg.getTenantIdMSB()); + Assert.assertEquals(tenantId.getId().getLeastSignificantBits(), queueUpdateMsg.getTenantIdLSB()); + } + } + + private TenantProfileQueueConfiguration createQueueConfig(String queueName, String queueTopic) { + TenantProfileQueueConfiguration queueConfiguration = new TenantProfileQueueConfiguration(); + queueConfiguration.setName(queueName); + queueConfiguration.setTopic(queueTopic); + queueConfiguration.setPollInterval(25); + queueConfiguration.setPartitions(10); + queueConfiguration.setConsumerPerPartition(true); + queueConfiguration.setPackProcessingTimeout(2000); + SubmitStrategy mainQueueSubmitStrategy = new SubmitStrategy(); + mainQueueSubmitStrategy.setType(SubmitStrategyType.BURST); + mainQueueSubmitStrategy.setBatchSize(1000); + queueConfiguration.setSubmitStrategy(mainQueueSubmitStrategy); + ProcessingStrategy mainQueueProcessingStrategy = new ProcessingStrategy(); + mainQueueProcessingStrategy.setType(ProcessingStrategyType.SKIP_ALL_FAILURES); + mainQueueProcessingStrategy.setRetries(3); + mainQueueProcessingStrategy.setFailurePercentage(0); + mainQueueProcessingStrategy.setPauseBetweenRetries(3); + mainQueueProcessingStrategy.setMaxPauseBetweenRetries(3); + queueConfiguration.setProcessingStrategy(mainQueueProcessingStrategy); + return queueConfiguration; + } }