Merge pull request #10300 from irynamatveieva/fix-calculate-delta-rule-node

Added property to ignore delta in output messages if it is zero
This commit is contained in:
Viacheslav Klimov 2024-04-09 13:13:14 +03:00 committed by GitHub
commit 19c2c5e955
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 146 additions and 3 deletions

View File

@ -32,6 +32,7 @@ import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.msg.TbNodeConnectionType; import org.thingsboard.server.common.data.msg.TbNodeConnectionType;
import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.data.util.TbPair;
import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.timeseries.TimeseriesService;
@ -46,7 +47,9 @@ import static org.thingsboard.common.util.DonAsynchron.withCallback;
@Slf4j @Slf4j
@RuleNode(type = ComponentType.ENRICHMENT, @RuleNode(type = ComponentType.ENRICHMENT,
name = "calculate delta", relationTypes = {TbNodeConnectionType.SUCCESS, TbNodeConnectionType.FAILURE, TbNodeConnectionType.OTHER}, name = "calculate delta",
version = 1,
relationTypes = {TbNodeConnectionType.SUCCESS, TbNodeConnectionType.FAILURE, TbNodeConnectionType.OTHER},
configClazz = CalculateDeltaNodeConfiguration.class, configClazz = CalculateDeltaNodeConfiguration.class,
nodeDescription = "Calculates delta and amount of time passed between previous timeseries key reading " + nodeDescription = "Calculates delta and amount of time passed between previous timeseries key reading " +
"and current value for this key from the incoming message", "and current value for this key from the incoming message",
@ -101,6 +104,11 @@ public class CalculateDeltaNode implements TbNode {
return; return;
} }
if (config.isExcludeZeroDeltas() && delta.doubleValue() == 0) {
ctx.tellSuccess(msg);
return;
}
if (config.getRound() != null) { if (config.getRound() != null) {
delta = delta.setScale(config.getRound(), RoundingMode.HALF_UP); delta = delta.setScale(config.getRound(), RoundingMode.HALF_UP);
} }
@ -128,6 +136,23 @@ public class CalculateDeltaNode implements TbNode {
} }
} }
@Override
public TbPair<Boolean, JsonNode> upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException {
boolean hasChanges = false;
switch (fromVersion) {
case 0:
String excludeZeroDeltas = "excludeZeroDeltas";
if (!oldConfiguration.has(excludeZeroDeltas)) {
hasChanges = true;
((ObjectNode) oldConfiguration).put(excludeZeroDeltas, false);
}
break;
default:
break;
}
return new TbPair<>(hasChanges, oldConfiguration);
}
private ListenableFuture<ValueWithTs> fetchLatestValueAsync(EntityId entityId) { private ListenableFuture<ValueWithTs> fetchLatestValueAsync(EntityId entityId) {
return Futures.transform(timeseriesService.findLatest(ctx.getTenantId(), entityId, Collections.singletonList(config.getInputValueKey())), return Futures.transform(timeseriesService.findLatest(ctx.getTenantId(), entityId, Collections.singletonList(config.getInputValueKey())),
list -> extractValue(list.get(0)) list -> extractValue(list.get(0))

View File

@ -30,6 +30,7 @@ public class CalculateDeltaNodeConfiguration implements NodeConfiguration<Calcul
private String periodValueKey; private String periodValueKey;
private Integer round; private Integer round;
private boolean tellFailureIfDeltaIsNegative; private boolean tellFailureIfDeltaIsNegative;
private boolean excludeZeroDeltas;
@Override @Override
public CalculateDeltaNodeConfiguration defaultConfiguration() { public CalculateDeltaNodeConfiguration defaultConfiguration() {
@ -40,6 +41,7 @@ public class CalculateDeltaNodeConfiguration implements NodeConfiguration<Calcul
configuration.setAddPeriodBetweenMsgs(false); configuration.setAddPeriodBetweenMsgs(false);
configuration.setPeriodValueKey("periodInMs"); configuration.setPeriodValueKey("periodInMs");
configuration.setTellFailureIfDeltaIsNegative(true); configuration.setTellFailureIfDeltaIsNegative(true);
configuration.setExcludeZeroDeltas(false);
return configuration; return configuration;
} }

View File

@ -16,19 +16,26 @@
package org.thingsboard.rule.engine.metadata; package org.thingsboard.rule.engine.metadata;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import lombok.Data;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.assertj.core.api.Assertions; import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher; import org.mockito.ArgumentMatcher;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ListeningExecutor; import org.thingsboard.common.util.ListeningExecutor;
import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest;
import org.thingsboard.rule.engine.TestDbCallbackExecutor; import org.thingsboard.rule.engine.TestDbCallbackExecutor;
import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceId;
@ -48,7 +55,10 @@ import org.thingsboard.server.dao.timeseries.TimeseriesService;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.function.BiConsumer;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@ -63,10 +73,11 @@ import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset; import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
public class CalculateDeltaNodeTest { public class CalculateDeltaNodeTest extends AbstractRuleNodeUpgradeTest {
private static final DeviceId DUMMY_DEVICE_ORIGINATOR = new DeviceId(UUID.randomUUID()); private static final DeviceId DUMMY_DEVICE_ORIGINATOR = new DeviceId(UUID.randomUUID());
private static final TenantId TENANT_ID = new TenantId(UUID.randomUUID()); private static final TenantId TENANT_ID = new TenantId(UUID.randomUUID());
@ -75,13 +86,13 @@ public class CalculateDeltaNodeTest {
private TbContext ctxMock; private TbContext ctxMock;
@Mock @Mock
private TimeseriesService timeseriesServiceMock; private TimeseriesService timeseriesServiceMock;
@Spy
private CalculateDeltaNode node; private CalculateDeltaNode node;
private CalculateDeltaNodeConfiguration config; private CalculateDeltaNodeConfiguration config;
private TbNodeConfiguration nodeConfiguration; private TbNodeConfiguration nodeConfiguration;
@BeforeEach @BeforeEach
public void setUp() throws TbNodeException { public void setUp() throws TbNodeException {
node = new CalculateDeltaNode();
config = new CalculateDeltaNodeConfiguration().defaultConfiguration(); config = new CalculateDeltaNodeConfiguration().defaultConfiguration();
nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
when(ctxMock.getTimeseriesService()).thenReturn(timeseriesServiceMock); when(ctxMock.getTimeseriesService()).thenReturn(timeseriesServiceMock);
@ -424,6 +435,91 @@ public class CalculateDeltaNodeTest {
.hasMessage("Calculation failed. JSON values are not supported!"); .hasMessage("Calculation failed. JSON values are not supported!");
} }
@ParameterizedTest
@MethodSource("CalculateDeltaTestConfig")
public void givenCalculateDeltaConfig_whenOnMsg_thenVerify(CalculateDeltaTestConfig testConfig) throws TbNodeException {
// GIVEN
config.setTellFailureIfDeltaIsNegative(testConfig.isTellFailureIfDeltaIsNegative());
config.setExcludeZeroDeltas(testConfig.isExcludeZeroDeltas());
config.setInputValueKey("temperature");
nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
node.init(ctxMock, nodeConfiguration);
mockFindLatest(new BasicTsKvEntry(1L, new DoubleDataEntry("temperature", testConfig.getPrevValue())));
var msgData = "{\"temperature\":" + testConfig.getCurrentValue() + ",\"airPressure\":123}";
var msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DUMMY_DEVICE_ORIGINATOR, TbMsgMetaData.EMPTY, msgData);
// WHEN
node.onMsg(ctxMock, msg);
// THEN
testConfig.getVerificationMethod().accept(ctxMock, msg);
}
private static Stream<CalculateDeltaTestConfig> CalculateDeltaTestConfig() {
return Stream.of(
// delta = 0, tell failure if delta is negative is set to true and exclude zero deltas is set to true so delta should filter out the message.
new CalculateDeltaTestConfig(true, true, 40, 40, (ctx, msg) -> {
verify(ctx).tellSuccess(eq(msg));
verify(ctx).getDbCallbackExecutor();
verifyNoMoreInteractions(ctx);
}),
// delta < 0, tell failure if delta is negative is set to true so it should throw exception.
new CalculateDeltaTestConfig(true, true, 41, 40, (ctx, msg) -> {
var errorCaptor = ArgumentCaptor.forClass(Throwable.class);
verify(ctx).tellFailure(eq(msg), errorCaptor.capture());
verify(ctx).getDbCallbackExecutor();
verifyNoMoreInteractions(ctx);
assertThat(errorCaptor.getValue()).isInstanceOf(IllegalArgumentException.class).hasMessage("Delta value is negative!");
}),
// delta < 0, exclude zero deltas is set to true so it should return message with delta if delta is negative is set to false.
new CalculateDeltaTestConfig(false, true, 41, 40, (ctx, msg) -> {
var actualMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
verify(ctx).tellSuccess(actualMsgCaptor.capture());
verify(ctx).getDbCallbackExecutor();
verifyNoMoreInteractions(ctx);
String expectedMsgData = "{\"temperature\":40.0,\"airPressure\":123,\"delta\":-1}";
assertEquals(expectedMsgData, actualMsgCaptor.getValue().getData());
}),
// delta = 0, tell failure if delta is negative is set to false and exclude zero deltas is set to true so delta should filter out the message.
new CalculateDeltaTestConfig(false, true, 40, 40, (ctx, msg) -> {
verify(ctx).tellSuccess(eq(msg));
verify(ctx).getDbCallbackExecutor();
verifyNoMoreInteractions(ctx);
}),
// delta > 0, exclude zero deltas is set to true so it should return message with delta.
new CalculateDeltaTestConfig(false, true, 39, 40, (ctx, msg) -> {
var actualMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
verify(ctx).tellSuccess(actualMsgCaptor.capture());
verify(ctx).getDbCallbackExecutor();
verifyNoMoreInteractions(ctx);
String expectedMsgData = "{\"temperature\":40.0,\"airPressure\":123,\"delta\":1}";
assertEquals(expectedMsgData, actualMsgCaptor.getValue().getData());
}),
// delta > 0, exclude zero deltas is set to false so it should return message with delta.
new CalculateDeltaTestConfig(false, false, 39, 40, (ctx, msg) -> {
var actualMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
verify(ctx).tellSuccess(actualMsgCaptor.capture());
verify(ctx).getDbCallbackExecutor();
verifyNoMoreInteractions(ctx);
String expectedMsgData = "{\"temperature\":40.0,\"airPressure\":123,\"delta\":1}";
assertEquals(expectedMsgData, actualMsgCaptor.getValue().getData());
})
);
}
@Data
@RequiredArgsConstructor
private static class CalculateDeltaTestConfig {
private final boolean tellFailureIfDeltaIsNegative;
private final boolean excludeZeroDeltas;
private final double prevValue;
private final double currentValue;
private final BiConsumer<TbContext, TbMsg> verificationMethod;
}
private void mockFindLatest(TsKvEntry tsKvEntry) { private void mockFindLatest(TsKvEntry tsKvEntry) {
when(ctxMock.getTenantId()).thenReturn(TENANT_ID); when(ctxMock.getTenantId()).thenReturn(TENANT_ID);
when(timeseriesServiceMock.findLatestSync( when(timeseriesServiceMock.findLatestSync(
@ -457,4 +553,24 @@ public class CalculateDeltaNodeTest {
} }
private static Stream<Arguments> givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() {
return Stream.of(
// default config for version 0
Arguments.of(0,
"{\"inputValueKey\":\"pulseCounter\",\"outputValueKey\":\"delta\",\"useCache\":true,\"addPeriodBetweenMsgs\":false, \"periodValueKey\":\"periodInMs\", \"round\":null,\"tellFailureIfDeltaIsNegative\":true}",
true,
"{\"inputValueKey\":\"pulseCounter\",\"outputValueKey\":\"delta\",\"useCache\":true,\"addPeriodBetweenMsgs\":false, \"periodValueKey\":\"periodInMs\", \"round\":null,\"tellFailureIfDeltaIsNegative\":true, \"excludeZeroDeltas\":false}"),
// default config for version 1 with upgrade from version 0
Arguments.of(1,
"{\"inputValueKey\":\"pulseCounter\",\"outputValueKey\":\"delta\",\"useCache\":true,\"addPeriodBetweenMsgs\":false, \"periodValueKey\":\"periodInMs\", \"round\":null,\"tellFailureIfDeltaIsNegative\":true, \"excludeZeroDeltas\":false}",
false,
"{\"inputValueKey\":\"pulseCounter\",\"outputValueKey\":\"delta\",\"useCache\":true,\"addPeriodBetweenMsgs\":false, \"periodValueKey\":\"periodInMs\", \"round\":null,\"tellFailureIfDeltaIsNegative\":true, \"excludeZeroDeltas\":false}")
);
}
@Override
protected TbNode getTestNode() {
return node;
}
} }