diff --git a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java index 4498b51c6c..6f6c4dda85 100644 --- a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java +++ b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java @@ -661,10 +661,8 @@ public class TelemetryController extends BaseController { logAttributesDeleted(user, entityId, scope, keys, null); if (entityIdSrc.getEntityType().equals(EntityType.DEVICE)) { DeviceId deviceId = new DeviceId(entityId.getId()); - Set keysToNotify = new HashSet<>(); - keys.forEach(key -> keysToNotify.add(new AttributeKey(scope, key))); tbClusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onDelete( - user.getTenantId(), deviceId, keysToNotify), null); + user.getTenantId(), deviceId, scope, keys), null); } result.setResult(new ResponseEntity<>(HttpStatus.OK)); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java index 722b33b426..f4d713b40f 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java @@ -287,12 +287,8 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { List attributeNames = attributeDeleteMsg.getAttributeNamesList(); attributesService.removeAll(tenantId, entityId, scope, attributeNames); if (EntityType.DEVICE.name().equals(entityType)) { - Set attributeKeys = new HashSet<>(); - for (String attributeName : attributeNames) { - attributeKeys.add(new AttributeKey(scope, attributeName)); - } tbClusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onDelete( - tenantId, (DeviceId) entityId, attributeKeys), new TbQueueCallback() { + tenantId, (DeviceId) entityId, scope, attributeNames), new TbQueueCallback() { @Override public void onSuccess(TbQueueMsgMetadata metadata) { futureToSet.set(null); diff --git a/application/src/main/java/org/thingsboard/server/service/ota/DefaultOtaPackageStateService.java b/application/src/main/java/org/thingsboard/server/service/ota/DefaultOtaPackageStateService.java index 6a4cca181f..b4a469102f 100644 --- a/application/src/main/java/org/thingsboard/server/service/ota/DefaultOtaPackageStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/ota/DefaultOtaPackageStateService.java @@ -360,9 +360,7 @@ public class DefaultOtaPackageStateService implements OtaPackageStateService { @Override public void onSuccess(@Nullable Void tmp) { log.trace("[{}] Success remove target {} attributes!", device.getId(), otaPackageType); - Set keysToNotify = new HashSet<>(); - attributesKeys.forEach(key -> keysToNotify.add(new AttributeKey(DataConstants.SHARED_SCOPE, key))); - tbClusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onDelete(device.getTenantId(), device.getId(), keysToNotify), null); + tbClusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onDelete(device.getTenantId(), device.getId(), DataConstants.SHARED_SCOPE, attributesKeys), null); } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index d0af49b1e0..931acf908c 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -490,7 +490,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService keys, TbCallback callback) { + public void onAttributesDelete(TenantId tenantId, EntityId entityId, String scope, List keys, boolean notifyDevice, TbCallback callback) { onLocalTelemetrySubUpdate(entityId, s -> { if (TbSubscriptionType.ATTRIBUTES.equals(s.getType())) { @@ -349,7 +349,13 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene return subscriptionUpdate; }, false); if (entityId.getEntityType() == EntityType.DEVICE) { - deleteDeviceInactivityTimeout(tenantId, entityId, keys); + if (TbAttributeSubscriptionScope.SERVER_SCOPE.name().equalsIgnoreCase(scope) + || TbAttributeSubscriptionScope.ANY_SCOPE.name().equalsIgnoreCase(scope)) { + deleteDeviceInactivityTimeout(tenantId, entityId, keys); + } else if (TbAttributeSubscriptionScope.SHARED_SCOPE.name().equalsIgnoreCase(scope) && notifyDevice) { + clusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onDelete(tenantId, + new DeviceId(entityId.getId()), scope, keys), null); + } } callback.onSuccess(); } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/SubscriptionManagerService.java b/application/src/main/java/org/thingsboard/server/service/subscription/SubscriptionManagerService.java index 37c6181910..15aab86327 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/SubscriptionManagerService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/SubscriptionManagerService.java @@ -38,7 +38,7 @@ public interface SubscriptionManagerService extends ApplicationListener attributes, boolean notifyDevice, TbCallback callback); - void onAttributesDelete(TenantId tenantId, EntityId entityId, String scope, List keys, TbCallback empty); + void onAttributesDelete(TenantId tenantId, EntityId entityId, String scope, List keys, boolean notifyDevice, TbCallback empty); void onTimeSeriesDelete(TenantId tenantId, EntityId entityId, List keys, TbCallback callback); diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java index d8642f8f9a..f8359a21ee 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java @@ -236,7 +236,7 @@ public class TbSubscriptionUtils { return ToCoreMsg.newBuilder().setToSubscriptionMgrMsg(msgBuilder.build()).build(); } - public static ToCoreMsg toAttributesDeleteProto(TenantId tenantId, EntityId entityId, String scope, List keys) { + public static ToCoreMsg toAttributesDeleteProto(TenantId tenantId, EntityId entityId, String scope, List keys, boolean notifyDevice) { TbAttributeDeleteProto.Builder builder = TbAttributeDeleteProto.newBuilder(); builder.setEntityType(entityId.getEntityType().name()); builder.setEntityIdMSB(entityId.getId().getMostSignificantBits()); @@ -245,6 +245,7 @@ public class TbSubscriptionUtils { builder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()); builder.setScope(scope); builder.addAllKeys(keys); + builder.setNotifyDevice(notifyDevice); SubscriptionMgrMsgProto.Builder msgBuilder = SubscriptionMgrMsgProto.newBuilder(); msgBuilder.setAttrDelete(builder); diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index c8c76bb8f4..667ad5b881 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -271,14 +271,20 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer @Override public void deleteAndNotify(TenantId tenantId, EntityId entityId, String scope, List keys, FutureCallback callback) { checkInternalEntity(entityId); - deleteAndNotifyInternal(tenantId, entityId, scope, keys, callback); + deleteAndNotifyInternal(tenantId, entityId, scope, keys, false, callback); } @Override - public void deleteAndNotifyInternal(TenantId tenantId, EntityId entityId, String scope, List keys, FutureCallback callback) { + public void deleteAndNotify(TenantId tenantId, EntityId entityId, String scope, List keys, boolean notifyDevice, FutureCallback callback) { + checkInternalEntity(entityId); + deleteAndNotifyInternal(tenantId, entityId, scope, keys, notifyDevice, callback); + } + + @Override + public void deleteAndNotifyInternal(TenantId tenantId, EntityId entityId, String scope, List keys, boolean notifyDevice, FutureCallback callback) { ListenableFuture> deleteFuture = attrService.removeAll(tenantId, entityId, scope, keys); addVoidCallback(deleteFuture, callback); - addWsCallback(deleteFuture, success -> onAttributesDelete(tenantId, entityId, scope, keys)); + addWsCallback(deleteFuture, success -> onAttributesDelete(tenantId, entityId, scope, keys, notifyDevice)); } @Override @@ -382,16 +388,16 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer } } - private void onAttributesDelete(TenantId tenantId, EntityId entityId, String scope, List keys) { + private void onAttributesDelete(TenantId tenantId, EntityId entityId, String scope, List keys, boolean notifyDevice) { TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId); if (currentPartitions.contains(tpi)) { if (subscriptionManagerService.isPresent()) { - subscriptionManagerService.get().onAttributesDelete(tenantId, entityId, scope, keys, TbCallback.EMPTY); + subscriptionManagerService.get().onAttributesDelete(tenantId, entityId, scope, keys, notifyDevice, TbCallback.EMPTY); } else { log.warn("Possible misconfiguration because subscriptionManagerService is null!"); } } else { - TransportProtos.ToCoreMsg toCoreMsg = TbSubscriptionUtils.toAttributesDeleteProto(tenantId, entityId, scope, keys); + TransportProtos.ToCoreMsg toCoreMsg = TbSubscriptionUtils.toAttributesDeleteProto(tenantId, entityId, scope, keys, notifyDevice); clusterService.pushMsgToCore(tpi, entityId.getId(), toCoreMsg, null); } } diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/InternalTelemetryService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/InternalTelemetryService.java index 035e262a53..298d894460 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/InternalTelemetryService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/InternalTelemetryService.java @@ -37,7 +37,7 @@ public interface InternalTelemetryService extends RuleEngineTelemetryService { void saveLatestAndNotifyInternal(TenantId tenantId, EntityId entityId, List ts, FutureCallback callback); - void deleteAndNotifyInternal(TenantId tenantId, EntityId entityId, String scope, List keys, FutureCallback callback); + void deleteAndNotifyInternal(TenantId tenantId, EntityId entityId, String scope, List keys, boolean notifyDevice, FutureCallback callback); void deleteLatestInternal(TenantId tenantId, EntityId entityId, List keys, FutureCallback callback); diff --git a/common/cluster-api/src/main/proto/queue.proto b/common/cluster-api/src/main/proto/queue.proto index cb3443a71f..5a28d42546 100644 --- a/common/cluster-api/src/main/proto/queue.proto +++ b/common/cluster-api/src/main/proto/queue.proto @@ -600,6 +600,7 @@ message TbAttributeDeleteProto { int64 tenantIdLSB = 5; string scope = 6; repeated string keys = 7; + bool notifyDevice = 8; } message TbTimeSeriesDeleteProto { diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java index 69c36697d3..970fd6177e 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java @@ -28,6 +28,7 @@ public class DataConstants { public static final String CLIENT_SCOPE = "CLIENT_SCOPE"; public static final String SERVER_SCOPE = "SERVER_SCOPE"; public static final String SHARED_SCOPE = "SHARED_SCOPE"; + public static final String NOTIFY_DEVICE_METADATA_KEY = "notifyDevice"; public static final String LATEST_TS = "LATEST_TS"; public static final String IS_NEW_ALARM = "isNewAlarm"; public static final String IS_EXISTING_ALARM = "isExistingAlarm"; diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index fac988ed8b..bf03122b18 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -32,6 +32,7 @@ import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.ApiUsageRecordKey; import org.thingsboard.server.common.data.ApiUsageState; +import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.DeviceTransportType; @@ -585,7 +586,7 @@ public class DefaultTransportService implements TransportService { TbMsgMetaData metaData = new TbMsgMetaData(); metaData.putValue("deviceName", sessionInfo.getDeviceName()); metaData.putValue("deviceType", sessionInfo.getDeviceType()); - metaData.putValue("notifyDevice", "false"); + metaData.putValue(DataConstants.NOTIFY_DEVICE_METADATA_KEY, "false"); CustomerId customerId = getCustomerId(sessionInfo); sendToRuleEngine(tenantId, deviceId, customerId, sessionInfo, json, metaData, SessionMsgType.POST_ATTRIBUTES_REQUEST, new TransportTbQueueCallback(new ApiStatsProxyCallback<>(tenantId, customerId, msg.getKvList().size(), callback))); diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java index dbee45c230..7d47195d96 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java @@ -64,6 +64,8 @@ public interface RuleEngineTelemetryService { void deleteAndNotify(TenantId tenantId, EntityId entityId, String scope, List keys, FutureCallback callback); + void deleteAndNotify(TenantId tenantId, EntityId entityId, String scope, List keys, boolean notifyDevice, FutureCallback callback); + void deleteLatest(TenantId tenantId, EntityId entityId, List keys, FutureCallback callback); void deleteAllLatest(TenantId tenantId, EntityId entityId, FutureCallback> callback); diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/msg/DeviceAttributesEventNotificationMsg.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/msg/DeviceAttributesEventNotificationMsg.java index e5e5a6a938..7da2695703 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/msg/DeviceAttributesEventNotificationMsg.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/msg/DeviceAttributesEventNotificationMsg.java @@ -25,6 +25,7 @@ import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.msg.MsgType; import org.thingsboard.server.common.msg.ToDeviceActorNotificationMsg; +import java.util.HashSet; import java.util.List; import java.util.Set; @@ -52,8 +53,10 @@ public class DeviceAttributesEventNotificationMsg implements ToDeviceActorNotifi return new DeviceAttributesEventNotificationMsg(tenantId, deviceId, null, scope, values, false); } - public static DeviceAttributesEventNotificationMsg onDelete(TenantId tenantId, DeviceId deviceId, Set keys) { - return new DeviceAttributesEventNotificationMsg(tenantId, deviceId, keys, null, null, true); + public static DeviceAttributesEventNotificationMsg onDelete(TenantId tenantId, DeviceId deviceId, String scope, List keys) { + Set keysToNotify = new HashSet<>(); + keys.forEach(key -> keysToNotify.add(new AttributeKey(scope, key))); + return new DeviceAttributesEventNotificationMsg(tenantId, deviceId, keysToNotify, null, null, true); } @Override diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java index 77a4492858..771f2a0d6d 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java @@ -17,7 +17,6 @@ package org.thingsboard.rule.engine.telemetry; import com.google.gson.JsonParser; import lombok.extern.slf4j.Slf4j; -import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; @@ -34,6 +33,10 @@ import org.thingsboard.server.common.transport.adaptor.JsonConverter; import java.util.ArrayList; import java.util.List; +import static org.thingsboard.server.common.data.DataConstants.CLIENT_SCOPE; +import static org.thingsboard.server.common.data.DataConstants.NOTIFY_DEVICE_METADATA_KEY; +import static org.thingsboard.server.common.data.DataConstants.SCOPE; + @Slf4j @RuleNode( type = ComponentType.ACTION, @@ -52,8 +55,6 @@ public class TbMsgAttributesNode implements TbNode { private TbMsgAttributesNodeConfiguration config; - private static final String SCOPE = "scope"; - @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { this.config = TbNodeUtils.convert(configuration, TbMsgAttributesNodeConfiguration.class); @@ -74,26 +75,33 @@ public class TbMsgAttributesNode implements TbNode { ctx.tellSuccess(msg); return; } - String scope = msg.getMetaData().getValue(SCOPE); - if (StringUtils.isEmpty(scope)) { - scope = config.getScope(); - } - String notifyDeviceStr = msg.getMetaData().getValue("notifyDevice"); + String scope = getScope(msg.getMetaData().getValue(SCOPE)); boolean sendAttributesUpdateNotification = checkSendNotification(scope); ctx.getTelemetryService().saveAndNotify( ctx.getTenantId(), msg.getOriginator(), scope, attributes, - config.getNotifyDevice() || StringUtils.isEmpty(notifyDeviceStr) || Boolean.parseBoolean(notifyDeviceStr), + checkNotifyDevice(msg.getMetaData().getValue(NOTIFY_DEVICE_METADATA_KEY)), sendAttributesUpdateNotification ? - new AttributesUpdateNodeCallback(ctx, msg, config.getScope(), attributes) : + new AttributesUpdateNodeCallback(ctx, msg, scope, attributes) : new TelemetryNodeCallback(ctx, msg) ); } - private boolean checkSendNotification(String scope){ - return config.isSendAttributesUpdatedNotification() && !DataConstants.CLIENT_SCOPE.equals(scope); + private boolean checkSendNotification(String scope) { + return config.isSendAttributesUpdatedNotification() && !CLIENT_SCOPE.equals(scope); + } + + private boolean checkNotifyDevice(String notifyDeviceMdValue) { + return config.getNotifyDevice() || StringUtils.isEmpty(notifyDeviceMdValue) || Boolean.parseBoolean(notifyDeviceMdValue); + } + + private String getScope(String mdScopeValue) { + if (StringUtils.isNotEmpty(mdScopeValue)) { + return mdScopeValue; + } + return config.getScope(); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgDeleteAttributes.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgDeleteAttributesNode.java similarity index 76% rename from rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgDeleteAttributes.java rename to rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgDeleteAttributesNode.java index aad7616470..5a623bd1b9 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgDeleteAttributes.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgDeleteAttributesNode.java @@ -30,11 +30,15 @@ import java.util.List; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; +import static org.thingsboard.server.common.data.DataConstants.NOTIFY_DEVICE_METADATA_KEY; +import static org.thingsboard.server.common.data.DataConstants.SCOPE; +import static org.thingsboard.server.common.data.DataConstants.SHARED_SCOPE; + @Slf4j @RuleNode( type = ComponentType.ACTION, name = "delete attributes", - configClazz = TbMsgDeleteAttributesConfiguration.class, + configClazz = TbMsgDeleteAttributesNodeConfiguration.class, nodeDescription = "Delete attributes for Message Originator.", nodeDetails = "Attempt to remove attributes by selected keys. If msg originator doesn't have an attribute with " + " a key selected in the configuration, it will be ignored. If delete operation is completed successfully, " + @@ -44,16 +48,14 @@ import java.util.stream.Collectors; configDirective = "tbActionNodeDeleteAttributesConfig", icon = "remove_circle" ) -public class TbMsgDeleteAttributes implements TbNode { +public class TbMsgDeleteAttributesNode implements TbNode { - private TbMsgDeleteAttributesConfiguration config; - private String scope; + private TbMsgDeleteAttributesNodeConfiguration config; private List keys; @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { - this.config = TbNodeUtils.convert(configuration, TbMsgDeleteAttributesConfiguration.class); - this.scope = config.getScope(); + this.config = TbNodeUtils.convert(configuration, TbMsgDeleteAttributesNodeConfiguration.class); this.keys = config.getKeys(); } @@ -67,15 +69,29 @@ public class TbMsgDeleteAttributes implements TbNode { if (keysToDelete.isEmpty()) { ctx.tellSuccess(msg); } else { + String scope = getScope(msg.getMetaData().getValue(SCOPE)); ctx.getTelemetryService().deleteAndNotify( ctx.getTenantId(), msg.getOriginator(), scope, keysToDelete, + checkNotifyDevice(msg.getMetaData().getValue(NOTIFY_DEVICE_METADATA_KEY), scope), config.isSendAttributesDeletedNotification() ? new AttributesDeleteNodeCallback(ctx, msg, scope, keysToDelete) : new TelemetryNodeCallback(ctx, msg) ); } } + + private String getScope(String mdScopeValue) { + if (StringUtils.isNotEmpty(mdScopeValue)) { + return mdScopeValue; + } + return config.getScope(); + } + + private boolean checkNotifyDevice(String notifyDeviceMdValue, String scope) { + return SHARED_SCOPE.equals(scope) && (config.isNotifyDevice() || Boolean.parseBoolean(notifyDeviceMdValue)); + } + } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgDeleteAttributesConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgDeleteAttributesNodeConfiguration.java similarity index 75% rename from rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgDeleteAttributesConfiguration.java rename to rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgDeleteAttributesNodeConfiguration.java index a33bcda3f2..02e084c5a2 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgDeleteAttributesConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgDeleteAttributesNodeConfiguration.java @@ -23,18 +23,20 @@ import java.util.Collections; import java.util.List; @Data -public class TbMsgDeleteAttributesConfiguration implements NodeConfiguration { +public class TbMsgDeleteAttributesNodeConfiguration implements NodeConfiguration { private String scope; private List keys; private boolean sendAttributesDeletedNotification; + private boolean notifyDevice; @Override - public TbMsgDeleteAttributesConfiguration defaultConfiguration() { - TbMsgDeleteAttributesConfiguration configuration = new TbMsgDeleteAttributesConfiguration(); + public TbMsgDeleteAttributesNodeConfiguration defaultConfiguration() { + TbMsgDeleteAttributesNodeConfiguration configuration = new TbMsgDeleteAttributesNodeConfiguration(); configuration.setScope(DataConstants.SERVER_SCOPE); configuration.setKeys(Collections.emptyList()); configuration.setSendAttributesDeletedNotification(false); + configuration.setNotifyDevice(false); return configuration; } } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgDeleteAttributesTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgDeleteAttributesNodeTest.java similarity index 67% rename from rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgDeleteAttributesTest.java rename to rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgDeleteAttributesNodeTest.java index a329f890af..e641e0ded4 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgDeleteAttributesTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgDeleteAttributesNodeTest.java @@ -20,15 +20,11 @@ import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; -import org.mockito.Captor; -import org.mockito.junit.MockitoJUnitRunner; import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; -import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; @@ -42,8 +38,10 @@ import java.util.function.Consumer; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.willAnswer; import static org.mockito.BDDMockito.willReturn; import static org.mockito.Mockito.mock; @@ -51,14 +49,18 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.thingsboard.server.common.data.DataConstants.NOTIFY_DEVICE_METADATA_KEY; +import static org.thingsboard.server.common.data.DataConstants.SCOPE; +import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE; +import static org.thingsboard.server.common.data.DataConstants.SHARED_SCOPE; @Slf4j -public class TbMsgDeleteAttributesTest { +public class TbMsgDeleteAttributesNodeTest { final ObjectMapper mapper = new ObjectMapper(); DeviceId deviceId; - TbMsgDeleteAttributes node; - TbMsgDeleteAttributesConfiguration config; + TbMsgDeleteAttributesNode node; + TbMsgDeleteAttributesNodeConfiguration config; TbNodeConfiguration nodeConfiguration; TbContext ctx; TbMsgCallback callback; @@ -70,20 +72,20 @@ public class TbMsgDeleteAttributesTest { deviceId = new DeviceId(UUID.randomUUID()); callback = mock(TbMsgCallback.class); ctx = mock(TbContext.class); - config = new TbMsgDeleteAttributesConfiguration().defaultConfiguration(); + config = new TbMsgDeleteAttributesNodeConfiguration().defaultConfiguration(); config.setKeys(List.of("${TestAttribute_1}", "$[TestAttribute_2]", "$[TestAttribute_3]", "TestAttribute_4")); nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config)); - node = spy(new TbMsgDeleteAttributes()); + node = spy(new TbMsgDeleteAttributesNode()); node.init(ctx, nodeConfiguration); telemetryService = mock(RuleEngineTelemetryService.class); willReturn(telemetryService).given(ctx).getTelemetryService(); willAnswer(invocation -> { - TelemetryNodeCallback callBack = invocation.getArgument(4); + TelemetryNodeCallback callBack = invocation.getArgument(5); callBack.onSuccess(null); return null; }).given(telemetryService).deleteAndNotify( - any(), any(), anyString(), anyList(), any()); + any(), any(), anyString(), anyList(), anyBoolean(), any()); } @AfterEach @@ -93,30 +95,51 @@ public class TbMsgDeleteAttributesTest { @Test void givenDefaultConfig_whenVerify_thenOK() { - TbMsgDeleteAttributesConfiguration defaultConfig = new TbMsgDeleteAttributesConfiguration().defaultConfiguration(); - assertThat(defaultConfig.getScope()).isEqualTo(DataConstants.SERVER_SCOPE); + TbMsgDeleteAttributesNodeConfiguration defaultConfig = new TbMsgDeleteAttributesNodeConfiguration().defaultConfiguration(); + assertThat(defaultConfig.getScope()).isEqualTo(SERVER_SCOPE); assertThat(defaultConfig.getKeys()).isEqualTo(Collections.emptyList()); } @Test - void givenMsg_whenOnMsg_thenVerifyOutput_NoSendAttributesDeletedNotification() throws Exception { - onMsg_thenVerifyOutput(false); + void givenMsg_whenOnMsg_thenVerifyOutput_NoSendAttributesDeletedNotification_NoNotifyDevice() throws Exception { + onMsg_thenVerifyOutput(false, false, false); } @Test - void givenMsg_whenOnMsg_thenVerifyOutput_SendAttributesDeletedNotification() throws Exception { + void givenMsg_whenOnMsg_thenVerifyOutput_SendAttributesDeletedNotification_NoNotifyDevice() throws Exception { config.setSendAttributesDeletedNotification(true); nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config)); node.init(ctx, nodeConfiguration); - onMsg_thenVerifyOutput(true); + onMsg_thenVerifyOutput(true, false, false); } - void onMsg_thenVerifyOutput(boolean sendAttributesDeletedNotification) throws Exception { + @Test + void givenMsg_whenOnMsg_thenVerifyOutput_SendAttributesDeletedNotification_NotifyDevice() throws Exception { + config.setSendAttributesDeletedNotification(true); + config.setNotifyDevice(true); + config.setScope(SHARED_SCOPE); + nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config)); + node.init(ctx, nodeConfiguration); + onMsg_thenVerifyOutput(true, true, false); + } + + @Test + void givenMsg_whenOnMsg_thenVerifyOutput_NoSendAttributesDeletedNotification_NotifyDeviceMetadata() throws Exception { + nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config)); + node.init(ctx, nodeConfiguration); + onMsg_thenVerifyOutput(false, false, true); + } + + void onMsg_thenVerifyOutput(boolean sendAttributesDeletedNotification, boolean notifyDevice, boolean notifyDeviceMetadata) throws Exception { final Map mdMap = Map.of( "TestAttribute_1", "temperature", "city", "NY" ); - final TbMsgMetaData metaData = new TbMsgMetaData(mdMap); + TbMsgMetaData metaData = new TbMsgMetaData(mdMap); + if (notifyDeviceMetadata) { + metaData.putValue(NOTIFY_DEVICE_METADATA_KEY, "true"); + metaData.putValue(SCOPE, SHARED_SCOPE); + } final String data = "{\"TestAttribute_2\": \"humidity\", \"TestAttribute_3\": \"voltage\"}"; TbMsg msg = TbMsg.newMsg("POST_ATTRIBUTES_REQUEST", deviceId, metaData, data, callback); @@ -133,6 +156,6 @@ public class TbMsgDeleteAttributesTest { } verify(ctx, times(1)).tellSuccess(newMsgCaptor.capture()); verify(ctx, never()).tellFailure(any(), any()); - verify(telemetryService, times(1)).deleteAndNotify(any(), any(), anyString(), anyList(), any()); + verify(telemetryService, times(1)).deleteAndNotify(any(), any(), anyString(), anyList(), eq(notifyDevice || notifyDeviceMetadata), any()); } }