Merge branch 'master' into release-3.6
This commit is contained in:
commit
6a9fab766e
@ -280,9 +280,9 @@ public final class EdgeGrpcSession implements Closeable {
|
||||
try {
|
||||
if (msg.getSuccess()) {
|
||||
sessionState.getPendingMsgsMap().remove(msg.getDownlinkMsgId());
|
||||
log.debug("[{}][{}] Msg has been processed successfully!Msd Id: [{}], Msg: {}", this.tenantId, edge.getRoutingKey(), msg.getDownlinkMsgId(), msg);
|
||||
log.debug("[{}][{}] Msg has been processed successfully!Msg Id: [{}], Msg: {}", this.tenantId, edge.getRoutingKey(), msg.getDownlinkMsgId(), msg);
|
||||
} else {
|
||||
log.error("[{}][{}] Msg processing failed! Msd Id: [{}], Error msg: {}", this.tenantId, edge.getRoutingKey(), msg.getDownlinkMsgId(), msg.getErrorMsg());
|
||||
log.error("[{}][{}] Msg processing failed! Msg Id: [{}], Error msg: {}", this.tenantId, edge.getRoutingKey(), msg.getDownlinkMsgId(), msg.getErrorMsg());
|
||||
}
|
||||
if (sessionState.getPendingMsgsMap().isEmpty()) {
|
||||
log.debug("[{}][{}] Pending msgs map is empty. Stopping current iteration", this.tenantId, edge.getRoutingKey());
|
||||
|
||||
@ -52,10 +52,10 @@ public class EdgeSyncCursor {
|
||||
|
||||
public EdgeSyncCursor(EdgeContextComponent ctx, Edge edge, boolean fullSync) {
|
||||
if (fullSync) {
|
||||
fetchers.add(new TenantEdgeEventFetcher(ctx.getTenantService()));
|
||||
fetchers.add(new QueuesEdgeEventFetcher(ctx.getQueueService()));
|
||||
fetchers.add(new RuleChainsEdgeEventFetcher(ctx.getRuleChainService()));
|
||||
fetchers.add(new AdminSettingsEdgeEventFetcher(ctx.getAdminSettingsService(), ctx.getFreemarkerConfig()));
|
||||
fetchers.add(new TenantEdgeEventFetcher(ctx.getTenantService()));
|
||||
fetchers.add(new TenantAdminUsersEdgeEventFetcher(ctx.getUserService()));
|
||||
Customer publicCustomer = ctx.getCustomerService().findOrCreatePublicCustomer(edge.getTenantId());
|
||||
fetchers.add(new CustomerEdgeEventFetcher(publicCustomer.getId()));
|
||||
|
||||
@ -18,11 +18,23 @@ package org.thingsboard.server.edge;
|
||||
import com.google.protobuf.AbstractMessage;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.thingsboard.server.common.data.DataConstants;
|
||||
import org.thingsboard.server.common.data.TenantProfile;
|
||||
import org.thingsboard.server.common.data.queue.ProcessingStrategy;
|
||||
import org.thingsboard.server.common.data.queue.ProcessingStrategyType;
|
||||
import org.thingsboard.server.common.data.queue.SubmitStrategy;
|
||||
import org.thingsboard.server.common.data.queue.SubmitStrategyType;
|
||||
import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfiguration;
|
||||
import org.thingsboard.server.dao.service.DaoSqlTest;
|
||||
import org.thingsboard.server.gen.edge.v1.QueueUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.v1.TenantProfileUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.v1.TenantUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@DaoSqlTest
|
||||
public class TenantProfileEdgeTest extends AbstractEdgeTest {
|
||||
|
||||
@ -51,4 +63,65 @@ public class TenantProfileEdgeTest extends AbstractEdgeTest {
|
||||
|
||||
loginTenantAdmin();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIsolatedTenantProfile() throws Exception {
|
||||
loginSysAdmin();
|
||||
|
||||
TenantProfile edgeTenantProfile = doGet("/api/tenantProfile/" + tenantProfileId.getId(), TenantProfile.class);
|
||||
|
||||
// set tenant profile isolated and add 2 queues - main and isolated
|
||||
edgeTenantProfile.setIsolatedTbRuleEngine(true);
|
||||
TenantProfileQueueConfiguration mainQueueConfiguration = createQueueConfig(DataConstants.MAIN_QUEUE_NAME, DataConstants.MAIN_QUEUE_TOPIC);
|
||||
TenantProfileQueueConfiguration isolatedQueueConfiguration = createQueueConfig("IsolatedHighPriority", "tb_rule_engine.isolated_hp");
|
||||
edgeTenantProfile.getProfileData().setQueueConfiguration(List.of(mainQueueConfiguration, isolatedQueueConfiguration));
|
||||
edgeImitator.expectMessageAmount(1);
|
||||
edgeTenantProfile = doPost("/api/tenantProfile", edgeTenantProfile, TenantProfile.class);
|
||||
Assert.assertTrue(edgeImitator.waitForMessages());
|
||||
AbstractMessage latestMessage = edgeImitator.getLatestMessage();
|
||||
Assert.assertTrue(latestMessage instanceof TenantProfileUpdateMsg);
|
||||
TenantProfileUpdateMsg tenantProfileUpdateMsg = (TenantProfileUpdateMsg) latestMessage;
|
||||
Assert.assertEquals(UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE, tenantProfileUpdateMsg.getMsgType());
|
||||
Assert.assertEquals(edgeTenantProfile.getUuidId().getMostSignificantBits(), tenantProfileUpdateMsg.getIdMSB());
|
||||
Assert.assertEquals(edgeTenantProfile.getUuidId().getLeastSignificantBits(), tenantProfileUpdateMsg.getIdLSB());
|
||||
Assert.assertEquals(edgeTenantProfile.getDescription(), tenantProfileUpdateMsg.getDescription());
|
||||
|
||||
loginTenantAdmin();
|
||||
|
||||
edgeImitator.expectMessageAmount(21);
|
||||
doPost("/api/edge/sync/" + edge.getId());
|
||||
assertThat(edgeImitator.waitForMessages()).as("await for messages after edge sync rest api call").isTrue();
|
||||
|
||||
Assert.assertTrue(edgeImitator.getDownlinkMsgs().get(0) instanceof TenantUpdateMsg);
|
||||
Assert.assertTrue(edgeImitator.getDownlinkMsgs().get(1) instanceof TenantProfileUpdateMsg);
|
||||
|
||||
List<QueueUpdateMsg> queueUpdateMsgs = edgeImitator.findAllMessagesByType(QueueUpdateMsg.class);
|
||||
Assert.assertEquals(2, queueUpdateMsgs.size());
|
||||
for (QueueUpdateMsg queueUpdateMsg : queueUpdateMsgs) {
|
||||
Assert.assertEquals(tenantId.getId().getMostSignificantBits(), queueUpdateMsg.getTenantIdMSB());
|
||||
Assert.assertEquals(tenantId.getId().getLeastSignificantBits(), queueUpdateMsg.getTenantIdLSB());
|
||||
}
|
||||
}
|
||||
|
||||
private TenantProfileQueueConfiguration createQueueConfig(String queueName, String queueTopic) {
|
||||
TenantProfileQueueConfiguration queueConfiguration = new TenantProfileQueueConfiguration();
|
||||
queueConfiguration.setName(queueName);
|
||||
queueConfiguration.setTopic(queueTopic);
|
||||
queueConfiguration.setPollInterval(25);
|
||||
queueConfiguration.setPartitions(10);
|
||||
queueConfiguration.setConsumerPerPartition(true);
|
||||
queueConfiguration.setPackProcessingTimeout(2000);
|
||||
SubmitStrategy mainQueueSubmitStrategy = new SubmitStrategy();
|
||||
mainQueueSubmitStrategy.setType(SubmitStrategyType.BURST);
|
||||
mainQueueSubmitStrategy.setBatchSize(1000);
|
||||
queueConfiguration.setSubmitStrategy(mainQueueSubmitStrategy);
|
||||
ProcessingStrategy mainQueueProcessingStrategy = new ProcessingStrategy();
|
||||
mainQueueProcessingStrategy.setType(ProcessingStrategyType.SKIP_ALL_FAILURES);
|
||||
mainQueueProcessingStrategy.setRetries(3);
|
||||
mainQueueProcessingStrategy.setFailurePercentage(0);
|
||||
mainQueueProcessingStrategy.setPauseBetweenRetries(3);
|
||||
mainQueueProcessingStrategy.setMaxPauseBetweenRetries(3);
|
||||
queueConfiguration.setProcessingStrategy(mainQueueProcessingStrategy);
|
||||
return queueConfiguration;
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user