Save time series strategies: add Java upgrade script

This commit is contained in:
Dmytro Skarzhynets 2025-01-13 15:54:11 +02:00
parent 698a0c19ec
commit 631d314fce
3 changed files with 121 additions and 6 deletions

View File

@ -269,8 +269,9 @@ DO $$
'type', 'ON_EVERY_MESSAGE' 'type', 'ON_EVERY_MESSAGE'
) )
) )
END::text END::text,
WHERE type = 'org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode'; configuration_version = 1
WHERE type = 'org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode' AND configuration_version = 0;
-- Drop the helper function -- Drop the helper function
DROP FUNCTION is_valid_jsonb(text); DROP FUNCTION is_valid_jsonb(text);

View File

@ -15,8 +15,11 @@
*/ */
package org.thingsboard.rule.engine.telemetry; package org.thingsboard.rule.engine.telemetry;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.gson.JsonParser; import com.google.gson.JsonParser;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNode;
@ -24,6 +27,7 @@ import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.rule.engine.telemetry.strategy.PersistenceStrategy;
import org.thingsboard.server.common.adaptor.JsonConverter; import org.thingsboard.server.common.adaptor.JsonConverter;
import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.TenantProfile;
@ -32,6 +36,7 @@ import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
import org.thingsboard.server.common.data.util.TbPair;
import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsg;
import java.util.ArrayList; import java.util.ArrayList;
@ -66,7 +71,8 @@ import static org.thingsboard.server.common.data.msg.TbMsgType.POST_TELEMETRY_RE
"So, to make sure that all the messages will be processed correctly, one should enable this parameter for sequential message processing scenarios.", "So, to make sure that all the messages will be processed correctly, one should enable this parameter for sequential message processing scenarios.",
uiResources = {"static/rulenode/rulenode-core-config.js"}, uiResources = {"static/rulenode/rulenode-core-config.js"},
configDirective = "tbActionNodeTimeseriesConfig", configDirective = "tbActionNodeTimeseriesConfig",
icon = "file_upload" icon = "file_upload",
version = 1
) )
public class TbMsgTimeseriesNode implements TbNode { public class TbMsgTimeseriesNode implements TbNode {
@ -182,4 +188,32 @@ public class TbMsgTimeseriesNode implements TbNode {
ctx.removeListeners(); ctx.removeListeners();
} }
@Override
public TbPair<Boolean, JsonNode> upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException {
boolean hasChanges = false;
switch (fromVersion) {
case 0:
if (oldConfiguration.has("persistenceSettings")) {
break;
}
hasChanges = true;
JsonNode skipLatestPersistence = oldConfiguration.get("skipLatestPersistence");
if (skipLatestPersistence != null && "true".equals(skipLatestPersistence.asText())) {
var skipLatestPersistenceSettings = new Advanced(
PersistenceStrategy.onEveryMessage(),
PersistenceStrategy.skip(),
PersistenceStrategy.onEveryMessage()
);
((ObjectNode) oldConfiguration).set("persistenceSettings", JacksonUtil.valueToTree(skipLatestPersistenceSettings));
} else {
((ObjectNode) oldConfiguration).set("persistenceSettings", JacksonUtil.valueToTree(new OnEveryMessage()));
}
((ObjectNode) oldConfiguration).remove("skipLatestPersistence");
break;
default:
break;
}
return new TbPair<>(hasChanges, oldConfiguration);
}
} }

View File

@ -25,11 +25,12 @@ import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.ThrowingConsumer;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest;
import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
@ -60,12 +61,13 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.assertArg; import static org.mockito.ArgumentMatchers.assertArg;
import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
public class TbMsgTimeseriesNodeTest { public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest {
private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("c8f34868-603a-4433-876a-7d356e5cf377")); private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("c8f34868-603a-4433-876a-7d356e5cf377"));
private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("e5095e9a-04f4-44c9-b443-1cf1b97d3384")); private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("e5095e9a-04f4-44c9-b443-1cf1b97d3384"));
@ -82,7 +84,7 @@ public class TbMsgTimeseriesNodeTest {
@BeforeEach @BeforeEach
public void setUp() throws TbNodeException { public void setUp() throws TbNodeException {
node = new TbMsgTimeseriesNode(); node = spy(new TbMsgTimeseriesNode());
config = new TbMsgTimeseriesNodeConfiguration().defaultConfiguration(); config = new TbMsgTimeseriesNodeConfiguration().defaultConfiguration();
} }
@ -291,4 +293,82 @@ public class TbMsgTimeseriesNodeTest {
return expectedList; return expectedList;
} }
@Override
protected TbNode getTestNode() {
return node;
}
private static Stream<Arguments> givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() {
return Stream.of(
Arguments.of(0, """
{
"defaultTTL": 0,
"useServerTs": false,
"skipLatestPersistence": false
}""",
true,
"""
{
"defaultTTL": 0,
"useServerTs": false,
"persistenceSettings": {
"type": "ON_EVERY_MESSAGE"
}
}"""),
Arguments.of(0, """
{
"defaultTTL": 0,
"useServerTs": false
}""",
true,
"""
{
"defaultTTL": 0,
"useServerTs": false,
"persistenceSettings": {
"type": "ON_EVERY_MESSAGE"
}
}"""),
Arguments.of(0, """
{
"defaultTTL": 0,
"useServerTs": false,
"skipLatestPersistence": null
}""",
true,
"""
{
"defaultTTL": 0,
"useServerTs": false,
"persistenceSettings": {
"type": "ON_EVERY_MESSAGE"
}
}"""),
Arguments.of(0, """
{
"defaultTTL": 0,
"useServerTs": false,
"skipLatestPersistence": true
}""",
true,
"""
{
"defaultTTL": 0,
"useServerTs": false,
"persistenceSettings": {
"type": "ADVANCED",
"timeseries": {
"type": "ON_EVERY_MESSAGE"
},
"latest": {
"type": "SKIP"
},
"webSockets": {
"type": "ON_EVERY_MESSAGE"
}
}
}""")
);
}
} }