used TbNodeUpgradeUtils instead of rule-node upgrade

This commit is contained in:
YevhenBondarenko 2023-12-12 20:47:25 +01:00
parent b65391e6ac
commit e8dc2ddc2d
8 changed files with 70 additions and 70 deletions

View File

@ -281,7 +281,6 @@ public class ThingsboardInstallService {
case "3.6.2": case "3.6.2":
log.info("Upgrading ThingsBoard from version 3.6.2 to 3.6.3 ..."); log.info("Upgrading ThingsBoard from version 3.6.2 to 3.6.3 ...");
databaseEntitiesUpgradeService.upgradeDatabase("3.6.2"); databaseEntitiesUpgradeService.upgradeDatabase("3.6.2");
dataUpdateService.updateData("3.6.2");
//TODO DON'T FORGET to update switch statement in the CacheCleanupService if you need to clear the cache //TODO DON'T FORGET to update switch statement in the CacheCleanupService if you need to clear the cache
break; break;
default: default:

View File

@ -227,25 +227,11 @@ public class DefaultDataUpdateService implements DataUpdateService {
log.info("Updating data from version 3.6.0 to 3.6.1 ..."); log.info("Updating data from version 3.6.0 to 3.6.1 ...");
migrateDeviceConnectivity(); migrateDeviceConnectivity();
break; break;
case "3.6.2":
updateRuleNodesQueueName();
break;
default: default:
throw new RuntimeException("Unable to update data, unsupported fromVersion: " + fromVersion); throw new RuntimeException("Unable to update data, unsupported fromVersion: " + fromVersion);
} }
} }
private void updateRuleNodesQueueName() {
String[] ruleNodeTypes = {
"org.thingsboard.rule.engine.debug.TbMsgGeneratorNode",
"org.thingsboard.rule.engine.flow.TbCheckpointNode",
"org.thingsboard.rule.engine.deduplication.TbMsgDeduplicationNode"
};
for (String ruleNodeType : ruleNodeTypes) {
ruleNodeQueueNameUpdater.updateEntities(ruleNodeType);
}
}
private void migrateEdgeEvents(String logPrefix) { private void migrateEdgeEvents(String logPrefix) {
boolean skipEdgeEventsMigration = getEnv("TB_SKIP_EDGE_EVENTS_MIGRATION", false); boolean skipEdgeEventsMigration = getEnv("TB_SKIP_EDGE_EVENTS_MIGRATION", false);
if (!skipEdgeEventsMigration) { if (!skipEdgeEventsMigration) {
@ -806,39 +792,4 @@ public class DefaultDataUpdateService implements DataUpdateService {
} }
} }
private final PaginatedUpdater<String, RuleNode> ruleNodeQueueNameUpdater =
new PaginatedUpdater<>() {
@Override
protected String getName() {
return "RuleNode queue name updater";
}
@Override
protected boolean forceReportTotal() {
return true;
}
@Override
protected PageData<RuleNode> findEntities(String type, PageLink pageLink) {
return ruleChainService.findAllRuleNodesByType(type, pageLink);
}
@Override
protected void updateEntity(RuleNode ruleNode) {
try {
ObjectNode configuration = (ObjectNode) ruleNode.getConfiguration();
JsonNode queueName = configuration.remove("queueName");
if (queueName != null) {
if (!queueName.isNull()) {
ruleNode.setQueueName(queueName.asText());
}
ruleChainService.saveRuleNode(null, ruleNode);
}
} catch (Exception e) {
log.error("Unable to update RuleNode", e);
}
}
};
} }

View File

