From fa8c3cc6e5f1406d7d0c61a24c41c26c9bbefb57 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Mon, 10 Apr 2023 17:35:42 +0300 Subject: [PATCH] Push edge connect/disconnect events to rule chain --- .../service/edge/rpc/EdgeGrpcService.java | 41 +++++++++++++++++-- .../engine/edge/AbstractTbMsgPushNode.java | 12 +++++- .../engine/edge/TbMsgPushToEdgeNodeTest.java | 28 +++++++++++++ 3 files changed, 76 insertions(+), 5 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index f695fa1d0e..946db6e38f 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.service.edge.rpc; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import io.grpc.Server; @@ -25,6 +26,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Service; +import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.DataConstants; @@ -35,6 +37,9 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.BooleanDataEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgDataType; +import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg; import org.thingsboard.server.common.msg.edge.EdgeSessionMsg; import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse; @@ -66,6 +71,10 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; +import static org.thingsboard.server.common.data.DataConstants.CONNECT_EVENT; +import static org.thingsboard.server.common.data.DataConstants.DISCONNECT_EVENT; +import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE; + @Service @Slf4j @ConditionalOnProperty(prefix = "edges", value = "enabled", havingValue = "true") @@ -261,7 +270,9 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i newEventLock.unlock(); } save(edgeId, DefaultDeviceStateService.ACTIVITY_STATE, true); - save(edgeId, DefaultDeviceStateService.LAST_CONNECT_TIME, System.currentTimeMillis()); + long lastConnectTs = System.currentTimeMillis(); + save(edgeId, DefaultDeviceStateService.LAST_CONNECT_TIME, lastConnectTs); + pushRuleEngineMessage(edgeGrpcSession.getEdge().getTenantId(), edgeId, lastConnectTs, CONNECT_EVENT); cancelScheduleEdgeEventsCheck(edgeId); scheduleEdgeEventsCheck(edgeGrpcSession); } @@ -365,7 +376,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i private void onEdgeDisconnect(EdgeId edgeId) { log.info("[{}] edge disconnected!", edgeId); - sessions.remove(edgeId); + EdgeGrpcSession removed = sessions.remove(edgeId); final Lock newEventLock = sessionNewEventsLocks.computeIfAbsent(edgeId, id -> new ReentrantLock()); newEventLock.lock(); try { @@ -374,7 +385,9 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i newEventLock.unlock(); } save(edgeId, DefaultDeviceStateService.ACTIVITY_STATE, false); - save(edgeId, DefaultDeviceStateService.LAST_DISCONNECT_TIME, System.currentTimeMillis()); + long lastDisconnectTs = System.currentTimeMillis(); + save(edgeId, DefaultDeviceStateService.LAST_DISCONNECT_TIME, lastDisconnectTs); + pushRuleEngineMessage(removed.getEdge().getTenantId(), edgeId, lastDisconnectTs, DISCONNECT_EVENT); cancelScheduleEdgeEventsCheck(edgeId); } @@ -423,4 +436,26 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i log.warn("[{}] Failed to update attribute [{}] with value [{}]", edgeId, key, value, t); } } + + private void pushRuleEngineMessage(TenantId tenantId, EdgeId edgeId, long ts, String msgType) { + try { + ObjectNode edgeState = JacksonUtil.OBJECT_MAPPER.createObjectNode(); + if (msgType.equals(CONNECT_EVENT)) { + edgeState.put(DefaultDeviceStateService.ACTIVITY_STATE, true); + edgeState.put(DefaultDeviceStateService.LAST_CONNECT_TIME, ts); + } else { + edgeState.put(DefaultDeviceStateService.ACTIVITY_STATE, false); + edgeState.put(DefaultDeviceStateService.LAST_DISCONNECT_TIME, ts); + } + String data = JacksonUtil.toString(edgeState); + TbMsgMetaData md = new TbMsgMetaData(); + if (!persistToTelemetry) { + md.putValue(DataConstants.SCOPE, SERVER_SCOPE); + } + TbMsg tbMsg = TbMsg.newMsg(msgType, edgeId, md, TbMsgDataType.JSON, data); + clusterService.pushMsgToRuleEngine(tenantId, edgeId, tbMsg, null); + } catch (Exception e) { + log.warn("[{}][{}] Failed to push {}", tenantId, edgeId, msgType, e); + } + } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java index 7be7dae2de..b918cba883 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java @@ -153,7 +153,11 @@ public abstract class AbstractTbMsgPushNode miscEvents = List.of(DataConstants.CONNECT_EVENT, DataConstants.DISCONNECT_EVENT, + DataConstants.ACTIVITY_EVENT, DataConstants.INACTIVITY_EVENT); + for (String event : miscEvents) { + Mockito.when(ctx.getTenantId()).thenReturn(tenantId); + Mockito.when(ctx.getEdgeService()).thenReturn(edgeService); + Mockito.when(ctx.getEdgeEventService()).thenReturn(edgeEventService); + Mockito.when(ctx.getDbCallbackExecutor()).thenReturn(dbCallbackExecutor); + Mockito.when(edgeEventService.saveAsync(any())).thenReturn(SettableFuture.create()); + + TbMsg msg = TbMsg.newMsg(event, new EdgeId(UUID.randomUUID()), new TbMsgMetaData(), + TbMsgDataType.JSON, "{\"lastConnectTs\":1}", null, null); + + node.onMsg(ctx, msg); + + ArgumentMatcher eventArgumentMatcher = edgeEvent -> + edgeEvent.getAction().equals(EdgeEventActionType.ATTRIBUTES_UPDATED) + && edgeEvent.getBody().get("kv").get("lastConnectTs").asInt() == 1; + verify(edgeEventService).saveAsync(Mockito.argThat(eventArgumentMatcher)); + + Mockito.reset(ctx, edgeEventService); + } + } }