Merge pull request #9105 from smatvienko-tb/feature/attribute_node_save_only_changed

[3.5.2] 'Update on value change' for save attributes rule node feature
This commit is contained in:
Andrew Shvayka 2023-09-06 12:44:08 +03:00 committed by GitHub
commit 0f2ba5097e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 264 additions and 8 deletions

View File

@ -49,8 +49,11 @@
"name": "Save Client Attributes", "name": "Save Client Attributes",
"debugMode": false, "debugMode": false,
"configuration": { "configuration": {
"version": 1,
"scope": "CLIENT_SCOPE", "scope": "CLIENT_SCOPE",
"notifyDevice": "false" "notifyDevice": "false",
"sendAttributesUpdatedNotification": "false",
"updateAttributesOnlyOnValueChange": "true"
}, },
"externalId": null "externalId": null
}, },

View File

@ -33,7 +33,11 @@
"name": "Save Client Attributes", "name": "Save Client Attributes",
"debugMode": false, "debugMode": false,
"configuration": { "configuration": {
"scope": "CLIENT_SCOPE" "version": 1,
"scope": "CLIENT_SCOPE",
"notifyDevice": "false",
"sendAttributesUpdatedNotification": "false",
"updateAttributesOnlyOnValueChange": "true"
} }
}, },
{ {

View File

@ -32,8 +32,11 @@
"name": "Save Client Attributes", "name": "Save Client Attributes",
"debugMode": false, "debugMode": false,
"configuration": { "configuration": {
"version": 1,
"scope": "CLIENT_SCOPE", "scope": "CLIENT_SCOPE",
"notifyDevice": "false" "notifyDevice": "false",
"sendAttributesUpdatedNotification": "false",
"updateAttributesOnlyOnValueChange": "true"
} }
}, },
{ {

View File

@ -15,22 +15,33 @@
*/ */
package org.thingsboard.rule.engine.telemetry; package org.thingsboard.rule.engine.telemetry;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gson.JsonParser; import com.google.gson.JsonParser;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.DonAsynchron;
import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.TbVersionedNode;
import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.data.util.TbPair;
import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.common.transport.adaptor.JsonConverter;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.thingsboard.server.common.data.DataConstants.CLIENT_SCOPE; import static org.thingsboard.server.common.data.DataConstants.CLIENT_SCOPE;
import static org.thingsboard.server.common.data.DataConstants.NOTIFY_DEVICE_METADATA_KEY; import static org.thingsboard.server.common.data.DataConstants.NOTIFY_DEVICE_METADATA_KEY;
@ -42,17 +53,20 @@ import static org.thingsboard.server.common.data.msg.TbMsgType.POST_ATTRIBUTES_R
type = ComponentType.ACTION, type = ComponentType.ACTION,
name = "save attributes", name = "save attributes",
configClazz = TbMsgAttributesNodeConfiguration.class, configClazz = TbMsgAttributesNodeConfiguration.class,
version = 1,
nodeDescription = "Saves attributes data", nodeDescription = "Saves attributes data",
nodeDetails = "Saves entity attributes based on configurable scope parameter. Expects messages with 'POST_ATTRIBUTES_REQUEST' message type. " + nodeDetails = "Saves entity attributes based on configurable scope parameter. Expects messages with 'POST_ATTRIBUTES_REQUEST' message type. " +
"If upsert(update/insert) operation is completed successfully rule node will send the incoming message via <b>Success</b> chain, otherwise, <b>Failure</b> chain is used. " + "If upsert(update/insert) operation is completed successfully rule node will send the incoming message via <b>Success</b> chain, otherwise, <b>Failure</b> chain is used. " +
"Additionally if checkbox <b>Send attributes updated notification</b> is set to true, rule node will put the \"Attributes Updated\" " + "Additionally if checkbox <b>Send attributes updated notification</b> is set to true, rule node will put the \"Attributes Updated\" " +
"event for <b>SHARED_SCOPE</b> and <b>SERVER_SCOPE</b> attributes updates to the corresponding rule engine queue.", "event for <b>SHARED_SCOPE</b> and <b>SERVER_SCOPE</b> attributes updates to the corresponding rule engine queue." +
"Performance checkbox 'Save attributes only if the value changes' will skip attributes overwrites for values with no changes (avoid concurrent writes because this check is not transactional; will not update 'Last updated time' for skipped attributes).",
uiResources = {"static/rulenode/rulenode-core-config.js"}, uiResources = {"static/rulenode/rulenode-core-config.js"},
configDirective = "tbActionNodeAttributesConfig", configDirective = "tbActionNodeAttributesConfig",
icon = "file_upload" icon = "file_upload"
) )
public class TbMsgAttributesNode implements TbNode { public class TbMsgAttributesNode implements TbVersionedNode {
static final String UPDATE_ATTRIBUTES_ONLY_ON_VALUE_CHANGE_KEY = "updateAttributesOnlyOnValueChange";
private TbMsgAttributesNodeConfiguration config; private TbMsgAttributesNodeConfiguration config;
@Override @Override
@ -70,13 +84,36 @@ public class TbMsgAttributesNode implements TbNode {
return; return;
} }
String src = msg.getData(); String src = msg.getData();
List<AttributeKvEntry> attributes = new ArrayList<>(JsonConverter.convertToAttributes(JsonParser.parseString(src))); List<AttributeKvEntry> newAttributes = new ArrayList<>(JsonConverter.convertToAttributes(JsonParser.parseString(src)));
if (attributes.isEmpty()) { if (newAttributes.isEmpty()) {
ctx.tellSuccess(msg); ctx.tellSuccess(msg);
return; return;
} }
String scope = getScope(msg.getMetaData().getValue(SCOPE)); String scope = getScope(msg.getMetaData().getValue(SCOPE));
boolean sendAttributesUpdateNotification = checkSendNotification(scope); boolean sendAttributesUpdateNotification = checkSendNotification(scope);
if (!config.isUpdateAttributesOnlyOnValueChange()) {
saveAttr(newAttributes, ctx, msg, scope, sendAttributesUpdateNotification);
return;
}
List<String> keys = newAttributes.stream().map(KvEntry::getKey).collect(Collectors.toList());
ListenableFuture<List<AttributeKvEntry>> findFuture = ctx.getAttributesService().find(ctx.getTenantId(), msg.getOriginator(), scope, keys);
DonAsynchron.withCallback(findFuture,
currentAttributes -> {
List<AttributeKvEntry> attributesChanged = filterChangedAttr(currentAttributes, newAttributes);
saveAttr(attributesChanged, ctx, msg, scope, sendAttributesUpdateNotification);
},
throwable -> ctx.tellFailure(msg, throwable),
MoreExecutors.directExecutor());
}
void saveAttr(List<AttributeKvEntry> attributes, TbContext ctx, TbMsg msg, String scope, boolean sendAttributesUpdateNotification) {
if (attributes.isEmpty()) {
ctx.tellSuccess(msg);
return;
}
ctx.getTelemetryService().saveAndNotify( ctx.getTelemetryService().saveAndNotify(
ctx.getTenantId(), ctx.getTenantId(),
msg.getOriginator(), msg.getOriginator(),
@ -89,6 +126,24 @@ public class TbMsgAttributesNode implements TbNode {
); );
} }
List<AttributeKvEntry> filterChangedAttr(List<AttributeKvEntry> currentAttributes, List<AttributeKvEntry> newAttributes) {
if (currentAttributes == null || currentAttributes.isEmpty()) {
return newAttributes;
}
Map<String, AttributeKvEntry> currentAttrMap = currentAttributes.stream()
.collect(Collectors.toMap(AttributeKvEntry::getKey, Function.identity(), (existing, replacement) -> existing));
return newAttributes.stream()
.filter(item -> {
AttributeKvEntry cacheAttr = currentAttrMap.get(item.getKey());
return cacheAttr == null
|| !Objects.equals(item.getValue(), cacheAttr.getValue()) //JSON and String can be equals by value, but different by type
|| !Objects.equals(item.getDataType(), cacheAttr.getDataType());
})
.collect(Collectors.toList());
}
private boolean checkSendNotification(String scope) { private boolean checkSendNotification(String scope) {
return config.isSendAttributesUpdatedNotification() && !CLIENT_SCOPE.equals(scope); return config.isSendAttributesUpdatedNotification() && !CLIENT_SCOPE.equals(scope);
} }
@ -104,4 +159,20 @@ public class TbMsgAttributesNode implements TbNode {
return config.getScope(); return config.getScope();
} }
@Override
public TbPair<Boolean, JsonNode> upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException {
boolean hasChanges = false;
switch (fromVersion) {
case 0:
if (!oldConfiguration.has(UPDATE_ATTRIBUTES_ONLY_ON_VALUE_CHANGE_KEY)) {
hasChanges = true;
((ObjectNode) oldConfiguration).put(UPDATE_ATTRIBUTES_ONLY_ON_VALUE_CHANGE_KEY, false);
}
break;
default:
break;
}
return new TbPair<>(hasChanges, oldConfiguration);
}
} }

