changed the check to exclude only 0 deltas from the message
This commit is contained in:
parent
0d0ca90cf9
commit
96a71ad8dc
@ -99,13 +99,13 @@ public class CalculateDeltaNode implements TbNode {
|
|||||||
|
|
||||||
BigDecimal delta = BigDecimal.valueOf(previousData != null ? currentValue - previousData.value : 0.0);
|
BigDecimal delta = BigDecimal.valueOf(previousData != null ? currentValue - previousData.value : 0.0);
|
||||||
|
|
||||||
if (config.isOnlyComputeTrueDeltas() && delta.doubleValue() == 0) {
|
if (config.isTellFailureIfDeltaIsNegative() && delta.doubleValue() < 0) {
|
||||||
ctx.tellSuccess(msg);
|
ctx.tellFailure(msg, new IllegalArgumentException("Delta value is negative!"));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (config.isTellFailureIfDeltaIsNegative() && delta.doubleValue() < 0) {
|
if (config.isExcludeZeroDeltasFromOutboundMessage() && delta.doubleValue() == 0) {
|
||||||
ctx.tellFailure(msg, new IllegalArgumentException("Delta value is negative!"));
|
ctx.tellSuccess(msg);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -141,10 +141,10 @@ public class CalculateDeltaNode implements TbNode {
|
|||||||
boolean hasChanges = false;
|
boolean hasChanges = false;
|
||||||
switch (fromVersion) {
|
switch (fromVersion) {
|
||||||
case 0:
|
case 0:
|
||||||
String onlyComputeTrueDeltas = "onlyComputeTrueDeltas";
|
String excludeZeroDeltasFromOutboundMessage = "excludeZeroDeltasFromOutboundMessage";
|
||||||
if (!oldConfiguration.has(onlyComputeTrueDeltas)) {
|
if (!oldConfiguration.has(excludeZeroDeltasFromOutboundMessage)) {
|
||||||
hasChanges = true;
|
hasChanges = true;
|
||||||
((ObjectNode) oldConfiguration).put(onlyComputeTrueDeltas, false);
|
((ObjectNode) oldConfiguration).put(excludeZeroDeltasFromOutboundMessage, false);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
|
|||||||
@ -30,7 +30,7 @@ public class CalculateDeltaNodeConfiguration implements NodeConfiguration<Calcul
|
|||||||
private String periodValueKey;
|
private String periodValueKey;
|
||||||
private Integer round;
|
private Integer round;
|
||||||
private boolean tellFailureIfDeltaIsNegative;
|
private boolean tellFailureIfDeltaIsNegative;
|
||||||
private boolean onlyComputeTrueDeltas;
|
private boolean excludeZeroDeltasFromOutboundMessage;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CalculateDeltaNodeConfiguration defaultConfiguration() {
|
public CalculateDeltaNodeConfiguration defaultConfiguration() {
|
||||||
@ -41,7 +41,8 @@ public class CalculateDeltaNodeConfiguration implements NodeConfiguration<Calcul
|
|||||||
configuration.setAddPeriodBetweenMsgs(false);
|
configuration.setAddPeriodBetweenMsgs(false);
|
||||||
configuration.setPeriodValueKey("periodInMs");
|
configuration.setPeriodValueKey("periodInMs");
|
||||||
configuration.setTellFailureIfDeltaIsNegative(true);
|
configuration.setTellFailureIfDeltaIsNegative(true);
|
||||||
configuration.setOnlyComputeTrueDeltas(false);
|
configuration.setExcludeZeroDeltasFromOutboundMessage(false);
|
||||||
return configuration;
|
return configuration;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -16,12 +16,15 @@
|
|||||||
package org.thingsboard.rule.engine.metadata;
|
package org.thingsboard.rule.engine.metadata;
|
||||||
|
|
||||||
import com.google.common.util.concurrent.Futures;
|
import com.google.common.util.concurrent.Futures;
|
||||||
|
import lombok.Data;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import org.assertj.core.api.Assertions;
|
import org.assertj.core.api.Assertions;
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.junit.jupiter.api.extension.ExtendWith;
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
|
import org.junit.jupiter.params.ParameterizedTest;
|
||||||
import org.junit.jupiter.params.provider.Arguments;
|
import org.junit.jupiter.params.provider.Arguments;
|
||||||
|
import org.junit.jupiter.params.provider.MethodSource;
|
||||||
import org.mockito.ArgumentCaptor;
|
import org.mockito.ArgumentCaptor;
|
||||||
import org.mockito.ArgumentMatcher;
|
import org.mockito.ArgumentMatcher;
|
||||||
import org.mockito.Mock;
|
import org.mockito.Mock;
|
||||||
@ -52,8 +55,10 @@ import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
|||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
import java.util.function.BiConsumer;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
|
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
|
||||||
@ -68,6 +73,7 @@ import static org.mockito.Mockito.never;
|
|||||||
import static org.mockito.Mockito.reset;
|
import static org.mockito.Mockito.reset;
|
||||||
import static org.mockito.Mockito.times;
|
import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
@ExtendWith(MockitoExtension.class)
|
@ExtendWith(MockitoExtension.class)
|
||||||
@ -429,17 +435,19 @@ public class CalculateDeltaNodeTest extends AbstractRuleNodeUpgradeTest {
|
|||||||
.hasMessage("Calculation failed. JSON values are not supported!");
|
.hasMessage("Calculation failed. JSON values are not supported!");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@ParameterizedTest
|
||||||
public void givenDeltaValueIsZeroAndOnlyComputeTrueDeltasTrue_whenOnMsg_thenShouldReturnMsgWithoutDelta() throws TbNodeException {
|
@MethodSource("CalculateDeltaTestConfig")
|
||||||
|
public void givenCalculateDeltaConfig_whenOnMsg_thenVerify(CalculateDeltaTestConfig testConfig) throws TbNodeException {
|
||||||
// GIVEN
|
// GIVEN
|
||||||
config.setOnlyComputeTrueDeltas(true);
|
config.setTellFailureIfDeltaIsNegative(testConfig.isTellFailureIfDeltaIsNegative());
|
||||||
|
config.setExcludeZeroDeltasFromOutboundMessage(testConfig.isComputeOnlyTrueDeltas());
|
||||||
config.setInputValueKey("temperature");
|
config.setInputValueKey("temperature");
|
||||||
nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
|
nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
|
||||||
node.init(ctxMock, nodeConfiguration);
|
node.init(ctxMock, nodeConfiguration);
|
||||||
|
|
||||||
mockFindLatest(new BasicTsKvEntry(1L, new DoubleDataEntry("temperature", 40.0)));
|
mockFindLatest(new BasicTsKvEntry(1L, new DoubleDataEntry("temperature", testConfig.getPrevValue())));
|
||||||
|
|
||||||
var msgData = "{\"temperature\":40,\"airPressure\":123}";
|
var msgData = "{\"temperature\":" + testConfig.getCurrentValue() + ",\"airPressure\":123}";
|
||||||
var msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DUMMY_DEVICE_ORIGINATOR, TbMsgMetaData.EMPTY, msgData);
|
var msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DUMMY_DEVICE_ORIGINATOR, TbMsgMetaData.EMPTY, msgData);
|
||||||
|
|
||||||
// WHEN
|
// WHEN
|
||||||
@ -447,11 +455,69 @@ public class CalculateDeltaNodeTest extends AbstractRuleNodeUpgradeTest {
|
|||||||
node.onMsg(ctxMock, msg);
|
node.onMsg(ctxMock, msg);
|
||||||
|
|
||||||
// THEN
|
// THEN
|
||||||
|
testConfig.getVerificationMethod().accept(ctxMock, msg);
|
||||||
|
}
|
||||||
|
|
||||||
verify(ctxMock).tellSuccess(eq(msg));
|
static Stream<CalculateDeltaTestConfig> CalculateDeltaTestConfig() {
|
||||||
verify(ctxMock, never()).tellNext(any(), anyString());
|
return Stream.of(
|
||||||
verify(ctxMock, never()).tellNext(any(), anySet());
|
// delta = 0, tell failure if delta is negative is set to true and exclude zero deltas from outbound message is set to true so delta should filter out the message.
|
||||||
verify(ctxMock, never()).tellFailure(any(), any());
|
new CalculateDeltaTestConfig(true, true, 40, 40, (ctx, msg) -> {
|
||||||
|
verify(ctx).tellSuccess(eq(msg));
|
||||||
|
verify(ctx).getDbCallbackExecutor();
|
||||||
|
verifyNoMoreInteractions(ctx);
|
||||||
|
}),
|
||||||
|
// delta < 0, tell failure if delta is negative is set to true so it should throw exception.
|
||||||
|
new CalculateDeltaTestConfig(true, true, 40, 41, (ctx, msg) -> {
|
||||||
|
var errorCaptor = ArgumentCaptor.forClass(Throwable.class);
|
||||||
|
verify(ctx).tellFailure(eq(msg), errorCaptor.capture());
|
||||||
|
verify(ctx).getDbCallbackExecutor();
|
||||||
|
verifyNoMoreInteractions(ctx);
|
||||||
|
assertThat(errorCaptor.getValue()).isInstanceOf(IllegalArgumentException.class).hasMessage("Delta value is negative!");
|
||||||
|
}),
|
||||||
|
// delta < 0, exclude zero deltas from outbound message is set to true so it should return message with delta if delta is negative is set to false.
|
||||||
|
new CalculateDeltaTestConfig(false, true, 40, 41, (ctx, msg) -> {
|
||||||
|
var actualMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
|
||||||
|
verify(ctx).tellSuccess(actualMsgCaptor.capture());
|
||||||
|
verify(ctx).getDbCallbackExecutor();
|
||||||
|
verifyNoMoreInteractions(ctx);
|
||||||
|
String expectedMsgData = "{\"temperature\":40.0,\"airPressure\":123,\"delta\":-1}";
|
||||||
|
assertEquals(expectedMsgData, actualMsgCaptor.getValue().getData());
|
||||||
|
}),
|
||||||
|
// delta = 0, tell failure if delta is negative is set to false and exclude zero deltas from outbound message is set to true so delta should filter out the message.
|
||||||
|
new CalculateDeltaTestConfig(false, true, 40, 40, (ctx, msg) -> {
|
||||||
|
verify(ctx).tellSuccess(eq(msg));
|
||||||
|
verify(ctx).getDbCallbackExecutor();
|
||||||
|
verifyNoMoreInteractions(ctx);
|
||||||
|
}),
|
||||||
|
// delta > 0, exclude zero deltas from outbound message is set to true so it should return message with delta.
|
||||||
|
new CalculateDeltaTestConfig(false, true, 40, 39, (ctx, msg) -> {
|
||||||
|
var actualMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
|
||||||
|
verify(ctx).tellSuccess(actualMsgCaptor.capture());
|
||||||
|
verify(ctx).getDbCallbackExecutor();
|
||||||
|
verifyNoMoreInteractions(ctx);
|
||||||
|
String expectedMsgData = "{\"temperature\":40.0,\"airPressure\":123,\"delta\":1}";
|
||||||
|
assertEquals(expectedMsgData, actualMsgCaptor.getValue().getData());
|
||||||
|
}),
|
||||||
|
// delta > 0, exclude zero deltas from outbound message is set to false so it should return message with delta.
|
||||||
|
new CalculateDeltaTestConfig(false, false, 40, 39, (ctx, msg) -> {
|
||||||
|
var actualMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
|
||||||
|
verify(ctx).tellSuccess(actualMsgCaptor.capture());
|
||||||
|
verify(ctx).getDbCallbackExecutor();
|
||||||
|
verifyNoMoreInteractions(ctx);
|
||||||
|
String expectedMsgData = "{\"temperature\":40.0,\"airPressure\":123,\"delta\":1}";
|
||||||
|
assertEquals(expectedMsgData, actualMsgCaptor.getValue().getData());
|
||||||
|
})
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Data
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
static class CalculateDeltaTestConfig {
|
||||||
|
private final boolean tellFailureIfDeltaIsNegative;
|
||||||
|
private final boolean computeOnlyTrueDeltas;
|
||||||
|
private final double currentValue;
|
||||||
|
private final double prevValue;
|
||||||
|
private final BiConsumer<TbContext, TbMsg> verificationMethod;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void mockFindLatest(TsKvEntry tsKvEntry) {
|
private void mockFindLatest(TsKvEntry tsKvEntry) {
|
||||||
@ -493,12 +559,12 @@ public class CalculateDeltaNodeTest extends AbstractRuleNodeUpgradeTest {
|
|||||||
Arguments.of(0,
|
Arguments.of(0,
|
||||||
"{\"inputValueKey\":\"pulseCounter\",\"outputValueKey\":\"delta\",\"useCache\":true,\"addPeriodBetweenMsgs\":false, \"periodValueKey\":\"periodInMs\", \"round\":null,\"tellFailureIfDeltaIsNegative\":true}",
|
"{\"inputValueKey\":\"pulseCounter\",\"outputValueKey\":\"delta\",\"useCache\":true,\"addPeriodBetweenMsgs\":false, \"periodValueKey\":\"periodInMs\", \"round\":null,\"tellFailureIfDeltaIsNegative\":true}",
|
||||||
true,
|
true,
|
||||||
"{\"inputValueKey\":\"pulseCounter\",\"outputValueKey\":\"delta\",\"useCache\":true,\"addPeriodBetweenMsgs\":false, \"periodValueKey\":\"periodInMs\", \"round\":null,\"tellFailureIfDeltaIsNegative\":true, \"onlyComputeTrueDeltas\":false}"),
|
"{\"inputValueKey\":\"pulseCounter\",\"outputValueKey\":\"delta\",\"useCache\":true,\"addPeriodBetweenMsgs\":false, \"periodValueKey\":\"periodInMs\", \"round\":null,\"tellFailureIfDeltaIsNegative\":true, \"excludeZeroDeltasFromOutboundMessage\":false}"),
|
||||||
// default config for version 1 with upgrade from version 0
|
// default config for version 1 with upgrade from version 0
|
||||||
Arguments.of(1,
|
Arguments.of(1,
|
||||||
"{\"inputValueKey\":\"pulseCounter\",\"outputValueKey\":\"delta\",\"useCache\":true,\"addPeriodBetweenMsgs\":false, \"periodValueKey\":\"periodInMs\", \"round\":null,\"tellFailureIfDeltaIsNegative\":true, \"onlyComputeTrueDeltas\":false}",
|
"{\"inputValueKey\":\"pulseCounter\",\"outputValueKey\":\"delta\",\"useCache\":true,\"addPeriodBetweenMsgs\":false, \"periodValueKey\":\"periodInMs\", \"round\":null,\"tellFailureIfDeltaIsNegative\":true, \"excludeZeroDeltasFromOutboundMessage\":false}",
|
||||||
false,
|
false,
|
||||||
"{\"inputValueKey\":\"pulseCounter\",\"outputValueKey\":\"delta\",\"useCache\":true,\"addPeriodBetweenMsgs\":false, \"periodValueKey\":\"periodInMs\", \"round\":null,\"tellFailureIfDeltaIsNegative\":true, \"onlyComputeTrueDeltas\":false}")
|
"{\"inputValueKey\":\"pulseCounter\",\"outputValueKey\":\"delta\",\"useCache\":true,\"addPeriodBetweenMsgs\":false, \"periodValueKey\":\"periodInMs\", \"round\":null,\"tellFailureIfDeltaIsNegative\":true, \"excludeZeroDeltasFromOutboundMessage\":false}")
|
||||||
);
|
);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user