From e41d69b1a8512e2c555fbc1a13a9d2300c8a0916 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Fri, 18 Sep 2020 12:41:10 +0300 Subject: [PATCH] shared Attributes update improvements (If the update of the shared attributes is originated by the user's REST API call - we push it to the device, if the update arrives from the device, we will not push it back to the device by default.) --- .../DefaultSubscriptionManagerService.java | 7 ++++++- .../subscription/SubscriptionManagerService.java | 2 ++ .../DefaultTelemetrySubscriptionService.java | 11 ++++++++--- .../transport/service/DefaultTransportService.java | 1 + .../rule/engine/api/RuleEngineTelemetryService.java | 2 ++ .../rule/engine/telemetry/TbMsgAttributesNode.java | 10 +++++++++- 6 files changed, 28 insertions(+), 5 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java index fc08ebfac1..f40133808b 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java @@ -216,6 +216,11 @@ public class DefaultSubscriptionManagerService implements SubscriptionManagerSer @Override public void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List attributes, TbCallback callback) { + onAttributesUpdate(tenantId, entityId, scope, attributes, callback, true); + } + + @Override + public void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List attributes, TbCallback callback, boolean notifyDevice) { onLocalSubUpdate(entityId, s -> { if (TbSubscriptionType.ATTRIBUTES.equals(s.getType())) { @@ -244,7 +249,7 @@ public class DefaultSubscriptionManagerService implements SubscriptionManagerSer deviceStateService.onDeviceInactivityTimeoutUpdate(new DeviceId(entityId.getId()), attribute.getLongValue().orElse(0L)); } } - } else if (TbAttributeSubscriptionScope.SHARED_SCOPE.name().equalsIgnoreCase(scope)) { + } else if (TbAttributeSubscriptionScope.SHARED_SCOPE.name().equalsIgnoreCase(scope) && notifyDevice) { clusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onUpdate(tenantId, new DeviceId(entityId.getId()), DataConstants.SHARED_SCOPE, new ArrayList<>(attributes)) , null); diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/SubscriptionManagerService.java b/application/src/main/java/org/thingsboard/server/service/subscription/SubscriptionManagerService.java index e20b707348..4d9454ccce 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/SubscriptionManagerService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/SubscriptionManagerService.java @@ -35,4 +35,6 @@ public interface SubscriptionManagerService extends ApplicationListener attributes, TbCallback callback); + void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List attributes, TbCallback callback, boolean notifyDevice); + } diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index 665a787e6d..390c6f19b4 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -128,9 +128,14 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio @Override public void saveAndNotify(TenantId tenantId, EntityId entityId, String scope, List attributes, FutureCallback callback) { + saveAndNotify(tenantId, entityId, scope, attributes, callback, true); + } + + @Override + public void saveAndNotify(TenantId tenantId, EntityId entityId, String scope, List attributes, FutureCallback callback, boolean notifyDevice) { ListenableFuture> saveFuture = attrService.save(tenantId, entityId, scope, attributes); addMainCallback(saveFuture, callback); - addWsCallback(saveFuture, success -> onAttributesUpdate(tenantId, entityId, scope, attributes)); + addWsCallback(saveFuture, success -> onAttributesUpdate(tenantId, entityId, scope, attributes, notifyDevice)); } @Override @@ -157,11 +162,11 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio , System.currentTimeMillis())), callback); } - private void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List attributes) { + private void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List attributes, boolean notifyDevice) { TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId); if (currentPartitions.contains(tpi)) { if (subscriptionManagerService.isPresent()) { - subscriptionManagerService.get().onAttributesUpdate(tenantId, entityId, scope, attributes, TbCallback.EMPTY); + subscriptionManagerService.get().onAttributesUpdate(tenantId, entityId, scope, attributes, TbCallback.EMPTY, notifyDevice); } else { log.warn("Possible misconfiguration because subscriptionManagerService is null!"); } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index 2ca27b9dfe..6fbc5d8e8e 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -298,6 +298,7 @@ public class DefaultTransportService implements TransportService { TbMsgMetaData metaData = new TbMsgMetaData(); metaData.putValue("deviceName", sessionInfo.getDeviceName()); metaData.putValue("deviceType", sessionInfo.getDeviceType()); + metaData.putValue("notifyDevice", "false"); TbMsg tbMsg = TbMsg.newMsg(SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), deviceId, metaData, gson.toJson(json)); sendToRuleEngine(tenantId, tbMsg, new TransportTbQueueCallback(callback)); } diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java index d2e19652a2..5b54370e49 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java @@ -36,6 +36,8 @@ public interface RuleEngineTelemetryService { void saveAndNotify(TenantId tenantId, EntityId entityId, String scope, List attributes, FutureCallback callback); + void saveAndNotify(TenantId tenantId, EntityId entityId, String scope, List attributes, FutureCallback callback, boolean notifyDevice); + void saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, long value, FutureCallback callback); void saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, String value, FutureCallback callback); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java index 8d80e17f87..4e0c2b3492 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java @@ -17,6 +17,7 @@ package org.thingsboard.rule.engine.telemetry; import com.google.gson.JsonParser; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNode; @@ -63,7 +64,14 @@ public class TbMsgAttributesNode implements TbNode { } String src = msg.getData(); Set attributes = JsonConverter.convertToAttributes(new JsonParser().parse(src)); - ctx.getTelemetryService().saveAndNotify(ctx.getTenantId(), msg.getOriginator(), config.getScope(), new ArrayList<>(attributes), new TelemetryNodeCallback(ctx, msg)); + String notifyDeviceStr = msg.getMetaData().getValue("notifyDevice"); + ctx.getTelemetryService().saveAndNotify( + ctx.getTenantId(), + msg.getOriginator(), + config.getScope(), + new ArrayList<>(attributes), + new TelemetryNodeCallback(ctx, msg), + StringUtils.isEmpty(notifyDeviceStr) || !notifyDeviceStr.equals("false")); } @Override