diff --git a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java index 1be7fdecc4..054c7a6751 100644 --- a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java +++ b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java @@ -281,7 +281,6 @@ public class ThingsboardInstallService { case "3.6.2": log.info("Upgrading ThingsBoard from version 3.6.2 to 3.6.3 ..."); databaseEntitiesUpgradeService.upgradeDatabase("3.6.2"); - dataUpdateService.updateData("3.6.2"); //TODO DON'T FORGET to update switch statement in the CacheCleanupService if you need to clear the cache break; default: diff --git a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java index c57d38b66d..7d08fdca00 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java @@ -227,25 +227,11 @@ public class DefaultDataUpdateService implements DataUpdateService { log.info("Updating data from version 3.6.0 to 3.6.1 ..."); migrateDeviceConnectivity(); break; - case "3.6.2": - updateRuleNodesQueueName(); - break; default: throw new RuntimeException("Unable to update data, unsupported fromVersion: " + fromVersion); } } - private void updateRuleNodesQueueName() { - String[] ruleNodeTypes = { - "org.thingsboard.rule.engine.debug.TbMsgGeneratorNode", - "org.thingsboard.rule.engine.flow.TbCheckpointNode", - "org.thingsboard.rule.engine.deduplication.TbMsgDeduplicationNode" - }; - for (String ruleNodeType : ruleNodeTypes) { - ruleNodeQueueNameUpdater.updateEntities(ruleNodeType); - } - } - private void migrateEdgeEvents(String logPrefix) { boolean skipEdgeEventsMigration = getEnv("TB_SKIP_EDGE_EVENTS_MIGRATION", false); if (!skipEdgeEventsMigration) { @@ -806,39 +792,4 @@ public class DefaultDataUpdateService implements DataUpdateService { } } - private final PaginatedUpdater ruleNodeQueueNameUpdater = - new PaginatedUpdater<>() { - - @Override - protected String getName() { - return "RuleNode queue name updater"; - } - - @Override - protected boolean forceReportTotal() { - return true; - } - - @Override - protected PageData findEntities(String type, PageLink pageLink) { - return ruleChainService.findAllRuleNodesByType(type, pageLink); - } - - @Override - protected void updateEntity(RuleNode ruleNode) { - try { - ObjectNode configuration = (ObjectNode) ruleNode.getConfiguration(); - JsonNode queueName = configuration.remove("queueName"); - if (queueName != null) { - if (!queueName.isNull()) { - ruleNode.setQueueName(queueName.asText()); - } - ruleChainService.saveRuleNode(null, ruleNode); - } - } catch (Exception e) { - log.error("Unable to update RuleNode", e); - } - } - }; - } diff --git a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/RuleChainImportService.java b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/RuleChainImportService.java index f3a01913a2..bb930f9634 100644 --- a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/RuleChainImportService.java +++ b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/RuleChainImportService.java @@ -15,8 +15,6 @@ */ package org.thingsboard.server.service.sync.ie.importing.impl; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; @@ -85,14 +83,12 @@ public class RuleChainImportService extends BaseEntityImportService { node.setRuleChainId(null); node.setExternalId(node.getId()); node.setId(null); - setQueueName(node); }); } @@ -108,22 +104,6 @@ public class RuleChainImportService extends BaseEntityImportService clazz = Class.forName(ruleNode.getType()); - org.thingsboard.rule.engine.api.RuleNode ruleNodeAnnotation = clazz.getAnnotation(org.thingsboard.rule.engine.api.RuleNode.class); - if (ruleNodeAnnotation.hasQueueName()) { - ObjectNode configuration = (ObjectNode) ruleNode.getConfiguration(); - JsonNode queueName = configuration.remove("queueName"); - if (queueName != null && !queueName.isNull()) { - ruleNode.setQueueName(queueName.asText()); - } - } - } catch (ClassNotFoundException e) { - log.warn("[{}] RuleNode class not found [{}]", ruleNode.getName(), ruleNode.getType()); - } - } - @Override protected RuleChain saveOrUpdate(EntitiesImportCtx ctx, RuleChain ruleChain, RuleChainExportData exportData, IdProvider idProvider) { ruleChain = ruleChainService.saveRuleChain(ruleChain); diff --git a/application/src/main/java/org/thingsboard/server/utils/TbNodeUpgradeUtils.java b/application/src/main/java/org/thingsboard/server/utils/TbNodeUpgradeUtils.java index 530679683d..23c75b430b 100644 --- a/application/src/main/java/org/thingsboard/server/utils/TbNodeUpgradeUtils.java +++ b/application/src/main/java/org/thingsboard/server/utils/TbNodeUpgradeUtils.java @@ -25,6 +25,8 @@ import org.thingsboard.server.common.data.rule.RuleNode; import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.service.component.RuleNodeClassInfo; +import static org.thingsboard.rule.engine.api.util.TbNodeUtils.QUEUE_NAME; + @Slf4j public class TbNodeUpgradeUtils { @@ -44,9 +46,13 @@ public class TbNodeUpgradeUtils { } else { var tbVersionedNode = getTbVersionedNode(nodeInfo); try { + JsonNode queueName = oldConfiguration.get(QUEUE_NAME); TbPair upgradeResult = tbVersionedNode.upgrade(configurationVersion, oldConfiguration); if (upgradeResult.getFirst()) { node.setConfiguration(upgradeResult.getSecond()); + if (nodeInfo.getAnnotation().hasQueueName() && queueName != null && queueName.isTextual()) { + node.setQueueName(queueName.asText()); + } } } catch (Exception e) { try { diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/util/TbNodeUtils.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/util/TbNodeUtils.java index 396d810b00..cce5649336 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/util/TbNodeUtils.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/util/TbNodeUtils.java @@ -36,6 +36,7 @@ import java.util.stream.Collectors; */ public class TbNodeUtils { + public static final String QUEUE_NAME = "queueName"; private static final Pattern DATA_PATTERN = Pattern.compile("(\\$\\[)(.*?)(])"); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java index 0b60f38981..80170a28f1 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java @@ -15,6 +15,8 @@ */ package org.thingsboard.rule.engine.debug; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; @@ -35,6 +37,7 @@ import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.msg.TbNodeConnectionType; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.data.script.ScriptLanguage; +import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.queue.PartitionChangeMsg; @@ -44,12 +47,14 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import static org.thingsboard.common.util.DonAsynchron.withCallback; +import static org.thingsboard.rule.engine.api.util.TbNodeUtils.QUEUE_NAME; @Slf4j @RuleNode( type = ComponentType.ACTION, name = "generator", configClazz = TbMsgGeneratorNodeConfiguration.class, + version = 1, hasQueueName = true, nodeDescription = "Periodically generates messages", nodeDetails = "Generates messages with configurable period. Javascript function used for message generation.", @@ -177,4 +182,20 @@ public class TbMsgGeneratorNode implements TbNode { scriptEngine = null; } } + + @Override + public TbPair upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException { + boolean hasChanges = false; + switch (fromVersion) { + case 0: + if (oldConfiguration.has(QUEUE_NAME)) { + hasChanges = true; + ((ObjectNode) oldConfiguration).remove(QUEUE_NAME); + } + break; + default: + break; + } + return new TbPair<>(hasChanges, oldConfiguration); + } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java index a7a8b44b21..8f818583e8 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java @@ -15,6 +15,7 @@ */ package org.thingsboard.rule.engine.deduplication; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.extern.slf4j.Slf4j; @@ -43,10 +44,13 @@ import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import static org.thingsboard.rule.engine.api.util.TbNodeUtils.QUEUE_NAME; + @RuleNode( type = ComponentType.TRANSFORMATION, name = "deduplication", configClazz = TbMsgDeduplicationNodeConfiguration.class, + version = 1, hasQueueName = true, nodeDescription = "Deduplicate messages within the same originator entity for a configurable period " + "based on a specified deduplication strategy.", @@ -94,6 +98,22 @@ public class TbMsgDeduplicationNode implements TbNode { deduplicationMap.clear(); } + @Override + public TbPair upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException { + boolean hasChanges = false; + switch (fromVersion) { + case 0: + if (oldConfiguration.has(QUEUE_NAME)) { + hasChanges = true; + ((ObjectNode) oldConfiguration).remove(QUEUE_NAME); + } + break; + default: + break; + } + return new TbPair<>(hasChanges, oldConfiguration); + } + private void processOnRegularMsg(TbContext ctx, TbMsg msg) { EntityId id = msg.getOriginator(); DeduplicationData deduplicationMsgs = deduplicationMap.computeIfAbsent(id, k -> new DeduplicationData()); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/flow/TbCheckpointNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/flow/TbCheckpointNode.java index eb658ada68..8fe2de58b2 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/flow/TbCheckpointNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/flow/TbCheckpointNode.java @@ -15,6 +15,8 @@ */ package org.thingsboard.rule.engine.flow; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.extern.slf4j.Slf4j; import org.thingsboard.rule.engine.api.EmptyNodeConfiguration; import org.thingsboard.rule.engine.api.RuleNode; @@ -25,13 +27,17 @@ import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.server.common.data.msg.TbNodeConnectionType; import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.server.common.data.plugin.ComponentType; +import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.common.msg.TbMsg; +import static org.thingsboard.rule.engine.api.util.TbNodeUtils.QUEUE_NAME; + @Slf4j @RuleNode( type = ComponentType.FLOW, name = "checkpoint", configClazz = EmptyNodeConfiguration.class, + version = 1, hasQueueName = true, nodeDescription = "transfers the message to another queue", nodeDetails = "After successful transfer incoming message is automatically acknowledged. Queue name is configurable.", @@ -52,4 +58,20 @@ public class TbCheckpointNode implements TbNode { ctx.enqueueForTellNext(msg, queueName, TbNodeConnectionType.SUCCESS, () -> ctx.ack(msg), error -> ctx.tellFailure(msg, error)); } + @Override + public TbPair upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException { + boolean hasChanges = false; + switch (fromVersion) { + case 0: + if (oldConfiguration.has(QUEUE_NAME)) { + hasChanges = true; + ((ObjectNode) oldConfiguration).remove(QUEUE_NAME); + } + break; + default: + break; + } + return new TbPair<>(hasChanges, oldConfiguration); + } + }