added upgrade tests, and minor fixes
This commit is contained in:
parent
65a8b0a34c
commit
4101fc3963
@ -277,7 +277,6 @@ public class ThingsboardInstallService {
|
|||||||
} else {
|
} else {
|
||||||
log.info("Skipping images migration. Run the upgrade with fromVersion as '3.6.2-images' to migrate");
|
log.info("Skipping images migration. Run the upgrade with fromVersion as '3.6.2-images' to migrate");
|
||||||
}
|
}
|
||||||
break;
|
|
||||||
case "3.6.2":
|
case "3.6.2":
|
||||||
log.info("Upgrading ThingsBoard from version 3.6.2 to 3.6.3 ...");
|
log.info("Upgrading ThingsBoard from version 3.6.2 to 3.6.3 ...");
|
||||||
databaseEntitiesUpgradeService.upgradeDatabase("3.6.2");
|
databaseEntitiesUpgradeService.upgradeDatabase("3.6.2");
|
||||||
|
|||||||
@ -25,7 +25,7 @@ import org.thingsboard.server.common.data.rule.RuleNode;
|
|||||||
import org.thingsboard.server.common.data.util.TbPair;
|
import org.thingsboard.server.common.data.util.TbPair;
|
||||||
import org.thingsboard.server.service.component.RuleNodeClassInfo;
|
import org.thingsboard.server.service.component.RuleNodeClassInfo;
|
||||||
|
|
||||||
import static org.thingsboard.rule.engine.api.util.TbNodeUtils.QUEUE_NAME;
|
import static org.thingsboard.server.common.data.DataConstants.QUEUE_NAME;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class TbNodeUpgradeUtils {
|
public class TbNodeUpgradeUtils {
|
||||||
|
|||||||
@ -135,5 +135,6 @@ public class DataConstants {
|
|||||||
public static final String HP_QUEUE_TOPIC = "tb_rule_engine.hp";
|
public static final String HP_QUEUE_TOPIC = "tb_rule_engine.hp";
|
||||||
public static final String SQ_QUEUE_NAME = "SequentialByOriginator";
|
public static final String SQ_QUEUE_NAME = "SequentialByOriginator";
|
||||||
public static final String SQ_QUEUE_TOPIC = "tb_rule_engine.sq";
|
public static final String SQ_QUEUE_TOPIC = "tb_rule_engine.sq";
|
||||||
|
public static final String QUEUE_NAME = "queueName";
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -55,7 +55,7 @@ public class ComponentDescriptor extends BaseData<ComponentDescriptorId> {
|
|||||||
@Length(fieldName = "actions")
|
@Length(fieldName = "actions")
|
||||||
@ApiModelProperty(position = 10, value = "Rule Node Actions. Deprecated. Always null.", accessMode = ApiModelProperty.AccessMode.READ_ONLY)
|
@ApiModelProperty(position = 10, value = "Rule Node Actions. Deprecated. Always null.", accessMode = ApiModelProperty.AccessMode.READ_ONLY)
|
||||||
@Getter @Setter private String actions;
|
@Getter @Setter private String actions;
|
||||||
@ApiModelProperty(position = 11, value = "Indicates that the RuleNode is support queue name.", accessMode = ApiModelProperty.AccessMode.READ_ONLY, example = "true")
|
@ApiModelProperty(position = 11, value = "Indicates that the RuleNode supports queue name configuration.", accessMode = ApiModelProperty.AccessMode.READ_ONLY, example = "true")
|
||||||
@Getter @Setter private boolean hasQueueName;
|
@Getter @Setter private boolean hasQueueName;
|
||||||
|
|
||||||
public ComponentDescriptor() {
|
public ComponentDescriptor() {
|
||||||
|
|||||||
@ -53,7 +53,7 @@
|
|||||||
"layoutY": 151
|
"layoutY": 151
|
||||||
},
|
},
|
||||||
"type": "org.thingsboard.rule.engine.filter.TbMsgTypeSwitchNode",
|
"type": "org.thingsboard.rule.engine.filter.TbMsgTypeSwitchNode",
|
||||||
"name": "swatch",
|
"name": "switch",
|
||||||
"debugMode": false,
|
"debugMode": false,
|
||||||
"singletonMode": false,
|
"singletonMode": false,
|
||||||
"configurationVersion": 0,
|
"configurationVersion": 0,
|
||||||
@ -76,4 +76,4 @@
|
|||||||
}
|
}
|
||||||
],
|
],
|
||||||
"ruleChainConnections": null
|
"ruleChainConnections": null
|
||||||
}
|
}
|
||||||
|
|||||||
@ -36,8 +36,6 @@ import java.util.stream.Collectors;
|
|||||||
*/
|
*/
|
||||||
public class TbNodeUtils {
|
public class TbNodeUtils {
|
||||||
|
|
||||||
public static final String QUEUE_NAME = "queueName";
|
|
||||||
|
|
||||||
private static final Pattern DATA_PATTERN = Pattern.compile("(\\$\\[)(.*?)(])");
|
private static final Pattern DATA_PATTERN = Pattern.compile("(\\$\\[)(.*?)(])");
|
||||||
|
|
||||||
public static <T> T convert(TbNodeConfiguration configuration, Class<T> clazz) throws TbNodeException {
|
public static <T> T convert(TbNodeConfiguration configuration, Class<T> clazz) throws TbNodeException {
|
||||||
|
|||||||
@ -47,7 +47,7 @@ import java.util.concurrent.TimeUnit;
|
|||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import static org.thingsboard.common.util.DonAsynchron.withCallback;
|
import static org.thingsboard.common.util.DonAsynchron.withCallback;
|
||||||
import static org.thingsboard.rule.engine.api.util.TbNodeUtils.QUEUE_NAME;
|
import static org.thingsboard.server.common.data.DataConstants.QUEUE_NAME;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@RuleNode(
|
@RuleNode(
|
||||||
|
|||||||
@ -44,7 +44,7 @@ import java.util.Optional;
|
|||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import static org.thingsboard.rule.engine.api.util.TbNodeUtils.QUEUE_NAME;
|
import static org.thingsboard.server.common.data.DataConstants.QUEUE_NAME;
|
||||||
|
|
||||||
@RuleNode(
|
@RuleNode(
|
||||||
type = ComponentType.TRANSFORMATION,
|
type = ComponentType.TRANSFORMATION,
|
||||||
@ -177,7 +177,7 @@ public class TbMsgDeduplicationNode implements TbNode {
|
|||||||
}
|
}
|
||||||
if (resultMsg != null) {
|
if (resultMsg != null) {
|
||||||
deduplicationResults.add(TbMsg.newMsg(
|
deduplicationResults.add(TbMsg.newMsg(
|
||||||
resultMsg.getQueueName(),
|
queueName,
|
||||||
resultMsg.getType(),
|
resultMsg.getType(),
|
||||||
resultMsg.getOriginator(),
|
resultMsg.getOriginator(),
|
||||||
resultMsg.getCustomerId(),
|
resultMsg.getCustomerId(),
|
||||||
|
|||||||
@ -30,7 +30,7 @@ import org.thingsboard.server.common.data.plugin.ComponentType;
|
|||||||
import org.thingsboard.server.common.data.util.TbPair;
|
import org.thingsboard.server.common.data.util.TbPair;
|
||||||
import org.thingsboard.server.common.msg.TbMsg;
|
import org.thingsboard.server.common.msg.TbMsg;
|
||||||
|
|
||||||
import static org.thingsboard.rule.engine.api.util.TbNodeUtils.QUEUE_NAME;
|
import static org.thingsboard.server.common.data.DataConstants.QUEUE_NAME;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@RuleNode(
|
@RuleNode(
|
||||||
|
|||||||
@ -0,0 +1,52 @@
|
|||||||
|
/**
|
||||||
|
* Copyright © 2016-2023 The Thingsboard Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.thingsboard.rule.engine;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.JsonNode;
|
||||||
|
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||||
|
import org.junit.jupiter.params.ParameterizedTest;
|
||||||
|
import org.junit.jupiter.params.provider.MethodSource;
|
||||||
|
import org.thingsboard.common.util.JacksonUtil;
|
||||||
|
import org.thingsboard.rule.engine.api.TbNode;
|
||||||
|
import org.thingsboard.rule.engine.api.TbNodeException;
|
||||||
|
import org.thingsboard.server.common.data.util.TbPair;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.ArgumentMatchers.anyInt;
|
||||||
|
import static org.mockito.BDDMockito.willCallRealMethod;
|
||||||
|
|
||||||
|
public abstract class AbstractRuleNodeUpgradeTest {
|
||||||
|
|
||||||
|
protected abstract TbNode getTestNode();
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@MethodSource
|
||||||
|
public void givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig(int givenVersion, String givenConfigStr, boolean hasChanges, String expectedConfigStr) throws TbNodeException {
|
||||||
|
// GIVEN
|
||||||
|
willCallRealMethod().given(getTestNode()).upgrade(anyInt(), any());
|
||||||
|
JsonNode givenConfig = JacksonUtil.toJsonNode(givenConfigStr);
|
||||||
|
JsonNode expectedConfig = JacksonUtil.toJsonNode(expectedConfigStr);
|
||||||
|
|
||||||
|
// WHEN
|
||||||
|
TbPair<Boolean, JsonNode> upgradeResult = getTestNode().upgrade(givenVersion, givenConfig);
|
||||||
|
|
||||||
|
// THEN
|
||||||
|
assertThat(upgradeResult.getFirst()).isEqualTo(hasChanges);
|
||||||
|
ObjectNode upgradedConfig = (ObjectNode) upgradeResult.getSecond();
|
||||||
|
assertThat(upgradedConfig).isEqualTo(expectedConfig);
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,53 @@
|
|||||||
|
/**
|
||||||
|
* Copyright © 2016-2023 The Thingsboard Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.thingsboard.rule.engine.debug;
|
||||||
|
|
||||||
|
import org.junit.jupiter.params.provider.Arguments;
|
||||||
|
import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest;
|
||||||
|
import org.thingsboard.rule.engine.api.TbNode;
|
||||||
|
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import static org.mockito.Mockito.spy;
|
||||||
|
|
||||||
|
public class TbMsgGeneratorNodeTest extends AbstractRuleNodeUpgradeTest {
|
||||||
|
|
||||||
|
// Rule nodes upgrade
|
||||||
|
private static Stream<Arguments> givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() {
|
||||||
|
return Stream.of(
|
||||||
|
// default config for version 0
|
||||||
|
Arguments.of(0,
|
||||||
|
"{\"msgCount\":0,\"periodInSeconds\":1,\"originatorId\":null,\"originatorType\":null, \"queueName\":null, \"scriptLang\":\"TBEL\",\"jsScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\",\"tbelScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\"}",
|
||||||
|
true,
|
||||||
|
"{\"msgCount\":0,\"periodInSeconds\":1,\"originatorId\":null,\"originatorType\":null, \"scriptLang\":\"TBEL\",\"jsScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\",\"tbelScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\"}"),
|
||||||
|
// default config for version 0 with queueName
|
||||||
|
Arguments.of(0,
|
||||||
|
"{\"msgCount\":0,\"periodInSeconds\":1,\"originatorId\":null,\"originatorType\":null, \"queueName\":\"Main\", \"scriptLang\":\"TBEL\",\"jsScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\",\"tbelScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\"}",
|
||||||
|
true,
|
||||||
|
"{\"msgCount\":0,\"periodInSeconds\":1,\"originatorId\":null,\"originatorType\":null, \"scriptLang\":\"TBEL\",\"jsScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\",\"tbelScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\"}"),
|
||||||
|
// default config for version 1 with upgrade from version 0
|
||||||
|
Arguments.of(0,
|
||||||
|
"{\"msgCount\":0,\"periodInSeconds\":1,\"originatorId\":null,\"originatorType\":null, \"scriptLang\":\"TBEL\",\"jsScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\",\"tbelScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\"}",
|
||||||
|
false,
|
||||||
|
"{\"msgCount\":0,\"periodInSeconds\":1,\"originatorId\":null,\"originatorType\":null, \"scriptLang\":\"TBEL\",\"jsScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\",\"tbelScript\":\"var msg = { temp: 42, humidity: 77 };\\nvar metadata = { data: 40 };\\nvar msgType = \\\"POST_TELEMETRY_REQUEST\\\";\\n\\nreturn { msg: msg, metadata: metadata, msgType: msgType };\"}")
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected TbNode getTestNode() {
|
||||||
|
return spy(TbMsgGeneratorNode.class);
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,55 @@
|
|||||||
|
/**
|
||||||
|
* Copyright © 2016-2023 The Thingsboard Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.thingsboard.rule.engine.flow;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.junit.jupiter.params.provider.Arguments;
|
||||||
|
import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest;
|
||||||
|
import org.thingsboard.rule.engine.api.TbNode;
|
||||||
|
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import static org.mockito.Mockito.spy;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
public class TbCheckpointNodeTest extends AbstractRuleNodeUpgradeTest {
|
||||||
|
|
||||||
|
// Rule nodes upgrade
|
||||||
|
private static Stream<Arguments> givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() {
|
||||||
|
return Stream.of(
|
||||||
|
// default config for version 0
|
||||||
|
Arguments.of(0,
|
||||||
|
"{\"queueName\":null}",
|
||||||
|
true,
|
||||||
|
"{}"),
|
||||||
|
// default config for version 0 with queueName
|
||||||
|
Arguments.of(0,
|
||||||
|
"{\"queueName\":\"Main\"}",
|
||||||
|
true,
|
||||||
|
"{}"),
|
||||||
|
// default config for version 1 with upgrade from version 0
|
||||||
|
Arguments.of(0,
|
||||||
|
"{}",
|
||||||
|
false,
|
||||||
|
"{}")
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected TbNode getTestNode() {
|
||||||
|
return spy(TbCheckpointNode.class);
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -15,7 +15,6 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.rule.engine.telemetry;
|
package org.thingsboard.rule.engine.telemetry;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.JsonNode;
|
|
||||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
@ -25,8 +24,10 @@ import org.junit.jupiter.params.provider.Arguments;
|
|||||||
import org.junit.jupiter.params.provider.MethodSource;
|
import org.junit.jupiter.params.provider.MethodSource;
|
||||||
import org.mockito.ArgumentCaptor;
|
import org.mockito.ArgumentCaptor;
|
||||||
import org.thingsboard.common.util.JacksonUtil;
|
import org.thingsboard.common.util.JacksonUtil;
|
||||||
|
import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest;
|
||||||
import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
|
import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
|
||||||
import org.thingsboard.rule.engine.api.TbContext;
|
import org.thingsboard.rule.engine.api.TbContext;
|
||||||
|
import org.thingsboard.rule.engine.api.TbNode;
|
||||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
|
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
|
||||||
import org.thingsboard.rule.engine.api.TbNodeException;
|
import org.thingsboard.rule.engine.api.TbNodeException;
|
||||||
import org.thingsboard.server.common.data.DataConstants;
|
import org.thingsboard.server.common.data.DataConstants;
|
||||||
@ -40,7 +41,6 @@ import org.thingsboard.server.common.data.kv.JsonDataEntry;
|
|||||||
import org.thingsboard.server.common.data.kv.LongDataEntry;
|
import org.thingsboard.server.common.data.kv.LongDataEntry;
|
||||||
import org.thingsboard.server.common.data.kv.StringDataEntry;
|
import org.thingsboard.server.common.data.kv.StringDataEntry;
|
||||||
import org.thingsboard.server.common.data.msg.TbMsgType;
|
import org.thingsboard.server.common.data.msg.TbMsgType;
|
||||||
import org.thingsboard.server.common.data.util.TbPair;
|
|
||||||
import org.thingsboard.server.common.msg.TbMsg;
|
import org.thingsboard.server.common.msg.TbMsg;
|
||||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||||
|
|
||||||
@ -53,7 +53,6 @@ import java.util.stream.Stream;
|
|||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
import static org.mockito.ArgumentMatchers.anyBoolean;
|
import static org.mockito.ArgumentMatchers.anyBoolean;
|
||||||
import static org.mockito.ArgumentMatchers.anyInt;
|
|
||||||
import static org.mockito.ArgumentMatchers.anyString;
|
import static org.mockito.ArgumentMatchers.anyString;
|
||||||
import static org.mockito.ArgumentMatchers.eq;
|
import static org.mockito.ArgumentMatchers.eq;
|
||||||
import static org.mockito.BDDMockito.willCallRealMethod;
|
import static org.mockito.BDDMockito.willCallRealMethod;
|
||||||
@ -65,7 +64,7 @@ import static org.mockito.Mockito.when;
|
|||||||
import static org.thingsboard.server.common.data.DataConstants.NOTIFY_DEVICE_METADATA_KEY;
|
import static org.thingsboard.server.common.data.DataConstants.NOTIFY_DEVICE_METADATA_KEY;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
class TbMsgAttributesNodeTest {
|
class TbMsgAttributesNodeTest extends AbstractRuleNodeUpgradeTest {
|
||||||
|
|
||||||
private TenantId tenantId;
|
private TenantId tenantId;
|
||||||
private DeviceId deviceId;
|
private DeviceId deviceId;
|
||||||
@ -223,21 +222,9 @@ class TbMsgAttributesNodeTest {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest
|
@Override
|
||||||
@MethodSource
|
protected TbNode getTestNode() {
|
||||||
void givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig(int givenVersion, String givenConfigStr, boolean hasChanges, String expectedConfigStr) throws TbNodeException {
|
return node;
|
||||||
// GIVEN
|
|
||||||
willCallRealMethod().given(node).upgrade(anyInt(), any());
|
|
||||||
JsonNode givenConfig = JacksonUtil.toJsonNode(givenConfigStr);
|
|
||||||
JsonNode expectedConfig = JacksonUtil.toJsonNode(expectedConfigStr);
|
|
||||||
|
|
||||||
// WHEN
|
|
||||||
TbPair<Boolean, JsonNode> upgradeResult = node.upgrade(givenVersion, givenConfig);
|
|
||||||
|
|
||||||
// THEN
|
|
||||||
assertThat(upgradeResult.getFirst()).isEqualTo(hasChanges);
|
|
||||||
ObjectNode upgradedConfig = (ObjectNode) upgradeResult.getSecond();
|
|
||||||
assertThat(upgradedConfig).isEqualTo(expectedConfig);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -22,12 +22,15 @@ import org.junit.jupiter.api.AfterEach;
|
|||||||
import org.junit.jupiter.api.Assertions;
|
import org.junit.jupiter.api.Assertions;
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.params.provider.Arguments;
|
||||||
import org.mockito.ArgumentCaptor;
|
import org.mockito.ArgumentCaptor;
|
||||||
import org.mockito.ArgumentMatchers;
|
import org.mockito.ArgumentMatchers;
|
||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
import org.thingsboard.common.util.JacksonUtil;
|
import org.thingsboard.common.util.JacksonUtil;
|
||||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||||
|
import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest;
|
||||||
import org.thingsboard.rule.engine.api.TbContext;
|
import org.thingsboard.rule.engine.api.TbContext;
|
||||||
|
import org.thingsboard.rule.engine.api.TbNode;
|
||||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
|
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
|
||||||
import org.thingsboard.rule.engine.api.TbNodeException;
|
import org.thingsboard.rule.engine.api.TbNodeException;
|
||||||
import org.thingsboard.rule.engine.deduplication.DeduplicationStrategy;
|
import org.thingsboard.rule.engine.deduplication.DeduplicationStrategy;
|
||||||
@ -40,7 +43,6 @@ import org.thingsboard.server.common.data.id.RuleNodeId;
|
|||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
import org.thingsboard.server.common.data.msg.TbMsgType;
|
import org.thingsboard.server.common.data.msg.TbMsgType;
|
||||||
import org.thingsboard.server.common.data.msg.TbNodeConnectionType;
|
import org.thingsboard.server.common.data.msg.TbNodeConnectionType;
|
||||||
import org.thingsboard.server.common.data.rule.RuleNode;
|
|
||||||
import org.thingsboard.server.common.msg.TbMsg;
|
import org.thingsboard.server.common.msg.TbMsg;
|
||||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||||
|
|
||||||
@ -56,6 +58,7 @@ import java.util.concurrent.TimeUnit;
|
|||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
import static org.mockito.ArgumentMatchers.eq;
|
import static org.mockito.ArgumentMatchers.eq;
|
||||||
@ -69,7 +72,7 @@ import static org.mockito.Mockito.verify;
|
|||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class TbMsgDeduplicationNodeTest {
|
public class TbMsgDeduplicationNodeTest extends AbstractRuleNodeUpgradeTest {
|
||||||
|
|
||||||
private TbContext ctx;
|
private TbContext ctx;
|
||||||
|
|
||||||
@ -293,7 +296,7 @@ public class TbMsgDeduplicationNodeTest {
|
|||||||
for (TbMsg msg : firstMsgPack) {
|
for (TbMsg msg : firstMsgPack) {
|
||||||
node.onMsg(ctx, msg);
|
node.onMsg(ctx, msg);
|
||||||
}
|
}
|
||||||
long firstPackDeduplicationPackEndTs = firstMsgPack.get(0).getMetaDataTs() + TimeUnit.SECONDS.toMillis(deduplicationInterval);
|
long firstPackDeduplicationPackEndTs = firstMsgPack.get(0).getMetaDataTs() + TimeUnit.SECONDS.toMillis(deduplicationInterval);
|
||||||
|
|
||||||
List<TbMsg> secondMsgPack = getTbMsgs(deviceId, msgCount / 2, firstPackDeduplicationPackEndTs, 500);
|
List<TbMsg> secondMsgPack = getTbMsgs(deviceId, msgCount / 2, firstPackDeduplicationPackEndTs, 500);
|
||||||
for (TbMsg msg : secondMsgPack) {
|
for (TbMsg msg : secondMsgPack) {
|
||||||
@ -386,6 +389,27 @@ public class TbMsgDeduplicationNodeTest {
|
|||||||
Assertions.assertEquals(msgWithLatestTsInSecondPack.getType(), actualMsg.getType());
|
Assertions.assertEquals(msgWithLatestTsInSecondPack.getType(), actualMsg.getType());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Rule nodes upgrade
|
||||||
|
private static Stream<Arguments> givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() {
|
||||||
|
return Stream.of(
|
||||||
|
// default config for version 0
|
||||||
|
Arguments.of(0,
|
||||||
|
"{\"interval\":60,\"strategy\":\"FIRST\",\"outMsgType\":null,\"maxPendingMsgs\":100,\"maxRetries\":3, \"queueName\":null}",
|
||||||
|
true,
|
||||||
|
"{\"interval\":60,\"strategy\":\"FIRST\",\"outMsgType\":null,\"maxPendingMsgs\":100,\"maxRetries\":3}"),
|
||||||
|
// default config for version 0 with queueName
|
||||||
|
Arguments.of(0,
|
||||||
|
"{\"interval\":60,\"strategy\":\"FIRST\",\"outMsgType\":null,\"maxPendingMsgs\":100,\"maxRetries\":3, \"queueName\":\"Main\"}",
|
||||||
|
true,
|
||||||
|
"{\"interval\":60,\"strategy\":\"FIRST\",\"outMsgType\":null,\"maxPendingMsgs\":100,\"maxRetries\":3}"),
|
||||||
|
// default config for version 1 with upgrade from version 0
|
||||||
|
Arguments.of(0,
|
||||||
|
"{\"interval\":60,\"strategy\":\"FIRST\",\"outMsgType\":null,\"maxPendingMsgs\":100,\"maxRetries\":3}",
|
||||||
|
false,
|
||||||
|
"{\"interval\":60,\"strategy\":\"FIRST\",\"outMsgType\":null,\"maxPendingMsgs\":100,\"maxRetries\":3}")
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
private TbMsg getMsgWithLatestTs(List<TbMsg> firstMsgPack) {
|
private TbMsg getMsgWithLatestTs(List<TbMsg> firstMsgPack) {
|
||||||
int indexOfLastMsgInArray = firstMsgPack.size() - 1;
|
int indexOfLastMsgInArray = firstMsgPack.size() - 1;
|
||||||
int indexToSetMaxTs = new Random().nextInt(indexOfLastMsgInArray) + 1;
|
int indexToSetMaxTs = new Random().nextInt(indexOfLastMsgInArray) + 1;
|
||||||
@ -430,4 +454,9 @@ public class TbMsgDeduplicationNodeTest {
|
|||||||
return JacksonUtil.toString(mergedData);
|
return JacksonUtil.toString(mergedData);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected TbNode getTestNode() {
|
||||||
|
return node;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user