diff --git a/application/src/main/java/org/thingsboard/server/controller/QueueController.java b/application/src/main/java/org/thingsboard/server/controller/QueueController.java index 3ba1df8c2a..0e79ae7955 100644 --- a/application/src/main/java/org/thingsboard/server/controller/QueueController.java +++ b/application/src/main/java/org/thingsboard/server/controller/QueueController.java @@ -83,7 +83,7 @@ public class QueueController extends BaseController { @RequestParam(required = false) String sortOrder) throws ThingsboardException { checkParameter("serviceType", serviceType); PageLink pageLink = createPageLink(pageSize, page, textSearch, sortProperty, sortOrder); - ServiceType type = ServiceType.valueOf(serviceType); + ServiceType type = ServiceType.of(serviceType); switch (type) { case TB_RULE_ENGINE: return queueService.findQueuesByTenantId(getTenantId(), pageLink); @@ -136,7 +136,7 @@ public class QueueController extends BaseController { checkEntity(queue.getId(), queue, Resource.QUEUE); - ServiceType type = ServiceType.valueOf(serviceType); + ServiceType type = ServiceType.of(serviceType); switch (type) { case TB_RULE_ENGINE: queue.setTenantId(getTenantId()); diff --git a/application/src/test/java/org/thingsboard/server/controller/BaseQueueControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/BaseQueueControllerTest.java new file mode 100644 index 0000000000..465ec37bb9 --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/controller/BaseQueueControllerTest.java @@ -0,0 +1,96 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.controller; + +import com.fasterxml.jackson.core.type.TypeReference; +import org.junit.Assert; +import org.junit.Test; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.queue.ProcessingStrategy; +import org.thingsboard.server.common.data.queue.ProcessingStrategyType; +import org.thingsboard.server.common.data.queue.Queue; +import org.thingsboard.server.common.data.queue.SubmitStrategy; +import org.thingsboard.server.common.data.queue.SubmitStrategyType; +import org.thingsboard.server.dao.service.DaoSqlTest; + +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; + +@DaoSqlTest +public class BaseQueueControllerTest extends AbstractControllerTest { + + @Test + public void testQueueWithServiceTypeRE() throws Exception { + loginSysAdmin(); + + // create queue + Queue queue = new Queue(); + queue.setName("qwerty"); + queue.setTopic("tb_rule_engine.qwerty"); + queue.setPollInterval(25); + queue.setPartitions(10); + queue.setTenantId(TenantId.SYS_TENANT_ID); + queue.setConsumerPerPartition(false); + queue.setPackProcessingTimeout(2000); + SubmitStrategy submitStrategy = new SubmitStrategy(); + submitStrategy.setType(SubmitStrategyType.SEQUENTIAL_BY_ORIGINATOR); + queue.setSubmitStrategy(submitStrategy); + ProcessingStrategy processingStrategy = new ProcessingStrategy(); + processingStrategy.setType(ProcessingStrategyType.RETRY_ALL); + processingStrategy.setRetries(3); + processingStrategy.setFailurePercentage(0.7); + processingStrategy.setPauseBetweenRetries(3); + processingStrategy.setMaxPauseBetweenRetries(5); + queue.setProcessingStrategy(processingStrategy); + + // create queue + Queue queue2 = new Queue(); + queue2.setName("qwerty2"); + queue2.setTopic("tb_rule_engine.qwerty2"); + queue2.setPollInterval(25); + queue2.setPartitions(10); + queue2.setTenantId(TenantId.SYS_TENANT_ID); + queue2.setConsumerPerPartition(false); + queue2.setPackProcessingTimeout(2000); + submitStrategy.setType(SubmitStrategyType.SEQUENTIAL_BY_ORIGINATOR); + queue2.setSubmitStrategy(submitStrategy); + processingStrategy.setType(ProcessingStrategyType.RETRY_ALL); + processingStrategy.setRetries(3); + processingStrategy.setFailurePercentage(0.7); + processingStrategy.setPauseBetweenRetries(3); + processingStrategy.setMaxPauseBetweenRetries(5); + queue2.setProcessingStrategy(processingStrategy); + + Queue savedQueue = doPost("/api/queues?serviceType=" + "TB-RULE-ENGINE", queue, Queue.class); + Queue savedQueue2 = doPost("/api/queues?serviceType=" + "TB_RULE_ENGINE", queue2, Queue.class); + + PageLink pageLink = new PageLink(10); + PageData pageData; + pageData = doGetTypedWithPageLink("/api/queues?serviceType=TB-RULE-ENGINE&", new TypeReference<>() { + }, pageLink); + Assert.assertFalse(pageData.getData().isEmpty()); + doDelete("/api/queues/" + savedQueue.getUuidId()) + .andExpect(status().isOk()); + + pageData = doGetTypedWithPageLink("/api/queues?serviceType=TB_RULE_ENGINE&", new TypeReference<>() { + }, pageLink); + Assert.assertFalse(pageData.getData().isEmpty()); + doDelete("/api/queues/" + savedQueue2.getUuidId()) + .andExpect(status().isOk()); + } + +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java index 08f2e849f8..14fb0369ee 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java @@ -326,7 +326,7 @@ public class HashPartitionService implements PartitionService { final Map> currentMap = new HashMap<>(); services.forEach(serviceInfo -> { for (String serviceTypeStr : serviceInfo.getServiceTypesList()) { - ServiceType serviceType = ServiceType.valueOf(serviceTypeStr.toUpperCase()); + ServiceType serviceType = ServiceType.of(serviceTypeStr); if (ServiceType.TB_RULE_ENGINE.equals(serviceType)) { partitionTopicsMap.keySet().forEach(queueKey -> currentMap.computeIfAbsent(queueKey, key -> new ArrayList<>()).add(serviceInfo)); @@ -389,7 +389,7 @@ public class HashPartitionService implements PartitionService { private void addNode(Map> queueServiceList, ServiceInfo instance) { for (String serviceTypeStr : instance.getServiceTypesList()) { - ServiceType serviceType = ServiceType.valueOf(serviceTypeStr.toUpperCase()); + ServiceType serviceType = ServiceType.of(serviceTypeStr); if (ServiceType.TB_RULE_ENGINE.equals(serviceType)) { partitionTopicsMap.keySet().forEach(key -> { if (key.getType().equals(ServiceType.TB_RULE_ENGINE)) {