@ -15,8 +15,6 @@
*/ */
package org.thingsboard.server.service.sync.ie.importing.impl; package org.thingsboard.server.service.sync.ie.importing.impl;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@ -85,14 +83,12 @@ public class RuleChainImportService extends BaseEntityImportService<RuleChainId,
node.setRuleChainId(old.getId()); node.setRuleChainId(old.getId());
node.setExternalId(node.getId()); node.setExternalId(node.getId());
node.setId((RuleNodeId) ctx.getInternalId(node.getId())); node.setId((RuleNodeId) ctx.getInternalId(node.getId()));
setQueueName(node);
}); });
} else { } else {
ruleNodes.forEach(node -> { ruleNodes.forEach(node -> {
node.setRuleChainId(null); node.setRuleChainId(null);
node.setExternalId(node.getId()); node.setExternalId(node.getId());
node.setId(null); node.setId(null);
setQueueName(node);
}); });
} }
@ -108,22 +104,6 @@ public class RuleChainImportService extends BaseEntityImportService<RuleChainId,
return ruleChain; return ruleChain;
} }
private void setQueueName(RuleNode ruleNode) {
try {
Class<?> clazz = Class.forName(ruleNode.getType());
org.thingsboard.rule.engine.api.RuleNode ruleNodeAnnotation = clazz.getAnnotation(org.thingsboard.rule.engine.api.RuleNode.class);
if (ruleNodeAnnotation.hasQueueName()) {
ObjectNode configuration = (ObjectNode) ruleNode.getConfiguration();
JsonNode queueName = configuration.remove("queueName");
if (queueName != null && !queueName.isNull()) {
ruleNode.setQueueName(queueName.asText());
}
}
} catch (ClassNotFoundException e) {
log.warn("[{}] RuleNode class not found [{}]", ruleNode.getName(), ruleNode.getType());
}
}
@Override @Override
protected RuleChain saveOrUpdate(EntitiesImportCtx ctx, RuleChain ruleChain, RuleChainExportData exportData, IdProvider idProvider) { protected RuleChain saveOrUpdate(EntitiesImportCtx ctx, RuleChain ruleChain, RuleChainExportData exportData, IdProvider idProvider) {
ruleChain = ruleChainService.saveRuleChain(ruleChain); ruleChain = ruleChainService.saveRuleChain(ruleChain);

View File

@ -25,6 +25,8 @@ import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.common.data.util.TbPair;
import org.thingsboard.server.service.component.RuleNodeClassInfo; import org.thingsboard.server.service.component.RuleNodeClassInfo;
import static org.thingsboard.rule.engine.api.util.TbNodeUtils.QUEUE_NAME;
@Slf4j @Slf4j
public class TbNodeUpgradeUtils { public class TbNodeUpgradeUtils {
@ -44,9 +46,13 @@ public class TbNodeUpgradeUtils {
} else { } else {
var tbVersionedNode = getTbVersionedNode(nodeInfo); var tbVersionedNode = getTbVersionedNode(nodeInfo);
try { try {
JsonNode queueName = oldConfiguration.get(QUEUE_NAME);
TbPair<Boolean, JsonNode> upgradeResult = tbVersionedNode.upgrade(configurationVersion, oldConfiguration); TbPair<Boolean, JsonNode> upgradeResult = tbVersionedNode.upgrade(configurationVersion, oldConfiguration);
if (upgradeResult.getFirst()) { if (upgradeResult.getFirst()) {
node.setConfiguration(upgradeResult.getSecond()); node.setConfiguration(upgradeResult.getSecond());
if (nodeInfo.getAnnotation().hasQueueName() && queueName != null && queueName.isTextual()) {
node.setQueueName(queueName.asText());
}
} }
} catch (Exception e) { } catch (Exception e) {
try { try {

View File

@ -36,6 +36,7 @@ import java.util.stream.Collectors;
*/ */
public class TbNodeUtils { public class TbNodeUtils {
public static final String QUEUE_NAME = "queueName";
private static final Pattern DATA_PATTERN = Pattern.compile("(\\$\\[)(.*?)(])"); private static final Pattern DATA_PATTERN = Pattern.compile("(\\$\\[)(.*?)(])");

View File

@ -15,6 +15,8 @@
*/ */
package org.thingsboard.rule.engine.debug; package org.thingsboard.rule.engine.debug;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
@ -35,6 +37,7 @@ import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.msg.TbNodeConnectionType; import org.thingsboard.server.common.data.msg.TbNodeConnectionType;
import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.data.script.ScriptLanguage; import org.thingsboard.server.common.data.script.ScriptLanguage;
import org.thingsboard.server.common.data.util.TbPair;
import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg; import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
@ -44,12 +47,14 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import static org.thingsboard.common.util.DonAsynchron.withCallback; import static org.thingsboard.common.util.DonAsynchron.withCallback;
import static org.thingsboard.rule.engine.api.util.TbNodeUtils.QUEUE_NAME;
@Slf4j @Slf4j
@RuleNode( @RuleNode(
type = ComponentType.ACTION, type = ComponentType.ACTION,
name = "generator", name = "generator",
configClazz = TbMsgGeneratorNodeConfiguration.class, configClazz = TbMsgGeneratorNodeConfiguration.class,
version = 1,
hasQueueName = true, hasQueueName = true,
nodeDescription = "Periodically generates messages", nodeDescription = "Periodically generates messages",
nodeDetails = "Generates messages with configurable period. Javascript function used for message generation.", nodeDetails = "Generates messages with configurable period. Javascript function used for message generation.",
@ -177,4 +182,20 @@ public class TbMsgGeneratorNode implements TbNode {
scriptEngine = null; scriptEngine = null;
} }
} }
@Override
public TbPair<Boolean, JsonNode> upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException {
boolean hasChanges = false;
switch (fromVersion) {
case 0:
if (oldConfiguration.has(QUEUE_NAME)) {
hasChanges = true;
((ObjectNode) oldConfiguration).remove(QUEUE_NAME);
}
break;
default:
break;
}
return new TbPair<>(hasChanges, oldConfiguration);
}
} }

View File

@ -15,6 +15,7 @@
*/ */
package org.thingsboard.rule.engine.deduplication; package org.thingsboard.rule.engine.deduplication;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -43,10 +44,13 @@ import java.util.Optional;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.thingsboard.rule.engine.api.util.TbNodeUtils.QUEUE_NAME;
@RuleNode( @RuleNode(
type = ComponentType.TRANSFORMATION, type = ComponentType.TRANSFORMATION,
name = "deduplication", name = "deduplication",
configClazz = TbMsgDeduplicationNodeConfiguration.class, configClazz = TbMsgDeduplicationNodeConfiguration.class,
version = 1,
hasQueueName = true, hasQueueName = true,
nodeDescription = "Deduplicate messages within the same originator entity for a configurable period " + nodeDescription = "Deduplicate messages within the same originator entity for a configurable period " +
"based on a specified deduplication strategy.", "based on a specified deduplication strategy.",
@ -94,6 +98,22 @@ public class TbMsgDeduplicationNode implements TbNode {
deduplicationMap.clear(); deduplicationMap.clear();
} }
@Override
public TbPair<Boolean, JsonNode> upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException {
boolean hasChanges = false;
switch (fromVersion) {
case 0:
if (oldConfiguration.has(QUEUE_NAME)) {
hasChanges = true;
((ObjectNode) oldConfiguration).remove(QUEUE_NAME);
}
break;
default:
break;
}
return new TbPair<>(hasChanges, oldConfiguration);
}
private void processOnRegularMsg(TbContext ctx, TbMsg msg) { private void processOnRegularMsg(TbContext ctx, TbMsg msg) {
EntityId id = msg.getOriginator(); EntityId id = msg.getOriginator();
DeduplicationData deduplicationMsgs = deduplicationMap.computeIfAbsent(id, k -> new DeduplicationData()); DeduplicationData deduplicationMsgs = deduplicationMap.computeIfAbsent(id, k -> new DeduplicationData());

View File

@ -15,6 +15,8 @@
*/ */
package org.thingsboard.rule.engine.flow; package org.thingsboard.rule.engine.flow;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.EmptyNodeConfiguration; import org.thingsboard.rule.engine.api.EmptyNodeConfiguration;
import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.RuleNode;
@ -25,13 +27,17 @@ import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.msg.TbNodeConnectionType; import org.thingsboard.server.common.data.msg.TbNodeConnectionType;
import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.data.util.TbPair;
import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsg;
import static org.thingsboard.rule.engine.api.util.TbNodeUtils.QUEUE_NAME;
@Slf4j @Slf4j
@RuleNode( @RuleNode(
type = ComponentType.FLOW, type = ComponentType.FLOW,
name = "checkpoint", name = "checkpoint",
configClazz = EmptyNodeConfiguration.class, configClazz = EmptyNodeConfiguration.class,
version = 1,
hasQueueName = true, hasQueueName = true,
nodeDescription = "transfers the message to another queue", nodeDescription = "transfers the message to another queue",
nodeDetails = "After successful transfer incoming message is automatically acknowledged. Queue name is configurable.", nodeDetails = "After successful transfer incoming message is automatically acknowledged. Queue name is configurable.",
@ -52,4 +58,20 @@ public class TbCheckpointNode implements TbNode {
ctx.enqueueForTellNext(msg, queueName, TbNodeConnectionType.SUCCESS, () -> ctx.ack(msg), error -> ctx.tellFailure(msg, error)); ctx.enqueueForTellNext(msg, queueName, TbNodeConnectionType.SUCCESS, () -> ctx.ack(msg), error -> ctx.tellFailure(msg, error));
} }
@Override
public TbPair<Boolean, JsonNode> upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException {
boolean hasChanges = false;
switch (fromVersion) {
case 0:
if (oldConfiguration.has(QUEUE_NAME)) {
hasChanges = true;
((ObjectNode) oldConfiguration).remove(QUEUE_NAME);
}
break;
default:
break;
}
return new TbPair<>(hasChanges, oldConfiguration);
}
} }