Merge pull request #11646 from thingsboard/revert-11140-improvements/delay-node
Revert "Delay node: improvements"
This commit is contained in:
commit
6e76509619
@ -15,9 +15,8 @@
|
||||
*/
|
||||
package org.thingsboard.rule.engine.delay;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.math.NumberUtils;
|
||||
import org.thingsboard.rule.engine.api.RuleNode;
|
||||
import org.thingsboard.rule.engine.api.TbContext;
|
||||
import org.thingsboard.rule.engine.api.TbNode;
|
||||
@ -27,27 +26,18 @@ import org.thingsboard.rule.engine.api.util.TbNodeUtils;
|
||||
import org.thingsboard.server.common.data.msg.TbMsgType;
|
||||
import org.thingsboard.server.common.data.msg.TbNodeConnectionType;
|
||||
import org.thingsboard.server.common.data.plugin.ComponentType;
|
||||
import org.thingsboard.server.common.data.util.TbPair;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||
import org.thingsboard.server.dao.exception.DataValidationException;
|
||||
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.thingsboard.server.dao.service.ConstraintValidator.validateFields;
|
||||
|
||||
@Slf4j
|
||||
@RuleNode(
|
||||
type = ComponentType.ACTION,
|
||||
name = "delay (deprecated)",
|
||||
version = 1,
|
||||
configClazz = TbMsgDelayNodeConfiguration.class,
|
||||
nodeDescription = "Delays incoming message (deprecated)",
|
||||
nodeDetails = "Delays messages for a configurable period. " +
|
||||
@ -60,22 +50,13 @@ import static org.thingsboard.server.dao.service.ConstraintValidator.validateFie
|
||||
)
|
||||
public class TbMsgDelayNode implements TbNode {
|
||||
|
||||
private static final Set<TimeUnit> supportedTimeUnits = EnumSet.of(TimeUnit.SECONDS, TimeUnit.MINUTES, TimeUnit.HOURS);
|
||||
private static final String supportedTimeUnitsStr = supportedTimeUnits.stream().map(TimeUnit::name).collect(Collectors.joining(", "));
|
||||
|
||||
private TbMsgDelayNodeConfiguration config;
|
||||
private ConcurrentMap<UUID, TbMsg> pendingMsgs;
|
||||
private Map<UUID, TbMsg> pendingMsgs;
|
||||
|
||||
@Override
|
||||
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
|
||||
this.config = TbNodeUtils.convert(configuration, TbMsgDelayNodeConfiguration.class);
|
||||
String errorPrefix = "'" + ctx.getSelf().getName() + "' node configuration is invalid: ";
|
||||
try {
|
||||
validateFields(config, errorPrefix);
|
||||
} catch (DataValidationException e) {
|
||||
throw new TbNodeException(e, true);
|
||||
}
|
||||
this.pendingMsgs = new ConcurrentHashMap<>();
|
||||
this.pendingMsgs = new HashMap<>();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -86,7 +67,7 @@ public class TbMsgDelayNode implements TbNode {
|
||||
ctx.enqueueForTellNext(
|
||||
TbMsg.newMsg(
|
||||
pendingMsg.getQueueName(),
|
||||
pendingMsg.getInternalType(),
|
||||
pendingMsg.getType(),
|
||||
pendingMsg.getOriginator(),
|
||||
pendingMsg.getCustomerId(),
|
||||
pendingMsg.getMetaData(),
|
||||
@ -108,69 +89,25 @@ public class TbMsgDelayNode implements TbNode {
|
||||
}
|
||||
|
||||
private long getDelay(TbMsg msg) {
|
||||
String timeUnitPattern = TbNodeUtils.processPattern(config.getTimeUnit(), msg);
|
||||
String periodPattern = TbNodeUtils.processPattern(config.getPeriod(), msg);
|
||||
try {
|
||||
TimeUnit timeUnit = TimeUnit.valueOf(timeUnitPattern.toUpperCase());
|
||||
if (!supportedTimeUnits.contains(timeUnit)) {
|
||||
throw new RuntimeException("Time unit '" + timeUnit + "' is not supported! " +
|
||||
"Only " + supportedTimeUnitsStr + " are supported.");
|
||||
int periodInSeconds;
|
||||
if (config.isUseMetadataPeriodInSecondsPatterns()) {
|
||||
if (isParsable(msg, config.getPeriodInSecondsPattern())) {
|
||||
periodInSeconds = Integer.parseInt(TbNodeUtils.processPattern(config.getPeriodInSecondsPattern(), msg));
|
||||
} else {
|
||||
throw new RuntimeException("Can't parse period in seconds from metadata using pattern: " + config.getPeriodInSecondsPattern());
|
||||
}
|
||||
int period = Integer.parseInt(periodPattern);
|
||||
return timeUnit.toMillis(period);
|
||||
} catch (NumberFormatException e) {
|
||||
throw new NumberFormatException("Can't parse period value : " + periodPattern);
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw new IllegalArgumentException("Invalid value for period time unit : " + timeUnitPattern);
|
||||
} else {
|
||||
periodInSeconds = config.getPeriodInSeconds();
|
||||
}
|
||||
return TimeUnit.SECONDS.toMillis(periodInSeconds);
|
||||
}
|
||||
|
||||
private boolean isParsable(TbMsg msg, String pattern) {
|
||||
return NumberUtils.isParsable(TbNodeUtils.processPattern(pattern, msg));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() {
|
||||
pendingMsgs.clear();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbPair<Boolean, JsonNode> upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException {
|
||||
boolean hasChanges = false;
|
||||
switch (fromVersion) {
|
||||
case 0:
|
||||
var periodInSeconds = "periodInSeconds";
|
||||
var periodInSecondsPattern = "periodInSecondsPattern";
|
||||
var useMetadataPeriodInSecondsPatterns = "useMetadataPeriodInSecondsPatterns";
|
||||
var period = "period";
|
||||
if (oldConfiguration.has(useMetadataPeriodInSecondsPatterns)) {
|
||||
var isUsedPattern = oldConfiguration.get(useMetadataPeriodInSecondsPatterns).booleanValue();
|
||||
if (isUsedPattern) {
|
||||
if (!oldConfiguration.has(periodInSecondsPattern)) {
|
||||
throw new TbNodeException("Property to update: '" + periodInSecondsPattern + "' does not exist in configuration.");
|
||||
}
|
||||
((ObjectNode) oldConfiguration).set(period, oldConfiguration.get(periodInSecondsPattern));
|
||||
} else {
|
||||
if (!oldConfiguration.has(periodInSeconds)) {
|
||||
throw new TbNodeException("Property to update: '" + periodInSeconds + "' does not exist in configuration.");
|
||||
}
|
||||
((ObjectNode) oldConfiguration).put(period, oldConfiguration.get(periodInSeconds).asText());
|
||||
}
|
||||
hasChanges = true;
|
||||
} else if (oldConfiguration.has(periodInSeconds)) {
|
||||
((ObjectNode) oldConfiguration).put(period, oldConfiguration.get(periodInSeconds).asText());
|
||||
hasChanges = true;
|
||||
}
|
||||
if (!oldConfiguration.has(period)) {
|
||||
((ObjectNode) oldConfiguration).put(period, "60");
|
||||
hasChanges = true;
|
||||
}
|
||||
var timeUnit = "timeUnit";
|
||||
if (!oldConfiguration.has(timeUnit)) {
|
||||
((ObjectNode) oldConfiguration).put(timeUnit, TimeUnit.SECONDS.name());
|
||||
hasChanges = true;
|
||||
}
|
||||
((ObjectNode) oldConfiguration).remove(List.of(periodInSeconds, periodInSecondsPattern, useMetadataPeriodInSecondsPatterns));
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return new TbPair<>(hasChanges, oldConfiguration);
|
||||
}
|
||||
}
|
||||
|
||||
@ -15,31 +15,23 @@
|
||||
*/
|
||||
package org.thingsboard.rule.engine.delay;
|
||||
|
||||
import jakarta.validation.constraints.Max;
|
||||
import jakarta.validation.constraints.Min;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.Data;
|
||||
import org.thingsboard.rule.engine.api.NodeConfiguration;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Data
|
||||
public class TbMsgDelayNodeConfiguration implements NodeConfiguration<TbMsgDelayNodeConfiguration> {
|
||||
|
||||
@NotNull
|
||||
private String period;
|
||||
@NotNull
|
||||
private String timeUnit;
|
||||
@Min(1)
|
||||
@Max(100000)
|
||||
private int periodInSeconds;
|
||||
private int maxPendingMsgs;
|
||||
private String periodInSecondsPattern;
|
||||
private boolean useMetadataPeriodInSecondsPatterns;
|
||||
|
||||
@Override
|
||||
public TbMsgDelayNodeConfiguration defaultConfiguration() {
|
||||
TbMsgDelayNodeConfiguration configuration = new TbMsgDelayNodeConfiguration();
|
||||
configuration.setPeriod("60");
|
||||
configuration.setTimeUnit(TimeUnit.SECONDS.name());
|
||||
configuration.setPeriodInSeconds(60);
|
||||
configuration.setMaxPendingMsgs(1000);
|
||||
configuration.setUseMetadataPeriodInSecondsPatterns(false);
|
||||
return configuration;
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,343 +0,0 @@
|
||||
/**
|
||||
* Copyright © 2016-2024 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.rule.engine.delay;
|
||||
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.EnumSource;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
import org.junit.jupiter.params.provider.ValueSource;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
import org.springframework.test.util.ReflectionTestUtils;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest;
|
||||
import org.thingsboard.rule.engine.api.TbContext;
|
||||
import org.thingsboard.rule.engine.api.TbNode;
|
||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
|
||||
import org.thingsboard.rule.engine.api.TbNodeException;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.RuleNodeId;
|
||||
import org.thingsboard.server.common.data.msg.TbMsgType;
|
||||
import org.thingsboard.server.common.data.msg.TbNodeConnectionType;
|
||||
import org.thingsboard.server.common.data.rule.RuleNode;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||
|
||||
import java.util.EnumSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.Assertions.assertThatNoException;
|
||||
import static org.assertj.core.api.Assertions.assertThatThrownBy;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.BDDMockito.given;
|
||||
import static org.mockito.BDDMockito.spy;
|
||||
import static org.mockito.BDDMockito.then;
|
||||
import static org.mockito.BDDMockito.willAnswer;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
public class TbMsgDelayNodeTest extends AbstractRuleNodeUpgradeTest {
|
||||
|
||||
private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("20107cf0-1c5e-4ac4-8131-7c466c955a7c"));
|
||||
private final RuleNodeId RULE_NODE_ID = new RuleNodeId(UUID.fromString("1be24225-b669-4b26-ab7e-083aaa82d0a0"));
|
||||
|
||||
private final Set<TimeUnit> supportedTimeUnits = EnumSet.of(TimeUnit.SECONDS, TimeUnit.MINUTES, TimeUnit.HOURS);
|
||||
private final String supportedTimeUnitsStr = supportedTimeUnits.stream().map(TimeUnit::name).collect(Collectors.joining(", "));
|
||||
|
||||
private TbMsgDelayNode node;
|
||||
private TbMsgDelayNodeConfiguration config;
|
||||
|
||||
@Mock
|
||||
private TbContext ctxMock;
|
||||
@Mock
|
||||
private RuleNode ruleNodeMock;
|
||||
|
||||
@BeforeEach
|
||||
public void setUp() {
|
||||
node = spy(new TbMsgDelayNode());
|
||||
config = new TbMsgDelayNodeConfiguration().defaultConfiguration();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void verifyDefaultConfig() {
|
||||
assertThat(config.getPeriod()).isEqualTo("60");
|
||||
assertThat(config.getMaxPendingMsgs()).isEqualTo(1000);
|
||||
assertThat(config.getTimeUnit()).isEqualTo(TimeUnit.SECONDS.name());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenDefaultConfig_whenInit_thenOk() {
|
||||
given(ctxMock.getSelf()).willReturn(ruleNodeMock);
|
||||
assertThatNoException().isThrownBy(() -> node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))));
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(ints = {-1, 0, 5000000})
|
||||
public void givenInvalidMaxPendingMsgsValue_whenInit_thenThrowsException(int maxPendingMsgs) {
|
||||
config.setMaxPendingMsgs(maxPendingMsgs);
|
||||
verifyValidationExceptionOnInit();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenPeriodIsNull_whenInit_thenThrowsException() {
|
||||
config.setPeriod(null);
|
||||
verifyValidationExceptionOnInit();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenTimeUnitIsNull_whenInit_thenThrowsException() {
|
||||
config.setTimeUnit(null);
|
||||
verifyValidationExceptionOnInit();
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource
|
||||
public void givenPeriodValueAndPeriodTimeUnitPatterns_whenOnMsg_thenTellSelfTickMsgAndEnqueueForTellNext(
|
||||
String periodPattern, String timeUnitPattern, TbMsgMetaData metaData, String data, long expectedDelay) throws TbNodeException {
|
||||
config.setPeriod(periodPattern);
|
||||
config.setTimeUnit(timeUnitPattern);
|
||||
given(ctxMock.getSelf()).willReturn(ruleNodeMock);
|
||||
|
||||
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
|
||||
|
||||
var msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, data);
|
||||
var tickMsg = TbMsg.newMsg(TbMsgType.DELAY_TIMEOUT_SELF_MSG, RULE_NODE_ID, TbMsgMetaData.EMPTY, msg.getId().toString());
|
||||
|
||||
given(ctxMock.newMsg(any(), any(TbMsgType.class), any(), any(), any(), any())).willReturn(tickMsg);
|
||||
given(ctxMock.getSelfId()).willReturn(RULE_NODE_ID);
|
||||
willAnswer(invocation -> {
|
||||
node.onMsg(ctxMock, invocation.getArgument(0));
|
||||
return null;
|
||||
}).given(ctxMock).tellSelf(any(TbMsg.class), any(Long.class));
|
||||
|
||||
node.onMsg(ctxMock, msg);
|
||||
|
||||
then(ctxMock).should().tellSelf(tickMsg, expectedDelay);
|
||||
then(ctxMock).should().ack(msg);
|
||||
ArgumentCaptor<TbMsg> actualMsg = ArgumentCaptor.forClass(TbMsg.class);
|
||||
then(ctxMock).should().enqueueForTellNext(actualMsg.capture(), eq(TbNodeConnectionType.SUCCESS));
|
||||
assertThat(actualMsg.getValue()).usingRecursiveComparison().ignoringFields("id", "ts").isEqualTo(msg);
|
||||
}
|
||||
|
||||
private static Stream<Arguments> givenPeriodValueAndPeriodTimeUnitPatterns_whenOnMsg_thenTellSelfTickMsgAndEnqueueForTellNext() {
|
||||
return Stream.of(
|
||||
Arguments.of("1", "HOURS", TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT, TimeUnit.HOURS.toMillis(1L)),
|
||||
Arguments.of("${md-period}", "${md-time-unit}",
|
||||
new TbMsgMetaData(Map.of(
|
||||
"md-period", "5",
|
||||
"md-time-unit", "MINUTES"
|
||||
)), TbMsg.EMPTY_JSON_OBJECT, TimeUnit.MINUTES.toMillis(5L)),
|
||||
Arguments.of("$[msg-period]", "$[msg-time-unit]", TbMsgMetaData.EMPTY,
|
||||
"{\"msg-period\":10,\"msg-time-unit\":\"SECONDS\"}", TimeUnit.SECONDS.toMillis(10L))
|
||||
);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@EnumSource(TimeUnit.class)
|
||||
public void givenTimeUnit_whenOnMsg_thenVerify(TimeUnit timeUnit) throws TbNodeException {
|
||||
config.setTimeUnit(timeUnit.name());
|
||||
given(ctxMock.getSelf()).willReturn(ruleNodeMock);
|
||||
|
||||
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
|
||||
var msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
|
||||
if (supportedTimeUnits.contains(timeUnit)) {
|
||||
assertThatNoException().isThrownBy(() -> node.onMsg(ctxMock, msg));
|
||||
} else {
|
||||
assertThatThrownBy(() -> node.onMsg(ctxMock, msg))
|
||||
.isInstanceOf(RuntimeException.class)
|
||||
.hasMessage("Time unit '" + timeUnit + "' is not supported! Only " + supportedTimeUnitsStr + " are supported.");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenPeriodIsUnparsable_whenOnMsg_thenThrowsException() throws TbNodeException {
|
||||
config.setPeriod("five");
|
||||
given(ctxMock.getSelf()).willReturn(ruleNodeMock);
|
||||
|
||||
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
|
||||
var msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
|
||||
assertThatThrownBy(() -> node.onMsg(ctxMock, msg))
|
||||
.isInstanceOf(NumberFormatException.class)
|
||||
.hasMessage("Can't parse period value : five");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenInvalidTimeUnit_whenOnMsg_thenThrowsException() throws TbNodeException {
|
||||
config.setTimeUnit("sec");
|
||||
given(ctxMock.getSelf()).willReturn(ruleNodeMock);
|
||||
|
||||
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
|
||||
var msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
|
||||
assertThatThrownBy(() -> node.onMsg(ctxMock, msg))
|
||||
.isInstanceOf(IllegalArgumentException.class)
|
||||
.hasMessage("Invalid value for period time unit : sec");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenMaxLimitOfPendingMsgsReached_whenOnMsg_thenTellFailure() throws TbNodeException {
|
||||
config.setMaxPendingMsgs(1);
|
||||
given(ctxMock.getSelf()).willReturn(ruleNodeMock);
|
||||
|
||||
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
|
||||
var msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
|
||||
for (int i = 0; i < 2; i++) {
|
||||
node.onMsg(ctxMock, msg);
|
||||
}
|
||||
|
||||
ArgumentCaptor<Throwable> throwable = ArgumentCaptor.forClass(Throwable.class);
|
||||
then(ctxMock).should().tellFailure(eq(msg), throwable.capture());
|
||||
assertThat(throwable.getValue()).isInstanceOf(RuntimeException.class).hasMessage("Max limit of pending messages reached!");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void verifyDestroyMethod() {
|
||||
var msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
|
||||
var pendingMsgs = new ConcurrentHashMap<>();
|
||||
pendingMsgs.put(UUID.fromString("321f0301-9bed-4e7d-b92f-a978f53ec5d6"), msg);
|
||||
ReflectionTestUtils.setField(node, "pendingMsgs", pendingMsgs);
|
||||
var actualPendingMsgs = (Map<UUID, TbMsg>) ReflectionTestUtils.getField(node, "pendingMsgs");
|
||||
assertThat(actualPendingMsgs).isEqualTo(pendingMsgs);
|
||||
|
||||
node.destroy();
|
||||
|
||||
assertThat(actualPendingMsgs).isEmpty();
|
||||
}
|
||||
|
||||
private void verifyValidationExceptionOnInit() {
|
||||
RuleNode ruleNode = new RuleNode();
|
||||
ruleNode.setName("test");
|
||||
given(ctxMock.getSelf()).willReturn(ruleNode);
|
||||
String errorPrefix = "'test' node configuration is invalid: ";
|
||||
assertThatThrownBy(() -> node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))))
|
||||
.isInstanceOf(TbNodeException.class)
|
||||
.hasMessageContaining(errorPrefix)
|
||||
.extracting(e -> ((TbNodeException) e).isUnrecoverable())
|
||||
.isEqualTo(true);
|
||||
}
|
||||
|
||||
private static Stream<Arguments> givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() {
|
||||
return Stream.of(
|
||||
// config for version 1 with upgrade from version 0 (useMetadataPeriodInSecondsPatterns does not exist and periodInSeconds exists)
|
||||
Arguments.of(0,
|
||||
"""
|
||||
{
|
||||
"periodInSeconds": 13,
|
||||
"maxPendingMsgs": 1000,
|
||||
"periodInSecondsPattern": "17"
|
||||
}
|
||||
""",
|
||||
true,
|
||||
"""
|
||||
{
|
||||
"period": "13",
|
||||
"timeUnit": "SECONDS",
|
||||
"maxPendingMsgs": 1000
|
||||
}
|
||||
"""
|
||||
),
|
||||
// config for version 1 with upgrade from version 0 (useMetadataPeriodInSecondsPatterns and periodInSeconds do not exist)
|
||||
Arguments.of(0,
|
||||
"""
|
||||
{
|
||||
"maxPendingMsgs": 1000,
|
||||
"periodInSecondsPattern": "17"
|
||||
}
|
||||
""",
|
||||
true,
|
||||
"""
|
||||
{
|
||||
"period": "60",
|
||||
"timeUnit": "SECONDS",
|
||||
"maxPendingMsgs": 1000
|
||||
}
|
||||
"""
|
||||
),
|
||||
// config for version 1 with upgrade from version 0 (useMetadataPeriodInSecondsPatterns is false)
|
||||
Arguments.of(0,
|
||||
"""
|
||||
{
|
||||
"periodInSeconds": 60,
|
||||
"maxPendingMsgs": 1000,
|
||||
"periodInSecondsPattern": null,
|
||||
"useMetadataPeriodInSecondsPatterns": false
|
||||
}
|
||||
""",
|
||||
true,
|
||||
"""
|
||||
{
|
||||
"period": "60",
|
||||
"timeUnit": "SECONDS",
|
||||
"maxPendingMsgs": 1000
|
||||
}
|
||||
"""
|
||||
),
|
||||
// config for version 1 with upgrade from version 0 (useMetadataPeriodInSecondsPattern is true)
|
||||
Arguments.of(0,
|
||||
"""
|
||||
{
|
||||
"periodInSeconds": 60,
|
||||
"maxPendingMsgs": 1000,
|
||||
"periodInSecondsPattern": "${period-pattern}",
|
||||
"useMetadataPeriodInSecondsPatterns": true
|
||||
}
|
||||
""",
|
||||
true,
|
||||
"""
|
||||
{
|
||||
"period": "${period-pattern}",
|
||||
"timeUnit": "SECONDS",
|
||||
"maxPendingMsgs": 1000
|
||||
}
|
||||
"""
|
||||
),
|
||||
// config for version 1 with upgrade from version 0 (hasChanges is false)
|
||||
Arguments.of(0,
|
||||
"""
|
||||
{
|
||||
"period": "${period-pattern}",
|
||||
"timeUnit": "SECONDS",
|
||||
"maxPendingMsgs": 1000
|
||||
}
|
||||
""",
|
||||
false,
|
||||
"""
|
||||
{
|
||||
"period": "${period-pattern}",
|
||||
"timeUnit": "SECONDS",
|
||||
"maxPendingMsgs": 1000
|
||||
}
|
||||
"""
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected TbNode getTestNode() {
|
||||
return node;
|
||||
}
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user