diff --git a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java index 5ed8f588e2..40e15c7b0e 100644 --- a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java +++ b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java @@ -220,6 +220,7 @@ public class ThingsboardInstallService { case "3.3.4": log.info("Upgrading ThingsBoard from version 3.3.4 to 3.4.0 ..."); databaseEntitiesUpgradeService.upgradeDatabase("3.3.4"); + dataUpdateService.updateData("3.3.4"); log.info("Updating system data..."); systemDataLoaderService.updateSystemWidgets(); break; diff --git a/application/src/main/java/org/thingsboard/server/service/install/DefaultSystemDataLoaderService.java b/application/src/main/java/org/thingsboard/server/service/install/DefaultSystemDataLoaderService.java index d22660969d..cece3377b0 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/DefaultSystemDataLoaderService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/DefaultSystemDataLoaderService.java @@ -611,67 +611,76 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService { @Override public void createQueues() { - Queue mainQueue = new Queue(); - mainQueue.setTenantId(TenantId.SYS_TENANT_ID); - mainQueue.setName("Main"); - mainQueue.setTopic("tb_rule_engine.main"); - mainQueue.setPollInterval(25); - mainQueue.setPartitions(10); - mainQueue.setConsumerPerPartition(true); - mainQueue.setPackProcessingTimeout(2000); - SubmitStrategy mainQueueSubmitStrategy = new SubmitStrategy(); - mainQueueSubmitStrategy.setType(SubmitStrategyType.BURST); - mainQueueSubmitStrategy.setBatchSize(1000); - mainQueue.setSubmitStrategy(mainQueueSubmitStrategy); - ProcessingStrategy mainQueueProcessingStrategy = new ProcessingStrategy(); - mainQueueProcessingStrategy.setType(ProcessingStrategyType.SKIP_ALL_FAILURES); - mainQueueProcessingStrategy.setRetries(3); - mainQueueProcessingStrategy.setFailurePercentage(0); - mainQueueProcessingStrategy.setPauseBetweenRetries(3); - mainQueueProcessingStrategy.setMaxPauseBetweenRetries(3); - mainQueue.setProcessingStrategy(mainQueueProcessingStrategy); - queueService.saveQueue(mainQueue); + Queue mainQueue = queueService.findQueueByTenantIdAndName(TenantId.SYS_TENANT_ID, "Main"); + if (mainQueue == null) { + mainQueue = new Queue(); + mainQueue.setTenantId(TenantId.SYS_TENANT_ID); + mainQueue.setName("Main"); + mainQueue.setTopic("tb_rule_engine.main"); + mainQueue.setPollInterval(25); + mainQueue.setPartitions(10); + mainQueue.setConsumerPerPartition(true); + mainQueue.setPackProcessingTimeout(2000); + SubmitStrategy mainQueueSubmitStrategy = new SubmitStrategy(); + mainQueueSubmitStrategy.setType(SubmitStrategyType.BURST); + mainQueueSubmitStrategy.setBatchSize(1000); + mainQueue.setSubmitStrategy(mainQueueSubmitStrategy); + ProcessingStrategy mainQueueProcessingStrategy = new ProcessingStrategy(); + mainQueueProcessingStrategy.setType(ProcessingStrategyType.SKIP_ALL_FAILURES); + mainQueueProcessingStrategy.setRetries(3); + mainQueueProcessingStrategy.setFailurePercentage(0); + mainQueueProcessingStrategy.setPauseBetweenRetries(3); + mainQueueProcessingStrategy.setMaxPauseBetweenRetries(3); + mainQueue.setProcessingStrategy(mainQueueProcessingStrategy); + queueService.saveQueue(mainQueue); + } - Queue highPriorityQueue = new Queue(); - highPriorityQueue.setTenantId(TenantId.SYS_TENANT_ID); - highPriorityQueue.setName("HighPriority"); - highPriorityQueue.setTopic("tb_rule_engine.hp"); - highPriorityQueue.setPollInterval(25); - highPriorityQueue.setPartitions(10); - highPriorityQueue.setConsumerPerPartition(true); - highPriorityQueue.setPackProcessingTimeout(2000); - SubmitStrategy highPriorityQueueSubmitStrategy = new SubmitStrategy(); - highPriorityQueueSubmitStrategy.setType(SubmitStrategyType.BURST); - highPriorityQueueSubmitStrategy.setBatchSize(100); - highPriorityQueue.setSubmitStrategy(highPriorityQueueSubmitStrategy); - ProcessingStrategy highPriorityQueueProcessingStrategy = new ProcessingStrategy(); - highPriorityQueueProcessingStrategy.setType(ProcessingStrategyType.RETRY_FAILED_AND_TIMED_OUT); - highPriorityQueueProcessingStrategy.setRetries(0); - highPriorityQueueProcessingStrategy.setFailurePercentage(0); - highPriorityQueueProcessingStrategy.setPauseBetweenRetries(5); - highPriorityQueueProcessingStrategy.setMaxPauseBetweenRetries(5); - highPriorityQueue.setProcessingStrategy(highPriorityQueueProcessingStrategy); - queueService.saveQueue(highPriorityQueue); + Queue highPriorityQueue = queueService.findQueueByTenantIdAndName(TenantId.SYS_TENANT_ID, "HighPriority"); + if (highPriorityQueue == null) { + highPriorityQueue = new Queue(); + highPriorityQueue.setTenantId(TenantId.SYS_TENANT_ID); + highPriorityQueue.setName("HighPriority"); + highPriorityQueue.setTopic("tb_rule_engine.hp"); + highPriorityQueue.setPollInterval(25); + highPriorityQueue.setPartitions(10); + highPriorityQueue.setConsumerPerPartition(true); + highPriorityQueue.setPackProcessingTimeout(2000); + SubmitStrategy highPriorityQueueSubmitStrategy = new SubmitStrategy(); + highPriorityQueueSubmitStrategy.setType(SubmitStrategyType.BURST); + highPriorityQueueSubmitStrategy.setBatchSize(100); + highPriorityQueue.setSubmitStrategy(highPriorityQueueSubmitStrategy); + ProcessingStrategy highPriorityQueueProcessingStrategy = new ProcessingStrategy(); + highPriorityQueueProcessingStrategy.setType(ProcessingStrategyType.RETRY_FAILED_AND_TIMED_OUT); + highPriorityQueueProcessingStrategy.setRetries(0); + highPriorityQueueProcessingStrategy.setFailurePercentage(0); + highPriorityQueueProcessingStrategy.setPauseBetweenRetries(5); + highPriorityQueueProcessingStrategy.setMaxPauseBetweenRetries(5); + highPriorityQueue.setProcessingStrategy(highPriorityQueueProcessingStrategy); + queueService.saveQueue(highPriorityQueue); + } - Queue sequentialByOriginatorQueue = new Queue(); - sequentialByOriginatorQueue.setTenantId(TenantId.SYS_TENANT_ID); - sequentialByOriginatorQueue.setName("SequentialByOriginator"); - sequentialByOriginatorQueue.setTopic("tb_rule_engine.sq"); - sequentialByOriginatorQueue.setPollInterval(25); - sequentialByOriginatorQueue.setPartitions(10); - sequentialByOriginatorQueue.setPackProcessingTimeout(2000); - sequentialByOriginatorQueue.setConsumerPerPartition(true); - SubmitStrategy sequentialByOriginatorQueueSubmitStrategy = new SubmitStrategy(); - sequentialByOriginatorQueueSubmitStrategy.setType(SubmitStrategyType.SEQUENTIAL_BY_ORIGINATOR); - sequentialByOriginatorQueueSubmitStrategy.setBatchSize(100); - sequentialByOriginatorQueue.setSubmitStrategy(sequentialByOriginatorQueueSubmitStrategy); - ProcessingStrategy sequentialByOriginatorQueueProcessingStrategy = new ProcessingStrategy(); - sequentialByOriginatorQueueProcessingStrategy.setType(ProcessingStrategyType.RETRY_FAILED_AND_TIMED_OUT); - sequentialByOriginatorQueueProcessingStrategy.setRetries(3); - sequentialByOriginatorQueueProcessingStrategy.setFailurePercentage(0); - sequentialByOriginatorQueueProcessingStrategy.setPauseBetweenRetries(5); - sequentialByOriginatorQueueProcessingStrategy.setMaxPauseBetweenRetries(5); - sequentialByOriginatorQueue.setProcessingStrategy(sequentialByOriginatorQueueProcessingStrategy); - queueService.saveQueue(sequentialByOriginatorQueue); + Queue sequentialByOriginatorQueue = queueService.findQueueByTenantIdAndName(TenantId.SYS_TENANT_ID, "SequentialByOriginator"); + if (sequentialByOriginatorQueue == null) { + sequentialByOriginatorQueue = new Queue(); + sequentialByOriginatorQueue.setTenantId(TenantId.SYS_TENANT_ID); + sequentialByOriginatorQueue.setName("SequentialByOriginator"); + sequentialByOriginatorQueue.setTopic("tb_rule_engine.sq"); + sequentialByOriginatorQueue.setPollInterval(25); + sequentialByOriginatorQueue.setPartitions(10); + sequentialByOriginatorQueue.setPackProcessingTimeout(2000); + sequentialByOriginatorQueue.setConsumerPerPartition(true); + SubmitStrategy sequentialByOriginatorQueueSubmitStrategy = new SubmitStrategy(); + sequentialByOriginatorQueueSubmitStrategy.setType(SubmitStrategyType.SEQUENTIAL_BY_ORIGINATOR); + sequentialByOriginatorQueueSubmitStrategy.setBatchSize(100); + sequentialByOriginatorQueue.setSubmitStrategy(sequentialByOriginatorQueueSubmitStrategy); + ProcessingStrategy sequentialByOriginatorQueueProcessingStrategy = new ProcessingStrategy(); + sequentialByOriginatorQueueProcessingStrategy.setType(ProcessingStrategyType.RETRY_FAILED_AND_TIMED_OUT); + sequentialByOriginatorQueueProcessingStrategy.setRetries(3); + sequentialByOriginatorQueueProcessingStrategy.setFailurePercentage(0); + sequentialByOriginatorQueueProcessingStrategy.setPauseBetweenRetries(5); + sequentialByOriginatorQueueProcessingStrategy.setMaxPauseBetweenRetries(5); + sequentialByOriginatorQueue.setProcessingStrategy(sequentialByOriginatorQueueProcessingStrategy); + queueService.saveQueue(sequentialByOriginatorQueue); + } } } diff --git a/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java index a86adeb578..d0d5f37e00 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java @@ -571,73 +571,10 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "3.3.4", SCHEMA_UPDATE_SQL); loadSql(schemaUpdateFile, conn); - log.info("Loading queues..."); - try { - if (!CollectionUtils.isEmpty(queueConfig.getQueues())) { - queueConfig.getQueues().forEach(queueSettings -> { - queueService.saveQueue(queueConfigToQueue(queueSettings)); - }); - } else { - systemDataLoaderService.createQueues(); - } - } catch (Exception e) { - } - log.info("Updating device profiles..."); schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "3.3.4", "schema_update_device_profile.sql"); loadSql(schemaUpdateFile, conn); - log.info("Updating checkpoint rule nodes..."); - PageLink pageLink = new PageLink(100); - PageData pageData; - do { - pageData = tenantService.findTenants(pageLink); - for (Tenant tenant : pageData.getData()) { - TenantId tenantId = tenant.getId(); - Map queues = - queueService.findQueuesByTenantId(tenantId).stream().collect(Collectors.toMap(Queue::getName, Queue::getId)); - try { - List checkpointNodes = - ruleChainService.findRuleNodesByTenantIdAndType(tenantId, "org.thingsboard.rule.engine.flow.TbCheckpointNode"); - checkpointNodes.forEach(node -> { - ObjectNode configuration = (ObjectNode) node.getConfiguration(); - JsonNode queueNameNode = configuration.remove("queueName"); - if (queueNameNode != null) { - String queueName = queueNameNode.asText(); - configuration.put("queueId", queues.get(queueName).toString()); - ruleChainService.saveRuleNode(tenantId, node); - } - }); - } catch (Exception e) { - } - } - pageLink = pageLink.nextPageLink(); - } while (pageData.hasNext()); - - log.info("Updating tenant profiles..."); - PageLink profilePageLink = new PageLink(100); - PageData profilePageData; - do { - profilePageData = tenantProfileService.findTenantProfiles(TenantId.SYS_TENANT_ID, profilePageLink); - - profilePageData.getData().forEach(profile -> { - try { - List queueConfiguration = profile.getProfileData().getQueueConfiguration(); - if (profile.isIsolatedTbRuleEngine() && (queueConfiguration == null || queueConfiguration.isEmpty())) { - TenantProfileQueueConfiguration mainQueueConfig = getMainQueueConfiguration(); - profile.getProfileData().setQueueConfiguration(Collections.singletonList((mainQueueConfig))); - tenantProfileService.saveTenantProfile(TenantId.SYS_TENANT_ID, profile); - List isolatedTenants = tenantService.findTenantIdsByTenantProfileId(profile.getId()); - isolatedTenants.forEach(tenantId -> { - queueService.saveQueue(new Queue(tenantId, mainQueueConfig)); - }); - } - } catch (Exception e) { - } - - }); - profilePageLink = profilePageLink.nextPageLink(); - } while (profilePageData.hasNext()); log.info("Updating schema settings..."); conn.createStatement().execute("UPDATE tb_schema_settings SET schema_version = 3004000;"); log.info("Schema updated."); @@ -691,48 +628,4 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService return isOldSchema; } - private Queue queueConfigToQueue(TbRuleEngineQueueConfiguration queueSettings) { - Queue queue = new Queue(); - queue.setTenantId(TenantId.SYS_TENANT_ID); - queue.setName(queueSettings.getName()); - queue.setTopic(queueSettings.getTopic()); - queue.setPollInterval(queueSettings.getPollInterval()); - queue.setPartitions(queueSettings.getPartitions()); - queue.setPackProcessingTimeout(queueSettings.getPackProcessingTimeout()); - SubmitStrategy submitStrategy = new SubmitStrategy(); - submitStrategy.setBatchSize(queueSettings.getSubmitStrategy().getBatchSize()); - submitStrategy.setType(SubmitStrategyType.valueOf(queueSettings.getSubmitStrategy().getType())); - queue.setSubmitStrategy(submitStrategy); - ProcessingStrategy processingStrategy = new ProcessingStrategy(); - processingStrategy.setType(ProcessingStrategyType.valueOf(queueSettings.getProcessingStrategy().getType())); - processingStrategy.setRetries(queueSettings.getProcessingStrategy().getRetries()); - processingStrategy.setFailurePercentage(queueSettings.getProcessingStrategy().getFailurePercentage()); - processingStrategy.setPauseBetweenRetries(queueSettings.getProcessingStrategy().getPauseBetweenRetries()); - processingStrategy.setMaxPauseBetweenRetries(queueSettings.getProcessingStrategy().getMaxPauseBetweenRetries()); - queue.setProcessingStrategy(processingStrategy); - queue.setConsumerPerPartition(queueSettings.isConsumerPerPartition()); - return queue; - } - - private TenantProfileQueueConfiguration getMainQueueConfiguration() { - TenantProfileQueueConfiguration mainQueueConfiguration = new TenantProfileQueueConfiguration(); - mainQueueConfiguration.setName("Main"); - mainQueueConfiguration.setTopic("tb_rule_engine.main"); - mainQueueConfiguration.setPollInterval(25); - mainQueueConfiguration.setPartitions(10); - mainQueueConfiguration.setConsumerPerPartition(true); - mainQueueConfiguration.setPackProcessingTimeout(2000); - SubmitStrategy mainQueueSubmitStrategy = new SubmitStrategy(); - mainQueueSubmitStrategy.setType(SubmitStrategyType.BURST); - mainQueueSubmitStrategy.setBatchSize(1000); - mainQueueConfiguration.setSubmitStrategy(mainQueueSubmitStrategy); - ProcessingStrategy mainQueueProcessingStrategy = new ProcessingStrategy(); - mainQueueProcessingStrategy.setType(ProcessingStrategyType.SKIP_ALL_FAILURES); - mainQueueProcessingStrategy.setRetries(3); - mainQueueProcessingStrategy.setFailurePercentage(0); - mainQueueProcessingStrategy.setPauseBetweenRetries(3); - mainQueueProcessingStrategy.setMaxPauseBetweenRetries(3); - mainQueueConfiguration.setProcessingStrategy(mainQueueProcessingStrategy); - return mainQueueConfiguration; - } } diff --git a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java index 93c4bb835e..7b87668319 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java @@ -21,6 +21,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections.CollectionUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Service; @@ -31,14 +32,12 @@ import org.thingsboard.rule.engine.profile.TbDeviceProfileNode; import org.thingsboard.rule.engine.profile.TbDeviceProfileNodeConfiguration; import org.thingsboard.server.common.data.EntityView; import org.thingsboard.server.common.data.Tenant; +import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.alarm.AlarmInfo; import org.thingsboard.server.common.data.alarm.AlarmQuery; import org.thingsboard.server.common.data.alarm.AlarmSeverity; -import org.thingsboard.server.common.data.id.EntityViewId; -import org.thingsboard.server.common.data.id.RuleChainId; -import org.thingsboard.server.common.data.id.RuleNodeId; -import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.id.*; import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.TsKvEntry; @@ -47,12 +46,14 @@ import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.common.data.query.DynamicValue; import org.thingsboard.server.common.data.query.FilterPredicateValue; +import org.thingsboard.server.common.data.queue.*; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.RelationTypeGroup; import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.common.data.rule.RuleChainMetaData; import org.thingsboard.server.common.data.rule.RuleChainType; import org.thingsboard.server.common.data.rule.RuleNode; +import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfiguration; import org.thingsboard.server.dao.DaoUtil; import org.thingsboard.server.dao.alarm.AlarmDao; import org.thingsboard.server.dao.alarm.AlarmService; @@ -61,16 +62,22 @@ import org.thingsboard.server.dao.entityview.EntityViewService; import org.thingsboard.server.dao.model.sql.DeviceProfileEntity; import org.thingsboard.server.dao.model.sql.RelationEntity; import org.thingsboard.server.dao.oauth2.OAuth2Service; +import org.thingsboard.server.dao.queue.QueueService; import org.thingsboard.server.dao.relation.RelationService; import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.dao.sql.device.DeviceProfileRepository; +import org.thingsboard.server.dao.tenant.TenantProfileService; import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.dao.timeseries.TimeseriesService; +import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; import org.thingsboard.server.service.install.InstallScripts; +import org.thingsboard.server.service.install.SystemDataLoaderService; +import org.thingsboard.server.service.install.TbRuleEngineQueueConfigService; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -115,6 +122,18 @@ public class DefaultDataUpdateService implements DataUpdateService { @Autowired private OAuth2Service oAuth2Service; + @Autowired + private TenantProfileService tenantProfileService; + + @Autowired + private QueueService queueService; + + @Autowired + private TbRuleEngineQueueConfigService queueConfig; + + @Autowired + private SystemDataLoaderService systemDataLoaderService; + @Override public void updateData(String fromVersion) throws Exception { switch (fromVersion) { @@ -141,6 +160,26 @@ public class DefaultDataUpdateService implements DataUpdateService { log.info("Updating data from version 3.3.2 to 3.3.3 ..."); updateNestedRuleChains(); break; + case "3.3.4": + log.info("Updating data from version 3.3.4 to 3.4.0 ..."); + log.info("Loading queues..."); + try { + if (!CollectionUtils.isEmpty(queueConfig.getQueues())) { + queueConfig.getQueues().forEach(queueSettings -> { + Queue queue = queueConfigToQueue(queueSettings); + Queue existing = queueService.findQueueByTenantIdAndName(queue.getTenantId(), queue.getName()); + if (existing == null) { + queueService.saveQueue(queue); + } + }); + } else { + systemDataLoaderService.createQueues(); + } + } catch (Exception e) { + } + tenantsProfileQueueConfigurationUpdater.updateEntities(null); + checkPointRuleNodesUpdater.updateEntities(null); + break; default: throw new RuntimeException("Unable to update data, unsupported fromVersion: " + fromVersion); } @@ -547,4 +586,133 @@ public class DefaultDataUpdateService implements DataUpdateService { log.warn("CAUTION: Update of Oauth2 parameters from 3.2.2 to 3.3.0 available only in ThingsBoard versions 3.3.0/3.3.1"); } + private final PaginatedUpdater tenantsProfileQueueConfigurationUpdater = + new PaginatedUpdater<>() { + + @Override + protected String getName() { + return "Tenant profiles queue configuration updater"; + } + + @Override + protected boolean forceReportTotal() { + return true; + } + + @Override + protected PageData findEntities(String id, PageLink pageLink) { + return tenantProfileService.findTenantProfiles(TenantId.SYS_TENANT_ID, pageLink); + } + + @Override + protected void updateEntity(TenantProfile tenantProfile) { + updateTenantProfileQueueConfiguration(tenantProfile); + } + }; + + private void updateTenantProfileQueueConfiguration(TenantProfile profile) { + try { + List queueConfiguration = profile.getProfileData().getQueueConfiguration(); + if (profile.isIsolatedTbRuleEngine() && (queueConfiguration == null || queueConfiguration.isEmpty())) { + TenantProfileQueueConfiguration mainQueueConfig = getMainQueueConfiguration(); + profile.getProfileData().setQueueConfiguration(Collections.singletonList((mainQueueConfig))); + tenantProfileService.saveTenantProfile(TenantId.SYS_TENANT_ID, profile); + List isolatedTenants = tenantService.findTenantIdsByTenantProfileId(profile.getId()); + isolatedTenants.forEach(tenantId -> { + queueService.saveQueue(new Queue(tenantId, mainQueueConfig)); + }); + } + } catch (Exception e) { + log.error("Failed to update tenant profile queue configuration name=["+profile.getName()+"], id=["+ profile.getId().getId() +"]", e); + } + } + + private TenantProfileQueueConfiguration getMainQueueConfiguration() { + TenantProfileQueueConfiguration mainQueueConfiguration = new TenantProfileQueueConfiguration(); + mainQueueConfiguration.setName("Main"); + mainQueueConfiguration.setTopic("tb_rule_engine.main"); + mainQueueConfiguration.setPollInterval(25); + mainQueueConfiguration.setPartitions(10); + mainQueueConfiguration.setConsumerPerPartition(true); + mainQueueConfiguration.setPackProcessingTimeout(2000); + SubmitStrategy mainQueueSubmitStrategy = new SubmitStrategy(); + mainQueueSubmitStrategy.setType(SubmitStrategyType.BURST); + mainQueueSubmitStrategy.setBatchSize(1000); + mainQueueConfiguration.setSubmitStrategy(mainQueueSubmitStrategy); + ProcessingStrategy mainQueueProcessingStrategy = new ProcessingStrategy(); + mainQueueProcessingStrategy.setType(ProcessingStrategyType.SKIP_ALL_FAILURES); + mainQueueProcessingStrategy.setRetries(3); + mainQueueProcessingStrategy.setFailurePercentage(0); + mainQueueProcessingStrategy.setPauseBetweenRetries(3); + mainQueueProcessingStrategy.setMaxPauseBetweenRetries(3); + mainQueueConfiguration.setProcessingStrategy(mainQueueProcessingStrategy); + return mainQueueConfiguration; + } + + private Queue queueConfigToQueue(TbRuleEngineQueueConfiguration queueSettings) { + Queue queue = new Queue(); + queue.setTenantId(TenantId.SYS_TENANT_ID); + queue.setName(queueSettings.getName()); + queue.setTopic(queueSettings.getTopic()); + queue.setPollInterval(queueSettings.getPollInterval()); + queue.setPartitions(queueSettings.getPartitions()); + queue.setPackProcessingTimeout(queueSettings.getPackProcessingTimeout()); + SubmitStrategy submitStrategy = new SubmitStrategy(); + submitStrategy.setBatchSize(queueSettings.getSubmitStrategy().getBatchSize()); + submitStrategy.setType(SubmitStrategyType.valueOf(queueSettings.getSubmitStrategy().getType())); + queue.setSubmitStrategy(submitStrategy); + ProcessingStrategy processingStrategy = new ProcessingStrategy(); + processingStrategy.setType(ProcessingStrategyType.valueOf(queueSettings.getProcessingStrategy().getType())); + processingStrategy.setRetries(queueSettings.getProcessingStrategy().getRetries()); + processingStrategy.setFailurePercentage(queueSettings.getProcessingStrategy().getFailurePercentage()); + processingStrategy.setPauseBetweenRetries(queueSettings.getProcessingStrategy().getPauseBetweenRetries()); + processingStrategy.setMaxPauseBetweenRetries(queueSettings.getProcessingStrategy().getMaxPauseBetweenRetries()); + queue.setProcessingStrategy(processingStrategy); + queue.setConsumerPerPartition(queueSettings.isConsumerPerPartition()); + return queue; + } + + private final PaginatedUpdater checkPointRuleNodesUpdater = + new PaginatedUpdater<>() { + + @Override + protected String getName() { + return "Checkpoint rule nodes updater"; + } + + @Override + protected boolean forceReportTotal() { + return true; + } + + @Override + protected PageData findEntities(String id, PageLink pageLink) { + return ruleChainService.findAllRuleNodesByType("org.thingsboard.rule.engine.flow.TbCheckpointNode", pageLink); + } + + @Override + protected void updateEntity(RuleNode ruleNode) { + updateCheckPointRuleNodeConfiguration(ruleNode); + } + }; + + private void updateCheckPointRuleNodeConfiguration(RuleNode node) { + try { + ObjectNode configuration = (ObjectNode) node.getConfiguration(); + JsonNode queueNameNode = configuration.remove("queueName"); + if (queueNameNode != null) { + RuleChain ruleChain = this.ruleChainService.findRuleChainById(TenantId.SYS_TENANT_ID, node.getRuleChainId()); + TenantId tenantId = ruleChain.getTenantId(); + Map queues = + queueService.findQueuesByTenantId(tenantId).stream().collect(Collectors.toMap(Queue::getName, Queue::getId)); + String queueName = queueNameNode.asText(); + QueueId queueId = queues.get(queueName); + configuration.put("queueId", queueId != null ? queueId.toString() : ""); + ruleChainService.saveRuleNode(tenantId, node); + } + } catch (Exception e) { + log.error("Failed to update checkpoint rule node configuration name=["+node.getName()+"], id=["+ node.getId().getId() +"]", e); + } + } + } diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/rule/RuleChainService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/rule/RuleChainService.java index 1d99534636..3328372f2e 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/rule/RuleChainService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/rule/RuleChainService.java @@ -94,5 +94,7 @@ public interface RuleChainService { List findRuleNodesByTenantIdAndType(TenantId tenantId, String type); + PageData findAllRuleNodesByType(String type, PageLink pageLink); + RuleNode saveRuleNode(TenantId tenantId, RuleNode ruleNode); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java index 166d142baf..4be7227214 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java @@ -68,8 +68,7 @@ import java.util.Set; import java.util.stream.Collectors; import static org.thingsboard.server.common.data.DataConstants.TENANT; -import static org.thingsboard.server.dao.service.Validator.validateId; -import static org.thingsboard.server.dao.service.Validator.validateString; +import static org.thingsboard.server.dao.service.Validator.*; /** * Created by igor on 3/12/18. @@ -676,6 +675,14 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC return ruleNodeDao.findRuleNodesByTenantIdAndType(tenantId, type, ""); } + @Override + public PageData findAllRuleNodesByType(String type, PageLink pageLink) { + log.trace("Executing findAllRuleNodesByType, type {}, pageLink {}", type, pageLink); + validateString(type, "Incorrect type of the rule node"); + validatePageLink(pageLink); + return ruleNodeDao.findAllRuleNodesByType(type, pageLink); + } + @Override public RuleNode saveRuleNode(TenantId tenantId, RuleNode ruleNode) { return ruleNodeDao.save(tenantId, ruleNode); diff --git a/dao/src/main/java/org/thingsboard/server/dao/rule/RuleNodeDao.java b/dao/src/main/java/org/thingsboard/server/dao/rule/RuleNodeDao.java index 3984622d89..b4f087b98a 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/rule/RuleNodeDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/rule/RuleNodeDao.java @@ -16,6 +16,8 @@ package org.thingsboard.server.dao.rule; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.rule.RuleNode; import org.thingsboard.server.dao.Dao; @@ -27,4 +29,6 @@ import java.util.List; public interface RuleNodeDao extends Dao { List findRuleNodesByTenantIdAndType(TenantId tenantId, String type, String search); + + PageData findAllRuleNodesByType(String type, PageLink pageLink); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/rule/JpaRuleNodeDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/rule/JpaRuleNodeDao.java index a499e68da8..a0f84758c6 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/rule/JpaRuleNodeDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/rule/JpaRuleNodeDao.java @@ -20,6 +20,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.rule.RuleNode; import org.thingsboard.server.dao.DaoUtil; import org.thingsboard.server.dao.model.sql.RuleNodeEntity; @@ -27,6 +29,7 @@ import org.thingsboard.server.dao.rule.RuleNodeDao; import org.thingsboard.server.dao.sql.JpaAbstractSearchTextDao; import java.util.List; +import java.util.Objects; import java.util.UUID; @Slf4j @@ -50,4 +53,14 @@ public class JpaRuleNodeDao extends JpaAbstractSearchTextDao findRuleNodesByTenantIdAndType(TenantId tenantId, String type, String search) { return DaoUtil.convertDataList(ruleNodeRepository.findRuleNodesByTenantIdAndType(tenantId.getId(), type, search)); } + + @Override + public PageData findAllRuleNodesByType(String type, PageLink pageLink) { + return DaoUtil.toPageData(ruleNodeRepository + .findAllRuleNodesByType( + type, + Objects.toString(pageLink.getTextSearch(), ""), + DaoUtil.toPageable(pageLink))); + } + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/rule/RuleNodeRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/rule/RuleNodeRepository.java index b11923a50f..f584ff2b02 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/rule/RuleNodeRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/rule/RuleNodeRepository.java @@ -15,6 +15,8 @@ */ package org.thingsboard.server.dao.sql.rule; +import org.springframework.data.domain.Page; +import org.springframework.data.domain.Pageable; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.query.Param; @@ -32,4 +34,9 @@ public interface RuleNodeRepository extends JpaRepository @Param("ruleType") String ruleType, @Param("searchText") String searchText); + @Query("SELECT r FROM RuleNodeEntity r WHERE r.type = :ruleType AND LOWER(r.configuration) LIKE LOWER(CONCAT('%', :searchText, '%')) ") + Page findAllRuleNodesByType(@Param("ruleType") String ruleType, + @Param("searchText") String searchText, + Pageable pageable); + }