added ability to set time unit using pattern

This commit is contained in:
IrynaMatveieva 2024-07-03 16:34:27 +03:00
parent 6afb3f25aa
commit 1e703f0264
3 changed files with 221 additions and 97 deletions

View File

@ -15,8 +15,9 @@
*/ */
package org.thingsboard.rule.engine.delay; package org.thingsboard.rule.engine.delay;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.math.NumberUtils;
import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNode;
@ -26,10 +27,12 @@ import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.msg.TbNodeConnectionType; import org.thingsboard.server.common.data.msg.TbNodeConnectionType;
import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.data.util.TbPair;
import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.TbMsgMetaData;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -38,6 +41,7 @@ import java.util.concurrent.TimeUnit;
@RuleNode( @RuleNode(
type = ComponentType.ACTION, type = ComponentType.ACTION,
name = "delay (deprecated)", name = "delay (deprecated)",
version = 1,
configClazz = TbMsgDelayNodeConfiguration.class, configClazz = TbMsgDelayNodeConfiguration.class,
nodeDescription = "Delays incoming message (deprecated)", nodeDescription = "Delays incoming message (deprecated)",
nodeDetails = "Delays messages for a configurable period. " + nodeDetails = "Delays messages for a configurable period. " +
@ -45,7 +49,7 @@ import java.util.concurrent.TimeUnit;
"Deprecated because the acknowledged message still stays in memory (to be delayed) and this " + "Deprecated because the acknowledged message still stays in memory (to be delayed) and this " +
"does not guarantee that message will be processed even if the \"retry failures and timeouts\" processing strategy will be chosen.", "does not guarantee that message will be processed even if the \"retry failures and timeouts\" processing strategy will be chosen.",
icon = "pause", icon = "pause",
uiResources = {"static/rulenode/rulenode-core-config.js"}, uiResources = {""},
configDirective = "tbActionNodeMsgDelayConfig" configDirective = "tbActionNodeMsgDelayConfig"
) )
public class TbMsgDelayNode implements TbNode { public class TbMsgDelayNode implements TbNode {
@ -89,25 +93,58 @@ public class TbMsgDelayNode implements TbNode {
} }
private long getDelay(TbMsg msg) { private long getDelay(TbMsg msg) {
int periodInSeconds; String timeUnitPattern = TbNodeUtils.processPattern(config.getTimeUnit(), msg);
if (config.isUseMetadataPeriodInSecondsPatterns()) { String periodPattern = TbNodeUtils.processPattern(config.getPeriod(), msg);
if (isParsable(msg, config.getPeriodInSecondsPattern())) { try {
periodInSeconds = Integer.parseInt(TbNodeUtils.processPattern(config.getPeriodInSecondsPattern(), msg)); TimeUnit timeUnit = TimeUnit.valueOf(timeUnitPattern.toUpperCase());
} else { int period = Integer.parseInt(periodPattern);
throw new RuntimeException("Can't parse period in seconds from metadata using pattern: " + config.getPeriodInSecondsPattern()); return timeUnit.toMillis(period);
} } catch (NumberFormatException e) {
} else { throw new RuntimeException("Can't parse period value : " + periodPattern);
periodInSeconds = config.getPeriodInSeconds(); } catch (IllegalArgumentException e) {
throw new RuntimeException("Invalid value for period time unit : " + timeUnitPattern);
} }
return TimeUnit.SECONDS.toMillis(periodInSeconds);
}
private boolean isParsable(TbMsg msg, String pattern) {
return NumberUtils.isParsable(TbNodeUtils.processPattern(pattern, msg));
} }
@Override @Override
public void destroy() { public void destroy() {
pendingMsgs.clear(); pendingMsgs.clear();
} }
@Override
public TbPair<Boolean, JsonNode> upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException {
boolean hasChanges = false;
switch (fromVersion) {
case 0:
var periodInSeconds = "periodInSeconds";
var periodInSecondsPattern = "periodInSecondsPattern";
var useMetadataPeriodInSecondsPatterns = "useMetadataPeriodInSecondsPatterns";
var period = "period";
if (oldConfiguration.has(useMetadataPeriodInSecondsPatterns)) {
var isUsedPattern = oldConfiguration.get(useMetadataPeriodInSecondsPatterns).asBoolean();
if (isUsedPattern) {
if (!oldConfiguration.has(periodInSecondsPattern)) {
throw new TbNodeException("Property to update: '" + periodInSecondsPattern + "' does not exist in configuration.");
}
((ObjectNode) oldConfiguration).set(period, oldConfiguration.get(periodInSecondsPattern));
} else {
if (!oldConfiguration.has(periodInSeconds)) {
throw new TbNodeException("Property to update: '" + periodInSeconds + "' does not exist in configuration.");
}
((ObjectNode) oldConfiguration).set(period, oldConfiguration.get(periodInSeconds));
}
((ObjectNode) oldConfiguration).remove(List.of(periodInSeconds, periodInSecondsPattern, useMetadataPeriodInSecondsPatterns));
hasChanges = true;
}
var timeUnit = "timeUnit";
if (!oldConfiguration.has(timeUnit)) {
((ObjectNode) oldConfiguration).put(timeUnit, TimeUnit.SECONDS.name());
hasChanges = true;
}
break;
default:
break;
}
return new TbPair<>(hasChanges, oldConfiguration);
}
} }

