Push edge connect/disconnect events to rule chain
This commit is contained in:
parent
eb9cd79c7e
commit
fa8c3cc6e5
@ -15,6 +15,7 @@
|
||||
*/
|
||||
package org.thingsboard.server.service.edge.rpc;
|
||||
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import io.grpc.Server;
|
||||
@ -25,6 +26,7 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.server.cluster.TbClusterService;
|
||||
import org.thingsboard.server.common.data.DataConstants;
|
||||
@ -35,6 +37,9 @@ import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
|
||||
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
|
||||
import org.thingsboard.server.common.data.kv.LongDataEntry;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
import org.thingsboard.server.common.msg.TbMsgDataType;
|
||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||
import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg;
|
||||
import org.thingsboard.server.common.msg.edge.EdgeSessionMsg;
|
||||
import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse;
|
||||
@ -66,6 +71,10 @@ import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static org.thingsboard.server.common.data.DataConstants.CONNECT_EVENT;
|
||||
import static org.thingsboard.server.common.data.DataConstants.DISCONNECT_EVENT;
|
||||
import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE;
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
@ConditionalOnProperty(prefix = "edges", value = "enabled", havingValue = "true")
|
||||
@ -261,7 +270,9 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
|
||||
newEventLock.unlock();
|
||||
}
|
||||
save(edgeId, DefaultDeviceStateService.ACTIVITY_STATE, true);
|
||||
save(edgeId, DefaultDeviceStateService.LAST_CONNECT_TIME, System.currentTimeMillis());
|
||||
long lastConnectTs = System.currentTimeMillis();
|
||||
save(edgeId, DefaultDeviceStateService.LAST_CONNECT_TIME, lastConnectTs);
|
||||
pushRuleEngineMessage(edgeGrpcSession.getEdge().getTenantId(), edgeId, lastConnectTs, CONNECT_EVENT);
|
||||
cancelScheduleEdgeEventsCheck(edgeId);
|
||||
scheduleEdgeEventsCheck(edgeGrpcSession);
|
||||
}
|
||||
@ -365,7 +376,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
|
||||
|
||||
private void onEdgeDisconnect(EdgeId edgeId) {
|
||||
log.info("[{}] edge disconnected!", edgeId);
|
||||
sessions.remove(edgeId);
|
||||
EdgeGrpcSession removed = sessions.remove(edgeId);
|
||||
final Lock newEventLock = sessionNewEventsLocks.computeIfAbsent(edgeId, id -> new ReentrantLock());
|
||||
newEventLock.lock();
|
||||
try {
|
||||
@ -374,7 +385,9 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
|
||||
newEventLock.unlock();
|
||||
}
|
||||
save(edgeId, DefaultDeviceStateService.ACTIVITY_STATE, false);
|
||||
save(edgeId, DefaultDeviceStateService.LAST_DISCONNECT_TIME, System.currentTimeMillis());
|
||||
long lastDisconnectTs = System.currentTimeMillis();
|
||||
save(edgeId, DefaultDeviceStateService.LAST_DISCONNECT_TIME, lastDisconnectTs);
|
||||
pushRuleEngineMessage(removed.getEdge().getTenantId(), edgeId, lastDisconnectTs, DISCONNECT_EVENT);
|
||||
cancelScheduleEdgeEventsCheck(edgeId);
|
||||
}
|
||||
|
||||
@ -423,4 +436,26 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
|
||||
log.warn("[{}] Failed to update attribute [{}] with value [{}]", edgeId, key, value, t);
|
||||
}
|
||||
}
|
||||
|
||||
private void pushRuleEngineMessage(TenantId tenantId, EdgeId edgeId, long ts, String msgType) {
|
||||
try {
|
||||
ObjectNode edgeState = JacksonUtil.OBJECT_MAPPER.createObjectNode();
|
||||
if (msgType.equals(CONNECT_EVENT)) {
|
||||
edgeState.put(DefaultDeviceStateService.ACTIVITY_STATE, true);
|
||||
edgeState.put(DefaultDeviceStateService.LAST_CONNECT_TIME, ts);
|
||||
} else {
|
||||
edgeState.put(DefaultDeviceStateService.ACTIVITY_STATE, false);
|
||||
edgeState.put(DefaultDeviceStateService.LAST_DISCONNECT_TIME, ts);
|
||||
}
|
||||
String data = JacksonUtil.toString(edgeState);
|
||||
TbMsgMetaData md = new TbMsgMetaData();
|
||||
if (!persistToTelemetry) {
|
||||
md.putValue(DataConstants.SCOPE, SERVER_SCOPE);
|
||||
}
|
||||
TbMsg tbMsg = TbMsg.newMsg(msgType, edgeId, md, TbMsgDataType.JSON, data);
|
||||
clusterService.pushMsgToRuleEngine(tenantId, edgeId, tbMsg, null);
|
||||
} catch (Exception e) {
|
||||
log.warn("[{}][{}] Failed to push {}", tenantId, edgeId, msgType, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -153,7 +153,11 @@ public abstract class AbstractTbMsgPushNode<T extends BaseTbMsgPushNodeConfigura
|
||||
if (SessionMsgType.POST_TELEMETRY_REQUEST.name().equals(msgType)
|
||||
|| DataConstants.TIMESERIES_UPDATED.equals(msgType)) {
|
||||
actionType = EdgeEventActionType.TIMESERIES_UPDATED;
|
||||
} else if (DataConstants.ATTRIBUTES_UPDATED.equals(msgType)) {
|
||||
} else if (DataConstants.ATTRIBUTES_UPDATED.equals(msgType)
|
||||
|| DataConstants.CONNECT_EVENT.equals(msgType)
|
||||
|| DataConstants.DISCONNECT_EVENT.equals(msgType)
|
||||
|| DataConstants.ACTIVITY_EVENT.equals(msgType)
|
||||
|| DataConstants.INACTIVITY_EVENT.equals(msgType)) {
|
||||
actionType = EdgeEventActionType.ATTRIBUTES_UPDATED;
|
||||
} else if (SessionMsgType.POST_ATTRIBUTES_REQUEST.name().equals(msgType)) {
|
||||
actionType = EdgeEventActionType.POST_ATTRIBUTES;
|
||||
@ -172,7 +176,11 @@ public abstract class AbstractTbMsgPushNode<T extends BaseTbMsgPushNodeConfigura
|
||||
|| DataConstants.ATTRIBUTES_UPDATED.equals(msgType)
|
||||
|| DataConstants.ATTRIBUTES_DELETED.equals(msgType)
|
||||
|| DataConstants.TIMESERIES_UPDATED.equals(msgType)
|
||||
|| DataConstants.ALARM.equals(msgType);
|
||||
|| DataConstants.ALARM.equals(msgType)
|
||||
|| DataConstants.CONNECT_EVENT.equals(msgType)
|
||||
|| DataConstants.DISCONNECT_EVENT.equals(msgType)
|
||||
|| DataConstants.ACTIVITY_EVENT.equals(msgType)
|
||||
|| DataConstants.INACTIVITY_EVENT.equals(msgType);
|
||||
}
|
||||
|
||||
protected boolean isSupportedOriginator(EntityType entityType) {
|
||||
|
||||
@ -19,6 +19,7 @@ import com.google.common.util.concurrent.SettableFuture;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.ArgumentMatcher;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.junit.MockitoJUnitRunner;
|
||||
@ -28,6 +29,8 @@ import org.thingsboard.rule.engine.api.TbContext;
|
||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
|
||||
import org.thingsboard.rule.engine.api.TbNodeException;
|
||||
import org.thingsboard.server.common.data.DataConstants;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.EdgeId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
@ -106,4 +109,29 @@ public class TbMsgPushToEdgeNodeTest {
|
||||
|
||||
verify(edgeEventService).saveAsync(any());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMiscEventsProcessedAsAttributesUpdated() {
|
||||
List<String> miscEvents = List.of(DataConstants.CONNECT_EVENT, DataConstants.DISCONNECT_EVENT,
|
||||
DataConstants.ACTIVITY_EVENT, DataConstants.INACTIVITY_EVENT);
|
||||
for (String event : miscEvents) {
|
||||
Mockito.when(ctx.getTenantId()).thenReturn(tenantId);
|
||||
Mockito.when(ctx.getEdgeService()).thenReturn(edgeService);
|
||||
Mockito.when(ctx.getEdgeEventService()).thenReturn(edgeEventService);
|
||||
Mockito.when(ctx.getDbCallbackExecutor()).thenReturn(dbCallbackExecutor);
|
||||
Mockito.when(edgeEventService.saveAsync(any())).thenReturn(SettableFuture.create());
|
||||
|
||||
TbMsg msg = TbMsg.newMsg(event, new EdgeId(UUID.randomUUID()), new TbMsgMetaData(),
|
||||
TbMsgDataType.JSON, "{\"lastConnectTs\":1}", null, null);
|
||||
|
||||
node.onMsg(ctx, msg);
|
||||
|
||||
ArgumentMatcher<EdgeEvent> eventArgumentMatcher = edgeEvent ->
|
||||
edgeEvent.getAction().equals(EdgeEventActionType.ATTRIBUTES_UPDATED)
|
||||
&& edgeEvent.getBody().get("kv").get("lastConnectTs").asInt() == 1;
|
||||
verify(edgeEventService).saveAsync(Mockito.argThat(eventArgumentMatcher));
|
||||
|
||||
Mockito.reset(ctx, edgeEventService);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user