diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java index b60a46ee0a..1ade5efb82 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java @@ -15,8 +15,9 @@ */ package org.thingsboard.rule.engine.delay; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.math.NumberUtils; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNode; @@ -26,10 +27,12 @@ import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.msg.TbNodeConnectionType; import org.thingsboard.server.common.data.plugin.ComponentType; +import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -38,6 +41,7 @@ import java.util.concurrent.TimeUnit; @RuleNode( type = ComponentType.ACTION, name = "delay (deprecated)", + version = 1, configClazz = TbMsgDelayNodeConfiguration.class, nodeDescription = "Delays incoming message (deprecated)", nodeDetails = "Delays messages for a configurable period. " + @@ -45,7 +49,7 @@ import java.util.concurrent.TimeUnit; "Deprecated because the acknowledged message still stays in memory (to be delayed) and this " + "does not guarantee that message will be processed even if the \"retry failures and timeouts\" processing strategy will be chosen.", icon = "pause", - uiResources = {"static/rulenode/rulenode-core-config.js"}, + uiResources = {""}, configDirective = "tbActionNodeMsgDelayConfig" ) public class TbMsgDelayNode implements TbNode { @@ -89,25 +93,58 @@ public class TbMsgDelayNode implements TbNode { } private long getDelay(TbMsg msg) { - int periodInSeconds; - if (config.isUseMetadataPeriodInSecondsPatterns()) { - if (isParsable(msg, config.getPeriodInSecondsPattern())) { - periodInSeconds = Integer.parseInt(TbNodeUtils.processPattern(config.getPeriodInSecondsPattern(), msg)); - } else { - throw new RuntimeException("Can't parse period in seconds from metadata using pattern: " + config.getPeriodInSecondsPattern()); - } - } else { - periodInSeconds = config.getPeriodInSeconds(); + String timeUnitPattern = TbNodeUtils.processPattern(config.getTimeUnit(), msg); + String periodPattern = TbNodeUtils.processPattern(config.getPeriod(), msg); + try { + TimeUnit timeUnit = TimeUnit.valueOf(timeUnitPattern.toUpperCase()); + int period = Integer.parseInt(periodPattern); + return timeUnit.toMillis(period); + } catch (NumberFormatException e) { + throw new RuntimeException("Can't parse period value : " + periodPattern); + } catch (IllegalArgumentException e) { + throw new RuntimeException("Invalid value for period time unit : " + timeUnitPattern); } - return TimeUnit.SECONDS.toMillis(periodInSeconds); - } - - private boolean isParsable(TbMsg msg, String pattern) { - return NumberUtils.isParsable(TbNodeUtils.processPattern(pattern, msg)); } @Override public void destroy() { pendingMsgs.clear(); } + + @Override + public TbPair upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException { + boolean hasChanges = false; + switch (fromVersion) { + case 0: + var periodInSeconds = "periodInSeconds"; + var periodInSecondsPattern = "periodInSecondsPattern"; + var useMetadataPeriodInSecondsPatterns = "useMetadataPeriodInSecondsPatterns"; + var period = "period"; + if (oldConfiguration.has(useMetadataPeriodInSecondsPatterns)) { + var isUsedPattern = oldConfiguration.get(useMetadataPeriodInSecondsPatterns).asBoolean(); + if (isUsedPattern) { + if (!oldConfiguration.has(periodInSecondsPattern)) { + throw new TbNodeException("Property to update: '" + periodInSecondsPattern + "' does not exist in configuration."); + } + ((ObjectNode) oldConfiguration).set(period, oldConfiguration.get(periodInSecondsPattern)); + } else { + if (!oldConfiguration.has(periodInSeconds)) { + throw new TbNodeException("Property to update: '" + periodInSeconds + "' does not exist in configuration."); + } + ((ObjectNode) oldConfiguration).set(period, oldConfiguration.get(periodInSeconds)); + } + ((ObjectNode) oldConfiguration).remove(List.of(periodInSeconds, periodInSecondsPattern, useMetadataPeriodInSecondsPatterns)); + hasChanges = true; + } + var timeUnit = "timeUnit"; + if (!oldConfiguration.has(timeUnit)) { + ((ObjectNode) oldConfiguration).put(timeUnit, TimeUnit.SECONDS.name()); + hasChanges = true; + } + break; + default: + break; + } + return new TbPair<>(hasChanges, oldConfiguration); + } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNodeConfiguration.java index f35552de18..b1ec01bb10 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNodeConfiguration.java @@ -18,20 +18,21 @@ package org.thingsboard.rule.engine.delay; import lombok.Data; import org.thingsboard.rule.engine.api.NodeConfiguration; +import java.util.concurrent.TimeUnit; + @Data public class TbMsgDelayNodeConfiguration implements NodeConfiguration { - private int periodInSeconds; + private String period; + private String timeUnit; private int maxPendingMsgs; - private String periodInSecondsPattern; - private boolean useMetadataPeriodInSecondsPatterns; @Override public TbMsgDelayNodeConfiguration defaultConfiguration() { TbMsgDelayNodeConfiguration configuration = new TbMsgDelayNodeConfiguration(); - configuration.setPeriodInSeconds(60); + configuration.setPeriod("60"); + configuration.setTimeUnit(TimeUnit.SECONDS.name()); configuration.setMaxPendingMsgs(1000); - configuration.setUseMetadataPeriodInSecondsPatterns(false); return configuration; } } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/delay/TbMsgDelayNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/delay/TbMsgDelayNodeTest.java index 18d3ffc2c5..751faee18b 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/delay/TbMsgDelayNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/delay/TbMsgDelayNodeTest.java @@ -26,7 +26,9 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.test.util.ReflectionTestUtils; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest; import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.server.common.data.id.DeviceId; @@ -36,9 +38,12 @@ import org.thingsboard.server.common.data.msg.TbNodeConnectionType; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; @@ -46,15 +51,15 @@ import static org.assertj.core.api.Assertions.assertThatNoException; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.never; import static org.mockito.BDDMockito.spy; import static org.mockito.BDDMockito.then; import static org.mockito.BDDMockito.times; import static org.mockito.BDDMockito.willAnswer; @ExtendWith(MockitoExtension.class) -public class TbMsgDelayNodeTest { +public class TbMsgDelayNodeTest extends AbstractRuleNodeUpgradeTest { private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("20107cf0-1c5e-4ac4-8131-7c466c955a7c")); @@ -72,10 +77,9 @@ public class TbMsgDelayNodeTest { @Test public void verifyDefaultConfig() { - assertThat(config.getPeriodInSeconds()).isEqualTo(60); + assertThat(config.getPeriod()).isEqualTo("60"); assertThat(config.getMaxPendingMsgs()).isEqualTo(1000); - assertThat(config.isUseMetadataPeriodInSecondsPatterns()).isFalse(); - assertThat(config.getPeriodInSecondsPattern()).isNull(); + assertThat(config.getTimeUnit()).isEqualTo(TimeUnit.SECONDS.name()); } @Test @@ -85,57 +89,95 @@ public class TbMsgDelayNodeTest { @ParameterizedTest @MethodSource - public void givenPeriodInSecondsPattern_whenOnMsg_thenTellSelfTickMsgAndEnqueueForTellNext( - String periodInSecondsPattern, TbMsgMetaData metaData, String data, long expectedDelay) throws TbNodeException { - config.setUseMetadataPeriodInSecondsPatterns(true); - config.setPeriodInSecondsPattern(periodInSecondsPattern); + public void givenPeriodValueAndPeriodTimeUnitPatterns_whenOnMsg_thenTellSelfTickMsgAndEnqueueForTellNext( + String periodPattern, String timeUnitPattern, TbMsgMetaData metaData, String data, long expectedDelay) throws TbNodeException { + config.setPeriod(periodPattern); + config.setTimeUnit(timeUnitPattern); + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); - TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, data); - RuleNodeId ruleNodeId = new RuleNodeId(UUID.fromString("5236e9b9-1e29-4b95-b219-7043ff8f0414")); - TbMsg tickMsg = TbMsg.newMsg(TbMsgType.DELAY_TIMEOUT_SELF_MSG, ruleNodeId, TbMsgMetaData.EMPTY, msg.getId().toString()); - + var ruleNodeId = new RuleNodeId(UUID.fromString("e8172ef8-bf91-4821-b9f5-ccd7b865e418")); given(ctxMock.getSelfId()).willReturn(ruleNodeId); - given(ctxMock.newMsg(any(), any(TbMsgType.class), any(), any(), any(), any())).willReturn(tickMsg); willAnswer(invocation -> { node.onMsg(ctxMock, invocation.getArgument(0)); return null; - }).given(ctxMock).tellSelf(tickMsg, expectedDelay); + }).given(ctxMock).tellSelf(any(TbMsg.class), any(Long.class)); - node.onMsg(ctxMock, msg); + List incomingMsgs = new ArrayList<>(); + List tickMsgs = new ArrayList<>(); + for (int i = 0; i < 9; i++) { + var msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, data); + incomingMsgs.add(msg); + var tickMsg = TbMsg.newMsg(TbMsgType.DELAY_TIMEOUT_SELF_MSG, ruleNodeId, TbMsgMetaData.EMPTY, msg.getId().toString()); + tickMsgs.add(tickMsg); + given(ctxMock.newMsg(any(), any(TbMsgType.class), any(), any(), any(), eq(msg.getId().toString()))).willReturn(tickMsg); + } - then(ctxMock).should().newMsg(null, TbMsgType.DELAY_TIMEOUT_SELF_MSG, ruleNodeId, null, TbMsgMetaData.EMPTY, msg.getId().toString()); - then(ctxMock).should().tellSelf(tickMsg, expectedDelay); - then(ctxMock).should().ack(msg); - then(node).should().onMsg(ctxMock, tickMsg); - ArgumentCaptor actualMsg = ArgumentCaptor.forClass(TbMsg.class); - then(ctxMock).should().enqueueForTellNext(actualMsg.capture(), eq(TbNodeConnectionType.SUCCESS)); - assertThat(actualMsg.getValue()).usingRecursiveComparison().ignoringFields("id", "ts").isEqualTo(msg); + incomingMsgs.forEach(msg -> node.onMsg(ctxMock, msg)); + + incomingMsgs.forEach(incomingMsg -> { + then(ctxMock).should().newMsg(incomingMsg.getQueueName(), TbMsgType.DELAY_TIMEOUT_SELF_MSG, ruleNodeId, incomingMsg.getCustomerId(), TbMsgMetaData.EMPTY, incomingMsg.getId().toString()); + then(ctxMock).should().ack(incomingMsg); + }); + tickMsgs.forEach(tickMsg -> { + then(ctxMock).should().tellSelf(tickMsg, expectedDelay); + then(node).should().onMsg(ctxMock, tickMsg); + }); + var actualMsgsCaptor = ArgumentCaptor.forClass(TbMsg.class); + then(ctxMock).should(times(9)).enqueueForTellNext(actualMsgsCaptor.capture(), eq(TbNodeConnectionType.SUCCESS)); + var actualMsgs = actualMsgsCaptor.getAllValues(); + for (int i = 0; i < 9; i++) { + var actualMsg = actualMsgs.get(i); + then(ctxMock).should().enqueueForTellNext(actualMsg, TbNodeConnectionType.SUCCESS); + assertThat(actualMsg).usingRecursiveComparison().ignoringFields("id", "ts").isEqualTo(incomingMsgs.get(i)); + } } - private static Stream givenPeriodInSecondsPattern_whenOnMsg_thenTellSelfTickMsgAndEnqueueForTellNext() { + private static Stream givenPeriodValueAndPeriodTimeUnitPatterns_whenOnMsg_thenTellSelfTickMsgAndEnqueueForTellNext() { return Stream.of( - Arguments.of("${md-period-in-seconds}", new TbMsgMetaData(Map.of("md-period-in-seconds", "10")), TbMsg.EMPTY_JSON_OBJECT, 10000L), - Arguments.of("$[msg-period-in-seconds]", TbMsgMetaData.EMPTY, "{\"msg-period-in-seconds\":5}", 5000L) + Arguments.of("1", "HOURS", TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT, TimeUnit.HOURS.toMillis(1L)), + Arguments.of("${md-period}", "${md-time-unit}", + new TbMsgMetaData(Map.of( + "md-period", "5", + "md-time-unit", "MINUTES" + )), TbMsg.EMPTY_JSON_OBJECT, TimeUnit.MINUTES.toMillis(5L)), + Arguments.of("$[msg-period]", "$[msg-time-unit]", TbMsgMetaData.EMPTY, + "{\"msg-period\":10,\"msg-time-unit\":\"SECONDS\"}", TimeUnit.SECONDS.toMillis(10L)) ); } @Test - public void givenPeriodInSecondsPatternIsUnparsable_whenOnMsg_thenThrowsException() throws TbNodeException { - config.setUseMetadataPeriodInSecondsPatterns(true); - config.setPeriodInSecondsPattern("$[msg-period-in-seconds]"); + public void givenPeriodIsUnparsable_whenOnMsg_thenThrowsException() throws TbNodeException { + config.setPeriod("five"); node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); - TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, "{\"msg-period-in-seconds\":\"five\"}"); - RuleNodeId ruleNodeId = new RuleNodeId(UUID.fromString("5236e9b9-1e29-4b95-b219-7043ff8f0414")); - TbMsg tickMsg = TbMsg.newMsg(TbMsgType.DELAY_TIMEOUT_SELF_MSG, ruleNodeId, TbMsgMetaData.EMPTY, msg.getId().toString()); + var msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); + var ruleNodeId = new RuleNodeId(UUID.fromString("5236e9b9-1e29-4b95-b219-7043ff8f0414")); + var tickMsg = TbMsg.newMsg(TbMsgType.DELAY_TIMEOUT_SELF_MSG, ruleNodeId, TbMsgMetaData.EMPTY, msg.getId().toString()); given(ctxMock.getSelfId()).willReturn(ruleNodeId); given(ctxMock.newMsg(any(), any(TbMsgType.class), any(), any(), any(), any())).willReturn(tickMsg); assertThatThrownBy(() -> node.onMsg(ctxMock, msg)) .isInstanceOf(RuntimeException.class) - .hasMessage("Can't parse period in seconds from metadata using pattern: $[msg-period-in-seconds]"); + .hasMessage("Can't parse period value : five"); + } + + @Test + public void givenInvalidTimeUnit_whenOnMsg_thenThrowsException() throws TbNodeException { + config.setTimeUnit("sec"); + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + + var msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); + var ruleNodeId = new RuleNodeId(UUID.fromString("0210d69a-247f-4488-a242-dd3244b55088")); + var tickMsg = TbMsg.newMsg(TbMsgType.DELAY_TIMEOUT_SELF_MSG, ruleNodeId, TbMsgMetaData.EMPTY, msg.getId().toString()); + + given(ctxMock.getSelfId()).willReturn(ruleNodeId); + given(ctxMock.newMsg(any(), any(TbMsgType.class), any(), any(), any(), any())).willReturn(tickMsg); + + assertThatThrownBy(() -> node.onMsg(ctxMock, msg)) + .isInstanceOf(RuntimeException.class) + .hasMessage("Invalid value for period time unit : sec"); } @Test @@ -146,60 +188,38 @@ public class TbMsgDelayNodeTest { node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); RuleNodeId ruleNodeId = new RuleNodeId(UUID.fromString("d1440f09-ca81-41f3-b67e-1495aee87dc6")); - String msgId = "e38c87c5-8916-4bb0-b448-b8d08ad639df"; - TbMsg tickMsg = TbMsg.newMsg(TbMsgType.DELAY_TIMEOUT_SELF_MSG, ruleNodeId, TbMsgMetaData.EMPTY, msgId); - given(ctxMock.getSelfId()).willReturn(ruleNodeId); - given(ctxMock.newMsg(any(), any(TbMsgType.class), any(), any(), any(), any())).willReturn(tickMsg); + List incomingMsgs = new ArrayList<>(); + List tickMsgs = new ArrayList<>(); for (int i = 0; i < 6; i++) { - TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, "msg : " + (i + 1)); - node.onMsg(ctxMock, msg); + var msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); + incomingMsgs.add(msg); + var tickMsg = TbMsg.newMsg(TbMsgType.DELAY_TIMEOUT_SELF_MSG, ruleNodeId, TbMsgMetaData.EMPTY, msg.getId().toString()); + tickMsgs.add(tickMsg); + } + for (int i = 0; i < maxPendingMsgs; i++) { + given(ctxMock.newMsg(any(), any(TbMsgType.class), any(), any(), any(), eq(incomingMsgs.get(i).getId().toString()))).willReturn(tickMsgs.get(i)); } - then(ctxMock).should(times(maxPendingMsgs)).newMsg(isNull(), eq(TbMsgType.DELAY_TIMEOUT_SELF_MSG), eq(ruleNodeId), isNull(), eq(TbMsgMetaData.EMPTY), any()); - then(ctxMock).should(times(maxPendingMsgs)).tellSelf(tickMsg, 60000L); - then(ctxMock).should(times(maxPendingMsgs)).ack(any()); - ArgumentCaptor actualMsg = ArgumentCaptor.forClass(TbMsg.class); + incomingMsgs.forEach(msg -> node.onMsg(ctxMock, msg)); + + var lastMsg = incomingMsgs.remove(maxPendingMsgs); + incomingMsgs.forEach(incomingMsg -> { + then(ctxMock).should().newMsg(incomingMsg.getQueueName(), TbMsgType.DELAY_TIMEOUT_SELF_MSG, ruleNodeId, incomingMsg.getCustomerId(), TbMsgMetaData.EMPTY, incomingMsg.getId().toString()); + then(ctxMock).should().ack(incomingMsg); + }); + var tickForLastMsg = tickMsgs.remove(maxPendingMsgs); + tickMsgs.forEach(tickMsg -> then(ctxMock).should().tellSelf(tickMsg, TimeUnit.SECONDS.toMillis(60L))); + then(ctxMock).should(never()).tellSelf(eq(tickForLastMsg), any(Long.class)); ArgumentCaptor throwable = ArgumentCaptor.forClass(Throwable.class); - then(ctxMock).should().tellFailure(actualMsg.capture(), throwable.capture()); - TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, "msg : 6"); - assertThat(actualMsg.getValue()).usingRecursiveComparison().ignoringFields("id", "ts").isEqualTo(msg); + then(ctxMock).should().tellFailure(eq(lastMsg), throwable.capture()); assertThat(throwable.getValue()).isInstanceOf(RuntimeException.class).hasMessage("Max limit of pending messages reached!"); } - @Test - public void givenNumberOfMsgsMoreThenMaxPendingMsgs_whenOnMsg_thenTellSelfTickMsgAndEnqueueForTellNext() throws TbNodeException { - config.setMaxPendingMsgs(3); - config.setPeriodInSeconds(1); - - node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); - - RuleNodeId ruleNodeId = new RuleNodeId(UUID.fromString("e8172ef8-bf91-4821-b9f5-ccd7b865e418")); - TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); - TbMsg tickMsg = TbMsg.newMsg(TbMsgType.DELAY_TIMEOUT_SELF_MSG, ruleNodeId, TbMsgMetaData.EMPTY, msg.getId().toString()); - - given(ctxMock.getSelfId()).willReturn(ruleNodeId); - given(ctxMock.newMsg(any(), any(TbMsgType.class), any(), any(), any(), any())).willReturn(tickMsg); - willAnswer(invocation -> { - node.onMsg(ctxMock, invocation.getArgument(0)); - return null; - }).given(ctxMock).tellSelf(tickMsg, 1000L); - - for (int i = 0; i < 9; i++) { - node.onMsg(ctxMock, msg); - } - - then(ctxMock).should(times(9)).newMsg(null, TbMsgType.DELAY_TIMEOUT_SELF_MSG, ruleNodeId, null, TbMsgMetaData.EMPTY, msg.getId().toString()); - then(ctxMock).should(times(9)).tellSelf(tickMsg, 1000L); - then(ctxMock).should(times(9)).ack(msg); - then(node).should(times(9)).onMsg(ctxMock, tickMsg); - then(ctxMock).should(times(9)).enqueueForTellNext(any(TbMsg.class), eq(TbNodeConnectionType.SUCCESS)); - } - @Test public void verifyDestroyMethod() { - TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); + var msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); var pendingMsgs = new HashMap(); pendingMsgs.put(UUID.fromString("321f0301-9bed-4e7d-b92f-a978f53ec5d6"), msg); ReflectionTestUtils.setField(node, "pendingMsgs", pendingMsgs); @@ -210,4 +230,70 @@ public class TbMsgDelayNodeTest { assertThat(actualPendingMsgs).isEmpty(); } + + private static Stream givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() { + return Stream.of( + // config for version 1 with upgrade from version 0 + Arguments.of(0, + """ + { + "periodInSeconds": 60, + "maxPendingMsgs": 1000, + "periodInSecondsPattern": null, + "useMetadataPeriodInSecondsPatterns": false + } + """, + true, + """ + { + "period": 60, + "timeUnit": "SECONDS", + "maxPendingMsgs": 1000 + } + """ + ), + // config for version 1 with upgrade from version 0 (useMetadataPeriodInSecondsPattern is true) + Arguments.of(0, + """ + { + "periodInSeconds": 60, + "maxPendingMsgs": 1000, + "periodInSecondsPattern": "${period-pattern}", + "useMetadataPeriodInSecondsPatterns": true + } + """, + true, + """ + { + "period": "${period-pattern}", + "timeUnit": "SECONDS", + "maxPendingMsgs": 1000 + } + """ + ), + // config for version 1 with upgrade from version 0 (hasChanges is false) + Arguments.of(0, + """ + { + "period": "${period-pattern}", + "timeUnit": "SECONDS", + "maxPendingMsgs": 1000 + } + """, + false, + """ + { + "period": "${period-pattern}", + "timeUnit": "SECONDS", + "maxPendingMsgs": 1000 + } + """ + ) + ); + } + + @Override + protected TbNode getTestNode() { + return node; + } }