View File

@ -18,20 +18,21 @@ package org.thingsboard.rule.engine.delay;
import lombok.Data; import lombok.Data;
import org.thingsboard.rule.engine.api.NodeConfiguration; import org.thingsboard.rule.engine.api.NodeConfiguration;
import java.util.concurrent.TimeUnit;
@Data @Data
public class TbMsgDelayNodeConfiguration implements NodeConfiguration<TbMsgDelayNodeConfiguration> { public class TbMsgDelayNodeConfiguration implements NodeConfiguration<TbMsgDelayNodeConfiguration> {
private int periodInSeconds; private String period;
private String timeUnit;
private int maxPendingMsgs; private int maxPendingMsgs;
private String periodInSecondsPattern;
private boolean useMetadataPeriodInSecondsPatterns;
@Override @Override
public TbMsgDelayNodeConfiguration defaultConfiguration() { public TbMsgDelayNodeConfiguration defaultConfiguration() {
TbMsgDelayNodeConfiguration configuration = new TbMsgDelayNodeConfiguration(); TbMsgDelayNodeConfiguration configuration = new TbMsgDelayNodeConfiguration();
configuration.setPeriodInSeconds(60); configuration.setPeriod("60");
configuration.setTimeUnit(TimeUnit.SECONDS.name());
configuration.setMaxPendingMsgs(1000); configuration.setMaxPendingMsgs(1000);
configuration.setUseMetadataPeriodInSecondsPatterns(false);
return configuration; return configuration;
} }
} }

View File