View File

@ -26,6 +26,7 @@ public class TbMsgAttributesNodeConfiguration implements NodeConfiguration<TbMsg
private Boolean notifyDevice; private Boolean notifyDevice;
private boolean sendAttributesUpdatedNotification; private boolean sendAttributesUpdatedNotification;
private boolean updateAttributesOnlyOnValueChange;
@Override @Override
public TbMsgAttributesNodeConfiguration defaultConfiguration() { public TbMsgAttributesNodeConfiguration defaultConfiguration() {
@ -33,6 +34,8 @@ public class TbMsgAttributesNodeConfiguration implements NodeConfiguration<TbMsg
configuration.setScope(DataConstants.SERVER_SCOPE); configuration.setScope(DataConstants.SERVER_SCOPE);
configuration.setNotifyDevice(false); configuration.setNotifyDevice(false);
configuration.setSendAttributesUpdatedNotification(false); configuration.setSendAttributesUpdatedNotification(false);
//Since version 1. For an existing rule nodes for version 0. See the TbVersionedNode implementation
configuration.setUpdateAttributesOnlyOnValueChange(true);
return configuration; return configuration;
} }
} }

View File

@ -0,0 +1,29 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.telemetry;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
class TbMsgAttributesNodeConfigurationTest {
@Test
void testDefaultConfig_givenupdateAttributesOnlyOnValueChange_thenTrue_sinceVersion1() {
assertThat(new TbMsgAttributesNodeConfiguration().defaultConfiguration().isUpdateAttributesOnlyOnValueChange()).isTrue();
}
}

