move default queue names and topics to DataConstants

This commit is contained in:
ShvaykaD 2023-01-24 18:21:55 +02:00
parent a8d27b9f17
commit f5e20b5a3e
16 changed files with 60 additions and 41 deletions

View File

@ -203,8 +203,8 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService {
isolatedRuleEngineTenantProfileData.setConfiguration(new DefaultTenantProfileConfiguration());
TenantProfileQueueConfiguration mainQueueConfiguration = new TenantProfileQueueConfiguration();
mainQueueConfiguration.setName("Main");
mainQueueConfiguration.setTopic("tb_rule_engine.main");
mainQueueConfiguration.setName(DataConstants.MAIN_QUEUE_NAME);
mainQueueConfiguration.setTopic(DataConstants.MAIN_QUEUE_TOPIC);
mainQueueConfiguration.setPollInterval(25);
mainQueueConfiguration.setPartitions(10);
mainQueueConfiguration.setConsumerPerPartition(true);
@ -598,12 +598,12 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService {
@Override
public void createQueues() {
Queue mainQueue = queueService.findQueueByTenantIdAndName(TenantId.SYS_TENANT_ID, "Main");
Queue mainQueue = queueService.findQueueByTenantIdAndName(TenantId.SYS_TENANT_ID, DataConstants.MAIN_QUEUE_NAME);
if (mainQueue == null) {
mainQueue = new Queue();
mainQueue.setTenantId(TenantId.SYS_TENANT_ID);
mainQueue.setName("Main");
mainQueue.setTopic("tb_rule_engine.main");
mainQueue.setName(DataConstants.MAIN_QUEUE_NAME);
mainQueue.setTopic(DataConstants.MAIN_QUEUE_TOPIC);
mainQueue.setPollInterval(25);
mainQueue.setPartitions(10);
mainQueue.setConsumerPerPartition(true);
@ -622,12 +622,12 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService {
queueService.saveQueue(mainQueue);
}
Queue highPriorityQueue = queueService.findQueueByTenantIdAndName(TenantId.SYS_TENANT_ID, "HighPriority");
Queue highPriorityQueue = queueService.findQueueByTenantIdAndName(TenantId.SYS_TENANT_ID, DataConstants.HP_QUEUE_NAME);
if (highPriorityQueue == null) {
highPriorityQueue = new Queue();
highPriorityQueue.setTenantId(TenantId.SYS_TENANT_ID);
highPriorityQueue.setName("HighPriority");
highPriorityQueue.setTopic("tb_rule_engine.hp");
highPriorityQueue.setName(DataConstants.HP_QUEUE_NAME);
highPriorityQueue.setTopic(DataConstants.HP_QUEUE_TOPIC);
highPriorityQueue.setPollInterval(25);
highPriorityQueue.setPartitions(10);
highPriorityQueue.setConsumerPerPartition(true);
@ -646,12 +646,12 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService {
queueService.saveQueue(highPriorityQueue);
}
Queue sequentialByOriginatorQueue = queueService.findQueueByTenantIdAndName(TenantId.SYS_TENANT_ID, "SequentialByOriginator");
Queue sequentialByOriginatorQueue = queueService.findQueueByTenantIdAndName(TenantId.SYS_TENANT_ID, DataConstants.SQ_QUEUE_NAME);
if (sequentialByOriginatorQueue == null) {
sequentialByOriginatorQueue = new Queue();
sequentialByOriginatorQueue.setTenantId(TenantId.SYS_TENANT_ID);
sequentialByOriginatorQueue.setName("SequentialByOriginator");
sequentialByOriginatorQueue.setTopic("tb_rule_engine.sq");
sequentialByOriginatorQueue.setName(DataConstants.SQ_QUEUE_NAME);
sequentialByOriginatorQueue.setTopic(DataConstants.SQ_QUEUE_TOPIC);
sequentialByOriginatorQueue.setPollInterval(25);
sequentialByOriginatorQueue.setPartitions(10);
sequentialByOriginatorQueue.setPackProcessingTimeout(2000);

View File

@ -21,6 +21,7 @@ import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
import javax.annotation.PostConstruct;
@ -39,7 +40,7 @@ public class TbRuleEngineQueueConfigService {
@PostConstruct
public void validate() {
queues.stream().filter(queue -> queue.getName().equals("Main")).findFirst().orElseThrow(() -> {
queues.stream().filter(queue -> queue.getName().equals(DataConstants.MAIN_QUEUE_NAME)).findFirst().orElseThrow(() -> {
log.error("Main queue is not configured in thingsboard.yml");
return new RuntimeException("No \"Main\" queue configured!");
});

View File

@ -30,6 +30,7 @@ import org.thingsboard.rule.engine.flow.TbRuleChainInputNode;
import org.thingsboard.rule.engine.flow.TbRuleChainInputNodeConfiguration;
import org.thingsboard.rule.engine.profile.TbDeviceProfileNode;
import org.thingsboard.rule.engine.profile.TbDeviceProfileNodeConfiguration;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.EntityView;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.TenantProfile;
@ -651,8 +652,8 @@ public class DefaultDataUpdateService implements DataUpdateService {
private TenantProfileQueueConfiguration getMainQueueConfiguration() {
TenantProfileQueueConfiguration mainQueueConfiguration = new TenantProfileQueueConfiguration();
mainQueueConfiguration.setName("Main");
mainQueueConfiguration.setTopic("tb_rule_engine.main");
mainQueueConfiguration.setName(DataConstants.MAIN_QUEUE_NAME);
mainQueueConfiguration.setTopic(DataConstants.MAIN_QUEUE_TOPIC);
mainQueueConfiguration.setPollInterval(25);
mainQueueConfiguration.setPartitions(10);
mainQueueConfiguration.setConsumerPerPartition(true);

View File

@ -30,6 +30,7 @@ import org.mockito.Mockito;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.web.servlet.ResultActions;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.TenantInfo;
@ -427,7 +428,7 @@ public abstract class BaseTenantControllerTest extends AbstractControllerTest {
tenantProfileData.setConfiguration(new DefaultTenantProfileConfiguration());
tenantProfile.setProfileData(tenantProfileData);
tenantProfile.setIsolatedTbRuleEngine(true);
addQueueConfig(tenantProfile, "Main");
addQueueConfig(tenantProfile, DataConstants.MAIN_QUEUE_NAME);
addQueueConfig(tenantProfile, "Test");
tenantProfile = doPost("/api/tenantProfile", tenantProfile, TenantProfile.class);
@ -456,7 +457,7 @@ public abstract class BaseTenantControllerTest extends AbstractControllerTest {
tenantProfileData2.setConfiguration(new DefaultTenantProfileConfiguration());
tenantProfile2.setProfileData(tenantProfileData2);
tenantProfile2.setIsolatedTbRuleEngine(true);
addQueueConfig(tenantProfile2, "Main");
addQueueConfig(tenantProfile2, DataConstants.MAIN_QUEUE_NAME);
addQueueConfig(tenantProfile2, "Test");
addQueueConfig(tenantProfile2, "Test2");
tenantProfile2 = doPost("/api/tenantProfile", tenantProfile2, TenantProfile.class);

View File

@ -20,6 +20,7 @@ import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.EntityInfo;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.Tenant;
@ -341,8 +342,8 @@ public abstract class BaseTenantProfileControllerTest extends AbstractController
private void addMainQueueConfig(TenantProfile tenantProfile) {
TenantProfileQueueConfiguration mainQueueConfiguration = new TenantProfileQueueConfiguration();
mainQueueConfiguration.setName("Main");
mainQueueConfiguration.setTopic("tb_rule_engine.main");
mainQueueConfiguration.setName(DataConstants.MAIN_QUEUE_NAME);
mainQueueConfiguration.setTopic(DataConstants.MAIN_QUEUE_TOPIC);
mainQueueConfiguration.setPollInterval(25);
mainQueueConfiguration.setPartitions(10);
mainQueueConfiguration.setConsumerPerPartition(true);

View File

@ -28,6 +28,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.TestPropertySource;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.OtaPackageInfo;
@ -427,8 +428,8 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest {
UUID queueUUID = new UUID(queueUpdateMsg.getIdMSB(), queueUpdateMsg.getIdLSB());
Queue queue = doGet("/api/queues/" + queueUUID, Queue.class);
Assert.assertNotNull(queue);
Assert.assertEquals("Main", queueUpdateMsg.getName());
Assert.assertEquals("tb_rule_engine.main", queueUpdateMsg.getTopic());
Assert.assertEquals(DataConstants.MAIN_QUEUE_NAME, queueUpdateMsg.getName());
Assert.assertEquals(DataConstants.MAIN_QUEUE_TOPIC, queueUpdateMsg.getTopic());
Assert.assertEquals(10, queueUpdateMsg.getPartitions());
Assert.assertEquals(25, queueUpdateMsg.getPollInterval());
testAutoGeneratedCodeByProtobuf(queueUpdateMsg);

View File

@ -24,6 +24,7 @@ import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.QueueId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.queue.Queue;
@ -239,7 +240,7 @@ public class DefaultTbClusterServiceTest {
TenantId tenantId = TenantId.SYS_TENANT_ID;
Queue queue = new Queue(new QueueId(UUID.randomUUID()));
queue.setTenantId(tenantId);
queue.setName("Main");
queue.setName(DataConstants.MAIN_QUEUE_NAME);
queue.setTopic("main");
queue.setPartitions(10);
return queue;

View File

@ -22,6 +22,7 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategy;
@ -69,7 +70,7 @@ public class TbMsgPackProcessingContextTest {
TbRuleEngineSubmitStrategy strategyMock = mock(TbRuleEngineSubmitStrategy.class);
when(strategyMock.getPendingMap()).thenReturn(messages);
TbMsgPackProcessingContext context = new TbMsgPackProcessingContext("Main", strategyMock, false);
TbMsgPackProcessingContext context = new TbMsgPackProcessingContext(DataConstants.MAIN_QUEUE_NAME, strategyMock, false);
for (UUID uuid : messages.keySet()) {
final CountDownLatch readyLatch = new CountDownLatch(parallelCount);
final CountDownLatch startLatch = new CountDownLatch(1);

View File

@ -118,4 +118,12 @@ public class DataConstants {
public static final String MSG_SOURCE_KEY = "source";
public static final String LAST_CONNECTED_GATEWAY = "lastConnectedGateway";
public static final String MAIN_QUEUE_NAME = "Main";
public static final String MAIN_QUEUE_TOPIC = "tb_rule_engine.main";
public static final String HP_QUEUE_NAME = "HighPriority";
public static final String HP_QUEUE_TOPIC = "tb_rule_engine.hp";
public static final String SQ_QUEUE_NAME = "SequentialByOriginator";
public static final String SQ_QUEUE_TOPIC = "tb_rule_engine.sq";
}

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.queue.discovery;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.common.msg.queue.ServiceType;
@ -24,7 +25,6 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
@Data
@AllArgsConstructor
public class QueueKey {
private static final String MAIN = "Main";
private final ServiceType type;
private final String queueName;
@ -44,13 +44,13 @@ public class QueueKey {
public QueueKey(ServiceType type, TenantId tenantId) {
this.type = type;
this.queueName = MAIN;
this.queueName = DataConstants.MAIN_QUEUE_NAME;
this.tenantId = tenantId != null ? tenantId : TenantId.SYS_TENANT_ID;
}
public QueueKey(ServiceType type) {
this.type = type;
this.queueName = MAIN;
this.queueName = DataConstants.MAIN_QUEUE_NAME;
this.tenantId = TenantId.SYS_TENANT_ID;
}

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.queue.discovery;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.ServiceType;
@ -29,7 +30,7 @@ class QueueKeyTest {
@Test
void testToStringSystemTenant() {
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, "Main", TenantId.SYS_TENANT_ID);
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.MAIN_QUEUE_NAME, TenantId.SYS_TENANT_ID);
log.info("The queue key is {}",queueKey);
assertThat(queueKey.toString()).isEqualTo("QK(Main,TB_RULE_ENGINE,system)");
}
@ -37,7 +38,7 @@ class QueueKeyTest {
@Test
void testToStringCustomTenant() {
TenantId tenantId = TenantId.fromUUID(UUID.fromString("3ebd39eb-43d4-4911-a818-cdbf8d508f88"));
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, "Main", tenantId);
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.MAIN_QUEUE_NAME, tenantId);
log.info("The queue key is {}",queueKey);
assertThat(queueKey.toString()).isEqualTo("QK(Main,TB_RULE_ENGINE,3ebd39eb-43d4-4911-a818-cdbf8d508f88)");
}

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.dao.service.validator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.id.TenantId;
@ -72,7 +73,7 @@ public class TenantProfileDataValidator extends DataValidator<TenantProfile> {
Optional<TenantProfileQueueConfiguration> mainQueueConfig =
queueConfiguration
.stream()
.filter(q -> q.getName().equals("Main"))
.filter(q -> q.getName().equals(DataConstants.MAIN_QUEUE_NAME))
.findAny();
if (mainQueueConfig.isEmpty()) {
throw new DataValidationException("Main queue configuration should be specified!");

View File

@ -20,6 +20,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
@ -70,7 +71,7 @@ public class JpaQueueDao extends JpaAbstractDao<QueueEntity, Queue> implements Q
@Override
public List<Queue> findAllMainQueues() {
List<QueueEntity> entities = Lists.newArrayList(queueRepository.findAllByName("Main"));
List<QueueEntity> entities = Lists.newArrayList(queueRepository.findAllByName(DataConstants.MAIN_QUEUE_NAME));
return DaoUtil.convertDataList(entities);
}

View File

@ -19,6 +19,7 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.id.TenantId;
@ -54,8 +55,8 @@ public abstract class BaseQueueServiceTest extends AbstractServiceTest {
tenantProfile.setIsolatedTbRuleEngine(true);
TenantProfileQueueConfiguration mainQueueConfiguration = new TenantProfileQueueConfiguration();
mainQueueConfiguration.setName("Main");
mainQueueConfiguration.setTopic("tb_rule_engine.main");
mainQueueConfiguration.setName(DataConstants.MAIN_QUEUE_NAME);
mainQueueConfiguration.setTopic(DataConstants.MAIN_QUEUE_TOPIC);
mainQueueConfiguration.setPollInterval(25);
mainQueueConfiguration.setPartitions(10);
mainQueueConfiguration.setConsumerPerPartition(true);
@ -495,7 +496,7 @@ public abstract class BaseQueueServiceTest extends AbstractServiceTest {
for (int i = 0; i < loadedQueues.size(); i++) {
Queue queue = loadedQueues.get(i);
if (queue.getName().equals("Main")) {
if (queue.getName().equals(DataConstants.MAIN_QUEUE_NAME)) {
loadedQueues.remove(queue);
break;
}

View File

@ -19,6 +19,7 @@ import com.fasterxml.jackson.databind.node.NullNode;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.EntityInfo;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.TenantProfile;
@ -57,8 +58,8 @@ public abstract class BaseTenantProfileServiceTest extends AbstractServiceTest {
tenantProfile.setIsolatedTbRuleEngine(true);
TenantProfileQueueConfiguration mainQueueConfiguration = new TenantProfileQueueConfiguration();
mainQueueConfiguration.setName("Main");
mainQueueConfiguration.setTopic("tb_rule_engine.main");
mainQueueConfiguration.setName(DataConstants.MAIN_QUEUE_NAME);
mainQueueConfiguration.setTopic(DataConstants.MAIN_QUEUE_TOPIC);
mainQueueConfiguration.setPollInterval(25);
mainQueueConfiguration.setPartitions(10);
mainQueueConfiguration.setConsumerPerPartition(true);
@ -295,8 +296,8 @@ public abstract class BaseTenantProfileServiceTest extends AbstractServiceTest {
private void addMainQueueConfig(TenantProfile tenantProfile) {
TenantProfileQueueConfiguration mainQueueConfiguration = new TenantProfileQueueConfiguration();
mainQueueConfiguration.setName("Main");
mainQueueConfiguration.setTopic("tb_rule_engine.main");
mainQueueConfiguration.setName(DataConstants.MAIN_QUEUE_NAME);
mainQueueConfiguration.setTopic(DataConstants.MAIN_QUEUE_TOPIC);
mainQueueConfiguration.setPollInterval(25);
mainQueueConfiguration.setPartitions(10);
mainQueueConfiguration.setConsumerPerPartition(true);

View File

@ -34,6 +34,7 @@ import org.thingsboard.rule.engine.api.TbRelationTypes;
import org.thingsboard.rule.engine.deduplication.DeduplicationStrategy;
import org.thingsboard.rule.engine.deduplication.TbMsgDeduplicationNode;
import org.thingsboard.rule.engine.deduplication.TbMsgDeduplicationNodeConfiguration;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleNodeId;
@ -69,8 +70,6 @@ import static org.mockito.Mockito.when;
@Slf4j
public class TbMsgDeduplicationNodeTest {
private static final String MAIN_QUEUE_NAME = "Main";
private static final String HIGH_PRIORITY_QUEUE_NAME = "HighPriority";
private static final String TB_MSG_DEDUPLICATION_TIMEOUT_MSG = "TbMsgDeduplicationNodeMsg";
private TbContext ctx;
@ -227,7 +226,7 @@ public class TbMsgDeduplicationNodeTest {
config.setInterval(deduplicationInterval);
config.setStrategy(DeduplicationStrategy.ALL);
config.setOutMsgType(SessionMsgType.POST_ATTRIBUTES_REQUEST.name());
config.setQueueName(HIGH_PRIORITY_QUEUE_NAME);
config.setQueueName(DataConstants.HP_QUEUE_NAME);
nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
node.init(ctx, nodeConfiguration);
@ -267,7 +266,7 @@ public class TbMsgDeduplicationNodeTest {
config.setInterval(deduplicationInterval);
config.setStrategy(DeduplicationStrategy.ALL);
config.setOutMsgType(SessionMsgType.POST_ATTRIBUTES_REQUEST.name());
config.setQueueName(HIGH_PRIORITY_QUEUE_NAME);
config.setQueueName(DataConstants.HP_QUEUE_NAME);
nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
node.init(ctx, nodeConfiguration);
@ -381,7 +380,7 @@ public class TbMsgDeduplicationNodeTest {
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("ts", String.valueOf(ts));
return TbMsg.newMsg(
MAIN_QUEUE_NAME,
DataConstants.MAIN_QUEUE_NAME,
SessionMsgType.POST_TELEMETRY_REQUEST.name(),
deviceId,
metaData,