From f27ce0686efa83a201a070d23d9ab1a02ec3fc4b Mon Sep 17 00:00:00 2001 From: artem Date: Wed, 10 Jan 2024 14:02:54 +0200 Subject: [PATCH 01/10] TbGpsGeofencingActionNode: added presenceMonitoringStrategyOnEachMessage + tests + node details --- .../engine/geo/TbGpsGeofencingActionNode.java | 39 ++- ...bGpsGeofencingActionNodeConfiguration.java | 3 + .../rule/engine/util/GpsGeofencingEvents.java | 23 ++ .../geo/TbGpsGeofencingActionNodeTest.java | 264 ++++++++++++++++++ 4 files changed, 325 insertions(+), 4 deletions(-) create mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/GpsGeofencingEvents.java create mode 100644 rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/geo/TbGpsGeofencingActionNodeTest.java diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/geo/TbGpsGeofencingActionNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/geo/TbGpsGeofencingActionNode.java index abdcc5b299..cd88a1eda7 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/geo/TbGpsGeofencingActionNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/geo/TbGpsGeofencingActionNode.java @@ -15,6 +15,8 @@ */ package org.thingsboard.rule.engine.geo; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.gson.Gson; import com.google.gson.JsonObject; import com.google.gson.JsonParser; @@ -28,6 +30,7 @@ import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.plugin.ComponentType; +import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.common.msg.TbMsg; import java.util.Collections; @@ -39,6 +42,11 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import static org.thingsboard.rule.engine.util.GpsGeofencingEvents.ENTERED; +import static org.thingsboard.rule.engine.util.GpsGeofencingEvents.INSIDE; +import static org.thingsboard.rule.engine.util.GpsGeofencingEvents.LEFT; +import static org.thingsboard.rule.engine.util.GpsGeofencingEvents.OUTSIDE; + /** * Created by ashvayka on 19.01.18. */ @@ -46,15 +54,22 @@ import java.util.concurrent.TimeoutException; @RuleNode( type = ComponentType.ACTION, name = "gps geofencing events", + version = 1, configClazz = TbGpsGeofencingActionNodeConfiguration.class, relationTypes = {"Success", "Entered", "Left", "Inside", "Outside"}, nodeDescription = "Produces incoming messages using GPS based geofencing", - nodeDetails = "Extracts latitude and longitude parameters from incoming message and returns different events based on configuration parameters", + nodeDetails = "Extracts latitude and longitude parameters from incoming message and returns different events based on configuration parameters. " + + "

" + + "If an object with coordinates extracted from incoming message enters the geofence, sends a message with the type Entered. " + + "If an object leaves the geofence, sends a message with the type Left. " + + "If the presence monitoring strategy \"On first message\" is selected, sends messages with types Inside or Outside only the first time the geofencing and duration conditions are satisfied; otherwise Success. " + + "If the presence monitoring strategy \"On each message\" is selected, sends messages with types Inside or Outside every time the geofencing condition is satisfied.", uiResources = {"static/rulenode/rulenode-core-config.js"}, configDirective = "tbActionNodeGpsGeofencingConfig" ) public class TbGpsGeofencingActionNode extends AbstractGeofencingNode { + private static final String PRESENCE_MONITORING_STRATEGY_ON_EACH_MESSAGE = "presenceMonitoringStrategyOnEachMessage"; private final Map entityStates = new HashMap<>(); private final Gson gson = new Gson(); private final JsonParser parser = new JsonParser(); @@ -83,18 +98,21 @@ public class TbGpsGeofencingActionNode extends AbstractGeofencingNode (entityState.isInside() ? TimeUnit.valueOf(config.getMinInsideDurationTimeUnit()).toMillis(config.getMinInsideDuration()) : TimeUnit.valueOf(config.getMinOutsideDurationTimeUnit()).toMillis(config.getMinOutsideDuration()))) { setStaid(ctx, msg.getOriginator(), entityState); - ctx.tellNext(msg, entityState.isInside() ? "Inside" : "Outside"); + ctx.tellNext(msg, entityState.isInside() ? INSIDE : OUTSIDE); told = true; } } + } else { + ctx.tellNext(msg, entityState.isInside() ? INSIDE : OUTSIDE); + told = true; } if (!told) { ctx.tellSuccess(msg); @@ -127,4 +145,17 @@ public class TbGpsGeofencingActionNode extends AbstractGeofencingNode getConfigClazz() { return TbGpsGeofencingActionNodeConfiguration.class; } + + @Override + public TbPair upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException { + boolean hasChanges = false; + if (fromVersion == 0) { + if (!oldConfiguration.has(PRESENCE_MONITORING_STRATEGY_ON_EACH_MESSAGE)) { + hasChanges = true; + ((ObjectNode) oldConfiguration).put(PRESENCE_MONITORING_STRATEGY_ON_EACH_MESSAGE, false); + } + } + return new TbPair<>(hasChanges, oldConfiguration); + } + } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/geo/TbGpsGeofencingActionNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/geo/TbGpsGeofencingActionNodeConfiguration.java index d0adad8996..ee375083b4 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/geo/TbGpsGeofencingActionNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/geo/TbGpsGeofencingActionNodeConfiguration.java @@ -31,6 +31,8 @@ public class TbGpsGeofencingActionNodeConfiguration extends TbGpsGeofencingFilte private String minInsideDurationTimeUnit; private String minOutsideDurationTimeUnit; + private boolean presenceMonitoringStrategyOnEachMessage; + @Override public TbGpsGeofencingActionNodeConfiguration defaultConfiguration() { TbGpsGeofencingActionNodeConfiguration configuration = new TbGpsGeofencingActionNodeConfiguration(); @@ -43,6 +45,7 @@ public class TbGpsGeofencingActionNodeConfiguration extends TbGpsGeofencingFilte configuration.setMinOutsideDurationTimeUnit(TimeUnit.MINUTES.name()); configuration.setMinInsideDuration(1); configuration.setMinOutsideDuration(1); + configuration.setPresenceMonitoringStrategyOnEachMessage(false); return configuration; } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/GpsGeofencingEvents.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/GpsGeofencingEvents.java new file mode 100644 index 0000000000..db5ddaf698 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/GpsGeofencingEvents.java @@ -0,0 +1,23 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.util; + +public class GpsGeofencingEvents { + public static final String ENTERED = "Entered"; + public static final String INSIDE = "Inside"; + public static final String LEFT = "Left"; + public static final String OUTSIDE = "Outside"; +} diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/geo/TbGpsGeofencingActionNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/geo/TbGpsGeofencingActionNodeTest.java new file mode 100644 index 0000000000..dd7536f4ee --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/geo/TbGpsGeofencingActionNodeTest.java @@ -0,0 +1,264 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.geo; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.util.concurrent.Futures; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.ArgumentCaptor; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.data.DataConstants; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.msg.TbMsgType; +import org.thingsboard.server.common.data.util.TbPair; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.dao.attributes.AttributesService; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.thingsboard.rule.engine.util.GpsGeofencingEvents.ENTERED; +import static org.thingsboard.rule.engine.util.GpsGeofencingEvents.INSIDE; +import static org.thingsboard.server.common.data.msg.TbNodeConnectionType.SUCCESS; + +class TbGpsGeofencingActionNodeTest { + private TbContext ctx; + private TbGpsGeofencingActionNode node; + private AttributesService attributesService; + + @BeforeEach + void setUp() { + ctx = mock(TbContext.class); + attributesService = mock(AttributesService.class); + node = new TbGpsGeofencingActionNode(); + } + + @AfterEach + void tearDown() { + node.destroy(); + } + + private static Stream givenPresenceMonitoringStrategyOnEachMessage_whenOnMsg_thenVerifyOutputMsgTypes() { + return Stream.of( + // default config with presenceMonitoringStrategyOnEachMessage false + Arguments.of(false, List.of( + Map.of(ENTERED, 0, INSIDE, 0, SUCCESS, 0), + Map.of(ENTERED, 1, INSIDE, 0, SUCCESS, 0), + Map.of(ENTERED, 1, INSIDE, 0, SUCCESS, 1), + Map.of(ENTERED, 1, INSIDE, 1, SUCCESS, 1), + Map.of(ENTERED, 1, INSIDE, 1, SUCCESS, 2) + )), + // default config with presenceMonitoringStrategyOnEachMessage true + Arguments.of(true, List.of( + Map.of(ENTERED, 0, INSIDE, 0, SUCCESS, 0), + Map.of(ENTERED, 1, INSIDE, 0, SUCCESS, 0), + Map.of(ENTERED, 1, INSIDE, 1, SUCCESS, 0), + Map.of(ENTERED, 1, INSIDE, 2, SUCCESS, 0), + Map.of(ENTERED, 1, INSIDE, 3, SUCCESS, 0) + )) + ); + } + + @ParameterizedTest + @MethodSource + void givenPresenceMonitoringStrategyOnEachMessage_whenOnMsg_thenVerifyOutputMsgTypes( + boolean presenceMonitoringStrategyOnEachMessage, + List> outputMsgTypesCountList + ) throws TbNodeException { + // GIVEN + var config = new TbGpsGeofencingActionNodeConfiguration().defaultConfiguration(); + config.setPresenceMonitoringStrategyOnEachMessage(presenceMonitoringStrategyOnEachMessage); + node.init(ctx, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + + DeviceId deviceId = new DeviceId(UUID.randomUUID()); + TbMsgMetaData metadata = getMetadataForNewVersionPolygonPerimeter(); + TbMsg msg = getTbMsg(deviceId, metadata, + GeoUtilTest.POINT_OUTSIDE_SIMPLE_RECT.getLatitude(), GeoUtilTest.POINT_OUTSIDE_SIMPLE_RECT.getLongitude()); + + when(ctx.getAttributesService()).thenReturn(attributesService); + when(ctx + .getAttributesService() + .find(ctx.getTenantId(), msg.getOriginator(), DataConstants.SERVER_SCOPE, ctx.getServiceId())) + .thenReturn(Futures.immediateFuture(Optional.empty())); + + // WHEN + ArgumentCaptor newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); + node.onMsg(ctx, msg); + + // THEN + verifyNodeOutputs(newMsgCaptor, outputMsgTypesCountList.get(0)); + + // WHEN + msg = getTbMsg(deviceId, metadata, + GeoUtilTest.POINT_INSIDE_SIMPLE_RECT_CENTER.getLatitude(), GeoUtilTest.POINT_INSIDE_SIMPLE_RECT_CENTER.getLongitude()); + node.onMsg(ctx, msg); + + // THEN + verifyNodeOutputs(newMsgCaptor, outputMsgTypesCountList.get(1)); + + // WHEN + node.onMsg(ctx, msg); + + // THEN + verifyNodeOutputs(newMsgCaptor, outputMsgTypesCountList.get(2)); + + // WHEN + config.setMinInsideDuration(0); + node.init(ctx, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + node.onMsg(ctx, msg); + + // THEN + verifyNodeOutputs(newMsgCaptor, outputMsgTypesCountList.get(3)); + + // WHEN + node.onMsg(ctx, msg); + + // THEN + verifyNodeOutputs(newMsgCaptor, outputMsgTypesCountList.get(4)); + } + + private TbMsg getTbMsg(EntityId entityId, TbMsgMetaData metadata, double latitude, double longitude) { + String data = "{\"latitude\": " + latitude + ", \"longitude\": " + longitude + "}"; + return TbMsg.newMsg(TbMsgType.POST_ATTRIBUTES_REQUEST, entityId, metadata, data); + } + + private TbMsgMetaData getMetadataForNewVersionPolygonPerimeter() { + var metadata = new TbMsgMetaData(); + metadata.putValue("ss_perimeter", GeoUtilTest.SIMPLE_RECT); + return metadata; + } + + private void verifyNodeOutputs(ArgumentCaptor newMsgCaptor, Map outputMsgTypesCount) { + verify(this.ctx, times(outputMsgTypesCount.get(ENTERED))).tellNext(newMsgCaptor.capture(), eq(ENTERED)); + verify(this.ctx, times(outputMsgTypesCount.get(INSIDE))).tellNext(newMsgCaptor.capture(), eq(INSIDE)); + verify(this.ctx, times(outputMsgTypesCount.get(SUCCESS))).tellSuccess(newMsgCaptor.capture()); + } + + // Rule nodes upgrade + private static Stream givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() { + return Stream.of( + // default config for version 0 + Arguments.of(0, + "{\n" + + " \"minInsideDuration\": 1,\n" + + " \"minOutsideDuration\": 1,\n" + + " \"minInsideDurationTimeUnit\": \"MINUTES\",\n" + + " \"minOutsideDurationTimeUnit\": \"MINUTES\",\n" + + " \"latitudeKeyName\": \"latitude\",\n" + + " \"longitudeKeyName\": \"longitude\",\n" + + " \"perimeterType\": \"POLYGON\",\n" + + " \"fetchPerimeterInfoFromMessageMetadata\": true,\n" + + " \"perimeterKeyName\": \"ss_perimeter\",\n" + + " \"polygonsDefinition\": null,\n" + + " \"centerLatitude\": null,\n" + + " \"centerLongitude\": null,\n" + + " \"range\": null,\n" + + " \"rangeUnit\": null\n" + + "}\n", + true, + "{\n" + + " \"minInsideDuration\": 1,\n" + + " \"minOutsideDuration\": 1,\n" + + " \"minInsideDurationTimeUnit\": \"MINUTES\",\n" + + " \"minOutsideDurationTimeUnit\": \"MINUTES\",\n" + + " \"presenceMonitoringStrategyOnEachMessage\": false,\n" + + " \"latitudeKeyName\": \"latitude\",\n" + + " \"longitudeKeyName\": \"longitude\",\n" + + " \"perimeterType\": \"POLYGON\",\n" + + " \"fetchPerimeterInfoFromMessageMetadata\": true,\n" + + " \"perimeterKeyName\": \"ss_perimeter\",\n" + + " \"polygonsDefinition\": null,\n" + + " \"centerLatitude\": null,\n" + + " \"centerLongitude\": null,\n" + + " \"range\": null,\n" + + " \"rangeUnit\": null\n" + + "}\n"), + // default config for version 1 with upgrade from version 0 + Arguments.of(0, + "{\n" + + " \"minInsideDuration\": 1,\n" + + " \"minOutsideDuration\": 1,\n" + + " \"minInsideDurationTimeUnit\": \"MINUTES\",\n" + + " \"minOutsideDurationTimeUnit\": \"MINUTES\",\n" + + " \"presenceMonitoringStrategyOnEachMessage\": false,\n" + + " \"latitudeKeyName\": \"latitude\",\n" + + " \"longitudeKeyName\": \"longitude\",\n" + + " \"perimeterType\": \"POLYGON\",\n" + + " \"fetchPerimeterInfoFromMessageMetadata\": true,\n" + + " \"perimeterKeyName\": \"ss_perimeter\",\n" + + " \"polygonsDefinition\": null,\n" + + " \"centerLatitude\": null,\n" + + " \"centerLongitude\": null,\n" + + " \"range\": null,\n" + + " \"rangeUnit\": null\n" + + "}\n", + false, + "{\n" + + " \"minInsideDuration\": 1,\n" + + " \"minOutsideDuration\": 1,\n" + + " \"minInsideDurationTimeUnit\": \"MINUTES\",\n" + + " \"minOutsideDurationTimeUnit\": \"MINUTES\",\n" + + " \"presenceMonitoringStrategyOnEachMessage\": false,\n" + + " \"latitudeKeyName\": \"latitude\",\n" + + " \"longitudeKeyName\": \"longitude\",\n" + + " \"perimeterType\": \"POLYGON\",\n" + + " \"fetchPerimeterInfoFromMessageMetadata\": true,\n" + + " \"perimeterKeyName\": \"ss_perimeter\",\n" + + " \"polygonsDefinition\": null,\n" + + " \"centerLatitude\": null,\n" + + " \"centerLongitude\": null,\n" + + " \"range\": null,\n" + + " \"rangeUnit\": null\n" + + "}\n") + ); + } + + @ParameterizedTest + @MethodSource + void givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig(int givenVersion, String givenConfigStr, boolean hasChanges, String expectedConfigStr) throws TbNodeException { + // GIVEN + JsonNode givenConfig = JacksonUtil.toJsonNode(givenConfigStr); + JsonNode expectedConfig = JacksonUtil.toJsonNode(expectedConfigStr); + + // WHEN + TbPair upgradeResult = node.upgrade(givenVersion, givenConfig); + + // THEN + assertThat(upgradeResult.getFirst()).isEqualTo(hasChanges); + ObjectNode upgradedConfig = (ObjectNode) upgradeResult.getSecond(); + assertThat(upgradedConfig).isEqualTo(expectedConfig); + } + +} From 167d8758f6c1d16f6e45945ac13be7495e406913 Mon Sep 17 00:00:00 2001 From: artem Date: Wed, 24 Jan 2024 18:28:17 +0200 Subject: [PATCH 02/10] TbGpsGeofencingActionNode: renamed key to reportPresenceStatusOnEachMessage + removed told variable + tests refactored --- .../engine/geo/TbGpsGeofencingActionNode.java | 46 ++++--- ...bGpsGeofencingActionNodeConfiguration.java | 4 +- .../geo/GpsGeofencingActionTestCase.java | 38 ++++++ .../geo/TbGpsGeofencingActionNodeTest.java | 117 ++++++++---------- 4 files changed, 120 insertions(+), 85 deletions(-) create mode 100644 rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/geo/GpsGeofencingActionTestCase.java diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/geo/TbGpsGeofencingActionNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/geo/TbGpsGeofencingActionNode.java index cd88a1eda7..0a9d0deb1d 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/geo/TbGpsGeofencingActionNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/geo/TbGpsGeofencingActionNode.java @@ -62,14 +62,16 @@ import static org.thingsboard.rule.engine.util.GpsGeofencingEvents.OUTSIDE; "

" + "If an object with coordinates extracted from incoming message enters the geofence, sends a message with the type Entered. " + "If an object leaves the geofence, sends a message with the type Left. " + - "If the presence monitoring strategy \"On first message\" is selected, sends messages with types Inside or Outside only the first time the geofencing and duration conditions are satisfied; otherwise Success. " + - "If the presence monitoring strategy \"On each message\" is selected, sends messages with types Inside or Outside every time the geofencing condition is satisfied.", + "If the presence monitoring strategy \"On first message\" is selected, sends messages via rule node connection type Inside or Outside only the first time the geofencing and duration conditions are satisfied; otherwise sends messages via rule node connection type Success. " + + "If the presence monitoring strategy \"On each message\" is selected, sends messages via rule node connection type Inside or Outside every time the geofencing condition is satisfied. " + + "

" + + "Output connections: Entered, Left, Inside, Outside, Success", uiResources = {"static/rulenode/rulenode-core-config.js"}, configDirective = "tbActionNodeGpsGeofencingConfig" ) public class TbGpsGeofencingActionNode extends AbstractGeofencingNode { - private static final String PRESENCE_MONITORING_STRATEGY_ON_EACH_MESSAGE = "presenceMonitoringStrategyOnEachMessage"; + private static final String REPORT_PRESENCE_STATUS_ON_EACH_MESSAGE = "reportPresenceStatusOnEachMessage"; private final Map entityStates = new HashMap<>(); private final Gson gson = new Gson(); private final JsonParser parser = new JsonParser(); @@ -95,28 +97,32 @@ public class TbGpsGeofencingActionNode extends AbstractGeofencingNode (entityState.isInside() ? - TimeUnit.valueOf(config.getMinInsideDurationTimeUnit()).toMillis(config.getMinInsideDuration()) : TimeUnit.valueOf(config.getMinOutsideDurationTimeUnit()).toMillis(config.getMinOutsideDuration()))) { - setStaid(ctx, msg.getOriginator(), entityState); - ctx.tellNext(msg, entityState.isInside() ? INSIDE : OUTSIDE); - told = true; - } - } - } else { + return; + } + + if (config.isReportPresenceStatusOnEachMessage()) { ctx.tellNext(msg, entityState.isInside() ? INSIDE : OUTSIDE); - told = true; + return; } - if (!told) { + + if (entityState.isStayed()) { ctx.tellSuccess(msg); + return; } + + long stayTime = ts - entityState.getStateSwitchTime(); + if (stayTime > (entityState.isInside() ? + TimeUnit.valueOf(config.getMinInsideDurationTimeUnit()).toMillis(config.getMinInsideDuration()) : + TimeUnit.valueOf(config.getMinOutsideDurationTimeUnit()).toMillis(config.getMinOutsideDuration()))) { + setStaid(ctx, msg.getOriginator(), entityState); + ctx.tellNext(msg, entityState.isInside() ? INSIDE : OUTSIDE); + return; + } + + ctx.tellSuccess(msg); } private void switchState(TbContext ctx, EntityId entityId, EntityGeofencingState entityState, boolean matches, long ts) { @@ -150,9 +156,9 @@ public class TbGpsGeofencingActionNode extends AbstractGeofencingNode upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException { boolean hasChanges = false; if (fromVersion == 0) { - if (!oldConfiguration.has(PRESENCE_MONITORING_STRATEGY_ON_EACH_MESSAGE)) { + if (!oldConfiguration.has(REPORT_PRESENCE_STATUS_ON_EACH_MESSAGE)) { hasChanges = true; - ((ObjectNode) oldConfiguration).put(PRESENCE_MONITORING_STRATEGY_ON_EACH_MESSAGE, false); + ((ObjectNode) oldConfiguration).put(REPORT_PRESENCE_STATUS_ON_EACH_MESSAGE, false); } } return new TbPair<>(hasChanges, oldConfiguration); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/geo/TbGpsGeofencingActionNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/geo/TbGpsGeofencingActionNodeConfiguration.java index ee375083b4..04bbab01b7 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/geo/TbGpsGeofencingActionNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/geo/TbGpsGeofencingActionNodeConfiguration.java @@ -31,7 +31,7 @@ public class TbGpsGeofencingActionNodeConfiguration extends TbGpsGeofencingFilte private String minInsideDurationTimeUnit; private String minOutsideDurationTimeUnit; - private boolean presenceMonitoringStrategyOnEachMessage; + private boolean reportPresenceStatusOnEachMessage; @Override public TbGpsGeofencingActionNodeConfiguration defaultConfiguration() { @@ -45,7 +45,7 @@ public class TbGpsGeofencingActionNodeConfiguration extends TbGpsGeofencingFilte configuration.setMinOutsideDurationTimeUnit(TimeUnit.MINUTES.name()); configuration.setMinInsideDuration(1); configuration.setMinOutsideDuration(1); - configuration.setPresenceMonitoringStrategyOnEachMessage(false); + configuration.setReportPresenceStatusOnEachMessage(true); return configuration; } } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/geo/GpsGeofencingActionTestCase.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/geo/GpsGeofencingActionTestCase.java new file mode 100644 index 0000000000..8444c9cf0d --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/geo/GpsGeofencingActionTestCase.java @@ -0,0 +1,38 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.geo; + +import lombok.Data; +import org.thingsboard.server.common.data.id.EntityId; + +import java.util.HashMap; +import java.util.Map; + +@Data +public class GpsGeofencingActionTestCase { + private EntityId entityId; + private Map entityStates; + private boolean msgInside; + private boolean reportPresenceStatusOnEachMessage; + + public GpsGeofencingActionTestCase(EntityId entityId, boolean msgInside, boolean reportPresenceStatusOnEachMessage, EntityGeofencingState entityGeofencingState) { + this.entityId = entityId; + this.msgInside = msgInside; + this.reportPresenceStatusOnEachMessage = reportPresenceStatusOnEachMessage; + this.entityStates = new HashMap<>(); + this.entityStates.put(entityId, entityGeofencingState); + } +} diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/geo/TbGpsGeofencingActionNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/geo/TbGpsGeofencingActionNodeTest.java index dd7536f4ee..42d69f2593 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/geo/TbGpsGeofencingActionNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/geo/TbGpsGeofencingActionNodeTest.java @@ -24,6 +24,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.mockito.ArgumentCaptor; +import org.springframework.test.util.ReflectionTestUtils; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNodeConfiguration; @@ -37,8 +38,7 @@ import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.dao.attributes.AttributesService; -import java.util.List; -import java.util.Map; +import java.time.Duration; import java.util.Optional; import java.util.UUID; import java.util.stream.Stream; @@ -51,6 +51,8 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.thingsboard.rule.engine.util.GpsGeofencingEvents.ENTERED; import static org.thingsboard.rule.engine.util.GpsGeofencingEvents.INSIDE; +import static org.thingsboard.rule.engine.util.GpsGeofencingEvents.LEFT; +import static org.thingsboard.rule.engine.util.GpsGeofencingEvents.OUTSIDE; import static org.thingsboard.server.common.data.msg.TbNodeConnectionType.SUCCESS; class TbGpsGeofencingActionNodeTest { @@ -70,42 +72,47 @@ class TbGpsGeofencingActionNodeTest { node.destroy(); } - private static Stream givenPresenceMonitoringStrategyOnEachMessage_whenOnMsg_thenVerifyOutputMsgTypes() { + private static Stream givenReportPresenceStatusOnEachMessage_whenOnMsg_thenVerifyOutputMsgType() { + DeviceId deviceId = new DeviceId(UUID.randomUUID()); + long tsNow = System.currentTimeMillis(); + long tsNowMinusMinuteAndMillis = tsNow - Duration.ofMinutes(1).plusMillis(1).toMillis(); return Stream.of( - // default config with presenceMonitoringStrategyOnEachMessage false - Arguments.of(false, List.of( - Map.of(ENTERED, 0, INSIDE, 0, SUCCESS, 0), - Map.of(ENTERED, 1, INSIDE, 0, SUCCESS, 0), - Map.of(ENTERED, 1, INSIDE, 0, SUCCESS, 1), - Map.of(ENTERED, 1, INSIDE, 1, SUCCESS, 1), - Map.of(ENTERED, 1, INSIDE, 1, SUCCESS, 2) - )), - // default config with presenceMonitoringStrategyOnEachMessage true - Arguments.of(true, List.of( - Map.of(ENTERED, 0, INSIDE, 0, SUCCESS, 0), - Map.of(ENTERED, 1, INSIDE, 0, SUCCESS, 0), - Map.of(ENTERED, 1, INSIDE, 1, SUCCESS, 0), - Map.of(ENTERED, 1, INSIDE, 2, SUCCESS, 0), - Map.of(ENTERED, 1, INSIDE, 3, SUCCESS, 0) - )) + // default config with presenceMonitoringStrategyOnEachMessage false and msgInside true + Arguments.of(new GpsGeofencingActionTestCase(deviceId, true, false, new EntityGeofencingState(false, 0, false)), ENTERED), + Arguments.of(new GpsGeofencingActionTestCase(deviceId, true, false, new EntityGeofencingState(true, tsNow, false)), SUCCESS), + Arguments.of(new GpsGeofencingActionTestCase(deviceId, true, false, new EntityGeofencingState(true, tsNowMinusMinuteAndMillis, false)), INSIDE), + Arguments.of(new GpsGeofencingActionTestCase(deviceId, true, false, new EntityGeofencingState(true, tsNow, true)), SUCCESS), + // default config with presenceMonitoringStrategyOnEachMessage false and msgInside false + Arguments.of(new GpsGeofencingActionTestCase(deviceId, false, false, new EntityGeofencingState(false, 0, false)), LEFT), + Arguments.of(new GpsGeofencingActionTestCase(deviceId, false, false, new EntityGeofencingState(false, tsNow, false)), SUCCESS), + Arguments.of(new GpsGeofencingActionTestCase(deviceId, false, false, new EntityGeofencingState(false, tsNowMinusMinuteAndMillis, false)), OUTSIDE), + Arguments.of(new GpsGeofencingActionTestCase(deviceId, false, false, new EntityGeofencingState(false, tsNow, true)), SUCCESS), + // default config with presenceMonitoringStrategyOnEachMessage true and msgInside true + Arguments.of(new GpsGeofencingActionTestCase(deviceId, true, true, new EntityGeofencingState(false, 0, false)), ENTERED), + Arguments.of(new GpsGeofencingActionTestCase(deviceId, true, true, new EntityGeofencingState(true, tsNow, false)), INSIDE), + Arguments.of(new GpsGeofencingActionTestCase(deviceId, true, true, new EntityGeofencingState(true, tsNowMinusMinuteAndMillis, false)), INSIDE), + // default config with presenceMonitoringStrategyOnEachMessage true and msgInside false + Arguments.of(new GpsGeofencingActionTestCase(deviceId, false, true, new EntityGeofencingState(false, 0, false)), LEFT), + Arguments.of(new GpsGeofencingActionTestCase(deviceId, false, true, new EntityGeofencingState(false, tsNow, false)), OUTSIDE), + Arguments.of(new GpsGeofencingActionTestCase(deviceId, false, true, new EntityGeofencingState(false, tsNowMinusMinuteAndMillis, false)), OUTSIDE) ); } @ParameterizedTest @MethodSource - void givenPresenceMonitoringStrategyOnEachMessage_whenOnMsg_thenVerifyOutputMsgTypes( - boolean presenceMonitoringStrategyOnEachMessage, - List> outputMsgTypesCountList + void givenReportPresenceStatusOnEachMessage_whenOnMsg_thenVerifyOutputMsgType( + GpsGeofencingActionTestCase gpsGeofencingActionTestCase, + String expectedOutput ) throws TbNodeException { // GIVEN var config = new TbGpsGeofencingActionNodeConfiguration().defaultConfiguration(); - config.setPresenceMonitoringStrategyOnEachMessage(presenceMonitoringStrategyOnEachMessage); + config.setReportPresenceStatusOnEachMessage(gpsGeofencingActionTestCase.isReportPresenceStatusOnEachMessage()); + node.init(ctx, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); - DeviceId deviceId = new DeviceId(UUID.randomUUID()); - TbMsgMetaData metadata = getMetadataForNewVersionPolygonPerimeter(); - TbMsg msg = getTbMsg(deviceId, metadata, - GeoUtilTest.POINT_OUTSIDE_SIMPLE_RECT.getLatitude(), GeoUtilTest.POINT_OUTSIDE_SIMPLE_RECT.getLongitude()); + TbMsg msg = gpsGeofencingActionTestCase.isMsgInside() ? + getInsideRectangleTbMsg(gpsGeofencingActionTestCase.getEntityId()) : + getOutsideRectangleTbMsg(gpsGeofencingActionTestCase.getEntityId()); when(ctx.getAttributesService()).thenReturn(attributesService); when(ctx @@ -113,40 +120,30 @@ class TbGpsGeofencingActionNodeTest { .find(ctx.getTenantId(), msg.getOriginator(), DataConstants.SERVER_SCOPE, ctx.getServiceId())) .thenReturn(Futures.immediateFuture(Optional.empty())); + ReflectionTestUtils.setField(node, "entityStates", gpsGeofencingActionTestCase.getEntityStates()); + // WHEN ArgumentCaptor newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); node.onMsg(ctx, msg); // THEN - verifyNodeOutputs(newMsgCaptor, outputMsgTypesCountList.get(0)); + if (SUCCESS.equals(expectedOutput)) { + verify(ctx, times(1)).tellSuccess(newMsgCaptor.capture()); + } else { + verify(ctx, times(1)).tellNext(newMsgCaptor.capture(), eq(expectedOutput)); + } + } - // WHEN - msg = getTbMsg(deviceId, metadata, - GeoUtilTest.POINT_INSIDE_SIMPLE_RECT_CENTER.getLatitude(), GeoUtilTest.POINT_INSIDE_SIMPLE_RECT_CENTER.getLongitude()); - node.onMsg(ctx, msg); + private TbMsg getOutsideRectangleTbMsg(EntityId entityId) { + return getTbMsg(entityId, getMetadataForNewVersionPolygonPerimeter(), + GeoUtilTest.POINT_OUTSIDE_SIMPLE_RECT.getLatitude(), + GeoUtilTest.POINT_OUTSIDE_SIMPLE_RECT.getLongitude()); + } - // THEN - verifyNodeOutputs(newMsgCaptor, outputMsgTypesCountList.get(1)); - - // WHEN - node.onMsg(ctx, msg); - - // THEN - verifyNodeOutputs(newMsgCaptor, outputMsgTypesCountList.get(2)); - - // WHEN - config.setMinInsideDuration(0); - node.init(ctx, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); - node.onMsg(ctx, msg); - - // THEN - verifyNodeOutputs(newMsgCaptor, outputMsgTypesCountList.get(3)); - - // WHEN - node.onMsg(ctx, msg); - - // THEN - verifyNodeOutputs(newMsgCaptor, outputMsgTypesCountList.get(4)); + private TbMsg getInsideRectangleTbMsg(EntityId entityId) { + return getTbMsg(entityId, getMetadataForNewVersionPolygonPerimeter(), + GeoUtilTest.POINT_INSIDE_SIMPLE_RECT_CENTER.getLatitude(), + GeoUtilTest.POINT_INSIDE_SIMPLE_RECT_CENTER.getLongitude()); } private TbMsg getTbMsg(EntityId entityId, TbMsgMetaData metadata, double latitude, double longitude) { @@ -160,12 +157,6 @@ class TbGpsGeofencingActionNodeTest { return metadata; } - private void verifyNodeOutputs(ArgumentCaptor newMsgCaptor, Map outputMsgTypesCount) { - verify(this.ctx, times(outputMsgTypesCount.get(ENTERED))).tellNext(newMsgCaptor.capture(), eq(ENTERED)); - verify(this.ctx, times(outputMsgTypesCount.get(INSIDE))).tellNext(newMsgCaptor.capture(), eq(INSIDE)); - verify(this.ctx, times(outputMsgTypesCount.get(SUCCESS))).tellSuccess(newMsgCaptor.capture()); - } - // Rule nodes upgrade private static Stream givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() { return Stream.of( @@ -193,7 +184,7 @@ class TbGpsGeofencingActionNodeTest { " \"minOutsideDuration\": 1,\n" + " \"minInsideDurationTimeUnit\": \"MINUTES\",\n" + " \"minOutsideDurationTimeUnit\": \"MINUTES\",\n" + - " \"presenceMonitoringStrategyOnEachMessage\": false,\n" + + " \"reportPresenceStatusOnEachMessage\": false,\n" + " \"latitudeKeyName\": \"latitude\",\n" + " \"longitudeKeyName\": \"longitude\",\n" + " \"perimeterType\": \"POLYGON\",\n" + @@ -212,7 +203,7 @@ class TbGpsGeofencingActionNodeTest { " \"minOutsideDuration\": 1,\n" + " \"minInsideDurationTimeUnit\": \"MINUTES\",\n" + " \"minOutsideDurationTimeUnit\": \"MINUTES\",\n" + - " \"presenceMonitoringStrategyOnEachMessage\": false,\n" + + " \"reportPresenceStatusOnEachMessage\": false,\n" + " \"latitudeKeyName\": \"latitude\",\n" + " \"longitudeKeyName\": \"longitude\",\n" + " \"perimeterType\": \"POLYGON\",\n" + @@ -230,7 +221,7 @@ class TbGpsGeofencingActionNodeTest { " \"minOutsideDuration\": 1,\n" + " \"minInsideDurationTimeUnit\": \"MINUTES\",\n" + " \"minOutsideDurationTimeUnit\": \"MINUTES\",\n" + - " \"presenceMonitoringStrategyOnEachMessage\": false,\n" + + " \"reportPresenceStatusOnEachMessage\": false,\n" + " \"latitudeKeyName\": \"latitude\",\n" + " \"longitudeKeyName\": \"longitude\",\n" + " \"perimeterType\": \"POLYGON\",\n" + From 54203ea1775ac91c402362e975babfb003f5552e Mon Sep 17 00:00:00 2001 From: artem Date: Wed, 24 Jan 2024 19:19:10 +0200 Subject: [PATCH 03/10] TbGpsGeofencingActionNodeTest: added asserts and additional verifications --- .../geo/TbGpsGeofencingActionNodeTest.java | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/geo/TbGpsGeofencingActionNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/geo/TbGpsGeofencingActionNodeTest.java index 42d69f2593..d61688d5a9 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/geo/TbGpsGeofencingActionNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/geo/TbGpsGeofencingActionNodeTest.java @@ -44,8 +44,10 @@ import java.util.UUID; import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -123,15 +125,27 @@ class TbGpsGeofencingActionNodeTest { ReflectionTestUtils.setField(node, "entityStates", gpsGeofencingActionTestCase.getEntityStates()); // WHEN - ArgumentCaptor newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); node.onMsg(ctx, msg); // THEN + verify(ctx, never()).tellFailure(any(), any(Throwable.class)); + verify(ctx, never()).enqueueForTellNext(any(), eq(expectedOutput), any(), any()); + verify(ctx, never()).ack(any()); + + ArgumentCaptor newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); if (SUCCESS.equals(expectedOutput)) { verify(ctx, times(1)).tellSuccess(newMsgCaptor.capture()); } else { verify(ctx, times(1)).tellNext(newMsgCaptor.capture(), eq(expectedOutput)); } + + var actualMsg = newMsgCaptor.getValue(); + assertThat(actualMsg).isNotNull(); + assertThat(actualMsg).isSameAs(msg); + assertThat(actualMsg.getType()).isSameAs(msg.getType()); + assertThat(actualMsg.getData()).isSameAs(msg.getData()); + assertThat(actualMsg.getMetaData()).isEqualTo(msg.getMetaData()); + assertThat(actualMsg.getOriginator()).isSameAs(msg.getOriginator()); } private TbMsg getOutsideRectangleTbMsg(EntityId entityId) { From 8cdde9e67c3523b48436aba4466caae0c95f0da9 Mon Sep 17 00:00:00 2001 From: artem Date: Sat, 3 Feb 2024 21:59:33 +0200 Subject: [PATCH 04/10] TbGpsGeofencingActionNodeTest: added extends AbstractRuleNodeUpgradeTest added @ExtendWith(MockitoExtension.class) and refactored --- .../geo/TbGpsGeofencingActionNodeTest.java | 105 ++++++++---------- 1 file changed, 47 insertions(+), 58 deletions(-) diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/geo/TbGpsGeofencingActionNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/geo/TbGpsGeofencingActionNodeTest.java index d61688d5a9..fe78f57f4e 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/geo/TbGpsGeofencingActionNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/geo/TbGpsGeofencingActionNodeTest.java @@ -15,40 +15,37 @@ */ package org.thingsboard.rule.engine.geo; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.util.concurrent.Futures; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.test.util.ReflectionTestUtils; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest; import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; -import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.msg.TbMsgType; -import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.dao.attributes.AttributesService; import java.time.Duration; -import java.util.Optional; import java.util.UUID; import java.util.stream.Stream; -import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.thingsboard.rule.engine.util.GpsGeofencingEvents.ENTERED; @@ -57,16 +54,17 @@ import static org.thingsboard.rule.engine.util.GpsGeofencingEvents.LEFT; import static org.thingsboard.rule.engine.util.GpsGeofencingEvents.OUTSIDE; import static org.thingsboard.server.common.data.msg.TbNodeConnectionType.SUCCESS; -class TbGpsGeofencingActionNodeTest { +@ExtendWith(MockitoExtension.class) +class TbGpsGeofencingActionNodeTest extends AbstractRuleNodeUpgradeTest { + @Mock private TbContext ctx; - private TbGpsGeofencingActionNode node; + @Mock private AttributesService attributesService; + private TbGpsGeofencingActionNode node; @BeforeEach void setUp() { - ctx = mock(TbContext.class); - attributesService = mock(AttributesService.class); - node = new TbGpsGeofencingActionNode(); + node = spy(new TbGpsGeofencingActionNode()); } @AfterEach @@ -80,23 +78,37 @@ class TbGpsGeofencingActionNodeTest { long tsNowMinusMinuteAndMillis = tsNow - Duration.ofMinutes(1).plusMillis(1).toMillis(); return Stream.of( // default config with presenceMonitoringStrategyOnEachMessage false and msgInside true - Arguments.of(new GpsGeofencingActionTestCase(deviceId, true, false, new EntityGeofencingState(false, 0, false)), ENTERED), - Arguments.of(new GpsGeofencingActionTestCase(deviceId, true, false, new EntityGeofencingState(true, tsNow, false)), SUCCESS), - Arguments.of(new GpsGeofencingActionTestCase(deviceId, true, false, new EntityGeofencingState(true, tsNowMinusMinuteAndMillis, false)), INSIDE), - Arguments.of(new GpsGeofencingActionTestCase(deviceId, true, false, new EntityGeofencingState(true, tsNow, true)), SUCCESS), + Arguments.of(new GpsGeofencingActionTestCase(deviceId, true, false, + new EntityGeofencingState(false, 0, false)), ENTERED), + Arguments.of(new GpsGeofencingActionTestCase(deviceId, true, false, + new EntityGeofencingState(true, tsNow, false)), SUCCESS), + Arguments.of(new GpsGeofencingActionTestCase(deviceId, true, false, + new EntityGeofencingState(true, tsNowMinusMinuteAndMillis, false)), INSIDE), + Arguments.of(new GpsGeofencingActionTestCase(deviceId, true, false, + new EntityGeofencingState(true, tsNow, true)), SUCCESS), // default config with presenceMonitoringStrategyOnEachMessage false and msgInside false - Arguments.of(new GpsGeofencingActionTestCase(deviceId, false, false, new EntityGeofencingState(false, 0, false)), LEFT), - Arguments.of(new GpsGeofencingActionTestCase(deviceId, false, false, new EntityGeofencingState(false, tsNow, false)), SUCCESS), - Arguments.of(new GpsGeofencingActionTestCase(deviceId, false, false, new EntityGeofencingState(false, tsNowMinusMinuteAndMillis, false)), OUTSIDE), - Arguments.of(new GpsGeofencingActionTestCase(deviceId, false, false, new EntityGeofencingState(false, tsNow, true)), SUCCESS), + Arguments.of(new GpsGeofencingActionTestCase(deviceId, false, false, + new EntityGeofencingState(false, 0, false)), LEFT), + Arguments.of(new GpsGeofencingActionTestCase(deviceId, false, false, + new EntityGeofencingState(false, tsNow, false)), SUCCESS), + Arguments.of(new GpsGeofencingActionTestCase(deviceId, false, false, + new EntityGeofencingState(false, tsNowMinusMinuteAndMillis, false)), OUTSIDE), + Arguments.of(new GpsGeofencingActionTestCase(deviceId, false, false, + new EntityGeofencingState(false, tsNow, true)), SUCCESS), // default config with presenceMonitoringStrategyOnEachMessage true and msgInside true - Arguments.of(new GpsGeofencingActionTestCase(deviceId, true, true, new EntityGeofencingState(false, 0, false)), ENTERED), - Arguments.of(new GpsGeofencingActionTestCase(deviceId, true, true, new EntityGeofencingState(true, tsNow, false)), INSIDE), - Arguments.of(new GpsGeofencingActionTestCase(deviceId, true, true, new EntityGeofencingState(true, tsNowMinusMinuteAndMillis, false)), INSIDE), + Arguments.of(new GpsGeofencingActionTestCase(deviceId, true, true, + new EntityGeofencingState(false, 0, false)), ENTERED), + Arguments.of(new GpsGeofencingActionTestCase(deviceId, true, true, + new EntityGeofencingState(true, tsNow, false)), INSIDE), + Arguments.of(new GpsGeofencingActionTestCase(deviceId, true, true, + new EntityGeofencingState(true, tsNowMinusMinuteAndMillis, false)), INSIDE), // default config with presenceMonitoringStrategyOnEachMessage true and msgInside false - Arguments.of(new GpsGeofencingActionTestCase(deviceId, false, true, new EntityGeofencingState(false, 0, false)), LEFT), - Arguments.of(new GpsGeofencingActionTestCase(deviceId, false, true, new EntityGeofencingState(false, tsNow, false)), OUTSIDE), - Arguments.of(new GpsGeofencingActionTestCase(deviceId, false, true, new EntityGeofencingState(false, tsNowMinusMinuteAndMillis, false)), OUTSIDE) + Arguments.of(new GpsGeofencingActionTestCase(deviceId, false, true, + new EntityGeofencingState(false, 0, false)), LEFT), + Arguments.of(new GpsGeofencingActionTestCase(deviceId, false, true, + new EntityGeofencingState(false, tsNow, false)), OUTSIDE), + Arguments.of(new GpsGeofencingActionTestCase(deviceId, false, true, + new EntityGeofencingState(false, tsNowMinusMinuteAndMillis, false)), OUTSIDE) ); } @@ -117,10 +129,6 @@ class TbGpsGeofencingActionNodeTest { getOutsideRectangleTbMsg(gpsGeofencingActionTestCase.getEntityId()); when(ctx.getAttributesService()).thenReturn(attributesService); - when(ctx - .getAttributesService() - .find(ctx.getTenantId(), msg.getOriginator(), DataConstants.SERVER_SCOPE, ctx.getServiceId())) - .thenReturn(Futures.immediateFuture(Optional.empty())); ReflectionTestUtils.setField(node, "entityStates", gpsGeofencingActionTestCase.getEntityStates()); @@ -128,24 +136,16 @@ class TbGpsGeofencingActionNodeTest { node.onMsg(ctx, msg); // THEN + verify(ctx.getAttributesService(), never()).find(any(), any(), any(), anyString()); verify(ctx, never()).tellFailure(any(), any(Throwable.class)); verify(ctx, never()).enqueueForTellNext(any(), eq(expectedOutput), any(), any()); verify(ctx, never()).ack(any()); - ArgumentCaptor newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); if (SUCCESS.equals(expectedOutput)) { - verify(ctx, times(1)).tellSuccess(newMsgCaptor.capture()); + verify(ctx).tellSuccess(eq(msg)); } else { - verify(ctx, times(1)).tellNext(newMsgCaptor.capture(), eq(expectedOutput)); + verify(ctx).tellNext(eq(msg), eq(expectedOutput)); } - - var actualMsg = newMsgCaptor.getValue(); - assertThat(actualMsg).isNotNull(); - assertThat(actualMsg).isSameAs(msg); - assertThat(actualMsg.getType()).isSameAs(msg.getType()); - assertThat(actualMsg.getData()).isSameAs(msg.getData()); - assertThat(actualMsg.getMetaData()).isEqualTo(msg.getMetaData()); - assertThat(actualMsg.getOriginator()).isSameAs(msg.getOriginator()); } private TbMsg getOutsideRectangleTbMsg(EntityId entityId) { @@ -250,20 +250,9 @@ class TbGpsGeofencingActionNodeTest { ); } - @ParameterizedTest - @MethodSource - void givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig(int givenVersion, String givenConfigStr, boolean hasChanges, String expectedConfigStr) throws TbNodeException { - // GIVEN - JsonNode givenConfig = JacksonUtil.toJsonNode(givenConfigStr); - JsonNode expectedConfig = JacksonUtil.toJsonNode(expectedConfigStr); - - // WHEN - TbPair upgradeResult = node.upgrade(givenVersion, givenConfig); - - // THEN - assertThat(upgradeResult.getFirst()).isEqualTo(hasChanges); - ObjectNode upgradedConfig = (ObjectNode) upgradeResult.getSecond(); - assertThat(upgradedConfig).isEqualTo(expectedConfig); + @Override + protected TbNode getTestNode() { + return node; } } From b0b8e56c97d63f2dcd6fc8b00706ef3ba0d3f312 Mon Sep 17 00:00:00 2001 From: artem Date: Sat, 3 Feb 2024 22:04:20 +0200 Subject: [PATCH 05/10] small changes --- .../thingsboard/rule/engine/geo/GpsGeofencingActionTestCase.java | 1 + .../rule/engine/geo/TbGpsGeofencingActionNodeTest.java | 1 + 2 files changed, 2 insertions(+) diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/geo/GpsGeofencingActionTestCase.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/geo/GpsGeofencingActionTestCase.java index 8444c9cf0d..804d30d28f 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/geo/GpsGeofencingActionTestCase.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/geo/GpsGeofencingActionTestCase.java @@ -23,6 +23,7 @@ import java.util.Map; @Data public class GpsGeofencingActionTestCase { + private EntityId entityId; private Map entityStates; private boolean msgInside; diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/geo/TbGpsGeofencingActionNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/geo/TbGpsGeofencingActionNodeTest.java index fe78f57f4e..5cd7e02c59 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/geo/TbGpsGeofencingActionNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/geo/TbGpsGeofencingActionNodeTest.java @@ -56,6 +56,7 @@ import static org.thingsboard.server.common.data.msg.TbNodeConnectionType.SUCCES @ExtendWith(MockitoExtension.class) class TbGpsGeofencingActionNodeTest extends AbstractRuleNodeUpgradeTest { + @Mock private TbContext ctx; @Mock From 7cd4d477ff98d66072ebca833afda3b596f9a0d5 Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Thu, 8 Feb 2024 15:53:55 +0200 Subject: [PATCH 06/10] Fix edge cycling: update edge root rule chain, add test. Improve Edge rpc request to edge device. Add support for originator fields for rule nodes --- .../device/DeviceActorMessageProcessor.java | 15 ++-- .../RuleChainActorMessageProcessor.java | 2 +- .../service/edge/rpc/EdgeGrpcService.java | 2 + .../rule/RuleChainEdgeProcessor.java | 5 ++ .../service/queue/TbMsgPackCallback.java | 1 - .../server/edge/RuleChainEdgeTest.java | 23 ++++- .../RuleChainMsgConstructorTest.java | 87 +++++++++---------- .../notification/DefaultNotifications.java | 4 +- .../thingsboard/rest/client/RestClient.java | 5 +- .../util/EntitiesFieldsAsyncLoader.java | 4 + 10 files changed, 89 insertions(+), 59 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 58bd82abb3..9f92838712 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -66,7 +66,6 @@ import org.thingsboard.server.common.msg.rule.engine.DeviceCredentialsUpdateNoti import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg; import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg; import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg; -import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg; import org.thingsboard.server.gen.transport.TransportProtos.DeviceSessionsCacheEntry; @@ -90,7 +89,9 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportUpdateCredentialsProto; import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto; +import org.thingsboard.server.gen.transport.TransportProtos.UplinkNotificationMsg; import org.thingsboard.server.service.rpc.RpcSubmitStrategy; +import org.thingsboard.server.service.state.DefaultDeviceStateService; import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper; import javax.annotation.Nullable; @@ -173,7 +174,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso private EdgeId findRelatedEdgeId() { List result = - systemContext.getRelationService().findByToAndType(tenantId, deviceId, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.COMMON); + systemContext.getRelationService().findByToAndType(tenantId, deviceId, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE); if (result != null && result.size() > 0) { EntityRelation relationToEdge = result.get(0); if (relationToEdge.getFrom() != null && relationToEdge.getFrom().getId() != null) { @@ -212,8 +213,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso if (systemContext.isEdgesEnabled() && edgeId != null) { log.debug("[{}][{}] device is related to edge: [{}]. Saving RPC request: [{}][{}] to edge queue", tenantId, deviceId, edgeId.getId(), rpcId, requestId); try { - saveRpcRequestToEdgeQueue(request, requestId).get(); - sent = true; + Optional edgeAttributeOpt = systemContext.getAttributesService().find(tenantId, edgeId, DataConstants.SERVER_SCOPE, DefaultDeviceStateService.ACTIVITY_STATE).get(); + if (edgeAttributeOpt.isPresent() && edgeAttributeOpt.get().getBooleanValue().orElse(false)) { + saveRpcRequestToEdgeQueue(request, requestId).get(); + } else { + log.error("[{}][{}][{}] Failed to save RPC request to edge queue {}. The Edge is currently offline or unreachable", tenantId, deviceId, edgeId.getId(), request); + } } catch (InterruptedException | ExecutionException e) { log.error("[{}][{}][{}] Failed to save RPC request to edge queue {}", tenantId, deviceId, edgeId.getId(), request, e); } @@ -470,7 +475,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso callback.onSuccess(); } - private void processUplinkNotificationMsg(SessionInfoProto sessionInfo, TransportProtos.UplinkNotificationMsg uplinkNotificationMsg) { + private void processUplinkNotificationMsg(SessionInfoProto sessionInfo, UplinkNotificationMsg uplinkNotificationMsg) { String nodeId = sessionInfo.getNodeId(); sessions.entrySet().stream() .filter(kv -> kv.getValue().getSessionInfo().getNodeId().equals(nodeId) && (kv.getValue().isSubscribedToAttributes() || kv.getValue().isSubscribedToRPC())) diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java index e14ed8203b..870ccda60a 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java @@ -16,7 +16,6 @@ package org.thingsboard.server.actors.ruleChain; import lombok.extern.slf4j.Slf4j; -import org.thingsboard.server.common.data.msg.TbNodeConnectionType; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.TbActorCtx; import org.thingsboard.server.actors.TbActorRef; @@ -29,6 +28,7 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.msg.TbNodeConnectionType; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.data.plugin.ComponentLifecycleState; import org.thingsboard.server.common.data.relation.EntityRelation; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index c23fd98d8e..026ca51e8d 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -477,6 +477,8 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i TbMsgMetaData md = new TbMsgMetaData(); if (!persistToTelemetry) { md.putValue(DataConstants.SCOPE, DataConstants.SERVER_SCOPE); + md.putValue("edgeName", edge.getName()); + md.putValue("edgeType", edge.getType()); } TbMsg tbMsg = TbMsg.newMsg(msgType, edgeId, md, TbMsgDataType.JSON, data); clusterService.pushMsgToRuleEngine(tenantId, edgeId, tbMsg, null); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/rule/RuleChainEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/rule/RuleChainEdgeProcessor.java index 2a191dfaf4..118cc0ccd4 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/rule/RuleChainEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/rule/RuleChainEdgeProcessor.java @@ -18,6 +18,7 @@ package org.thingsboard.server.service.edge.rpc.processor.rule; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.EdgeUtils; +import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.rule.RuleChain; @@ -53,6 +54,10 @@ public class RuleChainEdgeProcessor extends BaseEdgeProcessor { isRoot = Boolean.parseBoolean(edgeEvent.getBody().get(EDGE_IS_ROOT_BODY_KEY).asText()); } catch (Exception ignored) {} } + if (!isRoot) { + Edge edge = edgeService.findEdgeById(edgeEvent.getTenantId(), edgeEvent.getEdgeId()); + isRoot = edge.getRootRuleChainId().equals(ruleChainId); + } UpdateMsgType msgType = getUpdateMsgType(edgeEvent.getAction()); RuleChainUpdateMsg ruleChainUpdateMsg = ((RuleChainMsgConstructor) ruleChainMsgConstructorFactory.getMsgConstructorByEdgeVersion(edgeVersion)) diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackCallback.java b/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackCallback.java index d7ddd5d81b..1f9e85bdc7 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackCallback.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackCallback.java @@ -24,7 +24,6 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.queue.RuleEngineException; import org.thingsboard.server.common.msg.queue.RuleNodeInfo; import org.thingsboard.server.common.msg.queue.TbMsgCallback; -import org.thingsboard.server.common.msg.tools.TbRateLimitsException; import java.util.UUID; import java.util.concurrent.TimeUnit; diff --git a/application/src/test/java/org/thingsboard/server/edge/RuleChainEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/RuleChainEdgeTest.java index 4942dbdb9b..62d2c5eade 100644 --- a/application/src/test/java/org/thingsboard/server/edge/RuleChainEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/RuleChainEdgeTest.java @@ -146,7 +146,7 @@ public class RuleChainEdgeTest extends AbstractEdgeTest { } } - private void createRuleChainMetadata(RuleChain ruleChain) { + private RuleChainMetaData createRuleChainMetadata(RuleChain ruleChain) { RuleChainMetaData ruleChainMetaData = new RuleChainMetaData(); ruleChainMetaData.setRuleChainId(ruleChain.getId()); @@ -182,7 +182,7 @@ public class RuleChainEdgeTest extends AbstractEdgeTest { ruleChainMetaData.addConnectionInfo(0, 2, "fail"); ruleChainMetaData.addConnectionInfo(1, 2, "success"); - doPost("/api/ruleChain/metadata", ruleChainMetaData, RuleChainMetaData.class); + return doPost("/api/ruleChain/metadata", ruleChainMetaData, RuleChainMetaData.class); } @Test @@ -193,9 +193,10 @@ public class RuleChainEdgeTest extends AbstractEdgeTest { ruleChain.setType(RuleChainType.EDGE); RuleChain savedRuleChain = doPost("/api/ruleChain", ruleChain, RuleChain.class); - edgeImitator.expectMessageAmount(1); + edgeImitator.expectMessageAmount(2); doPost("/api/edge/" + edge.getUuidId() + "/ruleChain/" + savedRuleChain.getUuidId(), RuleChain.class); + RuleChainMetaData metaData = createRuleChainMetadata(savedRuleChain); Assert.assertTrue(edgeImitator.waitForMessages()); // set new rule chain as root @@ -213,6 +214,22 @@ public class RuleChainEdgeTest extends AbstractEdgeTest { Assert.assertTrue(ruleChainMsg.isRoot()); Assert.assertEquals(savedRuleChain.getId(), ruleChainMsg.getId()); + // update metadata for root rule chain + edgeImitator.expectMessageAmount(1); + metaData.getNodes().forEach(n -> n.setDebugMode(true)); + doPost("/api/ruleChain/metadata", metaData, RuleChainMetaData.class); + Assert.assertTrue(edgeImitator.waitForMessages()); + ruleChainUpdateMsgOpt = edgeImitator.findMessageByType(RuleChainUpdateMsg.class); + Assert.assertTrue(ruleChainUpdateMsgOpt.isPresent()); + ruleChainUpdateMsg = ruleChainUpdateMsgOpt.get(); + ruleChainMsg = JacksonUtil.fromString(ruleChainUpdateMsg.getEntity(), RuleChain.class, true); + Assert.assertNotNull(ruleChainMsg); + Assert.assertTrue(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE.equals(ruleChainUpdateMsg.getMsgType()) || + UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE.equals(ruleChainUpdateMsg.getMsgType())); + Assert.assertEquals(savedRuleChain.getId(), ruleChainMsg.getId()); + Assert.assertEquals(savedRuleChain.getName(), ruleChainMsg.getName()); + Assert.assertTrue(ruleChainMsg.isRoot()); + // revert root rule chain edgeImitator.expectMessageAmount(1); doPost("/api/edge/" + edge.getUuidId() diff --git a/application/src/test/java/org/thingsboard/server/service/edge/rpc/constructor/RuleChainMsgConstructorTest.java b/application/src/test/java/org/thingsboard/server/service/edge/rpc/constructor/RuleChainMsgConstructorTest.java index ce6ca753b3..d6bfbf1036 100644 --- a/application/src/test/java/org/thingsboard/server/service/edge/rpc/constructor/RuleChainMsgConstructorTest.java +++ b/application/src/test/java/org/thingsboard/server/service/edge/rpc/constructor/RuleChainMsgConstructorTest.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.service.edge.rpc.constructor; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import lombok.extern.slf4j.Slf4j; import org.junit.Assert; @@ -61,7 +60,7 @@ public class RuleChainMsgConstructorTest { } @Test - public void testConstructRuleChainMetadataUpdatedMsg_V_3_4_0() throws JsonProcessingException { + public void testConstructRuleChainMetadataUpdatedMsg_V_3_4_0() { RuleChainId ruleChainId = new RuleChainId(UUID.randomUUID()); RuleChainMetaData ruleChainMetaData = createRuleChainMetaData( ruleChainId, 3, createRuleNodes(ruleChainId), createConnections()); @@ -80,7 +79,7 @@ public class RuleChainMsgConstructorTest { } @Test - public void testConstructRuleChainMetadataUpdatedMsg_V_3_3_3() throws JsonProcessingException { + public void testConstructRuleChainMetadataUpdatedMsg_V_3_3_3() { RuleChainId ruleChainId = new RuleChainId(UUID.randomUUID()); RuleChainMetaData ruleChainMetaData = createRuleChainMetaData( ruleChainId, 3, createRuleNodes(ruleChainId), createConnections()); @@ -120,7 +119,7 @@ public class RuleChainMsgConstructorTest { } @Test - public void testConstructRuleChainMetadataUpdatedMsg_V_3_3_0() throws JsonProcessingException { + public void testConstructRuleChainMetadataUpdatedMsg_V_3_3_0() { RuleChainId ruleChainId = new RuleChainId(UUID.randomUUID()); RuleChainMetaData ruleChainMetaData = createRuleChainMetaData(ruleChainId, 3, createRuleNodes(ruleChainId), createConnections()); RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = @@ -161,7 +160,7 @@ public class RuleChainMsgConstructorTest { } @Test - public void testConstructRuleChainMetadataUpdatedMsg_V_3_3_0_inDifferentOrder() throws JsonProcessingException { + public void testConstructRuleChainMetadataUpdatedMsg_V_3_3_0_inDifferentOrder() { // same rule chain metadata, but different order of rule nodes RuleChainId ruleChainId = new RuleChainId(UUID.randomUUID()); RuleChainMetaData ruleChainMetaData1 = createRuleChainMetaData(ruleChainId, 8, createRuleNodesInDifferentOrder(ruleChainId), createConnectionsInDifferentOrder()); @@ -254,7 +253,7 @@ public class RuleChainMsgConstructorTest { return result; } - private List createRuleNodes(RuleChainId ruleChainId) throws JsonProcessingException { + private List createRuleNodes(RuleChainId ruleChainId) { List result = new ArrayList<>(); result.add(getOutputNode(ruleChainId)); result.add(getAcknowledgeNode(ruleChainId)); @@ -301,7 +300,7 @@ public class RuleChainMsgConstructorTest { return result; } - private List createRuleNodesInDifferentOrder(RuleChainId ruleChainId) throws JsonProcessingException { + private List createRuleNodesInDifferentOrder(RuleChainId ruleChainId) { List result = new ArrayList<>(); result.add(getPushToAnalyticsNode(ruleChainId)); result.add(getPushToCloudNode(ruleChainId)); @@ -319,99 +318,99 @@ public class RuleChainMsgConstructorTest { } - private RuleNode getOutputNode(RuleChainId ruleChainId) throws JsonProcessingException { + private RuleNode getOutputNode(RuleChainId ruleChainId) { return createRuleNode(ruleChainId, "org.thingsboard.rule.engine.flow.TbRuleChainOutputNode", "Output node", - JacksonUtil.OBJECT_MAPPER.readTree("{\"version\":0}"), - JacksonUtil.OBJECT_MAPPER.readTree("{\"description\":\"\",\"layoutX\":178,\"layoutY\":592}")); + JacksonUtil.toJsonNode("{\"version\":0}"), + JacksonUtil.toJsonNode("{\"description\":\"\",\"layoutX\":178,\"layoutY\":592}")); } - private RuleNode getCheckpointNode(RuleChainId ruleChainId) throws JsonProcessingException { + private RuleNode getCheckpointNode(RuleChainId ruleChainId) { return createRuleNode(ruleChainId, "org.thingsboard.rule.engine.flow.TbCheckpointNode", "Checkpoint node", - JacksonUtil.OBJECT_MAPPER.readTree("{\"queueName\":\"HighPriority\"}"), - JacksonUtil.OBJECT_MAPPER.readTree("{\"description\":\"\",\"layoutX\":178,\"layoutY\":647}")); + JacksonUtil.toJsonNode("{\"queueName\":\"HighPriority\"}"), + JacksonUtil.toJsonNode("{\"description\":\"\",\"layoutX\":178,\"layoutY\":647}")); } - private RuleNode getSaveTimeSeriesNode(RuleChainId ruleChainId) throws JsonProcessingException { + private RuleNode getSaveTimeSeriesNode(RuleChainId ruleChainId) { return createRuleNode(ruleChainId, "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode", "Save Timeseries", - JacksonUtil.OBJECT_MAPPER.readTree("{\"defaultTTL\":0}"), - JacksonUtil.OBJECT_MAPPER.readTree("{\"layoutX\":823,\"layoutY\":157}")); + JacksonUtil.toJsonNode("{\"defaultTTL\":0}"), + JacksonUtil.toJsonNode("{\"layoutX\":823,\"layoutY\":157}")); } - private RuleNode getMessageTypeSwitchNode(RuleChainId ruleChainId) throws JsonProcessingException { + private RuleNode getMessageTypeSwitchNode(RuleChainId ruleChainId) { return createRuleNode(ruleChainId, "org.thingsboard.rule.engine.filter.TbMsgTypeSwitchNode", "Message Type Switch", - JacksonUtil.OBJECT_MAPPER.readTree("{\"version\":0}"), - JacksonUtil.OBJECT_MAPPER.readTree("{\"layoutX\":347,\"layoutY\":149}")); + JacksonUtil.toJsonNode("{\"version\":0}"), + JacksonUtil.toJsonNode("{\"layoutX\":347,\"layoutY\":149}")); } - private RuleNode getLogOtherNode(RuleChainId ruleChainId) throws JsonProcessingException { + private RuleNode getLogOtherNode(RuleChainId ruleChainId) { return createRuleNode(ruleChainId, "org.thingsboard.rule.engine.action.TbLogNode", "Log Other", - JacksonUtil.OBJECT_MAPPER.readTree("{\"jsScript\":\"return '\\\\nIncoming message:\\\\n' + JSON.stringify(msg) + '\\\\nIncoming metadata:\\\\n' + JSON.stringify(metadata);\"}"), - JacksonUtil.OBJECT_MAPPER.readTree("{\"layoutX\":824,\"layoutY\":378}")); + JacksonUtil.toJsonNode("{\"jsScript\":\"return '\\\\nIncoming message:\\\\n' + JSON.stringify(msg) + '\\\\nIncoming metadata:\\\\n' + JSON.stringify(metadata);\"}"), + JacksonUtil.toJsonNode("{\"layoutX\":824,\"layoutY\":378}")); } - private RuleNode getPushToCloudNode(RuleChainId ruleChainId) throws JsonProcessingException { + private RuleNode getPushToCloudNode(RuleChainId ruleChainId) { return createRuleNode(ruleChainId, "org.thingsboard.rule.engine.edge.TbMsgPushToCloudNode", "Push to cloud", - JacksonUtil.OBJECT_MAPPER.readTree("{\"scope\":\"SERVER_SCOPE\"}"), - JacksonUtil.OBJECT_MAPPER.readTree("{\"layoutX\":1129,\"layoutY\":52}")); + JacksonUtil.toJsonNode("{\"scope\":\"SERVER_SCOPE\"}"), + JacksonUtil.toJsonNode("{\"layoutX\":1129,\"layoutY\":52}")); } - private RuleNode getAcknowledgeNode(RuleChainId ruleChainId) throws JsonProcessingException { + private RuleNode getAcknowledgeNode(RuleChainId ruleChainId) { return createRuleNode(ruleChainId, "org.thingsboard.rule.engine.flow.TbAckNode", "Acknowledge node", - JacksonUtil.OBJECT_MAPPER.readTree("{\"version\":0}"), - JacksonUtil.OBJECT_MAPPER.readTree("{\"description\":\"\",\"layoutX\":177,\"layoutY\":703}")); + JacksonUtil.toJsonNode("{\"version\":0}"), + JacksonUtil.toJsonNode("{\"description\":\"\",\"layoutX\":177,\"layoutY\":703}")); } - private RuleNode getDeviceProfileNode(RuleChainId ruleChainId) throws JsonProcessingException { + private RuleNode getDeviceProfileNode(RuleChainId ruleChainId) { return createRuleNode(ruleChainId, "org.thingsboard.rule.engine.profile.TbDeviceProfileNode", "Device Profile Node", - JacksonUtil.OBJECT_MAPPER.readTree("{\"persistAlarmRulesState\":false,\"fetchAlarmRulesStateOnStart\":false}"), - JacksonUtil.OBJECT_MAPPER.readTree("{\"description\":\"Process incoming messages from devices with the alarm rules defined in the device profile. Dispatch all incoming messages with \\\"Success\\\" relation type.\",\"layoutX\":187,\"layoutY\":468}")); + JacksonUtil.toJsonNode("{\"persistAlarmRulesState\":false,\"fetchAlarmRulesStateOnStart\":false}"), + JacksonUtil.toJsonNode("{\"description\":\"Process incoming messages from devices with the alarm rules defined in the device profile. Dispatch all incoming messages with \\\"Success\\\" relation type.\",\"layoutX\":187,\"layoutY\":468}")); } - private RuleNode getSaveClientAttributesNode(RuleChainId ruleChainId) throws JsonProcessingException { + private RuleNode getSaveClientAttributesNode(RuleChainId ruleChainId) { return createRuleNode(ruleChainId, "org.thingsboard.rule.engine.telemetry.TbMsgAttributesNode", "Save Client Attributes", - JacksonUtil.OBJECT_MAPPER.readTree("{\"scope\":\"CLIENT_SCOPE\"}"), - JacksonUtil.OBJECT_MAPPER.readTree("{\"layoutX\":824,\"layoutY\":52}")); + JacksonUtil.toJsonNode("{\"scope\":\"CLIENT_SCOPE\"}"), + JacksonUtil.toJsonNode("{\"layoutX\":824,\"layoutY\":52}")); } - private RuleNode getLogRpcFromDeviceNode(RuleChainId ruleChainId) throws JsonProcessingException { + private RuleNode getLogRpcFromDeviceNode(RuleChainId ruleChainId) { return createRuleNode(ruleChainId, "org.thingsboard.rule.engine.action.TbLogNode", "Log RPC from Device", - JacksonUtil.OBJECT_MAPPER.readTree("{\"jsScript\":\"return '\\\\nIncoming message:\\\\n' + JSON.stringify(msg) + '\\\\nIncoming metadata:\\\\n' + JSON.stringify(metadata);\"}"), - JacksonUtil.OBJECT_MAPPER.readTree("{\"layoutX\":825,\"layoutY\":266}")); + JacksonUtil.toJsonNode("{\"jsScript\":\"return '\\\\nIncoming message:\\\\n' + JSON.stringify(msg) + '\\\\nIncoming metadata:\\\\n' + JSON.stringify(metadata);\"}"), + JacksonUtil.toJsonNode("{\"layoutX\":825,\"layoutY\":266}")); } - private RuleNode getRpcCallRequestNode(RuleChainId ruleChainId) throws JsonProcessingException { + private RuleNode getRpcCallRequestNode(RuleChainId ruleChainId) { return createRuleNode(ruleChainId, "org.thingsboard.rule.engine.rpc.TbSendRPCRequestNode", "RPC Call Request", - JacksonUtil.OBJECT_MAPPER.readTree("{\"timeoutInSeconds\":60}"), - JacksonUtil.OBJECT_MAPPER.readTree("{\"layoutX\":824,\"layoutY\":466}")); + JacksonUtil.toJsonNode("{\"timeoutInSeconds\":60}"), + JacksonUtil.toJsonNode("{\"layoutX\":824,\"layoutY\":466}")); } - private RuleNode getPushToAnalyticsNode(RuleChainId ruleChainId) throws JsonProcessingException { + private RuleNode getPushToAnalyticsNode(RuleChainId ruleChainId) { return createRuleNode(ruleChainId, "org.thingsboard.rule.engine.flow.TbRuleChainInputNode", "Push to Analytics", - JacksonUtil.OBJECT_MAPPER.readTree("{\"ruleChainId\":\"af588000-6c7c-11ec-bafd-c9a47a5c8d99\"}"), - JacksonUtil.OBJECT_MAPPER.readTree("{\"description\":\"\",\"layoutX\":477,\"layoutY\":560}")); + JacksonUtil.toJsonNode("{\"ruleChainId\":\"af588000-6c7c-11ec-bafd-c9a47a5c8d99\"}"), + JacksonUtil.toJsonNode("{\"description\":\"\",\"layoutX\":477,\"layoutY\":560}")); } -} \ No newline at end of file +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/notification/DefaultNotifications.java b/dao/src/main/java/org/thingsboard/server/dao/notification/DefaultNotifications.java index 1ef0006e87..fd17618bd1 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/notification/DefaultNotifications.java +++ b/dao/src/main/java/org/thingsboard/server/dao/notification/DefaultNotifications.java @@ -40,9 +40,9 @@ import org.thingsboard.server.common.data.notification.rule.trigger.config.Alarm import org.thingsboard.server.common.data.notification.rule.trigger.config.ApiUsageLimitNotificationRuleTriggerConfig; import org.thingsboard.server.common.data.notification.rule.trigger.config.DeviceActivityNotificationRuleTriggerConfig; import org.thingsboard.server.common.data.notification.rule.trigger.config.DeviceActivityNotificationRuleTriggerConfig.DeviceEvent; +import org.thingsboard.server.common.data.notification.rule.trigger.config.EdgeCommunicationFailureNotificationRuleTriggerConfig; import org.thingsboard.server.common.data.notification.rule.trigger.config.EdgeConnectionNotificationRuleTriggerConfig; import org.thingsboard.server.common.data.notification.rule.trigger.config.EdgeConnectionNotificationRuleTriggerConfig.EdgeConnectivityEvent; -import org.thingsboard.server.common.data.notification.rule.trigger.config.EdgeCommunicationFailureNotificationRuleTriggerConfig; import org.thingsboard.server.common.data.notification.rule.trigger.config.EntitiesLimitNotificationRuleTriggerConfig; import org.thingsboard.server.common.data.notification.rule.trigger.config.EntityActionNotificationRuleTriggerConfig; import org.thingsboard.server.common.data.notification.rule.trigger.config.NewPlatformVersionNotificationRuleTriggerConfig; @@ -347,7 +347,7 @@ public class DefaultNotifications { public static final DefaultNotification edgeCommunicationFailures = DefaultNotification.builder() .name("Edge communication failure notification") .type(NotificationType.EDGE_COMMUNICATION_FAILURE) - .subject("Edge '${edgeName}' communication failure occured") + .subject("Edge '${edgeName}' communication failure occurred") .text("Failure message: '${failureMsg}'") .icon("error").color(RED_COLOR) .button("Go to Edge").link("/edgeManagement/instances/${edgeId}") diff --git a/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java b/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java index f3a7ee77c7..8783770be3 100644 --- a/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java +++ b/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java @@ -525,12 +525,11 @@ public class RestClient implements Closeable { } public PageData getAlarmComments(AlarmId alarmId, PageLink pageLink) { - String urlSecondPart = "/api/alarm/{alarmId}/comment"; Map params = new HashMap<>(); params.put("alarmId", alarmId.getId().toString()); - + addPageLinkToParam(params, pageLink); return restTemplate.exchange( - baseURL + urlSecondPart + "&" + getUrlParams(pageLink), + baseURL + "/api/alarm/{alarmId}/comment?" + getUrlParams(pageLink), HttpMethod.GET, HttpEntity.EMPTY, new ParameterizedTypeReference>() { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesFieldsAsyncLoader.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesFieldsAsyncLoader.java index 80d73b9478..4d651f48fa 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesFieldsAsyncLoader.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesFieldsAsyncLoader.java @@ -25,6 +25,7 @@ import org.thingsboard.server.common.data.id.AlarmId; import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityViewId; import org.thingsboard.server.common.data.id.RuleChainId; @@ -63,6 +64,9 @@ public class EntitiesFieldsAsyncLoader { case ENTITY_VIEW: return toEntityFieldsDataAsync(ctx.getEntityViewService().findEntityViewByIdAsync(ctx.getTenantId(), (EntityViewId) originatorId), EntityFieldsData::new, ctx); + case EDGE: + return toEntityFieldsDataAsync(ctx.getEdgeService().findEdgeByIdAsync(ctx.getTenantId(), (EdgeId) originatorId), + EntityFieldsData::new, ctx); default: return Futures.immediateFailedFuture(new TbNodeException("Unexpected originator EntityType: " + originatorId.getEntityType())); } From 841514384209e5a976e9b3c64d53cd1d99de4fb2 Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Thu, 8 Feb 2024 16:51:49 +0200 Subject: [PATCH 07/10] Add TO_SERVER_RPC_REQUEST to AbstractTbMsgPushNode --- .../thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java index 31fda250d0..4176ead31c 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java @@ -47,6 +47,7 @@ import static org.thingsboard.server.common.data.msg.TbMsgType.INACTIVITY_EVENT; import static org.thingsboard.server.common.data.msg.TbMsgType.POST_ATTRIBUTES_REQUEST; import static org.thingsboard.server.common.data.msg.TbMsgType.POST_TELEMETRY_REQUEST; import static org.thingsboard.server.common.data.msg.TbMsgType.TIMESERIES_UPDATED; +import static org.thingsboard.server.common.data.msg.TbMsgType.TO_SERVER_RPC_REQUEST; @Slf4j public abstract class AbstractTbMsgPushNode implements TbNode { @@ -176,6 +177,6 @@ public abstract class AbstractTbMsgPushNode Date: Fri, 9 Feb 2024 09:52:25 +0200 Subject: [PATCH 08/10] Fix tests --- .../util/EntitiesFieldsAsyncLoaderTest.java | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/util/EntitiesFieldsAsyncLoaderTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/util/EntitiesFieldsAsyncLoaderTest.java index 5967023354..f3e61dcffc 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/util/EntitiesFieldsAsyncLoaderTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/util/EntitiesFieldsAsyncLoaderTest.java @@ -37,10 +37,12 @@ import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.User; import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.asset.Asset; +import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.id.AlarmId; import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.EntityViewId; @@ -52,6 +54,7 @@ import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.dao.asset.AssetService; import org.thingsboard.server.dao.customer.CustomerService; import org.thingsboard.server.dao.device.DeviceService; +import org.thingsboard.server.dao.edge.EdgeService; import org.thingsboard.server.dao.entityview.EntityViewService; import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.dao.tenant.TenantService; @@ -95,6 +98,8 @@ public class EntitiesFieldsAsyncLoaderTest { private RuleChainService ruleChainServiceMock; @Mock private EntityViewService entityViewServiceMock; + @Mock + private EdgeService edgeServiceMock; @BeforeAll public static void setup() { @@ -108,7 +113,8 @@ public class EntitiesFieldsAsyncLoaderTest { EntityType.DEVICE, EntityType.ALARM, EntityType.RULE_CHAIN, - EntityType.ENTITY_VIEW + EntityType.ENTITY_VIEW, + EntityType.EDGE ); } @@ -228,6 +234,14 @@ public class EntitiesFieldsAsyncLoaderTest { when(ctxMock.getEntityViewService()).thenReturn(entityViewServiceMock); doReturn(entityView).when(entityViewServiceMock).findEntityViewByIdAsync(eq(TENANT_ID), any()); + break; + case EDGE: + var edge = Futures.immediateFuture(entityDoesNotExist ? null : new Edge(new EdgeId(RANDOM_UUID))); + + when(ctxMock.getDbCallbackExecutor()).thenReturn(DB_EXECUTOR); + when(ctxMock.getEdgeService()).thenReturn(edgeServiceMock); + doReturn(edge).when(edgeServiceMock).findEdgeByIdAsync(eq(TENANT_ID), any()); + break; default: throw new RuntimeException("Unexpected EntityType: " + entityType); @@ -252,6 +266,8 @@ public class EntitiesFieldsAsyncLoaderTest { return new RuleChain((RuleChainId) entityId); case ENTITY_VIEW: return new EntityView((EntityViewId) entityId); + case EDGE: + return new Edge((EdgeId) entityId); default: throw new RuntimeException("Unexpected EntityType: " + entityId.getEntityType()); } From c62a95835374ddc1dd2cd73ae4ae4380d2b2468c Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Fri, 9 Feb 2024 10:46:14 +0200 Subject: [PATCH 09/10] Improve DeviceEdgeTest --- .../server/edge/DeviceEdgeTest.java | 37 ++++++++++++++++++- .../server/edge/imitator/EdgeImitator.java | 7 ++-- 2 files changed, 38 insertions(+), 6 deletions(-) diff --git a/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java index 488bd4ed70..458bb3240e 100644 --- a/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java @@ -711,7 +711,7 @@ public class DeviceEdgeTest extends AbstractEdgeTest { edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build()); Assert.assertTrue(edgeImitator.waitForResponses()); - Assert.assertTrue(onUpdateCallback.getSubscribeLatch().await(30, TimeUnit.SECONDS)); + Assert.assertTrue(onUpdateCallback.getSubscribeLatch().await(TIMEOUT, TimeUnit.SECONDS)); Assert.assertEquals(JacksonUtil.newObjectNode().put(attrKey, attrValue), JacksonUtil.fromBytes(onUpdateCallback.getPayloadBytes())); @@ -798,7 +798,21 @@ public class DeviceEdgeTest extends AbstractEdgeTest { // clean up stored edge events edgeEventService.cleanupEvents(1); - // perform rpc call to verify edgeId in DeviceActorMessageProcessor updated properly + // edge is disconnected: perform rpc call - no edge event saved + doPostAsync( + "/api/rpc/oneway/" + device.getId().getId().toString(), + JacksonUtil.toString(createDefaultRpc()), + String.class, + status().isOk()); + Awaitility.await() + .atMost(TIMEOUT, TimeUnit.SECONDS) + .until(() -> { + PageData result = edgeEventService.findEdgeEvents(tenantId, tmpEdge.getId(), 0L, null, new TimePageLink(1)); + return result.getTotalElements() == 0; + }); + + // edge is connected: perform rpc call to verify edgeId in DeviceActorMessageProcessor updated properly + simulateEdgeActivation(tmpEdge); doPostAsync( "/api/rpc/oneway/" + device.getId().getId().toString(), JacksonUtil.toString(createDefaultRpc()), @@ -857,4 +871,23 @@ public class DeviceEdgeTest extends AbstractEdgeTest { return rpc; } + + private void simulateEdgeActivation(Edge edge) throws Exception { + ObjectNode attributes = JacksonUtil.newObjectNode(); + attributes.put("active", true); + doPost("/api/plugins/telemetry/EDGE/" + edge.getId() + "/attributes/" + DataConstants.SERVER_SCOPE, attributes); + Awaitility.await() + .atMost(TIMEOUT, TimeUnit.SECONDS) + .until(() -> { + List> values = doGetAsyncTyped("/api/plugins/telemetry/EDGE/" + edge.getId() + + "/values/attributes/SERVER_SCOPE", new TypeReference<>() {}); + Optional> activeAttrOpt = values.stream().filter(att -> att.get("key").equals("active")).findFirst(); + if (activeAttrOpt.isEmpty()) { + return false; + } + Map activeAttr = activeAttrOpt.get(); + return "true".equals(activeAttr.get("value").toString()); + }); + } + } diff --git a/application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java b/application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java index f5e018ed3a..916e5790e9 100644 --- a/application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java +++ b/application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java @@ -26,6 +26,7 @@ import lombok.extern.slf4j.Slf4j; import org.checkerframework.checker.nullness.qual.Nullable; import org.thingsboard.edge.rpc.EdgeGrpcClient; import org.thingsboard.edge.rpc.EdgeRpcClient; +import org.thingsboard.server.controller.AbstractWebTest; import org.thingsboard.server.gen.edge.v1.AdminSettingsUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; @@ -72,8 +73,6 @@ import java.util.stream.Collectors; @Slf4j public class EdgeImitator { - public static final int TIMEOUT_IN_SECONDS = 30; - private String routingKey; private String routingSecret; @@ -344,7 +343,7 @@ public class EdgeImitator { } public boolean waitForMessages() throws InterruptedException { - return waitForMessages(TIMEOUT_IN_SECONDS); + return waitForMessages(AbstractWebTest.TIMEOUT); } public boolean waitForMessages(int timeoutInSeconds) throws InterruptedException { @@ -359,7 +358,7 @@ public class EdgeImitator { } public boolean waitForResponses() throws InterruptedException { - return responsesLatch.await(TIMEOUT_IN_SECONDS, TimeUnit.SECONDS); + return responsesLatch.await(AbstractWebTest.TIMEOUT, TimeUnit.SECONDS); } public void expectResponsesAmount(int messageAmount) { From a4c5617cc2a849bb74a1337172f65ec3fe1a6e95 Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Mon, 12 Feb 2024 16:24:02 +0200 Subject: [PATCH 10/10] Improve edge service to find active state based on persistToTelemetry else attribute --- .../device/DeviceActorMessageProcessor.java | 3 +-- .../rpc/processor/BaseEdgeProcessorTest.java | 4 +++ .../server/dao/edge/EdgeService.java | 2 ++ .../server/dao/edge/EdgeServiceImpl.java | 26 ++++++++++++++++--- 4 files changed, 29 insertions(+), 6 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 9f92838712..f1c0260124 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -213,8 +213,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso if (systemContext.isEdgesEnabled() && edgeId != null) { log.debug("[{}][{}] device is related to edge: [{}]. Saving RPC request: [{}][{}] to edge queue", tenantId, deviceId, edgeId.getId(), rpcId, requestId); try { - Optional edgeAttributeOpt = systemContext.getAttributesService().find(tenantId, edgeId, DataConstants.SERVER_SCOPE, DefaultDeviceStateService.ACTIVITY_STATE).get(); - if (edgeAttributeOpt.isPresent() && edgeAttributeOpt.get().getBooleanValue().orElse(false)) { + if (systemContext.getEdgeService().isEdgeActiveAsync(tenantId, edgeId, DefaultDeviceStateService.ACTIVITY_STATE).get()) { saveRpcRequestToEdgeQueue(request, requestId).get(); } else { log.error("[{}][{}][{}] Failed to save RPC request to edge queue {}. The Edge is currently offline or unreachable", tenantId, deviceId, edgeId.getId(), request); diff --git a/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessorTest.java b/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessorTest.java index 9c45977a7c..835b39caf9 100644 --- a/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessorTest.java +++ b/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessorTest.java @@ -55,6 +55,7 @@ import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.dao.service.DataValidator; import org.thingsboard.server.dao.tenant.TenantProfileService; import org.thingsboard.server.dao.tenant.TenantService; +import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.user.UserService; import org.thingsboard.server.dao.widget.WidgetTypeService; import org.thingsboard.server.dao.widget.WidgetsBundleService; @@ -209,6 +210,9 @@ public abstract class BaseEdgeProcessorTest { @MockBean protected AttributesService attributesService; + @MockBean + protected TimeseriesService timeseriesService; + @MockBean protected TbClusterService tbClusterService; diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java index 15e55d8713..d6d5551ccf 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java @@ -91,4 +91,6 @@ public interface EdgeService extends EntityDaoService { PageData findRelatedEdgeIdsByEntityId(TenantId tenantId, EntityId entityId, PageLink pageLink); String findMissingToRelatedRuleChains(TenantId tenantId, EdgeId edgeId, String tbRuleChainInputNodeClassName); + + ListenableFuture isEdgeActiveAsync(TenantId tenantId, EdgeId edgeId, String activityState); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java index 80eea0a478..136fd0485d 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java @@ -30,10 +30,10 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.event.TransactionalEventListener; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EntitySubtype; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.StringUtils; -import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.User; import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.edge.Edge; @@ -48,14 +48,15 @@ import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantProfileId; import org.thingsboard.server.common.data.id.UserId; +import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.page.PageData; -import org.thingsboard.server.common.data.page.PageDataIterable; import org.thingsboard.server.common.data.page.PageDataIterableByTenantIdEntityId; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.EntitySearchDirection; import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.common.data.rule.RuleNode; +import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.entity.AbstractCachedEntityService; import org.thingsboard.server.dao.eventsourcing.ActionEntityEvent; import org.thingsboard.server.dao.exception.DataValidationException; @@ -64,7 +65,7 @@ import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.dao.service.DataValidator; import org.thingsboard.server.dao.service.PaginatedRemover; import org.thingsboard.server.dao.service.Validator; -import org.thingsboard.server.dao.tenant.TenantService; +import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.user.UserService; import javax.annotation.Nullable; @@ -105,7 +106,10 @@ public class EdgeServiceImpl extends AbstractCachedEntityService edgeValidator; @@ -113,6 +117,8 @@ public class EdgeServiceImpl extends AbstractCachedEntityService isEdgeActiveAsync(TenantId tenantId, EdgeId edgeId, String key) { + ListenableFuture> futureKvEntry; + if (persistToTelemetry) { + futureKvEntry = timeseriesService.findLatest(tenantId, edgeId, key); + } else { + futureKvEntry = attributesService.find(tenantId, edgeId, DataConstants.SERVER_SCOPE, key); + } + return Futures.transformAsync(futureKvEntry, kvEntryOpt -> + Futures.immediateFuture(kvEntryOpt.flatMap(KvEntry::getBooleanValue).orElse(false)), MoreExecutors.directExecutor()); + } + private List findEdgeRuleChains(TenantId tenantId, EdgeId edgeId) { List result = new ArrayList<>(); PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE);