From 80f1ec2fcd0e205b8c59d2414a160c605d82a314 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Mon, 10 Apr 2023 17:52:05 +0300 Subject: [PATCH] Handle properly cases when state persistent to timeseries --- .../engine/edge/AbstractTbMsgPushNode.java | 22 ++++++--- .../engine/edge/TbMsgPushToEdgeNodeTest.java | 49 ++++++++++++------- 2 files changed, 46 insertions(+), 25 deletions(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java index b918cba883..318b38aa39 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java @@ -77,9 +77,9 @@ public abstract class AbstractTbMsgPushNode entityBody = new HashMap<>(); Map metadata = msg.getMetaData().getData(); + EdgeEventActionType actionType = getEdgeEventActionTypeByMsgType(msgType, metadata); + Map entityBody = new HashMap<>(); JsonNode dataJson = JacksonUtil.toJsonNode(msg.getData()); switch (actionType) { case ATTRIBUTES_UPDATED: @@ -148,21 +148,27 @@ public abstract class AbstractTbMsgPushNode metadata) { EdgeEventActionType actionType; if (SessionMsgType.POST_TELEMETRY_REQUEST.name().equals(msgType) || DataConstants.TIMESERIES_UPDATED.equals(msgType)) { actionType = EdgeEventActionType.TIMESERIES_UPDATED; - } else if (DataConstants.ATTRIBUTES_UPDATED.equals(msgType) - || DataConstants.CONNECT_EVENT.equals(msgType) - || DataConstants.DISCONNECT_EVENT.equals(msgType) - || DataConstants.ACTIVITY_EVENT.equals(msgType) - || DataConstants.INACTIVITY_EVENT.equals(msgType)) { + } else if (DataConstants.ATTRIBUTES_UPDATED.equals(msgType)) { actionType = EdgeEventActionType.ATTRIBUTES_UPDATED; } else if (SessionMsgType.POST_ATTRIBUTES_REQUEST.name().equals(msgType)) { actionType = EdgeEventActionType.POST_ATTRIBUTES; } else if (DataConstants.ATTRIBUTES_DELETED.equals(msgType)) { actionType = EdgeEventActionType.ATTRIBUTES_DELETED; + } else if (DataConstants.CONNECT_EVENT.equals(msgType) + || DataConstants.DISCONNECT_EVENT.equals(msgType) + || DataConstants.ACTIVITY_EVENT.equals(msgType) + || DataConstants.INACTIVITY_EVENT.equals(msgType)) { + String scope = metadata.get(SCOPE); + if ( StringUtils.isEmpty(scope)) { + actionType = EdgeEventActionType.TIMESERIES_UPDATED; + } else { + actionType = EdgeEventActionType.ATTRIBUTES_UPDATED; + } } else { log.warn("Unsupported msg type [{}]", msgType); throw new IllegalArgumentException("Unsupported msg type: " + msgType); diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNodeTest.java index 339404b104..84deafe7e4 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNodeTest.java @@ -115,23 +115,38 @@ public class TbMsgPushToEdgeNodeTest { List miscEvents = List.of(DataConstants.CONNECT_EVENT, DataConstants.DISCONNECT_EVENT, DataConstants.ACTIVITY_EVENT, DataConstants.INACTIVITY_EVENT); for (String event : miscEvents) { - Mockito.when(ctx.getTenantId()).thenReturn(tenantId); - Mockito.when(ctx.getEdgeService()).thenReturn(edgeService); - Mockito.when(ctx.getEdgeEventService()).thenReturn(edgeEventService); - Mockito.when(ctx.getDbCallbackExecutor()).thenReturn(dbCallbackExecutor); - Mockito.when(edgeEventService.saveAsync(any())).thenReturn(SettableFuture.create()); - - TbMsg msg = TbMsg.newMsg(event, new EdgeId(UUID.randomUUID()), new TbMsgMetaData(), - TbMsgDataType.JSON, "{\"lastConnectTs\":1}", null, null); - - node.onMsg(ctx, msg); - - ArgumentMatcher eventArgumentMatcher = edgeEvent -> - edgeEvent.getAction().equals(EdgeEventActionType.ATTRIBUTES_UPDATED) - && edgeEvent.getBody().get("kv").get("lastConnectTs").asInt() == 1; - verify(edgeEventService).saveAsync(Mockito.argThat(eventArgumentMatcher)); - - Mockito.reset(ctx, edgeEventService); + TbMsgMetaData metaData = new TbMsgMetaData(); + metaData.putValue(DataConstants.SCOPE, DataConstants.SERVER_SCOPE); + testEvent(event, metaData, EdgeEventActionType.ATTRIBUTES_UPDATED, "kv"); } } + + @Test + public void testMiscEventsProcessedAsTimeseriesUpdated() { + List miscEvents = List.of(DataConstants.CONNECT_EVENT, DataConstants.DISCONNECT_EVENT, + DataConstants.ACTIVITY_EVENT, DataConstants.INACTIVITY_EVENT); + for (String event : miscEvents) { + testEvent(event, new TbMsgMetaData(), EdgeEventActionType.TIMESERIES_UPDATED, "data"); + } + } + + private void testEvent(String event, TbMsgMetaData metaData, EdgeEventActionType expectedType, String dataKey) { + Mockito.when(ctx.getTenantId()).thenReturn(tenantId); + Mockito.when(ctx.getEdgeService()).thenReturn(edgeService); + Mockito.when(ctx.getEdgeEventService()).thenReturn(edgeEventService); + Mockito.when(ctx.getDbCallbackExecutor()).thenReturn(dbCallbackExecutor); + Mockito.when(edgeEventService.saveAsync(any())).thenReturn(SettableFuture.create()); + + TbMsg msg = TbMsg.newMsg(event, new EdgeId(UUID.randomUUID()), metaData, + TbMsgDataType.JSON, "{\"lastConnectTs\":1}", null, null); + + node.onMsg(ctx, msg); + + ArgumentMatcher eventArgumentMatcher = edgeEvent -> + edgeEvent.getAction().equals(expectedType) + && edgeEvent.getBody().get(dataKey).get("lastConnectTs").asInt() == 1; + verify(edgeEventService).saveAsync(Mockito.argThat(eventArgumentMatcher)); + + Mockito.reset(ctx, edgeEventService); + } }