@ -26,7 +26,9 @@ import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.test.util.ReflectionTestUtils; import org.springframework.test.util.ReflectionTestUtils;
import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest;
import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceId;
@ -36,9 +38,12 @@ import org.thingsboard.server.common.data.msg.TbNodeConnectionType;
import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.TbMsgMetaData;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream; import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
@ -46,15 +51,15 @@ import static org.assertj.core.api.Assertions.assertThatNoException;
import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.never;
import static org.mockito.BDDMockito.spy; import static org.mockito.BDDMockito.spy;
import static org.mockito.BDDMockito.then; import static org.mockito.BDDMockito.then;
import static org.mockito.BDDMockito.times; import static org.mockito.BDDMockito.times;
import static org.mockito.BDDMockito.willAnswer; import static org.mockito.BDDMockito.willAnswer;
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
public class TbMsgDelayNodeTest { public class TbMsgDelayNodeTest extends AbstractRuleNodeUpgradeTest {
private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("20107cf0-1c5e-4ac4-8131-7c466c955a7c")); private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("20107cf0-1c5e-4ac4-8131-7c466c955a7c"));
@ -72,10 +77,9 @@ public class TbMsgDelayNodeTest {
@Test @Test
public void verifyDefaultConfig() { public void verifyDefaultConfig() {
assertThat(config.getPeriodInSeconds()).isEqualTo(60); assertThat(config.getPeriod()).isEqualTo("60");
assertThat(config.getMaxPendingMsgs()).isEqualTo(1000); assertThat(config.getMaxPendingMsgs()).isEqualTo(1000);
assertThat(config.isUseMetadataPeriodInSecondsPatterns()).isFalse(); assertThat(config.getTimeUnit()).isEqualTo(TimeUnit.SECONDS.name());
assertThat(config.getPeriodInSecondsPattern()).isNull();
} }
@Test @Test
@ -85,57 +89,95 @@ public class TbMsgDelayNodeTest {
@ParameterizedTest @ParameterizedTest
@MethodSource @MethodSource
public void givenPeriodInSecondsPattern_whenOnMsg_thenTellSelfTickMsgAndEnqueueForTellNext( public void givenPeriodValueAndPeriodTimeUnitPatterns_whenOnMsg_thenTellSelfTickMsgAndEnqueueForTellNext(
String periodInSecondsPattern, TbMsgMetaData metaData, String data, long expectedDelay) throws TbNodeException { String periodPattern, String timeUnitPattern, TbMsgMetaData metaData, String data, long expectedDelay) throws TbNodeException {
config.setUseMetadataPeriodInSecondsPatterns(true); config.setPeriod(periodPattern);
config.setPeriodInSecondsPattern(periodInSecondsPattern); config.setTimeUnit(timeUnitPattern);
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, data); var ruleNodeId = new RuleNodeId(UUID.fromString("e8172ef8-bf91-4821-b9f5-ccd7b865e418"));
RuleNodeId ruleNodeId = new RuleNodeId(UUID.fromString("5236e9b9-1e29-4b95-b219-7043ff8f0414"));
TbMsg tickMsg = TbMsg.newMsg(TbMsgType.DELAY_TIMEOUT_SELF_MSG, ruleNodeId, TbMsgMetaData.EMPTY, msg.getId().toString());
given(ctxMock.getSelfId()).willReturn(ruleNodeId); given(ctxMock.getSelfId()).willReturn(ruleNodeId);
given(ctxMock.newMsg(any(), any(TbMsgType.class), any(), any(), any(), any())).willReturn(tickMsg);
willAnswer(invocation -> { willAnswer(invocation -> {
node.onMsg(ctxMock, invocation.getArgument(0)); node.onMsg(ctxMock, invocation.getArgument(0));
return null; return null;
}).given(ctxMock).tellSelf(tickMsg, expectedDelay); }).given(ctxMock).tellSelf(any(TbMsg.class), any(Long.class));
node.onMsg(ctxMock, msg); List<TbMsg> incomingMsgs = new ArrayList<>();
List<TbMsg> tickMsgs = new ArrayList<>();
for (int i = 0; i < 9; i++) {
var msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, data);
incomingMsgs.add(msg);
var tickMsg = TbMsg.newMsg(TbMsgType.DELAY_TIMEOUT_SELF_MSG, ruleNodeId, TbMsgMetaData.EMPTY, msg.getId().toString());
tickMsgs.add(tickMsg);
given(ctxMock.newMsg(any(), any(TbMsgType.class), any(), any(), any(), eq(msg.getId().toString()))).willReturn(tickMsg);
}
then(ctxMock).should().newMsg(null, TbMsgType.DELAY_TIMEOUT_SELF_MSG, ruleNodeId, null, TbMsgMetaData.EMPTY, msg.getId().toString()); incomingMsgs.forEach(msg -> node.onMsg(ctxMock, msg));
then(ctxMock).should().tellSelf(tickMsg, expectedDelay);
then(ctxMock).should().ack(msg); incomingMsgs.forEach(incomingMsg -> {
then(node).should().onMsg(ctxMock, tickMsg); then(ctxMock).should().newMsg(incomingMsg.getQueueName(), TbMsgType.DELAY_TIMEOUT_SELF_MSG, ruleNodeId, incomingMsg.getCustomerId(), TbMsgMetaData.EMPTY, incomingMsg.getId().toString());
ArgumentCaptor<TbMsg> actualMsg = ArgumentCaptor.forClass(TbMsg.class); then(ctxMock).should().ack(incomingMsg);
then(ctxMock).should().enqueueForTellNext(actualMsg.capture(), eq(TbNodeConnectionType.SUCCESS)); });
assertThat(actualMsg.getValue()).usingRecursiveComparison().ignoringFields("id", "ts").isEqualTo(msg); tickMsgs.forEach(tickMsg -> {
then(ctxMock).should().tellSelf(tickMsg, expectedDelay);
then(node).should().onMsg(ctxMock, tickMsg);
});
var actualMsgsCaptor = ArgumentCaptor.forClass(TbMsg.class);
then(ctxMock).should(times(9)).enqueueForTellNext(actualMsgsCaptor.capture(), eq(TbNodeConnectionType.SUCCESS));
var actualMsgs = actualMsgsCaptor.getAllValues();
for (int i = 0; i < 9; i++) {
var actualMsg = actualMsgs.get(i);
then(ctxMock).should().enqueueForTellNext(actualMsg, TbNodeConnectionType.SUCCESS);
assertThat(actualMsg).usingRecursiveComparison().ignoringFields("id", "ts").isEqualTo(incomingMsgs.get(i));
}
} }
private static Stream<Arguments> givenPeriodInSecondsPattern_whenOnMsg_thenTellSelfTickMsgAndEnqueueForTellNext() { private static Stream<Arguments> givenPeriodValueAndPeriodTimeUnitPatterns_whenOnMsg_thenTellSelfTickMsgAndEnqueueForTellNext() {
return Stream.of( return Stream.of(
Arguments.of("${md-period-in-seconds}", new TbMsgMetaData(Map.of("md-period-in-seconds", "10")), TbMsg.EMPTY_JSON_OBJECT, 10000L), Arguments.of("1", "HOURS", TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT, TimeUnit.HOURS.toMillis(1L)),
Arguments.of("$[msg-period-in-seconds]", TbMsgMetaData.EMPTY, "{\"msg-period-in-seconds\":5}", 5000L) Arguments.of("${md-period}", "${md-time-unit}",
new TbMsgMetaData(Map.of(
"md-period", "5",
"md-time-unit", "MINUTES"
)), TbMsg.EMPTY_JSON_OBJECT, TimeUnit.MINUTES.toMillis(5L)),
Arguments.of("$[msg-period]", "$[msg-time-unit]", TbMsgMetaData.EMPTY,
"{\"msg-period\":10,\"msg-time-unit\":\"SECONDS\"}", TimeUnit.SECONDS.toMillis(10L))
); );
} }
@Test @Test
public void givenPeriodInSecondsPatternIsUnparsable_whenOnMsg_thenThrowsException() throws TbNodeException { public void givenPeriodIsUnparsable_whenOnMsg_thenThrowsException() throws TbNodeException {
config.setUseMetadataPeriodInSecondsPatterns(true); config.setPeriod("five");
config.setPeriodInSecondsPattern("$[msg-period-in-seconds]");
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, "{\"msg-period-in-seconds\":\"five\"}"); var msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
RuleNodeId ruleNodeId = new RuleNodeId(UUID.fromString("5236e9b9-1e29-4b95-b219-7043ff8f0414")); var ruleNodeId = new RuleNodeId(UUID.fromString("5236e9b9-1e29-4b95-b219-7043ff8f0414"));
TbMsg tickMsg = TbMsg.newMsg(TbMsgType.DELAY_TIMEOUT_SELF_MSG, ruleNodeId, TbMsgMetaData.EMPTY, msg.getId().toString()); var tickMsg = TbMsg.newMsg(TbMsgType.DELAY_TIMEOUT_SELF_MSG, ruleNodeId, TbMsgMetaData.EMPTY, msg.getId().toString());
given(ctxMock.getSelfId()).willReturn(ruleNodeId); given(ctxMock.getSelfId()).willReturn(ruleNodeId);
given(ctxMock.newMsg(any(), any(TbMsgType.class), any(), any(), any(), any())).willReturn(tickMsg); given(ctxMock.newMsg(any(), any(TbMsgType.class), any(), any(), any(), any())).willReturn(tickMsg);
assertThatThrownBy(() -> node.onMsg(ctxMock, msg)) assertThatThrownBy(() -> node.onMsg(ctxMock, msg))
.isInstanceOf(RuntimeException.class) .isInstanceOf(RuntimeException.class)
.hasMessage("Can't parse period in seconds from metadata using pattern: $[msg-period-in-seconds]"); .hasMessage("Can't parse period value : five");
}
@Test
public void givenInvalidTimeUnit_whenOnMsg_thenThrowsException() throws TbNodeException {
config.setTimeUnit("sec");
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
var msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
var ruleNodeId = new RuleNodeId(UUID.fromString("0210d69a-247f-4488-a242-dd3244b55088"));
var tickMsg = TbMsg.newMsg(TbMsgType.DELAY_TIMEOUT_SELF_MSG, ruleNodeId, TbMsgMetaData.EMPTY, msg.getId().toString());
given(ctxMock.getSelfId()).willReturn(ruleNodeId);
given(ctxMock.newMsg(any(), any(TbMsgType.class), any(), any(), any(), any())).willReturn(tickMsg);
assertThatThrownBy(() -> node.onMsg(ctxMock, msg))
.isInstanceOf(RuntimeException.class)
.hasMessage("Invalid value for period time unit : sec");
} }
@Test @Test
@ -146,60 +188,38 @@ public class TbMsgDelayNodeTest {
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
RuleNodeId ruleNodeId = new RuleNodeId(UUID.fromString("d1440f09-ca81-41f3-b67e-1495aee87dc6")); RuleNodeId ruleNodeId = new RuleNodeId(UUID.fromString("d1440f09-ca81-41f3-b67e-1495aee87dc6"));
String msgId = "e38c87c5-8916-4bb0-b448-b8d08ad639df";
TbMsg tickMsg = TbMsg.newMsg(TbMsgType.DELAY_TIMEOUT_SELF_MSG, ruleNodeId, TbMsgMetaData.EMPTY, msgId);
given(ctxMock.getSelfId()).willReturn(ruleNodeId); given(ctxMock.getSelfId()).willReturn(ruleNodeId);
given(ctxMock.newMsg(any(), any(TbMsgType.class), any(), any(), any(), any())).willReturn(tickMsg);
List<TbMsg> incomingMsgs = new ArrayList<>();
List<TbMsg> tickMsgs = new ArrayList<>();
for (int i = 0; i < 6; i++) { for (int i = 0; i < 6; i++) {
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, "msg : " + (i + 1)); var msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
node.onMsg(ctxMock, msg); incomingMsgs.add(msg);
var tickMsg = TbMsg.newMsg(TbMsgType.DELAY_TIMEOUT_SELF_MSG, ruleNodeId, TbMsgMetaData.EMPTY, msg.getId().toString());
tickMsgs.add(tickMsg);
}
for (int i = 0; i < maxPendingMsgs; i++) {
given(ctxMock.newMsg(any(), any(TbMsgType.class), any(), any(), any(), eq(incomingMsgs.get(i).getId().toString()))).willReturn(tickMsgs.get(i));
} }
then(ctxMock).should(times(maxPendingMsgs)).newMsg(isNull(), eq(TbMsgType.DELAY_TIMEOUT_SELF_MSG), eq(ruleNodeId), isNull(), eq(TbMsgMetaData.EMPTY), any()); incomingMsgs.forEach(msg -> node.onMsg(ctxMock, msg));
then(ctxMock).should(times(maxPendingMsgs)).tellSelf(tickMsg, 60000L);
then(ctxMock).should(times(maxPendingMsgs)).ack(any()); var lastMsg = incomingMsgs.remove(maxPendingMsgs);
ArgumentCaptor<TbMsg> actualMsg = ArgumentCaptor.forClass(TbMsg.class); incomingMsgs.forEach(incomingMsg -> {
then(ctxMock).should().newMsg(incomingMsg.getQueueName(), TbMsgType.DELAY_TIMEOUT_SELF_MSG, ruleNodeId, incomingMsg.getCustomerId(), TbMsgMetaData.EMPTY, incomingMsg.getId().toString());
then(ctxMock).should().ack(incomingMsg);
});
var tickForLastMsg = tickMsgs.remove(maxPendingMsgs);
tickMsgs.forEach(tickMsg -> then(ctxMock).should().tellSelf(tickMsg, TimeUnit.SECONDS.toMillis(60L)));
then(ctxMock).should(never()).tellSelf(eq(tickForLastMsg), any(Long.class));
ArgumentCaptor<Throwable> throwable = ArgumentCaptor.forClass(Throwable.class); ArgumentCaptor<Throwable> throwable = ArgumentCaptor.forClass(Throwable.class);
then(ctxMock).should().tellFailure(actualMsg.capture(), throwable.capture()); then(ctxMock).should().tellFailure(eq(lastMsg), throwable.capture());
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, "msg : 6");
assertThat(actualMsg.getValue()).usingRecursiveComparison().ignoringFields("id", "ts").isEqualTo(msg);
assertThat(throwable.getValue()).isInstanceOf(RuntimeException.class).hasMessage("Max limit of pending messages reached!"); assertThat(throwable.getValue()).isInstanceOf(RuntimeException.class).hasMessage("Max limit of pending messages reached!");
} }
@Test
public void givenNumberOfMsgsMoreThenMaxPendingMsgs_whenOnMsg_thenTellSelfTickMsgAndEnqueueForTellNext() throws TbNodeException {
config.setMaxPendingMsgs(3);
config.setPeriodInSeconds(1);
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
RuleNodeId ruleNodeId = new RuleNodeId(UUID.fromString("e8172ef8-bf91-4821-b9f5-ccd7b865e418"));
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
TbMsg tickMsg = TbMsg.newMsg(TbMsgType.DELAY_TIMEOUT_SELF_MSG, ruleNodeId, TbMsgMetaData.EMPTY, msg.getId().toString());
given(ctxMock.getSelfId()).willReturn(ruleNodeId);
given(ctxMock.newMsg(any(), any(TbMsgType.class), any(), any(), any(), any())).willReturn(tickMsg);
willAnswer(invocation -> {
node.onMsg(ctxMock, invocation.getArgument(0));
return null;
}).given(ctxMock).tellSelf(tickMsg, 1000L);
for (int i = 0; i < 9; i++) {
node.onMsg(ctxMock, msg);
}
then(ctxMock).should(times(9)).newMsg(null, TbMsgType.DELAY_TIMEOUT_SELF_MSG, ruleNodeId, null, TbMsgMetaData.EMPTY, msg.getId().toString());
then(ctxMock).should(times(9)).tellSelf(tickMsg, 1000L);
then(ctxMock).should(times(9)).ack(msg);
then(node).should(times(9)).onMsg(ctxMock, tickMsg);
then(ctxMock).should(times(9)).enqueueForTellNext(any(TbMsg.class), eq(TbNodeConnectionType.SUCCESS));
}
@Test @Test
public void verifyDestroyMethod() { public void verifyDestroyMethod() {
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); var msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
var pendingMsgs = new HashMap<UUID, TbMsg>(); var pendingMsgs = new HashMap<UUID, TbMsg>();
pendingMsgs.put(UUID.fromString("321f0301-9bed-4e7d-b92f-a978f53ec5d6"), msg); pendingMsgs.put(UUID.fromString("321f0301-9bed-4e7d-b92f-a978f53ec5d6"), msg);
ReflectionTestUtils.setField(node, "pendingMsgs", pendingMsgs); ReflectionTestUtils.setField(node, "pendingMsgs", pendingMsgs);
@ -210,4 +230,70 @@ public class TbMsgDelayNodeTest {
assertThat(actualPendingMsgs).isEmpty(); assertThat(actualPendingMsgs).isEmpty();
} }
private static Stream<Arguments> givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() {
return Stream.of(
// config for version 1 with upgrade from version 0
Arguments.of(0,
"""
{
"periodInSeconds": 60,
"maxPendingMsgs": 1000,
"periodInSecondsPattern": null,
"useMetadataPeriodInSecondsPatterns": false
}
""",
true,
"""
{
"period": 60,
"timeUnit": "SECONDS",
"maxPendingMsgs": 1000
}
"""
),
// config for version 1 with upgrade from version 0 (useMetadataPeriodInSecondsPattern is true)
Arguments.of(0,
"""
{
"periodInSeconds": 60,
"maxPendingMsgs": 1000,
"periodInSecondsPattern": "${period-pattern}",
"useMetadataPeriodInSecondsPatterns": true
}
""",
true,
"""
{
"period": "${period-pattern}",
"timeUnit": "SECONDS",
"maxPendingMsgs": 1000
}
"""
),
// config for version 1 with upgrade from version 0 (hasChanges is false)
Arguments.of(0,
"""
{
"period": "${period-pattern}",
"timeUnit": "SECONDS",
"maxPendingMsgs": 1000
}
""",
false,
"""
{
"period": "${period-pattern}",
"timeUnit": "SECONDS",
"maxPendingMsgs": 1000
}
"""
)
);
}
@Override
protected TbNode getTestNode() {
return node;
}
} }