diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java index 04971abae2..c63af3262d 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java @@ -18,9 +18,9 @@ package org.thingsboard.rule.engine.telemetry; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.gson.JsonParser; import lombok.extern.slf4j.Slf4j; -import org.springframework.data.util.Pair; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNode; @@ -36,7 +36,9 @@ import org.thingsboard.server.common.transport.adaptor.JsonConverter; import java.util.ArrayList; import java.util.List; -import java.util.Set; +import java.util.Map; +import java.util.Objects; +import java.util.function.Function; import java.util.stream.Collectors; import static org.thingsboard.server.common.data.DataConstants.CLIENT_SCOPE; @@ -53,7 +55,8 @@ import static org.thingsboard.server.common.data.msg.TbMsgType.POST_ATTRIBUTES_R nodeDetails = "Saves entity attributes based on configurable scope parameter. Expects messages with 'POST_ATTRIBUTES_REQUEST' message type. " + "If upsert(update/insert) operation is completed successfully rule node will send the incoming message via Success chain, otherwise, Failure chain is used. " + "Additionally if checkbox Send attributes updated notification is set to true, rule node will put the \"Attributes Updated\" " + - "event for SHARED_SCOPE and SERVER_SCOPE attributes updates to the corresponding rule engine queue.", + "event for SHARED_SCOPE and SERVER_SCOPE attributes updates to the corresponding rule engine queue." + + "Performance checkbox 'Update Attributes On Value Change' will skip attributes overwrites for values with no changes (avoid concurrent writes because this check is not transactional; will not update 'Last updated time' for skipped attributes).", uiResources = {"static/rulenode/rulenode-core-config.js"}, configDirective = "tbActionNodeAttributesConfig", icon = "file_upload" @@ -84,49 +87,62 @@ public class TbMsgAttributesNode implements TbNode { } String scope = getScope(msg.getMetaData().getValue(SCOPE)); boolean sendAttributesUpdateNotification = checkSendNotification(scope); - ListenableFuture> findFuture; - if (config.isUpdateAttributesOnValueChange()) { - List keys = newAttributes.stream().map(KvEntry::getKey).collect(Collectors.toList()); - findFuture = ctx.getAttributesService().find(ctx.getTenantId(), msg.getOriginator(), scope, keys); - } else { - findFuture = Futures.immediateFuture(null); + + if (!config.isUpdateAttributesOnValueChange()) { + saveAttr(newAttributes, ctx, msg, scope, sendAttributesUpdateNotification); + return; } + + List keys = newAttributes.stream().map(KvEntry::getKey).collect(Collectors.toList()); + ListenableFuture> findFuture = ctx.getAttributesService().find(ctx.getTenantId(), msg.getOriginator(), scope, keys); + Futures.addCallback(findFuture, new FutureCallback<>() { @Override public void onSuccess(List currentAttributes) { - List attributes = newAttributes; - if (config.isUpdateAttributesOnValueChange() - && currentAttributes != null - && !currentAttributes.isEmpty()) { - Set> currentKeyValuePairs = currentAttributes.stream() - .map(item -> Pair.of(item.getKey(), item.getValue())) - .collect(Collectors.toSet()); - attributes = attributes.stream() - .filter(item -> !currentKeyValuePairs.contains(Pair.of(item.getKey(), item.getValue()))) - .collect(Collectors.toList()); - } - if (attributes.isEmpty()) { - ctx.tellSuccess(msg); - } else { - ctx.getTelemetryService().saveAndNotify( - ctx.getTenantId(), - msg.getOriginator(), - scope, - attributes, - checkNotifyDevice(msg.getMetaData().getValue(NOTIFY_DEVICE_METADATA_KEY)), - sendAttributesUpdateNotification ? - new AttributesUpdateNodeCallback(ctx, msg, scope, attributes) : - new TelemetryNodeCallback(ctx, msg) - ); - } + List attributesChanged = filterChangedAttr(currentAttributes, newAttributes); + saveAttr(attributesChanged, ctx, msg, scope, sendAttributesUpdateNotification); } @Override public void onFailure(Throwable throwable) { ctx.tellFailure(msg, throwable); } - }, ctx.getDbCallbackExecutor()); + }, MoreExecutors.directExecutor()); + } + void saveAttr(List attributes, TbContext ctx, TbMsg msg, String scope, boolean sendAttributesUpdateNotification) { + if (attributes.isEmpty()) { + ctx.tellSuccess(msg); + return; + } + ctx.getTelemetryService().saveAndNotify( + ctx.getTenantId(), + msg.getOriginator(), + scope, + attributes, + checkNotifyDevice(msg.getMetaData().getValue(NOTIFY_DEVICE_METADATA_KEY)), + sendAttributesUpdateNotification ? + new AttributesUpdateNodeCallback(ctx, msg, scope, attributes) : + new TelemetryNodeCallback(ctx, msg) + ); + } + + List filterChangedAttr(List currentAttributes, List newAttributes) { + if (currentAttributes == null || currentAttributes.isEmpty()) { + return newAttributes; + } + + Map currentAttrMap = currentAttributes.stream() + .collect(Collectors.toMap(AttributeKvEntry::getKey, Function.identity(), (existing, replacement) -> existing)); + + return newAttributes.stream() + .filter(item -> { + AttributeKvEntry cacheAttr = currentAttrMap.get(item.getKey()); + return cacheAttr == null + || !Objects.equals(item.getValue(), cacheAttr.getValue()) //JSON and String can be equals by value, but different by type + || !Objects.equals(item.getDataType(), cacheAttr.getDataType()); + }) + .collect(Collectors.toList()); } private boolean checkSendNotification(String scope) {