Merge branch 'master' of github.com:thingsboard/thingsboard

This commit is contained in:
Andrii Shvaika 2023-05-30 14:52:06 +03:00
commit 11b6e8ebd0
3 changed files with 100 additions and 4 deletions

View File

@ -83,7 +83,7 @@ public class QueueController extends BaseController {
@RequestParam(required = false) String sortOrder) throws ThingsboardException { @RequestParam(required = false) String sortOrder) throws ThingsboardException {
checkParameter("serviceType", serviceType); checkParameter("serviceType", serviceType);
PageLink pageLink = createPageLink(pageSize, page, textSearch, sortProperty, sortOrder); PageLink pageLink = createPageLink(pageSize, page, textSearch, sortProperty, sortOrder);
ServiceType type = ServiceType.valueOf(serviceType); ServiceType type = ServiceType.of(serviceType);
switch (type) { switch (type) {
case TB_RULE_ENGINE: case TB_RULE_ENGINE:
return queueService.findQueuesByTenantId(getTenantId(), pageLink); return queueService.findQueuesByTenantId(getTenantId(), pageLink);
@ -136,7 +136,7 @@ public class QueueController extends BaseController {
checkEntity(queue.getId(), queue, Resource.QUEUE); checkEntity(queue.getId(), queue, Resource.QUEUE);
ServiceType type = ServiceType.valueOf(serviceType); ServiceType type = ServiceType.of(serviceType);
switch (type) { switch (type) {
case TB_RULE_ENGINE: case TB_RULE_ENGINE:
queue.setTenantId(getTenantId()); queue.setTenantId(getTenantId());

View File

@ -0,0 +1,96 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.controller;
import com.fasterxml.jackson.core.type.TypeReference;
import org.junit.Assert;
import org.junit.Test;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.queue.ProcessingStrategy;
import org.thingsboard.server.common.data.queue.ProcessingStrategyType;
import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.common.data.queue.SubmitStrategy;
import org.thingsboard.server.common.data.queue.SubmitStrategyType;
import org.thingsboard.server.dao.service.DaoSqlTest;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
@DaoSqlTest
public class BaseQueueControllerTest extends AbstractControllerTest {
@Test
public void testQueueWithServiceTypeRE() throws Exception {
loginSysAdmin();
// create queue
Queue queue = new Queue();
queue.setName("qwerty");
queue.setTopic("tb_rule_engine.qwerty");
queue.setPollInterval(25);
queue.setPartitions(10);
queue.setTenantId(TenantId.SYS_TENANT_ID);
queue.setConsumerPerPartition(false);
queue.setPackProcessingTimeout(2000);
SubmitStrategy submitStrategy = new SubmitStrategy();
submitStrategy.setType(SubmitStrategyType.SEQUENTIAL_BY_ORIGINATOR);
queue.setSubmitStrategy(submitStrategy);
ProcessingStrategy processingStrategy = new ProcessingStrategy();
processingStrategy.setType(ProcessingStrategyType.RETRY_ALL);
processingStrategy.setRetries(3);
processingStrategy.setFailurePercentage(0.7);
processingStrategy.setPauseBetweenRetries(3);
processingStrategy.setMaxPauseBetweenRetries(5);
queue.setProcessingStrategy(processingStrategy);
// create queue
Queue queue2 = new Queue();
queue2.setName("qwerty2");
queue2.setTopic("tb_rule_engine.qwerty2");
queue2.setPollInterval(25);
queue2.setPartitions(10);
queue2.setTenantId(TenantId.SYS_TENANT_ID);
queue2.setConsumerPerPartition(false);
queue2.setPackProcessingTimeout(2000);
submitStrategy.setType(SubmitStrategyType.SEQUENTIAL_BY_ORIGINATOR);
queue2.setSubmitStrategy(submitStrategy);
processingStrategy.setType(ProcessingStrategyType.RETRY_ALL);
processingStrategy.setRetries(3);
processingStrategy.setFailurePercentage(0.7);
processingStrategy.setPauseBetweenRetries(3);
processingStrategy.setMaxPauseBetweenRetries(5);
queue2.setProcessingStrategy(processingStrategy);
Queue savedQueue = doPost("/api/queues?serviceType=" + "TB-RULE-ENGINE", queue, Queue.class);
Queue savedQueue2 = doPost("/api/queues?serviceType=" + "TB_RULE_ENGINE", queue2, Queue.class);
PageLink pageLink = new PageLink(10);
PageData<Queue> pageData;
pageData = doGetTypedWithPageLink("/api/queues?serviceType=TB-RULE-ENGINE&", new TypeReference<>() {
}, pageLink);
Assert.assertFalse(pageData.getData().isEmpty());
doDelete("/api/queues/" + savedQueue.getUuidId())
.andExpect(status().isOk());
pageData = doGetTypedWithPageLink("/api/queues?serviceType=TB_RULE_ENGINE&", new TypeReference<>() {
}, pageLink);
Assert.assertFalse(pageData.getData().isEmpty());
doDelete("/api/queues/" + savedQueue2.getUuidId())
.andExpect(status().isOk());
}
}

View File

@ -326,7 +326,7 @@ public class HashPartitionService implements PartitionService {
final Map<QueueKey, List<ServiceInfo>> currentMap = new HashMap<>(); final Map<QueueKey, List<ServiceInfo>> currentMap = new HashMap<>();
services.forEach(serviceInfo -> { services.forEach(serviceInfo -> {
for (String serviceTypeStr : serviceInfo.getServiceTypesList()) { for (String serviceTypeStr : serviceInfo.getServiceTypesList()) {
ServiceType serviceType = ServiceType.valueOf(serviceTypeStr.toUpperCase()); ServiceType serviceType = ServiceType.of(serviceTypeStr);
if (ServiceType.TB_RULE_ENGINE.equals(serviceType)) { if (ServiceType.TB_RULE_ENGINE.equals(serviceType)) {
partitionTopicsMap.keySet().forEach(queueKey -> partitionTopicsMap.keySet().forEach(queueKey ->
currentMap.computeIfAbsent(queueKey, key -> new ArrayList<>()).add(serviceInfo)); currentMap.computeIfAbsent(queueKey, key -> new ArrayList<>()).add(serviceInfo));
@ -389,7 +389,7 @@ public class HashPartitionService implements PartitionService {
private void addNode(Map<QueueKey, List<ServiceInfo>> queueServiceList, ServiceInfo instance) { private void addNode(Map<QueueKey, List<ServiceInfo>> queueServiceList, ServiceInfo instance) {
for (String serviceTypeStr : instance.getServiceTypesList()) { for (String serviceTypeStr : instance.getServiceTypesList()) {
ServiceType serviceType = ServiceType.valueOf(serviceTypeStr.toUpperCase()); ServiceType serviceType = ServiceType.of(serviceTypeStr);
if (ServiceType.TB_RULE_ENGINE.equals(serviceType)) { if (ServiceType.TB_RULE_ENGINE.equals(serviceType)) {
partitionTopicsMap.keySet().forEach(key -> { partitionTopicsMap.keySet().forEach(key -> {
if (key.getType().equals(ServiceType.TB_RULE_ENGINE)) { if (key.getType().equals(ServiceType.TB_RULE_ENGINE)) {