Handle properly cases when state persistent to timeseries
This commit is contained in:
parent
fa8c3cc6e5
commit
80f1ec2fcd
@ -77,9 +77,9 @@ public abstract class AbstractTbMsgPushNode<T extends BaseTbMsgPushNodeConfigura
|
||||
EdgeEventActionType actionType = getAlarmActionType(msg);
|
||||
return buildEvent(ctx.getTenantId(), actionType, getUUIDFromMsgData(msg), getAlarmEventType(), null);
|
||||
} else {
|
||||
EdgeEventActionType actionType = getEdgeEventActionTypeByMsgType(msgType);
|
||||
Map<String, Object> entityBody = new HashMap<>();
|
||||
Map<String, String> metadata = msg.getMetaData().getData();
|
||||
EdgeEventActionType actionType = getEdgeEventActionTypeByMsgType(msgType, metadata);
|
||||
Map<String, Object> entityBody = new HashMap<>();
|
||||
JsonNode dataJson = JacksonUtil.toJsonNode(msg.getData());
|
||||
switch (actionType) {
|
||||
case ATTRIBUTES_UPDATED:
|
||||
@ -148,21 +148,27 @@ public abstract class AbstractTbMsgPushNode<T extends BaseTbMsgPushNodeConfigura
|
||||
return scope;
|
||||
}
|
||||
|
||||
protected EdgeEventActionType getEdgeEventActionTypeByMsgType(String msgType) {
|
||||
protected EdgeEventActionType getEdgeEventActionTypeByMsgType(String msgType, Map<String, String> metadata) {
|
||||
EdgeEventActionType actionType;
|
||||
if (SessionMsgType.POST_TELEMETRY_REQUEST.name().equals(msgType)
|
||||
|| DataConstants.TIMESERIES_UPDATED.equals(msgType)) {
|
||||
actionType = EdgeEventActionType.TIMESERIES_UPDATED;
|
||||
} else if (DataConstants.ATTRIBUTES_UPDATED.equals(msgType)
|
||||
|| DataConstants.CONNECT_EVENT.equals(msgType)
|
||||
|| DataConstants.DISCONNECT_EVENT.equals(msgType)
|
||||
|| DataConstants.ACTIVITY_EVENT.equals(msgType)
|
||||
|| DataConstants.INACTIVITY_EVENT.equals(msgType)) {
|
||||
} else if (DataConstants.ATTRIBUTES_UPDATED.equals(msgType)) {
|
||||
actionType = EdgeEventActionType.ATTRIBUTES_UPDATED;
|
||||
} else if (SessionMsgType.POST_ATTRIBUTES_REQUEST.name().equals(msgType)) {
|
||||
actionType = EdgeEventActionType.POST_ATTRIBUTES;
|
||||
} else if (DataConstants.ATTRIBUTES_DELETED.equals(msgType)) {
|
||||
actionType = EdgeEventActionType.ATTRIBUTES_DELETED;
|
||||
} else if (DataConstants.CONNECT_EVENT.equals(msgType)
|
||||
|| DataConstants.DISCONNECT_EVENT.equals(msgType)
|
||||
|| DataConstants.ACTIVITY_EVENT.equals(msgType)
|
||||
|| DataConstants.INACTIVITY_EVENT.equals(msgType)) {
|
||||
String scope = metadata.get(SCOPE);
|
||||
if ( StringUtils.isEmpty(scope)) {
|
||||
actionType = EdgeEventActionType.TIMESERIES_UPDATED;
|
||||
} else {
|
||||
actionType = EdgeEventActionType.ATTRIBUTES_UPDATED;
|
||||
}
|
||||
} else {
|
||||
log.warn("Unsupported msg type [{}]", msgType);
|
||||
throw new IllegalArgumentException("Unsupported msg type: " + msgType);
|
||||
|
||||
@ -115,23 +115,38 @@ public class TbMsgPushToEdgeNodeTest {
|
||||
List<String> miscEvents = List.of(DataConstants.CONNECT_EVENT, DataConstants.DISCONNECT_EVENT,
|
||||
DataConstants.ACTIVITY_EVENT, DataConstants.INACTIVITY_EVENT);
|
||||
for (String event : miscEvents) {
|
||||
Mockito.when(ctx.getTenantId()).thenReturn(tenantId);
|
||||
Mockito.when(ctx.getEdgeService()).thenReturn(edgeService);
|
||||
Mockito.when(ctx.getEdgeEventService()).thenReturn(edgeEventService);
|
||||
Mockito.when(ctx.getDbCallbackExecutor()).thenReturn(dbCallbackExecutor);
|
||||
Mockito.when(edgeEventService.saveAsync(any())).thenReturn(SettableFuture.create());
|
||||
|
||||
TbMsg msg = TbMsg.newMsg(event, new EdgeId(UUID.randomUUID()), new TbMsgMetaData(),
|
||||
TbMsgDataType.JSON, "{\"lastConnectTs\":1}", null, null);
|
||||
|
||||
node.onMsg(ctx, msg);
|
||||
|
||||
ArgumentMatcher<EdgeEvent> eventArgumentMatcher = edgeEvent ->
|
||||
edgeEvent.getAction().equals(EdgeEventActionType.ATTRIBUTES_UPDATED)
|
||||
&& edgeEvent.getBody().get("kv").get("lastConnectTs").asInt() == 1;
|
||||
verify(edgeEventService).saveAsync(Mockito.argThat(eventArgumentMatcher));
|
||||
|
||||
Mockito.reset(ctx, edgeEventService);
|
||||
TbMsgMetaData metaData = new TbMsgMetaData();
|
||||
metaData.putValue(DataConstants.SCOPE, DataConstants.SERVER_SCOPE);
|
||||
testEvent(event, metaData, EdgeEventActionType.ATTRIBUTES_UPDATED, "kv");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMiscEventsProcessedAsTimeseriesUpdated() {
|
||||
List<String> miscEvents = List.of(DataConstants.CONNECT_EVENT, DataConstants.DISCONNECT_EVENT,
|
||||
DataConstants.ACTIVITY_EVENT, DataConstants.INACTIVITY_EVENT);
|
||||
for (String event : miscEvents) {
|
||||
testEvent(event, new TbMsgMetaData(), EdgeEventActionType.TIMESERIES_UPDATED, "data");
|
||||
}
|
||||
}
|
||||
|
||||
private void testEvent(String event, TbMsgMetaData metaData, EdgeEventActionType expectedType, String dataKey) {
|
||||
Mockito.when(ctx.getTenantId()).thenReturn(tenantId);
|
||||
Mockito.when(ctx.getEdgeService()).thenReturn(edgeService);
|
||||
Mockito.when(ctx.getEdgeEventService()).thenReturn(edgeEventService);
|
||||
Mockito.when(ctx.getDbCallbackExecutor()).thenReturn(dbCallbackExecutor);
|
||||
Mockito.when(edgeEventService.saveAsync(any())).thenReturn(SettableFuture.create());
|
||||
|
||||
TbMsg msg = TbMsg.newMsg(event, new EdgeId(UUID.randomUUID()), metaData,
|
||||
TbMsgDataType.JSON, "{\"lastConnectTs\":1}", null, null);
|
||||
|
||||
node.onMsg(ctx, msg);
|
||||
|
||||
ArgumentMatcher<EdgeEvent> eventArgumentMatcher = edgeEvent ->
|
||||
edgeEvent.getAction().equals(expectedType)
|
||||
&& edgeEvent.getBody().get(dataKey).get("lastConnectTs").asInt() == 1;
|
||||
verify(edgeEventService).saveAsync(Mockito.argThat(eventArgumentMatcher));
|
||||
|
||||
Mockito.reset(ctx, edgeEventService);
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user