diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java index 2e525b56ed..e364ea2c18 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java @@ -93,7 +93,7 @@ public abstract class TbAbstractGetAttributesNode clientAttributes; + private List serverAttributes; + private List sharedAttributes; + private List tsKeys; + + @Before + public void before() throws TbNodeException { + dbExecutor = new AbstractListeningExecutor() { + @Override + protected int getThreadPollSize() { + return 3; + } + }; + dbExecutor.init(); + + Mockito.reset(ctx); + Mockito.reset(attributesService); + Mockito.reset(tsService); + + Mockito.reset(ctx); + Mockito.reset(attributesService); + Mockito.reset(tsService); + + lenient().when(ctx.getAttributesService()).thenReturn(attributesService); + lenient().when(ctx.getTimeseriesService()).thenReturn(tsService); + lenient().when(ctx.getTenantId()).thenReturn(tenantId); + lenient().when(ctx.getDbCallbackExecutor()).thenReturn(dbExecutor); + + clientAttributes = getAttributeNames("client"); + serverAttributes = getAttributeNames("server"); + sharedAttributes = getAttributeNames("shared"); + tsKeys = List.of("temperature", "humidity", "unknown"); + + Mockito.when(attributesService.find(tenantId, originator, DataConstants.CLIENT_SCOPE, clientAttributes)) + .thenReturn(Futures.immediateFuture(getListAttributeKvEntry(clientAttributes))); + + + Mockito.when(attributesService.find(tenantId, originator, DataConstants.SERVER_SCOPE, serverAttributes)) + .thenReturn(Futures.immediateFuture(getListAttributeKvEntry(serverAttributes))); + + + Mockito.when(attributesService.find(tenantId, originator, DataConstants.SHARED_SCOPE, sharedAttributes)) + .thenReturn(Futures.immediateFuture(getListAttributeKvEntry(sharedAttributes))); + + Mockito.when(tsService.findLatest(tenantId, originator, tsKeys)) + .thenReturn(Futures.immediateFuture(getListTelemetryKvEntry(tsKeys))); + } + + @After + public void after() { + dbExecutor.destroy(); + } + + @Test + public void fetchToMetadata_whenOnMsg_then_success() throws Exception { + TbGetAttributesNode node = initNode(false, false, false); + TbMsg msg = getTbMsg(originator); + node.onMsg(ctx, msg); + + TbMsg resultMsg = checkMsg(); + TbMsgMetaData msgMetaData = resultMsg.getMetaData(); + + //check attributes + checkAttributes(clientAttributes, "cs_", false, msgMetaData, null); + checkAttributes(serverAttributes, "ss_", false, msgMetaData, null); + checkAttributes(sharedAttributes, "shared_", false, msgMetaData, null); + + //check timeseries + checkTs(tsKeys, false, false, msgMetaData, null); + } + + @Test + public void fetchToData_whenOnMsg_then_success() throws Exception { + TbGetAttributesNode node = initNode(true, true, false); + TbMsg msg = getTbMsg(originator); + node.onMsg(ctx, msg); + + TbMsg resultMsg = checkMsg(); + JsonNode msgData = JacksonUtil.toJsonNode(resultMsg.getData()); + + //check attributes + checkAttributes(clientAttributes, "cs_", true, null, msgData); + checkAttributes(serverAttributes, "ss_", true, null, msgData); + checkAttributes(sharedAttributes, "shared_", true, null, msgData); + + //check timeseries with ts + checkTs(tsKeys, true, true, null, msgData); + } + + @Test + public void fetchToData_whenOnMsg_then_failure() throws Exception { + TbGetAttributesNode node = initNode(true, true, true); + TbMsg msg = getTbMsg(originator); + node.onMsg(ctx, msg); + + ArgumentCaptor newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); + ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); + Mockito.verify(ctx, never()).tellSuccess(any()); + Mockito.verify(ctx, Mockito.timeout(5000)).tellFailure(newMsgCaptor.capture(), exceptionCaptor.capture()); + + Assert.assertSame(newMsgCaptor.getValue(), msg); + Assert.assertNotNull(exceptionCaptor.getValue()); + } + + @Test + public void fetchToData_whenOnMsg_then_data_not_object_failure() throws Exception { + TbGetAttributesNode node = initNode(true, true, true); + TbMsg msg = TbMsg.newMsg("TEST", originator, new TbMsgMetaData(), "[]"); + node.onMsg(ctx, msg); + + ArgumentCaptor newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); + ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); + Mockito.verify(ctx, never()).tellSuccess(any()); + Mockito.verify(ctx, Mockito.timeout(5000)).tellFailure(newMsgCaptor.capture(), exceptionCaptor.capture()); + + Assert.assertSame(newMsgCaptor.getValue(), msg); + Assert.assertNotNull(exceptionCaptor.getValue()); + } + + private TbMsg checkMsg() { + ArgumentCaptor msgCaptor = ArgumentCaptor.forClass(TbMsg.class); + Mockito.verify(ctx, Mockito.timeout(5000)).tellSuccess(msgCaptor.capture()); + + TbMsg resultMsg = msgCaptor.getValue(); + Assert.assertNotNull(resultMsg); + Assert.assertNotNull(resultMsg.getMetaData()); + Assert.assertNotNull(resultMsg.getData()); + return resultMsg; + } + + private void checkAttributes(List attributes, String prefix, boolean fetchToData, TbMsgMetaData msgMetaData, JsonNode msgData) { + attributes.stream() + .filter(attribute -> !attribute.equals("unknown")) + .forEach(attribute -> { + String result; + if (fetchToData) { + result = msgData.get(prefix + attribute).asText(); + } else { + result = msgMetaData.getValue(prefix + attribute); + } + Assert.assertNotNull(result); + Assert.assertEquals(attribute + "_value", result); + }); + } + + private void checkTs(List tsKeys, boolean fetchToData, boolean getLatestValueWithTs, TbMsgMetaData msgMetaData, JsonNode msgData) { + long tsValue = 1L; + for (String key : tsKeys) { + if (key.equals("unknown")) { + continue; + } + String result; + if (fetchToData) { + if (getLatestValueWithTs) { + JsonNode resultTs = msgData.get(key); + Assert.assertNotNull(resultTs); + Assert.assertNotNull(resultTs.get("value")); + Assert.assertNotNull(resultTs.get("ts")); + result = resultTs.get("value").asText(); + } else { + result = msgData.get(key).asText(); + } + } else { + result = msgMetaData.getValue(key); + } + Assert.assertNotNull(result); + Assert.assertEquals(String.valueOf(tsValue), result); + tsValue++; + } + } + + private TbGetAttributesNode initNode(boolean fetchToData, boolean getLatestValueWithTs, boolean isTellFailureIfAbsent) throws TbNodeException { + TbGetAttributesNodeConfiguration config = new TbGetAttributesNodeConfiguration(); + config.setClientAttributeNames(List.of("client_attr_1", "client_attr_2", "${client_attr_metadata}", "unknown")); + config.setServerAttributeNames(List.of("server_attr_1", "server_attr_2", "${server_attr_metadata}", "unknown")); + config.setSharedAttributeNames(List.of("shared_attr_1", "shared_attr_2", "$[shared_attr_data]", "unknown")); + config.setLatestTsKeyNames(List.of("temperature", "humidity", "unknown")); + config.setFetchToData(fetchToData); + config.setGetLatestValueWithTs(getLatestValueWithTs); + config.setTellFailureIfAbsent(isTellFailureIfAbsent); + TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config)); + TbGetAttributesNode node = new TbGetAttributesNode(); + node.init(ctx, nodeConfiguration); + return node; + } + + private TbMsg getTbMsg(EntityId entityId) { + ObjectNode msgData = JacksonUtil.newObjectNode(); + msgData.put("shared_attr_data", "shared_attr_3"); + + TbMsgMetaData msgMetaData = new TbMsgMetaData(); + msgMetaData.putValue("client_attr_metadata", "client_attr_3"); + msgMetaData.putValue("server_attr_metadata", "server_attr_3"); + + return TbMsg.newMsg("TEST", entityId, msgMetaData, msgData.toString()); + } + + private List getAttributeNames(String prefix) { + return List.of(prefix + "_attr_1", prefix + "_attr_2", prefix + "_attr_3", "unknown"); + } + + private List getListAttributeKvEntry(List attributes) { + List attributeKvEntries = new ArrayList<>(); + attributes.stream().filter(attribute -> !attribute.equals("unknown")).forEach(attribute -> attributeKvEntries.add(new BaseAttributeKvEntry(System.currentTimeMillis(), new StringDataEntry(attribute, attribute + "_value")))); + return attributeKvEntries; + } + + private List getListTelemetryKvEntry(List keys) { + long value = 1L; + List kvEntries = new ArrayList<>(); + for (String key : keys) { + if (key.equals("unknown")) { + continue; + } + kvEntries.add(new BasicTsKvEntry(System.currentTimeMillis(), new LongDataEntry(key, value))); + value++; + } + return kvEntries; + } + +} \ No newline at end of file