View File

@ -0,0 +1,143 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.telemetry;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
import org.thingsboard.server.common.data.kv.JsonDataEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.util.TbPair;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.BDDMockito.willCallRealMethod;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@Slf4j
class TbMsgAttributesNodeTest {
final String updateAttributesOnlyOnValueChangeKey = "updateAttributesOnlyOnValueChange";
@Test
void testFilterChangedAttr_whenCurrentAttributesEmpty_thenReturnNewAttributes() {
TbMsgAttributesNode node = spy(TbMsgAttributesNode.class);
List<AttributeKvEntry> newAttributes = new ArrayList<>();
List<AttributeKvEntry> filtered = node.filterChangedAttr(Collections.emptyList(), newAttributes);
assertThat(filtered).isSameAs(newAttributes);
}
@Test
void testFilterChangedAttr_whenCurrentAttributesContainsInAnyOrderNewAttributes_thenReturnEmptyList() {
TbMsgAttributesNode node = spy(TbMsgAttributesNode.class);
List<AttributeKvEntry> currentAttributes = List.of(
new BaseAttributeKvEntry(1694000000L, new StringDataEntry("address", "Peremohy ave 1")),
new BaseAttributeKvEntry(1694000000L, new BooleanDataEntry("valid", true)),
new BaseAttributeKvEntry(1694000000L, new LongDataEntry("counter", 100L)),
new BaseAttributeKvEntry(1694000000L, new DoubleDataEntry("temp", -18.35)),
new BaseAttributeKvEntry(1694000000L, new JsonDataEntry("json", "{\"warning\":\"out of paper\"}"))
);
List<AttributeKvEntry> newAttributes = new ArrayList<>(currentAttributes);
newAttributes.add(newAttributes.get(0));
newAttributes.remove(0);
assertThat(newAttributes).hasSize(currentAttributes.size());
assertThat(currentAttributes).isNotEmpty();
assertThat(newAttributes).containsExactlyInAnyOrderElementsOf(currentAttributes);
List<AttributeKvEntry> filtered = node.filterChangedAttr(currentAttributes, newAttributes);
assertThat(filtered).isEmpty(); //no changes
}
@Test
void testFilterChangedAttr_whenCurrentAttributesContainsInAnyOrderNewAttributes_thenReturnExpectedList() {
TbMsgAttributesNode node = spy(TbMsgAttributesNode.class);
List<AttributeKvEntry> currentAttributes = List.of(
new BaseAttributeKvEntry(1694000000L, new StringDataEntry("address", "Peremohy ave 1")),
new BaseAttributeKvEntry(1694000000L, new BooleanDataEntry("valid", true)),
new BaseAttributeKvEntry(1694000000L, new LongDataEntry("counter", 100L)),
new BaseAttributeKvEntry(1694000000L, new DoubleDataEntry("temp", -18.35)),
new BaseAttributeKvEntry(1694000000L, new JsonDataEntry("json", "{\"warning\":\"out of paper\"}"))
);
List<AttributeKvEntry> newAttributes = List.of(
new BaseAttributeKvEntry(1694000999L, new JsonDataEntry("json", "{\"status\":\"OK\"}")), // value changed, reordered
new BaseAttributeKvEntry(1694000999L, new StringDataEntry("valid", "true")), //type changed
new BaseAttributeKvEntry(1694000999L, new LongDataEntry("counter", 101L)), //value changed
new BaseAttributeKvEntry(1694000999L, new DoubleDataEntry("temp", -18.35)),
new BaseAttributeKvEntry(1694000999L, new StringDataEntry("address", "Peremohy ave 1")) // reordered
);
List<AttributeKvEntry> expected = List.of(
new BaseAttributeKvEntry(1694000999L, new StringDataEntry("valid", "true")),
new BaseAttributeKvEntry(1694000999L, new LongDataEntry("counter", 101L)),
new BaseAttributeKvEntry(1694000999L, new JsonDataEntry("json", "{\"status\":\"OK\"}"))
);
List<AttributeKvEntry> filtered = node.filterChangedAttr(currentAttributes, newAttributes);
assertThat(filtered).containsExactlyInAnyOrderElementsOf(expected);
}
@Test
void testUpgrade_fromVersion0() throws TbNodeException {
TbMsgAttributesNode node = mock(TbMsgAttributesNode.class);
willCallRealMethod().given(node).upgrade(anyInt(), any());
ObjectNode jsonNode = (ObjectNode) JacksonUtil.valueToTree(new TbMsgAttributesNodeConfiguration().defaultConfiguration());
jsonNode.remove(updateAttributesOnlyOnValueChangeKey);
assertThat(jsonNode.has(updateAttributesOnlyOnValueChangeKey)).as("pre condition has no " + updateAttributesOnlyOnValueChangeKey).isFalse();
TbPair<Boolean, JsonNode> upgradeResult = node.upgrade(0, jsonNode);
ObjectNode resultNode = (ObjectNode) upgradeResult.getSecond();
assertThat(upgradeResult.getFirst()).as("upgrade result has changes").isTrue();
assertThat(resultNode.has(updateAttributesOnlyOnValueChangeKey)).as("upgrade result has key " + updateAttributesOnlyOnValueChangeKey).isTrue();
assertThat(resultNode.get(updateAttributesOnlyOnValueChangeKey).asBoolean()).as("upgrade result value [false] for key " + updateAttributesOnlyOnValueChangeKey).isFalse();
}
@Test
void testUpgrade_fromVersion0_alreadyHasupdateAttributesOnlyOnValueChange() throws TbNodeException {
TbMsgAttributesNode node = mock(TbMsgAttributesNode.class);
willCallRealMethod().given(node).upgrade(anyInt(), any());
ObjectNode jsonNode = (ObjectNode) JacksonUtil.valueToTree(new TbMsgAttributesNodeConfiguration().defaultConfiguration());
jsonNode.remove(updateAttributesOnlyOnValueChangeKey);
jsonNode.put(updateAttributesOnlyOnValueChangeKey, true);
assertThat(jsonNode.has(updateAttributesOnlyOnValueChangeKey)).as("pre condition has no " + updateAttributesOnlyOnValueChangeKey).isTrue();
assertThat(jsonNode.get(updateAttributesOnlyOnValueChangeKey).asBoolean()).as("pre condition has [true] for key " + updateAttributesOnlyOnValueChangeKey).isTrue();
TbPair<Boolean, JsonNode> upgradeResult = node.upgrade(0, jsonNode);
ObjectNode resultNode = (ObjectNode) upgradeResult.getSecond();
assertThat(upgradeResult.getFirst()).as("upgrade result has changes").isFalse();
assertThat(resultNode.has(updateAttributesOnlyOnValueChangeKey)).as("upgrade result has key " + updateAttributesOnlyOnValueChangeKey).isTrue();
assertThat(resultNode.get(updateAttributesOnlyOnValueChangeKey).asBoolean()).as("upgrade result value [true] for key " + updateAttributesOnlyOnValueChangeKey).isTrue();
}
}