From 631d314fcebe4f63c36597ead46549596bc1d2a4 Mon Sep 17 00:00:00 2001 From: Dmytro Skarzhynets Date: Mon, 13 Jan 2025 15:54:11 +0200 Subject: [PATCH] Save time series strategies: add Java upgrade script --- .../main/data/upgrade/basic/schema_update.sql | 5 +- .../engine/telemetry/TbMsgTimeseriesNode.java | 36 +++++++- .../telemetry/TbMsgTimeseriesNodeTest.java | 86 ++++++++++++++++++- 3 files changed, 121 insertions(+), 6 deletions(-) diff --git a/application/src/main/data/upgrade/basic/schema_update.sql b/application/src/main/data/upgrade/basic/schema_update.sql index 0ff57119f4..6aaf8c3dbd 100644 --- a/application/src/main/data/upgrade/basic/schema_update.sql +++ b/application/src/main/data/upgrade/basic/schema_update.sql @@ -269,8 +269,9 @@ DO $$ 'type', 'ON_EVERY_MESSAGE' ) ) - END::text - WHERE type = 'org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode'; + END::text, + configuration_version = 1 + WHERE type = 'org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode' AND configuration_version = 0; -- Drop the helper function DROP FUNCTION is_valid_jsonb(text); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java index 1a014c5a43..db2421e307 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java @@ -15,8 +15,11 @@ */ package org.thingsboard.rule.engine.telemetry; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.gson.JsonParser; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNode; @@ -24,6 +27,7 @@ import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.rule.engine.telemetry.strategy.PersistenceStrategy; import org.thingsboard.server.common.adaptor.JsonConverter; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.TenantProfile; @@ -32,6 +36,7 @@ import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; +import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.common.msg.TbMsg; import java.util.ArrayList; @@ -66,7 +71,8 @@ import static org.thingsboard.server.common.data.msg.TbMsgType.POST_TELEMETRY_RE "So, to make sure that all the messages will be processed correctly, one should enable this parameter for sequential message processing scenarios.", uiResources = {"static/rulenode/rulenode-core-config.js"}, configDirective = "tbActionNodeTimeseriesConfig", - icon = "file_upload" + icon = "file_upload", + version = 1 ) public class TbMsgTimeseriesNode implements TbNode { @@ -182,4 +188,32 @@ public class TbMsgTimeseriesNode implements TbNode { ctx.removeListeners(); } + @Override + public TbPair upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException { + boolean hasChanges = false; + switch (fromVersion) { + case 0: + if (oldConfiguration.has("persistenceSettings")) { + break; + } + hasChanges = true; + JsonNode skipLatestPersistence = oldConfiguration.get("skipLatestPersistence"); + if (skipLatestPersistence != null && "true".equals(skipLatestPersistence.asText())) { + var skipLatestPersistenceSettings = new Advanced( + PersistenceStrategy.onEveryMessage(), + PersistenceStrategy.skip(), + PersistenceStrategy.onEveryMessage() + ); + ((ObjectNode) oldConfiguration).set("persistenceSettings", JacksonUtil.valueToTree(skipLatestPersistenceSettings)); + } else { + ((ObjectNode) oldConfiguration).set("persistenceSettings", JacksonUtil.valueToTree(new OnEveryMessage())); + } + ((ObjectNode) oldConfiguration).remove("skipLatestPersistence"); + break; + default: + break; + } + return new TbPair<>(hasChanges, oldConfiguration); + } + } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java index ba8c614944..71d060aab2 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java @@ -25,11 +25,12 @@ import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.MethodSource; import org.mockito.ArgumentCaptor; import org.mockito.Mock; -import org.mockito.ThrowingConsumer; import org.mockito.junit.jupiter.MockitoExtension; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest; import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; @@ -60,12 +61,13 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.assertArg; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) -public class TbMsgTimeseriesNodeTest { +public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest { private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("c8f34868-603a-4433-876a-7d356e5cf377")); private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("e5095e9a-04f4-44c9-b443-1cf1b97d3384")); @@ -82,7 +84,7 @@ public class TbMsgTimeseriesNodeTest { @BeforeEach public void setUp() throws TbNodeException { - node = new TbMsgTimeseriesNode(); + node = spy(new TbMsgTimeseriesNode()); config = new TbMsgTimeseriesNodeConfiguration().defaultConfiguration(); } @@ -291,4 +293,82 @@ public class TbMsgTimeseriesNodeTest { return expectedList; } + @Override + protected TbNode getTestNode() { + return node; + } + + private static Stream givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() { + return Stream.of( + Arguments.of(0, """ + { + "defaultTTL": 0, + "useServerTs": false, + "skipLatestPersistence": false + }""", + true, + """ + { + "defaultTTL": 0, + "useServerTs": false, + "persistenceSettings": { + "type": "ON_EVERY_MESSAGE" + } + }"""), + Arguments.of(0, """ + { + "defaultTTL": 0, + "useServerTs": false + }""", + true, + """ + { + "defaultTTL": 0, + "useServerTs": false, + "persistenceSettings": { + "type": "ON_EVERY_MESSAGE" + } + }"""), + Arguments.of(0, """ + { + "defaultTTL": 0, + "useServerTs": false, + "skipLatestPersistence": null + }""", + true, + """ + { + "defaultTTL": 0, + "useServerTs": false, + "persistenceSettings": { + "type": "ON_EVERY_MESSAGE" + } + }"""), + Arguments.of(0, """ + { + "defaultTTL": 0, + "useServerTs": false, + "skipLatestPersistence": true + }""", + true, + """ + { + "defaultTTL": 0, + "useServerTs": false, + "persistenceSettings": { + "type": "ADVANCED", + "timeseries": { + "type": "ON_EVERY_MESSAGE" + }, + "latest": { + "type": "SKIP" + }, + "webSockets": { + "type": "ON_EVERY_MESSAGE" + } + } + }""") + ); + } + }